Update CQLSSTableWriter to allow parallel writing of SSTables on the same table 
within the same JVM

Patch by Carl Yeksigian, reviewed by Benjamin Lerer for CASSANDRA-7463


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fbb6b8ab
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fbb6b8ab
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fbb6b8ab

Branch: refs/heads/trunk
Commit: fbb6b8abfa359be3475ba55912d939096637b8f8
Parents: c4c9303
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Thu Oct 30 10:49:42 2014 -0500
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Thu Oct 30 10:49:42 2014 -0500

----------------------------------------------------------------------
 .../io/sstable/AbstractSSTableSimpleWriter.java | 12 ++-
 .../cassandra/io/sstable/CQLSSTableWriter.java  | 41 +++++----
 .../io/sstable/CQLSSTableWriterTest.java        | 93 ++++++++++++++++++++
 3 files changed, 127 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbb6b8ab/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java 
b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 11d6d5e..2854308 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -45,6 +46,7 @@ public abstract class AbstractSSTableSimpleWriter implements 
Closeable
     protected ByteBuffer currentSuperColumn;
     protected final CounterId counterid = CounterId.generate();
     private SSTableFormat.Type formatType = 
DatabaseDescriptor.getSSTableFormat();
+    protected static AtomicInteger generation = new AtomicInteger(0);
 
 
     public AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, 
IPartitioner partitioner)
@@ -83,9 +85,15 @@ public abstract class AbstractSSTableSimpleWriter implements 
Closeable
                 return false;
             }
         });
-        int maxGen = 0;
+        int maxGen = generation.getAndIncrement();
         for (Descriptor desc : existing)
-            maxGen = Math.max(maxGen, desc.generation);
+        {
+            while (desc.generation > maxGen)
+            {
+                maxGen = generation.getAndIncrement();
+            }
+        }
+
         return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, 
Descriptor.Type.TEMP, fmt).filenameFor(Component.DATA);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbb6b8ab/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index f7d467e..d4b4eab 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.cql3.statements.*;
 import org.apache.cassandra.cql3.*;
@@ -338,25 +339,31 @@ public class CQLSSTableWriter implements Closeable
         {
             try
             {
-                this.schema = getStatement(schema, CreateTableStatement.class, 
"CREATE TABLE").left.getCFMetaData().rebuild();
-
-                // We need to register the keyspace/table metadata through 
Schema, otherwise we won't be able to properly
-                // build the insert statement in using().
-                if (Schema.instance.getKSMetaData(this.schema.ksName) == null)
-                {
-                    KSMetaData ksm = KSMetaData.newKeyspace(this.schema.ksName,
-                                                            
AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"),
-                                                            
ImmutableMap.of("replication_factor", "1"),
-                                                            true,
-                                                            
Collections.singleton(this.schema));
-                    Schema.instance.load(ksm);
-                }
-                else if (Schema.instance.getCFMetaData(this.schema.ksName, 
this.schema.cfName) == null)
+                synchronized (CQLSSTableWriter.class)
                 {
-                    Schema.instance.load(this.schema);
-                }
+                    this.schema = getStatement(schema, 
CreateTableStatement.class, "CREATE TABLE").left.getCFMetaData().rebuild();
 
-                return this;
+                    // We need to register the keyspace/table metadata through 
Schema, otherwise we won't be able to properly
+                    // build the insert statement in using().
+                    KSMetaData ksm = 
Schema.instance.getKSMetaData(this.schema.ksName);
+                    if (ksm == null)
+                    {
+                        ksm = KSMetaData.newKeyspace(this.schema.ksName,
+                                
AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"),
+                                ImmutableMap.of("replication_factor", "1"),
+                                true,
+                                Collections.singleton(this.schema));
+                        Schema.instance.load(ksm);
+                    }
+                    else if (Schema.instance.getCFMetaData(this.schema.ksName, 
this.schema.cfName) == null)
+                    {
+                        Schema.instance.load(this.schema);
+                        ksm = KSMetaData.cloneWith(ksm, 
Iterables.concat(ksm.cfMetaData().values(), 
Collections.singleton(this.schema)));
+                        Schema.instance.setKeyspaceDefinition(ksm);
+                        Keyspace.open(ksm.name).initCf(this.schema.cfId, 
this.schema.cfName, false);
+                    }
+                    return this;
+                }
             }
             catch (RequestValidationException e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbb6b8ab/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java 
b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index 250545a..2a41794 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -163,4 +163,97 @@ public class CQLSSTableWriterTest
         };
         assert dataDir.list(filterDataFiles).length > 1 : 
Arrays.toString(dataDir.list(filterDataFiles));
     }
+
+
+    private static final int NUMBER_WRITES_IN_RUNNABLE = 10;
+    private class WriterThread extends Thread
+    {
+        private final File dataDir;
+        private final int id;
+        public volatile Exception exception;
+
+        public WriterThread(File dataDir, int id)
+        {
+            this.dataDir = dataDir;
+            this.id = id;
+        }
+
+        @Override
+        public void run()
+        {
+            String schema = "CREATE TABLE cql_keyspace.table2 ("
+                    + "  k int,"
+                    + "  v int,"
+                    + "  PRIMARY KEY (k, v)"
+                    + ")";
+            String insert = "INSERT INTO cql_keyspace.table2 (k, v) VALUES (?, 
?)";
+            CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                    .inDirectory(dataDir)
+                    .forTable(schema)
+                    .withPartitioner(StorageService.instance.getPartitioner())
+                    .using(insert).build();
+
+            try
+            {
+                for (int i = 0; i < NUMBER_WRITES_IN_RUNNABLE; i++)
+                {
+                    writer.addRow(id, i);
+                }
+                writer.close();
+            }
+            catch (Exception e)
+            {
+                exception = e;
+            }
+        }
+    }
+
+    @Test
+    public void testConcurrentWriters() throws Exception
+    {
+        String KS = "cql_keyspace";
+        String TABLE = "table2";
+
+        File tempdir = Files.createTempDir();
+        File dataDir = new File(tempdir.getAbsolutePath() + File.separator + 
KS + File.separator + TABLE);
+        assert dataDir.mkdirs();
+
+        WriterThread[] threads = new WriterThread[5];
+        for (int i = 0; i < threads.length; i++)
+        {
+            WriterThread thread = new WriterThread(dataDir, i);
+            threads[i] = thread;
+            thread.start();
+        }
+
+        for (WriterThread thread : threads)
+        {
+            thread.join();
+            assert !thread.isAlive() : "Thread should be dead by now";
+            if (thread.exception != null)
+            {
+                throw thread.exception;
+            }
+        }
+
+        SSTableLoader loader = new SSTableLoader(dataDir, new 
SSTableLoader.Client()
+        {
+            public void init(String keyspace)
+            {
+                for (Range<Token> range : 
StorageService.instance.getLocalRanges("cql_keyspace"))
+                    addRangeForEndpoint(range, 
FBUtilities.getBroadcastAddress());
+                setPartitioner(StorageService.getPartitioner());
+            }
+
+            public CFMetaData getCFMetaData(String keyspace, String cfName)
+            {
+                return Schema.instance.getCFMetaData(keyspace, cfName);
+            }
+        }, new OutputHandler.SystemOutput(false, false));
+
+        loader.stream().get();
+
+        UntypedResultSet rs = QueryProcessor.processInternal("SELECT * FROM 
cql_keyspace.table2;");
+        assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size());
+    }
 }

Reply via email to