Author: slebresne
Date: Mon Dec 19 09:18:36 2011
New Revision: 1220662
URL: http://svn.apache.org/viewvc?rev=1220662&view=rev
Log:
Use separate writer thread in SSTableSimpleUnsortedWriter
patch by slebresne; reviewed by yukim for CASSANDRA-3619
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1220662&r1=1220661&r2=1220662&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Dec 19 09:18:36 2011
@@ -25,6 +25,7 @@
(CASSANDRA-3538)
* Improve memtable slice iteration performance (CASSANDRA-3545)
* more efficient allocation of small bloom filters (CASSANDRA-3618)
+ * Use separate writer thread in SSTableSimpleUnsortedWriter (CASSANDRA-3619)
1.0.7
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java?rev=1220662&r1=1220661&r2=1220662&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java Mon Dec 19 09:18:36 2011
@@ -23,6 +23,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.locks.Condition;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
@@ -31,6 +34,7 @@ import org.apache.cassandra.service.Stor
import org.apache.cassandra.utils.HeapAllocator;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.SimpleCondition;
/**
* A SSTable writer that doesn't assume rows are in sorted order.
@@ -43,10 +47,15 @@ import org.apache.cassandra.utils.ByteBu
*/
public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
{
- private final Map<DecoratedKey, ColumnFamily> keys = new
TreeMap<DecoratedKey, ColumnFamily>();
+ private static final Buffer SENTINEL = new Buffer();
+
+ private Buffer buffer = new Buffer();
private final long bufferSize;
private long currentSize;
+ private final BlockingQueue<Buffer> writeQueue = new
SynchronousQueue<Buffer>();
+ private final DiskWriter diskWriter = new DiskWriter();
+
/**
* Create a new buffering writer.
* @param directory the directory where to write the sstables
@@ -67,6 +76,7 @@ public class SSTableSimpleUnsortedWriter
{
super(directory, new CFMetaData(keyspace, columnFamily, subComparator
== null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator,
subComparator));
this.bufferSize = bufferSizeInMB * 1024L * 1024L;
+ this.diskWriter.start();
}
protected void writeRow(DecoratedKey key, ColumnFamily columnFamily)
throws IOException
@@ -79,12 +89,12 @@ public class SSTableSimpleUnsortedWriter
protected ColumnFamily getColumnFamily()
{
- ColumnFamily previous = keys.get(currentKey);
+ ColumnFamily previous = buffer.get(currentKey);
// If the CF already exist in memory, we'll just continue adding to it
if (previous == null)
{
previous = ColumnFamily.create(metadata,
TreeMapBackedSortedColumns.factory());
- keys.put(currentKey, previous);
+ buffer.put(currentKey, previous);
}
else
{
@@ -98,20 +108,77 @@ public class SSTableSimpleUnsortedWriter
public void close() throws IOException
{
sync();
+ try
+ {
+ writeQueue.put(SENTINEL);
+ diskWriter.join();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ checkForWriterException();
}
private void sync() throws IOException
{
- if (keys.isEmpty())
+ if (buffer.isEmpty())
return;
- SSTableWriter writer = getWriter();
- for (Map.Entry<DecoratedKey, ColumnFamily> entry : keys.entrySet())
+ checkForWriterException();
+
+ try
{
- writer.append(entry.getKey(), entry.getValue());
+ writeQueue.put(buffer);
}
- writer.closeAndOpenReader();
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ buffer = new Buffer();
currentSize = 0;
- keys.clear();
+ }
+
+ private void checkForWriterException() throws IOException
+ {
+ // slightly lame way to report exception from the writer, but that should be good enough
+ if (diskWriter.exception != null)
+ {
+ if (diskWriter.exception instanceof IOException)
+ throw (IOException) diskWriter.exception;
+ else
+ throw new RuntimeException(diskWriter.exception);
+ }
+ }
+
+ // typedef
+ private static class Buffer extends TreeMap<DecoratedKey, ColumnFamily> {}
+
+ private class DiskWriter extends Thread
+ {
+ volatile Exception exception = null;
+
+ public void run()
+ {
+ try
+ {
+ while (true)
+ {
+ Buffer b = writeQueue.take();
+ if (b == SENTINEL)
+ return;
+
+ SSTableWriter writer = getWriter();
+ for (Map.Entry<DecoratedKey, ColumnFamily> entry :
b.entrySet())
+ writer.append(entry.getKey(), entry.getValue());
+ writer.closeAndOpenReader();
+ }
+ }
+ catch (Exception e)
+ {
+ exception = e;
+ }
+ }
}
}