This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 69766bc CASSANDRA-18662: Fix cassandra-analytics-core-example
69766bc is described below
commit 69766bca399cc779e0f2f8e859e39f7e29a17b7a
Author: Francisco Guerrero <[email protected]>
AuthorDate: Tue Jun 27 10:03:56 2023 -0700
CASSANDRA-18662: Fix cassandra-analytics-core-example
This commit fixes the `SampleCassandraJob` available under the
`cassandra-analytics-core-example` subproject.

Fix checkstyle issues

Fix serialization issue in SidecarDataTransferApi

The `sidecarClient` field in `SidecarDataTransferApi` is declared as
transient, which causes NPEs in executors when they attempt to perform an
SSTable upload.

This commit avoids serializing the `dataTransferApi` field in the
`CassandraBulkWriterContext` altogether, initializing it lazily during the
`transfer()` method invocation instead. We guard the initialization to a
single thread by making the `transfer()` method synchronized. The
`SidecarDataTransferApi` can be recreated when needed from the already
serialized `clusterInfo`, `jobInfo`, and `conf` fields.
Fix setting ROW_BUFFER_MODE to BUFFERED
patch by Francisco Guerrero; reviewed by Dinesh Joshi, Yifan Cai for
CASSANDRA-18662
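For reference, a minimal sketch of the lazy-initialization pattern described above.
The nested types are simplified stand-ins (the real `SidecarDataTransferApi` is
constructed from the sidecar client, `jobInfo`, and `conf`); only the transient
field plus synchronized `transfer()` shape is illustrated.

```java
import java.io.Serializable;

// Sketch only: stand-in types, not the real bulk writer classes.
public class LazyTransferContextSketch implements Serializable
{
    interface DataTransferApi extends Serializable
    {
    }

    static final class SidecarDataTransferApi implements DataTransferApi
    {
        // In the real code this wraps the sidecar client, job info, and conf.
    }

    // transient: never serialized; rebuilt on the executor when first needed
    private transient DataTransferApi dataTransferApi;

    // synchronized: only one task thread performs the initialization
    public synchronized DataTransferApi transfer()
    {
        if (dataTransferApi == null)
        {
            dataTransferApi = new SidecarDataTransferApi();
        }
        return dataTransferApi;
    }
}
```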
---
CHANGES.txt | 1 +
build.gradle | 31 +++---
cassandra-analytics-core-example/README.md | 7 --
.../spark/example/SampleCassandraJob.java | 120 ++++++++++++---------
.../java/org/apache/cassandra/clients/Sidecar.java | 64 +++++------
.../cassandra/spark/bulkwriter/BulkSparkConf.java | 21 ++--
.../bulkwriter/CassandraBulkSourceRelation.java | 1 -
.../bulkwriter/CassandraBulkWriterContext.java | 20 ++--
.../cassandra/spark/bulkwriter/RowBufferMode.java | 4 +-
.../cassandra/spark/bulkwriter/SSTableWriter.java | 2 +-
.../cassandra/spark/data/CassandraDataLayer.java | 5 +-
.../spark/bulkwriter/BulkSparkConfTest.java | 66 ++++++++++--
.../spark/bulkwriter/MockBulkWriterContext.java | 2 +-
13 files changed, 214 insertions(+), 130 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 16c16ee..f36e3d4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.0.0
+ * Fix cassandra-analytics-core-example (CASSANDRA-18662)
* Added caching of Node Settings to improve efficiency (CASSANDRA-18633)
* Upgrade to JUnit 5 (CASSANDRA-18599)
* Add support for TTL & Timestamps for bulk writes (CASSANDRA-18605)
diff --git a/build.gradle b/build.gradle
index 8735bb1..5772b48 100644
--- a/build.gradle
+++ b/build.gradle
@@ -17,6 +17,9 @@
* under the License.
*/
+import java.nio.file.Files
+import java.nio.file.Paths
+
buildscript {
dependencies {
classpath(group: 'com.github.jengelman.gradle.plugins', name: 'shadow', version: '6.1.0')
@@ -80,12 +83,23 @@ tasks.register('buildIgnoreRatList', Exec) {
}
rat {
+
+ doFirst {
+ def excludeFilePath = Paths.get("${buildDir}/.rat-excludes.txt")
+ def excludeLines = Files.readAllLines(excludeFilePath)
+ excludeLines.each { line ->
+ if (line.endsWith("/")) {
+ excludes.add("**/" + line + "**")
+ } else {
+ excludes.add(line)
+ }
+ }
+ }
+
// List of Gradle exclude directives, defaults to ['**/.gradle/**']
excludes.add("**/build/**")
- excludes.add("**/.out/**")
excludes.add("CHANGES.txt")
excludes.add("**/org.apache.spark.sql.sources.DataSourceRegister")
- excludes.add("**/resources/cassandra-analytics-build.properties")
// Sidecar for build process
excludes.add("**/cassandra-sidecar/**")
@@ -97,23 +111,16 @@ rat {
excludes.add("gradlew")
excludes.add("gradlew.bat")
- // idea generated files
- excludes.add("**/.idea/**")
-
// resource files for test
excludes.add("**/test**/resources/**")
- // resources
- excludes.add("**/resources/sidecar.version")
-
- // Rat excludes file, one directive per line
- excludeFile.set(layout.projectDirectory.file("build/.rat-excludes.txt"))
-
// XML, TXT and HTML reports directory, defaults to 'build/reports/rat'
reportDir.set(file("build/reports/rat"))
}
-rat.dependsOn buildIgnoreRatList
+tasks.named('rat').configure {
+ dependsOn(buildIgnoreRatList)
+}
subprojects {
apply(plugin: 'java-library')
diff --git a/cassandra-analytics-core-example/README.md b/cassandra-analytics-core-example/README.md
index 3428995..89122c9 100644
--- a/cassandra-analytics-core-example/README.md
+++ b/cassandra-analytics-core-example/README.md
@@ -108,13 +108,6 @@ cassandra_instances:
I have a 3 node setup, so I configure Sidecar for those 3 nodes. CCM creates the Cassandra cluster under
`${HOME}/.ccm/test`, so I update my `data_dirs` and `staging_dir` configuration to use my local path.
-Next, create the `staging_dir` where Sidecar will stage SSTables coming from Cassandra Spark bulk writer.
-In my case, I have decided to keep the `sstable-staging` directory inside each of the node's directories.
-
-```shell
-mkdir -p ${HOME}/.ccm/test/node{1..3}/sstable-staging
-```
-
Finally, run Cassandra Sidecar, we skip running integration tests because we need docker for integration tests. You
can opt to run integration tests if you have docker running in your local environment.
diff --git a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java
index 58c9827..b71c57b 100644
--- a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java
+++ b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java
@@ -20,6 +20,7 @@
package org.apache.cassandra.spark.example;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -28,19 +29,20 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
-import org.apache.cassandra.spark.bulkwriter.TTLOption;
-import org.apache.cassandra.spark.bulkwriter.TimestampOption;
-import org.apache.cassandra.spark.bulkwriter.WriterOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.spark.KryoRegister;
import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
+import org.apache.cassandra.spark.bulkwriter.TTLOption;
+import org.apache.cassandra.spark.bulkwriter.TimestampOption;
+import org.apache.cassandra.spark.bulkwriter.WriterOptions;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
@@ -128,26 +130,46 @@ public final class SampleCassandraJob
{
JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sc);
int parallelism = sc.defaultParallelism();
- boolean addTTLColumn = true;
- boolean addTimestampColumn = true;
+ boolean addTTLColumn = false;
+ boolean addTimestampColumn = false;
JavaRDD<Row> rows = genDataset(javaSparkContext, rowCount, parallelism, addTTLColumn, addTimestampColumn);
Dataset<Row> df = sql.createDataFrame(rows, getWriteSchema(addTTLColumn, addTimestampColumn));
- df.write()
- .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
- .option("sidecar_instances", "localhost,localhost2,localhost3")
- .option("keyspace", "spark_test")
- .option("table", "test")
- .option("local_dc", "datacenter1")
- .option("bulk_writer_cl", "LOCAL_QUORUM")
- .option("number_splits", "-1")
- // A constant timestamp and TTL can be used by setting the following options.
- // .option(WriterOptions.TIMESTAMP.name(), TimestampOption.constant(System.currentTimeMillis() * 1000))
- // .option(WriterOptions.TTL.name(), TTLOption.constant(20))
- .option(WriterOptions.TTL.name(), TTLOption.perRow("ttl"))
- .option(WriterOptions.TIMESTAMP.name(), TimestampOption.perRow("timestamp"))
- .mode("append")
- .save();
+ DataFrameWriter<Row> dfWriter = df.write()
+ .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
+ .option("sidecar_instances", "localhost,localhost2,localhost3")
+ .option("keyspace", "spark_test")
+ .option("table", "test")
+ .option("local_dc", "datacenter1")
+ .option("bulk_writer_cl", "LOCAL_QUORUM")
+ .option("number_splits", "-1")
+ // A constant timestamp and TTL can be used by setting the following options.
+ // .option(WriterOptions.TTL.name(), TTLOption.constant(20))
+ // .option(WriterOptions.TIMESTAMP.name(), TimestampOption.constant(System.currentTimeMillis() * 1000))
+ .mode("append");
+
+ List<String> addedColumns = new ArrayList<>();
+ if (addTTLColumn)
+ {
+ addedColumns.add("ttl");
+ dfWriter = dfWriter
+ .option(WriterOptions.TTL.name(), TTLOption.perRow("ttl"));
+ }
+
+ if (addTimestampColumn)
+ {
+ addedColumns.add("timestamp");
+ dfWriter = dfWriter
+ .option(WriterOptions.TIMESTAMP.name(), TimestampOption.perRow("timestamp"));
+ }
+
+ dfWriter.save();
+
+ if (!addedColumns.isEmpty())
+ {
+ df = df.drop(addedColumns.toArray(new String[0]));
+ }
+
return df;
}
@@ -183,9 +205,9 @@ public final class SampleCassandraJob
private static StructType getWriteSchema(boolean addTTLColumn, boolean addTimestampColumn)
{
StructType schema = new StructType()
- .add("id", LongType, false)
- .add("course", BinaryType, false)
- .add("marks", LongType, false);
+ .add("id", LongType, false)
+ .add("course", BinaryType, false)
+ .add("marks", LongType, false);
if (addTTLColumn)
{
schema = schema.add("ttl", IntegerType, false);
@@ -215,34 +237,34 @@ public final class SampleCassandraJob
long recordsPerPartition = records / parallelism;
long remainder = records - (recordsPerPartition * parallelism);
List<Integer> seq = IntStream.range(0, parallelism).boxed().collect(Collectors.toList());
- int ttl = 10;
+ int ttl = 120; // data will not be queryable in two minutes
long timeStamp = System.currentTimeMillis() * 1000;
JavaRDD<Row> dataset = sc.parallelize(seq, parallelism).mapPartitionsWithIndex(
- (Function2<Integer, Iterator<Integer>, Iterator<Row>>) (index, integerIterator) -> {
- long firstRecordNumber = index * recordsPerPartition;
- long recordsToGenerate = index.equals(parallelism) ? remainder : recordsPerPartition;
- java.util.Iterator<Row> rows = LongStream.range(0, recordsToGenerate).mapToObj(offset -> {
- long recordNumber = firstRecordNumber + offset;
- String courseNameString = String.valueOf(recordNumber);
- Integer courseNameStringLen = courseNameString.length();
- Integer courseNameMultiplier = 1000 / courseNameStringLen;
- byte[] courseName = dupStringAsBytes(courseNameString, courseNameMultiplier);
- if (addTTLColumn && addTimestampColumn)
- {
- return RowFactory.create(recordNumber, courseName, recordNumber, ttl, timeStamp);
- }
- if (addTTLColumn)
- {
- return RowFactory.create(recordNumber, courseName, recordNumber, ttl);
- }
- if (addTimestampColumn)
- {
- return RowFactory.create(recordNumber, courseName, recordNumber, timeStamp);
- }
- return RowFactory.create(recordNumber, courseName, recordNumber);
- }).iterator();
- return rows;
- }, false);
+ (Function2<Integer, Iterator<Integer>, Iterator<Row>>) (index, integerIterator) -> {
+ long firstRecordNumber = index * recordsPerPartition;
+ long recordsToGenerate = index.equals(parallelism) ? remainder : recordsPerPartition;
+ java.util.Iterator<Row> rows = LongStream.range(0, recordsToGenerate).mapToObj(offset -> {
+ long recordNumber = firstRecordNumber + offset;
+ String courseNameString = String.valueOf(recordNumber);
+ Integer courseNameStringLen = courseNameString.length();
+ Integer courseNameMultiplier = 1000 / courseNameStringLen;
+ byte[] courseName = dupStringAsBytes(courseNameString, courseNameMultiplier);
+ if (addTTLColumn && addTimestampColumn)
+ {
+ return RowFactory.create(recordNumber, courseName, recordNumber, ttl, timeStamp);
+ }
+ if (addTTLColumn)
+ {
+ return RowFactory.create(recordNumber, courseName, recordNumber, ttl);
+ }
+ if (addTimestampColumn)
+ {
+ return RowFactory.create(recordNumber, courseName, recordNumber, timeStamp);
+ }
+ return RowFactory.create(recordNumber, courseName, recordNumber);
+ }).iterator();
+ return rows;
+ }, false);
return dataset;
}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
index 9721c32..0871ddd 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
@@ -81,32 +81,32 @@ public final class Sidecar
Vertx vertx = Vertx.vertx(new VertxOptions().setUseDaemonThread(true).setWorkerPoolSize(config.maxPoolSize()));
HttpClientConfig.Builder<?> builder = new HttpClientConfig.Builder<>()
- .ssl(false)
- .timeoutMillis(TimeUnit.SECONDS.toMillis(config.timeoutSeconds()))
- .idleTimeoutMillis((int) TimeUnit.SECONDS.toMillis(config.timeoutSeconds()))
- .receiveBufferSize((int) config.chunkBufferSize())
- .maxChunkSize((int) config.maxBufferSize())
- .userAgent(BuildInfo.READER_USER_AGENT);
+ .ssl(false)
+ .timeoutMillis(TimeUnit.SECONDS.toMillis(config.timeoutSeconds()))
+ .idleTimeoutMillis((int) TimeUnit.SECONDS.toMillis(config.timeoutSeconds()))
+ .receiveBufferSize((int) config.chunkBufferSize())
+ .maxChunkSize((int) config.maxBufferSize())
+ .userAgent(BuildInfo.READER_USER_AGENT);
if (secretsProvider != null)
{
builder = builder
- .ssl(true)
- .keyStoreInputStream(secretsProvider.keyStoreInputStream())
- .keyStorePassword(String.valueOf(secretsProvider.keyStorePassword()))
- .keyStoreType(secretsProvider.keyStoreType())
- .trustStoreInputStream(secretsProvider.trustStoreInputStream())
- .trustStorePassword(String.valueOf(secretsProvider.trustStorePassword()))
- .trustStoreType(secretsProvider.trustStoreType());
+ .ssl(true)
+ .keyStoreInputStream(secretsProvider.keyStoreInputStream())
+ .keyStorePassword(String.valueOf(secretsProvider.keyStorePassword()))
+ .keyStoreType(secretsProvider.keyStoreType())
+ .trustStoreInputStream(secretsProvider.trustStoreInputStream())
+ .trustStorePassword(String.valueOf(secretsProvider.trustStorePassword()))
+ .trustStoreType(secretsProvider.trustStoreType());
}
HttpClientConfig httpClientConfig = builder.build();
SidecarConfig sidecarConfig = new SidecarConfig.Builder()
- .maxRetries(config.maxRetries())
- .retryDelayMillis(config.millisToSleep())
- .maxRetryDelayMillis(config.maxMillisToSleep())
- .build();
+ .maxRetries(config.maxRetries())
+ .retryDelayMillis(config.millisToSleep())
+ .maxRetryDelayMillis(config.maxMillisToSleep())
+ .build();
return buildClient(sidecarConfig, vertx, httpClientConfig, sidecarInstancesProvider);
}
@@ -117,23 +117,23 @@ public final class Sidecar
.setWorkerPoolSize(conf.getMaxHttpConnections()));
HttpClientConfig httpClientConfig = new HttpClientConfig.Builder<>()
- .timeoutMillis(conf.getHttpResponseTimeoutMs())
- .idleTimeoutMillis(conf.getHttpConnectionTimeoutMs())
- .userAgent(BuildInfo.WRITER_USER_AGENT)
- .keyStoreInputStream(conf.getKeyStore())
- .keyStorePassword(conf.getKeyStorePassword())
- .keyStoreType(conf.getKeyStoreTypeOrDefault())
- .trustStoreInputStream(conf.getTrustStore())
- .trustStorePassword(conf.getTrustStorePasswordOrDefault())
- .trustStoreType(conf.getTrustStoreTypeOrDefault())
- .ssl(conf.hasKeystoreAndKeystorePassword())
- .build();
+ .timeoutMillis(conf.getHttpResponseTimeoutMs())
+ .idleTimeoutMillis(conf.getHttpConnectionTimeoutMs())
+ .userAgent(BuildInfo.WRITER_USER_AGENT)
+ .keyStoreInputStream(conf.getKeyStore())
+ .keyStorePassword(conf.getKeyStorePassword())
+ .keyStoreType(conf.getKeyStoreTypeOrDefault())
+ .trustStoreInputStream(conf.getTrustStore())
+ .trustStorePassword(conf.getTrustStorePasswordOrDefault())
+ .trustStoreType(conf.getTrustStoreTypeOrDefault())
+ .ssl(conf.hasKeystoreAndKeystorePassword())
+ .build();
SidecarConfig sidecarConfig = new SidecarConfig.Builder()
- .maxRetries(conf.getSidecarRequestRetries())
- .retryDelayMillis(TimeUnit.SECONDS.toMillis(conf.getSidecarRequestRetryDelayInSeconds()))
- .maxRetryDelayMillis(TimeUnit.SECONDS.toMillis(conf.getSidecarRequestMaxRetryDelayInSeconds()))
- .build();
+ .maxRetries(5)
+ .retryDelayMillis(200)
+ .maxRetryDelayMillis(500)
+ .build();
return buildClient(sidecarConfig, vertx, httpClientConfig, sidecarInstancesProvider);
}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index ee63df9..a719df7 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -80,7 +80,7 @@ public class BulkSparkConf implements Serializable
public static final long DEFAULT_SIDECAR_REQUEST_MAX_RETRY_DELAY_SECONDS = 60L;
public static final int DEFAULT_COMMIT_BATCH_SIZE = 10_000;
public static final int DEFAULT_RING_RETRY_COUNT = 3;
- public static final RowBufferMode DEFAULT_ROW_BUFFER_MODE = RowBufferMode.UNBUFFERRED;
+ public static final RowBufferMode DEFAULT_ROW_BUFFER_MODE = RowBufferMode.UNBUFFERED;
public static final int DEFAULT_BATCH_SIZE_IN_ROWS = 1_000_000;
// NOTE: All Cassandra Analytics setting names must start with "spark" in order to not be ignored by Spark,
@@ -141,7 +141,7 @@ public class BulkSparkConf implements Serializable
this.consistencyLevel = ConsistencyLevel.CL.valueOf(MapUtils.getOrDefault(options, WriterOptions.BULK_WRITER_CL.name(), "EACH_QUORUM"));
this.localDC = MapUtils.getOrDefault(options, WriterOptions.LOCAL_DC.name(), null);
this.numberSplits = MapUtils.getInt(options, WriterOptions.NUMBER_SPLITS.name(), DEFAULT_NUM_SPLITS, "number of splits");
- this.rowBufferMode = MapUtils.getEnumOption(options, WriterOptions.ROW_BUFFER_MODE.name(), DEFAULT_ROW_BUFFER_MODE, "row bufferring mode");
+ this.rowBufferMode = MapUtils.getEnumOption(options, WriterOptions.ROW_BUFFER_MODE.name(), DEFAULT_ROW_BUFFER_MODE, "row buffering mode");
this.sstableDataSizeInMB = MapUtils.getInt(options, WriterOptions.SSTABLE_DATA_SIZE_IN_MB.name(), 160, "sstable data size in MB");
this.sstableBatchSize = MapUtils.getInt(options, WriterOptions.BATCH_SIZE.name(), 1_000_000, "sstable batch size");
this.commitBatchSize = MapUtils.getInt(options, WriterOptions.COMMIT_BATCH_SIZE.name(), DEFAULT_COMMIT_BATCH_SIZE, "commit batch size");
@@ -185,15 +185,18 @@ public class BulkSparkConf implements Serializable
protected void validateTableWriterSettings()
{
boolean batchSizeIsZero = sstableBatchSize == 0;
- if (rowBufferMode == RowBufferMode.BUFFERRED
- && (!batchSizeIsZero && sstableBatchSize != DEFAULT_BATCH_SIZE_IN_ROWS))
+
+ if (rowBufferMode == RowBufferMode.UNBUFFERED)
+ {
+ Preconditions.checkArgument(!batchSizeIsZero,
+ "If writing in sorted order (ROW_BUFFER_MODE is UNBUFFERED) then BATCH_SIZE "
+ + "should be non zero, but it was set to 0 in writer options");
+ }
+ else if (!batchSizeIsZero && sstableBatchSize != DEFAULT_BATCH_SIZE_IN_ROWS)
{
- LOGGER.warn("BATCH_SIZE is set to a non-zero, non-default value ({}) but ROW_BUFFER_MODE is set to BUFFERRED."
- + " Ignoring BATCH_SIZE.", sstableBatchSize);
+ LOGGER.warn("BATCH_SIZE is set to a non-zero, non-default value ({}) but ROW_BUFFER_MODE is set to BUFFERED."
+ + " Ignoring BATCH_SIZE.", sstableBatchSize);
}
- Preconditions.checkArgument(rowBufferMode == RowBufferMode.UNBUFFERRED && !batchSizeIsZero,
- "If writing in sorted order (ROW_BUFFER_MODE is UNBUFFERRED) then BATCH_SIZE "
- + "should be non zero, but it was set to 0 in writer options");
}
/*
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
index 8f2c0b5..0a83785 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
@@ -121,7 +121,6 @@ public class CassandraBulkSourceRelation extends BaseRelation implements Inserta
sparkContext.cancelJobGroup(writerContext.job().getId().toString());
}
- @SuppressWarnings("RedundantCast")
private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]> sortedRDD, String[] columnNames)
{
writeValidator.setPhase("Environment Validation");
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
index 2709b51..abe4ef8 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
@@ -53,7 +53,7 @@ public class CassandraBulkWriterContext implements BulkWriterContext, KryoSerial
@NotNull
private final BulkSparkConf conf;
private final JobInfo jobInfo;
- private final DataTransferApi dataTransferApi;
+ private transient DataTransferApi dataTransferApi;
private final CassandraClusterInfo clusterInfo;
private final SchemaInfo schemaInfo;
@@ -70,14 +70,12 @@ public class CassandraBulkWriterContext implements BulkWriterContext, KryoSerial
clusterInfo = new CassandraClusterInfo(conf);
CassandraRing<RingInstance> ring = clusterInfo.getRing(true);
jobInfo = new CassandraJobInfo(conf,
- new TokenPartitioner(ring, conf.numberSplits, sparkContext.defaultParallelism(), conf.getCores()));
+ new TokenPartitioner(ring, conf.numberSplits, sparkContext.defaultParallelism(), conf.getCores()));
Preconditions.checkArgument(!conf.consistencyLevel.isLocal()
- || (conf.localDC != null && ring.getReplicationFactor().getOptions().containsKey(conf.localDC)),
+ || (conf.localDC != null && ring.getReplicationFactor().getOptions().containsKey(conf.localDC)),
String.format("Keyspace %s is not replicated on datacenter %s",
conf.keyspace,
conf.localDC));
- dataTransferApi = new SidecarDataTransferApi(clusterInfo.getCassandraContext().getSidecarClient(), jobInfo, conf);
-
String keyspace = conf.keyspace;
String table = conf.table;
@@ -133,14 +131,14 @@ public class CassandraBulkWriterContext implements BulkWriterContext, KryoSerial
/**
* Use the implementation of the KryoSerializable interface as a detection device to make sure the Spark Bulk
* Writer's KryoRegistrator is properly in place.
- *
+ * <p>
* If this class is serialized by Kryo, it means we're <b>not</b> set up correctly, and therefore we log and fail.
* This failure will occur early in the job and be very clear, so users can quickly fix their code and get up and
* running again, rather than having a random NullPointerException further down the line.
*/
public static final String KRYO_REGISTRATION_WARNING =
- "Spark Bulk Writer Kryo Registrator (SbwKryoRegistrator) was not registered with Spark - "
- + "please see the README.md file for more details on how to register the Spark Bulk Writer.";
+ "Spark Bulk Writer Kryo Registrator (SbwKryoRegistrator) was not registered with Spark - "
+ + "please see the README.md file for more details on how to register the Spark Bulk Writer.";
@Override
public void write(Kryo kryo, Output output)
@@ -187,8 +185,12 @@ public class CassandraBulkWriterContext implements BulkWriterContext, KryoSerial
@Override
@NotNull
- public DataTransferApi transfer()
+ public synchronized DataTransferApi transfer()
{
+ if (dataTransferApi == null)
+ {
+ dataTransferApi = new SidecarDataTransferApi(clusterInfo.getCassandraContext().getSidecarClient(), jobInfo, conf);
+ }
return dataTransferApi;
}
}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java
index d54ce3f..5d30ad8 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java
@@ -21,6 +21,6 @@ package org.apache.cassandra.spark.bulkwriter;
public enum RowBufferMode
{
- BUFFERRED,
- UNBUFFERRED
+ BUFFERED,
+ UNBUFFERED
}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
index df6f155..9d2ebc9 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
@@ -74,7 +74,7 @@ public class SSTableWriter
LOGGER.info("Running with version " + packageVersion);
TableSchema tableSchema = writerContext.schema().getTableSchema();
- boolean sorted = writerContext.job().getRowBufferMode() == RowBufferMode.UNBUFFERRED;
+ boolean sorted = writerContext.job().getRowBufferMode() == RowBufferMode.UNBUFFERED;
this.cqlSSTableWriter = SSTableWriterFactory.getSSTableWriter(
CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(packageVersion),
this.outDir.toString(),
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index fdc51bc..b4104bd 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -389,9 +389,12 @@ public class CassandraDataLayer extends PartitionedDataLayer implements Serializ
instanceMap = clusterConfig.stream().collect(Collectors.toMap(SidecarInstance::hostname, Function.identity()));
try
{
+ SslConfigSecretsProvider secretsProvider = sslConfig != null
+ ? new SslConfigSecretsProvider(sslConfig)
+ : null;
sidecar = Sidecar.from(new SimpleSidecarInstancesProvider(new ArrayList<>(clusterConfig)),
sidecarClientConfig,
- new SslConfigSecretsProvider(sslConfig));
+ secretsProvider);
}
catch (IOException ioException)
{
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
index cbe6030..0877d23 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
@@ -34,10 +34,11 @@ import org.apache.spark.SparkConf;
import org.jetbrains.annotations.NotNull;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.isEmptyString;
+import static org.hamcrest.Matchers.emptyString;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -122,8 +123,8 @@ public class BulkSparkConfTest
@Test
public void ensureSetupSparkConfAddsPerformsNecessaryTasks()
{
- assertThat(sparkConf.get("spark.kryo.registrator", ""),
isEmptyString());
- assertThat(sparkConf.get("spark.executor.extraJavaOptions", ""),
isEmptyString());
+ assertThat(sparkConf.get("spark.kryo.registrator", ""),
is(emptyString()));
+ assertThat(sparkConf.get("spark.executor.extraJavaOptions", ""),
is(emptyString()));
BulkSparkConf.setupSparkConf(sparkConf, true);
assertEquals("," + SbwKryoRegistrator.class.getName(),
sparkConf.get("spark.kryo.registrator", ""));
if (BuildInfo.isAtLeastJava11(BuildInfo.javaSpecificationVersion()))
@@ -150,8 +151,8 @@ public class BulkSparkConfTest
NullPointerException npe = assertThrows(NullPointerException.class,
() -> new
BulkSparkConf(sparkConf, options));
assertEquals("Keystore password was set. But both keystore path and
base64 encoded string are not set. "
- + "Please either set option " + WriterOptions.KEYSTORE_PATH
- + " or option " + WriterOptions.KEYSTORE_BASE64_ENCODED,
npe.getMessage());
+ + "Please either set option " +
WriterOptions.KEYSTORE_PATH
+ + " or option " + WriterOptions.KEYSTORE_BASE64_ENCODED,
npe.getMessage());
}
@Test
@@ -206,7 +207,60 @@ public class BulkSparkConfTest
NullPointerException npe = assertThrows(NullPointerException.class,
() -> new BulkSparkConf(sparkConf, options));
assertEquals("Trust Store Path was provided, but password is missing. "
- + "Please provide option " + WriterOptions.TRUSTSTORE_PASSWORD, npe.getMessage());
+ + "Please provide option " + WriterOptions.TRUSTSTORE_PASSWORD, npe.getMessage());
+ }
+
+ @Test
+ public void testUnbufferedRowBufferMode()
+ {
+ Map<String, String> options = copyDefaultOptions();
+ options.put(WriterOptions.ROW_BUFFER_MODE.name(), "UNBUFFERED");
+ BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
+ assertNotNull(bulkSparkConf);
+ assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.UNBUFFERED);
+ }
+
+ @Test
+ public void testBufferedRowBufferMode()
+ {
+ Map<String, String> options = copyDefaultOptions();
+ options.put(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED");
+ BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
+ assertNotNull(bulkSparkConf);
+ assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.BUFFERED);
+ }
+
+ @Test
+ public void testInvalidRowBufferMode()
+ {
+ Map<String, String> options = copyDefaultOptions();
+ options.put(WriterOptions.ROW_BUFFER_MODE.name(), "invalid");
+ IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+ () -> new BulkSparkConf(sparkConf, options));
+ assertEquals("Key row buffering mode with value invalid is not a valid Enum of type class org.apache.cassandra.spark.bulkwriter.RowBufferMode.",
+ exception.getMessage());
+ }
+
+ @Test
+ public void testBufferedRowBufferModeWithZeroBatchSize()
+ {
+ Map<String, String> options = copyDefaultOptions();
+ options.put(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED");
+ options.put(WriterOptions.BATCH_SIZE.name(), "0");
+ BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
+ assertNotNull(bulkSparkConf);
+ assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.BUFFERED);
+ }
+
+ @Test
+ public void testNonZeroBatchSizeIsIgnoredWithBufferedRowBufferMode()
+ {
+ Map<String, String> options = copyDefaultOptions();
+ options.put(WriterOptions.BATCH_SIZE.name(), "5");
+ options.put(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED");
+ BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
+ assertNotNull(bulkSparkConf);
+ assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.BUFFERED);
}
private Map<String, String> copyDefaultOptions()
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index 4cf29ca..7d1908b 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -59,7 +59,7 @@ import static org.apache.cassandra.spark.bulkwriter.TableSchemaTestCommon.mockCq
public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, JobInfo, SchemaInfo, DataTransferApi
{
private static final long serialVersionUID = -2912371629236770646L;
- private RowBufferMode rowBufferMode = RowBufferMode.UNBUFFERRED;
+ private RowBufferMode rowBufferMode = RowBufferMode.UNBUFFERED;
private ConsistencyLevel.CL consistencyLevel;
public interface CommitResultSupplier extends BiFunction<List<String>, String, RemoteCommitResult>