Author: jbellis
Date: Tue Jul 28 02:45:55 2009
New Revision: 798371

URL: http://svn.apache.org/viewvc?rev=798371&view=rev
Log:
custom CommitLogExecutorService that can fsync per multiple CL additions
patch by jbellis; reviewed by Jun Rao for CASSANDRA-182

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=798371&r1=798370&r2=798371&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Tue Jul 28 02:45:55 2009
@@ -182,7 +182,7 @@
     {
         if ( !recoveryMode )
         {
-            executor = new DebuggableThreadPoolExecutor("COMMITLOG-POOL");
+            executor = new CommitLogExecutorService();
             setNextFileName();            
             logWriter_ = CommitLog.createWriter(logFile_);
             writeCommitLogHeader();
@@ -372,36 +372,7 @@
     */
     CommitLogContext add(final Row row) throws IOException
     {
-        Callable<CommitLogContext> task = new Callable<CommitLogContext>()
-        {
-            public CommitLogContext call() throws Exception
-            {
-                long currentPosition = -1L;
-                DataOutputBuffer cfBuffer = new DataOutputBuffer();
-                try
-                {
-                    /* serialize the row */
-                    Row.serializer().serialize(row, cfBuffer);
-                    currentPosition = logWriter_.getCurrentPosition();
-                    CommitLogContext cLogCtx = new CommitLogContext(logFile_, currentPosition);
-                    /* Update the header */
-                    maybeUpdateHeader(row);
-                    logWriter_.writeLong(cfBuffer.getLength());
-                    logWriter_.append(cfBuffer);
-                    if (!maybeRollLog())
-                    {
-                        logWriter_.sync();
-                    }
-                    return cLogCtx;
-                }
-                catch (IOException e)
-                {
-                    if ( currentPosition != -1 )
-                        logWriter_.seek(currentPosition);
-                    throw e;
-                }
-            }
-        };
+        Callable<CommitLogContext> task = new LogRecordAdder(row);
 
         try
         {
@@ -552,4 +523,44 @@
         }
         return false;
     }
-}
\ No newline at end of file
+
+    void sync() throws IOException
+    {
+        logWriter_.sync();
+    }
+
+    class LogRecordAdder implements Callable<CommitLog.CommitLogContext>
+    {
+        Row row;
+
+        LogRecordAdder(Row row)
+        {
+            this.row = row;
+        }
+
+        public CommitLog.CommitLogContext call() throws Exception
+        {
+            long currentPosition = -1L;
+            DataOutputBuffer cfBuffer = new DataOutputBuffer();
+            try
+            {
+                /* serialize the row */
+                Row.serializer().serialize(row, cfBuffer);
+                currentPosition = logWriter_.getCurrentPosition();
+                CommitLogContext cLogCtx = new CommitLogContext(logFile_, currentPosition);
+                /* Update the header */
+                maybeUpdateHeader(row);
+                logWriter_.writeLong(cfBuffer.getLength());
+                logWriter_.append(cfBuffer);
+                maybeRollLog();
+                return cLogCtx;
+            }
+            catch (IOException e)
+            {
+                if ( currentPosition != -1 )
+                    logWriter_.seek(currentPosition);
+                throw e;
+            }
+        }
+    }
+}

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java?rev=798371&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java Tue Jul 28 02:45:55 2009
@@ -0,0 +1,153 @@
+package org.apache.cassandra.db;
+
+import java.util.concurrent.*;
+import java.util.List;
+import java.util.Queue;
+import java.util.ArrayList;
+import java.io.IOException;
+
+public class CommitLogExecutorService extends AbstractExecutorService
+{
+    Queue<CheaterFutureTask> queue;
+
+    public CommitLogExecutorService()
+    {
+        queue = new ConcurrentLinkedQueue<CheaterFutureTask>();
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                while (true)
+                {
+                    process();
+                }
+            }
+        };
+        new Thread(runnable).start();
+    }
+
+    private ArrayList<CheaterFutureTask> incompleteTasks = new ArrayList<CheaterFutureTask>();
+    private ArrayList taskValues = new ArrayList(); // TODO not sure how to generify this
+    void process()
+    {
+        while (queue.isEmpty())
+        {
+            try
+            {
+                Thread.sleep(1);
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        // attempt to do a bunch of LogRecordAdder ops before syncing
+        incompleteTasks.clear();
+        taskValues.clear();
+        while (!queue.isEmpty()
+               && queue.peek().getRawCallable() instanceof CommitLog.LogRecordAdder
+               && incompleteTasks.size() < 20)
+        {
+            CheaterFutureTask task = queue.remove();
+            incompleteTasks.add(task);
+            try
+            {
+                taskValues.add(task.getRawCallable().call());
+            }
+            catch (Exception e)
+            {
+                // it doesn't seem worth bothering future-izing the exception
+                // since if a commitlog op throws, we're probably screwed anyway
+                throw new RuntimeException(e);
+            }
+        }
+
+        if (incompleteTasks.size() == 0)
+        {
+            // no LRAs; just run the task
+            queue.remove().run();
+        }
+        else
+        {
+            // now sync and set the tasks' values (which allows thread calling get() to proceed)
+            try
+            {
+                CommitLog.open().sync();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+            for (int i = 0; i < incompleteTasks.size(); i++)
+            {
+                incompleteTasks.get(i).set(taskValues.get(i));
+            }
+        }
+    }
+
+    @Override
+    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
+    {
+        return newTaskFor(Executors.callable(runnable, value));
+    }
+
+    @Override
+    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
+    {
+        return new CheaterFutureTask(callable);
+    }
+
+    public void execute(Runnable command)
+    {
+        queue.add((CheaterFutureTask)command);
+    }
+
+    public boolean isShutdown()
+    {
+        return false;
+    }
+
+    public boolean isTerminated()
+    {
+        return false;
+    }
+
+    // cassandra is crash-only so there's no need to implement the shutdown methods
+    public void shutdown()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public List<Runnable> shutdownNow()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
+    {
+        throw new UnsupportedOperationException();
+    }
+}
+
+class CheaterFutureTask<V> extends FutureTask<V>
+{
+    private Callable rawCallable;
+
+    public CheaterFutureTask(Callable<V> callable)
+    {
+        super(callable);
+        rawCallable = callable;
+    }
+
+    public Callable getRawCallable()
+    {
+        return rawCallable;
+    }
+
+    @Override
+    public void set(V v)
+    {
+        super.set(v);
+    }
+}


Reply via email to