compaction stress tool

patch by tjake; reviewed by Marcus Eriksson for CASSANDRA-11844


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/47d3b7e7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/47d3b7e7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/47d3b7e7

Branch: refs/heads/trunk
Commit: 47d3b7e7a013b485a2906fc7f0f2fc90e1143966
Parents: d40ac78
Author: T Jake Luciani <j...@apache.org>
Authored: Fri May 20 11:23:49 2016 -0400
Committer: T Jake Luciani <j...@apache.org>
Committed: Mon Aug 1 11:06:52 2016 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   2 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  53 +--
 .../org/apache/cassandra/db/Directories.java    |  19 +
 src/java/org/apache/cassandra/db/Keyspace.java  |  24 ++
 .../compaction/AbstractCompactionStrategy.java  |   2 +-
 .../db/lifecycle/LifecycleTransaction.java      |  10 +-
 .../cassandra/dht/ByteOrderedPartitioner.java   |   9 +-
 .../org/apache/cassandra/dht/IPartitioner.java  |   8 +
 .../apache/cassandra/dht/LocalPartitioner.java  |   6 +
 .../dht/OrderPreservingPartitioner.java         |  15 +-
 .../apache/cassandra/dht/RandomPartitioner.java |   8 +
 .../hadoop/cql3/CqlBulkRecordWriter.java        |   4 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java |  40 +-
 .../cassandra/io/sstable/CQLSSTableWriter.java  | 147 ++++++--
 .../cassandra/io/sstable/SSTableLoader.java     |  16 +-
 .../io/sstable/SSTableSimpleUnsortedWriter.java |  12 +-
 .../io/sstable/SSTableSimpleWriter.java         |   8 +-
 .../cassandra/io/sstable/SSTableTxnWriter.java  |  22 ++
 .../tools/nodetool/CompactionStats.java         |   8 +-
 .../apache/cassandra/utils/GuidGenerator.java   |   9 +-
 .../cassandra/streaming/LongStreamingTest.java  |   6 +-
 test/unit/org/apache/cassandra/MockSchema.java  |   2 +-
 .../db/lifecycle/RealTransactionsTest.java      |   4 +-
 .../apache/cassandra/dht/LengthPartitioner.java |   8 +-
 .../io/sstable/CQLSSTableWriterClientTest.java  |  15 +-
 .../io/sstable/CQLSSTableWriterTest.java        |  34 +-
 .../cassandra/io/sstable/SSTableLoaderTest.java |  38 +-
 tools/bin/compaction-stress                     |  57 +++
 .../cassandra/stress/CompactionStress.java      | 365 +++++++++++++++++++
 .../apache/cassandra/stress/StressProfile.java  | 162 ++++++--
 .../stress/generate/PartitionGenerator.java     |  13 +-
 .../operations/userdefined/SchemaInsert.java    |  77 +++-
 .../operations/userdefined/SchemaQuery.java     |   4 +-
 .../operations/userdefined/SchemaStatement.java |  16 +-
 35 files changed, 1026 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 760cc58..f759b7e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Create compaction-stress tool (CASSANDRA-11844)
  * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
  * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE 
statements (CASSANDRA-7190)
  * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 85f2767..069c93b 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -19,6 +19,8 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+   - A new compaction-stress tool has been added to test the throughput of 
compaction
+     for any cassandra-stress user schema.  See 'compaction-stress help' for how 
to use it.
    - Compaction can now take into account overlapping tables that don't take 
part
      in the compaction to look for deleted or overwritten data in the 
compacted tables.
      Then such data is found, it can be safely discarded, which in turn should 
enable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 20dac1e..53f5305 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -381,17 +381,6 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         }
     }
 
-    private ColumnFamilyStore(Keyspace keyspace,
-                             String columnFamilyName,
-                             int generation,
-                             CFMetaData metadata,
-                             Directories directories,
-                             boolean loadSSTables)
-    {
-        this(keyspace, columnFamilyName, generation, metadata, directories, 
loadSSTables, true);
-    }
-
-
     @VisibleForTesting
     public ColumnFamilyStore(Keyspace keyspace,
                               String columnFamilyName,
@@ -399,7 +388,8 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                               CFMetaData metadata,
                               Directories directories,
                               boolean loadSSTables,
-                              boolean registerBookkeeping)
+                              boolean registerBookeeping,
+                              boolean offline)
     {
         assert directories != null;
         assert metadata != null : "null metadata for " + keyspace + ":" + 
columnFamilyName;
@@ -428,8 +418,20 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             data.addInitialSSTables(sstables);
         }
 
+        /**
+         * When creating a CFS offline we change the default logic needed by 
CASSANDRA-8671
+         * and link the passed directories to be picked up by the compaction 
strategy
+         */
+        if (offline)
+            this.directories = directories;
+        else
+            this.directories = new Directories(metadata, 
Directories.dataDirectories);
+
+
         // compaction strategy should be created after the CFS has been 
prepared
         compactionStrategyManager = new CompactionStrategyManager(this);
+
+        // Since compaction can re-define data dir we need to reinit 
directories
         this.directories = compactionStrategyManager.getDirectories();
 
         if (maxCompactionThreshold.value() <= 0 || 
minCompactionThreshold.value() <=0)
@@ -442,7 +444,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         for (IndexMetadata info : metadata.getIndexes())
             indexManager.addIndex(info);
 
-        if (registerBookkeeping)
+        if (registerBookeeping)
         {
             // register the mbean
             mbeanName = 
String.format("org.apache.cassandra.db:type=%s,keyspace=%s,table=%s",
@@ -496,10 +498,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
     public Directories getDirectories()
     {
-        // todo, hack since we need to know the data directories when 
constructing the compaction strategy
-        if (directories != null)
-            return directories;
-        return new Directories(metadata, initialDirectories);
+        return directories;
     }
 
     public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, 
LifecycleTransaction txn)
@@ -588,8 +587,20 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                                                                          
CFMetaData metadata,
                                                                          
boolean loadSSTables)
     {
-        // get the max generation number, to prevent generation conflicts
         Directories directories = new Directories(metadata, 
initialDirectories);
+        return createColumnFamilyStore(keyspace, columnFamily, metadata, 
directories, loadSSTables, true, false);
+    }
+
+    /** This is only directly used by offline tools */
+    public static synchronized ColumnFamilyStore 
createColumnFamilyStore(Keyspace keyspace,
+                                                                         
String columnFamily,
+                                                                         
CFMetaData metadata,
+                                                                         
Directories directories,
+                                                                         
boolean loadSSTables,
+                                                                         
boolean registerBookkeeping,
+                                                                         
boolean offline)
+    {
+        // get the max generation number, to prevent generation conflicts
         Directories.SSTableLister lister = 
directories.sstableLister(Directories.OnTxnErr.IGNORE).includeBackups(true);
         List<Integer> generations = new ArrayList<Integer>();
         for (Map.Entry<Descriptor, Set<Component>> entry : 
lister.list().entrySet())
@@ -598,19 +609,19 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             generations.add(desc.generation);
             if (!desc.isCompatible())
                 throw new RuntimeException(String.format("Incompatible SSTable 
found. Current version %s is unable to read file: %s. Please run 
upgradesstables.",
-                        desc.getFormat().getLatestVersion(), desc));
+                                                         
desc.getFormat().getLatestVersion(), desc));
         }
         Collections.sort(generations);
         int value = (generations.size() > 0) ? 
(generations.get(generations.size() - 1)) : 0;
 
-        return new ColumnFamilyStore(keyspace, columnFamily, value, metadata, 
directories, loadSSTables);
+        return new ColumnFamilyStore(keyspace, columnFamily, value, metadata, 
directories, loadSSTables, registerBookkeeping, offline);
     }
 
     /**
      * Removes unnecessary files from the cf directory at startup: these 
include temp files, orphans, zero-length files
      * and compacted sstables. Files that cannot be recognized will be ignored.
      */
-    public static void scrubDataDirectories(CFMetaData metadata) throws 
StartupException
+    public static void  scrubDataDirectories(CFMetaData metadata) throws 
StartupException
     {
         Directories directories = new Directories(metadata, 
initialDirectories);
         Set<File> cleanedDirectories = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java 
b/src/java/org/apache/cassandra/db/Directories.java
index 62fd890..3533dbc 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -180,6 +180,12 @@ public class Directories
     {
         this(metadata, dataDirectories);
     }
+
+    public Directories(final CFMetaData metadata, Collection<DataDirectory> 
paths)
+    {
+        this(metadata, paths.toArray(new DataDirectory[paths.size()]));
+    }
+
     /**
      * Create Directories of given ColumnFamily.
      * SSTable directories are created under data_directories defined in 
cassandra.yaml if not exist at this time.
@@ -920,6 +926,19 @@ public class Directories
         return result;
     }
 
+    /**
+     * @return Raw size on disk for all directories
+     */
+    public long getRawDiretoriesSize()
+    {
+        long totalAllocatedSize = 0L;
+
+        for (File path : dataPaths)
+            totalAllocatedSize += FileUtils.folderSize(path);
+
+        return totalAllocatedSize;
+    }
+
     public long getTrueAllocatedSizeIn(File input)
     {
         if (!input.isDirectory())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java 
b/src/java/org/apache/cassandra/db/Keyspace.java
index 37ed1e1..0d78245 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -371,6 +371,30 @@ public class Keyspace
     }
 
     /**
+     * Registers a custom cf instance with this keyspace.
+     * This is required for offline tools that use non-standard directories.
+     */
+    public void initCfCustom(ColumnFamilyStore newCfs)
+    {
+        ColumnFamilyStore cfs = columnFamilyStores.get(newCfs.metadata.cfId);
+
+        if (cfs == null)
+        {
+            // CFS being created for the first time, either on server startup 
or new CF being added.
+            // We don't worry about races here; startup is safe, and adding 
multiple identical CFs
+            // simultaneously is a "don't do that" scenario.
+            ColumnFamilyStore oldCfs = 
columnFamilyStores.putIfAbsent(newCfs.metadata.cfId, newCfs);
+            // CFS mbean instantiation will error out before we hit this, but 
in case that changes...
+            if (oldCfs != null)
+                throw new IllegalStateException("added multiple mappings for 
cf id " + newCfs.metadata.cfId);
+        }
+        else
+        {
+            throw new IllegalStateException("CFS is already initialized: " + 
cfs.name);
+        }
+    }
+
+    /**
      * adds a cf to internal structures, ends up creating disk files).
      */
     public void initCf(CFMetaData metadata, boolean loadSSTables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 83592f0..6b5b8a4 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -126,7 +126,7 @@ public abstract class AbstractCompactionStrategy
             uncheckedTombstoneCompaction = 
DEFAULT_UNCHECKED_TOMBSTONE_COMPACTION_OPTION;
         }
 
-        directories = new Directories(cfs.metadata, 
Directories.dataDirectories);
+        directories = cfs.getDirectories();
     }
 
     public Directories getDirectories()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java 
b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index 2311143..b1eadc5 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.sstable.SSTable;
@@ -525,9 +526,14 @@ public class LifecycleTransaction extends 
Transactional.AbstractTransactional
         log.untrackNew(table);
     }
 
-    public static boolean removeUnfinishedLeftovers(CFMetaData metadata)
+    public static boolean removeUnfinishedLeftovers(ColumnFamilyStore cfs)
     {
-        return LogTransaction.removeUnfinishedLeftovers(metadata);
+        return 
LogTransaction.removeUnfinishedLeftovers(cfs.getDirectories().getCFDirectories());
+    }
+
+    public static boolean removeUnfinishedLeftovers(CFMetaData cfMetaData)
+    {
+        return LogTransaction.removeUnfinishedLeftovers(cfMetaData);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java 
b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
index bbf6fd6..af8983d 100644
--- a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
@@ -41,6 +41,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 
 public class ByteOrderedPartitioner implements IPartitioner
 {
@@ -203,9 +204,13 @@ public class ByteOrderedPartitioner implements IPartitioner
 
     public BytesToken getRandomToken()
     {
-        Random r = new Random();
+       return getRandomToken(ThreadLocalRandom.current());
+    }
+
+    public BytesToken getRandomToken(Random random)
+    {
         byte[] buffer = new byte[16];
-        r.nextBytes(buffer);
+        random.nextBytes(buffer);
         return new BytesToken(buffer);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/src/java/org/apache/cassandra/dht/IPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IPartitioner.java 
b/src/java/org/apache/cassandra/dht/IPartitioner.java
index b559a6f..eb4aafb 100644
--- a/src/java/org/apache/cassandra/dht/IPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/IPartitioner.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Random;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -72,6 +73,13 @@ public interface IPartitioner
      */
     public Token getRandomToken();
 
+    /**
+     * @param random instance of Random to use when generating the token
+     *
+     * @return a randomly generated token
+     */
+    public Token getRandomToken(Random random);
+
     public Token.TokenFactory getTokenFactory();
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/src/java/org/apache/cassandra/dht/LocalPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/LocalPartitioner.java 
b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
index f9421c5..9922eb0 100644
--- a/src/java/org/apache/cassandra/dht/LocalPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.CachedHashDecoratedKey;
@@ -64,6 +65,11 @@ public class LocalPartitioner implements IPartitioner
         throw new UnsupportedOperationException();
     }
 
+    public LocalToken getRandomToken(Random random)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     public Token.TokenFactory getTokenFactory()
     {
         return tokenFactory;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java 
b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
index 96b4ca0..ab552c4 100644
--- a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
@@ -21,6 +21,7 @@ import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.DecoratedKey;
@@ -37,6 +38,8 @@ import org.apache.cassandra.utils.Pair;
 
 public class OrderPreservingPartitioner implements IPartitioner
 {
+    private static final String rndchars = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+
     public static final StringToken MINIMUM = new StringToken("");
 
     public static final BigInteger CHAR_MASK = new BigInteger("65535");
@@ -106,12 +109,14 @@ public class OrderPreservingPartitioner implements 
IPartitioner
 
     public StringToken getRandomToken()
     {
-        String chars = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
-        Random r = new Random();
+        return getRandomToken(ThreadLocalRandom.current());
+    }
+
+    public StringToken getRandomToken(Random random)
+    {
         StringBuilder buffer = new StringBuilder();
-        for (int j = 0; j < 16; j++) {
-            buffer.append(chars.charAt(r.nextInt(chars.length())));
-        }
+        for (int j = 0; j < 16; j++)
+            buffer.append(rndchars.charAt(random.nextInt(rndchars.length())));
         return new StringToken(buffer.toString());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/src/java/org/apache/cassandra/dht/RandomPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java 
b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
index 96a96ca..c2ec413 100644
--- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
@@ -91,6 +91,14 @@ public class RandomPartitioner implements IPartitioner
         return new BigIntegerToken(token);
     }
 
+    public BigIntegerToken getRandomToken(Random random)
+    {
+        BigInteger token = 
FBUtilities.hashToBigInteger(GuidGenerator.guidAsBytes(random));
+        if ( token.signum() == -1 )
+            token = token.multiply(BigInteger.valueOf(-1L));
+        return new BigIntegerToken(token);
+    }
+
     private final Token.TokenFactory tokenFactory = new Token.TokenFactory() {
         public ByteBuffer toByteArray(Token token)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java 
b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index 2ed37ee..bd157e9 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -75,7 +75,7 @@ public class CqlBulkRecordWriter extends RecordWriter<Object, 
List<ByteBuffer>>
     protected final Configuration conf;
     protected final int maxFailures;
     protected final int bufferSize;
-    protected Closeable writer;
+    protected CQLSSTableWriter writer;
     protected SSTableLoader loader;
     protected Progressable progress;
     protected TaskAttemptContext context;
@@ -174,7 +174,7 @@ public class CqlBulkRecordWriter extends 
RecordWriter<Object, List<ByteBuffer>>
             ExternalClient externalClient = new ExternalClient(conf);
             externalClient.setTableMetadata(CFMetaData.compile(schema, 
keyspace));
 
-            loader = new SSTableLoader(outputDir, externalClient, new 
NullOutputHandler())
+            loader = new SSTableLoader(writer.getInnermostDirectory(), 
externalClient, new NullOutputHandler())
             {
                 @Override
                 public void onSuccess(StreamState finalState)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java 
b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 0213fd5..10b4caa 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -22,16 +22,17 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.Closeable;
 import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.Pair;
@@ -41,16 +42,17 @@ import org.apache.cassandra.utils.Pair;
  */
 abstract class AbstractSSTableSimpleWriter implements Closeable
 {
-    protected final File directory;
-    protected final CFMetaData metadata;
+    protected final ColumnFamilyStore cfs;
+    protected final IPartitioner partitioner;
     protected final PartitionColumns columns;
     protected SSTableFormat.Type formatType = 
DatabaseDescriptor.getSSTableFormat();
     protected static AtomicInteger generation = new AtomicInteger(0);
+    protected boolean makeRangeAware = false;
 
-    protected AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, 
PartitionColumns columns)
+    protected AbstractSSTableSimpleWriter(ColumnFamilyStore cfs, IPartitioner 
partitioner,  PartitionColumns columns)
     {
-        this.metadata = metadata;
-        this.directory = directory;
+        this.cfs = cfs;
+        this.partitioner = partitioner;
         this.columns = columns;
     }
 
@@ -59,15 +61,25 @@ abstract class AbstractSSTableSimpleWriter implements 
Closeable
         this.formatType = type;
     }
 
+    protected void setRangeAwareWriting(boolean makeRangeAware)
+    {
+        this.makeRangeAware = makeRangeAware;
+    }
+
+
     protected SSTableTxnWriter createWriter()
     {
-        return SSTableTxnWriter.create(metadata,
-                                       createDescriptor(directory, 
metadata.ksName, metadata.cfName, formatType),
+        SerializationHeader header = new SerializationHeader(true, 
cfs.metadata, columns, EncodingStats.NO_STATS);
+
+        if (makeRangeAware)
+            return SSTableTxnWriter.createRangeAware(cfs, 0,  
ActiveRepairService.UNREPAIRED_SSTABLE, formatType, 0, header);
+
+        return SSTableTxnWriter.create(cfs,
+                                       
createDescriptor(cfs.getDirectories().getDirectoryForNewSSTables(), 
cfs.metadata.ksName, cfs.metadata.cfName, formatType),
                                        0,
                                        ActiveRepairService.UNREPAIRED_SSTABLE,
                                        0,
-                                       new SerializationHeader(true, metadata, 
columns, EncodingStats.NO_STATS),
-                                       Collections.emptySet());
+                                       header);
     }
 
     private static Descriptor createDescriptor(File directory, final String 
keyspace, final String columnFamily, final SSTableFormat.Type fmt)
@@ -107,7 +119,7 @@ abstract class AbstractSSTableSimpleWriter implements 
Closeable
 
     PartitionUpdate getUpdateFor(ByteBuffer key) throws IOException
     {
-        return getUpdateFor(metadata.decorateKey(key));
+        return getUpdateFor(partitioner.decorateKey(key));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 76c0e19..a6805df 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -21,16 +21,12 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
+import java.util.*;
 import java.util.stream.Collectors;
 
 import com.datastax.driver.core.ProtocolVersion;
 import com.datastax.driver.core.TypeCodec;
+import com.sun.org.apache.xpath.internal.operations.Bool;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -44,8 +40,7 @@ import 
org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.cql3.statements.CreateTypeStatement;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.cql3.statements.UpdateStatement;
-import org.apache.cassandra.db.Clustering;
-import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.dht.IPartitioner;
@@ -54,10 +49,12 @@ import 
org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.locator.SimpleSnitch;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Types;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
@@ -333,14 +330,30 @@ public class CQLSSTableWriter implements Closeable
         return codec.serialize(value, ProtocolVersion.NEWEST_SUPPORTED);
     }
     /**
+     * The writer loads data in directories corresponding to how they are laid out 
on the server.
+     * <p>
+     * {keyspace}/{table-cfid}/
+     *
+     * This method can be used to fetch the innermost directory with the 
sstable components
+     * @return The directory containing the sstable components
+     */
+    public File getInnermostDirectory()
+    {
+        return writer.cfs.getDirectories().getDirectoryForNewSSTables();
+    }
+
+    /**
      * A Builder for a CQLSSTableWriter object.
      */
     public static class Builder
     {
-        private File directory;
+        private final List<File> directoryList;
+        private ColumnFamilyStore cfs;
 
         protected SSTableFormat.Type formatType = null;
 
+        private Boolean makeRangeAware = false;
+
         private CreateTableStatement.RawStatement schemaStatement;
         private final List<CreateTypeStatement> typeStatements;
         private UpdateStatement.ParsedInsert insertStatement;
@@ -349,8 +362,10 @@ public class CQLSSTableWriter implements Closeable
         private boolean sorted = false;
         private long bufferSizeInMB = 128;
 
-        protected Builder() {
+        protected Builder()
+        {
             this.typeStatements = new ArrayList<>();
+            this.directoryList = new ArrayList<>();
         }
 
         /**
@@ -373,7 +388,7 @@ public class CQLSSTableWriter implements Closeable
          * <p>
          * This is a mandatory option.
          *
-         * @param directory the directory to use, which should exists and be 
writable.
+         * @param directory the directory to use, which should exist and be 
writable.
          * @return this builder.
          *
          * @throws IllegalArgumentException if {@code directory} doesn't exist 
or is not writable.
@@ -385,10 +400,29 @@ public class CQLSSTableWriter implements Closeable
             if (!directory.canWrite())
                 throw new IllegalArgumentException(directory + " exists but is 
not writable");
 
-            this.directory = directory;
+            directoryList.add(directory);
             return this;
         }
 
+        /**
+         * A pre-instantiated ColumnFamilyStore
+         * <p>
+         * This can be used in place of inDirectory and forTable
+         *
+         * @see #inDirectory(File)
+         *
+         * @param cfs the pre-instantiated ColumnFamilyStore to write sstables 
for, used instead of a directory and schema.
+         * @return this builder.
+         *
+         * @throws IllegalArgumentException if a directory doesn't exist or is 
not writable.
+         */
+        public Builder withCfs(ColumnFamilyStore cfs)
+        {
+            this.cfs = cfs;
+            return this;
+        }
+
+
         public Builder withType(String typeDefinition) throws SyntaxException
         {
             typeStatements.add(parseStatement(typeDefinition, 
CreateTypeStatement.class, "CREATE TYPE"));
@@ -431,6 +465,20 @@ public class CQLSSTableWriter implements Closeable
             return this;
         }
 
+
+        /**
+         * Specify if the sstable writer should be vnode range aware.
+         * This will create a sstable per vnode range.
+         *
+         * @param makeRangeAware true to write one sstable per vnode range
+         * @return this builder.
+         */
+        public Builder rangeAware(boolean makeRangeAware)
+        {
+            this.makeRangeAware = makeRangeAware;
+            return this;
+        }
+
         /**
          * The INSERT statement defining the order of the values to add for a 
given CQL row.
          * <p>
@@ -499,36 +547,36 @@ public class CQLSSTableWriter implements Closeable
         @SuppressWarnings("resource")
         public CQLSSTableWriter build()
         {
-            if (directory == null)
-                throw new IllegalStateException("No ouptut directory 
specified, you should provide a directory with inDirectory()");
-            if (schemaStatement == null)
+            if (directoryList.isEmpty() && cfs == null)
+                throw new IllegalStateException("No output directories 
specified, you should provide a directory with inDirectory()");
+            if (schemaStatement == null && cfs == null)
                 throw new IllegalStateException("Missing schema, you should 
provide the schema for the SSTable to create with forTable()");
             if (insertStatement == null)
                 throw new IllegalStateException("No insert statement 
specified, you should provide an insert statement through using()");
 
             synchronized (CQLSSTableWriter.class)
             {
-                String keyspace = schemaStatement.keyspace();
+                if (cfs == null)
+                    cfs = createOfflineTable(schemaStatement, typeStatements, 
directoryList);
 
-                if (Schema.instance.getKSMetaData(keyspace) == null)
-                    Schema.instance.load(KeyspaceMetadata.create(keyspace, 
KeyspaceParams.simple(1)));
+                if (partitioner == null)
+                    partitioner = cfs.getPartitioner();
 
-                createTypes(keyspace);
-                CFMetaData cfMetaData = createTable(keyspace);
                 Pair<UpdateStatement, List<ColumnSpecification>> 
preparedInsert = prepareInsert();
-
                 AbstractSSTableSimpleWriter writer = sorted
-                                                     ? new 
SSTableSimpleWriter(directory, cfMetaData, preparedInsert.left.updatedColumns())
-                                                     : new 
SSTableSimpleUnsortedWriter(directory, cfMetaData, 
preparedInsert.left.updatedColumns(), bufferSizeInMB);
+                                                     ? new 
SSTableSimpleWriter(cfs, partitioner, preparedInsert.left.updatedColumns())
+                                                     : new 
SSTableSimpleUnsortedWriter(cfs, partitioner, 
preparedInsert.left.updatedColumns(), bufferSizeInMB);
 
                 if (formatType != null)
                     writer.setSSTableFormatType(formatType);
 
+                writer.setRangeAwareWriting(makeRangeAware);
+
                 return new CQLSSTableWriter(writer, preparedInsert.left, 
preparedInsert.right);
             }
         }
 
-        private void createTypes(String keyspace)
+        private static void createTypes(String keyspace, 
List<CreateTypeStatement> typeStatements)
         {
             KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
             Types.RawBuilder builder = Types.rawBuilder(keyspace);
@@ -538,31 +586,50 @@ public class CQLSSTableWriter implements Closeable
             ksm = ksm.withSwapped(builder.build());
             Schema.instance.setKeyspaceMetadata(ksm);
         }
+
+        public static ColumnFamilyStore createOfflineTable(String schema, 
List<File> directoryList)
+        {
+            return createOfflineTable(parseStatement(schema, 
CreateTableStatement.RawStatement.class, "CREATE TABLE"), 
Collections.EMPTY_LIST, directoryList);
+        }
+
         /**
          * Creates the table according to schema statement
-         *
-         * @param keyspace name of the keyspace where table should be created
+         * with specified data directories
          */
-        private CFMetaData createTable(String keyspace)
+        public static ColumnFamilyStore 
createOfflineTable(CreateTableStatement.RawStatement schemaStatement, 
List<CreateTypeStatement> typeStatements, List<File> directoryList)
         {
+            String keyspace = schemaStatement.keyspace();
+
+            if (Schema.instance.getKSMetaData(keyspace) == null)
+                Schema.instance.load(KeyspaceMetadata.create(keyspace, 
KeyspaceParams.simple(1)));
+
+            createTypes(keyspace, typeStatements);
+
             KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
 
             CFMetaData cfMetaData = 
ksm.tables.getNullable(schemaStatement.columnFamily());
-            if (cfMetaData == null)
-            {
-                CreateTableStatement statement = (CreateTableStatement) 
schemaStatement.prepare(ksm.types).statement;
-                statement.validate(ClientState.forInternalCalls());
+            assert cfMetaData == null;
 
-                cfMetaData = statement.getCFMetaData();
+            CreateTableStatement statement = (CreateTableStatement) 
schemaStatement.prepare(ksm.types).statement;
+            statement.validate(ClientState.forInternalCalls());
 
-                Schema.instance.load(cfMetaData);
-                
Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfMetaData)));
-            }
+            //Build metadata with a portable cfId
+            cfMetaData = statement.metadataBuilder()
+                                  
.withId(CFMetaData.generateLegacyCfId(keyspace, statement.columnFamily()))
+                                  .build()
+                                  .params(statement.params());
+
+            Keyspace.setInitialized();
+            Directories directories = new Directories(cfMetaData, 
directoryList.stream().map(Directories.DataDirectory::new).collect(Collectors.toList()));
+
+            Keyspace ks = Keyspace.openWithoutSSTables(keyspace);
+            ColumnFamilyStore cfs =  
ColumnFamilyStore.createColumnFamilyStore(ks, cfMetaData.cfName, cfMetaData, 
directories, false, false, true);
+
+            ks.initCfCustom(cfs);
+            Schema.instance.load(cfs.metadata);
+            
Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfs.metadata)));
 
-            if (partitioner != null)
-                return cfMetaData.copy(partitioner);
-            else
-                return cfMetaData;
+            return cfs;
         }
 
         /**
@@ -587,7 +654,7 @@ public class CQLSSTableWriter implements Closeable
         }
     }
 
-    private static <T extends ParsedStatement> T parseStatement(String query, 
Class<T> klass, String type)
+    public static <T extends ParsedStatement> T parseStatement(String query, 
Class<T> klass, String type)
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 043f6fa..15dd925 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -56,13 +56,25 @@ public class SSTableLoader implements StreamEventHandler
 
     public SSTableLoader(File directory, Client client, OutputHandler 
outputHandler)
     {
-        this(directory, client, outputHandler, 1);
+        this(directory, client, outputHandler, 
directory.getParentFile().getName(), 1);
     }
 
+
     public SSTableLoader(File directory, Client client, OutputHandler 
outputHandler, int connectionsPerHost)
     {
+        this(directory, client, outputHandler, 
directory.getParentFile().getName(), connectionsPerHost);
+    }
+
+    public SSTableLoader(File directory, Client client, OutputHandler 
outputHandler, String keyspace)
+    {
+        this(directory, client, outputHandler, keyspace, 1);
+    }
+
+
+    public SSTableLoader(File directory, Client client, OutputHandler 
outputHandler, String keyspace, int connectionsPerHost)
+    {
         this.directory = directory;
-        this.keyspace = directory.getParentFile().getName();
+        this.keyspace = keyspace;
         this.client = client;
         this.outputHandler = outputHandler;
         this.connectionsPerHost = connectionsPerHost;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index fa88817..2563f26 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
@@ -34,6 +35,7 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.UnfilteredSerializer;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 /**
@@ -60,11 +62,11 @@ class SSTableSimpleUnsortedWriter extends 
AbstractSSTableSimpleWriter
     private final BlockingQueue<Buffer> writeQueue = new 
SynchronousQueue<Buffer>();
     private final DiskWriter diskWriter = new DiskWriter();
 
-    SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, 
PartitionColumns columns, long bufferSizeInMB)
+    SSTableSimpleUnsortedWriter(ColumnFamilyStore cfs, IPartitioner 
partitioner, PartitionColumns columns, long bufferSizeInMB)
     {
-        super(directory, metadata, columns);
+        super(cfs, partitioner, columns);
         this.bufferSize = bufferSizeInMB * 1024L * 1024L;
-        this.header = new SerializationHeader(true, metadata, columns, 
EncodingStats.NO_STATS);
+        this.header = new SerializationHeader(true, cfs.metadata, columns, 
EncodingStats.NO_STATS);
         diskWriter.start();
     }
 
@@ -110,7 +112,7 @@ class SSTableSimpleUnsortedWriter extends 
AbstractSSTableSimpleWriter
 
     private PartitionUpdate createPartitionUpdate(DecoratedKey key)
     {
-        return new PartitionUpdate(metadata, key, columns, 4)
+        return new PartitionUpdate(cfs.metadata, key, columns, 4)
         {
             @Override
             public void add(Row row)
@@ -204,7 +206,7 @@ class SSTableSimpleUnsortedWriter extends 
AbstractSSTableSimpleWriter
                     if (b == SENTINEL)
                         return;
 
-                        try (SSTableTxnWriter writer = createWriter())
+                    try (SSTableTxnWriter writer = createWriter())
                     {
                         for (Map.Entry<DecoratedKey, PartitionUpdate> entry : 
b.entrySet())
                             
writer.append(entry.getValue().unfilteredIterator());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index 7fbd79d..2f6dd33 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -19,12 +19,14 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 
 import com.google.common.base.Throwables;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.IPartitioner;
 
 /**
  * A SSTable writer that assumes rows are in (partitioner) sorted order.
@@ -43,9 +45,9 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
 
     private SSTableTxnWriter writer;
 
-    protected SSTableSimpleWriter(File directory, CFMetaData metadata, 
PartitionColumns columns)
+    protected SSTableSimpleWriter(ColumnFamilyStore cfs, IPartitioner 
partitioner, PartitionColumns columns)
     {
-        super(directory, metadata, columns);
+        super(cfs, partitioner, columns);
     }
 
     private SSTableTxnWriter getOrCreateWriter()
@@ -67,7 +69,7 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
             if (update != null)
                 writePartition(update);
             currentKey = key;
-            update = new PartitionUpdate(metadata, currentKey, columns, 4);
+            update = new PartitionUpdate(cfs.metadata, currentKey, columns, 4);
         }
 
         assert update != null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index 09f6a55..5ffde15 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.io.sstable;
 
+import java.io.IOException;
 import java.util.Collection;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -27,6 +28,8 @@ import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.index.Index;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.concurrent.Transactional;
@@ -101,6 +104,25 @@ public class SSTableTxnWriter extends 
Transactional.AbstractTransactional implem
         return new SSTableTxnWriter(txn, writer);
     }
 
+
+    public static SSTableTxnWriter createRangeAware(ColumnFamilyStore cfs, 
long keyCount, long repairedAt, SSTableFormat.Type type, int sstableLevel, 
SerializationHeader header)
+    {
+        LifecycleTransaction txn = 
LifecycleTransaction.offline(OperationType.WRITE);
+        SSTableMultiWriter writer;
+        try
+        {
+            writer = new RangeAwareSSTableWriter(cfs, keyCount, repairedAt, 
type, sstableLevel, 0, txn, header);
+        }
+        catch (IOException e)
+        {
+            //We don't know the total size so this should never happen
+            //as we send in 0
+            throw new RuntimeException(e);
+        }
+
+        return new SSTableTxnWriter(txn, writer);
+    }
+
     @SuppressWarnings("resource") // log and writer closed during postCleanup
     public static SSTableTxnWriter create(CFMetaData cfm,
                                           Descriptor descriptor,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java 
b/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
index 69fcbab..411dc23 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
@@ -67,12 +67,15 @@ public class CompactionStats extends NodeToolCmd
             }
         }
         System.out.println();
+        reportCompactionTable(cm.getCompactions(), 
probe.getCompactionThroughput(), humanReadable);
+    }
+
+    public static void reportCompactionTable(List<Map<String,String>> 
compactions, int compactionThroughput, boolean humanReadable)
+    {
         long remainingBytes = 0;
         TableBuilder table = new TableBuilder();
-        List<Map<String, String>> compactions = cm.getCompactions();
         if (!compactions.isEmpty())
         {
-            int compactionThroughput = probe.getCompactionThroughput();
             table.add("id", "compaction type", "keyspace", "table", 
"completed", "total", "unit", "progress");
             for (Map<String, String> c : compactions)
             {
@@ -101,4 +104,5 @@ public class CompactionStats extends NodeToolCmd
             System.out.printf("%25s%10s%n", "Active compaction remaining time 
: ", remainingTime);
         }
     }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/src/java/org/apache/cassandra/utils/GuidGenerator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/GuidGenerator.java 
b/src/java/org/apache/cassandra/utils/GuidGenerator.java
index 33f7330..1e523ea 100644
--- a/src/java/org/apache/cassandra/utils/GuidGenerator.java
+++ b/src/java/org/apache/cassandra/utils/GuidGenerator.java
@@ -69,12 +69,12 @@ public class GuidGenerator {
         return convertToStandardFormat( sb.toString() );
     }
 
-    public static ByteBuffer guidAsBytes()
+    public static ByteBuffer guidAsBytes(Random random)
     {
         StringBuilder sbValueBeforeMD5 = new StringBuilder();
         long time = System.currentTimeMillis();
         long rand = 0;
-        rand = myRand.nextLong();
+        rand = random.nextLong();
         sbValueBeforeMD5.append(s_id)
                         .append(":")
                         .append(Long.toString(time))
@@ -85,6 +85,11 @@ public class GuidGenerator {
         return 
ByteBuffer.wrap(FBUtilities.threadLocalMD5Digest().digest(valueBeforeMD5.getBytes()));
     }
 
+    public static ByteBuffer guidAsBytes()
+    {
+        return guidAsBytes(myRand);
+    }
+
     /*
         * Convert to the standard format for GUID
         * Example: C2FEEEAC-CFCD-11D1-8B05-00600806D9B6

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java 
b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
index 7e53ba2..300be11 100644
--- a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
+++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
@@ -95,7 +95,7 @@ public class LongStreamingTest
         writer.close();
         System.err.println(String.format("Writer finished after %d 
seconds....", TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start)));
 
-        File[] dataFiles = dataDir.listFiles((dir, name) -> 
name.endsWith("-Data.db"));
+        File[] dataFiles = writer.getInnermostDirectory().listFiles((dir, 
name) -> name.endsWith("-Data.db"));
         long dataSize = 0l;
         for (File file : dataFiles)
         {
@@ -103,7 +103,7 @@ public class LongStreamingTest
             dataSize += file.length();
         }
 
-        SSTableLoader loader = new SSTableLoader(dataDir, new 
SSTableLoader.Client()
+        SSTableLoader loader = new 
SSTableLoader(writer.getInnermostDirectory(), new SSTableLoader.Client()
         {
             private String ks;
             public void init(String keyspace)
@@ -130,7 +130,7 @@ public class LongStreamingTest
 
 
         //Stream again
-        loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
+        loader = new SSTableLoader(writer.getInnermostDirectory(), new 
SSTableLoader.Client()
         {
             private String ks;
             public void init(String keyspace)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/test/unit/org/apache/cassandra/MockSchema.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/MockSchema.java 
b/test/unit/org/apache/cassandra/MockSchema.java
index 7f9de55..804bccb 100644
--- a/test/unit/org/apache/cassandra/MockSchema.java
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@ -137,7 +137,7 @@ public class MockSchema
     {
         String cfname = "mockcf" + (id.incrementAndGet());
         CFMetaData metadata = newCFMetaData(ksname, cfname);
-        return new ColumnFamilyStore(ks, cfname, 0, metadata, new 
Directories(metadata), false, false);
+        return new ColumnFamilyStore(ks, cfname, 0, metadata, new 
Directories(metadata), false, false, false);
     }
 
     public static CFMetaData newCFMetaData(String ksname, String cfname)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java 
b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
index 595610e..515ce18 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
@@ -131,12 +131,10 @@ public class RealTransactionsTest extends SchemaLoader
     {
         cfs.truncateBlocking();
 
-        String schema = "CREATE TABLE \"%s\".\"%s\" (key ascii, name ascii, 
val ascii, val1 ascii, PRIMARY KEY (key, name))";
         String query = "INSERT INTO \"%s\".\"%s\" (key, name, val) VALUES (?, 
?, ?)";
 
         try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
-                                                       
.inDirectory(cfs.getDirectories().getDirectoryForNewSSTables())
-                                                       
.forTable(String.format(schema, cfs.keyspace.getName(), cfs.name))
+                                                       .withCfs(cfs)
                                                        
.using(String.format(query, cfs.keyspace.getName(), cfs.name))
                                                        .build())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java 
b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
index e2202fe..87ba741 100644
--- a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
+++ b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.dht;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
@@ -69,7 +70,12 @@ public class LengthPartitioner implements IPartitioner
 
     public BigIntegerToken getRandomToken()
     {
-        return new BigIntegerToken(BigInteger.valueOf(new 
Random().nextInt(15)));
+        return getRandomToken(ThreadLocalRandom.current());
+    }
+
+    public BigIntegerToken getRandomToken(Random random)
+    {
+        return new BigIntegerToken(BigInteger.valueOf(random.nextInt(15)));
     }
 
     private final Token.TokenFactory tokenFactory = new Token.TokenFactory() {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java 
b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
index 6df2d65..1cc303c 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
@@ -22,6 +22,7 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 
 import com.google.common.io.Files;
+import org.apache.commons.lang.ArrayUtils;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -30,6 +31,7 @@ import org.junit.Test;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.tools.Util;
 
 import static org.junit.Assert.assertEquals;
 
@@ -41,6 +43,7 @@ public class CQLSSTableWriterClientTest
     public void setUp()
     {
         this.testDirectory = Files.createTempDir();
+        Util.initDatabaseDescriptor();
         Config.setClientMode(true);
     }
 
@@ -86,16 +89,10 @@ public class CQLSSTableWriterClientTest
         writer.close();
         writer2.close();
 
-        FilenameFilter filter = new FilenameFilter()
-        {
-            @Override
-            public boolean accept(File dir, String name)
-            {
-                return name.endsWith("-Data.db");
-            }
-        };
+        FilenameFilter filter = (dir, name) -> name.endsWith("-Data.db");
 
-        File[] dataFiles = this.testDirectory.listFiles(filter);
+        File[] dataFiles = 
(File[])ArrayUtils.addAll(writer2.getInnermostDirectory().listFiles(filter),
+                                                     
writer.getInnermostDirectory().listFiles(filter));
         assertEquals(2, dataFiles.length);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java 
b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index caa92f6..877ca11 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.Util;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.UDHelper;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.*;
@@ -96,7 +97,7 @@ public class CQLSSTableWriterTest
 
             writer.close();
 
-            loadSSTables(dataDir, KS);
+            loadSSTables(writer.getInnermostDirectory(), KS);
 
             UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * 
FROM cql_keyspace.table1;");
             assertEquals(4, rs.size());
@@ -186,7 +187,7 @@ public class CQLSSTableWriterTest
                 return name.endsWith("-Data.db");
             }
         };
-        assert dataDir.list(filterDataFiles).length > 1 : 
Arrays.toString(dataDir.list(filterDataFiles));
+        assert writer.getInnermostDirectory().list(filterDataFiles).length > 1 
: Arrays.toString(writer.getInnermostDirectory().list(filterDataFiles));
     }
 
 
@@ -220,28 +221,22 @@ public class CQLSSTableWriterTest
     private static final int NUMBER_WRITES_IN_RUNNABLE = 10;
     private class WriterThread extends Thread
     {
-        private final File dataDir;
         private final int id;
+        private final ColumnFamilyStore cfs;
         public volatile Exception exception;
 
-        public WriterThread(File dataDir, int id)
+        public WriterThread(ColumnFamilyStore cfs, int id)
         {
-            this.dataDir = dataDir;
+            this.cfs = cfs;
             this.id = id;
         }
 
         @Override
         public void run()
         {
-            String schema = "CREATE TABLE cql_keyspace2.table2 ("
-                    + "  k int,"
-                    + "  v int,"
-                    + "  PRIMARY KEY (k, v)"
-                    + ")";
             String insert = "INSERT INTO cql_keyspace2.table2 (k, v) VALUES 
(?, ?)";
             CQLSSTableWriter writer = CQLSSTableWriter.builder()
-                    .inDirectory(dataDir)
-                    .forTable(schema)
+                    .withCfs(cfs)
                     .using(insert).build();
 
             try
@@ -269,10 +264,17 @@ public class CQLSSTableWriterTest
         File dataDir = new File(tempdir.getAbsolutePath() + File.separator + 
KS + File.separator + TABLE);
         assert dataDir.mkdirs();
 
+        String schema = "CREATE TABLE cql_keyspace2.table2 ("
+                        + "  k int,"
+                        + "  v int,"
+                        + "  PRIMARY KEY (k, v)"
+                        + ")";
+        ColumnFamilyStore cfs = 
CQLSSTableWriter.Builder.createOfflineTable(schema, 
Collections.singletonList(dataDir));
+
         WriterThread[] threads = new WriterThread[5];
         for (int i = 0; i < threads.length; i++)
         {
-            WriterThread thread = new WriterThread(dataDir, i);
+            WriterThread thread = new WriterThread(cfs, i);
             threads[i] = thread;
             thread.start();
         }
@@ -287,7 +289,7 @@ public class CQLSSTableWriterTest
             }
         }
 
-        loadSSTables(dataDir, KS);
+        loadSSTables(cfs.getDirectories().getDirectoryForNewSSTables(), KS);
 
         UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM 
cql_keyspace2.table2;");
         assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size());
@@ -339,7 +341,7 @@ public class CQLSSTableWriterTest
         }
 
         writer.close();
-        loadSSTables(dataDir, KS);
+        loadSSTables(writer.getInnermostDirectory(), KS);
 
         UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + KS + "." + TABLE);
         TypeCodec collectionCodec = 
UDHelper.codecFor(DataType.CollectionType.frozenList(tuple2Type));
@@ -410,7 +412,7 @@ public class CQLSSTableWriterTest
         }
 
         writer.close();
-        loadSSTables(dataDir, KS);
+        loadSSTables(writer.getInnermostDirectory(), KS);
 
         UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + KS + "." + TABLE);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index 72c7467..ba7571f 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.File;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
@@ -33,6 +34,8 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.marshal.AsciiType;
@@ -122,6 +125,8 @@ public class SSTableLoaderTest
         String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, 
val1 ascii, PRIMARY KEY (key, name))";
         String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)";
 
+
+        File outputDir;
         try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
                                                        .inDirectory(dataDir)
                                                        
.forTable(String.format(schema, KEYSPACE1, CF_STANDARD1))
@@ -129,22 +134,22 @@ public class SSTableLoaderTest
                                                        .build())
         {
             writer.addRow("key1", "col1", "100");
+            outputDir = writer.getInnermostDirectory();
         }
 
-        ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
-        cfs.forceBlockingFlush(); // wait for sstables to be on disk else we 
won't be able to stream them
-
         final CountDownLatch latch = new CountDownLatch(1);
-        SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), 
new OutputHandler.SystemOutput(false, false));
+        SSTableLoader loader = new SSTableLoader(outputDir, new TestClient(), 
new OutputHandler.SystemOutput(false, false));
         loader.stream(Collections.emptySet(), 
completionStreamListener(latch)).get();
 
-        List<FilteredPartition> partitions = 
Util.getAll(Util.cmd(cfs).build());
+        UntypedResultSet rs = 
QueryProcessor.executeInternal(String.format("SELECT * FROM %s.%s;", KEYSPACE1, 
CF_STANDARD1));
 
-        assertEquals(1, partitions.size());
-        assertEquals("key1", 
AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
-        assertEquals(ByteBufferUtil.bytes("100"), 
partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1")))
-                                                                   
.getCell(cfmeta.getColumnDefinition(ByteBufferUtil.bytes("val")))
-                                                                   .value());
+        assertEquals(1, rs.size());
+
+        Iterator<UntypedResultSet.Row> iter = rs.iterator();
+        UntypedResultSet.Row row;
+
+        row = iter.next();
+        assertEquals("key1", row.getString("key"));
 
         // The stream future is signalled when the work is complete but before 
releasing references. Wait for release
         // before cleanup (CASSANDRA-10118).
@@ -160,8 +165,9 @@ public class SSTableLoaderTest
         //make sure we have no tables...
         assertTrue(dataDir.listFiles().length == 0);
 
-        String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, 
val1 ascii, PRIMARY KEY (key, name))";
-        String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)";
+        //Since this is running in the same jvm we need to put it in a tmp 
keyspace
+        String schema = "CREATE TABLE \"%stmp\".\"%s\" (key ascii, name ascii, 
val ascii, val1 ascii, PRIMARY KEY (key, name)) with compression = {}";
+        String query = "INSERT INTO \"%stmp\".\"%s\" (key, name, val) VALUES 
(?, ?, ?)";
 
         CQLSSTableWriter writer = CQLSSTableWriter.builder()
                                                   .inDirectory(dataDir)
@@ -170,7 +176,7 @@ public class SSTableLoaderTest
                                                   .withBufferSizeInMB(1)
                                                   .build();
 
-        int NB_PARTITIONS = 5000; // Enough to write >1MB and get at least one 
completed sstable before we've closed the writer
+        int NB_PARTITIONS = 4200; // Enough to write >1MB and get at least one 
completed sstable before we've closed the writer
 
         for (int i = 0; i < NB_PARTITIONS; i++)
         {
@@ -182,11 +188,11 @@ public class SSTableLoaderTest
         cfs.forceBlockingFlush(); // wait for sstables to be on disk else we 
won't be able to stream them
 
         //make sure we have some tables...
-        assertTrue(dataDir.listFiles().length > 0);
+        assertTrue(writer.getInnermostDirectory().listFiles().length > 0);
 
         final CountDownLatch latch = new CountDownLatch(2);
         //writer is still open so loader should not load anything
-        SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), 
new OutputHandler.SystemOutput(false, false));
+        SSTableLoader loader = new 
SSTableLoader(writer.getInnermostDirectory(), new TestClient(), new 
OutputHandler.SystemOutput(false, false), KEYSPACE1);
         loader.stream(Collections.emptySet(), 
completionStreamListener(latch)).get();
 
         List<FilteredPartition> partitions = 
Util.getAll(Util.cmd(cfs).build());
@@ -196,7 +202,7 @@ public class SSTableLoaderTest
         // now we complete the write and the second loader should load the 
last sstable as well
         writer.close();
 
-        loader = new SSTableLoader(dataDir, new TestClient(), new 
OutputHandler.SystemOutput(false, false));
+        loader = new SSTableLoader(writer.getInnermostDirectory(), new 
TestClient(), new OutputHandler.SystemOutput(false, false), KEYSPACE1);
         loader.stream(Collections.emptySet(), 
completionStreamListener(latch)).get();
 
         partitions = 
Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/tools/bin/compaction-stress
----------------------------------------------------------------------
diff --git a/tools/bin/compaction-stress b/tools/bin/compaction-stress
new file mode 100755
index 0000000..f169f2f
--- /dev/null
+++ b/tools/bin/compaction-stress
@@ -0,0 +1,57 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+    # Locations (in order) to use when searching for an include file.
+    for include in "`dirname "$0"`/cassandra.in.sh" \
+                   "$HOME/.cassandra.in.sh" \
+                   /usr/share/cassandra/cassandra.in.sh \
+                   /usr/local/share/cassandra/cassandra.in.sh \
+                   /opt/cassandra/cassandra.in.sh; do
+        if [ -r "$include" ]; then
+            . "$include"
+            break
+        fi
+    done
+elif [ -r "$CASSANDRA_INCLUDE" ]; then
+    . "$CASSANDRA_INCLUDE"
+fi
+
+# Prefer the java binary under JAVA_HOME when it is executable,
+# otherwise fall back to whatever java is found on PATH
+if [ -x "$JAVA_HOME/bin/java" ]; then
+    JAVA="$JAVA_HOME/bin/java"
+else
+    JAVA="$(which java)"
+fi
+
+# Neither lookup produced anything: nothing to launch with
+if [ -z "$JAVA" ]; then
+    echo "Java executable not found (hint: set JAVA_HOME)" >&2
+    exit 1
+fi
+
+# The include file is expected to have populated CLASSPATH; refuse to start without it
+if [ -z "$CLASSPATH" ]; then
+    echo "You must set the CLASSPATH var" >&2
+    exit 1
+fi
+
+# JVM_OPTS is left unquoted on purpose so it splits into separate JVM flags.
+# "$@" is quoted so every user argument (e.g. a data directory containing
+# spaces) reaches the tool as exactly one argument.
+"$JAVA" -server -ea -cp "$CLASSPATH" $JVM_OPTS \
+        -Dcassandra.storagedir="$cassandra_storagedir" \
+        -Dlogback.configurationFile=logback-tools.xml \
+         org.apache.cassandra.stress.CompactionStress "$@"
+
+# vi:ai sw=4 ts=4 tw=0 et

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47d3b7e7/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java 
b/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java
new file mode 100644
index 0000000..664f8d2
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.stress;
+
+import java.io.File;
+import java.io.IOError;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+import javax.inject.Inject;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import io.airlift.command.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.operations.userdefined.SchemaInsert;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.tools.Util;
+import org.apache.cassandra.tools.nodetool.CompactionStats;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+
+/**
+ * Tool that allows fast route to loading data for arbitrary schemas to disk
+ * and compacting them.
+ */
+public abstract class CompactionStress implements Runnable
+{
+    @Inject
+    public HelpOption helpOption;
+
+    @Option(name = { "-p", "--profile" }, description = "Path to stress yaml 
file", required = true)
+    String profile;
+
+    @Option(name = { "-d", "--datadir" }, description = "Data directory (can 
be used many times to specify multiple data dirs)", required = true)
+    List<String> dataDirs;
+
+    @Option(name = {"-v", "--vnodes"}, description = "number of local tokens 
to generate (default 256)")
+    Integer numTokens = 256;
+
+    /**
+     * Turns every {@code --datadir} argument into a {@link File}, terminating the
+     * JVM as soon as one of them turns out to be unusable.
+     *
+     * Exit statuses: 1 = path does not exist, 2 = path is not a directory,
+     * 3 = path is not writable.
+     *
+     * @return the validated directories, in the order they were given
+     */
+    List<File> getDataDirectories()
+    {
+        List<File> validated = new ArrayList<>(dataDirs.size());
+        for (String path : dataDirs)
+        {
+            File candidate = new File(path);
+
+            // System.exit does not return, so chaining the checks is the same as running them one after another
+            if (!candidate.exists())
+            {
+                System.err.println("Invalid output dir (missing): " + candidate);
+                System.exit(1);
+            }
+            else if (!candidate.isDirectory())
+            {
+                System.err.println("Invalid output dir (not a directory): " + candidate);
+                System.exit(2);
+            }
+            else if (!candidate.canWrite())
+            {
+                System.err.println("Invalid output dir (no write permissions): " + candidate);
+                System.exit(3);
+            }
+
+            validated.add(candidate);
+        }
+
+        return validated;
+    }
+
+    ColumnFamilyStore initCf(StressProfile stressProfile, boolean loadSSTables)
+    {
+        Util.initDatabaseDescriptor();
+
+        generateTokens(stressProfile.seedStr, 
StorageService.instance.getTokenMetadata(), numTokens);
+
+        CreateTableStatement.RawStatement createStatement = 
stressProfile.getCreateStatement();
+        List<File> dataDirectories = getDataDirectories();
+
+        ColumnFamilyStore cfs = 
CQLSSTableWriter.Builder.createOfflineTable(createStatement, 
Collections.EMPTY_LIST, dataDirectories);
+
+        if (loadSSTables)
+        {
+            Directories.SSTableLister lister = 
cfs.getDirectories().sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true);
+            List<SSTableReader> sstables = new ArrayList<>();
+
+            //Offline open sstables
+            for (Map.Entry<Descriptor, Set<Component>> entry : 
lister.list().entrySet())
+            {
+                Set<Component> components = entry.getValue();
+                if (!components.contains(Component.DATA))
+                    continue;
+
+                try
+                {
+                    SSTableReader sstable = 
SSTableReader.openNoValidation(entry.getKey(), components, cfs);
+                    sstables.add(sstable);
+                }
+                catch (Exception e)
+                {
+                    JVMStabilityInspector.inspectThrowable(e);
+                    System.err.println(String.format("Error Loading %s: %s", 
entry.getKey(), e.getMessage()));
+                }
+            }
+
+            cfs.disableAutoCompaction();
+
+            //Register with cfs
+            cfs.addSSTables(sstables);
+        }
+
+        return cfs;
+    }
+
+    /**
+     * Loads the stress profile named by {@code --profile}.
+     *
+     * The argument is treated as a local file when such a file exists and as a URI otherwise.
+     * If the profile cannot be loaded, or the argument is not a syntactically valid URI,
+     * the failure is reported on stderr and the JVM exits with status 4.
+     *
+     * @return the loaded profile
+     */
+    StressProfile getStressProfile()
+    {
+        try
+        {
+            File yamlFile = new File(profile);
+            return StressProfile.load(yamlFile.exists() ? yamlFile.toURI() : URI.create(profile));
+        }
+        // URI.create signals a malformed string with IllegalArgumentException, so it belongs here too
+        catch (IOError | IllegalArgumentException e)
+        {
+            e.printStackTrace();
+            // println so the message is newline-terminated before the process exits
+            System.err.println("Invalid profile URI : " + profile);
+            System.exit(4);
+        }
+
+        // Only reachable if System.exit returns; needed to satisfy the compiler
+        return null;
+    }
+
+    /**
+     * Fills the token metadata with pseudo-random tokens derived from {@code seed},
+     * so that separate runs using the same seed produce the same tokens.
+     *
+     * That consistency is needed to write and compact the same data offline
+     * in the case of a range aware sstable writer.
+     */
+    private void generateTokens(String seed, TokenMetadata tokenMetadata, Integer numTokens)
+    {
+        // Seeding from the string's hash keeps the random sequence stable across runs
+        Random rng = new Random(seed.hashCode());
+        IPartitioner partitioner = tokenMetadata.partitioner;
+
+        tokenMetadata.clearUnsafe();
+
+        // NOTE(review): every round registers a fresh batch of numTokens tokens for the same
+        // broadcast address; presumably only the last batch remains registered - confirm
+        // against TokenMetadata.updateNormalTokens before simplifying, since dropping rounds
+        // would change which tokens are generated.
+        for (int round = 0; round < numTokens; round++)
+        {
+            InetAddress localAddress = FBUtilities.getBroadcastAddress();
+            List<Token> batch = Lists.newArrayListWithCapacity(numTokens);
+            for (int n = 0; n < numTokens; n++)
+            {
+                batch.add(partitioner.getRandomToken(rng));
+            }
+
+            tokenMetadata.updateNormalTokens(batch, localAddress);
+        }
+    }
+
+    public abstract void run();
+
+
+    /**
+     * "compact" sub-command: opens the sstables found in the data directories and
+     * compacts them, printing compaction stats every ten seconds until no work is left.
+     */
+    @Command(name = "compact", description = "Compact data in directory")
+    public static class Compaction extends CompactionStress
+    {
+
+        // Help text now matches the actual default: background compaction unless -m is passed
+        @Option(name = {"-m", "--maximal"}, description = "Force maximal compaction (default false)")
+        Boolean maximal = false;
+
+        @Option(name = {"-t", "--threads"}, description = "Number of compactor threads to use for bg compactions (default 4)")
+        Integer threads = 4;
+
+        /**
+         * Runs either one maximal compaction or repeated background compactions
+         * and blocks until they have finished.
+         */
+        public void run()
+        {
+            //Setup
+            SystemKeyspace.finishStartup(); //needed for early-open
+            CompactionManager.instance.setMaximumCompactorThreads(threads);
+            CompactionManager.instance.setCoreCompactorThreads(threads);
+            CompactionManager.instance.setRate(0);
+
+            StressProfile stressProfile = getStressProfile();
+            ColumnFamilyStore cfs = initCf(stressProfile, true);
+            cfs.getCompactionStrategyManager().compactionLogger.enable();
+
+            List<Future<?>> futures = new ArrayList<>(threads);
+            if (maximal)
+            {
+                futures = CompactionManager.instance.submitMaximal(cfs, FBUtilities.nowInSeconds(), false);
+            }
+            else
+            {
+                // initCf disabled auto compaction while registering sstables; turn it back on
+                cfs.enableAutoCompaction();
+                cfs.getCompactionStrategyManager().enable();
+                for (int i = 0; i < threads; i++)
+                    futures.addAll(CompactionManager.instance.submitBackground(cfs));
+            }
+
+            long working;
+            //Report compaction stats while working
+            while ((working = futures.stream().filter(f -> !f.isDone()).count()) > 0
+                   || CompactionManager.instance.getActiveCompactions() > 0
+                   || (!maximal && cfs.getCompactionStrategyManager().getEstimatedRemainingTasks() > 0))
+            {
+                //Re-up any bg jobs
+                if (!maximal)
+                {
+                    // Submit one new background round per thread that is no longer busy
+                    for (long i = working; i < threads; i++)
+                        futures.addAll(CompactionManager.instance.submitBackground(cfs));
+                }
+
+                reportCompactionStats();
+                Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+            }
+
+            System.out.println("Finished! Shutting down...");
+            CompactionManager.instance.forceShutdown();
+
+            //Wait for cleanup to finish before forcing
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+            LifecycleTransaction.removeUnfinishedLeftovers(cfs);
+        }
+    }
+
+    /**
+     * Prints a separator, the number of pending compaction tasks and the table
+     * of compactions currently reported by the compaction manager to stdout.
+     */
+    void reportCompactionStats()
+    {
+        CompactionManager manager = CompactionManager.instance;
+
+        System.out.println("========");
+
+        String pendingLine = String.format("Pending compactions: %d\n", manager.getPendingTasks());
+        System.out.println(pendingLine);
+
+        CompactionStats.reportCompactionTable(manager.getCompactions(), 0, true);
+    }
+
+
+    @Command(name = "write", description = "write data directly to disk")
+    public static class DataWriter extends CompactionStress
+    {
+        private static double BYTES_IN_GB = 1024 * 1014 * 1024;
+
+        @Option(name = { "-g", "--gbsize"}, description = "Total GB size on 
disk you wish to write", required = true)
+        Integer totalSizeGb;
+
+        @Option(name = { "-t", "--threads" }, description = "Number of sstable 
writer threads (default 2)")
+        Integer threads = 2;
+
+        @Option(name = { "-c", "--partition-count"}, description = "Number of 
partitions to loop over (default 1000000)")
+        Integer partitions = 1000000;
+
+        @Option(name = { "-b", "--buffer-size-mb"}, description = "Buffer in 
MB writes before writing new sstable (default 128)")
+        Integer bufferSize = 128;
+
+        @Option(name = { "-r", "--range-aware"}, description = "Splits the 
local ranges in number of data directories and makes sure we never write the 
same token in two different directories (default true)")
+        Boolean makeRangeAware = true;
+
+        public void run()
+        {
+            StressProfile stressProfile = getStressProfile();
+            ColumnFamilyStore cfs = initCf(stressProfile, false);
+            Directories directories = cfs.getDirectories();
+
+            StressSettings settings = StressSettings.parse(new String[]{ 
"write", "-pop seq=1.." + partitions });
+            SeedManager seedManager = new SeedManager(settings);
+            PartitionGenerator generator = stressProfile.getOfflineGenerator();
+            WorkManager workManager = new 
WorkManager.FixedWorkManager(Long.MAX_VALUE);
+
+            ExecutorService executorService = 
Executors.newFixedThreadPool(threads);
+            CountDownLatch finished = new CountDownLatch(threads);
+
+            for (int i = 0; i < threads; i++)
+            {
+                //Every thread needs it's own writer
+                final SchemaInsert insert = 
stressProfile.getOfflineInsert(null, generator, seedManager, settings);
+                final CQLSSTableWriter tableWriter = insert.createWriter(cfs, 
bufferSize, makeRangeAware);
+                executorService.submit(() -> {
+                    try
+                    {
+                        insert.runOffline(tableWriter, workManager);
+                    }
+                    catch (Exception e)
+                    {
+                        e.printStackTrace();
+                    }
+                    finally
+                    {
+                        FileUtils.closeQuietly(tableWriter);
+                        finished.countDown();
+                    }
+                });
+            }
+
+            double currentSizeGB;
+            while ((currentSizeGB = directories.getRawDiretoriesSize() / 
BYTES_IN_GB) < totalSizeGb)
+            {
+                if (finished.getCount() == 0)
+                    break;
+
+                System.out.println(String.format("Written %.2fGB of %dGB", 
currentSizeGB, totalSizeGb));
+
+                Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
+            }
+
+            workManager.stop();
+            Uninterruptibles.awaitUninterruptibly(finished);
+
+            currentSizeGB = directories.getRawDiretoriesSize() / BYTES_IN_GB;
+            System.out.println(String.format("Finished writing %.2fGB", 
currentSizeGB));
+        }
+    }
+
+    /**
+     * Command line entry point: builds the "compaction-stress" CLI (help, write, compact),
+     * runs the selected command and exits with status 0, or with status 6 after
+     * printing the stack trace when parsing or running the command throws.
+     */
+    public static void main(String[] args)
+    {
+        Cli.CliBuilder<Runnable> cliBuilder = Cli.<Runnable>builder("compaction-stress");
+        cliBuilder.withDescription("benchmark for compaction")
+                  .withDefaultCommand(Help.class)
+                  .withCommands(Help.class, DataWriter.class, Compaction.class);
+
+        // Built outside the try block, exactly as before: a failure here is not mapped to status 6
+        Cli<Runnable> parser = cliBuilder.build();
+
+        try
+        {
+            Runnable command = parser.parse(args);
+            command.run();
+        }
+        catch (Throwable failure)
+        {
+            failure.printStackTrace();
+            System.exit(6);
+        }
+
+        System.exit(0);
+    }
+}
+
+

Reply via email to