This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 82b3c0a  CASSANDRA-18692 Fix bulk writes with Buffered RowBufferMode
82b3c0a is described below

commit 82b3c0a79c9322142738a4ec2ff7d4d4c0be2370
Author: Francisco Guerrero <[email protected]>
AuthorDate: Tue Jul 25 12:41:10 2023 -0700

    CASSANDRA-18692 Fix bulk writes with Buffered RowBufferMode
    
    When the Buffered RowBufferMode is set as part of the `WriterOption`s,
    `org.apache.cassandra.spark.bulkwriter.RecordWriter` ignores that configuration
    and instead uses the batch size to determine when to finalize an SSTable and
    start writing a new one, if more rows are available.
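    
    For illustration, buffered mode is requested through the writer options; a
    minimal sketch, assuming the surrounding Spark writer setup (the
    ROW_BUFFER_MODE key is the one exercised by BulkSparkConfTest below):
    
        // Sketch only: request BUFFERED writes via the writer options map.
        // The Spark session/DataFrame plumbing around this map is assumed.
        java.util.Map<String, String> writerOptions = new java.util.HashMap<>();
        writerOptions.put(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED");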
    
    In this commit, we fix `org.apache.cassandra.spark.bulkwriter.RecordWriter#checkBatchSize`
    to take the configured `RowBufferMode` into account: in the `UNBUFFERED` case
    we check the batch size of the SSTable during writes, while in the `BUFFERED`
    case the check has no effect.
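    
    In essence, the patched check (mirroring the RecordWriter hunk below):
    
        void checkBatchSize(StreamSession streamSession, int partitionId, JobInfo job) throws IOException
        {
            // BUFFERED mode: no-op; the SSTable writer flushes based on the
            // configured SSTable data size rather than a row count
            if (job.getRowBufferMode() == RowBufferMode.UNBUFFERED)
            {
                batchSize++;
                if (batchSize >= job.getSstableBatchSize())
                {
                    finalizeSSTable(streamSession, partitionId, sstableWriter, batchNumber, batchSize);
                    sstableWriter = null;
                    batchSize = 0;
                }
            }
        }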
    
    Co-authored-by: Doug Rohrer <[email protected]>
    
    Patch by Francisco Guerrero, Doug Rohrer; Reviewed by Dinesh Joshi, Yifan Cai for CASSANDRA-18692
---
 CHANGES.txt                                        |   1 +
 build.gradle                                       |   8 +-
 .../cassandra/spark/bulkwriter/BulkSparkConf.java  |   1 +
 .../bulkwriter/CassandraBulkWriterContext.java     |  15 --
 .../spark/bulkwriter/CassandraJobInfo.java         |   2 +
 .../apache/cassandra/spark/bulkwriter/JobInfo.java |   3 +
 .../cassandra/spark/bulkwriter/RecordWriter.java   |  34 +--
 .../cassandra/spark/bulkwriter/SSTableWriter.java  |  19 +-
 .../spark/bulkwriter/SSTableWriterFactory.java     |   5 +-
 .../cassandra/spark/data/CassandraDataLayer.java   | 243 +++++++++++----------
 .../clients/SidecarNativeLibrariesTest.java}       |  22 +-
 .../spark/bulkwriter/BulkSparkConfTest.java        |   3 +-
 .../spark/bulkwriter/CassandraRingMonitorTest.java |   2 +-
 .../spark/bulkwriter/MockBulkWriterContext.java    |  21 +-
 .../bulkwriter/NonValidatingTestSSTableWriter.java |   4 +-
 .../spark/bulkwriter/RecordWriterTest.java         |  60 ++++-
 .../cassandra/spark/bulkwriter/RingUtils.java      |  34 ++-
 .../spark/bulkwriter/SSTableWriterTest.java        |   6 +-
 .../bulkwriter/StreamSessionConsistencyTest.java   |   2 -
 .../spark/bulkwriter/StreamSessionTest.java        |   2 +-
 .../spark/bulkwriter/TokenPartitionerTest.java     |  12 +-
 .../apache/cassandra/bridge/CassandraBridge.java   |   2 +-
 .../apache/cassandra/bridge}/RowBufferMode.java    |  17 +-
 cassandra-four-zero/build.gradle                   |  11 +-
 .../bridge/CassandraBridgeImplementation.java      |   4 +-
 .../bridge/SSTableWriterImplementation.java        |  47 +++-
 .../bridge/SSTableWriterImplementationTest.java    | 103 +++++++++
 .../apache/cassandra/utils/ReflectionUtils.java    |  65 ++++++
 28 files changed, 506 insertions(+), 242 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index afbb30a..41e2b04 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Fix bulk writes with Buffered RowBufferMode (CASSANDRA-18692)
  * Minor Refactoring to Improve Code Reusability (CASSANDRA-18684)
  * Fix cassandra-analytics-core-example (CASSANDRA-18662)
  * Added caching of Node Settings to improve efficiency (CASSANDRA-18633)
diff --git a/build.gradle b/build.gradle
index 5772b48..837c6fe 100644
--- a/build.gradle
+++ b/build.gradle
@@ -20,16 +20,10 @@
 import java.nio.file.Files
 import java.nio.file.Paths
 
-buildscript {
-  dependencies {
-    classpath(group: 'com.github.jengelman.gradle.plugins', name: 'shadow', version: '6.1.0')
-  }
-}
-
 plugins {
   id 'java'
   id 'java-library'
-  id 'com.github.johnrengelman.shadow' version '5.1.0'
+  id 'com.github.johnrengelman.shadow' version '7.1.2'
 
   // Release Audit Tool (RAT) plugin for checking project licenses
   id("org.nosphere.apache.rat") version "0.8.0"
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index a719df7..798f259 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -37,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.RowBufferMode;
 import org.apache.cassandra.clients.SidecarInstanceImpl;
 import org.apache.cassandra.sidecar.client.SidecarInstance;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
index abe4ef8..3789b63 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
@@ -30,7 +30,6 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.KryoSerializable;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-import io.netty.channel.EventLoopGroup;
 import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.bridge.CassandraBridgeFactory;
 import org.apache.cassandra.spark.bulkwriter.token.CassandraRing;
@@ -57,11 +56,6 @@ public class CassandraBulkWriterContext implements BulkWriterContext, KryoSerial
     private final CassandraClusterInfo clusterInfo;
     private final SchemaInfo schemaInfo;
 
-    static
-    {
-        configureRelocatedNetty();
-    }
-
     private CassandraBulkWriterContext(@NotNull BulkSparkConf conf,
                                        @NotNull StructType dfSchema,
                                        SparkContext sparkContext)
@@ -119,15 +113,6 @@ public class CassandraBulkWriterContext implements BulkWriterContext, KryoSerial
         }
     }
 
-    // If using the shaded JAR, Configure our shaded Netty so it can find the correct native libraries
-    private static void configureRelocatedNetty()
-    {
-        if (EventLoopGroup.class.getName().startsWith("analytics"))
-        {
-            System.setProperty("analytics.io.netty.packagePrefix", "analytics");
-        }
-    }
-
     /**
      * Use the implementation of the KryoSerializable interface as a detection device to make sure the Spark Bulk
      * Writer's KryoRegistrator is properly in place.
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
index 3151d6e..5840c8a 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.spark.bulkwriter;
 
 import java.util.UUID;
 
+import org.apache.cassandra.bridge.RowBufferMode;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
 import org.jetbrains.annotations.NotNull;
 
@@ -50,6 +51,7 @@ public class CassandraJobInfo implements JobInfo
         return conf.localDC;
     }
 
+    @NotNull
     @Override
     public RowBufferMode getRowBufferMode()
     {
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
index 945012b..f7ac149 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
@@ -22,7 +22,9 @@ package org.apache.cassandra.spark.bulkwriter;
 import java.io.Serializable;
 import java.util.UUID;
 
+import org.apache.cassandra.bridge.RowBufferMode;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
+import org.jetbrains.annotations.NotNull;
 
 public interface JobInfo extends Serializable
 {
@@ -31,6 +33,7 @@ public interface JobInfo extends Serializable
 
     String getLocalDC();
 
+    @NotNull
     RowBufferMode getRowBufferMode();
 
     int getSstableDataSizeInMB();
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
index 32b8632..426cb8a 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
@@ -41,6 +41,7 @@ import com.google.common.collect.Range;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.bridge.RowBufferMode;
 import org.apache.cassandra.sidecar.common.data.TimeSkewResponse;
 import org.apache.spark.InterruptibleIterator;
 import org.apache.spark.TaskContext;
@@ -93,7 +94,7 @@ public class RecordWriter implements Serializable
         TaskContext taskContext = taskContextSupplier.get();
         LOGGER.info("[{}]: Processing Bulk Writer partition", taskContext.partitionId());
         scala.collection.Iterator<scala.Tuple2<DecoratedKey, Object[]>> dataIterator =
-                new InterruptibleIterator<>(taskContext, asScalaIterator(sourceIterator));
+        new InterruptibleIterator<>(taskContext, asScalaIterator(sourceIterator));
         StreamSession streamSession = createStreamSession(taskContext);
         validateAcceptableTimeSkewOrThrow(streamSession.replicas);
         int partitionId = taskContext.partitionId();
@@ -114,9 +115,9 @@ public class RecordWriter implements Serializable
                 checkBatchSize(streamSession, partitionId, job);
             }
 
-            if (batchSize != 0)
+            if (sstableWriter != null)
             {
-               finalizeSSTable(streamSession, partitionId, sstableWriter, batchNumber, batchSize);
+                finalizeSSTable(streamSession, partitionId, sstableWriter, batchNumber, batchSize);
             }
 
             LOGGER.info("[{}] Done with all writers and waiting for stream to complete", partitionId);
@@ -124,6 +125,11 @@ public class RecordWriter implements Serializable
         }
         catch (Exception exception)
         {
+            LOGGER.error("[{}] Failed to write job={}, taskStageAttemptNumber={}, taskAttemptNumber={}",
+                         partitionId,
+                         job.getId().toString(),
+                         taskContext.stageAttemptNumber(),
+                         taskContext.attemptNumber());
             throw new RuntimeException(exception);
         }
     }
@@ -137,8 +143,8 @@ public class RecordWriter implements Serializable
         if (localNow.isBefore(remoteNow.minus(range)) || localNow.isAfter(remoteNow.plus(range)))
         {
             String message = String.format("Time skew between Spark and Cassandra is too large. "
-                                         + "Allowable skew is %d minutes. "
-                                         + "Spark executor time is %s, Cassandra instance time is %s",
+                                           + "Allowable skew is %d minutes. "
+                                           + "Spark executor time is %s, Cassandra instance time is %s",
                                            timeSkewResponse.allowableSkewInMinutes, localNow, remoteNow);
             throw new UnsupportedOperationException(message);
         }
@@ -167,20 +173,22 @@ public class RecordWriter implements Serializable
         }
     }
 
-    public void checkBatchSize(StreamSession streamSession, int partitionId, JobInfo job) throws IOException
+    void checkBatchSize(StreamSession streamSession, int partitionId, JobInfo job) throws IOException
     {
-        batchSize++;
-        if (batchSize > job.getSstableBatchSize())
+        if (job.getRowBufferMode() == RowBufferMode.UNBUFFERED)
         {
-            finalizeSSTable(streamSession, partitionId, sstableWriter, batchNumber, batchSize);
-
-            sstableWriter = null;
-            batchSize = 0;
+            batchSize++;
+            if (batchSize >= job.getSstableBatchSize())
+            {
+                finalizeSSTable(streamSession, partitionId, sstableWriter, batchNumber, batchSize);
 
+                sstableWriter = null;
+                batchSize = 0;
+            }
         }
     }
 
-    public void maybeCreateTableWriter(int partitionId, Path baseDir) throws IOException
+    void maybeCreateTableWriter(int partitionId, Path baseDir) throws IOException
     {
         if (sstableWriter == null)
         {
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
index 9d2ebc9..6b012db 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
@@ -74,15 +74,14 @@ public class SSTableWriter
         LOGGER.info("Running with version " + packageVersion);
 
         TableSchema tableSchema = writerContext.schema().getTableSchema();
-        boolean sorted = writerContext.job().getRowBufferMode() == RowBufferMode.UNBUFFERED;
         this.cqlSSTableWriter = SSTableWriterFactory.getSSTableWriter(
-                CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(packageVersion),
-                this.outDir.toString(),
-                writerContext.cluster().getPartitioner().toString(),
-                tableSchema.createStatement,
-                tableSchema.modificationStatement,
-                sorted,
-                writerContext.job().getSstableDataSizeInMB());
+        CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(packageVersion),
+        this.outDir.toString(),
+        writerContext.cluster().getPartitioner().toString(),
+        tableSchema.createStatement,
+        tableSchema.modificationStatement,
+        writerContext.job().getRowBufferMode(),
+        writerContext.job().getSstableDataSizeInMB());
     }
 
     @NotNull
@@ -151,9 +150,9 @@ public class SSTableWriter
     {
         Map<Path, MD5Hash> fileHashes = new HashMap<>();
         try (DirectoryStream<Path> filesToHash =
-                Files.newDirectoryStream(dataFile.getParent(), SSTables.getSSTableBaseName(dataFile) + "*"))
+             Files.newDirectoryStream(dataFile.getParent(), SSTables.getSSTableBaseName(dataFile) + "*"))
         {
-            for (Path path: filesToHash)
+            for (Path path : filesToHash)
             {
                 fileHashes.put(path, calculateFileHash(path));
             }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java
index 832e370..50c5d47 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java
@@ -22,6 +22,7 @@ package org.apache.cassandra.spark.bulkwriter;
 import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.bridge.CassandraBridgeFactory;
 import org.apache.cassandra.bridge.CassandraVersionFeatures;
+import org.apache.cassandra.bridge.RowBufferMode;
 import org.apache.cassandra.bridge.SSTableWriter;
 
 public final class SSTableWriterFactory
@@ -36,7 +37,7 @@ public final class SSTableWriterFactory
                                                  String partitioner,
                                                  String createStatement,
                                                  String insertStatement,
-                                                 boolean sorted,
+                                                 RowBufferMode rowBufferMode,
                                                  int bufferSizeMB)
     {
         CassandraBridge cassandraBridge = CassandraBridgeFactory.get(serverVersion);
@@ -44,7 +45,7 @@ public final class SSTableWriterFactory
                                                 partitioner,
                                                 createStatement,
                                                 insertStatement,
-                                                sorted,
+                                                rowBufferMode,
                                                 bufferSizeMB);
     }
 }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index b4104bd..39c69c1 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -109,10 +109,10 @@ public class CassandraDataLayer extends PartitionedDataLayer implements Serializ
 
     public static final Logger LOGGER = LoggerFactory.getLogger(CassandraDataLayer.class);
     private static final Cache<String, CompletableFuture<List<SSTable>>> SNAPSHOT_CACHE =
-            CacheBuilder.newBuilder()
-                        .expireAfterAccess(15, TimeUnit.MINUTES)
-                        .maximumSize(128)
-                        .build();
+    CacheBuilder.newBuilder()
+                .expireAfterAccess(15, TimeUnit.MINUTES)
+                .maximumSize(128)
+                .build();
 
     protected String snapshotName;
     protected String keyspace;
@@ -158,25 +158,25 @@ public class CassandraDataLayer extends PartitionedDataLayer implements Serializ
     // For serialization
     @VisibleForTesting
     // CHECKSTYLE IGNORE: Constructor with many parameters
-    CassandraDataLayer(@Nullable String keyspace,
-                       @Nullable String table,
-                       @NotNull String snapshotName,
-                       @Nullable String datacenter,
-                       @NotNull Sidecar.ClientConfig sidecarClientConfig,
-                       @Nullable SslConfig sslConfig,
-                       @NotNull CqlTable cqlTable,
-                       @NotNull TokenPartitioner tokenPartitioner,
-                       @NotNull CassandraVersion version,
-                       @NotNull ConsistencyLevel consistencyLevel,
-                       @NotNull Set<SidecarInstanceImpl> clusterConfig,
-                       @NotNull Map<String, PartitionedDataLayer.AvailabilityHint> availabilityHints,
-                       @NotNull Map<String, BigNumberConfigImpl> bigNumberConfigMap,
-                       boolean enableStats,
-                       boolean readIndexOffset,
-                       boolean useIncrementalRepair,
-                       @Nullable String lastModifiedTimestampField,
-                       List<SchemaFeature> requestedFeatures,
-                       @NotNull Map<String, ReplicationFactor> rfMap)
+    protected CassandraDataLayer(@Nullable String keyspace,
+                                 @Nullable String table,
+                                 @NotNull String snapshotName,
+                                 @Nullable String datacenter,
+                                 @NotNull Sidecar.ClientConfig sidecarClientConfig,
+                                 @Nullable SslConfig sslConfig,
+                                 @NotNull CqlTable cqlTable,
+                                 @NotNull TokenPartitioner tokenPartitioner,
+                                 @NotNull CassandraVersion version,
+                                 @NotNull ConsistencyLevel consistencyLevel,
+                                 @NotNull Set<SidecarInstanceImpl> clusterConfig,
+                                 @NotNull Map<String, PartitionedDataLayer.AvailabilityHint> availabilityHints,
+                                 @NotNull Map<String, BigNumberConfigImpl> bigNumberConfigMap,
+                                 boolean enableStats,
+                                 boolean readIndexOffset,
+                                 boolean useIncrementalRepair,
+                                 @Nullable String lastModifiedTimestampField,
+                                 List<SchemaFeature> requestedFeatures,
+                                 @NotNull Map<String, ReplicationFactor> rfMap)
     {
         super(consistencyLevel, datacenter);
         this.snapshotName = snapshotName;
@@ -214,7 +214,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements Serializ
         clusterConfig = initializeClusterConfig(options);
         initInstanceMap();
 
-        // Get cluster info from CassandraManager
+        // Get cluster info from Cassandra Sidecar
         int effectiveNumberOfCores;
         CompletableFuture<RingResponse> ringFuture = sidecar.ring(keyspace);
         try
@@ -271,8 +271,8 @@ public class CassandraDataLayer extends PartitionedDataLayer implements Serializ
         ReplicationFactor replicationFactor = CqlUtils.extractReplicationFactor(fullSchema, keyspace);
         rfMap = ImmutableMap.of(keyspace, replicationFactor);
         CompletableFuture<Integer> sizingFuture = CompletableFuture.supplyAsync(
-                () -> getSizing(clusterConfig, replicationFactor, options).getEffectiveNumberOfCores(),
-                ExecutorHolder.EXECUTOR_SERVICE);
+        () -> getSizing(clusterConfig, replicationFactor, options).getEffectiveNumberOfCores(),
+        ExecutorHolder.EXECUTOR_SERVICE);
         validateReplicationFactor(replicationFactor);
         udts.forEach(udt -> LOGGER.info("Adding schema UDT: '{}'", udt));
 
@@ -296,8 +296,8 @@ public class CassandraDataLayer extends PartitionedDataLayer implements Serializ
             else
             {
                 LOGGER.warn("Skipping clearing snapshot because it was not created by this job. "
-                          + "Only the job that created the snapshot can clear it. "
-                          + "snapshotName={} keyspace={} table={} dc={}",
+                            + "Only the job that created the snapshot can clear it. "
+                            + "snapshotName={} keyspace={} table={} dc={}",
                             snapshotName, keyspace, table, datacenter);
             }
         }
@@ -317,57 +317,57 @@ public class CassandraDataLayer extends PartitionedDataLayer implements Serializ
         Map<String, PartitionedDataLayer.AvailabilityHint> availabilityHints = new ConcurrentHashMap<>(ring.size());
 
         // Fire off create snapshot request across the entire cluster
-        List<CompletableFuture<Void>> futures = ring
-                .stream()
-                .filter(ringEntry -> datacenter == null || datacenter.equals(ringEntry.datacenter()))
-                .map(ringEntry -> {
-                    PartitionedDataLayer.AvailabilityHint hint =
-                            PartitionedDataLayer.AvailabilityHint.fromState(ringEntry.status(), ringEntry.state());
-
-                    CompletableFuture<PartitionedDataLayer.AvailabilityHint> createSnapshotFuture;
-                    if (NODE_STATUS_NOT_CONSIDERED.contains(ringEntry.state()))
-                    {
-                        LOGGER.warn("Skip snapshot creating when node is joining or down "
-                                  + "snapshotName={} keyspace={} table={} datacenter={} fqdn={} status={} state={}",
-                                    snapshotName, keyspace, table, datacenter, ringEntry.fqdn(), ringEntry.status(), ringEntry.state());
-                        createSnapshotFuture = CompletableFuture.completedFuture(hint);
-                    }
-                    else
-                    {
-                        LOGGER.info("Creating snapshot on instance snapshotName={} keyspace={} table={} datacenter={} fqdn={}",
-                                    snapshotName, keyspace, table, datacenter, ringEntry.fqdn());
-                        SidecarInstance sidecarInstance = new SidecarInstanceImpl(ringEntry.fqdn(), sidecarClientConfig.port());
-                        createSnapshotFuture = sidecar
-                                .createSnapshot(sidecarInstance, keyspace, table, snapshotName)
-                                .handle((resp, throwable) -> {
-                                    if (throwable == null)
-                                    {
-                                        // Create snapshot succeeded
-                                        return hint;
-                                    }
-
-                                    if (isExhausted(throwable))
-                                    {
-                                        LOGGER.warn("Failed to create snapshot on instance", throwable);
-                                        return PartitionedDataLayer.AvailabilityHint.DOWN;
-                                    }
-
-                                    LOGGER.error("Unexpected error creating snapshot on instance", throwable);
-                                    return PartitionedDataLayer.AvailabilityHint.UNKNOWN;
-                                });
-                    }
-
-                    return createSnapshotFuture
-                           .thenAccept(h -> availabilityHints.put(ringEntry.fqdn(), h));
-                })
-                .collect(Collectors.toList());
+        List<CompletableFuture<Void>> futures =
+        ring.stream()
+            .filter(ringEntry -> datacenter == null || datacenter.equals(ringEntry.datacenter()))
+            .map(ringEntry -> {
+                PartitionedDataLayer.AvailabilityHint hint =
+                PartitionedDataLayer.AvailabilityHint.fromState(ringEntry.status(), ringEntry.state());
+
+                CompletableFuture<PartitionedDataLayer.AvailabilityHint> createSnapshotFuture;
+                if (NODE_STATUS_NOT_CONSIDERED.contains(ringEntry.state()))
+                {
+                    LOGGER.warn("Skip snapshot creating when node is joining or down "
+                                + "snapshotName={} keyspace={} table={} datacenter={} fqdn={} status={} state={}",
+                                snapshotName, keyspace, table, datacenter, ringEntry.fqdn(), ringEntry.status(), ringEntry.state());
+                    createSnapshotFuture = CompletableFuture.completedFuture(hint);
+                }
+                else
+                {
+                    LOGGER.info("Creating snapshot on instance snapshotName={} keyspace={} table={} datacenter={} fqdn={}",
+                                snapshotName, keyspace, table, datacenter, ringEntry.fqdn());
+                    SidecarInstance sidecarInstance = new SidecarInstanceImpl(ringEntry.fqdn(), sidecarClientConfig.port());
+                    createSnapshotFuture = sidecar
+                                           .createSnapshot(sidecarInstance, keyspace, table, snapshotName)
+                                           .handle((resp, throwable) -> {
+                                               if (throwable == null)
+                                               {
+                                                   // Create snapshot succeeded
+                                                   return hint;
+                                               }
+
+                                               if (isExhausted(throwable))
+                                               {
+                                                   LOGGER.warn("Failed to create snapshot on instance", throwable);
+                                                   return PartitionedDataLayer.AvailabilityHint.DOWN;
+                                               }
+
+                                               LOGGER.error("Unexpected error creating snapshot on instance", throwable);
+                                               return PartitionedDataLayer.AvailabilityHint.UNKNOWN;
+                                           });
+                }
+
+                return createSnapshotFuture
+                       .thenAccept(h -> availabilityHints.put(ringEntry.fqdn(), h));
+            })
+            .collect(Collectors.toList());
 
         return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .handle((results, throwable) -> availabilityHints);
     }
 
-    private boolean isExhausted(@Nullable Throwable throwable)
+    protected boolean isExhausted(@Nullable Throwable throwable)
     {
         return throwable != null && (throwable instanceof RetriesExhaustedException || isExhausted(throwable.getCause()));
     }
@@ -511,14 +511,14 @@ public class CassandraDataLayer extends PartitionedDataLayer implements Serializ
         }
         String key = snapshotKey(sidecarInstance);  // NOTE: We don't currently support token filtering in list snapshot
         LOGGER.info("Listing snapshot partition={} lowerBound={} upperBound={} "
-                  + "instance={} port={} keyspace={} tableName={} snapshotName={}",
+                    + "instance={} port={} keyspace={} tableName={} snapshotName={}",
                     partitionId, range.lowerEndpoint(), range.upperEndpoint(),
                     sidecarInstance.hostname(), sidecarInstance.port(), keyspace, table, snapshotName);
         try
         {
             return SNAPSHOT_CACHE.get(key, () -> {
                 LOGGER.info("Listing instance snapshot partition={} lowerBound={} upperBound={} "
-                          + "instance={} port={} keyspace={} tableName={} snapshotName={} cacheKey={}",
+                            + "instance={} port={} keyspace={} tableName={} snapshotName={} cacheKey={}",
                             partitionId, range.lowerEndpoint(), range.upperEndpoint(),
                             sidecarInstance.hostname(), sidecarInstance.port(), keyspace, table, snapshotName, key);
                 return sidecar.listSnapshotFiles(sidecarInstance, keyspace, table, snapshotName)
@@ -574,16 +574,16 @@ public class CassandraDataLayer extends PartitionedDataLayer implements Serializ
 
         // Map to SSTable
         return result.values().stream()
-                              .map(components -> new SidecarProvisionedSSTable(sidecar,
-                                                                               sidecarClientConfig,
-                                                                               sidecarInstance,
-                                                                               keyspace,
-                                                                               table,
-                                                                               snapshotName,
-                                                                               components,
-                                                                               partitionId,
-                                                                               stats()))
-                              .collect(Collectors.toList());
+                     .map(components -> new SidecarProvisionedSSTable(sidecar,
+                                                                      sidecarClientConfig,
+                                                                      sidecarInstance,
+                                                                      keyspace,
+                                                                      table,
+                                                                      snapshotName,
+                                                                      components,
+                                                                      partitionId,
+                                                                      stats()))
+                     .collect(Collectors.toList());
     }
 
     @Override
@@ -606,10 +606,10 @@ public class CassandraDataLayer extends PartitionedDataLayer implements Serializ
 
         CassandraDataLayer that = (CassandraDataLayer) other;
         return cqlTable.equals(that.cqlTable)
-            && snapshotName.equals(that.snapshotName)
-            && keyspace.equals(that.keyspace)
-            && table.equals(that.table)
-            && version().equals(that.version());
+               && snapshotName.equals(that.snapshotName)
+               && keyspace.equals(that.keyspace)
+               && table.equals(that.table)
+               && version().equals(that.version());
     }
 
     public Map<String, BigNumberConfigImpl> bigNumberConfigMap()
@@ -632,9 +632,10 @@ public class CassandraDataLayer extends PartitionedDataLayer implements Serializ
                                                      RingResponse ring)
     {
         Collection<CassandraInstance> instances = ring
-                .stream()
-                .map(status -> new CassandraInstance(status.token(), status.fqdn(), status.datacenter()))
-                .collect(Collectors.toList());
+                                                  .stream()
+                                                  .filter(status -> datacenter == null || datacenter.equalsIgnoreCase(status.datacenter()))
+                                                  .map(status -> new CassandraInstance(status.token(), status.fqdn(), status.datacenter()))
+                                                  .collect(Collectors.toList());
         return new CassandraRing(partitioner, keyspace, replicationFactor, instances);
     }
 
@@ -800,35 +801,35 @@ public class CassandraDataLayer extends PartitionedDataLayer implements Serializ
         {
             LOGGER.info("Deserializing CassandraDataLayer with Kryo");
             return new CassandraDataLayer(
-                    in.readString(),
-                    in.readString(),
-                    in.readString(),
-                    in.readString(),
-                    Sidecar.ClientConfig.create(in.readInt(),
-                                                in.readInt(),
-                                                in.readLong(),
-                                                in.readLong(),
-                                                in.readLong(),
-                                                in.readLong(),
-                                                in.readInt(),
-                                                in.readInt(),
-                                                (Map<FileType, Long>) kryo.readObject(in, HashMap.class),
-                                                (Map<FileType, Long>) kryo.readObject(in, HashMap.class)),
-                    kryo.readObjectOrNull(in, SslConfig.class),
-                    kryo.readObject(in, CqlTable.class),
-                    kryo.readObject(in, TokenPartitioner.class),
-                    kryo.readObject(in, CassandraVersion.class),
-                    kryo.readObject(in, ConsistencyLevel.class),
-                    kryo.readObject(in, HashSet.class),
-                    (Map<String, PartitionedDataLayer.AvailabilityHint>) kryo.readObject(in, HashMap.class),
-                    in.readBoolean() ? Collections.emptyMap()
-                                     : (Map<String, BigNumberConfigImpl>) kryo.readObject(in, HashMap.class),
-                    in.readBoolean(),
-                    in.readBoolean(),
-                    in.readBoolean(),
-                    in.readString(),
-                    kryo.readObject(in, SchemaFeaturesListWrapper.class).toList(),
-                    kryo.readObject(in, HashMap.class));
+            in.readString(),
+            in.readString(),
+            in.readString(),
+            in.readString(),
+            Sidecar.ClientConfig.create(in.readInt(),
+                                        in.readInt(),
+                                        in.readLong(),
+                                        in.readLong(),
+                                        in.readLong(),
+                                        in.readLong(),
+                                        in.readInt(),
+                                        in.readInt(),
+                                        (Map<FileType, Long>) kryo.readObject(in, HashMap.class),
+                                        (Map<FileType, Long>) kryo.readObject(in, HashMap.class)),
+            kryo.readObjectOrNull(in, SslConfig.class),
+            kryo.readObject(in, CqlTable.class),
+            kryo.readObject(in, TokenPartitioner.class),
+            kryo.readObject(in, CassandraVersion.class),
+            kryo.readObject(in, ConsistencyLevel.class),
+            kryo.readObject(in, HashSet.class),
+            (Map<String, PartitionedDataLayer.AvailabilityHint>) kryo.readObject(in, HashMap.class),
+            in.readBoolean() ? Collections.emptyMap()
+                             : (Map<String, BigNumberConfigImpl>) kryo.readObject(in, HashMap.class),
+            in.readBoolean(),
+            in.readBoolean(),
+            in.readBoolean(),
+            in.readString(),
+            kryo.readObject(in, SchemaFeaturesListWrapper.class).toList(),
+            kryo.readObject(in, HashMap.class));
         }
 
         // Wrapper only used internally for Kryo serialization/deserialization
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarNativeLibrariesTest.java
similarity index 58%
copy from cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java
copy to cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarNativeLibrariesTest.java
index 5d30ad8..454fdf3 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarNativeLibrariesTest.java
@@ -17,10 +17,24 @@
  * under the License.
  */
 
-package org.apache.cassandra.spark.bulkwriter;
+package org.apache.cassandra.clients;
 
-public enum RowBufferMode
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import o.a.c.sidecar.client.shaded.io.vertx.core.net.OpenSSLEngineOptions;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests to ensure that Sidecar client native libraries are loaded correctly
+ */
+public class SidecarNativeLibrariesTest
 {
-    BUFFERED,
-    UNBUFFERED
+    @DisplayName("Ensures that the shading is correct for the vertx-client-shaded project")
+    @Test
+    void openSslIsAvailable()
+    {
+        assertTrue(OpenSSLEngineOptions.isAvailable());
+    }
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
index 0877d23..8a86a09 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Maps;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import org.apache.cassandra.bridge.RowBufferMode;
 import org.apache.cassandra.spark.bulkwriter.util.SbwKryoRegistrator;
 import org.apache.cassandra.spark.utils.BuildInfo;
 import org.apache.spark.SparkConf;
@@ -237,7 +238,7 @@ public class BulkSparkConfTest
         options.put(WriterOptions.ROW_BUFFER_MODE.name(), "invalid");
         IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
                                                           () -> new BulkSparkConf(sparkConf, options));
-        assertEquals("Key row buffering mode with value invalid is not a valid Enum of type class org.apache.cassandra.spark.bulkwriter.RowBufferMode.",
+        assertEquals("Key row buffering mode with value invalid is not a valid Enum of type class org.apache.cassandra.bridge.RowBufferMode.",
                      exception.getMessage());
     }
 
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraRingMonitorTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraRingMonitorTest.java
index d86506f..d64b3a7 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraRingMonitorTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraRingMonitorTest.java
@@ -132,6 +132,6 @@ public class CassandraRingMonitorTest
 
     private CassandraRing<RingInstance> buildRing(int initialToken)
     {
-        return RingUtils.buildRing(initialToken, "dev3", "dev3", "DEV", "test", 3);
+        return RingUtils.buildRing(initialToken, "DEV", "test", 3);
     }
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index 7d1908b..b6d3ff5 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -33,9 +33,11 @@ import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.tuple.Pair;
 
+import org.apache.cassandra.bridge.RowBufferMode;
 import org.apache.cassandra.sidecar.common.data.TimeSkewResponse;
 import org.apache.cassandra.spark.bulkwriter.token.CassandraRing;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
@@ -50,6 +52,7 @@ import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
+import org.jetbrains.annotations.NotNull;
 
 import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DATE;
 import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.INT;
@@ -61,6 +64,7 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo
     private static final long serialVersionUID = -2912371629236770646L;
     private RowBufferMode rowBufferMode = RowBufferMode.UNBUFFERED;
     private ConsistencyLevel.CL consistencyLevel;
+    private int sstableDataSizeInMB = 128;
 
     public interface CommitResultSupplier extends BiFunction<List<String>, String, RemoteCommitResult>
     {
@@ -96,9 +100,9 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo
         this.cassandraVersion = cassandraVersion;
         this.consistencyLevel = consistencyLevel;
         validPair = TableSchemaTestCommon.buildMatchedDataframeAndCqlColumns(
-                new String[]{"id", "date", "course", "marks"},
-                new org.apache.spark.sql.types.DataType[]{DataTypes.IntegerType, DataTypes.DateType, DataTypes.StringType, DataTypes.IntegerType},
-                new CqlField.CqlType[]{mockCqlType(INT), mockCqlType(DATE), mockCqlType(VARCHAR), mockCqlType(INT)});
+        new String[]{"id", "date", "course", "marks"},
+        new org.apache.spark.sql.types.DataType[]{DataTypes.IntegerType, DataTypes.DateType, DataTypes.StringType, DataTypes.IntegerType},
+        new CqlField.CqlType[]{mockCqlType(INT), mockCqlType(DATE), mockCqlType(VARCHAR), mockCqlType(INT)});
         StructType validDataFrameSchema = validPair.getKey();
         ImmutableMap<String, CqlField.CqlType> validCqlColumns = validPair.getValue();
         String[] partitionKeyColumns = {"id", "date"};
@@ -173,6 +177,7 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo
         return "DC1";
     }
 
+    @NotNull
     @Override
     public RowBufferMode getRowBufferMode()
     {
@@ -187,13 +192,19 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo
     @Override
     public int getSstableDataSizeInMB()
     {
-        return 128;
+        return sstableDataSizeInMB;
+    }
+
+    @VisibleForTesting
+    void setSstableDataSizeInMB(int sstableDataSizeInMB)
+    {
+        this.sstableDataSizeInMB = sstableDataSizeInMB;
     }
 
     @Override
     public int getSstableBatchSize()
     {
-        return 1;
+        return 2;
     }
 
     @Override
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java
index 1d1c167..1be5c2c 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.spark.bulkwriter;
 
 import java.nio.file.Path;
 
+import org.jetbrains.annotations.NotNull;
+
 class NonValidatingTestSSTableWriter extends SSTableWriter
 {
     NonValidatingTestSSTableWriter(MockTableWriter tableWriter, Path path)
@@ -29,7 +31,7 @@ class NonValidatingTestSSTableWriter extends SSTableWriter
     }
 
     @Override
-    public void validateSSTables(BulkWriterContext writerContext, int partitionId)
+    public void validateSSTables(@NotNull BulkWriterContext writerContext, int partitionId)
     {
         // Skip validation for these tests
     }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
index 4dd8841..940f5a2 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
@@ -31,9 +31,11 @@ import java.util.stream.Stream;
 
 import com.google.common.collect.Range;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import org.apache.cassandra.bridge.RowBufferMode;
 import org.apache.cassandra.spark.bulkwriter.token.CassandraRing;
 import org.apache.cassandra.spark.common.model.CassandraInstance;
 import scala.Tuple2;
@@ -68,7 +70,7 @@ public class RecordWriterTest
     public void setUp()
     {
         tw = new MockTableWriter(folder);
-        ring = RingUtils.buildRing(0, "app", "cluster", "DC1", "test", 12);
+        ring = RingUtils.buildRing(0, "DC1", "test", 12);
         writerContext = new MockBulkWriterContext(ring);
         tc = new TestTaskContext();
         range = writerContext.job().getTokenPartitioner().getTokenRange(tc.partitionId());
@@ -85,7 +87,7 @@ public class RecordWriterTest
     @Test
     public void testWriteWithConstantTTL()
     {
-        MockBulkWriterContext bulkWriterContext =  new MockBulkWriterContext(ring);
+        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, false);
         validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
     }
@@ -93,7 +95,7 @@ public class RecordWriterTest
     @Test
     public void testWriteWithTTLColumn()
     {
-        MockBulkWriterContext bulkWriterContext =  new MockBulkWriterContext(ring);
+        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, true, false);
         String[] columnNamesWithTtl = {"id", "date", "course", "marks", "ttl"};
         validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTtl);
@@ -102,7 +104,7 @@ public class RecordWriterTest
     @Test
     public void testWriteWithConstantTimestamp()
     {
-        MockBulkWriterContext bulkWriterContext =  new MockBulkWriterContext(ring);
+        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, false);
         validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
     }
@@ -110,7 +112,7 @@ public class RecordWriterTest
     @Test
     public void testWriteWithTimestampColumn()
     {
-        MockBulkWriterContext bulkWriterContext =  new MockBulkWriterContext(ring);
+        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, true);
         String[] columnNamesWithTimestamp = {"id", "date", "course", "marks", "timestamp"};
         validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTimestamp);
@@ -119,7 +121,7 @@ public class RecordWriterTest
     @Test
     public void testWriteWithTimestampAndTTLColumn()
     {
-        MockBulkWriterContext bulkWriterContext =  new MockBulkWriterContext(ring);
+        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, true, true);
         String[] columnNames = {"id", "date", "course", "marks", "ttl", "timestamp"};
         validateSuccessfulWrite(bulkWriterContext, data, columnNames);
@@ -128,7 +130,7 @@ public class RecordWriterTest
     @Test
     public void testCorruptSSTable()
     {
-        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path) ->  new SSTableWriter(tw.setOutDir(path), path));
+        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path) -> new SSTableWriter(tw.setOutDir(path), path));
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
         // TODO: Add better error handling with human-readable exception messages in SSTableReader::new
         // That way we can assert on the exception thrown here
@@ -143,7 +145,7 @@ public class RecordWriterTest
         RuntimeException ex = assertThrows(RuntimeException.class, () -> rw.write(data));
         assertThat(ex.getMessage(),
                    matchesPattern("java.lang.IllegalStateException: Received Token "
-                              + "5765203080415074583 outside of expected range \\[-9223372036854775808(‥|..)0]"));
+                                  + "5765203080415074583 outside of expected range \\[-9223372036854775808(‥|..)0]"));
     }
 
     @Test
@@ -180,17 +182,46 @@ public class RecordWriterTest
         rw.write(data);
     }
 
+    @DisplayName("Write 20 rows, in unbuffered mode with BATCH_SIZE of 2")
+    @Test()
+    void writeUnbuffered()
+    {
+        int numberOfRows = 20;
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(numberOfRows, true);
+        validateSuccessfulWrite(writerContext, data, COLUMN_NAMES, (int) Math.ceil(numberOfRows / writerContext.getSstableBatchSize()));
+    }
+
+    @DisplayName("Write 20 rows, in buffered mode with SSTABLE_DATA_SIZE_IN_MB of 10")
+    @Test()
+    void writeBuffered()
+    {
+        int numberOfRows = 20;
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(numberOfRows, true);
+        writerContext.setRowBufferMode(RowBufferMode.BUFFERED);
+        writerContext.setSstableDataSizeInMB(10);
+        // only a single data sstable file is created
+        validateSuccessfulWrite(writerContext, data, COLUMN_NAMES, 1);
+    }
+
     private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
                                          Iterator<Tuple2<DecoratedKey, Object[]>> data,
                                          String[] columnNames)
+    {
+        validateSuccessfulWrite(writerContext, data, columnNames, UPLOADED_TABLES);
+    }
+
+    private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
+                                         Iterator<Tuple2<DecoratedKey, Object[]>> data,
+                                         String[] columnNames,
+                                         int uploadedTables)
     {
         RecordWriter rw = new RecordWriter(writerContext, columnNames, () -> tc, SSTableWriter::new);
         rw.write(data);
         Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads();
         assertThat(uploads.keySet().size(), is(REPLICA_COUNT));  // Should upload to 3 replicas
-        assertThat(uploads.values().stream().mapToInt(List::size).sum(), is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES));
+        assertThat(uploads.values().stream().mapToInt(List::size).sum(), is(REPLICA_COUNT * FILES_PER_SSTABLE * uploadedTables));
         List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
-        for (UploadRequest ur: requests)
+        for (UploadRequest ur : requests)
         {
             assertNotNull(ur.fileHash);
         }
@@ -200,6 +231,7 @@ public class RecordWriterTest
     {
         return generateData(numValues, onlyInRange, false, false);
     }
+
     private Iterator<Tuple2<DecoratedKey, Object[]>> generateData(int numValues, boolean onlyInRange, boolean withTTL, boolean withTimestamp)
     {
         Stream<Tuple2<DecoratedKey, Object[]>> source = IntStream.iterate(0, integer -> integer + 1).mapToObj(index -> {
@@ -226,6 +258,12 @@ public class RecordWriterTest
         {
             source = source.filter(val -> range.contains(val._1.getToken()));
         }
-        return source.limit(numValues).iterator();
+        Stream<Tuple2<DecoratedKey, Object[]>> limitedStream = source.limit(numValues);
+        if (onlyInRange)
+        {
+            return limitedStream.sorted((o1, o2) -> o1._1.compareTo(o2._1))
+                                .iterator();
+        }
+        return limitedStream.iterator();
     }
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingUtils.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingUtils.java
index 4916cb8..12207dd 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingUtils.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingUtils.java
@@ -40,30 +40,24 @@ public final class RingUtils
 
     @NotNull
     static CassandraRing<RingInstance> buildRing(int initialToken,
-                                                 String app,
-                                                 String cluster,
                                                  String dataCenter,
                                                  String keyspace)
     {
-        return buildRing(initialToken, app, cluster, dataCenter, keyspace, 3);
+        return buildRing(initialToken, dataCenter, keyspace, 3);
     }
 
     @NotNull
     public static CassandraRing<RingInstance> buildRing(int initialToken,
-                                                        String app,
-                                                        String cluster,
                                                         String dataCenter,
                                                         String keyspace,
                                                         int instancesPerDC)
     {
         ImmutableMap<String, Integer> rfByDC = ImmutableMap.of(dataCenter, 3);
-        return buildRing(initialToken, app, cluster, rfByDC, keyspace, instancesPerDC);
+        return buildRing(initialToken, rfByDC, keyspace, instancesPerDC);
     }
 
     @NotNull
     static CassandraRing<RingInstance> buildRing(int initialToken,
-                                                 String app,
-                                                 String cluster,
                                                  ImmutableMap<String, Integer> rfByDC,
                                                  String keyspace,
                                                  int instancesPerDC)
@@ -77,7 +71,7 @@ public final class RingUtils
     private static ReplicationFactor getReplicationFactor(Map<String, Integer> rfByDC)
     {
         ImmutableMap.Builder<String, String> optionsBuilder = ImmutableMap.<String, String>builder()
-                .put("class", "org.apache.cassandra.locator.NetworkTopologyStrategy");
+                                                                          .put("class", "org.apache.cassandra.locator.NetworkTopologyStrategy");
         rfByDC.forEach((key, value) -> optionsBuilder.put(key, value.toString()));
         return new ReplicationFactor(optionsBuilder.build());
     }
@@ -93,17 +87,17 @@ public final class RingUtils
             for (int instance = 0; instance < instancesPerDc; instance++)
             {
                 instances.add(new RingInstance(new RingEntry.Builder()
-                        .address("127.0." + dcOffset + "." + instance)
-                        .datacenter(datacenter)
-                        .load("0")
-                        .token(Integer.toString(initialToken + dcOffset + 100_000 * instance))
-                        .fqdn(datacenter + "-i" + instance)
-                        .rack("Rack")
-                        .hostId("")
-                        .status("UP")
-                        .state("NORMAL")
-                        .owns("")
-                        .build()));
+                                               .address("127.0." + dcOffset + "." + instance)
+                                               .datacenter(datacenter)
+                                               .load("0")
+                                               .token(Integer.toString(initialToken + dcOffset + 100_000 * instance))
+                                               .fqdn(datacenter + "-i" + instance)
+                                               .rack("Rack")
+                                               .hostId("")
+                                               .status("UP")
+                                               .state("NORMAL")
+                                               .owns("")
+                                               .build()));
             }
             dcOffset++;
         }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
index 18eb482..c9f88b1 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
@@ -74,7 +74,7 @@ public class SSTableWriterTest
     }
 
     @NotNull
-    public CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "app", "cluster", "DC1", "test", 12);  // CHECKSTYLE IGNORE: Public mutable field
+    public CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "DC1", "test", 12);  // CHECKSTYLE IGNORE: Public mutable field
 
     @TempDir
     public Path tmpDir; // CHECKSTYLE IGNORE: Public mutable field for testing
@@ -91,8 +91,8 @@ public class SSTableWriterTest
         try (DirectoryStream<Path> dataFileStream = Files.newDirectoryStream(tw.getOutDir(), "*Data.db"))
         {
             dataFileStream.forEach(dataFilePath ->
-                    assertEquals(CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(version).getMajorVersion(),
-                                 SSTables.cassandraVersionFromTable(dataFilePath).getMajorVersion()));
+                                   assertEquals(CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(version).getMajorVersion(),
+                                                SSTables.cassandraVersionFromTable(dataFilePath).getMajorVersion()));
         }
         tw.validateSSTables(writerContext, 1);
     }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
index 642181d..727598b 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
@@ -60,8 +60,6 @@ public class StreamSessionConsistencyTest
     private static final Range<BigInteger> RANGE = Range.range(BigInteger.valueOf(101L), BoundType.CLOSED,
                                                                BigInteger.valueOf(199L), BoundType.CLOSED);
     private static final CassandraRing<RingInstance> RING = RingUtils.buildRing(0,
-                                                                                "app",
-                                                                                "cluster",
                                                                                 ImmutableMap.of("DC1", 3, "DC2", 3),
                                                                                 "test",
                                                                                 6);
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
index 58805e8..92a6763 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
@@ -72,7 +72,7 @@ public class StreamSessionTest
     public void setup()
     {
         range = Range.range(BigInteger.valueOf(101L), BoundType.CLOSED, BigInteger.valueOf(199L), BoundType.CLOSED);
-        ring = RingUtils.buildRing(0, "app", "cluster", "DC1", "test", 12);
+        ring = RingUtils.buildRing(0, "DC1", "test", 12);
         writerContext = getBulkWriterContext();
         tableWriter = new MockTableWriter(folder);
         executor = new MockScheduledExecutorService();
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenPartitionerTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenPartitionerTest.java
index 6940e50..cea354f 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenPartitionerTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenPartitionerTest.java
@@ -44,7 +44,7 @@ public class TokenPartitionerTest
     @Test
     public void testOneSplit()
     {
-        CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "app", "cluster", "DC1", "test");
+        CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "DC1", "test");
         partitioner = new TokenPartitioner(ring, 1, 2, 1, false);
         assertEquals(4, partitioner.numPartitions());
         assertEquals(0, getPartitionForToken(new BigInteger("-9223372036854775808")));
@@ -58,7 +58,7 @@ public class TokenPartitionerTest
     @Test
     public void testTwoSplits()
     {
-        CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "app", "cluster", "DC1", "test");
+        CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "DC1", "test");
         partitioner = new TokenPartitioner(ring, 2, 2, 1, false);
         assertEquals(10, partitioner.numPartitions());
         assertEquals(0, getPartitionForToken(new BigInteger("-4611686018427387905")));
@@ -90,7 +90,7 @@ public class TokenPartitionerTest
     @Test
     public void testReplicationFactorInOneDCOnly()
     {
-        CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "app", "cluster", ImmutableMap.of("DC1", 3, "DC2", 0), "test", 3);
+        CassandraRing<RingInstance> ring = RingUtils.buildRing(0, ImmutableMap.of("DC1", 3, "DC2", 0), "test", 3);
         partitioner = new TokenPartitioner(ring, 1, 2, 1, false);
         assertEquals(4, partitioner.numPartitions());
         assertEquals(0, getPartitionForToken(new BigInteger("-9223372036854775808")));
@@ -104,7 +104,7 @@ public class TokenPartitionerTest
     @Test
     public void testSplitCalculationsUsingCores()
     {
-        CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "app", "cluster", "DC1", "test");
+        CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "DC1", "test");
         // When passed "-1" for numberSplits, the token partitioner should calculate it on its own based on
         // the number of cores. This ring has 4 ranges when no splits are used, therefore we expect the number
         // of splits to be 25 for 100 cores and a default parallelism of 50 (as we take the max of the two).
@@ -117,7 +117,7 @@ public class TokenPartitionerTest
     @Test
     public void testSplitCalculationsUsingDefaultParallelism()
     {
-        CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "app", "cluster", "DC1", "test");
+        CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "DC1", "test");
         // When passed "-1" for numberSplits, the token partitioner should calculate it on its own based on
         // the number of cores. This ring has 4 ranges when no splits are used, therefore we expect the number
         // of splits to be 50 for 100 cores and a default parallelism of 200 (as we take the max of the two).
@@ -136,7 +136,7 @@ public class TokenPartitionerTest
                                                                 .put("DC3", 3)
                                                                 .put("DC4", 3)
                                                                 .build();
-        CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "app", "cluster", dcMap, "test", 20);
+        CassandraRing<RingInstance> ring = RingUtils.buildRing(0, dcMap, "test", 20);
         assertEquals(80, ring.getInstances().size());
         partitioner = new TokenPartitioner(ring, -1, 1, 750, false);
         assertEquals(10, partitioner.numSplits());
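
(An aside on the split arithmetic in the comments above; the helper below is an inference from the expected values in these tests, not code from this patch. The rule that fits all three cases: take the larger of the core count and the default parallelism, then divide by the ring's base range count, rounding up.)

    // Inferred from the test comments above; the real TokenPartitioner logic may differ
    static int expectedSplits(int cores, int defaultParallelism, int baseRanges)
    {
        int effectiveParallelism = Math.max(cores, defaultParallelism);
        return (int) Math.ceil((double) effectiveParallelism / baseRanges);
    }
    // expectedSplits(100, 50, 4)  == 25   (testSplitCalculationsUsingCores)
    // expectedSplits(100, 200, 4) == 50   (testSplitCalculationsUsingDefaultParallelism)
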
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
index 81e11b7..907c656 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
@@ -385,7 +385,7 @@ public abstract class CassandraBridge
                                                    String partitioner,
                                                    String createStatement,
                                                    String insertStatement,
-                                                   boolean isSorted,
+                                                   RowBufferMode rowBufferMode,
                                                    int bufferSizeMB);
 
     // CDC Configuration
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/RowBufferMode.java
similarity index 51%
rename from cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java
rename to cassandra-bridge/src/main/java/org/apache/cassandra/bridge/RowBufferMode.java
index 5d30ad8..70be08c 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/RowBufferMode.java
@@ -17,10 +17,25 @@
  * under the License.
  */
 
-package org.apache.cassandra.spark.bulkwriter;
+package org.apache.cassandra.bridge;
 
+/**
+ * Configures how data is flushed to an SSTable
+ */
 public enum RowBufferMode
 {
+    /**
+     * In this mode, Cassandra will flush an SSTable to disk once it reaches the configured BufferSizeInMB.
+     * This parameter is configured by the user-configurable SSTABLE_DATA_SIZE_IN_MB WriterOption. Note:
+     * This is the uncompressed size of data before being written to disk, and the actual size of an SSTable
+     * can be smaller based on the compression configuration for the SSTable and how compressible the data is.
+     */
     BUFFERED,
+
+    /**
+     * Cassandra expects rows in sorted order and will not flush an SSTable automatically. The size of an
+     * SSTable is based on the number of rows we write to the SSTable. This parameter is configured by the
+     * user-configurable BATCH_SIZE WriterOption.
+     */
     UNBUFFERED
 }
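
(A minimal usage sketch tying the two modes together, not part of the patch. It uses the SSTableWriterImplementation constructor as changed later in this commit; the output directory is hypothetical, and a primary key is added to the test fixtures' table definition so the CREATE statement is valid on its own.)

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.cassandra.bridge.RowBufferMode;
    import org.apache.cassandra.bridge.SSTableWriterImplementation;

    public final class RowBufferModeExample
    {
        public static void main(String[] args) throws IOException
        {
            // BUFFERED: rows may arrive unsorted; Cassandra flushes once the
            // uncompressed buffer reaches bufferSizeMB (here 250 MB)
            SSTableWriterImplementation writer = new SSTableWriterImplementation(
                    "/tmp/sstables",      // hypothetical output directory
                    "Murmur3Partitioner", // any name not containing "random" selects Murmur3
                    "CREATE TABLE test_keyspace.test_table (a int PRIMARY KEY, b text)",
                    "INSERT INTO test_keyspace.test_table (a, b) VALUES (?, ?)",
                    RowBufferMode.BUFFERED,
                    250);
            Map<String, Object> row = new HashMap<>();
            row.put("a", 1);
            row.put("b", "value");
            writer.addRow(row);
            writer.close();
            // With RowBufferMode.UNBUFFERED the builder calls sorted() instead,
            // and rows must then be added in token order
        }
    }
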
diff --git a/cassandra-four-zero/build.gradle b/cassandra-four-zero/build.gradle
index 0e85e06..01bbe7c 100644
--- a/cassandra-four-zero/build.gradle
+++ b/cassandra-four-zero/build.gradle
@@ -77,9 +77,14 @@ project(':cassandra-four-zero') {
         exclude('com/google/common/collect/Range*')
     }
 
-    task javadocJar(type: Jar, dependsOn: javadoc) {
-      classifier = 'javadoc'
-      from javadoc.destinationDir
+    tasks.register('javadocJar', Jar) {
+        dependsOn javadoc
+        classifier = 'javadoc'
+        from javadoc.destinationDir
+    }
+
+    test {
+        useJUnitPlatform()
     }
 
     artifacts {
diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
index 7bdd4c1..9b50486 100644
--- a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
+++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
@@ -623,10 +623,10 @@ public class CassandraBridgeImplementation extends CassandraBridge
                                           String partitioner,
                                           String createStatement,
                                           String insertStatement,
-                                          boolean isSorted,
+                                          RowBufferMode rowBufferMode,
                                           int bufferSizeMB)
     {
-        return new SSTableWriterImplementation(inDirectory, partitioner, createStatement, insertStatement, isSorted, bufferSizeMB);
+        return new SSTableWriterImplementation(inDirectory, partitioner, createStatement, insertStatement, rowBufferMode, bufferSizeMB);
     }
 
     // CDC Configuration
diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
index 05fb8bd..facfad6 100644
--- a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
+++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
@@ -22,6 +22,8 @@ package org.apache.cassandra.bridge;
 import java.io.IOException;
 import java.util.Map;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -42,28 +44,25 @@ public class SSTableWriterImplementation implements SSTableWriter
                                        String partitioner,
                                        String createStatement,
                                        String insertStatement,
-                                       boolean isSorted,
+                                       RowBufferMode rowBufferMode,
                                        int bufferSizeMB)
     {
         IPartitioner cassPartitioner = partitioner.toLowerCase().contains("random") ? new RandomPartitioner()
                                                                                     : new Murmur3Partitioner();
-        CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder()
-                                                           .inDirectory(inDirectory)
-                                                           .forTable(createStatement)
-                                                           .withPartitioner(cassPartitioner)
-                                                           .using(insertStatement)
-                                                           .withBufferSizeInMB(bufferSizeMB);
-        if (isSorted)
-        {
-            builder.sorted();
-        }
+
+        CQLSSTableWriter.Builder builder = configureBuilder(inDirectory,
+                                                            createStatement,
+                                                            insertStatement,
+                                                            rowBufferMode,
+                                                            bufferSizeMB,
+                                                            cassPartitioner);
         // TODO: Remove me once CQLSSTableWriter.Builder synchronize on schema (see CASSANDRA-TBD)
         //       build update schema, we need to synchronize
         writer = CassandraSchema.apply(s -> builder.build());
     }
 
     @Override
-    public void addRow(Map<String, Object> values)  throws IOException
+    public void addRow(Map<String, Object> values) throws IOException
     {
         try
         {
@@ -80,4 +79,28 @@ public class SSTableWriterImplementation implements SSTableWriter
     {
         writer.close();
     }
+
+    @VisibleForTesting
+    static CQLSSTableWriter.Builder configureBuilder(String inDirectory,
+                                                     String createStatement,
+                                                     String insertStatement,
+                                                     RowBufferMode rowBufferMode,
+                                                     int bufferSizeMB,
+                                                     IPartitioner cassPartitioner)
+    {
+        CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder()
+                                                           .inDirectory(inDirectory)
+                                                           .forTable(createStatement)
+                                                           .withPartitioner(cassPartitioner)
+                                                           .using(insertStatement);
+        if (rowBufferMode == RowBufferMode.UNBUFFERED)
+        {
+            builder.sorted();
+        }
+        else if (rowBufferMode == RowBufferMode.BUFFERED)
+        {
+            builder.withBufferSizeInMB(bufferSizeMB);
+        }
+        return builder;
+    }
 }
diff --git a/cassandra-four-zero/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java b/cassandra-four-zero/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java
new file mode 100644
index 0000000..6ede1d0
--- /dev/null
+++ b/cassandra-four-zero/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge;
+
+import java.io.File;
+import java.lang.reflect.Field;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.utils.ReflectionUtils;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for configuring {@link SSTableWriterImplementation}
+ */
+class SSTableWriterImplementationTest
+{
+    public static final String CREATE_STATEMENT = "CREATE TABLE test_keyspace.test_table (a int, b text)";
+    public static final String INSERT_STATEMENT = "INSERT INTO test_keyspace.test_table (a, b) VALUES (?, ?)";
+
+    @TempDir
+    File writeDirectory;
+
+    @Test
+    void testUnbufferedRowBufferModeConfiguration() throws NoSuchFieldException, IllegalAccessException
+    {
+        CQLSSTableWriter.Builder builder = SSTableWriterImplementation.configureBuilder(writeDirectory.getAbsolutePath(),
+                                                                                        CREATE_STATEMENT,
+                                                                                        INSERT_STATEMENT,
+                                                                                        RowBufferMode.UNBUFFERED,
+                                                                                        250,
+                                                                                        new Murmur3Partitioner());
+
+
+        assertTrue(peekSorted(builder));
+        assertNotEquals(250, peekBufferSizeInMB(builder)); // 250 should not be set
+    }
+
+    @Test
+    void testBufferedRowBufferModeConfiguration() throws NoSuchFieldException, IllegalAccessException
+    {
+        CQLSSTableWriter.Builder builder = SSTableWriterImplementation.configureBuilder(writeDirectory.getAbsolutePath(),
+                                                                                        CREATE_STATEMENT,
+                                                                                        INSERT_STATEMENT,
+                                                                                        RowBufferMode.BUFFERED,
+                                                                                        250,
+                                                                                        new Murmur3Partitioner());
+
+
+        assertFalse(peekSorted(builder));
+        assertEquals(250, peekBufferSizeInMB(builder));
+    }
+
+    static boolean peekSorted(CQLSSTableWriter.Builder builder) throws NoSuchFieldException, IllegalAccessException
+    {
+        Field sortedField = ReflectionUtils.getField(builder.getClass(), "sorted");
+        sortedField.setAccessible(true);
+        return (boolean) sortedField.get(builder);
+    }
+
+    static long peekBufferSizeInMB(CQLSSTableWriter.Builder builder) throws NoSuchFieldException, IllegalAccessException
+    {
+        Field bufferSizeInMBField;
+        try
+        {
+            bufferSizeInMBField = ReflectionUtils.getField(builder.getClass(), "bufferSizeInMB");
+        }
+        catch (NoSuchFieldException noSuchFieldException)
+        {
+            // The bufferSizeInMB field has been renamed to bufferSizeInMiB in trunk, so we expect this to
+            // fail at some point, and we have a way to recover from the failure without causing the test
+            // to fail.
+            bufferSizeInMBField = ReflectionUtils.getField(builder.getClass(), "bufferSizeInMiB");
+        }
+
+        bufferSizeInMBField.setAccessible(true);
+        return (long) bufferSizeInMBField.get(builder);
+    }
+}
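
(The bufferSizeInMB/bufferSizeInMiB lookup above generalizes into a small helper; a sketch with a hypothetical name, reusing the ReflectionUtils class added below, for peeking a field that may exist under either of two names.)

    // Hypothetical helper: resolve a field by its old name, falling back to the new one
    static Field fieldByEitherName(Class<?> clazz, String oldName, String newName) throws NoSuchFieldException
    {
        try
        {
            return ReflectionUtils.getField(clazz, oldName);
        }
        catch (NoSuchFieldException e)
        {
            return ReflectionUtils.getField(clazz, newName);
        }
    }
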
diff --git a/cassandra-four-zero/src/test/java/org/apache/cassandra/utils/ReflectionUtils.java b/cassandra-four-zero/src/test/java/org/apache/cassandra/utils/ReflectionUtils.java
new file mode 100644
index 0000000..b71de0f
--- /dev/null
+++ b/cassandra-four-zero/src/test/java/org/apache/cassandra/utils/ReflectionUtils.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+
+/**
+ * Reflection utilities used for testing
+ */
+public final class ReflectionUtils
+{
+    private ReflectionUtils()
+    {
+        throw new IllegalStateException(getClass() + " is a static utility class and shall not be instantiated");
+    }
+
+    public static Field getField(Class<?> clazz, String fieldName) throws NoSuchFieldException
+    {
+        // below code works before Java 12
+        try
+        {
+            return clazz.getDeclaredField(fieldName);
+        }
+        catch (NoSuchFieldException e)
+        {
+            // this is a mitigation for JDK 17 (https://bugs.openjdk.org/browse/JDK-8210522)
+            try
+            {
+                Method getDeclaredFields0 = Class.class.getDeclaredMethod("getDeclaredFields0", boolean.class);
+                getDeclaredFields0.setAccessible(true);
+                Field[] fields = (Field[]) getDeclaredFields0.invoke(clazz, false);
+                for (Field field : fields)
+                {
+                    if (fieldName.equals(field.getName()))
+                    {
+                        return field;
+                    }
+                }
+            }
+            catch (ReflectiveOperationException ex)
+            {
+                e.addSuppressed(ex);
+            }
+            throw e;
+        }
+    }
+}
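
(For context, a usage sketch of the fallback path above, not part of the patch: on JDK 12+, reflection field filtering per JDK-8210522 makes Field.class.getDeclaredField("modifiers") throw NoSuchFieldException, while the getDeclaredFields0 fallback can still locate the field, provided module access permits setAccessible, e.g. tests run with --add-opens java.base/java.lang=ALL-UNNAMED.)

    // Succeeds via the fallback on JDK 12+, where getDeclaredField("modifiers") throws
    Field modifiers = ReflectionUtils.getField(Field.class, "modifiers");
    modifiers.setAccessible(true);
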


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
