Update CQLSSTableWriter to allow parallel writing of SSTables for the same table
within the same JVM

Patch by Carl Yeksigian, reviewed by Benjamin Lerer for CASSANDRA-7463


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f13ce558
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f13ce558
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f13ce558

Branch: refs/heads/cassandra-2.1
Commit: f13ce558cc410f959634a6f0d31fcf7bd69be85d
Parents: 748b01d
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Wed Oct 29 13:57:25 2014 -0500
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Wed Oct 29 13:57:25 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../io/sstable/AbstractSSTableSimpleWriter.java | 12 ++-
 .../cassandra/io/sstable/CQLSSTableWriter.java  | 38 +++++---
 .../io/sstable/CQLSSTableWriterTest.java        | 93 ++++++++++++++++++++
 4 files changed, 131 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f13ce558/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d2cb003..9051b34 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,8 @@
  * Pig: Remove errant LIMIT clause in CqlNativeStorage (CASSANDRA-8166)
  * Throw ConfigurationException when hsha is used with the default
    rpc_max_threads setting of 'unlimited' (CASSANDRA-8116)
+ * Allow concurrent writing of the same table in the same JVM using
+   CQLSSTableWriter (CASSANDRA-7463)
 
 
 2.0.11:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f13ce558/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 2c6f82a..af1c43c 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -43,6 +44,7 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable
     protected ColumnFamily columnFamily;
     protected ByteBuffer currentSuperColumn;
     protected final CounterId counterid = CounterId.generate();
+    protected static AtomicInteger generation = new AtomicInteger(0);
 
     public AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, 
IPartitioner partitioner)
     {
@@ -80,9 +82,15 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable
                 return false;
             }
         });
-        int maxGen = 0;
+        int maxGen = generation.getAndIncrement();
         for (Descriptor desc : existing)
-            maxGen = Math.max(maxGen, desc.generation);
+        {
+            while (desc.generation > maxGen)
+            {
+                maxGen = generation.getAndIncrement();
+            }
+        }
+
         return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, 
true).filenameFor(Component.DATA);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f13ce558/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 49a1259..93d3dcf 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.cql3.statements.*;
 import org.apache.cassandra.cql3.*;
@@ -335,18 +336,31 @@ public class CQLSSTableWriter implements Closeable
         {
             try
             {
-                this.schema = getStatement(schema, CreateTableStatement.class, 
"CREATE TABLE").left.getCFMetaData().rebuild();
-
-                // We need to register the keyspace/table metadata through 
Schema, otherwise we won't be able to properly
-                // build the insert statement in using().
-                KSMetaData ksm = KSMetaData.newKeyspace(this.schema.ksName,
-                                                        
AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"),
-                                                        
ImmutableMap.of("replication_factor", "1"),
-                                                        true,
-                                                        
Collections.singleton(this.schema));
-
-                Schema.instance.load(ksm);
-                return this;
+                synchronized (CQLSSTableWriter.class)
+                {
+                    this.schema = getStatement(schema, 
CreateTableStatement.class, "CREATE TABLE").left.getCFMetaData().rebuild();
+
+                    // We need to register the keyspace/table metadata through 
Schema, otherwise we won't be able to properly
+                    // build the insert statement in using().
+                    KSMetaData ksm = 
Schema.instance.getKSMetaData(this.schema.ksName);
+                    if (ksm == null)
+                    {
+                        ksm = KSMetaData.newKeyspace(this.schema.ksName,
+                                
AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"),
+                                ImmutableMap.of("replication_factor", "1"),
+                                true,
+                                Collections.singleton(this.schema));
+                        Schema.instance.load(ksm);
+                    }
+                    else if (Schema.instance.getCFMetaData(this.schema.ksName, 
this.schema.cfName) == null)
+                    {
+                        Schema.instance.load(this.schema);
+                        ksm = KSMetaData.cloneWith(ksm, 
Iterables.concat(ksm.cfMetaData().values(), 
Collections.singleton(this.schema)));
+                        Schema.instance.setKeyspaceDefinition(ksm);
+                        Keyspace.open(ksm.name).initCf(this.schema.cfId, 
this.schema.cfName, false);
+                    }
+                    return this;
+                }
             }
             catch (RequestValidationException e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f13ce558/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index de814e1..0f123a4 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -158,4 +158,97 @@ public class CQLSSTableWriterTest
         };
         assert tempdir.list(filterDataFiles).length > 1 : 
Arrays.toString(tempdir.list(filterDataFiles));
     }
+
+
+    private static final int NUMBER_WRITES_IN_RUNNABLE = 10;
+    private class WriterThread extends Thread
+    {
+        private final File dataDir;
+        private final int id;
+        public volatile Exception exception;
+
+        public WriterThread(File dataDir, int id)
+        {
+            this.dataDir = dataDir;
+            this.id = id;
+        }
+
+        @Override
+        public void run()
+        {
+            String schema = "CREATE TABLE cql_keyspace.table2 ("
+                    + "  k int,"
+                    + "  v int,"
+                    + "  PRIMARY KEY (k, v)"
+                    + ")";
+            String insert = "INSERT INTO cql_keyspace.table2 (k, v) VALUES (?, 
?)";
+            CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                    .inDirectory(dataDir)
+                    .forTable(schema)
+                    .withPartitioner(StorageService.instance.getPartitioner())
+                    .using(insert).build();
+
+            try
+            {
+                for (int i = 0; i < NUMBER_WRITES_IN_RUNNABLE; i++)
+                {
+                    writer.addRow(id, i);
+                }
+                writer.close();
+            }
+            catch (Exception e)
+            {
+                exception = e;
+            }
+        }
+    }
+
+    @Test
+    public void testConcurrentWriters() throws Exception
+    {
+        String KS = "cql_keyspace";
+        String TABLE = "table2";
+
+        File tempdir = Files.createTempDir();
+        File dataDir = new File(tempdir.getAbsolutePath() + File.separator + 
KS + File.separator + TABLE);
+        assert dataDir.mkdirs();
+
+        WriterThread[] threads = new WriterThread[5];
+        for (int i = 0; i < threads.length; i++)
+        {
+            WriterThread thread = new WriterThread(dataDir, i);
+            threads[i] = thread;
+            thread.start();
+        }
+
+        for (WriterThread thread : threads)
+        {
+            thread.join();
+            assert !thread.isAlive() : "Thread should be dead by now";
+            if (thread.exception != null)
+            {
+                throw thread.exception;
+            }
+        }
+
+        SSTableLoader loader = new SSTableLoader(dataDir, new 
SSTableLoader.Client()
+        {
+            public void init(String keyspace)
+            {
+                for (Range<Token> range : 
StorageService.instance.getLocalRanges("cql_keyspace"))
+                    addRangeForEndpoint(range, 
FBUtilities.getBroadcastAddress());
+                setPartitioner(StorageService.getPartitioner());
+            }
+
+            public CFMetaData getCFMetaData(String keyspace, String cfName)
+            {
+                return Schema.instance.getCFMetaData(keyspace, cfName);
+            }
+        }, new OutputHandler.SystemOutput(false, false));
+
+        loader.stream().get();
+
+        UntypedResultSet rs = QueryProcessor.processInternal("SELECT * FROM 
cql_keyspace.table2;");
+        assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size());
+    }
 }

Reply via email to