Author: slebresne
Date: Mon Dec 19 09:18:36 2011
New Revision: 1220662

URL: http://svn.apache.org/viewvc?rev=1220662&view=rev
Log:
Use separate writer thread in SSTableSimpleUnsortedWriter
patch by slebresne; reviewed by yukim for CASSANDRA-3619

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1220662&r1=1220661&r2=1220662&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Dec 19 09:18:36 2011
@@ -25,6 +25,7 @@
    (CASSANDRA-3538)
  * Improve memtable slice iteration performance (CASSANDRA-3545)
  * more efficient allocation of small bloom filters (CASSANDRA-3618)
+ * Use separate writer thread in SSTableSimpleUnsortedWriter (CASSANDRA-3619)
 
 
 1.0.7

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java?rev=1220662&r1=1220661&r2=1220662&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java Mon Dec 19 09:18:36 2011
@@ -23,6 +23,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.locks.Condition;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
@@ -31,6 +34,7 @@ import org.apache.cassandra.service.Stor
 import org.apache.cassandra.utils.HeapAllocator;
 
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.SimpleCondition;
 
 /**
  * A SSTable writer that doesn't assume rows are in sorted order.
@@ -43,10 +47,15 @@ import org.apache.cassandra.utils.ByteBu
  */
 public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
 {
-    private final Map<DecoratedKey, ColumnFamily> keys = new 
TreeMap<DecoratedKey, ColumnFamily>();
+    private static final Buffer SENTINEL = new Buffer();
+
+    private Buffer buffer = new Buffer();
     private final long bufferSize;
     private long currentSize;
 
+    private final BlockingQueue<Buffer> writeQueue = new 
SynchronousQueue<Buffer>();
+    private final DiskWriter diskWriter = new DiskWriter();
+
     /**
      * Create a new buffering writer.
      * @param directory the directory where to write the sstables
@@ -67,6 +76,7 @@ public class SSTableSimpleUnsortedWriter
     {
         super(directory, new CFMetaData(keyspace, columnFamily, subComparator 
== null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, 
subComparator));
         this.bufferSize = bufferSizeInMB * 1024L * 1024L;
+        this.diskWriter.start();
     }
 
     protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) 
throws IOException
@@ -79,12 +89,12 @@ public class SSTableSimpleUnsortedWriter
 
     protected ColumnFamily getColumnFamily()
     {
-        ColumnFamily previous = keys.get(currentKey);
+        ColumnFamily previous = buffer.get(currentKey);
         // If the CF already exist in memory, we'll just continue adding to it
         if (previous == null)
         {
             previous = ColumnFamily.create(metadata, 
TreeMapBackedSortedColumns.factory());
-            keys.put(currentKey, previous);
+            buffer.put(currentKey, previous);
         }
         else
         {
@@ -98,20 +108,77 @@ public class SSTableSimpleUnsortedWriter
     public void close() throws IOException
     {
         sync();
+        try
+        {
+            writeQueue.put(SENTINEL);
+            diskWriter.join();
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        checkForWriterException();
     }
 
     private void sync() throws IOException
     {
-        if (keys.isEmpty())
+        if (buffer.isEmpty())
             return;
 
-        SSTableWriter writer = getWriter();
-        for (Map.Entry<DecoratedKey, ColumnFamily> entry : keys.entrySet())
+        checkForWriterException();
+
+        try
         {
-            writer.append(entry.getKey(), entry.getValue());
+            writeQueue.put(buffer);
         }
-        writer.closeAndOpenReader();
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+        buffer = new Buffer();
         currentSize = 0;
-        keys.clear();
+    }
+
+    private void checkForWriterException() throws IOException
+    {
+        // slightly lame way to report exception from the writer, but that should be good enough
+        if (diskWriter.exception != null)
+        {
+            if (diskWriter.exception instanceof IOException)
+                throw (IOException) diskWriter.exception;
+            else
+                throw new RuntimeException(diskWriter.exception);
+        }
+    }
+
+    // typedef
+    private static class Buffer extends TreeMap<DecoratedKey, ColumnFamily> {}
+
+    private class DiskWriter extends Thread
+    {
+        volatile Exception exception = null;
+
+        public void run()
+        {
+            try
+            {
+                while (true)
+                {
+                    Buffer b = writeQueue.take();
+                    if (b == SENTINEL)
+                        return;
+
+                    SSTableWriter writer = getWriter();
+                    for (Map.Entry<DecoratedKey, ColumnFamily> entry : 
b.entrySet())
+                        writer.append(entry.getKey(), entry.getValue());
+                    writer.closeAndOpenReader();
+                }
+            }
+            catch (Exception e)
+            {
+                exception = e;
+            }
+        }
     }
 }


Reply via email to