This is an automated email from the ASF dual-hosted git repository.

smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit e81b2f54b4cb05ea25720a8a481ec951a20b809a
Merge: aa644c9dfa 016dd6ca37
Author: Stefan Miklosovic <[email protected]>
AuthorDate: Mon Jan 22 12:44:59 2024 +0100

    Merge branch 'cassandra-5.0' into trunk

 CHANGES.txt                                        |    1 +
 src/java/org/apache/cassandra/config/Config.java   |   18 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   88 +-
 .../cassandra/db/ColumnFamilyStoreMBean.java       |   41 +-
 .../org/apache/cassandra/db/SSTableImporter.java   |   72 +-
 .../apache/cassandra/db/memtable/TrieMemtable.java |   19 +-
 .../db/streaming/CassandraStreamReceiver.java      |    2 +-
 src/java/org/apache/cassandra/index/Index.java     |   13 +-
 .../cassandra/index/SecondaryIndexManager.java     |    7 +-
 .../cassandra/index/sai/SSTableContextManager.java |    2 +-
 .../index/sai/StorageAttachedIndexBuilder.java     |    2 +-
 .../index/sai/StorageAttachedIndexGroup.java       |   12 +-
 .../index/sai/disk/format/IndexDescriptor.java     |   35 +-
 .../sai/disk/v1/segment/SegmentTrieBuffer.java     |    4 +-
 .../cassandra/index/sai/view/IndexViewManager.java |    2 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java    |   13 +-
 .../cassandra/io/sstable/CQLSSTableWriter.java     |  171 ++-
 src/java/org/apache/cassandra/tools/NodeProbe.java |   12 +-
 .../apache/cassandra/tools/nodetool/Import.java    |   15 +-
 test/unit/org/apache/cassandra/db/ImportTest.java  |  222 +++-
 .../org/apache/cassandra/index/sai/SAITester.java  |    4 +-
 .../io/sstable/CQLSSTableWriterClientTest.java     |   78 +-
 .../sstable/CQLSSTableWriterConcurrencyTest.java   |    2 +-
 .../io/sstable/CQLSSTableWriterDaemonTest.java     |   44 +
 .../cassandra/io/sstable/CQLSSTableWriterTest.java | 1209 ++++++++++++--------
 25 files changed, 1406 insertions(+), 682 deletions(-)

diff --cc CHANGES.txt
index ebf48c2315,a7859ee9ec..290185e085
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,13 -1,5 +1,14 @@@
 -5.0-beta2
 +5.1
 + * Limit cassandra startup to supported JDKs, allow higher JDKs by setting 
CASSANDRA_JDK_UNSUPPORTED (CASSANDRA-18688)
 + * Standardize nodetool tablestats formatting of data units (CASSANDRA-19104)
 + * Make nodetool tablestats use number of significant digits for time and 
average values consistently (CASSANDRA-19015)
 + * Upgrade jackson to 2.15.3 and snakeyaml to 2.1 (CASSANDRA-18875)
 + * Transactional Cluster Metadata [CEP-21] (CASSANDRA-18330)
 + * Add ELAPSED command to cqlsh (CASSANDRA-18861)
 + * Add the ability to disable bulk loading of SSTables (CASSANDRA-18781)
 + * Clean up obsolete functions and simplify cql_version handling in cqlsh 
(CASSANDRA-18787)
 +Merged from 5.0:
+  * Make CQLSSTableWriter support building of SAI indexes (CASSANDRA-18714)
   * Append additional JVM options when using JDK17+ (CASSANDRA-19001)
   * Upgrade Python driver to 3.29.0 (CASSANDRA-19245)
   * Creating a SASI index after creating an SAI index does not break secondary 
index queries (CASSANDRA-18939)
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 1dc2687c40,bcf4dc7073..be6136dd4a
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -389,20 -383,22 +391,21 @@@ public class ColumnFamilyStore implemen
          // only update these runtime-modifiable settings if they have not 
been modified.
          if (!minCompactionThreshold.isModified())
              for (ColumnFamilyStore cfs : concatWithIndexes())
 -                cfs.minCompactionThreshold = new 
DefaultValue(metadata().params.compaction.minCompactionThreshold());
 +                cfs.minCompactionThreshold = new 
DefaultValue<>(tableMetadata.params.compaction.minCompactionThreshold());
          if (!maxCompactionThreshold.isModified())
              for (ColumnFamilyStore cfs : concatWithIndexes())
 -                cfs.maxCompactionThreshold = new 
DefaultValue(metadata().params.compaction.maxCompactionThreshold());
 +                cfs.maxCompactionThreshold = new 
DefaultValue<>(tableMetadata.params.compaction.maxCompactionThreshold());
          if (!crcCheckChance.isModified())
              for (ColumnFamilyStore cfs : concatWithIndexes())
 -                cfs.crcCheckChance = new 
DefaultValue(metadata().params.crcCheckChance);
 -
 -        
compactionStrategyManager.maybeReloadParamsFromSchema(metadata().params.compaction);
 +                cfs.crcCheckChance = new 
DefaultValue<>(tableMetadata.params.crcCheckChance);
  
 -        indexManager.reload();
 +        
compactionStrategyManager.maybeReloadParamsFromSchema(tableMetadata.params.compaction);
  
 -        memtableFactory = metadata().params.memtable.factory();
 +        indexManager.reload(tableMetadata);
  
 +        memtableFactory = tableMetadata.params.memtable.factory();
-         switchMemtableOrNotify(FlushReason.SCHEMA_CHANGE, tableMetadata, 
Memtable::metadataUpdated);
+         if (DatabaseDescriptor.isDaemonInitialized())
 -            switchMemtableOrNotify(FlushReason.SCHEMA_CHANGE, 
Memtable::metadataUpdated);
++            switchMemtableOrNotify(FlushReason.SCHEMA_CHANGE, tableMetadata, 
Memtable::metadataUpdated);
      }
  
      public static Runnable getBackgroundCompactionTaskSubmitter()
diff --cc src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 60e2179c0f,7e503829aa..3211b9576a
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@@ -44,6 -47,10 +47,9 @@@ import org.apache.cassandra.cql3.statem
  import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
  import org.apache.cassandra.cql3.statements.schema.CreateTypeStatement;
  import org.apache.cassandra.db.Clustering;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Directories;
 -import org.apache.cassandra.db.Directories.DataDirectory;
+ import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.db.Slice;
  import org.apache.cassandra.db.Slices;
  import org.apache.cassandra.db.marshal.AbstractType;
@@@ -55,8 -63,9 +62,10 @@@ import org.apache.cassandra.io.sstable.
  import org.apache.cassandra.io.util.File;
  import org.apache.cassandra.schema.KeyspaceMetadata;
  import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.schema.Keyspaces;
  import org.apache.cassandra.schema.Schema;
  import org.apache.cassandra.schema.SchemaConstants;
++import org.apache.cassandra.schema.SchemaTransformation;
  import org.apache.cassandra.schema.SchemaTransformations;
  import org.apache.cassandra.schema.TableMetadata;
  import org.apache.cassandra.schema.TableMetadataRef;
@@@ -65,7 -74,6 +74,9 @@@ import org.apache.cassandra.schema.Type
  import org.apache.cassandra.schema.UserFunctions;
  import org.apache.cassandra.schema.Views;
  import org.apache.cassandra.service.ClientState;
++import org.apache.cassandra.tcm.ClusterMetadata;
 +import org.apache.cassandra.tcm.ClusterMetadataService;
++import org.apache.cassandra.tcm.transformations.AlterSchema;
  import org.apache.cassandra.transport.ProtocolVersion;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.JavaDriverUtils;
@@@ -603,28 -638,54 +642,72 @@@ public class CQLSSTableWriter implement
              synchronized (CQLSSTableWriter.class)
              {
                  String keyspaceName = schemaStatement.keyspace();
 -
 -                
Schema.instance.transform(SchemaTransformations.addKeyspace(KeyspaceMetadata.create(keyspaceName,
 -                                                                              
                      KeyspaceParams.simple(1),
 -                                                                              
                      Tables.none(),
 -                                                                              
                      Views.none(),
 -                                                                              
                      Types.none(),
 -                                                                              
                      UserFunctions.none()), true));
 -
 -                KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(keyspaceName);
 -
 -                TableMetadata tableMetadata = 
ksm.tables.getNullable(schemaStatement.table());
++                String tableName = schemaStatement.table();
 +
 +                
Schema.instance.submit(SchemaTransformations.addKeyspace(KeyspaceMetadata.create(keyspaceName,
 +                                                                              
                   KeyspaceParams.simple(1),
 +                                                                              
                   Tables.none(),
 +                                                                              
                   Views.none(),
 +                                                                              
                   Types.none(),
 +                                                                              
                   UserFunctions.none()), true));
 +
 +                KeyspaceMetadata ksm = KeyspaceMetadata.create(keyspaceName,
 +                                                               
KeyspaceParams.simple(1),
 +                                                               Tables.none(),
 +                                                               Views.none(),
 +                                                               Types.none(),
 +                                                               
UserFunctions.none());
 +
-                 TableMetadata tableMetadata = 
ksm.tables.getNullable(schemaStatement.table());
++                TableMetadata tableMetadata = 
Schema.instance.getTableMetadata(keyspaceName, tableName);
                  if (tableMetadata == null)
                  {
                      Types types = createTypes(keyspaceName);
 -                    
Schema.instance.transform(SchemaTransformations.addTypes(types, true));
 +                    
Schema.instance.submit(SchemaTransformations.addTypes(types, true));
                      tableMetadata = createTable(types);
 +                    
Schema.instance.submit(SchemaTransformations.addTable(tableMetadata, true));
+ 
+                     if (buildIndexes && !indexStatements.isEmpty())
+                     {
 -                        tableMetadata = 
applyIndexes(ksm.withSwapped(ksm.tables.with(tableMetadata)));
 -                        Keyspace ks = 
Keyspace.openWithoutSSTables(keyspaceName);
 -                        Directories directories = new 
Directories(tableMetadata, Collections.singleton(new DataDirectory(new 
File(directory.toPath()))));
 -                        ColumnFamilyStore cfs = 
ColumnFamilyStore.createColumnFamilyStore(ks,
 -                                                                              
            tableMetadata.name,
 -                                                                              
            TableMetadataRef.forOfflineTools(tableMetadata),
 -                                                                              
            directories,
 -                                                                              
            false,
 -                                                                              
            false,
 -                                                                              
            true);
 -                        ks.initCfCustom(cfs);
 -
 -                        // this is the empty directory / leftover from times 
we initialized ColumnFamilyStore
 -                        // it will automatically create directories for 
keyspace and table on disk after initialization
 -                        // we set that directory to the destination of 
generated SSTables so we just remove empty directories here
 -                        try
 -                        {
 -                            new File(directory, 
keyspaceName).deleteRecursive();
 -                        }
 -                        catch (UncheckedIOException ex)
++                        // we need to commit keyspace metadata first so 
applyIndexes sees that keyspace from TCM
++                        
commitKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(tableMetadata)));
++                        applyIndexes(keyspaceName);
++                    }
++
++                    KeyspaceMetadata keyspaceMetadata = 
ClusterMetadata.current().schema.getKeyspaceMetadata(keyspaceName);
++                    tableMetadata = 
keyspaceMetadata.tables.getNullable(tableName);
++
++                    
Schema.instance.submit(SchemaTransformations.addTable(tableMetadata, true));
++                }
++
++                ColumnFamilyStore cfs = null;
++                if (buildIndexes && !indexStatements.isEmpty())
++                {
++                    KeyspaceMetadata keyspaceMetadata = 
ClusterMetadata.current().schema.getKeyspaceMetadata(keyspaceName);
++                    Keyspace keyspace = Keyspace.mockKS(keyspaceMetadata);
++                    Directories directories = new Directories(tableMetadata, 
Collections.singleton(new Directories.DataDirectory(new 
File(directory.toPath()))));
++                    cfs = ColumnFamilyStore.createColumnFamilyStore(keyspace,
++                                                                    tableName,
++                                                                    
tableMetadata,
++                                                                    
directories,
++                                                                    false,
++                                                                    false);
++
++                    keyspace.initCfCustom(cfs);
++
++                    // Initializing a ColumnFamilyStore automatically creates 
empty keyspace and table directories on disk.
++                    // Its data directory is set to the destination of the 
generated SSTables,
++                    // so we just remove those empty leftover directories 
here.
++                    try
++                    {
++                        new File(directory, keyspaceName).deleteRecursive();
++                    }
++                    catch (UncheckedIOException ex)
++                    {
++                        if (!(ex.getCause() instanceof NoSuchFileException))
+                         {
 -                            if (!(ex.getCause() instanceof 
NoSuchFileException))
 -                            {
 -                                throw ex;
 -                            }
++                            throw ex;
+                         }
+                     }
 -
 -                    
Schema.instance.transform(SchemaTransformations.addTable(tableMetadata, true));
                  }
  
                  ModificationStatement preparedModificationStatement = 
prepareModificationStatement();
@@@ -637,6 -698,13 +720,13 @@@
                  if (format != null)
                      writer.setSSTableFormatType(format);
  
 -                if (buildIndexes && !indexStatements.isEmpty())
++                if (buildIndexes && !indexStatements.isEmpty() && cfs != null)
+                 {
 -                    StorageAttachedIndexGroup saiGroup = 
StorageAttachedIndexGroup.getIndexGroup(Schema.instance.getColumnFamilyStoreInstance(tableMetadata.id));
++                    StorageAttachedIndexGroup saiGroup = 
StorageAttachedIndexGroup.getIndexGroup(cfs);
+                     if (saiGroup != null)
+                         writer.addIndexGroup(saiGroup);
+                 }
+ 
                  return new CQLSSTableWriter(writer, 
preparedModificationStatement, 
preparedModificationStatement.getBindVariables());
              }
          }
@@@ -654,6 -722,23 +744,29 @@@
              return builder.build();
          }
  
+         /**
+          * Applies any provided index definitions to the target table
+          *
 -         * @param ksm the KeyspaceMetadata object that has the table defined
 -         * @return an updated TableMetadata instance with the indexe create 
statements applied
++         * @param keyspaceName name of the keyspace to apply indexes for
++         * @see #commitKeyspaceMetadata(KeyspaceMetadata)
+          */
 -        private TableMetadata applyIndexes(KeyspaceMetadata ksm)
++        private void applyIndexes(String keyspaceName)
+         {
+             ClientState state = ClientState.forInternalCalls();
 -            Keyspaces keyspaces = Keyspaces.of(ksm);
+ 
+             for (CreateIndexStatement.Raw statement : indexStatements)
 -                keyspaces = statement.prepare(state).apply(keyspaces);
++            {
++                Keyspaces keyspaces = 
statement.prepare(state).apply(ClusterMetadata.current());
++                commitKeyspaceMetadata(keyspaces.getNullable(keyspaceName));
++            }
++        }
+ 
 -            return 
keyspaces.get(ksm.name).get().tables.get(schemaStatement.table()).get();
++        private void commitKeyspaceMetadata(KeyspaceMetadata keyspaceMetadata)
++        {
++            SchemaTransformation schemaTransformation = metadata -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(keyspaceMetadata);
++            ClusterMetadataService.instance().commit(new 
AlterSchema(schemaTransformation, Schema.instance));
+         }
+ 
          /**
           * Creates the table according to schema statement
           *
diff --cc test/unit/org/apache/cassandra/db/ImportTest.java
index 5cd0f15bdb,8f9c5de6fd..f6383b5ef8
--- a/test/unit/org/apache/cassandra/db/ImportTest.java
+++ b/test/unit/org/apache/cassandra/db/ImportTest.java
@@@ -45,14 -53,16 +53,18 @@@ import org.apache.cassandra.io.sstable.
  import org.apache.cassandra.io.util.File;
  import org.apache.cassandra.io.util.PathUtils;
  import org.apache.cassandra.locator.InetAddressAndPort;
 -import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.schema.Schema;
  import org.apache.cassandra.service.CacheService;
 -import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.tcm.ClusterMetadata;
 +import org.apache.cassandra.tcm.membership.NodeAddresses;
 +import org.apache.cassandra.tcm.membership.NodeState;
 +import org.apache.cassandra.tcm.transformations.Register;
 +import org.apache.cassandra.tcm.transformations.UnsafeJoin;
  import org.apache.cassandra.utils.ByteBufferUtil;
 -import org.apache.cassandra.utils.FBUtilities;
  
+ import static org.apache.lucene.codecs.CodecUtil.FOOTER_MAGIC;
+ import static org.apache.lucene.codecs.CodecUtil.writeBEInt;
+ import static org.apache.lucene.codecs.CodecUtil.writeBELong;
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertFalse;
  import static org.junit.Assert.assertTrue;
@@@ -702,6 -721,215 +713,215 @@@ public class ImportTest extends CQLTest
          }
      }
  
+     @Test
+     public void mustNotFailOnBuiltSAIIndexesWhenRequiredTest() throws 
Throwable
+     {
+         try
+         {
+             schemaChange(String.format("CREATE TABLE %s.%s (id int primary 
key, d int)", KEYSPACE, "sai_test"));
 -            createIndexAndWait(String.format("CREATE INDEX idx1 ON %s.%s (d) 
USING 'sai'", KEYSPACE, "sai_test"), "idx1");
++            schemaChange(String.format("CREATE INDEX idx1 ON %s.%s (d) USING 
'sai'", KEYSPACE, "sai_test"));
+ 
+             for (int i = 0; i < 10; i++)
+                 execute(String.format("INSERT INTO %s.%s (id, d) values (?, 
?)", KEYSPACE, "sai_test"), i, i);
+ 
+             ColumnFamilyStore cfs = getColumnFamilyStore(KEYSPACE, 
"sai_test");
+             Util.flush(cfs);
+ 
+             Set<SSTableReader> sstables = cfs.getLiveSSTables();
+             cfs.clearUnsafe();
+ 
+             File backupDir = moveToBackupDir(sstables);
+ 
+             assertEquals(0, execute(String.format("SELECT * FROM %s.%s", 
KEYSPACE, "sai_test")).size());
+ 
+             SSTableImporter importer = new SSTableImporter(cfs);
+             SSTableImporter.Options options = 
SSTableImporter.Options.options(backupDir.toString())
+                                                                      
.copyData(true)
+                                                                      
.failOnMissingIndex(true)
+                                                                      .build();
+             assertTrue(importer.importNewSSTables(options).isEmpty());
+             assertEquals(10, execute(String.format("SELECT * FROM %s.%s WHERE 
d >= 0", KEYSPACE, "sai_test")).size());
+         }
+         finally
+         {
+             execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, 
"sai_test"));
+         }
+     }
+ 
+     @Test
+     public void mustNotFailOnMissingSAIIndexWhenSAIDoesNotExistTest() throws 
Throwable
+     {
+         try
+         {
+             schemaChange(String.format("CREATE TABLE %s.%s (id int primary 
key, d int)", KEYSPACE, "sai_less_test"));
+ 
+             for (int i = 0; i < 10; i++)
+                 execute(String.format("INSERT INTO %s.%s (id, d) values (?, 
?)", KEYSPACE, "sai_less_test"), i, i);
+ 
+             ColumnFamilyStore cfs = getColumnFamilyStore(KEYSPACE, 
"sai_less_test");
+             Util.flush(cfs);
+ 
+             Set<SSTableReader> sstables = cfs.getLiveSSTables();
+             cfs.clearUnsafe();
+ 
+             File backupDir = moveToBackupDir(sstables);
+ 
+             assertEquals(0, execute(String.format("SELECT * FROM %s.%s", 
KEYSPACE, "sai_less_test")).size());
+ 
+             SSTableImporter importer = new SSTableImporter(cfs);
+             SSTableImporter.Options options = 
SSTableImporter.Options.options(backupDir.toString())
+                                                                      
.copyData(true)
+                                                                      // this 
does not mean anything
+                                                                      // 
because our table does not have any SAI index
+                                                                      
.failOnMissingIndex(true)
+                                                                      .build();
+             assertTrue(importer.importNewSSTables(options).isEmpty());
+             assertEquals(10, execute(String.format("SELECT * FROM %s.%s", 
KEYSPACE, "sai_less_test")).size());
+         }
+         finally
+         {
+             execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, 
"sai_less_test"));
+         }
+     }
+ 
+     @Test
+     public void mustFailOnMissingSAIWhenRequiredTest() throws Throwable
+     {
+         File backupDir = null;
+         try
+         {
+             schemaChange(String.format("CREATE TABLE %s.%s (id int primary 
key, d int)", KEYSPACE, "sai_test"));
+ 
+             for (int i = 0; i < 10; i++)
+                 execute(String.format("INSERT INTO %s.%s (id, d) values (?, 
?)", KEYSPACE, "sai_test"), i, i);
+ 
+             ColumnFamilyStore cfs = getColumnFamilyStore(KEYSPACE, 
"sai_test");
+             Util.flush(cfs);
+ 
+             Set<SSTableReader> sstables = cfs.getLiveSSTables();
+             cfs.clearUnsafe();
+ 
+             backupDir = moveToBackupDir(sstables);
+ 
+             assertEquals(0, execute(String.format("SELECT * FROM %s.%s", 
KEYSPACE, "sai_test")).size());
+ 
+             // create index and load sstables, they will be without indexes 
(because we created
+             // data when index was not created yet)
 -            createIndexAndWait(String.format("CREATE INDEX idx1 ON %s.%s (d) 
USING 'sai'", KEYSPACE, "sai_test"),  "idx1");
++            schemaChange(String.format("CREATE INDEX idx1 ON %s.%s (d) USING 
'sai'", KEYSPACE, "sai_test"));
+ 
+             SSTableImporter importer = new SSTableImporter(cfs);
+             SSTableImporter.Options options = 
SSTableImporter.Options.options(backupDir.toString())
+                                                                      
.copyData(true)
+                                                                      
.failOnMissingIndex(true)
+                                                                      .build();
+             assertFalse(importer.importNewSSTables(options).isEmpty());
+             assertEquals(0, execute(String.format("SELECT * FROM %s.%s WHERE 
d >= 0", KEYSPACE, "sai_test")).size());
+         }
+         finally
+         {
+             if (backupDir != null)
+             {
+                 backupDir.deleteRecursive();
+                 backupDir.parent().deleteRecursive();
+             }
+ 
+             execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, 
"sai_test"));
+         }
+     }
+ 
+     @Test
+     public void skipIndexChecksumOnSAITest() throws Throwable
+     {
+         try
+         {
+             schemaChange(String.format("CREATE TABLE %s.%s (id int primary 
key, d int)", KEYSPACE, "sai_test"));
 -            createIndexAndWait(String.format("CREATE INDEX idx1 ON %s.%s (d) 
USING 'sai'", KEYSPACE, "sai_test"), "idx1");
++            schemaChange(String.format("CREATE INDEX idx1 ON %s.%s (d) USING 
'sai'", KEYSPACE, "sai_test"));
+ 
+             for (int i = 0; i < 10; i++)
+                 execute(String.format("INSERT INTO %s.%s (id, d) values (?, 
?)", KEYSPACE, "sai_test"), i, i);
+ 
+             ColumnFamilyStore cfs = getColumnFamilyStore(KEYSPACE, 
"sai_test");
+             Util.flush(cfs);
+ 
+             Set<SSTableReader> sstables = cfs.getLiveSSTables();
+             cfs.clearUnsafe();
+ 
+             File backupDir = moveToBackupDir(sstables);
+ 
+             File[] dataFiles = backupDir.list(f -> f.name().endsWith('-' + 
BigFormat.Components.DATA.type.repr));
+ 
+             IndexDescriptor indexDescriptor = 
IndexDescriptor.create(Descriptor.fromFile(dataFiles[0]),
+                                                                      
Murmur3Partitioner.instance,
+                                                                      
Schema.instance.getTableMetadata(KEYSPACE, "sai_test").comparator);
+             IndexIdentifier indexIdentifier = new IndexIdentifier(KEYSPACE, 
"sai_test", "idx1");
+ 
+             // corrupt one of index files
+             try (IndexOutputWriter output = 
indexDescriptor.openPerIndexOutput(IndexComponent.COLUMN_COMPLETION_MARKER, 
indexIdentifier))
+             {
+                 SAICodecUtils.writeHeader(output);
+                 output.writeByte((byte) 0);
+                 // taken from SAICodecUtils#writeFooter
+                 writeBEInt(output, FOOTER_MAGIC);
+                 writeBEInt(output, 0);
+                 writeBELong(output, 123); // some garbage checksum value to 
prove the point
+             }
+ 
+             assertEquals(0, execute(String.format("SELECT * FROM %s.%s", 
KEYSPACE, "sai_test")).size());
+ 
+             SSTableImporter importer = new SSTableImporter(cfs);
+             SSTableImporter.Options options = 
SSTableImporter.Options.options(backupDir.toString())
+                                                                      
.copyData(true)
+                                                                      
.failOnMissingIndex(true)
+                                                                      
.validateIndexChecksum(false)
+                                                                      .build();
+ 
+             // even with corrupted column completion marker (wrong checksum), 
it will import
+             assertTrue(importer.importNewSSTables(options).isEmpty());
+             assertEquals(10, execute(String.format("SELECT * FROM %s.%s", 
KEYSPACE, "sai_test")).size());
+         }
+         finally
+         {
+             execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, 
"sai_test"));
+         }
+     }
+ 
+     @Test
+     public void skipEmptyIndexChecksumOnSAITest() throws Throwable
+     {
+         try
+         {
+             schemaChange(String.format("CREATE TABLE %s.%s (id int primary 
key, d int)", KEYSPACE, "sai_test"));
 -            createIndexAndWait(String.format("CREATE INDEX idx1 ON %s.%s (d) 
USING 'sai'", KEYSPACE, "sai_test"), "idx1");
++            schemaChange(String.format("CREATE INDEX idx1 ON %s.%s (d) USING 
'sai'", KEYSPACE, "sai_test"));
+ 
+             // no data in indexed column = empty index
+             for (int i = 0; i < 10; i++)
+                 execute(String.format("INSERT INTO %s.%s (id) values (?)", 
KEYSPACE, "sai_test"), i);
+ 
+             ColumnFamilyStore cfs = getColumnFamilyStore(KEYSPACE, 
"sai_test");
+             Util.flush(cfs);
+ 
+             Set<SSTableReader> sstables = cfs.getLiveSSTables();
+             cfs.clearUnsafe();
+ 
+             File backupDir = moveToBackupDir(sstables);
+ 
+             assertEquals(0, execute(String.format("SELECT * FROM %s.%s", 
KEYSPACE, "sai_test")).size());
+ 
+             SSTableImporter importer = new SSTableImporter(cfs);
+             SSTableImporter.Options options = 
SSTableImporter.Options.options(backupDir.toString())
+                                                                      
.copyData(true)
+                                                                      
.failOnMissingIndex(true)
+                                                                      
.validateIndexChecksum(true)
+                                                                      .build();
+             assertTrue(importer.importNewSSTables(options).isEmpty());
+             assertEquals(10, execute(String.format("SELECT * FROM %s.%s", 
KEYSPACE, "sai_test")).size());
+         }
+         finally
+         {
+             execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, 
"sai_test"));
+         }
+     }
+ 
      private static class MockCFS extends ColumnFamilyStore
      {
          public MockCFS(ColumnFamilyStore cfs, Directories dirs)
diff --cc 
test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
index 7764da0184,a402059ed4..b7322aa1a3
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
@@@ -17,80 -17,37 +17,48 @@@
   */
  package org.apache.cassandra.io.sstable;
  
 +
- import java.io.IOException;
- import java.util.function.BiPredicate;
- 
 +import com.google.common.io.Files;
- import org.apache.cassandra.io.util.File;
  import org.junit.After;
  import org.junit.Before;
- import org.junit.Test;
  
+ import org.apache.cassandra.config.CassandraRelevantProperties;
+ import org.apache.cassandra.config.Config;
  import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.exceptions.InvalidRequestException;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.dht.ByteOrderedPartitioner;
+ import org.apache.cassandra.dht.IPartitioner;
++import org.apache.cassandra.io.util.File;
 +import org.apache.cassandra.io.util.FileUtils;
  
- import static org.junit.Assert.assertEquals;
- 
- public class CQLSSTableWriterClientTest
+ public class CQLSSTableWriterClientTest extends CQLSSTableWriterTest
  {
 +    private File testDirectory;
+     private IPartitioner oldPartitioner;
  
      @Before
--    public void setUp()
-     {
-         this.testDirectory = new File(Files.createTempDir());
-         DatabaseDescriptor.clientInitialization();
-     }
- 
-     @Test
-     public void testMultipleWritersWithDistinctTables() throws IOException
++    public void setup()
      {
-         testWriterInClientMode("table1", "table2");
-     }
++        // setting this to true will execute a CQL query to table
++        // and this path is not enabled in client mode
++        verifyDataAfterLoading = false;
 +
-     @Test
-     public void testMultipleWritersWithSameTable() throws IOException
-     {
-         testWriterInClientMode("table1", "table1");
-     }
- 
-     public void testWriterInClientMode(String table1, String table2) throws 
IOException, InvalidRequestException
-     {
-         String schema = "CREATE TABLE client_test.%s ("
-                             + "  k int PRIMARY KEY,"
-                             + "  v1 text,"
-                             + "  v2 int"
-                             + ")";
-         String insert = "INSERT INTO client_test.%s (k, v1, v2) VALUES (?, ?, 
?)";
- 
-         CQLSSTableWriter writer = CQLSSTableWriter.builder()
-                                                   
.inDirectory(this.testDirectory)
-                                                   
.forTable(String.format(schema, table1))
-                                                   
.using(String.format(insert, table1)).build();
- 
-         CQLSSTableWriter writer2 = CQLSSTableWriter.builder()
-                                                    
.inDirectory(this.testDirectory)
-                                                    
.forTable(String.format(schema, table2))
-                                                    
.using(String.format(insert, table2)).build();
- 
-         writer.addRow(0, "A", 0);
-         writer2.addRow(0, "A", 0);
-         writer.addRow(1, "B", 1);
-         writer2.addRow(1, "B", 1);
-         writer.close();
-         writer2.close();
- 
-         BiPredicate<File, String> filter = (dir, name) -> 
name.endsWith("-Data.db");
- 
-         File[] dataFiles = this.testDirectory.tryList(filter);
-         assertEquals(2, dataFiles.length);
++        this.testDirectory = new File(Files.createTempDir());
+         DatabaseDescriptor.clientInitialization(true,
+                                                 () -> {
+                                                     Config config = new 
Config();
 -                                                    
config.data_file_directories = new String[]{ dataDir.absolutePath() };
++                                                    
config.data_file_directories = new String[]{ testDirectory.absolutePath() };
+                                                     return config;
+                                                 });
+         
CassandraRelevantProperties.FORCE_LOAD_LOCAL_KEYSPACES.setBoolean(true);
+         oldPartitioner = 
DatabaseDescriptor.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
+         Keyspace.setInitialized();
      }
  
      @After
      public void tearDown()
      {
 +        FileUtils.deleteRecursive(this.testDirectory);
+         DatabaseDescriptor.setPartitionerUnsafe(oldPartitioner);
      }
  }
diff --cc 
test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterDaemonTest.java
index 0000000000,8e96073705..64c617667c
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterDaemonTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterDaemonTest.java
@@@ -1,0 -1,40 +1,44 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
 -package org.apache.cassandra.io.sstable;
+ 
++package org.apache.cassandra.io.sstable;
+ 
+ import org.junit.BeforeClass;
+ 
+ import org.apache.cassandra.SchemaLoader;
++import org.apache.cassandra.ServerTestUtils;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.commitlog.CommitLog;
++import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.service.StorageService;
+ 
+ public class CQLSSTableWriterDaemonTest extends CQLSSTableWriterTest
+ {
+     @BeforeClass
+     public static void setup() throws Exception
+     {
+         DatabaseDescriptor.daemonInitialization();
+         CommitLog.instance.start();
+         SchemaLoader.cleanupAndLeaveDirs();
+         Keyspace.setInitialized();
++        ServerTestUtils.prepareServerNoRegister();
++        MessagingService.instance().waitUntilListeningUnchecked();
+         StorageService.instance.initServer();
+     }
+ }
diff --cc test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index ac0d1d682a,de592283cc..03fbf965ae
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@@ -15,13 -15,20 +15,20 @@@
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
 -
  package org.apache.cassandra.io.sstable;
  
 +
  import java.io.IOException;
  import java.nio.ByteBuffer;
- import java.util.*;
+ import java.util.Arrays;
+ import java.util.Collections;
+ import java.util.Iterator;
+ import java.util.LinkedHashMap;
+ import java.util.LinkedHashSet;
+ import java.util.List;
+ import java.util.Map;
 -import java.util.Set;
+ import java.util.UUID;
 +import java.util.concurrent.ExecutionException;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.function.BiPredicate;
@@@ -41,22 -44,29 +44,36 @@@ import org.junit.Test
  import org.junit.rules.TemporaryFolder;
  
  import com.datastax.driver.core.utils.UUIDs;
- import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
- import org.apache.cassandra.config.*;
- import org.apache.cassandra.cql3.*;
- import org.apache.cassandra.cql3.functions.types.*;
- import org.apache.cassandra.db.Keyspace;
- import org.apache.cassandra.db.commitlog.CommitLog;
+ import org.apache.cassandra.cql3.QueryProcessor;
+ import org.apache.cassandra.cql3.UntypedResultSet;
+ import org.apache.cassandra.cql3.functions.types.DataType;
+ import org.apache.cassandra.cql3.functions.types.LocalDate;
+ import org.apache.cassandra.cql3.functions.types.TypeCodec;
+ import org.apache.cassandra.cql3.functions.types.UDTValue;
+ import org.apache.cassandra.cql3.functions.types.UserType;
 -import org.apache.cassandra.db.ColumnFamilyStore;
 -import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.db.marshal.UTF8Type;
- import org.apache.cassandra.dht.*;
- import org.apache.cassandra.exceptions.*;
- import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.dht.ByteOrderedPartitioner;
+ import org.apache.cassandra.dht.Murmur3Partitioner;
++import org.apache.cassandra.dht.Range;
++import org.apache.cassandra.dht.Token;
+ import org.apache.cassandra.exceptions.InvalidRequestException;
+ import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
+ import org.apache.cassandra.index.sai.utils.IndexIdentifier;
+ import org.apache.cassandra.io.sstable.format.big.BigFormat;
+ import org.apache.cassandra.io.util.File;
+ import org.apache.cassandra.io.util.PathUtils;
++import org.apache.cassandra.locator.RangesAtEndpoint;
++import org.apache.cassandra.schema.KeyspaceMetadata;
  import org.apache.cassandra.schema.Schema;
++import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.schema.TableMetadataRef;
- import org.apache.cassandra.service.StorageService;
++import org.apache.cassandra.tcm.ClusterMetadata;
  import org.apache.cassandra.transport.ProtocolVersion;
- import org.apache.cassandra.utils.*;
+ import org.apache.cassandra.utils.ByteBufferUtil;
++import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.JavaDriverUtils;
++import org.apache.cassandra.utils.OutputHandler;
  
  import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
  import static org.junit.Assert.assertEquals;
@@@ -65,33 -75,20 +82,21 @@@ import static org.junit.Assert.assertNo
  import static org.junit.Assert.assertTrue;
  import static org.junit.Assert.fail;
  
- public class CQLSSTableWriterTest
+ @Ignore
+ public abstract class CQLSSTableWriterTest
  {
      private static final AtomicInteger idGen = new AtomicInteger(0);
+     private static final int NUMBER_WRITES_IN_RUNNABLE = 10;
+ 
 -    @Rule
 -    public TemporaryFolder tempFolder = new TemporaryFolder();
 -
      private String keyspace;
 -    protected String table;
 +    private String table;
      private String qualifiedTable;
 -    protected File dataDir;
 +    private File dataDir;
- 
-     static
-     {
-         DatabaseDescriptor.daemonInitialization();
-     }
++    protected boolean verifyDataAfterLoading = true;
 +
 +    @Rule
 +    public TemporaryFolder tempFolder = new TemporaryFolder();
  
-     @BeforeClass
-     public static void setup() throws Exception
-     {
-         CommitLog.instance.start();
-         SchemaLoader.cleanupAndLeaveDirs();
-         Keyspace.setInitialized();
-         ServerTestUtils.prepareServerNoRegister();
-         MessagingService.instance().waitUntilListeningUnchecked();
-         StorageService.instance.initServer();
-     }
- 
      @Before
      public void perTestSetup() throws IOException
      {
@@@ -105,13 -102,13 +110,13 @@@
      @Test
      public void testUnsortedWriter() throws Exception
      {
--        try (AutoCloseable switcher = 
Util.switchPartitioner(ByteOrderedPartitioner.instance))
++        try (AutoCloseable ignored = 
Util.switchPartitioner(ByteOrderedPartitioner.instance))
          {
              String schema = "CREATE TABLE " + qualifiedTable + " ("
-                           + "  k int PRIMARY KEY,"
-                           + "  v1 text,"
-                           + "  v2 int"
-                           + ")";
+                             + "  k int PRIMARY KEY,"
+                             + "  v1 text,"
+                             + "  v2 int"
+                             + ")";
              String insert = "INSERT INTO " + qualifiedTable + " (k, v1, v2) 
VALUES (?, ?, ?)";
              CQLSSTableWriter writer = CQLSSTableWriter.builder()
                                                        .inDirectory(dataDir)
@@@ -125,34 -122,34 +130,37 @@@
  
              writer.close();
  
--            loadSSTables(dataDir, keyspace);
- 
-             UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable);
-             assertEquals(4, rs.size());
- 
-             Iterator<UntypedResultSet.Row> iter = rs.iterator();
-             UntypedResultSet.Row row;
- 
-             row = iter.next();
-             assertEquals(0, row.getInt("k"));
-             assertEquals("test1", row.getString("v1"));
-             assertEquals(24, row.getInt("v2"));
- 
-             row = iter.next();
-             assertEquals(1, row.getInt("k"));
-             assertEquals("test2", row.getString("v1"));
-             //assertFalse(row.has("v2"));
-             assertEquals(44, row.getInt("v2"));
++            loadSSTables(dataDir, keyspace, table);
  
 -            UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable);
 -            assertEquals(4, rs.size());
 -
 -            Iterator<UntypedResultSet.Row> iter = rs.iterator();
 -            UntypedResultSet.Row row;
 -
 -            row = iter.next();
 -            assertEquals(0, row.getInt("k"));
 -            assertEquals("test1", row.getString("v1"));
 -            assertEquals(24, row.getInt("v2"));
 -
 -            row = iter.next();
 -            assertEquals(1, row.getInt("k"));
 -            assertEquals("test2", row.getString("v1"));
 -            //assertFalse(row.has("v2"));
 -            assertEquals(44, row.getInt("v2"));
 -
--            row = iter.next();
--            assertEquals(2, row.getInt("k"));
--            assertEquals("test3", row.getString("v1"));
--            assertEquals(42, row.getInt("v2"));
--
--            row = iter.next();
--            assertEquals(3, row.getInt("k"));
--            assertEquals(null, row.getBytes("v1")); // Using getBytes because 
we know it won't NPE
--            assertEquals(12, row.getInt("v2"));
++            if (verifyDataAfterLoading)
++            {
++                UntypedResultSet rs = QueryProcessor.executeInternal("SELECT 
* FROM " + qualifiedTable);
++                assertEquals(4, rs.size());
++
++                Iterator<UntypedResultSet.Row> iter = rs.iterator();
++                UntypedResultSet.Row row;
++
++                row = iter.next();
++                assertEquals(0, row.getInt("k"));
++                assertEquals("test1", row.getString("v1"));
++                assertEquals(24, row.getInt("v2"));
++
++                row = iter.next();
++                assertEquals(1, row.getInt("k"));
++                assertEquals("test2", row.getString("v1"));
++                //assertFalse(row.has("v2"));
++                assertEquals(44, row.getInt("v2"));
++
++                row = iter.next();
++                assertEquals(2, row.getInt("k"));
++                assertEquals("test3", row.getString("v1"));
++                assertEquals(42, row.getInt("v2"));
++
++                row = iter.next();
++                assertEquals(3, row.getInt("k"));
++                assertEquals(null, row.getBytes("v1")); // Using getBytes 
because we know it won't NPE
++                assertEquals(12, row.getInt("v2"));
++            }
          }
      }
  
@@@ -245,8 -241,8 +253,12 @@@
                                + ")";
  
          testUpdateStatement(); // start by adding some data
--        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable);
--        assertEquals(2, resultSet.size());
++
++        if (verifyDataAfterLoading)
++        {
++            UntypedResultSet resultSet = 
QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable);
++            assertEquals(2, resultSet.size());
++        }
  
          CQLSSTableWriter writer = CQLSSTableWriter.builder()
                                                    .inDirectory(dataDir)
@@@ -258,12 -254,12 +270,15 @@@
          writer.addRow(1, 2, 3);
          writer.addRow(4, 5, 6);
          writer.close();
--        loadSSTables(dataDir, keyspace);
++        loadSSTables(dataDir, keyspace, table);
  
--        resultSet = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable);
--        assertEquals(0, resultSet.size());
--        Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
--        assertFalse(iter.hasNext());
++        if (verifyDataAfterLoading)
++        {
++            UntypedResultSet resultSet = 
QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable);
++            assertEquals(0, resultSet.size());
++            Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
++            assertFalse(iter.hasNext());
++        }
      }
  
      @Test
@@@ -292,32 -288,32 +307,35 @@@
          writer.addRow(2, 8, 9, "d");
  
          writer.close();
--        loadSSTables(dataDir, keyspace);
--
--        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable);
--        assertEquals(4, resultSet.size());
--        Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
--        UntypedResultSet.Row r1 = iter.next();
--        assertEquals(1, r1.getInt("k"));
--        assertEquals(2, r1.getInt("c1"));
--        assertEquals(3, r1.getInt("c2"));
--        assertEquals("a", r1.getString("v"));
--        UntypedResultSet.Row r2 = iter.next();
--        assertEquals(1, r2.getInt("k"));
--        assertEquals(4, r2.getInt("c1"));
--        assertEquals(5, r2.getInt("c2"));
--        assertEquals("b", r2.getString("v"));
--        UntypedResultSet.Row r3 = iter.next();
--        assertEquals(1, r3.getInt("k"));
--        assertEquals(6, r3.getInt("c1"));
--        assertEquals(7, r3.getInt("c2"));
--        assertEquals("c", r3.getString("v"));
--        UntypedResultSet.Row r4 = iter.next();
--        assertEquals(2, r4.getInt("k"));
--        assertEquals(8, r4.getInt("c1"));
--        assertEquals(9, r4.getInt("c2"));
--        assertEquals("d", r4.getString("v"));
--        assertFalse(iter.hasNext());
++        loadSSTables(dataDir, keyspace, table);
++
++        if (verifyDataAfterLoading)
++        {
++            UntypedResultSet resultSet = 
QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable);
++            assertEquals(4, resultSet.size());
++            Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
++            UntypedResultSet.Row r1 = iter.next();
++            assertEquals(1, r1.getInt("k"));
++            assertEquals(2, r1.getInt("c1"));
++            assertEquals(3, r1.getInt("c2"));
++            assertEquals("a", r1.getString("v"));
++            UntypedResultSet.Row r2 = iter.next();
++            assertEquals(1, r2.getInt("k"));
++            assertEquals(4, r2.getInt("c1"));
++            assertEquals(5, r2.getInt("c2"));
++            assertEquals("b", r2.getString("v"));
++            UntypedResultSet.Row r3 = iter.next();
++            assertEquals(1, r3.getInt("k"));
++            assertEquals(6, r3.getInt("c1"));
++            assertEquals(7, r3.getInt("c2"));
++            assertEquals("c", r3.getString("v"));
++            UntypedResultSet.Row r4 = iter.next();
++            assertEquals(2, r4.getInt("k"));
++            assertEquals(8, r4.getInt("c1"));
++            assertEquals(9, r4.getInt("c2"));
++            assertEquals("d", r4.getString("v"));
++            assertFalse(iter.hasNext());
++        }
  
          writer = CQLSSTableWriter.builder()
                                   .inDirectory(dataDir)
@@@ -328,17 -324,17 +346,21 @@@
  
          writer.addRow(1);
          writer.close();
--        loadSSTables(dataDir, keyspace);
--
--        resultSet = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable);
--        assertEquals(1, resultSet.size());
--        iter = resultSet.iterator();
--        UntypedResultSet.Row r5 = iter.next();
--        assertEquals(2, r5.getInt("k"));
--        assertEquals(8, r5.getInt("c1"));
--        assertEquals(9, r5.getInt("c2"));
--        assertEquals("d", r5.getString("v"));
--        assertFalse(iter.hasNext());
++        loadSSTables(dataDir, keyspace, table);
++
++        if (verifyDataAfterLoading)
++        {
++
++            UntypedResultSet resultSet = 
QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable);
++            assertEquals(1, resultSet.size());
++            Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
++            UntypedResultSet.Row r5 = iter.next();
++            assertEquals(2, r5.getInt("k"));
++            assertEquals(8, r5.getInt("c1"));
++            assertEquals(9, r5.getInt("c2"));
++            assertEquals("d", r5.getString("v"));
++            assertFalse(iter.hasNext());
++        }
      }
  
      @Test
@@@ -364,41 -360,41 +386,46 @@@
                                                          
.using(String.format("DELETE FROM %s WHERE k=? AND c1=? and c2>=?", 
qualifiedTable))
                                                          .build();
  
--        updateWriter.addRow("v0.0", "a", 0, 0);
--        updateWriter.addRow("v0.1", "a", 0, 1);
--        updateWriter.addRow("v0.2", "a", 0, 2);
--        updateWriter.addRow("v0.0", "b", 0, 0);
--        updateWriter.addRow("v0.1", "b", 0, 1);
--        updateWriter.addRow("v0.2", "b", 0, 2);
--        updateWriter.close();
--        deleteWriter.addRow("a", 0, 1);
--        deleteWriter.addRow("b", 0, 2);
++        if (verifyDataAfterLoading)
++        {
++            updateWriter.addRow("v0.0", "a", 0, 0);
++            updateWriter.addRow("v0.1", "a", 0, 1);
++            updateWriter.addRow("v0.2", "a", 0, 2);
++            updateWriter.addRow("v0.0", "b", 0, 0);
++            updateWriter.addRow("v0.1", "b", 0, 1);
++            updateWriter.addRow("v0.2", "b", 0, 2);
++            updateWriter.close();
++            deleteWriter.addRow("a", 0, 1);
++            deleteWriter.addRow("b", 0, 2);
++        }
++
          deleteWriter.close();
--        loadSSTables(dataDir, keyspace);
--
--        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable);
--        assertEquals(3, resultSet.size());
--
--        Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
--        UntypedResultSet.Row r1 = iter.next();
--        assertEquals("a", r1.getString("k"));
--        assertEquals(0, r1.getInt("c1"));
--        assertEquals(0, r1.getInt("c2"));
--        UntypedResultSet.Row r2 = iter.next();
--        assertEquals("b", r2.getString("k"));
--        assertEquals(0, r2.getInt("c1"));
--        assertEquals(0, r2.getInt("c2"));
--        UntypedResultSet.Row r3 = iter.next();
--        assertEquals("b", r3.getString("k"));
--        assertEquals(0, r3.getInt("c1"));
--        assertEquals(1, r3.getInt("c2"));
++        loadSSTables(dataDir, keyspace, table);
++
++        if (verifyDataAfterLoading)
++        {
++            UntypedResultSet resultSet = 
QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable);
++            assertEquals(3, resultSet.size());
++
++            Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
++            UntypedResultSet.Row r1 = iter.next();
++            assertEquals("a", r1.getString("k"));
++            assertEquals(0, r1.getInt("c1"));
++            assertEquals(0, r1.getInt("c2"));
++            UntypedResultSet.Row r2 = iter.next();
++            assertEquals("b", r2.getString("k"));
++            assertEquals(0, r2.getInt("c1"));
++            assertEquals(0, r2.getInt("c2"));
++            UntypedResultSet.Row r3 = iter.next();
++            assertEquals("b", r3.getString("k"));
++            assertEquals(0, r3.getInt("c1"));
++            assertEquals(1, r3.getInt("c2"));
++        }
      }
  
      @Test
      public void testDeleteRangeEmptyKeyComponent() throws Exception
      {
--
--
          final String schema = "CREATE TABLE " + qualifiedTable + " ("
                                + "  k text,"
                                + "  c1 int,"
@@@ -428,20 -424,20 +455,23 @@@
          deleteWriter.addRow("a", 0);
          deleteWriter.addRow("b", 0);
          deleteWriter.close();
--        loadSSTables(dataDir, keyspace);
--
--        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable);
--        assertEquals(2, resultSet.size());
--
--        Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
--        UntypedResultSet.Row r1 = iter.next();
--        assertEquals("a", r1.getString("k"));
--        assertEquals(1, r1.getInt("c1"));
--        assertEquals(2, r1.getInt("c2"));
--        UntypedResultSet.Row r2 = iter.next();
--        assertEquals("b", r2.getString("k"));
--        assertEquals(1, r2.getInt("c1"));
--        assertEquals(2, r2.getInt("c2"));
++        loadSSTables(dataDir, keyspace, table);
++
++        if (verifyDataAfterLoading)
++        {
++            UntypedResultSet resultSet = 
QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable);
++            assertEquals(2, resultSet.size());
++
++            Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
++            UntypedResultSet.Row r1 = iter.next();
++            assertEquals("a", r1.getString("k"));
++            assertEquals(1, r1.getInt("c1"));
++            assertEquals(2, r1.getInt("c2"));
++            UntypedResultSet.Row r2 = iter.next();
++            assertEquals("b", r2.getString("k"));
++            assertEquals(1, r2.getInt("c1"));
++            assertEquals(2, r2.getInt("c2"));
++        }
      }
  
      @Test
@@@ -482,79 -478,35 +512,43 @@@
          updateWriter.addRow("v0.3", "b", 3, 4);
          updateWriter.close();
  
--        loadSSTables(dataDir, keyspace);
--
--        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable);
--        assertEquals(2, resultSet.size());
--        Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
--        UntypedResultSet.Row insertedRow = iter.next();
--        assertEquals("v0.2", insertedRow.getString("v"));
--        assertEquals("a", insertedRow.getString("k"));
--        assertEquals(1, insertedRow.getInt("c1"));
--        assertEquals(2, insertedRow.getInt("c2"));
--        UntypedResultSet.Row updatedRow = iter.next();
--        assertEquals("v0.3", updatedRow.getString("v"));
--        assertEquals("b", updatedRow.getString("k"));
--        assertEquals(3, updatedRow.getInt("c1"));
--        assertEquals(4, updatedRow.getInt("c2"));
--
--        deleteWriter.addRow("a", 1, 2);
--        deleteWriter.addRow("b", 3, 4);
-         deleteWriter.close();
-         loadSSTables(dataDir, keyspace);
- 
-         resultSet = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable);
-         assertEquals(1, resultSet.size());
-         iter = resultSet.iterator();
-         UntypedResultSet.Row modifiedRow = iter.next();
-         assertFalse(modifiedRow.has("v"));
-         assertEquals("a", modifiedRow.getString("k"));
-         assertEquals(1, modifiedRow.getInt("c1"));
-         assertEquals(2, modifiedRow.getInt("c2"));
-     }
- 
-     private static final int NUMBER_WRITES_IN_RUNNABLE = 10;
-     private class WriterThread extends Thread
-     {
-         private final File dataDir;
-         private final int id;
-         private final String qualifiedTable;
-         public volatile Exception exception;
++        loadSSTables(dataDir, keyspace, table);
 +
-         public WriterThread(File dataDir, int id, String qualifiedTable)
++        if (verifyDataAfterLoading)
 +        {
-             this.dataDir = dataDir;
-             this.id = id;
-             this.qualifiedTable = qualifiedTable;
++            UntypedResultSet resultSet = 
QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable);
++            assertEquals(2, resultSet.size());
++            Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
++            UntypedResultSet.Row insertedRow = iter.next();
++            assertEquals("v0.2", insertedRow.getString("v"));
++            assertEquals("a", insertedRow.getString("k"));
++            assertEquals(1, insertedRow.getInt("c1"));
++            assertEquals(2, insertedRow.getInt("c2"));
++            UntypedResultSet.Row updatedRow = iter.next();
++            assertEquals("v0.3", updatedRow.getString("v"));
++            assertEquals("b", updatedRow.getString("k"));
++            assertEquals(3, updatedRow.getInt("c1"));
++            assertEquals(4, updatedRow.getInt("c2"));
++
++            deleteWriter.addRow("a", 1, 2);
++            deleteWriter.addRow("b", 3, 4);
++
 +        }
 +
-         @Override
-         public void run()
-         {
-             String schema = "CREATE TABLE " + qualifiedTable + " ("
-                     + "  k int,"
-                     + "  v int,"
-                     + "  PRIMARY KEY (k, v)"
-                     + ")";
-             String insert = "INSERT INTO " + qualifiedTable + " (k, v) VALUES 
(?, ?)";
-             CQLSSTableWriter writer = CQLSSTableWriter.builder()
-                     .inDirectory(dataDir)
-                     .forTable(schema)
-                     .using(insert).build();
+         deleteWriter.close();
 -        loadSSTables(dataDir, keyspace);
 -
 -        resultSet = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable);
 -        assertEquals(1, resultSet.size());
 -        iter = resultSet.iterator();
 -        UntypedResultSet.Row modifiedRow = iter.next();
 -        assertFalse(modifiedRow.has("v"));
 -        assertEquals("a", modifiedRow.getString("k"));
 -        assertEquals(1, modifiedRow.getInt("c1"));
 -        assertEquals(2, modifiedRow.getInt("c2"));
++        loadSSTables(dataDir, keyspace, table);
 +
-             try
-             {
-                 for (int i = 0; i < NUMBER_WRITES_IN_RUNNABLE; i++)
-                 {
-                     writer.addRow(id, i);
-                 }
-                 writer.close();
-             }
-             catch (Exception e)
-             {
-                 exception = e;
-             }
++        if (verifyDataAfterLoading)
++        {
++            UntypedResultSet resultSet = 
QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable);
++            assertEquals(1, resultSet.size());
++            Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
++            UntypedResultSet.Row modifiedRow = iter.next();
++            assertFalse(modifiedRow.has("v"));
++            assertEquals("a", modifiedRow.getString("k"));
++            assertEquals(1, modifiedRow.getInt("c1"));
++            assertEquals(2, modifiedRow.getInt("c2"));
 +        }
      }
  
      @Test
@@@ -578,10 -530,10 +572,13 @@@
              }
          }
  
--        loadSSTables(dataDir, keyspace);
++        loadSSTables(dataDir, keyspace, table);
  
--        UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " 
+ qualifiedTable + ";");
--        assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size());
++        if (verifyDataAfterLoading)
++        {
++            UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable + ";");
++            assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, 
rs.size());
++        }
      }
  
      @Test
@@@ -623,30 -575,31 +620,34 @@@
          }
  
          writer.close();
--        loadSSTables(dataDir, keyspace);
--
--        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + keyspace + "." + table);
--        TypeCodec collectionCodec = 
JavaDriverUtils.codecFor(DataType.CollectionType.list(tuple2Type));
--        TypeCodec tuple3Codec = JavaDriverUtils.codecFor(tuple3Type);
- 
-         assertEquals(resultSet.size(), 100);
-         int cnt = 0;
-         for (UntypedResultSet.Row row: resultSet) {
-             assertEquals(cnt,
-                          row.getInt("k"));
-             List<UDTValue> values = (List<UDTValue>) 
collectionCodec.deserialize(row.getBytes("v1"),
-                                                                               
   ProtocolVersion.CURRENT);
-             assertEquals(values.get(0).getInt("a"), cnt * 10);
-             assertEquals(values.get(0).getInt("b"), cnt * 20);
-             assertEquals(values.get(1).getInt("a"), cnt * 30);
-             assertEquals(values.get(1).getInt("b"), cnt * 40);
- 
-             UDTValue v2 = (UDTValue) 
tuple3Codec.deserialize(row.getBytes("v2"), ProtocolVersion.CURRENT);
- 
-             assertEquals(v2.getInt("a"), cnt * 100);
-             assertEquals(v2.getInt("b"), cnt * 200);
-             assertEquals(v2.getInt("c"), cnt * 300);
-             cnt++;
++        loadSSTables(dataDir, keyspace, table);
+ 
 -        assertEquals(resultSet.size(), 100);
 -        int cnt = 0;
 -        for (UntypedResultSet.Row row : resultSet)
++        if (verifyDataAfterLoading)
+         {
 -            assertEquals(cnt,
 -                         row.getInt("k"));
 -            List<UDTValue> values = (List<UDTValue>) 
collectionCodec.deserialize(row.getBytes("v1"),
 -                                                                              
   ProtocolVersion.CURRENT);
 -            assertEquals(values.get(0).getInt("a"), cnt * 10);
 -            assertEquals(values.get(0).getInt("b"), cnt * 20);
 -            assertEquals(values.get(1).getInt("a"), cnt * 30);
 -            assertEquals(values.get(1).getInt("b"), cnt * 40);
 -
 -            UDTValue v2 = (UDTValue) 
tuple3Codec.deserialize(row.getBytes("v2"), ProtocolVersion.CURRENT);
 -
 -            assertEquals(v2.getInt("a"), cnt * 100);
 -            assertEquals(v2.getInt("b"), cnt * 200);
 -            assertEquals(v2.getInt("c"), cnt * 300);
 -            cnt++;
++            UntypedResultSet resultSet = 
QueryProcessor.executeInternal("SELECT * FROM " + keyspace + "." + table);
++            TypeCodec collectionCodec = 
JavaDriverUtils.codecFor(DataType.CollectionType.list(tuple2Type));
++            TypeCodec tuple3Codec = JavaDriverUtils.codecFor(tuple3Type);
++
++            assertEquals(resultSet.size(), 100);
++            int cnt = 0;
++            for (UntypedResultSet.Row row : resultSet)
++            {
++                assertEquals(cnt,
++                             row.getInt("k"));
++                List<UDTValue> values = (List<UDTValue>) 
collectionCodec.deserialize(row.getBytes("v1"),
++                                                                              
       ProtocolVersion.CURRENT);
++                assertEquals(values.get(0).getInt("a"), cnt * 10);
++                assertEquals(values.get(0).getInt("b"), cnt * 20);
++                assertEquals(values.get(1).getInt("a"), cnt * 30);
++                assertEquals(values.get(1).getInt("b"), cnt * 40);
++
++                UDTValue v2 = (UDTValue) 
tuple3Codec.deserialize(row.getBytes("v2"), ProtocolVersion.CURRENT);
++
++                assertEquals(v2.getInt("a"), cnt * 100);
++                assertEquals(v2.getInt("b"), cnt * 200);
++                assertEquals(v2.getInt("c"), cnt * 300);
++                cnt++;
++            }
          }
      }
  
@@@ -687,23 -640,24 +688,27 @@@
          }
  
          writer.close();
--        loadSSTables(dataDir, keyspace);
- 
-         UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + keyspace + "." + table);
- 
-         assertEquals(resultSet.size(), 100);
-         int cnt = 0;
-         for (UntypedResultSet.Row row: resultSet) {
-             assertEquals(cnt,
-                          row.getInt("k"));
-             UDTValue nestedTpl = (UDTValue) 
nestedTupleCodec.deserialize(row.getBytes("v1"),
-                                                                          
ProtocolVersion.CURRENT);
-             assertEquals(nestedTpl.getInt("c"), cnt * 100);
-             UDTValue tpl = nestedTpl.getUDTValue("tpl");
-             assertEquals(tpl.getInt("a"), cnt * 200);
-             assertEquals(tpl.getInt("b"), cnt * 300);
- 
-             cnt++;
++        loadSSTables(dataDir, keyspace, table);
+ 
 -        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + keyspace + "." + table);
 -
 -        assertEquals(resultSet.size(), 100);
 -        int cnt = 0;
 -        for (UntypedResultSet.Row row : resultSet)
++        if (verifyDataAfterLoading)
+         {
 -            assertEquals(cnt,
 -                         row.getInt("k"));
 -            UDTValue nestedTpl = (UDTValue) 
nestedTupleCodec.deserialize(row.getBytes("v1"),
 -                                                                         
ProtocolVersion.CURRENT);
 -            assertEquals(nestedTpl.getInt("c"), cnt * 100);
 -            UDTValue tpl = nestedTpl.getUDTValue("tpl");
 -            assertEquals(tpl.getInt("a"), cnt * 200);
 -            assertEquals(tpl.getInt("b"), cnt * 300);
 -
 -            cnt++;
++            UntypedResultSet resultSet = 
QueryProcessor.executeInternal("SELECT * FROM " + keyspace + "." + table);
++
++            assertEquals(resultSet.size(), 100);
++            int cnt = 0;
++            for (UntypedResultSet.Row row : resultSet)
++            {
++                assertEquals(cnt,
++                             row.getInt("k"));
++                UDTValue nestedTpl = (UDTValue) 
nestedTupleCodec.deserialize(row.getBytes("v1"),
++                                                                             
ProtocolVersion.CURRENT);
++                assertEquals(nestedTpl.getInt("c"), cnt * 100);
++                UDTValue tpl = nestedTpl.getUDTValue("tpl");
++                assertEquals(tpl.getInt("a"), cnt * 200);
++                assertEquals(tpl.getInt("b"), cnt * 300);
++
++                cnt++;
++            }
          }
      }
  
@@@ -771,36 -725,36 +776,39 @@@
          writer.addRow(5, 5, 5, "5");
  
          writer.close();
--        loadSSTables(dataDir, keyspace);
--
--        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable);
--        Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
--        UntypedResultSet.Row r1 = iter.next();
--        assertEquals(1, r1.getInt("k"));
--        assertEquals(1, r1.getInt("c1"));
--        assertEquals(1, r1.getInt("c2"));
--        assertEquals(false, r1.has("v"));
--        UntypedResultSet.Row r2 = iter.next();
--        assertEquals(2, r2.getInt("k"));
--        assertEquals(2, r2.getInt("c1"));
--        assertEquals(2, r2.getInt("c2"));
--        assertEquals(false, r2.has("v"));
--        UntypedResultSet.Row r3 = iter.next();
--        assertEquals(3, r3.getInt("k"));
--        assertEquals(3, r3.getInt("c1"));
--        assertEquals(3, r3.getInt("c2"));
--        assertEquals(false, r3.has("v"));
--        UntypedResultSet.Row r4 = iter.next();
--        assertEquals(4, r4.getInt("k"));
--        assertEquals(4, r4.getInt("c1"));
--        assertEquals(4, r4.getInt("c2"));
--        assertEquals(false, r3.has("v"));
--        UntypedResultSet.Row r5 = iter.next();
--        assertEquals(5, r5.getInt("k"));
--        assertEquals(5, r5.getInt("c1"));
--        assertEquals(5, r5.getInt("c2"));
--        assertEquals(true, r5.has("v"));
--        assertEquals("5", r5.getString("v"));
++        loadSSTables(dataDir, keyspace, table);
++
++        if (verifyDataAfterLoading)
++        {
++            UntypedResultSet resultSet = 
QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable);
++            Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
++            UntypedResultSet.Row r1 = iter.next();
++            assertEquals(1, r1.getInt("k"));
++            assertEquals(1, r1.getInt("c1"));
++            assertEquals(1, r1.getInt("c2"));
++            assertEquals(false, r1.has("v"));
++            UntypedResultSet.Row r2 = iter.next();
++            assertEquals(2, r2.getInt("k"));
++            assertEquals(2, r2.getInt("c1"));
++            assertEquals(2, r2.getInt("c2"));
++            assertEquals(false, r2.has("v"));
++            UntypedResultSet.Row r3 = iter.next();
++            assertEquals(3, r3.getInt("k"));
++            assertEquals(3, r3.getInt("c1"));
++            assertEquals(3, r3.getInt("c2"));
++            assertEquals(false, r3.has("v"));
++            UntypedResultSet.Row r4 = iter.next();
++            assertEquals(4, r4.getInt("k"));
++            assertEquals(4, r4.getInt("c1"));
++            assertEquals(4, r4.getInt("c2"));
++            assertEquals(false, r3.has("v"));
++            UntypedResultSet.Row r5 = iter.next();
++            assertEquals(5, r5.getInt("k"));
++            assertEquals(5, r5.getInt("c1"));
++            assertEquals(5, r5.getInt("c2"));
++            assertEquals(true, r5.has("v"));
++            assertEquals("5", r5.getString("v"));
++        }
      }
  
      @Test
@@@ -826,23 -780,23 +834,26 @@@
          writer.addRow(null, 7, 8, 9);
          writer.addRow(CQLSSTableWriter.UNSET_VALUE, 10, 11, 12);
          writer.close();
--        loadSSTables(dataDir, keyspace);
--
--        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable);
--        assertEquals(2, resultSet.size());
--
--        Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
--        UntypedResultSet.Row r1 = iter.next();
--        assertEquals(1, r1.getInt("k"));
--        assertEquals(2, r1.getInt("c1"));
--        assertEquals(3, r1.getInt("c2"));
--        assertEquals("a", r1.getString("v"));
--        UntypedResultSet.Row r2 = iter.next();
--        assertEquals(4, r2.getInt("k"));
--        assertEquals(5, r2.getInt("c1"));
--        assertEquals(6, r2.getInt("c2"));
--        assertEquals("b", r2.getString("v"));
--        assertFalse(iter.hasNext());
++        loadSSTables(dataDir, keyspace, table);
++
++        if (verifyDataAfterLoading)
++        {
++            UntypedResultSet resultSet = 
QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable);
++            assertEquals(2, resultSet.size());
++
++            Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
++            UntypedResultSet.Row r1 = iter.next();
++            assertEquals(1, r1.getInt("k"));
++            assertEquals(2, r1.getInt("c1"));
++            assertEquals(3, r1.getInt("c2"));
++            assertEquals("a", r1.getString("v"));
++            UntypedResultSet.Row r2 = iter.next();
++            assertEquals(4, r2.getInt("k"));
++            assertEquals(5, r2.getInt("c1"));
++            assertEquals(6, r2.getInt("c2"));
++            assertEquals("b", r2.getString("v"));
++            assertFalse(iter.hasNext());
++        }
      }
  
      @Test
@@@ -866,25 -820,25 +877,28 @@@
          writer.addRow(4, 5, 6, "efg");
  
          writer.close();
--        loadSSTables(dataDir, keyspace);
 -
 -        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable);
 -        assertEquals(2, resultSet.size());
 -
 -        Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
 -        UntypedResultSet.Row r1 = iter.next();
 -        assertEquals(1, r1.getInt("k"));
 -        assertEquals(2, r1.getInt("c1"));
 -        assertEquals(3, r1.getInt("c2"));
 -        assertEquals(ByteBufferUtil.bytes("abc"), r1.getBytes("v"));
++        loadSSTables(dataDir, keyspace, table);
  
-         UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable);
-         assertEquals(2, resultSet.size());
- 
-         Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
-         UntypedResultSet.Row r1 = iter.next();
-         assertEquals(1, r1.getInt("k"));
-         assertEquals(2, r1.getInt("c1"));
-         assertEquals(3, r1.getInt("c2"));
-         assertEquals(ByteBufferUtil.bytes("abc"), r1.getBytes("v"));
- 
--        UntypedResultSet.Row r2 = iter.next();
--        assertEquals(4, r2.getInt("k"));
--        assertEquals(5, r2.getInt("c1"));
--        assertEquals(6, r2.getInt("c2"));
--        assertEquals(ByteBufferUtil.bytes("efg"), r2.getBytes("v"));
--
--        assertFalse(iter.hasNext());
++        if (verifyDataAfterLoading)
++        {
++            UntypedResultSet resultSet = 
QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable);
++            assertEquals(2, resultSet.size());
++
++            Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
++            UntypedResultSet.Row r1 = iter.next();
++            assertEquals(1, r1.getInt("k"));
++            assertEquals(2, r1.getInt("c1"));
++            assertEquals(3, r1.getInt("c2"));
++            assertEquals(ByteBufferUtil.bytes("abc"), r1.getBytes("v"));
++
++            UntypedResultSet.Row r2 = iter.next();
++            assertEquals(4, r2.getInt("k"));
++            assertEquals(5, r2.getInt("c1"));
++            assertEquals(6, r2.getInt("c2"));
++            assertEquals(ByteBufferUtil.bytes("efg"), r2.getBytes("v"));
++
++            assertFalse(iter.hasNext());
++        }
      }
  
      @Test
@@@ -913,10 -867,10 +927,13 @@@
          }
  
          writer.close();
--        loadSSTables(dataDir, keyspace);
++        loadSSTables(dataDir, keyspace, table);
  
--        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable);
--        assertEquals(100, resultSet.size());
++        if (verifyDataAfterLoading)
++        {
++            UntypedResultSet resultSet = 
QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable);
++            assertEquals(100, resultSet.size());
++        }
      }
  
      @Test
@@@ -944,16 -899,17 +962,20 @@@
              writer.addRow(i + ID_OFFSET, LocalDate.fromDaysSinceEpoch(i));
          }
          writer.close();
--        loadSSTables(dataDir, keyspace);
- 
-         UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " 
+ qualifiedTable + ";");
-         assertEquals(200, rs.size());
-         Map<Integer, LocalDate> map = StreamSupport.stream(rs.spliterator(), 
false)
-                                                    .collect(Collectors.toMap( 
r -> r.getInt("k"), r -> r.getDate("c")));
-         for (int i = 0; i < 100; i++) {
-             final LocalDate expected = LocalDate.fromDaysSinceEpoch(i);
-             assertEquals(expected, map.get(i + ID_OFFSET));
-             assertEquals(expected, map.get(i));
++        loadSSTables(dataDir, keyspace, table);
+ 
 -        UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " 
+ qualifiedTable + ";");
 -        assertEquals(200, rs.size());
 -        Map<Integer, LocalDate> map = StreamSupport.stream(rs.spliterator(), 
false)
 -                                                   
.collect(Collectors.toMap(r -> r.getInt("k"), r -> r.getDate("c")));
 -        for (int i = 0; i < 100; i++)
++        if (verifyDataAfterLoading)
+         {
 -            final LocalDate expected = LocalDate.fromDaysSinceEpoch(i);
 -            assertEquals(expected, map.get(i + ID_OFFSET));
 -            assertEquals(expected, map.get(i));
++            UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable + ";");
++            assertEquals(200, rs.size());
++            Map<Integer, LocalDate> map = 
StreamSupport.stream(rs.spliterator(), false)
++                                                       
.collect(Collectors.toMap(r -> r.getInt("k"), r -> r.getDate("c")));
++            for (int i = 0; i < 100; i++)
++            {
++                final LocalDate expected = LocalDate.fromDaysSinceEpoch(i);
++                assertEquals(expected, map.get(i + ID_OFFSET));
++                assertEquals(expected, map.get(i));
++            }
          }
      }
  
@@@ -988,33 -944,33 +1010,36 @@@
              writer.addRow(String.valueOf(i), map);
          }
          writer.close();
--        loadSSTables(dataDir, keyspace);
++        loadSSTables(dataDir, keyspace, table);
  
--        UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " 
+ qualifiedTable + ";");
--        assertEquals(200, rs.size());
--        Map<String, Map<String, String>> map = 
StreamSupport.stream(rs.spliterator(), false)
--                                                            
.collect(Collectors.toMap(r -> r.getString("k"), r -> r.getFrozenMap("c", 
UTF8Type.instance, UTF8Type.instance)));
--        for (int i = 0; i < 200; i++)
++        if (verifyDataAfterLoading)
          {
--            final String expectedKey = String.valueOf(i);
--            assertTrue(map.containsKey(expectedKey));
--            Map<String, String> innerMap = map.get(expectedKey);
--            assertTrue(innerMap.containsKey("a_key"));
--            assertEquals(innerMap.get("a_key"), "av" + i);
--            assertTrue(innerMap.containsKey("b_key"));
--            assertEquals(innerMap.get("b_key"), "zv" + i);
--        }
++            UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable + ";");
++            assertEquals(200, rs.size());
++            Map<String, Map<String, String>> map = 
StreamSupport.stream(rs.spliterator(), false)
++                                                                
.collect(Collectors.toMap(r -> r.getString("k"), r -> r.getFrozenMap("c", 
UTF8Type.instance, UTF8Type.instance)));
++            for (int i = 0; i < 200; i++)
++            {
++                final String expectedKey = String.valueOf(i);
++                assertTrue(map.containsKey(expectedKey));
++                Map<String, String> innerMap = map.get(expectedKey);
++                assertTrue(innerMap.containsKey("a_key"));
++                assertEquals(innerMap.get("a_key"), "av" + i);
++                assertTrue(innerMap.containsKey("b_key"));
++                assertEquals(innerMap.get("b_key"), "zv" + i);
++            }
  
--        // Make sure we can filter with map values regardless of which order 
we put the keys in
--        UntypedResultSet filtered;
--        filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='0' and c={'a_key': 'av0', 'b_key': 'zv0'};");
--        assertEquals(1, filtered.size());
--        filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='0' and c={'b_key': 'zv0', 'a_key': 'av0'};");
--        assertEquals(1, filtered.size());
--        filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='100' and c={'b_key': 'zv100', 'a_key': 'av100'};");
--        assertEquals(1, filtered.size());
--        filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='100' and c={'a_key': 'av100', 'b_key': 'zv100'};");
--        assertEquals(1, filtered.size());
++            // Make sure we can filter with map values regardless of which 
order we put the keys in
++            UntypedResultSet filtered;
++            filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='0' and c={'a_key': 'av0', 'b_key': 'zv0'};");
++            assertEquals(1, filtered.size());
++            filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='0' and c={'b_key': 'zv0', 'a_key': 'av0'};");
++            assertEquals(1, filtered.size());
++            filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='100' and c={'b_key': 'zv100', 'a_key': 'av100'};");
++            assertEquals(1, filtered.size());
++            filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='100' and c={'a_key': 'av100', 'b_key': 'zv100'};");
++            assertEquals(1, filtered.size());
++        }
      }
  
      @Test
@@@ -1049,26 -1005,26 +1074,29 @@@
          writer.addRow(String.valueOf(2), map2);
  
          writer.close();
--        loadSSTables(dataDir, keyspace);
--
--        UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " 
+ qualifiedTable + ";");
--        assertEquals(2, rs.size());
--
--        // Make sure we can filter with map values regardless of which order 
we put the keys in
--        UntypedResultSet filtered;
--        filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='1' and c={" + uuid1 + ": 1, " + uuid2 + ": 2};");
--        assertEquals(1, filtered.size());
--        filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='1' and c={" + uuid2 + ": 2, " + uuid1 + ": 1};");
--        assertEquals(1, filtered.size());
--        filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='2' and c={" + uuid3 + ": 1, " + uuid4 + ": 2};");
--        assertEquals(1, filtered.size());
--        filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='2' and c={" + uuid4 + ": 2, " + uuid3 + ": 1};");
--        assertEquals(1, filtered.size());
--        UUID other = UUIDs.startOf(1234L); // Just some other TimeUUID
--        filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='2' and c={" + uuid3 + ": 1, " + other + ": 2};");
--        assertEquals(0, filtered.size());
--        filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='2' and c={" + uuid4 + ": 2, " + other + ": 1};");
--        assertEquals(0, filtered.size());
++        loadSSTables(dataDir, keyspace, table);
++
++        if (verifyDataAfterLoading)
++        {
++            UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable + ";");
++            assertEquals(2, rs.size());
++
++            // Make sure we can filter with map values regardless of which 
order we put the keys in
++            UntypedResultSet filtered;
++            filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='1' and c={" + uuid1 + ": 1, " + uuid2 + ": 2};");
++            assertEquals(1, filtered.size());
++            filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='1' and c={" + uuid2 + ": 2, " + uuid1 + ": 1};");
++            assertEquals(1, filtered.size());
++            filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='2' and c={" + uuid3 + ": 1, " + uuid4 + ": 2};");
++            assertEquals(1, filtered.size());
++            filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='2' and c={" + uuid4 + ": 2, " + uuid3 + ": 1};");
++            assertEquals(1, filtered.size());
++            UUID other = UUIDs.startOf(1234L); // Just some other TimeUUID
++            filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='2' and c={" + uuid3 + ": 1, " + other + ": 2};");
++            assertEquals(0, filtered.size());
++            filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='2' and c={" + uuid4 + ": 2, " + other + ": 1};");
++            assertEquals(0, filtered.size());
++        }
      }
  
      @Test
@@@ -1101,26 -1057,26 +1129,29 @@@
          writer.addRow(String.valueOf(2), set2);
  
          writer.close();
--        loadSSTables(dataDir, keyspace);
--
--        UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " 
+ qualifiedTable + ";");
--        assertEquals(2, rs.size());
--
--        // Make sure we can filter with map values regardless of which order 
we put the keys in
--        UntypedResultSet filtered;
--        filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='1' and c={" + uuid1 + ", " + uuid2 + "};");
--        assertEquals(1, filtered.size());
--        filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='1' and c={" + uuid2 + ", " + uuid1 + "};");
--        assertEquals(1, filtered.size());
--        filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='2' and c={" + uuid1 + ", " + uuid2 + "};");
--        assertEquals(1, filtered.size());
--        filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='2' and c={" + uuid2 + ", " + uuid1 + "};");
--        assertEquals(1, filtered.size());
--        UUID other = UUIDs.startOf(10000000L + 1L); // Pick one that's really 
close just to make sure clustering filters are working
--        filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='2' and c={" + uuid1 + ", " + other + "};");
--        assertEquals(0, filtered.size());
--        filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='2' and c={" + other + ", " + uuid1 + "};");
--        assertEquals(0, filtered.size());
++        loadSSTables(dataDir, keyspace, table);
++
++        if (verifyDataAfterLoading)
++        {
++            UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable + ";");
++            assertEquals(2, rs.size());
++
++            // Make sure we can filter with map values regardless of which 
order we put the keys in
++            UntypedResultSet filtered;
++            filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='1' and c={" + uuid1 + ", " + uuid2 + "};");
++            assertEquals(1, filtered.size());
++            filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='1' and c={" + uuid2 + ", " + uuid1 + "};");
++            assertEquals(1, filtered.size());
++            filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='2' and c={" + uuid1 + ", " + uuid2 + "};");
++            assertEquals(1, filtered.size());
++            filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='2' and c={" + uuid2 + ", " + uuid1 + "};");
++            assertEquals(1, filtered.size());
++            UUID other = UUIDs.startOf(10000000L + 1L); // Pick one that's 
really close just to make sure clustering filters are working
++            filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='2' and c={" + uuid1 + ", " + other + "};");
++            assertEquals(0, filtered.size());
++            filtered = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable + " where k='2' and c={" + other + ", " + uuid1 + "};");
++            assertEquals(0, filtered.size());
++        }
      }
  
      @Test
@@@ -1145,22 -1101,23 +1176,26 @@@
  
          // Note that, all other things being equal, Cassandra will sort these 
rows lexicographically, so we use "higher" values in the
          // row we expect to "win" so that we're sure that it isn't just 
accidentally picked due to the row sorting.
-         writer.addRow( 1, 4, 5, "b", now); // This write should be the one 
found at the end because it has a higher timestamp
-         writer.addRow( 1, 2, 3, "a", then);
+         writer.addRow(1, 4, 5, "b", now); // This write should be the one 
found at the end because it has a higher timestamp
+         writer.addRow(1, 2, 3, "a", then);
          writer.close();
--        loadSSTables(dataDir, keyspace);
--
--        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable);
--        assertEquals(1, resultSet.size());
--
--        Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
--        UntypedResultSet.Row r1 = iter.next();
--        assertEquals(1, r1.getInt("k"));
--        assertEquals(4, r1.getInt("v1"));
--        assertEquals(5, r1.getInt("v2"));
--        assertEquals("b", r1.getString("v3"));
--        assertFalse(iter.hasNext());
++        loadSSTables(dataDir, keyspace, table);
++
++        if (verifyDataAfterLoading)
++        {
++            UntypedResultSet resultSet = 
QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable);
++            assertEquals(1, resultSet.size());
++
++            Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
++            UntypedResultSet.Row r1 = iter.next();
++            assertEquals(1, r1.getInt("k"));
++            assertEquals(4, r1.getInt("v1"));
++            assertEquals(5, r1.getInt("v2"));
++            assertEquals("b", r1.getString("v3"));
++            assertFalse(iter.hasNext());
++        }
      }
+ 
      @Test
      public void testWriteWithTtl() throws Exception
      {
@@@ -1173,30 -1130,31 +1208,34 @@@
                                + ")";
  
          CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder()
-                                                          .inDirectory(dataDir)
-                                                          .forTable(schema)
-                                                          .using("INSERT INTO 
" + qualifiedTable +
-                                                                 " (k, v1, v2, 
v3) VALUES (?,?,?,?) using TTL ?");
+                                                            
.inDirectory(dataDir)
+                                                            .forTable(schema)
+                                                            .using("INSERT 
INTO " + qualifiedTable +
+                                                                   " (k, v1, 
v2, v3) VALUES (?,?,?,?) using TTL ?");
          CQLSSTableWriter writer = builder.build();
          // add a row that _should_ show up - 1 hour TTL
-         writer.addRow( 1, 2, 3, "a", 3600);
+         writer.addRow(1, 2, 3, "a", 3600);
          // Insert a row with a TTL of 1 second - should not appear in results 
once we sleep
-         writer.addRow( 2, 4, 5, "b", 1);
+         writer.addRow(2, 4, 5, "b", 1);
          writer.close();
          Thread.sleep(1200); // Slightly over 1 second, just to make sure
--        loadSSTables(dataDir, keyspace);
--
--        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable);
--        assertEquals(1, resultSet.size());
--
--        Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
--        UntypedResultSet.Row r1 = iter.next();
--        assertEquals(1, r1.getInt("k"));
--        assertEquals(2, r1.getInt("v1"));
--        assertEquals(3, r1.getInt("v2"));
--        assertEquals("a", r1.getString("v3"));
--        assertFalse(iter.hasNext());
++        loadSSTables(dataDir, keyspace, table);
++
++        if (verifyDataAfterLoading)
++        {
++            UntypedResultSet resultSet = 
QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable);
++            assertEquals(1, resultSet.size());
++
++            Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
++            UntypedResultSet.Row r1 = iter.next();
++            assertEquals(1, r1.getInt("k"));
++            assertEquals(2, r1.getInt("v1"));
++            assertEquals(3, r1.getInt("v2"));
++            assertEquals("a", r1.getString("v3"));
++            assertFalse(iter.hasNext());
++        }
      }
+ 
      @Test
      public void testWriteWithTimestampsAndTtl() throws Exception
      {
@@@ -1220,23 -1178,23 +1259,27 @@@
          long oneSecondFromNow = 
TimeUnit.MILLISECONDS.toMicros(currentTimeMillis() + 1000);
          // Insert some rows with a timestamp of 1 second from now, and 
different TTLs
          // add a row that _should_ show up - 1 hour TTL
-         writer.addRow( 1, 2, 3, "a", oneSecondFromNow, 3600);
+         writer.addRow(1, 2, 3, "a", oneSecondFromNow, 3600);
          // Insert a row "two seconds ago" with a TTL of 1 second - should not 
appear in results
-         writer.addRow( 2, 4, 5, "b", oneSecondFromNow, 1);
+         writer.addRow(2, 4, 5, "b", oneSecondFromNow, 1);
          writer.close();
--        loadSSTables(dataDir, keyspace);
--        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable);
--        Thread.sleep(1200);
--        resultSet = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable);
--        assertEquals(1, resultSet.size());
--
--        Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
--        UntypedResultSet.Row r1 = iter.next();
--        assertEquals(1, r1.getInt("k"));
--        assertEquals(2, r1.getInt("v1"));
--        assertEquals(3, r1.getInt("v2"));
--        assertEquals("a", r1.getString("v3"));
--        assertFalse(iter.hasNext());
++        loadSSTables(dataDir, keyspace, table);
++
++        if (verifyDataAfterLoading)
++        {
++            UntypedResultSet resultSet = 
QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable);
++            Thread.sleep(1200);
++            resultSet = QueryProcessor.executeInternal("SELECT * FROM " + 
qualifiedTable);
++            assertEquals(1, resultSet.size());
++
++            Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
++            UntypedResultSet.Row r1 = iter.next();
++            assertEquals(1, r1.getInt("k"));
++            assertEquals(2, r1.getInt("v1"));
++            assertEquals(3, r1.getInt("v2"));
++            assertEquals("a", r1.getString("v3"));
++            assertFalse(iter.hasNext());
++        }
      }
  
      @Test
@@@ -1258,15 -1216,15 +1301,18 @@@
              writer.addRow(i, UUID.randomUUID().toString());
          }
          writer.close();
--        loadSSTables(dataDir, keyspace);
++        loadSSTables(dataDir, keyspace, table);
  
--        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable);
--        assertEquals(rowCount, resultSet.size());
--        Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
--        for (int i = 0; i < rowCount; i++)
++        if (verifyDataAfterLoading)
          {
--            UntypedResultSet.Row row = iter.next();
--            assertEquals(i, row.getInt("k"));
++            UntypedResultSet resultSet = 
QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable);
++            assertEquals(rowCount, resultSet.size());
++            Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
++            for (int i = 0; i < rowCount; i++)
++            {
++                UntypedResultSet.Row row = iter.next();
++                assertEquals(i, row.getInt("k"));
++            }
          }
      }
  
@@@ -1301,37 -1259,203 +1347,226 @@@
          assertTrue("The file size should be close to 1MiB (with at most 50KiB 
error rate for the test)",
                     Math.abs(1024 * 1024 - closeTo1MiBFileSize) < 50 * 1024);
  
--        loadSSTables(dataDir, keyspace);
++        loadSSTables(dataDir, keyspace, table);
  
--        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * 
FROM " + qualifiedTable);
--        assertEquals(rowCount, resultSet.size());
--        Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
--        for (int i = 0; i < rowCount; i++)
++        if (verifyDataAfterLoading)
          {
--            UntypedResultSet.Row row = iter.next();
--            assertEquals(i, row.getInt("k"));
++            UntypedResultSet resultSet = 
QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable);
++            assertEquals(rowCount, resultSet.size());
++            Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
++            for (int i = 0; i < rowCount; i++)
++            {
++                UntypedResultSet.Row row = iter.next();
++                assertEquals(i, row.getInt("k"));
++            }
          }
      }
  
-     private static void loadSSTables(File dataDir, String ks) throws 
ExecutionException, InterruptedException
+     @Test
+     public void testMultipleWritersWithDistinctTables() throws IOException
+     {
 -        testWriters("table1", "table2");
++        testWriterInClientMode("table1", "table2");
+     }
+ 
+     @Test
+     public void testMultipleWritersWithSameTable() throws IOException
+     {
 -        testWriters("table1", "table1");
++        testWriterInClientMode("table1", "table1");
+     }
+ 
 -    private void testWriters(String table1, String table2) throws 
IOException, InvalidRequestException
++    public void testWriterInClientMode(String table1, String table2) throws 
IOException, InvalidRequestException
+     {
+         String schema = "CREATE TABLE client_test.%s ("
+                         + "  k int PRIMARY KEY,"
+                         + "  v1 text,"
+                         + "  v2 int"
+                         + ")";
 -
+         String insert = "INSERT INTO client_test.%s (k, v1, v2) VALUES (?, ?, 
?)";
+ 
+         CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                                                   .inDirectory(dataDir)
+                                                   
.forTable(String.format(schema, table1))
 -                                                  
.using(String.format(insert, table1))
 -                                                  .build();
++                                                  
.using(String.format(insert, table1)).build();
+ 
+         CQLSSTableWriter writer2 = CQLSSTableWriter.builder()
+                                                    .inDirectory(dataDir)
+                                                    
.forTable(String.format(schema, table2))
 -                                                   
.using(String.format(insert, table2))
 -                                                   .build();
++                                                   
.using(String.format(insert, table2)).build();
+ 
+         writer.addRow(0, "A", 0);
+         writer2.addRow(0, "A", 0);
+         writer.addRow(1, "B", 1);
+         writer2.addRow(1, "B", 1);
+         writer.close();
+         writer2.close();
+ 
+         BiPredicate<File, String> filter = (dir, name) -> 
name.endsWith("-Data.db");
+ 
+         File[] dataFiles = dataDir.tryList(filter);
+         assertEquals(2, dataFiles.length);
+     }
+ 
+     @Test
+     public void testWriteWithSAI() throws Exception
+     {
+         writeWithSaiInternal();
+         writeWithSaiInternal();
+     }
+ 
+     private void writeWithSaiInternal() throws Exception
+     {
+         String schema = "CREATE TABLE " + qualifiedTable + " ("
+                         + "  k int PRIMARY KEY,"
+                         + "  v1 text,"
+                         + "  v2 int )";
+ 
+         String v1Index = "CREATE INDEX idx1 ON " + qualifiedTable + " (v1) 
USING 'sai'";
+         String v2Index = "CREATE INDEX idx2 ON " + qualifiedTable + " (v2) 
USING 'sai'";
+ 
+         String insert = "INSERT INTO " + qualifiedTable + " (k, v1, v2) 
VALUES (?, ?, ?)";
+ 
+         CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                                                   .inDirectory(dataDir)
+                                                   .forTable(schema)
+                                                   .using(insert)
+                                                   .withIndexes(v1Index, 
v2Index)
+                                                   .withBuildIndexes(true)
+                                                   
.withPartitioner(Murmur3Partitioner.instance)
+                                                   .build();
+ 
+         int rowCount = 30_000;
+         for (int i = 0; i < rowCount; i++)
+             writer.addRow(i, UUID.randomUUID().toString(), i);
+ 
+         writer.close();
+ 
+         File[] dataFiles = dataDir.list(f -> f.name().endsWith('-' + 
BigFormat.Components.DATA.type.repr));
+         assertNotNull(dataFiles);
+ 
+         IndexDescriptor indexDescriptor = 
IndexDescriptor.create(Descriptor.fromFile(dataFiles[0]),
+                                                                  
Murmur3Partitioner.instance,
+                                                                  
Schema.instance.getTableMetadata(keyspace, table).comparator);
+ 
+         assertTrue(indexDescriptor.isPerColumnIndexBuildComplete(new 
IndexIdentifier(keyspace, table, "idx1")));
+         assertTrue(indexDescriptor.isPerColumnIndexBuildComplete(new 
IndexIdentifier(keyspace, table, "idx2")));
+ 
+         if (PathUtils.isDirectory(dataDir.toPath()))
+             PathUtils.forEach(dataDir.toPath(), PathUtils::deleteRecursive);
+     }
+ 
+     @Test
+     public void testSkipBuildingIndexesWithSAI() throws Exception
+     {
+         String schema = "CREATE TABLE " + qualifiedTable + " ("
+                         + "  k int PRIMARY KEY,"
+                         + "  v1 text,"
+                         + "  v2 int )";
+ 
+         String v1Index = "CREATE INDEX idx1 ON " + qualifiedTable + " (v1) 
USING 'sai'";
+         String v2Index = "CREATE INDEX idx2 ON " + qualifiedTable + " (v2) 
USING 'sai'";
+ 
+         String insert = "INSERT INTO " + qualifiedTable + " (k, v1, v2) 
VALUES (?, ?, ?)";
+ 
+         CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                                                   .inDirectory(dataDir)
+                                                   .forTable(schema)
+                                                   .using(insert)
+                                                   .withIndexes(v1Index, 
v2Index)
+                                                   // not building indexes 
here so no SAI components will be present
+                                                   .withBuildIndexes(false)
+                                                   
.withPartitioner(Murmur3Partitioner.instance)
+                                                   .build();
+ 
+         int rowCount = 30_000;
+         for (int i = 0; i < rowCount; i++)
+             writer.addRow(i, UUID.randomUUID().toString(), i);
+ 
+         writer.close();
+ 
+         File[] dataFiles = dataDir.list(f -> f.name().endsWith('-' + 
BigFormat.Components.DATA.type.repr));
+         assertNotNull(dataFiles);
+ 
+         IndexDescriptor indexDescriptor = 
IndexDescriptor.create(Descriptor.fromFile(dataFiles[0]),
+                                                                  
Murmur3Partitioner.instance,
+                                                                  
Schema.instance.getTableMetadata(keyspace, table).comparator);
+ 
+         // no indexes built due to withBuildIndexes set to false
+         assertFalse(indexDescriptor.isPerColumnIndexBuildComplete(new 
IndexIdentifier(keyspace, table, "idx1")));
+         assertFalse(indexDescriptor.isPerColumnIndexBuildComplete(new 
IndexIdentifier(keyspace, table, "idx2")));
+     }
+ 
 -    protected void loadSSTables(File dataDir, String ksName)
++    protected static void loadSSTables(File dataDir, final String ks, final 
String tb) throws ExecutionException, InterruptedException
      {
 -        ColumnFamilyStore cfs = 
Keyspace.openWithoutSSTables(ksName).getColumnFamilyStore(table);
 -        Set<String> dataFilePaths = Set.of(dataDir.absolutePath());
 -        cfs.importNewSSTables(dataFilePaths, false, false, false,
 -                              false, false, false, false,
 -                              true, false);
 +        SSTableLoader loader = new SSTableLoader(dataDir, new 
SSTableLoader.Client()
 +        {
 +            private String keyspace;
 +
++            @Override
 +            public void init(String keyspace)
 +            {
 +                this.keyspace = keyspace;
-                 for (Range<Token> range : 
StorageService.instance.getLocalReplicas(ks).ranges())
++
++                KeyspaceMetadata keyspaceMetadata = 
Schema.instance.getKeyspaceMetadata(keyspace);
++
++                RangesAtEndpoint addressReplicas = 
keyspaceMetadata.replicationStrategy.getAddressReplicas(ClusterMetadata.current(),
 FBUtilities.getBroadcastAddressAndPort());
++
++                for (Range<Token> range : addressReplicas.ranges())
 +                    addRangeForEndpoint(range, 
FBUtilities.getBroadcastAddressAndPort());
 +            }
 +
-             public TableMetadataRef getTableMetadata(String cfName)
++            @Override
++            public TableMetadataRef getTableMetadata(String tableName)
 +            {
-                 return Schema.instance.getTableMetadataRef(keyspace, cfName);
++                KeyspaceMetadata keyspaceMetadata = 
ClusterMetadata.current().schema.getKeyspaceMetadata(keyspace);
++                TableMetadata tableMetadata = 
keyspaceMetadata.tables.getNullable(tableName);
++                assert tableMetadata != null;
++                return tableMetadata.ref;
 +            }
-         }, new OutputHandler.SystemOutput(false, false));
++        }, new OutputHandler.SystemOutput(false, false), 1, ks, tb);
 +
 +        loader.stream().get();
      }
+ 
+     private class WriterThread extends Thread
+     {
+         private final File dataDir;
+         private final int id;
+         private final String qualifiedTable;
+         public volatile Exception exception;
+ 
+         public WriterThread(File dataDir, int id, String qualifiedTable)
+         {
+             this.dataDir = dataDir;
+             this.id = id;
+             this.qualifiedTable = qualifiedTable;
+         }
+ 
+         @Override
+         public void run()
+         {
+             String schema = "CREATE TABLE " + qualifiedTable + " ("
+                             + "  k int,"
+                             + "  v int,"
+                             + "  PRIMARY KEY (k, v)"
+                             + ")";
+             String insert = "INSERT INTO " + qualifiedTable + " (k, v) VALUES 
(?, ?)";
+             CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                                                       .inDirectory(dataDir)
+                                                       .forTable(schema)
+                                                       .using(insert).build();
+ 
+             try
+             {
+                 for (int i = 0; i < NUMBER_WRITES_IN_RUNNABLE; i++)
+                 {
+                     writer.addRow(id, i);
+                 }
+                 writer.close();
+             }
+             catch (Exception e)
+             {
+                 exception = e;
+             }
+         }
+     }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to