This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 82b3c0a CASSANDRA-18692 Fix bulk writes with Buffered RowBufferMode
82b3c0a is described below
commit 82b3c0a79c9322142738a4ec2ff7d4d4c0be2370
Author: Francisco Guerrero <[email protected]>
AuthorDate: Tue Jul 25 12:41:10 2023 -0700
CASSANDRA-18692 Fix bulk writes with Buffered RowBufferMode
When setting the `BUFFERED` `RowBufferMode` as part of the `WriterOption`s,
`org.apache.cassandra.spark.bulkwriter.RecordWriter` ignores that
configuration and instead uses the batch size to determine when to
finalize an SSTable and start writing a new one, if more rows are
available.

In this commit, we fix
`org.apache.cassandra.spark.bulkwriter.RecordWriter#checkBatchSize`
to take the configured `RowBufferMode` into account. Specifically, in
the `UNBUFFERED` mode we check the batch size of the SSTable during
writes, while in the `BUFFERED` mode the check has no effect.
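For reference, the fixed check condenses to the following; this is a
sketch of the change in the `RecordWriter.java` hunk of this diff, not
new behavior:

    void checkBatchSize(StreamSession streamSession, int partitionId, JobInfo job) throws IOException
    {
        // The batch-size check only applies in UNBUFFERED mode; in BUFFERED
        // mode the underlying CQLSSTableWriter flushes on its own once the
        // configured buffer size is reached.
        if (job.getRowBufferMode() == RowBufferMode.UNBUFFERED)
        {
            batchSize++;
            if (batchSize >= job.getSstableBatchSize())
            {
                finalizeSSTable(streamSession, partitionId, sstableWriter, batchNumber, batchSize);
                sstableWriter = null;
                batchSize = 0;
            }
        }
    }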
Co-authored-by: Doug Rohrer <[email protected]>
Patch by Francisco Guerrero, Doug Rohrer; Reviewed by Dinesh Joshi, Yifan Cai for CASSANDRA-18692
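Usage note (illustrative, not part of this patch): with this fix the mode
is selected through the writer options. The option keys below come from
`WriterOptions` as exercised in `BulkSparkConfTest` and documented in the
new `RowBufferMode` javadoc; the surrounding map setup is a sketch only:

    // assuming org.apache.cassandra.spark.bulkwriter.WriterOptions
    Map<String, String> options = new HashMap<>();
    // Buffer rows and flush once the uncompressed data reaches the
    // configured size (see RowBufferMode.BUFFERED below):
    options.put(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED");
    options.put(WriterOptions.SSTABLE_DATA_SIZE_IN_MB.name(), "128");
    // Or write pre-sorted rows and roll to a new SSTable every
    // BATCH_SIZE rows:
    // options.put(WriterOptions.ROW_BUFFER_MODE.name(), "UNBUFFERED");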
---
CHANGES.txt | 1 +
build.gradle | 8 +-
.../cassandra/spark/bulkwriter/BulkSparkConf.java | 1 +
.../bulkwriter/CassandraBulkWriterContext.java | 15 --
.../spark/bulkwriter/CassandraJobInfo.java | 2 +
.../apache/cassandra/spark/bulkwriter/JobInfo.java | 3 +
.../cassandra/spark/bulkwriter/RecordWriter.java | 34 +--
.../cassandra/spark/bulkwriter/SSTableWriter.java | 19 +-
.../spark/bulkwriter/SSTableWriterFactory.java | 5 +-
.../cassandra/spark/data/CassandraDataLayer.java | 243 +++++++++++----------
.../clients/SidecarNativeLibrariesTest.java} | 22 +-
.../spark/bulkwriter/BulkSparkConfTest.java | 3 +-
.../spark/bulkwriter/CassandraRingMonitorTest.java | 2 +-
.../spark/bulkwriter/MockBulkWriterContext.java | 21 +-
.../bulkwriter/NonValidatingTestSSTableWriter.java | 4 +-
.../spark/bulkwriter/RecordWriterTest.java | 60 ++++-
.../cassandra/spark/bulkwriter/RingUtils.java | 34 ++-
.../spark/bulkwriter/SSTableWriterTest.java | 6 +-
.../bulkwriter/StreamSessionConsistencyTest.java | 2 -
.../spark/bulkwriter/StreamSessionTest.java | 2 +-
.../spark/bulkwriter/TokenPartitionerTest.java | 12 +-
.../apache/cassandra/bridge/CassandraBridge.java | 2 +-
.../apache/cassandra/bridge}/RowBufferMode.java | 17 +-
cassandra-four-zero/build.gradle | 11 +-
.../bridge/CassandraBridgeImplementation.java | 4 +-
.../bridge/SSTableWriterImplementation.java | 47 +++-
.../bridge/SSTableWriterImplementationTest.java | 103 +++++++++
.../apache/cassandra/utils/ReflectionUtils.java | 65 ++++++
28 files changed, 506 insertions(+), 242 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index afbb30a..41e2b04 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.0.0
+ * Fix bulk writes with Buffered RowBufferMode (CASSANDRA-18692)
* Minor Refactoring to Improve Code Reusability (CASSANDRA-18684)
* Fix cassandra-analytics-core-example (CASSANDRA-18662)
* Added caching of Node Settings to improve efficiency (CASSANDRA-18633)
diff --git a/build.gradle b/build.gradle
index 5772b48..837c6fe 100644
--- a/build.gradle
+++ b/build.gradle
@@ -20,16 +20,10 @@
import java.nio.file.Files
import java.nio.file.Paths
-buildscript {
- dependencies {
- classpath(group: 'com.github.jengelman.gradle.plugins', name: 'shadow',
version: '6.1.0')
- }
-}
-
plugins {
id 'java'
id 'java-library'
- id 'com.github.johnrengelman.shadow' version '5.1.0'
+ id 'com.github.johnrengelman.shadow' version '7.1.2'
// Release Audit Tool (RAT) plugin for checking project licenses
id("org.nosphere.apache.rat") version "0.8.0"
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index a719df7..798f259 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -37,6 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.RowBufferMode;
import org.apache.cassandra.clients.SidecarInstanceImpl;
import org.apache.cassandra.sidecar.client.SidecarInstance;
import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
index abe4ef8..3789b63 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
@@ -30,7 +30,6 @@ import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
-import io.netty.channel.EventLoopGroup;
import org.apache.cassandra.bridge.CassandraBridge;
import org.apache.cassandra.bridge.CassandraBridgeFactory;
import org.apache.cassandra.spark.bulkwriter.token.CassandraRing;
@@ -57,11 +56,6 @@ public class CassandraBulkWriterContext implements
BulkWriterContext, KryoSerial
private final CassandraClusterInfo clusterInfo;
private final SchemaInfo schemaInfo;
- static
- {
- configureRelocatedNetty();
- }
-
private CassandraBulkWriterContext(@NotNull BulkSparkConf conf,
@NotNull StructType dfSchema,
SparkContext sparkContext)
@@ -119,15 +113,6 @@ public class CassandraBulkWriterContext implements
BulkWriterContext, KryoSerial
}
}
- // If using the shaded JAR, Configure our shaded Netty so it can find the
correct native libraries
- private static void configureRelocatedNetty()
- {
- if (EventLoopGroup.class.getName().startsWith("analytics"))
- {
- System.setProperty("analytics.io.netty.packagePrefix",
"analytics");
- }
- }
-
/**
* Use the implementation of the KryoSerializable interface as a detection
device to make sure the Spark Bulk
* Writer's KryoRegistrator is properly in place.
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
index 3151d6e..5840c8a 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.spark.bulkwriter;
import java.util.UUID;
+import org.apache.cassandra.bridge.RowBufferMode;
import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
import org.jetbrains.annotations.NotNull;
@@ -50,6 +51,7 @@ public class CassandraJobInfo implements JobInfo
return conf.localDC;
}
+ @NotNull
@Override
public RowBufferMode getRowBufferMode()
{
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
index 945012b..f7ac149 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
@@ -22,7 +22,9 @@ package org.apache.cassandra.spark.bulkwriter;
import java.io.Serializable;
import java.util.UUID;
+import org.apache.cassandra.bridge.RowBufferMode;
import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
+import org.jetbrains.annotations.NotNull;
public interface JobInfo extends Serializable
{
@@ -31,6 +33,7 @@ public interface JobInfo extends Serializable
String getLocalDC();
+ @NotNull
RowBufferMode getRowBufferMode();
int getSstableDataSizeInMB();
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
index 32b8632..426cb8a 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
@@ -41,6 +41,7 @@ import com.google.common.collect.Range;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.bridge.RowBufferMode;
import org.apache.cassandra.sidecar.common.data.TimeSkewResponse;
import org.apache.spark.InterruptibleIterator;
import org.apache.spark.TaskContext;
@@ -93,7 +94,7 @@ public class RecordWriter implements Serializable
TaskContext taskContext = taskContextSupplier.get();
LOGGER.info("[{}]: Processing Bulk Writer partition",
taskContext.partitionId());
scala.collection.Iterator<scala.Tuple2<DecoratedKey, Object[]>>
dataIterator =
- new InterruptibleIterator<>(taskContext,
asScalaIterator(sourceIterator));
+ new InterruptibleIterator<>(taskContext,
asScalaIterator(sourceIterator));
StreamSession streamSession = createStreamSession(taskContext);
validateAcceptableTimeSkewOrThrow(streamSession.replicas);
int partitionId = taskContext.partitionId();
@@ -114,9 +115,9 @@ public class RecordWriter implements Serializable
checkBatchSize(streamSession, partitionId, job);
}
- if (batchSize != 0)
+ if (sstableWriter != null)
{
- finalizeSSTable(streamSession, partitionId, sstableWriter,
batchNumber, batchSize);
+ finalizeSSTable(streamSession, partitionId, sstableWriter,
batchNumber, batchSize);
}
LOGGER.info("[{}] Done with all writers and waiting for stream to
complete", partitionId);
@@ -124,6 +125,11 @@ public class RecordWriter implements Serializable
}
catch (Exception exception)
{
+ LOGGER.error("[{}] Failed to write job={},
taskStageAttemptNumber={}, taskAttemptNumber={}",
+ partitionId,
+ job.getId().toString(),
+ taskContext.stageAttemptNumber(),
+ taskContext.attemptNumber());
throw new RuntimeException(exception);
}
}
@@ -137,8 +143,8 @@ public class RecordWriter implements Serializable
if (localNow.isBefore(remoteNow.minus(range)) ||
localNow.isAfter(remoteNow.plus(range)))
{
String message = String.format("Time skew between Spark and
Cassandra is too large. "
- + "Allowable skew is %d minutes. "
- + "Spark executor time is %s,
Cassandra instance time is %s",
+ + "Allowable skew is %d minutes. "
+ + "Spark executor time is %s,
Cassandra instance time is %s",
timeSkewResponse.allowableSkewInMinutes, localNow, remoteNow);
throw new UnsupportedOperationException(message);
}
@@ -167,20 +173,22 @@ public class RecordWriter implements Serializable
}
}
- public void checkBatchSize(StreamSession streamSession, int partitionId,
JobInfo job) throws IOException
+ void checkBatchSize(StreamSession streamSession, int partitionId, JobInfo
job) throws IOException
{
- batchSize++;
- if (batchSize > job.getSstableBatchSize())
+ if (job.getRowBufferMode() == RowBufferMode.UNBUFFERED)
{
- finalizeSSTable(streamSession, partitionId, sstableWriter,
batchNumber, batchSize);
-
- sstableWriter = null;
- batchSize = 0;
+ batchSize++;
+ if (batchSize >= job.getSstableBatchSize())
+ {
+ finalizeSSTable(streamSession, partitionId, sstableWriter,
batchNumber, batchSize);
+ sstableWriter = null;
+ batchSize = 0;
+ }
}
}
- public void maybeCreateTableWriter(int partitionId, Path baseDir) throws
IOException
+ void maybeCreateTableWriter(int partitionId, Path baseDir) throws
IOException
{
if (sstableWriter == null)
{
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
index 9d2ebc9..6b012db 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
@@ -74,15 +74,14 @@ public class SSTableWriter
LOGGER.info("Running with version " + packageVersion);
TableSchema tableSchema = writerContext.schema().getTableSchema();
- boolean sorted = writerContext.job().getRowBufferMode() ==
RowBufferMode.UNBUFFERED;
this.cqlSSTableWriter = SSTableWriterFactory.getSSTableWriter(
-
CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(packageVersion),
- this.outDir.toString(),
- writerContext.cluster().getPartitioner().toString(),
- tableSchema.createStatement,
- tableSchema.modificationStatement,
- sorted,
- writerContext.job().getSstableDataSizeInMB());
+
CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(packageVersion),
+ this.outDir.toString(),
+ writerContext.cluster().getPartitioner().toString(),
+ tableSchema.createStatement,
+ tableSchema.modificationStatement,
+ writerContext.job().getRowBufferMode(),
+ writerContext.job().getSstableDataSizeInMB());
}
@NotNull
@@ -151,9 +150,9 @@ public class SSTableWriter
{
Map<Path, MD5Hash> fileHashes = new HashMap<>();
try (DirectoryStream<Path> filesToHash =
- Files.newDirectoryStream(dataFile.getParent(),
SSTables.getSSTableBaseName(dataFile) + "*"))
+ Files.newDirectoryStream(dataFile.getParent(),
SSTables.getSSTableBaseName(dataFile) + "*"))
{
- for (Path path: filesToHash)
+ for (Path path : filesToHash)
{
fileHashes.put(path, calculateFileHash(path));
}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java
index 832e370..50c5d47 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java
@@ -22,6 +22,7 @@ package org.apache.cassandra.spark.bulkwriter;
import org.apache.cassandra.bridge.CassandraBridge;
import org.apache.cassandra.bridge.CassandraBridgeFactory;
import org.apache.cassandra.bridge.CassandraVersionFeatures;
+import org.apache.cassandra.bridge.RowBufferMode;
import org.apache.cassandra.bridge.SSTableWriter;
public final class SSTableWriterFactory
@@ -36,7 +37,7 @@ public final class SSTableWriterFactory
String partitioner,
String createStatement,
String insertStatement,
- boolean sorted,
+ RowBufferMode rowBufferMode,
int bufferSizeMB)
{
CassandraBridge cassandraBridge =
CassandraBridgeFactory.get(serverVersion);
@@ -44,7 +45,7 @@ public final class SSTableWriterFactory
partitioner,
createStatement,
insertStatement,
- sorted,
+ rowBufferMode,
bufferSizeMB);
}
}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index b4104bd..39c69c1 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -109,10 +109,10 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements Serializ
public static final Logger LOGGER =
LoggerFactory.getLogger(CassandraDataLayer.class);
private static final Cache<String, CompletableFuture<List<SSTable>>>
SNAPSHOT_CACHE =
- CacheBuilder.newBuilder()
- .expireAfterAccess(15, TimeUnit.MINUTES)
- .maximumSize(128)
- .build();
+ CacheBuilder.newBuilder()
+ .expireAfterAccess(15, TimeUnit.MINUTES)
+ .maximumSize(128)
+ .build();
protected String snapshotName;
protected String keyspace;
@@ -158,25 +158,25 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements Serializ
// For serialization
@VisibleForTesting
// CHECKSTYLE IGNORE: Constructor with many parameters
- CassandraDataLayer(@Nullable String keyspace,
- @Nullable String table,
- @NotNull String snapshotName,
- @Nullable String datacenter,
- @NotNull Sidecar.ClientConfig sidecarClientConfig,
- @Nullable SslConfig sslConfig,
- @NotNull CqlTable cqlTable,
- @NotNull TokenPartitioner tokenPartitioner,
- @NotNull CassandraVersion version,
- @NotNull ConsistencyLevel consistencyLevel,
- @NotNull Set<SidecarInstanceImpl> clusterConfig,
- @NotNull Map<String,
PartitionedDataLayer.AvailabilityHint> availabilityHints,
- @NotNull Map<String, BigNumberConfigImpl>
bigNumberConfigMap,
- boolean enableStats,
- boolean readIndexOffset,
- boolean useIncrementalRepair,
- @Nullable String lastModifiedTimestampField,
- List<SchemaFeature> requestedFeatures,
- @NotNull Map<String, ReplicationFactor> rfMap)
+ protected CassandraDataLayer(@Nullable String keyspace,
+ @Nullable String table,
+ @NotNull String snapshotName,
+ @Nullable String datacenter,
+ @NotNull Sidecar.ClientConfig
sidecarClientConfig,
+ @Nullable SslConfig sslConfig,
+ @NotNull CqlTable cqlTable,
+ @NotNull TokenPartitioner tokenPartitioner,
+ @NotNull CassandraVersion version,
+ @NotNull ConsistencyLevel consistencyLevel,
+ @NotNull Set<SidecarInstanceImpl>
clusterConfig,
+ @NotNull Map<String,
PartitionedDataLayer.AvailabilityHint> availabilityHints,
+ @NotNull Map<String, BigNumberConfigImpl>
bigNumberConfigMap,
+ boolean enableStats,
+ boolean readIndexOffset,
+ boolean useIncrementalRepair,
+ @Nullable String lastModifiedTimestampField,
+ List<SchemaFeature> requestedFeatures,
+ @NotNull Map<String, ReplicationFactor> rfMap)
{
super(consistencyLevel, datacenter);
this.snapshotName = snapshotName;
@@ -214,7 +214,7 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements Serializ
clusterConfig = initializeClusterConfig(options);
initInstanceMap();
- // Get cluster info from CassandraManager
+ // Get cluster info from Cassandra Sidecar
int effectiveNumberOfCores;
CompletableFuture<RingResponse> ringFuture = sidecar.ring(keyspace);
try
@@ -271,8 +271,8 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements Serializ
ReplicationFactor replicationFactor =
CqlUtils.extractReplicationFactor(fullSchema, keyspace);
rfMap = ImmutableMap.of(keyspace, replicationFactor);
CompletableFuture<Integer> sizingFuture =
CompletableFuture.supplyAsync(
- () -> getSizing(clusterConfig, replicationFactor,
options).getEffectiveNumberOfCores(),
- ExecutorHolder.EXECUTOR_SERVICE);
+ () -> getSizing(clusterConfig, replicationFactor,
options).getEffectiveNumberOfCores(),
+ ExecutorHolder.EXECUTOR_SERVICE);
validateReplicationFactor(replicationFactor);
udts.forEach(udt -> LOGGER.info("Adding schema UDT: '{}'", udt));
@@ -296,8 +296,8 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements Serializ
else
{
LOGGER.warn("Skipping clearing snapshot because it was not
created by this job. "
- + "Only the job that created the snapshot can clear
it. "
- + "snapshotName={} keyspace={} table={} dc={}",
+ + "Only the job that created the snapshot can
clear it. "
+ + "snapshotName={} keyspace={} table={} dc={}",
snapshotName, keyspace, table, datacenter);
}
}
@@ -317,57 +317,57 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements Serializ
Map<String, PartitionedDataLayer.AvailabilityHint> availabilityHints =
new ConcurrentHashMap<>(ring.size());
// Fire off create snapshot request across the entire cluster
- List<CompletableFuture<Void>> futures = ring
- .stream()
- .filter(ringEntry -> datacenter == null ||
datacenter.equals(ringEntry.datacenter()))
- .map(ringEntry -> {
- PartitionedDataLayer.AvailabilityHint hint =
-
PartitionedDataLayer.AvailabilityHint.fromState(ringEntry.status(),
ringEntry.state());
-
- CompletableFuture<PartitionedDataLayer.AvailabilityHint>
createSnapshotFuture;
- if (NODE_STATUS_NOT_CONSIDERED.contains(ringEntry.state()))
- {
- LOGGER.warn("Skip snapshot creating when node is
joining or down "
- + "snapshotName={} keyspace={} table={}
datacenter={} fqdn={} status={} state={}",
- snapshotName, keyspace, table, datacenter,
ringEntry.fqdn(), ringEntry.status(), ringEntry.state());
- createSnapshotFuture =
CompletableFuture.completedFuture(hint);
- }
- else
- {
- LOGGER.info("Creating snapshot on instance
snapshotName={} keyspace={} table={} datacenter={} fqdn={}",
- snapshotName, keyspace, table, datacenter,
ringEntry.fqdn());
- SidecarInstance sidecarInstance = new
SidecarInstanceImpl(ringEntry.fqdn(), sidecarClientConfig.port());
- createSnapshotFuture = sidecar
- .createSnapshot(sidecarInstance, keyspace,
table, snapshotName)
- .handle((resp, throwable) -> {
- if (throwable == null)
- {
- // Create snapshot succeeded
- return hint;
- }
-
- if (isExhausted(throwable))
- {
- LOGGER.warn("Failed to create snapshot
on instance", throwable);
- return
PartitionedDataLayer.AvailabilityHint.DOWN;
- }
-
- LOGGER.error("Unexpected error creating
snapshot on instance", throwable);
- return
PartitionedDataLayer.AvailabilityHint.UNKNOWN;
- });
- }
-
- return createSnapshotFuture
- .thenAccept(h ->
availabilityHints.put(ringEntry.fqdn(), h));
- })
- .collect(Collectors.toList());
+ List<CompletableFuture<Void>> futures =
+ ring.stream()
+ .filter(ringEntry -> datacenter == null ||
datacenter.equals(ringEntry.datacenter()))
+ .map(ringEntry -> {
+ PartitionedDataLayer.AvailabilityHint hint =
+
PartitionedDataLayer.AvailabilityHint.fromState(ringEntry.status(),
ringEntry.state());
+
+ CompletableFuture<PartitionedDataLayer.AvailabilityHint>
createSnapshotFuture;
+ if (NODE_STATUS_NOT_CONSIDERED.contains(ringEntry.state()))
+ {
+ LOGGER.warn("Skip snapshot creating when node is joining
or down "
+ + "snapshotName={} keyspace={} table={}
datacenter={} fqdn={} status={} state={}",
+ snapshotName, keyspace, table, datacenter,
ringEntry.fqdn(), ringEntry.status(), ringEntry.state());
+ createSnapshotFuture =
CompletableFuture.completedFuture(hint);
+ }
+ else
+ {
+ LOGGER.info("Creating snapshot on instance snapshotName={}
keyspace={} table={} datacenter={} fqdn={}",
+ snapshotName, keyspace, table, datacenter,
ringEntry.fqdn());
+ SidecarInstance sidecarInstance = new
SidecarInstanceImpl(ringEntry.fqdn(), sidecarClientConfig.port());
+ createSnapshotFuture = sidecar
+ .createSnapshot(sidecarInstance,
keyspace, table, snapshotName)
+ .handle((resp, throwable) -> {
+ if (throwable == null)
+ {
+ // Create snapshot succeeded
+ return hint;
+ }
+
+ if (isExhausted(throwable))
+ {
+ LOGGER.warn("Failed to
create snapshot on instance", throwable);
+ return
PartitionedDataLayer.AvailabilityHint.DOWN;
+ }
+
+ LOGGER.error("Unexpected error
creating snapshot on instance", throwable);
+ return
PartitionedDataLayer.AvailabilityHint.UNKNOWN;
+ });
+ }
+
+ return createSnapshotFuture
+ .thenAccept(h ->
availabilityHints.put(ringEntry.fqdn(), h));
+ })
+ .collect(Collectors.toList());
return CompletableFuture
.allOf(futures.toArray(new CompletableFuture[0]))
.handle((results, throwable) -> availabilityHints);
}
- private boolean isExhausted(@Nullable Throwable throwable)
+ protected boolean isExhausted(@Nullable Throwable throwable)
{
return throwable != null && (throwable instanceof
RetriesExhaustedException || isExhausted(throwable.getCause()));
}
@@ -511,14 +511,14 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements Serializ
}
String key = snapshotKey(sidecarInstance); // NOTE: We don't
currently support token filtering in list snapshot
LOGGER.info("Listing snapshot partition={} lowerBound={} upperBound={}
"
- + "instance={} port={} keyspace={} tableName={}
snapshotName={}",
+ + "instance={} port={} keyspace={} tableName={}
snapshotName={}",
partitionId, range.lowerEndpoint(), range.upperEndpoint(),
sidecarInstance.hostname(), sidecarInstance.port(),
keyspace, table, snapshotName);
try
{
return SNAPSHOT_CACHE.get(key, () -> {
LOGGER.info("Listing instance snapshot partition={}
lowerBound={} upperBound={} "
- + "instance={} port={} keyspace={} tableName={}
snapshotName={} cacheKey={}",
+ + "instance={} port={} keyspace={} tableName={}
snapshotName={} cacheKey={}",
partitionId, range.lowerEndpoint(),
range.upperEndpoint(),
sidecarInstance.hostname(),
sidecarInstance.port(), keyspace, table, snapshotName, key);
return sidecar.listSnapshotFiles(sidecarInstance, keyspace,
table, snapshotName)
@@ -574,16 +574,16 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements Serializ
// Map to SSTable
return result.values().stream()
- .map(components -> new
SidecarProvisionedSSTable(sidecar,
-
sidecarClientConfig,
-
sidecarInstance,
-
keyspace,
-
table,
-
snapshotName,
-
components,
-
partitionId,
-
stats()))
- .collect(Collectors.toList());
+ .map(components -> new SidecarProvisionedSSTable(sidecar,
+
sidecarClientConfig,
+
sidecarInstance,
+ keyspace,
+ table,
+
snapshotName,
+
components,
+
partitionId,
+ stats()))
+ .collect(Collectors.toList());
}
@Override
@@ -606,10 +606,10 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements Serializ
CassandraDataLayer that = (CassandraDataLayer) other;
return cqlTable.equals(that.cqlTable)
- && snapshotName.equals(that.snapshotName)
- && keyspace.equals(that.keyspace)
- && table.equals(that.table)
- && version().equals(that.version());
+ && snapshotName.equals(that.snapshotName)
+ && keyspace.equals(that.keyspace)
+ && table.equals(that.table)
+ && version().equals(that.version());
}
public Map<String, BigNumberConfigImpl> bigNumberConfigMap()
@@ -632,9 +632,10 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements Serializ
RingResponse ring)
{
Collection<CassandraInstance> instances = ring
- .stream()
- .map(status -> new CassandraInstance(status.token(),
status.fqdn(), status.datacenter()))
- .collect(Collectors.toList());
+ .stream()
+ .filter(status -> datacenter
== null || datacenter.equalsIgnoreCase(status.datacenter()))
+ .map(status -> new
CassandraInstance(status.token(), status.fqdn(), status.datacenter()))
+
.collect(Collectors.toList());
return new CassandraRing(partitioner, keyspace, replicationFactor,
instances);
}
@@ -800,35 +801,35 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements Serializ
{
LOGGER.info("Deserializing CassandraDataLayer with Kryo");
return new CassandraDataLayer(
- in.readString(),
- in.readString(),
- in.readString(),
- in.readString(),
- Sidecar.ClientConfig.create(in.readInt(),
- in.readInt(),
- in.readLong(),
- in.readLong(),
- in.readLong(),
- in.readLong(),
- in.readInt(),
- in.readInt(),
- (Map<FileType, Long>)
kryo.readObject(in, HashMap.class),
- (Map<FileType, Long>)
kryo.readObject(in, HashMap.class)),
- kryo.readObjectOrNull(in, SslConfig.class),
- kryo.readObject(in, CqlTable.class),
- kryo.readObject(in, TokenPartitioner.class),
- kryo.readObject(in, CassandraVersion.class),
- kryo.readObject(in, ConsistencyLevel.class),
- kryo.readObject(in, HashSet.class),
- (Map<String, PartitionedDataLayer.AvailabilityHint>)
kryo.readObject(in, HashMap.class),
- in.readBoolean() ? Collections.emptyMap()
- : (Map<String, BigNumberConfigImpl>)
kryo.readObject(in, HashMap.class),
- in.readBoolean(),
- in.readBoolean(),
- in.readBoolean(),
- in.readString(),
- kryo.readObject(in,
SchemaFeaturesListWrapper.class).toList(),
- kryo.readObject(in, HashMap.class));
+ in.readString(),
+ in.readString(),
+ in.readString(),
+ in.readString(),
+ Sidecar.ClientConfig.create(in.readInt(),
+ in.readInt(),
+ in.readLong(),
+ in.readLong(),
+ in.readLong(),
+ in.readLong(),
+ in.readInt(),
+ in.readInt(),
+ (Map<FileType, Long>)
kryo.readObject(in, HashMap.class),
+ (Map<FileType, Long>)
kryo.readObject(in, HashMap.class)),
+ kryo.readObjectOrNull(in, SslConfig.class),
+ kryo.readObject(in, CqlTable.class),
+ kryo.readObject(in, TokenPartitioner.class),
+ kryo.readObject(in, CassandraVersion.class),
+ kryo.readObject(in, ConsistencyLevel.class),
+ kryo.readObject(in, HashSet.class),
+ (Map<String, PartitionedDataLayer.AvailabilityHint>)
kryo.readObject(in, HashMap.class),
+ in.readBoolean() ? Collections.emptyMap()
+ : (Map<String, BigNumberConfigImpl>)
kryo.readObject(in, HashMap.class),
+ in.readBoolean(),
+ in.readBoolean(),
+ in.readBoolean(),
+ in.readString(),
+ kryo.readObject(in, SchemaFeaturesListWrapper.class).toList(),
+ kryo.readObject(in, HashMap.class));
}
// Wrapper only used internally for Kryo serialization/deserialization
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarNativeLibrariesTest.java
similarity index 58%
copy from
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java
copy to
cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarNativeLibrariesTest.java
index 5d30ad8..454fdf3 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/clients/SidecarNativeLibrariesTest.java
@@ -17,10 +17,24 @@
* under the License.
*/
-package org.apache.cassandra.spark.bulkwriter;
+package org.apache.cassandra.clients;
-public enum RowBufferMode
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import o.a.c.sidecar.client.shaded.io.vertx.core.net.OpenSSLEngineOptions;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests to ensure that Sidecar client native libraries are loaded
correctly
+ */
+public class SidecarNativeLibrariesTest
{
- BUFFERED,
- UNBUFFERED
+ @DisplayName("Ensures that the shading is correct for the
vertx-client-shaded project")
+ @Test
+ void openSslIsAvailable()
+ {
+ assertTrue(OpenSSLEngineOptions.isAvailable());
+ }
}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
index 0877d23..8a86a09 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Maps;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.apache.cassandra.bridge.RowBufferMode;
import org.apache.cassandra.spark.bulkwriter.util.SbwKryoRegistrator;
import org.apache.cassandra.spark.utils.BuildInfo;
import org.apache.spark.SparkConf;
@@ -237,7 +238,7 @@ public class BulkSparkConfTest
options.put(WriterOptions.ROW_BUFFER_MODE.name(), "invalid");
IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class,
() -> new
BulkSparkConf(sparkConf, options));
- assertEquals("Key row buffering mode with value invalid is not a valid
Enum of type class org.apache.cassandra.spark.bulkwriter.RowBufferMode.",
+ assertEquals("Key row buffering mode with value invalid is not a valid
Enum of type class org.apache.cassandra.bridge.RowBufferMode.",
exception.getMessage());
}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraRingMonitorTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraRingMonitorTest.java
index d86506f..d64b3a7 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraRingMonitorTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraRingMonitorTest.java
@@ -132,6 +132,6 @@ public class CassandraRingMonitorTest
private CassandraRing<RingInstance> buildRing(int initialToken)
{
- return RingUtils.buildRing(initialToken, "dev3", "dev3", "DEV",
"test", 3);
+ return RingUtils.buildRing(initialToken, "DEV", "test", 3);
}
}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index 7d1908b..b6d3ff5 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -33,9 +33,11 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.cassandra.bridge.RowBufferMode;
import org.apache.cassandra.sidecar.common.data.TimeSkewResponse;
import org.apache.cassandra.spark.bulkwriter.token.CassandraRing;
import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
@@ -50,6 +52,7 @@ import org.apache.cassandra.spark.data.CqlField;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
+import org.jetbrains.annotations.NotNull;
import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DATE;
import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.INT;
@@ -61,6 +64,7 @@ public class MockBulkWriterContext implements
BulkWriterContext, ClusterInfo, Jo
private static final long serialVersionUID = -2912371629236770646L;
private RowBufferMode rowBufferMode = RowBufferMode.UNBUFFERED;
private ConsistencyLevel.CL consistencyLevel;
+ private int sstableDataSizeInMB = 128;
public interface CommitResultSupplier extends BiFunction<List<String>,
String, RemoteCommitResult>
{
@@ -96,9 +100,9 @@ public class MockBulkWriterContext implements
BulkWriterContext, ClusterInfo, Jo
this.cassandraVersion = cassandraVersion;
this.consistencyLevel = consistencyLevel;
validPair = TableSchemaTestCommon.buildMatchedDataframeAndCqlColumns(
- new String[]{"id", "date", "course", "marks"},
- new
org.apache.spark.sql.types.DataType[]{DataTypes.IntegerType,
DataTypes.DateType, DataTypes.StringType, DataTypes.IntegerType},
- new CqlField.CqlType[]{mockCqlType(INT), mockCqlType(DATE),
mockCqlType(VARCHAR), mockCqlType(INT)});
+ new String[]{"id", "date", "course", "marks"},
+ new org.apache.spark.sql.types.DataType[]{DataTypes.IntegerType,
DataTypes.DateType, DataTypes.StringType, DataTypes.IntegerType},
+ new CqlField.CqlType[]{mockCqlType(INT), mockCqlType(DATE),
mockCqlType(VARCHAR), mockCqlType(INT)});
StructType validDataFrameSchema = validPair.getKey();
ImmutableMap<String, CqlField.CqlType> validCqlColumns =
validPair.getValue();
String[] partitionKeyColumns = {"id", "date"};
@@ -173,6 +177,7 @@ public class MockBulkWriterContext implements
BulkWriterContext, ClusterInfo, Jo
return "DC1";
}
+ @NotNull
@Override
public RowBufferMode getRowBufferMode()
{
@@ -187,13 +192,19 @@ public class MockBulkWriterContext implements
BulkWriterContext, ClusterInfo, Jo
@Override
public int getSstableDataSizeInMB()
{
- return 128;
+ return sstableDataSizeInMB;
+ }
+
+ @VisibleForTesting
+ void setSstableDataSizeInMB(int sstableDataSizeInMB)
+ {
+ this.sstableDataSizeInMB = sstableDataSizeInMB;
}
@Override
public int getSstableBatchSize()
{
- return 1;
+ return 2;
}
@Override
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java
index 1d1c167..1be5c2c 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.spark.bulkwriter;
import java.nio.file.Path;
+import org.jetbrains.annotations.NotNull;
+
class NonValidatingTestSSTableWriter extends SSTableWriter
{
NonValidatingTestSSTableWriter(MockTableWriter tableWriter, Path path)
@@ -29,7 +31,7 @@ class NonValidatingTestSSTableWriter extends SSTableWriter
}
@Override
- public void validateSSTables(BulkWriterContext writerContext, int
partitionId)
+ public void validateSSTables(@NotNull BulkWriterContext writerContext, int
partitionId)
{
// Skip validation for these tests
}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
index 4dd8841..940f5a2 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
@@ -31,9 +31,11 @@ import java.util.stream.Stream;
import com.google.common.collect.Range;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.apache.cassandra.bridge.RowBufferMode;
import org.apache.cassandra.spark.bulkwriter.token.CassandraRing;
import org.apache.cassandra.spark.common.model.CassandraInstance;
import scala.Tuple2;
@@ -68,7 +70,7 @@ public class RecordWriterTest
public void setUp()
{
tw = new MockTableWriter(folder);
- ring = RingUtils.buildRing(0, "app", "cluster", "DC1", "test", 12);
+ ring = RingUtils.buildRing(0, "DC1", "test", 12);
writerContext = new MockBulkWriterContext(ring);
tc = new TestTaskContext();
range =
writerContext.job().getTokenPartitioner().getTokenRange(tc.partitionId());
@@ -85,7 +87,7 @@ public class RecordWriterTest
@Test
public void testWriteWithConstantTTL()
{
- MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(ring);
+ MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(ring);
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true,
false, false);
validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
}
@@ -93,7 +95,7 @@ public class RecordWriterTest
@Test
public void testWriteWithTTLColumn()
{
- MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(ring);
+ MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(ring);
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true,
true, false);
String[] columnNamesWithTtl = {"id", "date", "course", "marks", "ttl"};
validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTtl);
@@ -102,7 +104,7 @@ public class RecordWriterTest
@Test
public void testWriteWithConstantTimestamp()
{
- MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(ring);
+ MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(ring);
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true,
false, false);
validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
}
@@ -110,7 +112,7 @@ public class RecordWriterTest
@Test
public void testWriteWithTimestampColumn()
{
- MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(ring);
+ MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(ring);
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true,
false, true);
String[] columnNamesWithTimestamp = {"id", "date", "course", "marks",
"timestamp"};
validateSuccessfulWrite(bulkWriterContext, data,
columnNamesWithTimestamp);
@@ -119,7 +121,7 @@ public class RecordWriterTest
@Test
public void testWriteWithTimestampAndTTLColumn()
{
- MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(ring);
+ MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(ring);
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true,
true, true);
String[] columnNames = {"id", "date", "course", "marks", "ttl",
"timestamp"};
validateSuccessfulWrite(bulkWriterContext, data, columnNames);
@@ -128,7 +130,7 @@ public class RecordWriterTest
@Test
public void testCorruptSSTable()
{
- rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc,
path) -> new SSTableWriter(tw.setOutDir(path), path));
+ rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc,
path) -> new SSTableWriter(tw.setOutDir(path), path));
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
// TODO: Add better error handling with human-readable exception
messages in SSTableReader::new
// That way we can assert on the exception thrown here
@@ -143,7 +145,7 @@ public class RecordWriterTest
RuntimeException ex = assertThrows(RuntimeException.class, () ->
rw.write(data));
assertThat(ex.getMessage(),
matchesPattern("java.lang.IllegalStateException: Received
Token "
- + "5765203080415074583 outside of expected range
\\[-9223372036854775808(‥|..)0]"));
+ + "5765203080415074583 outside of expected
range \\[-9223372036854775808(‥|..)0]"));
}
@Test
@@ -180,17 +182,46 @@ public class RecordWriterTest
rw.write(data);
}
+ @DisplayName("Write 20 rows, in unbuffered mode with BATCH_SIZE of 2")
+ @Test()
+ void writeUnbuffered()
+ {
+ int numberOfRows = 20;
+ Iterator<Tuple2<DecoratedKey, Object[]>> data =
generateData(numberOfRows, true);
+ validateSuccessfulWrite(writerContext, data, COLUMN_NAMES, (int)
Math.ceil(numberOfRows / writerContext.getSstableBatchSize()));
+ }
+
+ @DisplayName("Write 20 rows, in buffered mode with SSTABLE_DATA_SIZE_IN_MB
of 10")
+ @Test()
+ void writeBuffered()
+ {
+ int numberOfRows = 20;
+ Iterator<Tuple2<DecoratedKey, Object[]>> data =
generateData(numberOfRows, true);
+ writerContext.setRowBufferMode(RowBufferMode.BUFFERED);
+ writerContext.setSstableDataSizeInMB(10);
+ // only a single data sstable file is created
+ validateSuccessfulWrite(writerContext, data, COLUMN_NAMES, 1);
+ }
+
private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
Iterator<Tuple2<DecoratedKey,
Object[]>> data,
String[] columnNames)
+ {
+ validateSuccessfulWrite(writerContext, data, columnNames,
UPLOADED_TABLES);
+ }
+
+ private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
+ Iterator<Tuple2<DecoratedKey,
Object[]>> data,
+ String[] columnNames,
+ int uploadedTables)
{
RecordWriter rw = new RecordWriter(writerContext, columnNames, () ->
tc, SSTableWriter::new);
rw.write(data);
Map<CassandraInstance, List<UploadRequest>> uploads =
writerContext.getUploads();
assertThat(uploads.keySet().size(), is(REPLICA_COUNT)); // Should
upload to 3 replicas
- assertThat(uploads.values().stream().mapToInt(List::size).sum(),
is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES));
+ assertThat(uploads.values().stream().mapToInt(List::size).sum(),
is(REPLICA_COUNT * FILES_PER_SSTABLE * uploadedTables));
List<UploadRequest> requests =
uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
- for (UploadRequest ur: requests)
+ for (UploadRequest ur : requests)
{
assertNotNull(ur.fileHash);
}
@@ -200,6 +231,7 @@ public class RecordWriterTest
{
return generateData(numValues, onlyInRange, false, false);
}
+
private Iterator<Tuple2<DecoratedKey, Object[]>> generateData(int
numValues, boolean onlyInRange, boolean withTTL, boolean withTimestamp)
{
Stream<Tuple2<DecoratedKey, Object[]>> source = IntStream.iterate(0,
integer -> integer + 1).mapToObj(index -> {
@@ -226,6 +258,12 @@ public class RecordWriterTest
{
source = source.filter(val -> range.contains(val._1.getToken()));
}
- return source.limit(numValues).iterator();
+ Stream<Tuple2<DecoratedKey, Object[]>> limitedStream =
source.limit(numValues);
+ if (onlyInRange)
+ {
+ return limitedStream.sorted((o1, o2) -> o1._1.compareTo(o2._1))
+ .iterator();
+ }
+ return limitedStream.iterator();
}
}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingUtils.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingUtils.java
index 4916cb8..12207dd 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingUtils.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingUtils.java
@@ -40,30 +40,24 @@ public final class RingUtils
@NotNull
static CassandraRing<RingInstance> buildRing(int initialToken,
- String app,
- String cluster,
String dataCenter,
String keyspace)
{
- return buildRing(initialToken, app, cluster, dataCenter, keyspace, 3);
+ return buildRing(initialToken, dataCenter, keyspace, 3);
}
@NotNull
public static CassandraRing<RingInstance> buildRing(int initialToken,
- String app,
- String cluster,
String dataCenter,
String keyspace,
int instancesPerDC)
{
ImmutableMap<String, Integer> rfByDC = ImmutableMap.of(dataCenter, 3);
- return buildRing(initialToken, app, cluster, rfByDC, keyspace,
instancesPerDC);
+ return buildRing(initialToken, rfByDC, keyspace, instancesPerDC);
}
@NotNull
static CassandraRing<RingInstance> buildRing(int initialToken,
- String app,
- String cluster,
ImmutableMap<String, Integer>
rfByDC,
String keyspace,
int instancesPerDC)
@@ -77,7 +71,7 @@ public final class RingUtils
private static ReplicationFactor getReplicationFactor(Map<String, Integer>
rfByDC)
{
ImmutableMap.Builder<String, String> optionsBuilder =
ImmutableMap.<String, String>builder()
- .put("class",
"org.apache.cassandra.locator.NetworkTopologyStrategy");
+
.put("class", "org.apache.cassandra.locator.NetworkTopologyStrategy");
rfByDC.forEach((key, value) -> optionsBuilder.put(key,
value.toString()));
return new ReplicationFactor(optionsBuilder.build());
}
@@ -93,17 +87,17 @@ public final class RingUtils
for (int instance = 0; instance < instancesPerDc; instance++)
{
instances.add(new RingInstance(new RingEntry.Builder()
- .address("127.0." + dcOffset + "." + instance)
- .datacenter(datacenter)
- .load("0")
- .token(Integer.toString(initialToken + dcOffset +
100_000 * instance))
- .fqdn(datacenter + "-i" + instance)
- .rack("Rack")
- .hostId("")
- .status("UP")
- .state("NORMAL")
- .owns("")
- .build()));
+ .address("127.0." + dcOffset +
"." + instance)
+ .datacenter(datacenter)
+ .load("0")
+
.token(Integer.toString(initialToken + dcOffset + 100_000 * instance))
+ .fqdn(datacenter + "-i" +
instance)
+ .rack("Rack")
+ .hostId("")
+ .status("UP")
+ .state("NORMAL")
+ .owns("")
+ .build()));
}
dcOffset++;
}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
index 18eb482..c9f88b1 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
@@ -74,7 +74,7 @@ public class SSTableWriterTest
}
@NotNull
- public CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "app",
"cluster", "DC1", "test", 12); // CHECKSTYLE IGNORE: Public mutable field
+ public CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "DC1",
"test", 12); // CHECKSTYLE IGNORE: Public mutable field
@TempDir
public Path tmpDir; // CHECKSTYLE IGNORE: Public mutable field for testing
@@ -91,8 +91,8 @@ public class SSTableWriterTest
try (DirectoryStream<Path> dataFileStream =
Files.newDirectoryStream(tw.getOutDir(), "*Data.db"))
{
dataFileStream.forEach(dataFilePath ->
-
assertEquals(CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(version).getMajorVersion(),
-
SSTables.cassandraVersionFromTable(dataFilePath).getMajorVersion()));
+
assertEquals(CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(version).getMajorVersion(),
+
SSTables.cassandraVersionFromTable(dataFilePath).getMajorVersion()));
}
tw.validateSSTables(writerContext, 1);
}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
index 642181d..727598b 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
@@ -60,8 +60,6 @@ public class StreamSessionConsistencyTest
private static final Range<BigInteger> RANGE =
Range.range(BigInteger.valueOf(101L), BoundType.CLOSED,
BigInteger.valueOf(199L), BoundType.CLOSED);
private static final CassandraRing<RingInstance> RING =
RingUtils.buildRing(0,
-
"app",
-
"cluster",
ImmutableMap.of("DC1", 3, "DC2", 3),
"test",
6);
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
index 58805e8..92a6763 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
@@ -72,7 +72,7 @@ public class StreamSessionTest
public void setup()
{
range = Range.range(BigInteger.valueOf(101L), BoundType.CLOSED,
BigInteger.valueOf(199L), BoundType.CLOSED);
- ring = RingUtils.buildRing(0, "app", "cluster", "DC1", "test", 12);
+ ring = RingUtils.buildRing(0, "DC1", "test", 12);
writerContext = getBulkWriterContext();
tableWriter = new MockTableWriter(folder);
executor = new MockScheduledExecutorService();
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenPartitionerTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenPartitionerTest.java
index 6940e50..cea354f 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenPartitionerTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenPartitionerTest.java
@@ -44,7 +44,7 @@ public class TokenPartitionerTest
@Test
public void testOneSplit()
{
- CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "app",
"cluster", "DC1", "test");
+ CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "DC1",
"test");
partitioner = new TokenPartitioner(ring, 1, 2, 1, false);
assertEquals(4, partitioner.numPartitions());
assertEquals(0, getPartitionForToken(new
BigInteger("-9223372036854775808")));
@@ -58,7 +58,7 @@ public class TokenPartitionerTest
@Test
public void testTwoSplits()
{
- CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "app",
"cluster", "DC1", "test");
+ CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "DC1",
"test");
partitioner = new TokenPartitioner(ring, 2, 2, 1, false);
assertEquals(10, partitioner.numPartitions());
assertEquals(0, getPartitionForToken(new
BigInteger("-4611686018427387905")));
@@ -90,7 +90,7 @@ public class TokenPartitionerTest
@Test
public void testReplicationFactorInOneDCOnly()
{
- CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "app",
"cluster", ImmutableMap.of("DC1", 3, "DC2", 0), "test", 3);
+ CassandraRing<RingInstance> ring = RingUtils.buildRing(0,
ImmutableMap.of("DC1", 3, "DC2", 0), "test", 3);
partitioner = new TokenPartitioner(ring, 1, 2, 1, false);
assertEquals(4, partitioner.numPartitions());
assertEquals(0, getPartitionForToken(new
BigInteger("-9223372036854775808")));
@@ -104,7 +104,7 @@ public class TokenPartitionerTest
@Test
public void testSplitCalculationsUsingCores()
{
- CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "app",
"cluster", "DC1", "test");
+ CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "DC1",
"test");
// When passed "-1" for numberSplits, the token partitioner should
calculate it on its own based on
// the number of cores. This ring has 4 ranges when no splits are
used, therefore we expect the number
// of splits to be 25 for 100 cores and a default parallelism of 50
(as we take the max of the two).
@@ -117,7 +117,7 @@ public class TokenPartitionerTest
@Test
public void testSplitCalculationsUsingDefaultParallelism()
{
- CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "app",
"cluster", "DC1", "test");
+ CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "DC1",
"test");
// When passed "-1" for numberSplits, the token partitioner should
calculate it on its own based on
// the number of cores. This ring has 4 ranges when no splits are
used, therefore we expect the number
// of splits to be 50 for 100 cores and a default parallelism of 200
(as we take the max of the two).
@@ -136,7 +136,7 @@ public class TokenPartitionerTest
.put("DC3", 3)
.put("DC4", 3)
.build();
- CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "app",
"cluster", dcMap, "test", 20);
+ CassandraRing<RingInstance> ring = RingUtils.buildRing(0, dcMap,
"test", 20);
assertEquals(80, ring.getInstances().size());
partitioner = new TokenPartitioner(ring, -1, 1, 750, false);
assertEquals(10, partitioner.numSplits());
diff --git
a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
index 81e11b7..907c656 100644
---
a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
+++
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
@@ -385,7 +385,7 @@ public abstract class CassandraBridge
String partitioner,
String createStatement,
String insertStatement,
- boolean isSorted,
+ RowBufferMode rowBufferMode,
int bufferSizeMB);
// CDC Configuration
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/RowBufferMode.java
similarity index 51%
rename from
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java
rename to
cassandra-bridge/src/main/java/org/apache/cassandra/bridge/RowBufferMode.java
index 5d30ad8..70be08c 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java
+++
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/RowBufferMode.java
@@ -17,10 +17,25 @@
* under the License.
*/
-package org.apache.cassandra.spark.bulkwriter;
+package org.apache.cassandra.bridge;
+/**
+ * Configures how data is flushed to an SSTable
+ */
public enum RowBufferMode
{
+ /**
+ * In this mode, Cassandra will flush an SSTable to disk once it reaches
the configured BufferSizeInMB.
+ * This parameter is configured by the user-configurable
SSTABLE_DATA_SIZE_IN_MB WriterOption. Note:
+ * This is the uncompressed size of data before being written to disk, and
the actual size of an SSTable
+ * can be smaller based on the compression configuration for the SSTable
and how compressible the data is.
+ */
BUFFERED,
+
+ /**
+ * Cassandra expects rows in sorted order and will not flush an SSTable
automatically. The size of an
+ * SSTable is based on the number of rows we write to the SSTable. This
parameter is configured by the
+ * user-configurable BATCH_SIZE WriterOption.
+ */
UNBUFFERED
}
diff --git a/cassandra-four-zero/build.gradle b/cassandra-four-zero/build.gradle
index 0e85e06..01bbe7c 100644
--- a/cassandra-four-zero/build.gradle
+++ b/cassandra-four-zero/build.gradle
@@ -77,9 +77,14 @@ project(':cassandra-four-zero') {
exclude('com/google/common/collect/Range*')
}
- task javadocJar(type: Jar, dependsOn: javadoc) {
- classifier = 'javadoc'
- from javadoc.destinationDir
+ tasks.register('javadocJar', Jar) {
+ dependsOn javadoc
+ classifier = 'javadoc'
+ from javadoc.destinationDir
+ }
+
+ test {
+ useJUnitPlatform()
}
artifacts {
diff --git
a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
index 7bdd4c1..9b50486 100644
---
a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
+++
b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
@@ -623,10 +623,10 @@ public class CassandraBridgeImplementation extends
CassandraBridge
String partitioner,
String createStatement,
String insertStatement,
- boolean isSorted,
+ RowBufferMode rowBufferMode,
int bufferSizeMB)
{
- return new SSTableWriterImplementation(inDirectory, partitioner,
createStatement, insertStatement, isSorted, bufferSizeMB);
+ return new SSTableWriterImplementation(inDirectory, partitioner,
createStatement, insertStatement, rowBufferMode, bufferSizeMB);
}
// CDC Configuration
diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
index 05fb8bd..facfad6 100644
--- a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
+++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
@@ -22,6 +22,8 @@ package org.apache.cassandra.bridge;
import java.io.IOException;
import java.util.Map;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.config.Config;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -42,28 +44,25 @@ public class SSTableWriterImplementation implements SSTableWriter
String partitioner,
String createStatement,
String insertStatement,
- boolean isSorted,
+ RowBufferMode rowBufferMode,
int bufferSizeMB)
{
IPartitioner cassPartitioner = partitioner.toLowerCase().contains("random") ? new RandomPartitioner()
: new Murmur3Partitioner();
- CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder()
- .inDirectory(inDirectory)
- .forTable(createStatement)
- .withPartitioner(cassPartitioner)
- .using(insertStatement)
- .withBufferSizeInMB(bufferSizeMB);
- if (isSorted)
- {
- builder.sorted();
- }
+
+ CQLSSTableWriter.Builder builder = configureBuilder(inDirectory,
+ createStatement,
+ insertStatement,
+ rowBufferMode,
+ bufferSizeMB,
+ cassPartitioner);
// TODO: Remove me once CQLSSTableWriter.Builder synchronizes on schema (see CASSANDRA-TBD)
// build updates the schema, so we need to synchronize
writer = CassandraSchema.apply(s -> builder.build());
}
@Override
- public void addRow(Map<String, Object> values)  throws IOException
+ public void addRow(Map<String, Object> values) throws IOException
{
try
{
@@ -80,4 +79,28 @@ public class SSTableWriterImplementation implements SSTableWriter
{
writer.close();
}
+
+ @VisibleForTesting
+ static CQLSSTableWriter.Builder configureBuilder(String inDirectory,
+ String createStatement,
+ String insertStatement,
+ RowBufferMode rowBufferMode,
+ int bufferSizeMB,
+ IPartitioner cassPartitioner)
+ {
+ CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder()
+ .inDirectory(inDirectory)
+ .forTable(createStatement)
+ .withPartitioner(cassPartitioner)
+ .using(insertStatement);
+ if (rowBufferMode == RowBufferMode.UNBUFFERED)
+ {
+ builder.sorted();
+ }
+ else if (rowBufferMode == RowBufferMode.BUFFERED)
+ {
+ builder.withBufferSizeInMB(bufferSizeMB);
+ }
+ return builder;
+ }
}
diff --git a/cassandra-four-zero/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java b/cassandra-four-zero/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java
new file mode 100644
index 0000000..6ede1d0
--- /dev/null
+++ b/cassandra-four-zero/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge;
+
+import java.io.File;
+import java.lang.reflect.Field;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.utils.ReflectionUtils;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for configuring {@link SSTableWriterImplementation}
+ */
+class SSTableWriterImplementationTest
+{
+ public static final String CREATE_STATEMENT = "CREATE TABLE test_keyspace.test_table (a int, b text)";
+ public static final String INSERT_STATEMENT = "INSERT INTO test_keyspace.test_table (a, b) VALUES (?, ?)";
+
+ @TempDir
+ File writeDirectory;
+
+ @Test
+ void testUnbufferedRowBufferModeConfiguration() throws NoSuchFieldException, IllegalAccessException
+ {
+ CQLSSTableWriter.Builder builder = SSTableWriterImplementation.configureBuilder(writeDirectory.getAbsolutePath(),
+ CREATE_STATEMENT,
+ INSERT_STATEMENT,
+ RowBufferMode.UNBUFFERED,
+ 250,
+ new Murmur3Partitioner());
+
+
+ assertTrue(peekSorted(builder));
+ assertNotEquals(250, peekBufferSizeInMB(builder)); // 250 should not be set
+ }
+
+ @Test
+ void testBufferedRowBufferModeConfiguration() throws NoSuchFieldException, IllegalAccessException
+ {
+ CQLSSTableWriter.Builder builder = SSTableWriterImplementation.configureBuilder(writeDirectory.getAbsolutePath(),
+ CREATE_STATEMENT,
+ INSERT_STATEMENT,
+ RowBufferMode.BUFFERED,
+ 250,
+ new Murmur3Partitioner());
+
+
+ assertFalse(peekSorted(builder));
+ assertEquals(250, peekBufferSizeInMB(builder));
+ }
+
+ static boolean peekSorted(CQLSSTableWriter.Builder builder) throws NoSuchFieldException, IllegalAccessException
+ {
+ Field sortedField = ReflectionUtils.getField(builder.getClass(), "sorted");
+ sortedField.setAccessible(true);
+ return (boolean) sortedField.get(builder);
+ }
+
+ static long peekBufferSizeInMB(CQLSSTableWriter.Builder builder) throws NoSuchFieldException, IllegalAccessException
+ {
+ Field bufferSizeInMBField;
+ try
+ {
+ bufferSizeInMBField = ReflectionUtils.getField(builder.getClass(), "bufferSizeInMB");
+ }
+ catch (NoSuchFieldException noSuchFieldException)
+ {
+ // The bufferSizeInMB field has been renamed to bufferSizeInMiB in trunk, so we expect this to
+ // fail at some point, and we have a way to recover from the failure without causing the test
+ // to fail.
+ bufferSizeInMBField = ReflectionUtils.getField(builder.getClass(), "bufferSizeInMiB");
+ }
+
+ bufferSizeInMBField.setAccessible(true);
+ return (long) bufferSizeInMBField.get(builder);
+ }
+}
diff --git a/cassandra-four-zero/src/test/java/org/apache/cassandra/utils/ReflectionUtils.java b/cassandra-four-zero/src/test/java/org/apache/cassandra/utils/ReflectionUtils.java
new file mode 100644
index 0000000..b71de0f
--- /dev/null
+++ b/cassandra-four-zero/src/test/java/org/apache/cassandra/utils/ReflectionUtils.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+
+/**
+ * Reflection utilities used for testing
+ */
+public final class ReflectionUtils
+{
+ private ReflectionUtils()
+ {
+ throw new IllegalStateException(getClass() + " is a static utility class and shall not be instantiated");
+ }
+
+ public static Field getField(Class<?> clazz, String fieldName) throws NoSuchFieldException
+ {
+ // the code below works prior to Java 12
+ try
+ {
+ return clazz.getDeclaredField(fieldName);
+ }
+ catch (NoSuchFieldException e)
+ {
+ // this is a mitigation for JDK 17 (https://bugs.openjdk.org/browse/JDK-8210522)
+ try
+ {
+ Method getDeclaredFields0 = Class.class.getDeclaredMethod("getDeclaredFields0", boolean.class);
+ getDeclaredFields0.setAccessible(true);
+ Field[] fields = (Field[]) getDeclaredFields0.invoke(clazz, false);
+ for (Field field : fields)
+ {
+ if (fieldName.equals(field.getName()))
+ {
+ return field;
+ }
+ }
+ }
+ catch (ReflectiveOperationException ex)
+ {
+ e.addSuppressed(ex);
+ }
+ throw e;
+ }
+ }
+}
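
For reference, a small usage sketch of the new helper, mirroring the peek* methods in the test above (the "sorted" field name is the one exercised there; treat everything else as illustrative):

    // Sketch: look up a private field; getField falls back to getDeclaredFields0
    // when JDK 17's reflection filtering hides the field from getDeclaredField.
    Field sorted = ReflectionUtils.getField(CQLSSTableWriter.Builder.class, "sorted");
    sorted.setAccessible(true);
    boolean isSorted = (boolean) sorted.get(builder); // 'builder' as constructed in the tests above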