This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 69766bc  CASSANDRA-18662: Fix cassandra-analytics-core-example
69766bc is described below

commit 69766bca399cc779e0f2f8e859e39f7e29a17b7a
Author: Francisco Guerrero <[email protected]>
AuthorDate: Tue Jun 27 10:03:56 2023 -0700

    CASSANDRA-18662: Fix cassandra-analytics-core-example
    
    This commit fixes the `SampleCassandraJob` available under the 
`cassandra-analytics-core-example`
    subproject.
    
    Fix checkstyle issues
    
    Fix serialization issue in SidecarDataTransferApi
    
    The `sidecarClient` field in `SidecarDataTransferApi` is declared as 
transient,
    which causes NPEs on executors while trying to perform an 
SSTable
    upload.
    
    This commit completely avoids serializing the `dataTransferApi` field in the
    `CassandraBulkWriterContext`, and instead lazily initializes it during the 
`transfer()`
    method invocation. We guard the initialization to a single thread by making 
the
    `transfer()` method synchronized. The `SidecarDataTransferApi` can be 
recreated
    when needed using the already serialized `clusterInfo`, `jobInfo`, and 
`conf`
    fields.
    
    Fix setting ROW_BUFFER_MODE to BUFFERED
    
    patch by Francisco Guerrero; reviewed by Dinesh Joshi, Yifan Cai for 
CASSANDRA-18662
---
 CHANGES.txt                                        |   1 +
 build.gradle                                       |  31 +++---
 cassandra-analytics-core-example/README.md         |   7 --
 .../spark/example/SampleCassandraJob.java          | 120 ++++++++++++---------
 .../java/org/apache/cassandra/clients/Sidecar.java |  64 +++++------
 .../cassandra/spark/bulkwriter/BulkSparkConf.java  |  21 ++--
 .../bulkwriter/CassandraBulkSourceRelation.java    |   1 -
 .../bulkwriter/CassandraBulkWriterContext.java     |  20 ++--
 .../cassandra/spark/bulkwriter/RowBufferMode.java  |   4 +-
 .../cassandra/spark/bulkwriter/SSTableWriter.java  |   2 +-
 .../cassandra/spark/data/CassandraDataLayer.java   |   5 +-
 .../spark/bulkwriter/BulkSparkConfTest.java        |  66 ++++++++++--
 .../spark/bulkwriter/MockBulkWriterContext.java    |   2 +-
 13 files changed, 214 insertions(+), 130 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 16c16ee..f36e3d4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Fix cassandra-analytics-core-example (CASSANDRA-18662)
  * Added caching of Node Settings to improve efficiency (CASSANDRA-18633)
  * Upgrade to JUnit 5 (CASSANDRA-18599)
  * Add support for TTL & Timestamps for bulk writes (CASSANDRA-18605)
diff --git a/build.gradle b/build.gradle
index 8735bb1..5772b48 100644
--- a/build.gradle
+++ b/build.gradle
@@ -17,6 +17,9 @@
  * under the License.
  */
 
+import java.nio.file.Files
+import java.nio.file.Paths
+
 buildscript {
   dependencies {
     classpath(group: 'com.github.jengelman.gradle.plugins', name: 'shadow', 
version: '6.1.0')
@@ -80,12 +83,23 @@ tasks.register('buildIgnoreRatList', Exec) {
 }
 
 rat {
+
+  doFirst {
+    def excludeFilePath = Paths.get("${buildDir}/.rat-excludes.txt")
+    def excludeLines = Files.readAllLines(excludeFilePath)
+    excludeLines.each { line ->
+      if (line.endsWith("/")) {
+        excludes.add("**/" + line + "**")
+      } else {
+        excludes.add(line)
+      }
+    }
+  }
+
   // List of Gradle exclude directives, defaults to ['**/.gradle/**']
   excludes.add("**/build/**")
-  excludes.add("**/.out/**")
   excludes.add("CHANGES.txt")
   excludes.add("**/org.apache.spark.sql.sources.DataSourceRegister")
-  excludes.add("**/resources/cassandra-analytics-build.properties")
 
   // Sidecar for build process
   excludes.add("**/cassandra-sidecar/**")
@@ -97,23 +111,16 @@ rat {
   excludes.add("gradlew")
   excludes.add("gradlew.bat")
 
-  // idea generated files
-  excludes.add("**/.idea/**")
-
   // resource files for test
   excludes.add("**/test**/resources/**")
 
-  // resources
-  excludes.add("**/resources/sidecar.version")
-
-  // Rat excludes file, one directive per line
-  excludeFile.set(layout.projectDirectory.file("build/.rat-excludes.txt"))
-
   // XML, TXT and HTML reports directory, defaults to 'build/reports/rat'
   reportDir.set(file("build/reports/rat"))
 }
 
-rat.dependsOn buildIgnoreRatList
+tasks.named('rat').configure {
+  dependsOn(buildIgnoreRatList)
+}
 
 subprojects {
   apply(plugin: 'java-library')
diff --git a/cassandra-analytics-core-example/README.md 
b/cassandra-analytics-core-example/README.md
index 3428995..89122c9 100644
--- a/cassandra-analytics-core-example/README.md
+++ b/cassandra-analytics-core-example/README.md
@@ -108,13 +108,6 @@ cassandra_instances:
 I have a 3 node setup, so I configure Sidecar for those 3 nodes. CCM creates 
the Cassandra cluster under
 `${HOME}/.ccm/test`, so I update my `data_dirs` and `staging_dir` 
configuration to use my local path.
 
-Next, create the `staging_dir` where Sidecar will stage SSTables coming from 
Cassandra Spark bulk writer.
-In my case, I have decided to keep the `sstable-staging` directory inside each 
of the node's directories.
-
-```shell
-mkdir -p ${HOME}/.ccm/test/node{1..3}/sstable-staging
-```
-
 Finally, run Cassandra Sidecar, we skip running integration tests because we 
need docker for integration tests. You
 can opt to run integration tests if you have docker running in your local 
environment.
 
diff --git 
a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java
 
b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java
index 58c9827..b71c57b 100644
--- 
a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java
+++ 
b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java
@@ -20,6 +20,7 @@
 package org.apache.cassandra.spark.example;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -28,19 +29,20 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 
-import org.apache.cassandra.spark.bulkwriter.TTLOption;
-import org.apache.cassandra.spark.bulkwriter.TimestampOption;
-import org.apache.cassandra.spark.bulkwriter.WriterOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.spark.KryoRegister;
 import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
+import org.apache.cassandra.spark.bulkwriter.TTLOption;
+import org.apache.cassandra.spark.bulkwriter.TimestampOption;
+import org.apache.cassandra.spark.bulkwriter.WriterOptions;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.sql.DataFrameWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
@@ -128,26 +130,46 @@ public final class SampleCassandraJob
     {
         JavaSparkContext javaSparkContext = 
JavaSparkContext.fromSparkContext(sc);
         int parallelism = sc.defaultParallelism();
-        boolean addTTLColumn = true;
-        boolean addTimestampColumn = true;
+        boolean addTTLColumn = false;
+        boolean addTimestampColumn = false;
         JavaRDD<Row> rows = genDataset(javaSparkContext, rowCount, 
parallelism, addTTLColumn, addTimestampColumn);
         Dataset<Row> df = sql.createDataFrame(rows, 
getWriteSchema(addTTLColumn, addTimestampColumn));
 
-        df.write()
-          .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
-          .option("sidecar_instances", "localhost,localhost2,localhost3")
-          .option("keyspace", "spark_test")
-          .option("table", "test")
-          .option("local_dc", "datacenter1")
-          .option("bulk_writer_cl", "LOCAL_QUORUM")
-          .option("number_splits", "-1")
-          // A constant timestamp and TTL can be used by setting the following 
options.
-          // .option(WriterOptions.TIMESTAMP.name(), 
TimestampOption.constant(System.currentTimeMillis() * 1000))
-          // .option(WriterOptions.TTL.name(), TTLOption.constant(20))
-          .option(WriterOptions.TTL.name(), TTLOption.perRow("ttl"))
-          .option(WriterOptions.TIMESTAMP.name(), 
TimestampOption.perRow("timestamp"))
-          .mode("append")
-          .save();
+        DataFrameWriter<Row> dfWriter = df.write()
+                                          
.format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
+                                          .option("sidecar_instances", 
"localhost,localhost2,localhost3")
+                                          .option("keyspace", "spark_test")
+                                          .option("table", "test")
+                                          .option("local_dc", "datacenter1")
+                                          .option("bulk_writer_cl", 
"LOCAL_QUORUM")
+                                          .option("number_splits", "-1")
+                                          // A constant timestamp and TTL can 
be used by setting the following options.
+                                          // .option(WriterOptions.TTL.name(), 
TTLOption.constant(20))
+                                          // 
.option(WriterOptions.TIMESTAMP.name(), 
TimestampOption.constant(System.currentTimeMillis() * 1000))
+                                          .mode("append");
+
+        List<String> addedColumns = new ArrayList<>();
+        if (addTTLColumn)
+        {
+            addedColumns.add("ttl");
+            dfWriter = dfWriter
+                       .option(WriterOptions.TTL.name(), 
TTLOption.perRow("ttl"));
+        }
+
+        if (addTimestampColumn)
+        {
+            addedColumns.add("timestamp");
+            dfWriter = dfWriter
+                       .option(WriterOptions.TIMESTAMP.name(), 
TimestampOption.perRow("timestamp"));
+        }
+
+        dfWriter.save();
+
+        if (!addedColumns.isEmpty())
+        {
+            df = df.drop(addedColumns.toArray(new String[0]));
+        }
+
         return df;
     }
 
@@ -183,9 +205,9 @@ public final class SampleCassandraJob
     private static StructType getWriteSchema(boolean addTTLColumn, boolean 
addTimestampColumn)
     {
         StructType schema = new StructType()
-                .add("id", LongType, false)
-                .add("course", BinaryType, false)
-                .add("marks", LongType, false);
+                            .add("id", LongType, false)
+                            .add("course", BinaryType, false)
+                            .add("marks", LongType, false);
         if (addTTLColumn)
         {
             schema = schema.add("ttl", IntegerType, false);
@@ -215,34 +237,34 @@ public final class SampleCassandraJob
         long recordsPerPartition = records / parallelism;
         long remainder = records - (recordsPerPartition * parallelism);
         List<Integer> seq = IntStream.range(0, 
parallelism).boxed().collect(Collectors.toList());
-        int ttl = 10;
+        int ttl = 120; // data will not be queryable in two minutes
         long timeStamp = System.currentTimeMillis() * 1000;
         JavaRDD<Row> dataset = sc.parallelize(seq, 
parallelism).mapPartitionsWithIndex(
-                (Function2<Integer, Iterator<Integer>, Iterator<Row>>) (index, 
integerIterator) -> {
-                    long firstRecordNumber = index * recordsPerPartition;
-                    long recordsToGenerate = index.equals(parallelism) ? 
remainder : recordsPerPartition;
-                    java.util.Iterator<Row> rows = LongStream.range(0, 
recordsToGenerate).mapToObj(offset -> {
-                        long recordNumber = firstRecordNumber + offset;
-                        String courseNameString = String.valueOf(recordNumber);
-                        Integer courseNameStringLen = 
courseNameString.length();
-                        Integer courseNameMultiplier = 1000 / 
courseNameStringLen;
-                        byte[] courseName = dupStringAsBytes(courseNameString, 
courseNameMultiplier);
-                        if (addTTLColumn && addTimestampColumn)
-                        {
-                            return RowFactory.create(recordNumber, courseName, 
recordNumber, ttl, timeStamp);
-                        }
-                        if (addTTLColumn)
-                        {
-                            return RowFactory.create(recordNumber, courseName, 
recordNumber, ttl);
-                        }
-                        if (addTimestampColumn)
-                        {
-                            return RowFactory.create(recordNumber, courseName, 
recordNumber, timeStamp);
-                        }
-                        return RowFactory.create(recordNumber, courseName, 
recordNumber);
-                    }).iterator();
-                    return rows;
-                }, false);
+        (Function2<Integer, Iterator<Integer>, Iterator<Row>>) (index, 
integerIterator) -> {
+            long firstRecordNumber = index * recordsPerPartition;
+            long recordsToGenerate = index.equals(parallelism) ? remainder : 
recordsPerPartition;
+            java.util.Iterator<Row> rows = LongStream.range(0, 
recordsToGenerate).mapToObj(offset -> {
+                long recordNumber = firstRecordNumber + offset;
+                String courseNameString = String.valueOf(recordNumber);
+                Integer courseNameStringLen = courseNameString.length();
+                Integer courseNameMultiplier = 1000 / courseNameStringLen;
+                byte[] courseName = dupStringAsBytes(courseNameString, 
courseNameMultiplier);
+                if (addTTLColumn && addTimestampColumn)
+                {
+                    return RowFactory.create(recordNumber, courseName, 
recordNumber, ttl, timeStamp);
+                }
+                if (addTTLColumn)
+                {
+                    return RowFactory.create(recordNumber, courseName, 
recordNumber, ttl);
+                }
+                if (addTimestampColumn)
+                {
+                    return RowFactory.create(recordNumber, courseName, 
recordNumber, timeStamp);
+                }
+                return RowFactory.create(recordNumber, courseName, 
recordNumber);
+            }).iterator();
+            return rows;
+        }, false);
         return dataset;
     }
 
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
index 9721c32..0871ddd 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
@@ -81,32 +81,32 @@ public final class Sidecar
         Vertx vertx = Vertx.vertx(new 
VertxOptions().setUseDaemonThread(true).setWorkerPoolSize(config.maxPoolSize()));
 
         HttpClientConfig.Builder<?> builder = new HttpClientConfig.Builder<>()
-                .ssl(false)
-                
.timeoutMillis(TimeUnit.SECONDS.toMillis(config.timeoutSeconds()))
-                .idleTimeoutMillis((int) 
TimeUnit.SECONDS.toMillis(config.timeoutSeconds()))
-                .receiveBufferSize((int) config.chunkBufferSize())
-                .maxChunkSize((int) config.maxBufferSize())
-                .userAgent(BuildInfo.READER_USER_AGENT);
+                                              .ssl(false)
+                                              
.timeoutMillis(TimeUnit.SECONDS.toMillis(config.timeoutSeconds()))
+                                              .idleTimeoutMillis((int) 
TimeUnit.SECONDS.toMillis(config.timeoutSeconds()))
+                                              .receiveBufferSize((int) 
config.chunkBufferSize())
+                                              .maxChunkSize((int) 
config.maxBufferSize())
+                                              
.userAgent(BuildInfo.READER_USER_AGENT);
 
         if (secretsProvider != null)
         {
             builder = builder
-                    .ssl(true)
-                    .keyStoreInputStream(secretsProvider.keyStoreInputStream())
-                    
.keyStorePassword(String.valueOf(secretsProvider.keyStorePassword()))
-                    .keyStoreType(secretsProvider.keyStoreType())
-                    
.trustStoreInputStream(secretsProvider.trustStoreInputStream())
-                    
.trustStorePassword(String.valueOf(secretsProvider.trustStorePassword()))
-                    .trustStoreType(secretsProvider.trustStoreType());
+                      .ssl(true)
+                      
.keyStoreInputStream(secretsProvider.keyStoreInputStream())
+                      
.keyStorePassword(String.valueOf(secretsProvider.keyStorePassword()))
+                      .keyStoreType(secretsProvider.keyStoreType())
+                      
.trustStoreInputStream(secretsProvider.trustStoreInputStream())
+                      
.trustStorePassword(String.valueOf(secretsProvider.trustStorePassword()))
+                      .trustStoreType(secretsProvider.trustStoreType());
         }
 
         HttpClientConfig httpClientConfig = builder.build();
 
         SidecarConfig sidecarConfig = new SidecarConfig.Builder()
-                .maxRetries(config.maxRetries())
-                .retryDelayMillis(config.millisToSleep())
-                .maxRetryDelayMillis(config.maxMillisToSleep())
-                .build();
+                                      .maxRetries(config.maxRetries())
+                                      .retryDelayMillis(config.millisToSleep())
+                                      
.maxRetryDelayMillis(config.maxMillisToSleep())
+                                      .build();
 
         return buildClient(sidecarConfig, vertx, httpClientConfig, 
sidecarInstancesProvider);
     }
@@ -117,23 +117,23 @@ public final class Sidecar
                                                     
.setWorkerPoolSize(conf.getMaxHttpConnections()));
 
         HttpClientConfig httpClientConfig = new HttpClientConfig.Builder<>()
-                .timeoutMillis(conf.getHttpResponseTimeoutMs())
-                .idleTimeoutMillis(conf.getHttpConnectionTimeoutMs())
-                .userAgent(BuildInfo.WRITER_USER_AGENT)
-                .keyStoreInputStream(conf.getKeyStore())
-                .keyStorePassword(conf.getKeyStorePassword())
-                .keyStoreType(conf.getKeyStoreTypeOrDefault())
-                .trustStoreInputStream(conf.getTrustStore())
-                .trustStorePassword(conf.getTrustStorePasswordOrDefault())
-                .trustStoreType(conf.getTrustStoreTypeOrDefault())
-                .ssl(conf.hasKeystoreAndKeystorePassword())
-                .build();
+                                            
.timeoutMillis(conf.getHttpResponseTimeoutMs())
+                                            
.idleTimeoutMillis(conf.getHttpConnectionTimeoutMs())
+                                            
.userAgent(BuildInfo.WRITER_USER_AGENT)
+                                            
.keyStoreInputStream(conf.getKeyStore())
+                                            
.keyStorePassword(conf.getKeyStorePassword())
+                                            
.keyStoreType(conf.getKeyStoreTypeOrDefault())
+                                            
.trustStoreInputStream(conf.getTrustStore())
+                                            
.trustStorePassword(conf.getTrustStorePasswordOrDefault())
+                                            
.trustStoreType(conf.getTrustStoreTypeOrDefault())
+                                            
.ssl(conf.hasKeystoreAndKeystorePassword())
+                                            .build();
 
         SidecarConfig sidecarConfig = new SidecarConfig.Builder()
-                .maxRetries(conf.getSidecarRequestRetries())
-                
.retryDelayMillis(TimeUnit.SECONDS.toMillis(conf.getSidecarRequestRetryDelayInSeconds()))
-                
.maxRetryDelayMillis(TimeUnit.SECONDS.toMillis(conf.getSidecarRequestMaxRetryDelayInSeconds()))
-                .build();
+                                      .maxRetries(5)
+                                      .retryDelayMillis(200)
+                                      .maxRetryDelayMillis(500)
+                                      .build();
 
         return buildClient(sidecarConfig, vertx, httpClientConfig, 
sidecarInstancesProvider);
     }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index ee63df9..a719df7 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -80,7 +80,7 @@ public class BulkSparkConf implements Serializable
     public static final long DEFAULT_SIDECAR_REQUEST_MAX_RETRY_DELAY_SECONDS = 
60L;
     public static final int DEFAULT_COMMIT_BATCH_SIZE = 10_000;
     public static final int DEFAULT_RING_RETRY_COUNT = 3;
-    public static final RowBufferMode DEFAULT_ROW_BUFFER_MODE = 
RowBufferMode.UNBUFFERRED;
+    public static final RowBufferMode DEFAULT_ROW_BUFFER_MODE = 
RowBufferMode.UNBUFFERED;
     public static final int DEFAULT_BATCH_SIZE_IN_ROWS = 1_000_000;
 
     // NOTE: All Cassandra Analytics setting names must start with "spark" in 
order to not be ignored by Spark,
@@ -141,7 +141,7 @@ public class BulkSparkConf implements Serializable
         this.consistencyLevel = 
ConsistencyLevel.CL.valueOf(MapUtils.getOrDefault(options, 
WriterOptions.BULK_WRITER_CL.name(), "EACH_QUORUM"));
         this.localDC = MapUtils.getOrDefault(options, 
WriterOptions.LOCAL_DC.name(), null);
         this.numberSplits = MapUtils.getInt(options, 
WriterOptions.NUMBER_SPLITS.name(), DEFAULT_NUM_SPLITS, "number of splits");
-        this.rowBufferMode = MapUtils.getEnumOption(options, 
WriterOptions.ROW_BUFFER_MODE.name(), DEFAULT_ROW_BUFFER_MODE, "row bufferring 
mode");
+        this.rowBufferMode = MapUtils.getEnumOption(options, 
WriterOptions.ROW_BUFFER_MODE.name(), DEFAULT_ROW_BUFFER_MODE, "row buffering 
mode");
         this.sstableDataSizeInMB = MapUtils.getInt(options, 
WriterOptions.SSTABLE_DATA_SIZE_IN_MB.name(), 160, "sstable data size in MB");
         this.sstableBatchSize = MapUtils.getInt(options, 
WriterOptions.BATCH_SIZE.name(), 1_000_000, "sstable batch size");
         this.commitBatchSize = MapUtils.getInt(options, 
WriterOptions.COMMIT_BATCH_SIZE.name(), DEFAULT_COMMIT_BATCH_SIZE, "commit 
batch size");
@@ -185,15 +185,18 @@ public class BulkSparkConf implements Serializable
     protected void validateTableWriterSettings()
     {
         boolean batchSizeIsZero = sstableBatchSize == 0;
-        if (rowBufferMode == RowBufferMode.BUFFERRED
-                && (!batchSizeIsZero && sstableBatchSize != 
DEFAULT_BATCH_SIZE_IN_ROWS))
+
+        if (rowBufferMode == RowBufferMode.UNBUFFERED)
+        {
+            Preconditions.checkArgument(!batchSizeIsZero,
+                                        "If writing in sorted order 
(ROW_BUFFER_MODE is UNBUFFERED) then BATCH_SIZE "
+                                        + "should be non zero, but it was set 
to 0 in writer options");
+        }
+        else if (!batchSizeIsZero && sstableBatchSize != 
DEFAULT_BATCH_SIZE_IN_ROWS)
         {
-            LOGGER.warn("BATCH_SIZE is set to a non-zero, non-default value 
({}) but ROW_BUFFER_MODE is set to BUFFERRED."
-                      + " Ignoring BATCH_SIZE.", sstableBatchSize);
+            LOGGER.warn("BATCH_SIZE is set to a non-zero, non-default value 
({}) but ROW_BUFFER_MODE is set to BUFFERED."
+                        + " Ignoring BATCH_SIZE.", sstableBatchSize);
         }
-        Preconditions.checkArgument(rowBufferMode == RowBufferMode.UNBUFFERRED 
&& !batchSizeIsZero,
-                                    "If writing in sorted order 
(ROW_BUFFER_MODE is UNBUFFERRED) then BATCH_SIZE "
-                                  + "should be non zero, but it was set to 0 
in writer options");
     }
 
     /*
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
index 8f2c0b5..0a83785 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
@@ -121,7 +121,6 @@ public class CassandraBulkSourceRelation extends 
BaseRelation implements Inserta
         sparkContext.cancelJobGroup(writerContext.job().getId().toString());
     }
 
-    @SuppressWarnings("RedundantCast")
     private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]> 
sortedRDD, String[] columnNames)
     {
         writeValidator.setPhase("Environment Validation");
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
index 2709b51..abe4ef8 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
@@ -53,7 +53,7 @@ public class CassandraBulkWriterContext implements 
BulkWriterContext, KryoSerial
     @NotNull
     private final BulkSparkConf conf;
     private final JobInfo jobInfo;
-    private final DataTransferApi dataTransferApi;
+    private transient DataTransferApi dataTransferApi;
     private final CassandraClusterInfo clusterInfo;
     private final SchemaInfo schemaInfo;
 
@@ -70,14 +70,12 @@ public class CassandraBulkWriterContext implements 
BulkWriterContext, KryoSerial
         clusterInfo = new CassandraClusterInfo(conf);
         CassandraRing<RingInstance> ring = clusterInfo.getRing(true);
         jobInfo = new CassandraJobInfo(conf,
-                new TokenPartitioner(ring, conf.numberSplits, 
sparkContext.defaultParallelism(), conf.getCores()));
+                                       new TokenPartitioner(ring, 
conf.numberSplits, sparkContext.defaultParallelism(), conf.getCores()));
         Preconditions.checkArgument(!conf.consistencyLevel.isLocal()
-                                 || (conf.localDC != null && 
ring.getReplicationFactor().getOptions().containsKey(conf.localDC)),
+                                    || (conf.localDC != null && 
ring.getReplicationFactor().getOptions().containsKey(conf.localDC)),
                                     String.format("Keyspace %s is not 
replicated on datacenter %s",
                                                   conf.keyspace, 
conf.localDC));
 
-        dataTransferApi = new 
SidecarDataTransferApi(clusterInfo.getCassandraContext().getSidecarClient(), 
jobInfo, conf);
-
         String keyspace = conf.keyspace;
         String table = conf.table;
 
@@ -133,14 +131,14 @@ public class CassandraBulkWriterContext implements 
BulkWriterContext, KryoSerial
     /**
      * Use the implementation of the KryoSerializable interface as a detection 
device to make sure the Spark Bulk
      * Writer's KryoRegistrator is properly in place.
-     *
+     * <p>
      * If this class is serialized by Kryo, it means we're <b>not</b> set up 
correctly, and therefore we log and fail.
      * This failure will occur early in the job and be very clear, so users 
can quickly fix their code and get up and
      * running again, rather than having a random NullPointerException further 
down the line.
      */
     public static final String KRYO_REGISTRATION_WARNING =
-            "Spark Bulk Writer Kryo Registrator (SbwKryoRegistrator) was not 
registered with Spark - "
-          + "please see the README.md file for more details on how to register 
the Spark Bulk Writer.";
+    "Spark Bulk Writer Kryo Registrator (SbwKryoRegistrator) was not 
registered with Spark - "
+    + "please see the README.md file for more details on how to register the 
Spark Bulk Writer.";
 
     @Override
     public void write(Kryo kryo, Output output)
@@ -187,8 +185,12 @@ public class CassandraBulkWriterContext implements 
BulkWriterContext, KryoSerial
 
     @Override
     @NotNull
-    public DataTransferApi transfer()
+    public synchronized DataTransferApi transfer()
     {
+        if (dataTransferApi == null)
+        {
+            dataTransferApi = new 
SidecarDataTransferApi(clusterInfo.getCassandraContext().getSidecarClient(), 
jobInfo, conf);
+        }
         return dataTransferApi;
     }
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java
index d54ce3f..5d30ad8 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java
@@ -21,6 +21,6 @@ package org.apache.cassandra.spark.bulkwriter;
 
 public enum RowBufferMode
 {
-    BUFFERRED,
-    UNBUFFERRED
+    BUFFERED,
+    UNBUFFERED
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
index df6f155..9d2ebc9 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
@@ -74,7 +74,7 @@ public class SSTableWriter
         LOGGER.info("Running with version " + packageVersion);
 
         TableSchema tableSchema = writerContext.schema().getTableSchema();
-        boolean sorted = writerContext.job().getRowBufferMode() == 
RowBufferMode.UNBUFFERRED;
+        boolean sorted = writerContext.job().getRowBufferMode() == 
RowBufferMode.UNBUFFERED;
         this.cqlSSTableWriter = SSTableWriterFactory.getSSTableWriter(
                 
CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(packageVersion),
                 this.outDir.toString(),
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index fdc51bc..b4104bd 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -389,9 +389,12 @@ public class CassandraDataLayer extends PartitionedDataLayer implements Serializ
         instanceMap = clusterConfig.stream().collect(Collectors.toMap(SidecarInstance::hostname, Function.identity()));
         try
         {
+            SslConfigSecretsProvider secretsProvider = sslConfig != null
+                                                       ? new SslConfigSecretsProvider(sslConfig)
+                                                       : null;
             sidecar = Sidecar.from(new SimpleSidecarInstancesProvider(new ArrayList<>(clusterConfig)),
                                    sidecarClientConfig,
-                                   new SslConfigSecretsProvider(sslConfig));
+                                   secretsProvider);
         }
         catch (IOException ioException)
         {
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
index cbe6030..0877d23 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
@@ -34,10 +34,11 @@ import org.apache.spark.SparkConf;
 import org.jetbrains.annotations.NotNull;
 
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.isEmptyString;
+import static org.hamcrest.Matchers.emptyString;
 import static org.hamcrest.core.Is.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -122,8 +123,8 @@ public class BulkSparkConfTest
     @Test
     public void ensureSetupSparkConfAddsPerformsNecessaryTasks()
     {
-        assertThat(sparkConf.get("spark.kryo.registrator", ""), isEmptyString());
-        assertThat(sparkConf.get("spark.executor.extraJavaOptions", ""), isEmptyString());
+        assertThat(sparkConf.get("spark.kryo.registrator", ""), is(emptyString()));
+        assertThat(sparkConf.get("spark.executor.extraJavaOptions", ""), is(emptyString()));
         BulkSparkConf.setupSparkConf(sparkConf, true);
         assertEquals("," + SbwKryoRegistrator.class.getName(), sparkConf.get("spark.kryo.registrator", ""));
         if (BuildInfo.isAtLeastJava11(BuildInfo.javaSpecificationVersion()))
@@ -150,8 +151,8 @@ public class BulkSparkConfTest
         NullPointerException npe = assertThrows(NullPointerException.class,
                                                 () -> new BulkSparkConf(sparkConf, options));
         assertEquals("Keystore password was set. But both keystore path and base64 encoded string are not set. "
-                   + "Please either set option " + WriterOptions.KEYSTORE_PATH
-                   + " or option " + WriterOptions.KEYSTORE_BASE64_ENCODED, npe.getMessage());
+                     + "Please either set option " + WriterOptions.KEYSTORE_PATH
+                     + " or option " + WriterOptions.KEYSTORE_BASE64_ENCODED, npe.getMessage());
     }
 
     @Test
@@ -206,7 +207,60 @@ public class BulkSparkConfTest
         NullPointerException npe = assertThrows(NullPointerException.class,
                                                 () -> new BulkSparkConf(sparkConf, options));
         assertEquals("Trust Store Path was provided, but password is missing. "
-                   + "Please provide option " + WriterOptions.TRUSTSTORE_PASSWORD, npe.getMessage());
+                     + "Please provide option " + WriterOptions.TRUSTSTORE_PASSWORD, npe.getMessage());
+    }
+
+    @Test
+    public void testUnbufferedRowBufferMode()
+    {
+        Map<String, String> options = copyDefaultOptions();
+        options.put(WriterOptions.ROW_BUFFER_MODE.name(), "UNBUFFERED");
+        BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
+        assertNotNull(bulkSparkConf);
+        assertEquals(RowBufferMode.UNBUFFERED, bulkSparkConf.rowBufferMode);
+    }
+
+    @Test
+    public void testBufferedRowBufferMode()
+    {
+        Map<String, String> options = copyDefaultOptions();
+        options.put(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED");
+        BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
+        assertNotNull(bulkSparkConf);
+        assertEquals(RowBufferMode.BUFFERED, bulkSparkConf.rowBufferMode);
+    }
+
+    @Test
+    public void testInvalidRowBufferMode()
+    {
+        Map<String, String> options = copyDefaultOptions();
+        options.put(WriterOptions.ROW_BUFFER_MODE.name(), "invalid");
+        IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+                                                          () -> new BulkSparkConf(sparkConf, options));
+        assertEquals("Key row buffering mode with value invalid is not a valid Enum of type class org.apache.cassandra.spark.bulkwriter.RowBufferMode.",
+                     exception.getMessage());
+    }
+    }
+
+    @Test
+    public void testBufferedRowBufferModeWithZeroBatchSize()
+    {
+        Map<String, String> options = copyDefaultOptions();
+        options.put(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED");
+        options.put(WriterOptions.BATCH_SIZE.name(), "0");
+        BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
+        assertNotNull(bulkSparkConf);
+        assertEquals(RowBufferMode.BUFFERED, bulkSparkConf.rowBufferMode);
+    }
+
+    @Test
+    public void testNonZeroBatchSizeIsIgnoredWithBufferedRowBufferMode()
+    {
+        Map<String, String> options = copyDefaultOptions();
+        options.put(WriterOptions.BATCH_SIZE.name(), "5");
+        options.put(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED");
+        BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
+        assertNotNull(bulkSparkConf);
+        assertEquals(RowBufferMode.BUFFERED, bulkSparkConf.rowBufferMode);
     }
 
     private Map<String, String> copyDefaultOptions()
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index 4cf29ca..7d1908b 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -59,7 +59,7 @@ import static org.apache.cassandra.spark.bulkwriter.TableSchemaTestCommon.mockCq
 public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, JobInfo, SchemaInfo, DataTransferApi
 {
     private static final long serialVersionUID = -2912371629236770646L;
-    private RowBufferMode rowBufferMode = RowBufferMode.UNBUFFERRED;
+    private RowBufferMode rowBufferMode = RowBufferMode.UNBUFFERED;
     private ConsistencyLevel.CL consistencyLevel;
 
     public interface CommitResultSupplier extends BiFunction<List<String>, String, RemoteCommitResult>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to