Author: jbellis
Date: Tue Jul 28 02:45:55 2009
New Revision: 798371
URL: http://svn.apache.org/viewvc?rev=798371&view=rev
Log:
custom CommitLogExecutorService that can fsync per multiple CL additions
patch by jbellis; reviewed by Jun Rao for CASSANDRA-182
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=798371&r1=798370&r2=798371&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Tue Jul 28 02:45:55 2009
@@ -182,7 +182,7 @@
{
if ( !recoveryMode )
{
- executor = new DebuggableThreadPoolExecutor("COMMITLOG-POOL");
+ executor = new CommitLogExecutorService();
setNextFileName();
logWriter_ = CommitLog.createWriter(logFile_);
writeCommitLogHeader();
@@ -372,36 +372,7 @@
*/
CommitLogContext add(final Row row) throws IOException
{
- Callable<CommitLogContext> task = new Callable<CommitLogContext>()
- {
- public CommitLogContext call() throws Exception
- {
- long currentPosition = -1L;
- DataOutputBuffer cfBuffer = new DataOutputBuffer();
- try
- {
- /* serialize the row */
- Row.serializer().serialize(row, cfBuffer);
- currentPosition = logWriter_.getCurrentPosition();
- CommitLogContext cLogCtx = new CommitLogContext(logFile_, currentPosition);
- /* Update the header */
- maybeUpdateHeader(row);
- logWriter_.writeLong(cfBuffer.getLength());
- logWriter_.append(cfBuffer);
- if (!maybeRollLog())
- {
- logWriter_.sync();
- }
- return cLogCtx;
- }
- catch (IOException e)
- {
- if ( currentPosition != -1 )
- logWriter_.seek(currentPosition);
- throw e;
- }
- }
- };
+ Callable<CommitLogContext> task = new LogRecordAdder(row);
try
{
@@ -552,4 +523,44 @@
}
return false;
}
-}
\ No newline at end of file
+
+ void sync() throws IOException
+ {
+ logWriter_.sync();
+ }
+
+ class LogRecordAdder implements Callable<CommitLog.CommitLogContext>
+ {
+ Row row;
+
+ LogRecordAdder(Row row)
+ {
+ this.row = row;
+ }
+
+ public CommitLog.CommitLogContext call() throws Exception
+ {
+ long currentPosition = -1L;
+ DataOutputBuffer cfBuffer = new DataOutputBuffer();
+ try
+ {
+ /* serialize the row */
+ Row.serializer().serialize(row, cfBuffer);
+ currentPosition = logWriter_.getCurrentPosition();
+ CommitLogContext cLogCtx = new CommitLogContext(logFile_, currentPosition);
+ /* Update the header */
+ maybeUpdateHeader(row);
+ logWriter_.writeLong(cfBuffer.getLength());
+ logWriter_.append(cfBuffer);
+ maybeRollLog();
+ return cLogCtx;
+ }
+ catch (IOException e)
+ {
+ if ( currentPosition != -1 )
+ logWriter_.seek(currentPosition);
+ throw e;
+ }
+ }
+ }
+}
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java?rev=798371&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java Tue Jul 28 02:45:55 2009
@@ -0,0 +1,153 @@
+package org.apache.cassandra.db;
+
+import java.util.concurrent.*;
+import java.util.List;
+import java.util.Queue;
+import java.util.ArrayList;
+import java.io.IOException;
+
+public class CommitLogExecutorService extends AbstractExecutorService
+{
+ Queue<CheaterFutureTask> queue;
+
+ public CommitLogExecutorService()
+ {
+ queue = new ConcurrentLinkedQueue<CheaterFutureTask>();
+ Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ while (true)
+ {
+ process();
+ }
+ }
+ };
+ new Thread(runnable).start();
+ }
+
+ private ArrayList<CheaterFutureTask> incompleteTasks = new ArrayList<CheaterFutureTask>();
+ private ArrayList taskValues = new ArrayList(); // TODO not sure how to generify this
+ void process()
+ {
+ while (queue.isEmpty())
+ {
+ try
+ {
+ Thread.sleep(1);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // attempt to do a bunch of LogRecordAdder ops before syncing
+ incompleteTasks.clear();
+ taskValues.clear();
+ while (!queue.isEmpty()
+ && queue.peek().getRawCallable() instanceof CommitLog.LogRecordAdder
+ && incompleteTasks.size() < 20)
+ {
+ CheaterFutureTask task = queue.remove();
+ incompleteTasks.add(task);
+ try
+ {
+ taskValues.add(task.getRawCallable().call());
+ }
+ catch (Exception e)
+ {
+ // it doesn't seem worth bothering future-izing the exception
+ // since if a commitlog op throws, we're probably screwed anyway
+ throw new RuntimeException(e);
+ }
+ }
+
+ if (incompleteTasks.size() == 0)
+ {
+ // no LRAs; just run the task
+ queue.remove().run();
+ }
+ else
+ {
+ // now sync and set the tasks' values (which allows thread calling get() to proceed)
+ try
+ {
+ CommitLog.open().sync();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ for (int i = 0; i < incompleteTasks.size(); i++)
+ {
+ incompleteTasks.get(i).set(taskValues.get(i));
+ }
+ }
+ }
+
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
+ {
+ return newTaskFor(Executors.callable(runnable, value));
+ }
+
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
+ {
+ return new CheaterFutureTask(callable);
+ }
+
+ public void execute(Runnable command)
+ {
+ queue.add((CheaterFutureTask)command);
+ }
+
+ public boolean isShutdown()
+ {
+ return false;
+ }
+
+ public boolean isTerminated()
+ {
+ return false;
+ }
+
+ // cassandra is crash-only so there's no need to implement the shutdown methods
+ public void shutdown()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public List<Runnable> shutdownNow()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
+ {
+ throw new UnsupportedOperationException();
+ }
+}
+
+class CheaterFutureTask<V> extends FutureTask<V>
+{
+ private Callable rawCallable;
+
+ public CheaterFutureTask(Callable<V> callable)
+ {
+ super(callable);
+ rawCallable = callable;
+ }
+
+ public Callable getRawCallable()
+ {
+ return rawCallable;
+ }
+
+ @Override
+ public void set(V v)
+ {
+ super.set(v);
+ }
+}