This is an automated email from the ASF dual-hosted git repository. frankgh pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push: new dc0e79b CASSANDRA-19369 Use XXHash32 for digest calculation of SSTables dc0e79b is described below commit dc0e79b9c483562ec0920d69e886715eb329c426 Author: Francisco Guerrero <fran...@apache.org> AuthorDate: Wed Jan 31 13:44:23 2024 -0800 CASSANDRA-19369 Use XXHash32 for digest calculation of SSTables This commit adds the ability to use the XXHash32 digest algorithm, newly supported in Cassandra Sidecar. The commit allows for backwards compatibility to perform MD5 checksumming, but it now defaults to XXHash32. A new Writer option is added: ``` .option(WriterOptions.DIGEST.name(), "XXHASH32") // or .option(WriterOptions.DIGEST.name(), "MD5") ``` This option defaults to XXHash32, when not provided, but it can be configured to use the legacy MD5 algorithm. Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19369 --- CHANGES.txt | 1 + cassandra-analytics-core/build.gradle | 1 + .../cassandra/spark/bulkwriter/BulkSparkConf.java | 16 ++- .../spark/bulkwriter/CassandraJobInfo.java | 7 ++ .../spark/bulkwriter/DataTransferApi.java | 4 +- .../spark/bulkwriter/DigestAlgorithmSupplier.java} | 19 +--- .../spark/bulkwriter/DigestAlgorithms.java} | 39 +++++-- .../apache/cassandra/spark/bulkwriter/JobInfo.java | 6 + .../cassandra/spark/bulkwriter/RecordWriter.java | 36 ++++-- .../cassandra/spark/bulkwriter/SSTableWriter.java | 42 ++++--- .../spark/bulkwriter/SidecarDataTransferApi.java | 7 +- .../cassandra/spark/bulkwriter/StreamSession.java | 19 ++-- .../cassandra/spark/bulkwriter/WriterOptions.java | 5 + .../spark/common/{MD5Hash.java => Digest.java} | 31 ++---- .../spark/common/{MD5Hash.java => MD5Digest.java} | 44 ++++++-- .../cassandra/spark/common/XXHash32Digest.java | 79 +++++++++++++ .../cassandra/spark/utils/DigestAlgorithm.java} | 28 ++--- .../cassandra/spark/utils/MD5DigestAlgorithm.java | 55 ++++++++++ .../spark/utils/XXHash32DigestAlgorithm.java | 67 +++++++++++ 
.../spark/bulkwriter/MockBulkWriterContext.java | 14 ++- .../bulkwriter/NonValidatingTestSSTableWriter.java | 5 +- .../spark/bulkwriter/RecordWriterTest.java | 24 ++-- .../spark/bulkwriter/SSTableWriterTest.java | 8 +- .../bulkwriter/StreamSessionConsistencyTest.java | 10 +- .../spark/bulkwriter/StreamSessionTest.java | 18 +-- .../cassandra/spark/bulkwriter/UploadRequest.java | 14 +-- .../spark/utils/MD5DigestAlgorithmTest.java | 68 ++++++++++++ .../cassandra/spark/utils/ResourceUtils.java | 79 +++++++++++++ ...pTests.java => SSTableInputStreamHttpTest.java} | 10 +- .../spark/utils/XXHash32DigestAlgorithmTest.java | 71 ++++++++++++ .../src/test/resources/digest/file1.txt | 1 + .../src/test/resources/digest/file2.txt | 1 + .../src/test/resources/digest/file3.txt | 1 + .../distributed/impl/AbstractClusterUtils.java | 2 +- .../testing/SharedClusterIntegrationTestBase.java | 24 ++++ .../analytics/QuoteIdentifiersWriteTest.java | 91 ++------------- .../apache/cassandra/analytics/SparkTestUtils.java | 34 ++++++ .../analytics/WriterDigestIntegrationTest.java | 122 +++++++++++++++++++++ .../org/apache/cassandra/spark/utils/MapUtils.java | 2 +- scripts/build-sidecar.sh | 2 +- 40 files changed, 872 insertions(+), 235 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index e0ff584..9daa5f0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.0.0 + * Use XXHash32 for digest calculation of SSTables (CASSANDRA-19369) * Startup Validation Failures when Checking Sidecar Connectivity (CASSANDRA-19377) * No longer need to synchronize on Schema.instance after Cassandra 4.0.12 (CASSANDRA-19351) * Upgrade to Cassandra 4.0.12 and remove RowBufferMode and BatchSize options (CASSANDRA-19334) diff --git a/cassandra-analytics-core/build.gradle b/cassandra-analytics-core/build.gradle index 9228741..e015a27 100644 --- a/cassandra-analytics-core/build.gradle +++ b/cassandra-analytics-core/build.gradle @@ -58,6 +58,7 @@ project(':cassandra-analytics-core') { // This dependency must be 
built by running `scripts/build-dependencies.sh` implementation(group: "${sidecarClientGroup}", name: "${sidecarClientName}", version: "${sidecarVersion}") + implementation(group: 'org.lz4', name: 'lz4-java', version: '1.8.0') if ("${scalaMajorVersion}" == "2.11") { implementation(group: 'org.scala-lang.modules', name: "scala-java8-compat_2.11", version: '1.0.1', transitive: false) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java index 022c1db..f079558 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java @@ -130,6 +130,7 @@ public class BulkSparkConf implements Serializable protected boolean useOpenSsl; protected int ringRetryCount; protected final Set<String> blockedInstances; + protected final DigestAlgorithmSupplier digestAlgorithmSupplier; public BulkSparkConf(SparkConf conf, Map<String, String> options) { @@ -157,6 +158,7 @@ public class BulkSparkConf implements Serializable this.truststoreBase64Encoded = MapUtils.getOrDefault(options, WriterOptions.TRUSTSTORE_BASE64_ENCODED.name(), null); this.truststoreType = MapUtils.getOrDefault(options, WriterOptions.TRUSTSTORE_TYPE.name(), null); this.writeMode = MapUtils.getEnumOption(options, WriterOptions.WRITE_MODE.name(), WriteMode.INSERT, "write mode"); + this.digestAlgorithmSupplier = digestAlgorithmSupplierFromOptions(options); // For backwards-compatibility with port settings, use writer option if available, // else fall back to props, and then default if neither specified this.useOpenSsl = getBoolean(USE_OPENSSL, true); @@ -168,7 +170,19 @@ public class BulkSparkConf implements Serializable validateEnvironment(); } - private Set<String> buildBlockedInstances(Map<String, String> options) + /** + * 
Returns the supplier for the digest algorithm from the configured {@code options}. + * + * @param options a key-value map with options for the bulk write job + * @return the configured {@link DigestAlgorithmSupplier} + */ + @NotNull + protected DigestAlgorithmSupplier digestAlgorithmSupplierFromOptions(Map<String, String> options) + { + return MapUtils.getEnumOption(options, WriterOptions.DIGEST.name(), DigestAlgorithms.XXHASH32, "digest type"); + } + + protected Set<String> buildBlockedInstances(Map<String, String> options) { String blockedInstances = MapUtils.getOrDefault(options, WriterOptions.BLOCKED_CASSANDRA_INSTANCES.name(), ""); return Arrays.stream(blockedInstances.split(",")) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java index b385289..f91b495 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java @@ -109,4 +109,11 @@ public class CassandraJobInfo implements JobInfo { return conf.table; } + + @NotNull + @Override + public DigestAlgorithmSupplier digestAlgorithmSupplier() + { + return conf.digestAlgorithmSupplier; + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DataTransferApi.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DataTransferApi.java index c0786a2..f36ed73 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DataTransferApi.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DataTransferApi.java @@ -23,7 +23,7 @@ import java.io.Serializable; import java.nio.file.Path; import java.util.List; -import org.apache.cassandra.spark.common.MD5Hash; +import 
org.apache.cassandra.spark.common.Digest; import org.apache.cassandra.spark.common.client.ClientException; import org.apache.cassandra.spark.common.model.CassandraInstance; import org.jetbrains.annotations.Nullable; @@ -59,5 +59,5 @@ public interface DataTransferApi extends Serializable int ssTableIdx, CassandraInstance instance, String sessionID, - MD5Hash fileHash) throws ClientException; + Digest digest) throws ClientException; } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DigestAlgorithmSupplier.java similarity index 67% copy from cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java copy to cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DigestAlgorithmSupplier.java index 1be5c2c..89c4772 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DigestAlgorithmSupplier.java @@ -19,20 +19,13 @@ package org.apache.cassandra.spark.bulkwriter; -import java.nio.file.Path; +import java.util.function.Supplier; -import org.jetbrains.annotations.NotNull; +import org.apache.cassandra.spark.utils.DigestAlgorithm; -class NonValidatingTestSSTableWriter extends SSTableWriter +/** + * An interface that defines a {@link DigestAlgorithm} for a concrete digest type + */ +public interface DigestAlgorithmSupplier extends Supplier<DigestAlgorithm> { - NonValidatingTestSSTableWriter(MockTableWriter tableWriter, Path path) - { - super(tableWriter, path); - } - - @Override - public void validateSSTables(@NotNull BulkWriterContext writerContext, int partitionId) - { - // Skip validation for these tests - } } diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DigestAlgorithms.java similarity index 50% copy from cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java copy to cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DigestAlgorithms.java index 1be5c2c..3a70e89 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DigestAlgorithms.java @@ -19,20 +19,37 @@ package org.apache.cassandra.spark.bulkwriter; -import java.nio.file.Path; +import org.apache.cassandra.spark.utils.DigestAlgorithm; +import org.apache.cassandra.spark.utils.MD5DigestAlgorithm; +import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm; -import org.jetbrains.annotations.NotNull; - -class NonValidatingTestSSTableWriter extends SSTableWriter +/** + * Represents the user-provided digest type configuration to be used to validate SSTable files during bulk writes + */ +public enum DigestAlgorithms implements DigestAlgorithmSupplier { - NonValidatingTestSSTableWriter(MockTableWriter tableWriter, Path path) + /** + * Represents an MD5 digest type option. This option is supported for legacy reasons, but its use + * is strongly discouraged. 
+ */ + MD5 { - super(tableWriter, path); - } + @Override + public DigestAlgorithm get() + { + return new MD5DigestAlgorithm(); + } + }, - @Override - public void validateSSTables(@NotNull BulkWriterContext writerContext, int partitionId) + /** + * Represents an xxhash32 digest type option + */ + XXHASH32 { - // Skip validation for these tests - } + @Override + public DigestAlgorithm get() + { + return new XXHash32DigestAlgorithm(); + } + }; } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java index 91da29a..10635b1 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java @@ -60,4 +60,10 @@ public interface JobInfo extends Serializable } boolean getSkipClean(); + + /** + * @return the digest type provider for the bulk job, and used to calculate digests for SSTable components + */ + @NotNull + DigestAlgorithmSupplier digestAlgorithmSupplier(); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java index 908ecba..5ea1205 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java @@ -38,7 +38,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutionException; -import java.util.function.BiFunction; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -52,6 +51,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.sidecar.common.data.TimeSkewResponse; import 
org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; +import org.apache.cassandra.spark.utils.DigestAlgorithm; import org.apache.spark.InterruptibleIterator; import org.apache.spark.TaskContext; import scala.Tuple2; @@ -62,14 +62,16 @@ import static org.apache.cassandra.spark.utils.ScalaConversionUtils.asScalaItera public class RecordWriter implements Serializable { private static final Logger LOGGER = LoggerFactory.getLogger(RecordWriter.class); - + private static final long serialVersionUID = 3746578054834640428L; private final BulkWriterContext writerContext; private final String[] columnNames; - private final BiFunction<BulkWriterContext, Path, SSTableWriter> tableWriterSupplier; + private final SSTableWriterFactory tableWriterFactory; + private final DigestAlgorithm digestAlgorithm; + private final BulkWriteValidator writeValidator; private final ReplicaAwareFailureHandler<RingInstance> failureHandler; - private Supplier<TaskContext> taskContextSupplier; + private final Supplier<TaskContext> taskContextSupplier; private SSTableWriter sstableWriter = null; private int outputSequence = 0; // sub-folder for possible subrange splits @@ -82,14 +84,15 @@ public class RecordWriter implements Serializable RecordWriter(BulkWriterContext writerContext, String[] columnNames, Supplier<TaskContext> taskContextSupplier, - BiFunction<BulkWriterContext, Path, SSTableWriter> tableWriterSupplier) + SSTableWriterFactory tableWriterFactory) { this.writerContext = writerContext; this.columnNames = columnNames; this.taskContextSupplier = taskContextSupplier; - this.tableWriterSupplier = tableWriterSupplier; + this.tableWriterFactory = tableWriterFactory; this.failureHandler = new ReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner()); this.writeValidator = new BulkWriteValidator(writerContext, failureHandler); + this.digestAlgorithm = 
this.writerContext.job().digestAlgorithmSupplier().get(); writerContext.cluster().startupValidate(); } @@ -360,7 +363,7 @@ public class RecordWriter implements Serializable Path outDir = Paths.get(baseDir.toString(), Integer.toString(outputSequence++)); Files.createDirectories(outDir); - sstableWriter = tableWriterSupplier.apply(writerContext, outDir); + sstableWriter = tableWriterFactory.create(writerContext, outDir, digestAlgorithm); LOGGER.info("[{}] Created new SSTable writer", partitionId); } } @@ -400,4 +403,23 @@ public class RecordWriter implements Serializable LOGGER.info("[{}] Creating stream session for range={}", taskContext.partitionId(), tokenRange); return new StreamSession(writerContext, getStreamId(taskContext), tokenRange, failureHandler); } + + /** + * Functional interface that helps with creating {@link SSTableWriter} instances. + */ + public interface SSTableWriterFactory + { + /** + * Creates a new instance of the {@link SSTableWriter} with the provided {@code writerContext}, + * {@code outDir}, and {@code digestProvider} parameters. 
+ * + * @param writerContext the context for the bulk writer job + * @param outDir an output directory where SSTables components will be written to + * @param digestAlgorithm a digest provider to calculate digests for every SSTable component + * @return a new {@link SSTableWriter} + */ + SSTableWriter create(BulkWriterContext writerContext, + Path outDir, + DigestAlgorithm digestAlgorithm); + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java index 18021f2..acffca0 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java @@ -20,31 +20,29 @@ package org.apache.cassandra.spark.bulkwriter; import java.io.IOException; -import java.io.InputStream; import java.math.BigInteger; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; -import java.security.MessageDigest; import java.util.Collections; import java.util.HashMap; import java.util.Map; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Range; -import org.apache.commons.codec.digest.DigestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.bridge.CassandraBridgeFactory; import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.bridge.CassandraVersionFeatures; -import org.apache.cassandra.spark.common.MD5Hash; +import org.apache.cassandra.spark.common.Digest; import org.apache.cassandra.spark.common.SSTables; import org.apache.cassandra.spark.data.DataLayer; import org.apache.cassandra.spark.data.LocalDataLayer; import org.apache.cassandra.spark.reader.Rid; import org.apache.cassandra.spark.reader.StreamScanner; +import org.apache.cassandra.spark.utils.DigestAlgorithm; 
import org.jetbrains.annotations.NotNull; @SuppressWarnings("WeakerAccess") @@ -58,17 +56,21 @@ public class SSTableWriter private final org.apache.cassandra.bridge.SSTableWriter cqlSSTableWriter; private BigInteger minToken = null; private BigInteger maxToken = null; - private final Map<Path, MD5Hash> fileHashes = new HashMap<>(); + private final Map<Path, Digest> fileDigestMap = new HashMap<>(); + private final DigestAlgorithm digestAlgorithm; - public SSTableWriter(org.apache.cassandra.bridge.SSTableWriter tableWriter, Path outDir) + public SSTableWriter(org.apache.cassandra.bridge.SSTableWriter tableWriter, Path outDir, + DigestAlgorithm digestAlgorithm) { cqlSSTableWriter = tableWriter; this.outDir = outDir; + this.digestAlgorithm = digestAlgorithm; } - public SSTableWriter(BulkWriterContext writerContext, Path outDir) + public SSTableWriter(BulkWriterContext writerContext, Path outDir, DigestAlgorithm digestAlgorithm) { this.outDir = outDir; + this.digestAlgorithm = digestAlgorithm; String lowestCassandraVersion = writerContext.cluster().getLowestCassandraVersion(); String packageVersion = getPackageVersion(lowestCassandraVersion); @@ -108,7 +110,7 @@ public class SSTableWriter // NOTE: We calculate file hashes before re-reading so that we know what we hashed // is what we validated. Then we send these along with the files and the // receiving end re-hashes the files to make sure they still match. 
- fileHashes.putAll(calculateFileHashes(dataFile)); + fileDigestMap.putAll(calculateFileDigestMap(dataFile)); } validateSSTables(writerContext, partitionId); } @@ -146,29 +148,22 @@ public class SSTableWriter return Files.newDirectoryStream(getOutDir(), "*Data.db"); } - private Map<Path, MD5Hash> calculateFileHashes(Path dataFile) throws IOException + private Map<Path, Digest> calculateFileDigestMap(Path dataFile) throws IOException { - Map<Path, MD5Hash> fileHashes = new HashMap<>(); + Map<Path, Digest> fileHashes = new HashMap<>(); try (DirectoryStream<Path> filesToHash = Files.newDirectoryStream(dataFile.getParent(), SSTables.getSSTableBaseName(dataFile) + "*")) { for (Path path : filesToHash) { - fileHashes.put(path, calculateFileHash(path)); + Digest digest = digestAlgorithm.calculateFileDigest(path); + fileHashes.put(path, digest); + LOGGER.debug("Calculated digest={} for path={}", digest, path); } } return fileHashes; } - private MD5Hash calculateFileHash(Path path) throws IOException - { - try (InputStream is = Files.newInputStream(path)) - { - MessageDigest computedMd5 = DigestUtils.updateDigest(DigestUtils.getMd5Digest(), is); - return MD5Hash.fromDigest(computedMd5); - } - } - public Range<BigInteger> getTokenRange() { return Range.closed(minToken, maxToken); @@ -179,8 +174,11 @@ public class SSTableWriter return outDir; } - public Map<Path, MD5Hash> getFileHashes() + /** + * @return a view of the file digest map + */ + public Map<Path, Digest> fileDigestMap() { - return fileHashes; + return Collections.unmodifiableMap(fileDigestMap); } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SidecarDataTransferApi.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SidecarDataTransferApi.java index b6a492b..46f8f11 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SidecarDataTransferApi.java +++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SidecarDataTransferApi.java @@ -31,9 +31,8 @@ import org.apache.cassandra.bridge.CassandraBridge; import org.apache.cassandra.sidecar.client.SidecarClient; import org.apache.cassandra.sidecar.client.SidecarInstanceImpl; import org.apache.cassandra.sidecar.client.request.ImportSSTableRequest; -import org.apache.cassandra.sidecar.common.data.MD5Digest; import org.apache.cassandra.sidecar.common.data.SSTableImportResponse; -import org.apache.cassandra.spark.common.MD5Hash; +import org.apache.cassandra.spark.common.Digest; import org.apache.cassandra.spark.common.client.ClientException; import org.apache.cassandra.spark.common.model.CassandraInstance; @@ -69,7 +68,7 @@ public class SidecarDataTransferApi implements DataTransferApi int ssTableIdx, CassandraInstance instance, String sessionID, - MD5Hash fileHash) throws ClientException + Digest digest) throws ClientException { String componentName = updateComponentName(componentFile, ssTableIdx); String uploadId = getUploadId(sessionID, job.getId().toString()); @@ -80,7 +79,7 @@ public class SidecarDataTransferApi implements DataTransferApi maybeQuotedIdentifier(bridge, conf.quoteIdentifiers, conf.table), uploadId, componentName, - new MD5Digest(fileHash.toString()), + digest.toSidecarDigest(), componentFile.toAbsolutePath().toString()) .get(); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java index 044f1a8..4035c11 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java @@ -48,12 +48,14 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; import 
org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; -import org.apache.cassandra.spark.common.MD5Hash; +import org.apache.cassandra.spark.common.Digest; import org.apache.cassandra.spark.common.SSTables; public class StreamSession { private static final Logger LOGGER = LoggerFactory.getLogger(StreamSession.class); + private static final String WRITE_PHASE = "UploadAndCommit"; + private final BulkWriterContext writerContext; private final String sessionID; private final Range<BigInteger> tokenRange; @@ -64,7 +66,6 @@ public class StreamSession private final ExecutorService executor; private final List<Future<?>> futures = new ArrayList<>(); private final TokenRangeMapping<RingInstance> tokenRangeMapping; - private static final String WRITE_PHASE = "UploadAndCommit"; public StreamSession(final BulkWriterContext writerContext, final String sessionID, @@ -244,7 +245,7 @@ public class StreamSession { try { - sendSSTableToReplica(writerContext, dataFile, ssTableIdx, replica, ssTableWriter.getFileHashes()); + sendSSTableToReplica(writerContext, dataFile, ssTableIdx, replica, ssTableWriter.fileDigestMap()); return true; } catch (Exception exception) @@ -273,7 +274,7 @@ public class StreamSession Path dataFile, int ssTableIdx, RingInstance instance, - Map<Path, MD5Hash> fileHashes) throws Exception + Map<Path, Digest> fileDigestMap) throws Exception { try (DirectoryStream<Path> componentFileStream = Files.newDirectoryStream(dataFile.getParent(), SSTables.getSSTableBaseName(dataFile) + "*")) { @@ -283,9 +284,9 @@ public class StreamSession { continue; } - sendSSTableComponent(writerContext, componentFile, ssTableIdx, instance, fileHashes.get(componentFile)); + sendSSTableComponent(writerContext, componentFile, ssTableIdx, instance, fileDigestMap.get(componentFile)); } - sendSSTableComponent(writerContext, dataFile, ssTableIdx, instance, fileHashes.get(dataFile)); + sendSSTableComponent(writerContext, dataFile, ssTableIdx, instance, fileDigestMap.get(dataFile)); } 
} @@ -293,12 +294,12 @@ public class StreamSession Path componentFile, int ssTableIdx, RingInstance instance, - MD5Hash fileHash) throws Exception + Digest digest) throws Exception { - Preconditions.checkNotNull(fileHash, "All files must have a hash. SSTableWriter should have calculated these. This is a bug."); + Preconditions.checkNotNull(digest, "All files must have a hash. SSTableWriter should have calculated these. This is a bug."); long fileSize = Files.size(componentFile); LOGGER.info("[{}]: Uploading {} to {}: Size is {}", sessionID, componentFile, instance.nodeName(), fileSize); - writerContext.transfer().uploadSSTableComponent(componentFile, ssTableIdx, instance, sessionID, fileHash); + writerContext.transfer().uploadSSTableComponent(componentFile, ssTableIdx, instance, sessionID, digest); } public static void clean(BulkWriterContext writerContext, RingInstance instance, String sessionID) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java index 3dd68dd..fd04d97 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java @@ -56,4 +56,9 @@ public enum WriterOptions implements WriterOption * the write consistency level validations. */ BLOCKED_CASSANDRA_INSTANCES, + /** + * Option that specifies the type of digest to compute when uploading SSTables for checksum validation. + * If unspecified, it defaults to {@code XXHash32} digests. The legacy {@code MD5} digest is also supported. 
+ */ + DIGEST, } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/MD5Hash.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/Digest.java similarity index 66% copy from cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/MD5Hash.java copy to cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/Digest.java index df8c29e..07d7c22 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/MD5Hash.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/Digest.java @@ -19,25 +19,18 @@ package org.apache.cassandra.spark.common; -import java.security.MessageDigest; -import java.util.Base64; - -public final class MD5Hash +/** + * Interface that represents the computed digest + */ +public interface Digest { - private final String value; - - private MD5Hash(MessageDigest digest) - { - value = Base64.getEncoder().encodeToString(digest.digest()); - } - - public static MD5Hash fromDigest(MessageDigest messageDigest) - { - return new MD5Hash(messageDigest); - } + /** + * @return the string representation of the digest + */ + String value(); - public String toString() - { - return value; - } + /** + * @return the digest translated to Sidecar digest + */ + org.apache.cassandra.sidecar.common.data.Digest toSidecarDigest(); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/MD5Hash.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/MD5Digest.java similarity index 52% rename from cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/MD5Hash.java rename to cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/MD5Digest.java index df8c29e..3c62244 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/MD5Hash.java +++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/MD5Digest.java @@ -19,25 +19,51 @@ package org.apache.cassandra.spark.common; -import java.security.MessageDigest; -import java.util.Base64; -public final class MD5Hash +import java.util.Objects; + +import org.jetbrains.annotations.NotNull; + +/** + * An implementation of {@link Digest} that represents an MD5 digest + */ +public class MD5Digest implements Digest { - private final String value; + private final @NotNull String value; + + /** + * Constructs a new MD5Digest with the provided MD5 {@code value} + * + * @param value the MD5 value + */ + public MD5Digest(@NotNull String value) + { + this.value = Objects.requireNonNull(value, "value is required"); + } - private MD5Hash(MessageDigest digest) + /** + * {@inheritDoc} + */ + @Override + public String value() { - value = Base64.getEncoder().encodeToString(digest.digest()); + return value; } - public static MD5Hash fromDigest(MessageDigest messageDigest) + /** + * {@inheritDoc} + */ + @Override + public org.apache.cassandra.sidecar.common.data.Digest toSidecarDigest() { - return new MD5Hash(messageDigest); + return new org.apache.cassandra.sidecar.common.data.MD5Digest(value); } + @Override public String toString() { - return value; + return "MD5Digest{" + + "value='" + value + '\'' + + '}'; } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/XXHash32Digest.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/XXHash32Digest.java new file mode 100644 index 0000000..1ad8547 --- /dev/null +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/XXHash32Digest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.common; + +import java.util.Objects; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * An implementation of {@link Digest} that represents an XXHash32 digest + */ +public class XXHash32Digest implements Digest +{ + private final @NotNull String value; + private final @NotNull String seedHex; + + /** + * Constructs a new XXHashDigest with the provided XXHash {@code value} and the seed value represented as + * a hexadecimal string + * + * @param value the xxhash value + * @param seed the value of the seed used to calculate the digest + */ + public XXHash32Digest(@NotNull String value, int seed) + { + this.value = Objects.requireNonNull(value, "value is required"); + this.seedHex = Integer.toHexString(seed); + } + + /** + * @return the string representation of the digest + */ + @Override + public String value() + { + return value; + } + + /** + * @return the optional seed in hexadecimal format + */ + public @Nullable String seedHex() + { + return seedHex; + } + + @Override + public org.apache.cassandra.sidecar.common.data.Digest toSidecarDigest() + { + return new org.apache.cassandra.sidecar.common.data.XXHash32Digest(value, seedHex); + } + + @Override + public String toString() + { + return "XXHash32Digest{" + + "value='" + value + '\'' + + ", seedHex='" + seedHex + '\'' + + '}'; + } +} diff 
--git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/DigestAlgorithm.java similarity index 59% copy from cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java copy to cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/DigestAlgorithm.java index 1be5c2c..dac1254 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/DigestAlgorithm.java @@ -17,22 +17,24 @@ * under the License. */ -package org.apache.cassandra.spark.bulkwriter; +package org.apache.cassandra.spark.utils; +import java.io.IOException; import java.nio.file.Path; -import org.jetbrains.annotations.NotNull; +import org.apache.cassandra.spark.common.Digest; -class NonValidatingTestSSTableWriter extends SSTableWriter +/** + * Interface that computes a {@link Digest} + */ +public interface DigestAlgorithm { - NonValidatingTestSSTableWriter(MockTableWriter tableWriter, Path path) - { - super(tableWriter, path); - } - - @Override - public void validateSSTables(@NotNull BulkWriterContext writerContext, int partitionId) - { - // Skip validation for these tests - } + /** + * Calculates the {@link Digest} for the given file in the {@code path}. 
+ * + * @param path the path of the file + * @return the calculated digest for the given {@code path} + * @throws IOException when an error occurs while reading the file or calculating the digest + */ + Digest calculateFileDigest(Path path) throws IOException; } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/MD5DigestAlgorithm.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/MD5DigestAlgorithm.java new file mode 100644 index 0000000..b7665da --- /dev/null +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/MD5DigestAlgorithm.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark.utils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.MessageDigest; +import java.util.Base64; + +import org.apache.commons.codec.digest.DigestUtils; + +import org.apache.cassandra.spark.common.Digest; +import org.apache.cassandra.spark.common.MD5Digest; + +/** + * A {@link DigestAlgorithm} implementation that computes MD5 digests + */ +public class MD5DigestAlgorithm implements DigestAlgorithm +{ + /** + * Calculates the {@link Digest} for the given file in the {@code path}. + * + * @param path the path of the file + * @return the calculated digest for the given {@code path} + * @throws IOException when an error occurs while reading the file or calculating the digest + */ + @Override + public Digest calculateFileDigest(Path path) throws IOException + { + try (InputStream is = Files.newInputStream(path)) + { + MessageDigest computedMd5 = DigestUtils.updateDigest(DigestUtils.getMd5Digest(), is); + return new MD5Digest(Base64.getEncoder().encodeToString(computedMd5.digest())); + } + } +} diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/XXHash32DigestAlgorithm.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/XXHash32DigestAlgorithm.java new file mode 100644 index 0000000..724125b --- /dev/null +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/XXHash32DigestAlgorithm.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.utils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; + +import net.jpountz.xxhash.StreamingXXHash32; +import net.jpountz.xxhash.XXHashFactory; +import org.apache.cassandra.spark.common.Digest; +import org.apache.cassandra.spark.common.XXHash32Digest; + +/** + * A {@link DigestAlgorithm} implementation that computes XXHash32 digests + */ +public class XXHash32DigestAlgorithm implements DigestAlgorithm +{ + private static final int KIB_512 = 512 * 1024; + /** + * A seed used to calculate the XXHash32 digest + */ + private static final int SEED = 0; + + /** + * Calculates the {@link org.apache.cassandra.spark.common.XXHash32Digest} for the given file in the {@code path}. 
+ * + * @param path the path of the file + * @return the calculated digest for the given {@code path} + * @throws IOException when an error occurs while reading the file or calculating the digest + */ + @Override + public Digest calculateFileDigest(Path path) throws IOException + { + // might have shared hashers with ThreadLocal + XXHashFactory factory = XXHashFactory.safeInstance(); + try (InputStream inputStream = Files.newInputStream(path); + StreamingXXHash32 hasher = factory.newStreamingHash32(SEED)) + { + int len; + byte[] buffer = new byte[KIB_512]; + while ((len = inputStream.read(buffer)) != -1) + { + hasher.update(buffer, 0, len); + } + return new XXHash32Digest(Long.toHexString(hasher.getValue()), SEED); + } + } +} diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java index e864dac..e059170 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java @@ -43,7 +43,7 @@ import org.apache.cassandra.bridge.CassandraBridgeFactory; import org.apache.cassandra.sidecar.common.data.TimeSkewResponse; import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; -import org.apache.cassandra.spark.common.MD5Hash; +import org.apache.cassandra.spark.common.Digest; import org.apache.cassandra.spark.common.client.ClientException; import org.apache.cassandra.spark.common.client.InstanceState; import org.apache.cassandra.spark.common.model.BulkFeatures; @@ -55,6 +55,7 @@ import org.apache.cassandra.spark.data.partitioner.Partitioner; import org.apache.cassandra.spark.validation.StartupValidator; import org.apache.spark.sql.types.DataTypes; import 
org.apache.spark.sql.types.StructType; +import org.jetbrains.annotations.NotNull; import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DATE; import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.INT; @@ -245,6 +246,13 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo return skipClean; } + @NotNull + @Override + public DigestAlgorithmSupplier digestAlgorithmSupplier() + { + return DigestAlgorithms.XXHASH32; + } + public void setSkipCleanOnFailures(boolean skipClean) { this.skipClean = skipClean; @@ -326,7 +334,7 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo int ssTableIdx, CassandraInstance instance, String sessionID, - MD5Hash fileHash) throws ClientException + Digest digest) throws ClientException { boolean uploadSucceeded = uploadRequestConsumer.test(instance); uploads.compute(instance, (k, pathList) -> { @@ -334,7 +342,7 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo { pathList = new ArrayList<>(); } - pathList.add(new UploadRequest(componentFile, ssTableIdx, instance, sessionID, fileHash, uploadSucceeded)); + pathList.add(new UploadRequest(componentFile, ssTableIdx, instance, sessionID, digest, uploadSucceeded)); return pathList; }); if (!uploadSucceeded) diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java index 1be5c2c..08ae58a 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSSTableWriter.java @@ -21,13 +21,14 @@ package org.apache.cassandra.spark.bulkwriter; import java.nio.file.Path; +import org.apache.cassandra.spark.utils.DigestAlgorithm; import 
org.jetbrains.annotations.NotNull; class NonValidatingTestSSTableWriter extends SSTableWriter { - NonValidatingTestSSTableWriter(MockTableWriter tableWriter, Path path) + NonValidatingTestSSTableWriter(MockTableWriter tableWriter, Path path, DigestAlgorithm digestAlgorithm) { - super(tableWriter, path); + super(tableWriter, path, digestAlgorithm); } @Override diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java index 92fda4c..05f7b74 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java @@ -47,6 +47,8 @@ import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; import org.apache.cassandra.spark.common.model.CassandraInstance; import org.apache.cassandra.spark.data.CqlField; +import org.apache.cassandra.spark.utils.DigestAlgorithm; +import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType; @@ -84,7 +86,7 @@ public class RecordWriterTest }; @TempDir - public Path folder; // CHECKSTYLE IGNORE: Public mutable field for parameterized testing + private Path folder; private TokenRangeMapping<RingInstance> tokenRangeMapping; private RecordWriter rw; @@ -93,10 +95,12 @@ public class RecordWriterTest private Range<BigInteger> range; private MockBulkWriterContext writerContext; private TestTaskContext tc; + private DigestAlgorithm digestAlgorithm; @BeforeEach public void setUp() { + digestAlgorithm = new XXHash32DigestAlgorithm(); tw = new MockTableWriter(folder.getRoot()); tokenRangeMapping = 
TokenRangeMappingUtils.buildTokenRangeMapping(100000, ImmutableMap.of("DC1", 3), 12); writerContext = new MockBulkWriterContext(tokenRangeMapping); @@ -237,7 +241,7 @@ public class RecordWriterTest List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList()); for (UploadRequest ur : requests) { - assertNotNull(ur.fileHash); + assertNotNull(ur.digest); } } @@ -312,7 +316,7 @@ public class RecordWriterTest List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList()); for (UploadRequest ur : requests) { - assertNotNull(ur.fileHash); + assertNotNull(ur.digest); } } @@ -342,7 +346,7 @@ public class RecordWriterTest List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList()); for (UploadRequest ur : requests) { - assertNotNull(ur.fileHash); + assertNotNull(ur.digest); } } @@ -377,14 +381,14 @@ public class RecordWriterTest List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList()); for (UploadRequest ur : requests) { - assertNotNull(ur.fileHash); + assertNotNull(ur.digest); } } @Test public void testCorruptSSTable() { - rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path) -> new SSTableWriter(tw.setOutDir(path), path)); + rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path, dp) -> new SSTableWriter(tw.setOutDir(path), path, digestAlgorithm)); Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); // TODO: Add better error handling with human-readable exception messages in SSTableReader::new // That way we can assert on the exception thrown here @@ -394,7 +398,7 @@ public class RecordWriterTest @Test public void testWriteWithOutOfRangeTokenFails() { - rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path) -> new SSTableWriter(tw, folder)); + rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path, dp) -> 
new SSTableWriter(tw, folder, digestAlgorithm)); Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, Range.all(), false, false, false); RuntimeException ex = assertThrows(RuntimeException.class, () -> rw.write(data)); String expectedErr = "java.lang.IllegalStateException: Received Token " + @@ -405,7 +409,7 @@ public class RecordWriterTest @Test public void testAddRowThrowingFails() { - rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path) -> new SSTableWriter(tw, folder)); + rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path, dp) -> new SSTableWriter(tw, folder, digestAlgorithm)); tw.setAddRowThrows(true); Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); RuntimeException ex = assertThrows(RuntimeException.class, () -> rw.write(data)); @@ -417,7 +421,7 @@ public class RecordWriterTest { // Mock context returns a 60-minute allowable time skew, so we use something just outside the limits long sixtyOneMinutesInMillis = TimeUnit.MINUTES.toMillis(61); - rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path) -> new SSTableWriter(tw, folder)); + rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path, dp) -> new SSTableWriter(tw, folder, digestAlgorithm)); writerContext.setTimeProvider(() -> System.currentTimeMillis() - sixtyOneMinutesInMillis); Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); RuntimeException ex = assertThrows(RuntimeException.class, () -> rw.write(data)); @@ -463,7 +467,7 @@ public class RecordWriterTest List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList()); for (UploadRequest ur : requests) { - assertNotNull(ur.fileHash); + assertNotNull(ur.digest); } } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java index 7155f12..58dc982 
100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java @@ -39,6 +39,7 @@ import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.bridge.CassandraVersionFeatures; import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; +import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -53,7 +54,8 @@ public class SSTableWriterTest .collect(Collectors.toList()); } - private @NotNull TokenRangeMapping<RingInstance> tokenRangeMapping = TokenRangeMappingUtils.buildTokenRangeMapping(0, ImmutableMap.of("DC1", 3), 12); + @NotNull + private final TokenRangeMapping<RingInstance> tokenRangeMapping = TokenRangeMappingUtils.buildTokenRangeMapping(0, ImmutableMap.of("DC1", 3), 12); @BeforeAll public static void setProps() @@ -76,7 +78,7 @@ public class SSTableWriterTest } @TempDir - public Path tmpDir; // CHECKSTYLE IGNORE: Public mutable field for testing + private Path tmpDir; @ParameterizedTest @@ -84,7 +86,7 @@ public class SSTableWriterTest public void canCreateWriterForVersion(String version) throws IOException { MockBulkWriterContext writerContext = new MockBulkWriterContext(tokenRangeMapping, version, ConsistencyLevel.CL.LOCAL_QUORUM); - SSTableWriter tw = new SSTableWriter(writerContext, tmpDir); + SSTableWriter tw = new SSTableWriter(writerContext, tmpDir, new XXHash32DigestAlgorithm()); tw.addRow(BigInteger.ONE, ImmutableMap.of("id", 1, "date", 1, "course", "foo", "marks", 1)); tw.close(writerContext, 1); try (DirectoryStream<Path> dataFileStream = Files.newDirectoryStream(tw.getOutDir(), "*Data.db")) diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java index 25147ca..b809f88 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java @@ -45,6 +45,8 @@ import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; import org.apache.cassandra.spark.common.model.CassandraInstance; +import org.apache.cassandra.spark.utils.DigestAlgorithm; +import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -64,11 +66,12 @@ public class StreamSessionConsistencyTest private static final Map<String, Object> COLUMN_BIND_VALUES = ImmutableMap.of("id", 0, "date", 1, "course", "course", "marks", 2); @TempDir - public Path folder; // CHECKSTYLE IGNORE: Public mutable field for testing + private Path folder; private MockTableWriter tableWriter; private StreamSession streamSession; private MockBulkWriterContext writerContext; private final MockScheduledExecutorService executor = new MockScheduledExecutorService(); + private DigestAlgorithm digestAlgorithm; public static Collection<Object[]> data() { @@ -81,6 +84,7 @@ public class StreamSessionConsistencyTest private void setup(ConsistencyLevel.CL consistencyLevel, List<Integer> failuresPerDc) { + digestAlgorithm = new XXHash32DigestAlgorithm(); tableWriter = new MockTableWriter(folder); writerContext = new MockBulkWriterContext(TOKEN_RANGE_MAPPING, "cassandra-4.0.0", consistencyLevel); streamSession = new StreamSession(writerContext, @@ -118,7 +122,7 @@ public class StreamSessionConsistencyTest return new 
DataTransferApi.RemoteCommitResult(true, Collections.emptyList(), uuids, ""); } }); - SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder); + SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder, digestAlgorithm); tr.addRow(BigInteger.valueOf(102L), COLUMN_BIND_VALUES); tr.close(writerContext, 1); streamSession.scheduleStream(tr); @@ -163,7 +167,7 @@ public class StreamSessionConsistencyTest ImmutableMap<String, AtomicInteger> dcFailures = ImmutableMap.of("DC1", dc1Failures, "DC2", dc2Failures); boolean shouldFail = calculateFailure(consistencyLevel, dc1Failures.get(), dc2Failures.get()); writerContext.setUploadSupplier(instance -> dcFailures.get(instance.datacenter()).getAndDecrement() <= 0); - SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder); + SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder, digestAlgorithm); tr.addRow(BigInteger.valueOf(102L), COLUMN_BIND_VALUES); tr.close(writerContext, 1); streamSession.scheduleStream(tr); diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java index 3a71640..82924cb 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java @@ -40,6 +40,8 @@ import org.junit.jupiter.api.io.TempDir; import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; import org.apache.cassandra.spark.common.model.CassandraInstance; +import org.apache.cassandra.spark.utils.DigestAlgorithm; +import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm; import org.jetbrains.annotations.NotNull; import static org.hamcrest.MatcherAssert.assertThat; @@ -58,7 
+60,7 @@ public class StreamSessionTest public static final String LOAD_RANGE_ERROR_PREFIX = "Failed to load 1 ranges with LOCAL_QUORUM"; private static final Map<String, Object> COLUMN_BOUND_VALUES = ImmutableMap.of("id", 0, "date", 1, "course", "course", "marks", 2); @TempDir - public Path folder; // CHECKSTYLE IGNORE: Public mutable field for testing + private Path folder; private static final int FILES_PER_SSTABLE = 8; private static final int RF = 3; @@ -69,10 +71,12 @@ public class StreamSessionTest private MockScheduledExecutorService executor; private MockTableWriter tableWriter; private Range<BigInteger> range; + private DigestAlgorithm digestAlgorithm; @BeforeEach public void setup() { + digestAlgorithm = new XXHash32DigestAlgorithm(); range = Range.range(BigInteger.valueOf(101L), BoundType.CLOSED, BigInteger.valueOf(199L), BoundType.CLOSED); tokenRangeMapping = TokenRangeMappingUtils.buildTokenRangeMapping(0, ImmutableMap.of("DC1", 3), 12); writerContext = getBulkWriterContext(); @@ -94,7 +98,7 @@ public class StreamSessionTest @Test public void testScheduleStreamSendsCorrectFilesToCorrectInstances() throws IOException, ExecutionException, InterruptedException { - SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder); + SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder, digestAlgorithm); tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES); tr.close(writerContext, 1); ss.scheduleStream(tr); @@ -109,7 +113,7 @@ public class StreamSessionTest @Test public void testMismatchedTokenRangeFails() throws IOException { - SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder); + SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder, digestAlgorithm); tr.addRow(BigInteger.valueOf(9999L), COLUMN_BOUND_VALUES); tr.close(writerContext, 1); IllegalStateException illegalStateException = assertThrows(IllegalStateException.class, @@ -158,7 +162,7 @@ public class StreamSessionTest 
public void testOutDirCreationFailureCleansAllReplicas() { assertThrows(RuntimeException.class, () -> { - SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, tableWriter.getOutDir()); + SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, tableWriter.getOutDir(), digestAlgorithm); tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES); tr.close(writerContext, 1); tableWriter.removeOutDir(); @@ -208,7 +212,7 @@ public class StreamSessionTest return new DataTransferApi.RemoteCommitResult(true, null, uuids, ""); } }); - SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder); + SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder, digestAlgorithm); tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES); tr.close(writerContext, 1); ss.scheduleStream(tr); @@ -236,7 +240,7 @@ public class StreamSessionTest } }); - SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder); + SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder, digestAlgorithm); tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES); tr.close(writerContext, 1); ss.scheduleStream(tr); @@ -252,7 +256,7 @@ public class StreamSessionTest private void runFailedUpload() throws IOException, ExecutionException, InterruptedException { writerContext.setUploadSupplier(instance -> false); - SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder); + SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder, digestAlgorithm); tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES); tr.close(writerContext, 1); ss.scheduleStream(tr); diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/UploadRequest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/UploadRequest.java index 637ad2d..c011fc3 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/UploadRequest.java +++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/UploadRequest.java @@ -21,7 +21,7 @@ package org.apache.cassandra.spark.bulkwriter; import java.nio.file.Path; -import org.apache.cassandra.spark.common.MD5Hash; +import org.apache.cassandra.spark.common.Digest; import org.apache.cassandra.spark.common.model.CassandraInstance; public class UploadRequest @@ -29,22 +29,22 @@ public class UploadRequest public final Path path; public final int ssTableIdx; public final CassandraInstance instance; - public final String sesssionId; - public final MD5Hash fileHash; + public final String sessionId; + public final Digest digest; public final boolean uploadSucceeded; public UploadRequest(Path path, int ssTableIdx, CassandraInstance instance, - String sesssionId, - MD5Hash fileHash, + String sessionId, + Digest digest, boolean uploadSucceeded) { this.path = path; this.ssTableIdx = ssTableIdx; this.instance = instance; - this.sesssionId = sesssionId; - this.fileHash = fileHash; + this.sessionId = sessionId; + this.digest = digest; this.uploadSucceeded = uploadSucceeded; } } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/MD5DigestAlgorithmTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/MD5DigestAlgorithmTest.java new file mode 100644 index 0000000..153bc6d --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/MD5DigestAlgorithmTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.utils; + +import java.io.IOException; +import java.nio.file.Path; + +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import org.apache.cassandra.spark.common.Digest; +import org.apache.cassandra.spark.common.MD5Digest; + +import static org.apache.cassandra.spark.utils.ResourceUtils.writeResourceToPath; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for {@link MD5DigestAlgorithm} + */ +class MD5DigestAlgorithmTest +{ + @TempDir + private Path tempPath; + + // To generate test files I used: + // $ base64 -i /dev/urandom | head -c 1048576 > file1.txt + // $ base64 -i /dev/urandom | head -c 524288 > file2.txt + // $ base64 -i /dev/urandom | head -c 131072 > file3.txt + // To calculate MD5 I used: + // $ cat file1.txt | openssl dgst -md5 -binary | openssl enc -base64 # -> VqSURYiCXjZIgP+CO9IkLQ== + // $ cat file2.txt | openssl dgst -md5 -binary | openssl enc -base64 # -> vFoVTqVngw7JRj8yJfk3UA== + // $ cat file3.txt | openssl dgst -md5 -binary | openssl enc -base64 # -> RXASCHthSSrMt7YOKJ6ODQ== + + @ParameterizedTest(name = "{index} fileName={0} expectedMd5={1}") + @CsvSource({ + "file1.txt,VqSURYiCXjZIgP+CO9IkLQ==", + "file2.txt,vFoVTqVngw7JRj8yJfk3UA==", + "file3.txt,RXASCHthSSrMt7YOKJ6ODQ==", + }) + void testMD5Provider(String fileName, String expectedMd5) throws IOException + { + ClassLoader classLoader = MD5DigestAlgorithmTest.class.getClassLoader(); + Path path = 
writeResourceToPath(classLoader, tempPath, "digest/" + fileName); + assertThat(path).exists(); + + Digest digest = new MD5DigestAlgorithm().calculateFileDigest(path); + assertThat(digest).isInstanceOf(MD5Digest.class); + assertThat(digest.value()).isEqualTo(expectedMd5); + } +} diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/ResourceUtils.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/ResourceUtils.java new file mode 100644 index 0000000..1d6d68f --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/ResourceUtils.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.spark.utils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +/** + * A utilities class to handle resources for tests + */ +public final class ResourceUtils +{ + private ResourceUtils() + { + throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); + } + + /** + * Writes a resource with {@code resourceName} loaded from the {@link ClassLoader classLoader} into the + * {@code destinationPath} + * + * @param classLoader the class loader for the resource + * @param destinationPath the destination path to write the file + * @param resourceName the name of the resource to be loaded + * @return the {@link Path} to the created resource + */ + public static Path writeResourceToPath(ClassLoader classLoader, Path destinationPath, String resourceName) + { + try + { + Path resourcePath = destinationPath.resolve(resourceName); + + // ensure parent directory is created + Files.createDirectories(resourcePath.getParent()); + + try (InputStream inputStream = classLoader.getResourceAsStream(resourceName); + OutputStream outputStream = Files.newOutputStream(resourcePath)) + { + assertThat(inputStream).isNotNull(); + + int length; + byte[] buffer = new byte[1024]; + while ((length = inputStream.read(buffer)) != -1) + { + outputStream.write(buffer, 0, length); + } + } + return resourcePath; + } + catch (IOException exception) + { + String failureMessage = "Unable to create resource " + resourceName; + fail(failureMessage, exception); + throw new RuntimeException(failureMessage, exception); + } + } +} diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/SSTableInputStreamHttpTests.java 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/SSTableInputStreamHttpTest.java similarity index 97% rename from cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/SSTableInputStreamHttpTests.java rename to cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/SSTableInputStreamHttpTest.java index b7bbf10..81ad0fc 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/SSTableInputStreamHttpTests.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/SSTableInputStreamHttpTest.java @@ -64,7 +64,7 @@ import static org.quicktheories.generators.SourceDSL.arbitrary; * Test the {@link SSTableInputStream} by firing up an in-test HTTP server and reading the files with an HTTP client * Compares the MD5s to verify file bytes match bytes returned by {@link SSTableInputStream}. */ -public class SSTableInputStreamHttpTests +public class SSTableInputStreamHttpTest { static final ExecutorService HTTP_EXECUTOR = Executors.newFixedThreadPool(4, new ThreadFactoryBuilder().setNameFormat("http-server-%d") @@ -74,10 +74,10 @@ public class SSTableInputStreamHttpTests Executors.newFixedThreadPool(4, new ThreadFactoryBuilder().setNameFormat("http-client-%d") .setDaemon(true) .build()); - private static final Logger LOGGER = LoggerFactory.getLogger(SSTableInputStreamHttpTests.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SSTableInputStreamHttpTest.class); @TempDir - public static Path DIRECTORY; // CHECKSTYLE IGNORE: Constant cannot be made final + private static Path directory; private static final String HOST = "localhost"; private static final int PORT = 8001; private static final int HTTP_CLIENT_CHUNK_SIZE = 8192; @@ -95,7 +95,7 @@ public class SSTableInputStreamHttpTests try { String uri = exchange.getRequestURI().getPath().replaceFirst("/", ""); - Path path = DIRECTORY.resolve(uri); + Path path = directory.resolve(uri); // Extract Range 
from header long size = Files.size(path); @@ -266,7 +266,7 @@ public class SSTableInputStreamHttpTests { try { - Path path = Files.createTempFile(DIRECTORY, null, null); + Path path = Files.createTempFile(directory, null, null); MessageDigest digest = DigestUtils.getMd5Digest(); try (BufferedOutputStream out = new BufferedOutputStream(Files.newOutputStream(path))) { diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/XXHash32DigestAlgorithmTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/XXHash32DigestAlgorithmTest.java new file mode 100644 index 0000000..92b32e9 --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/XXHash32DigestAlgorithmTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark.utils; + +import java.io.IOException; +import java.nio.file.Path; + +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import org.apache.cassandra.spark.common.Digest; +import org.apache.cassandra.spark.common.XXHash32Digest; + +import static org.apache.cassandra.spark.utils.ResourceUtils.writeResourceToPath; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for {@link XXHash32DigestAlgorithm} + */ +class XXHash32DigestAlgorithmTest +{ + @TempDir + private Path tempPath; + + // To generate test files I used: + // $ base64 -i /dev/urandom | head -c 1048576 > file1.txt + // $ base64 -i /dev/urandom | head -c 524288 > file2.txt + // $ base64 -i /dev/urandom | head -c 131072 > file3.txt + // To calculate xxhash I installed xxhash (brew install xxhash): + // $ xxh32sum file1.txt # -> d76a44a5 + // $ xxh32sum file2.txt # -> ef976cbe + // $ xxh32sum file3.txt # -> 08321e1e + + @ParameterizedTest(name = "{index} fileName={0} expectedMd5={1}") + @CsvSource({ + "file1.txt,ffffffffd76a44a5", + "file2.txt,ffffffffef976cbe", + "file3.txt,8321e1e", + }) + void testXXHash32Provider(String fileName, String expectedXXHash32) throws IOException + { + ClassLoader classLoader = MD5DigestAlgorithmTest.class.getClassLoader(); + Path path = writeResourceToPath(classLoader, tempPath, "digest/" + fileName); + assertThat(path).exists(); + + Digest digest = new XXHash32DigestAlgorithm().calculateFileDigest(path); + assertThat(digest).isInstanceOf(XXHash32Digest.class); + + XXHash32Digest xxHash32Digest = (XXHash32Digest) digest; + assertThat(xxHash32Digest.value()).isEqualTo(expectedXXHash32); + assertThat(xxHash32Digest.seedHex()).isEqualTo("0"); + } +} diff --git a/cassandra-analytics-core/src/test/resources/digest/file1.txt b/cassandra-analytics-core/src/test/resources/digest/file1.txt new file mode 100644 index 
0000000..6a5c5cb --- /dev/null +++ b/cassandra-analytics-core/src/test/resources/digest/file1.txt @@ -0,0 +1 @@ +MGz17YVmWKTHp3Q1eKm34XBSSVZmBogGnFOC//tXbny83crrQ256VuXF6copnLayBe18eOQos9ZIWlhBP/sGB2Dl+JIUKhnZ9fc3eBSYv0V9cnqh+ImC02k4XijF7T0H27VKf+EcbNCRmERnROuIeX6oLDlUeKpsoErOQgVW3BjwBKf08TrK8lgc4BTZfF77uTmAGJJz9AA5N0ZV3LQ6qn2W7I8MiMqllak37VNI2J2aI5V46lq40/w3a4au3AvtD16UM+lHvsxyOvbEPKd6fqdFU80IVjZXfrtA6eRpuVc03w8ptFyODHmy7Lpy8aVFtI3tXQODDbeC5kWMYP7KSgTVG5fO1P/W3WcPKPchj6h3SXRlW46wXtEmi6y16eBNxVT2tjinS4tZheXwQBeTV03P9PJg3pDnarKGyRaRaWYN+yvK8DAYlfxyy27kvSTx/6e4Pou8GJW00RJMQ+oQHffIjLnubPPH35K5/5Dwu8XO [...] \ No newline at end of file diff --git a/cassandra-analytics-core/src/test/resources/digest/file2.txt b/cassandra-analytics-core/src/test/resources/digest/file2.txt new file mode 100644 index 0000000..3deea33 --- /dev/null +++ b/cassandra-analytics-core/src/test/resources/digest/file2.txt @@ -0,0 +1 @@ +l9a7GQdACyjuRrhsZqNkTBvl+1kfYLrSpn9RH/jyP0eAjS6Sl/VQxfsNAgPHFOOc0wtj0VwntcJN2loQSXdyQakhSOJpyT/EybE4hYzRqU6yCQmNl6Oi9P8No+hA0PBnlZIJT5rmKpj05CcHNdMADbj2K9+Oa0fJGMYhL7hzSzdOcimp4LnwURIWxzI7jDK8ve0Mo7BUfHuLSideMBBjfNcCny3dY43dL0PIwGtWnrBLZF81Dvjc4csGkz4OTJ31biW0ivHqVCsMRKx2eAXw0FB2/0s7iarPcpkyHk20BnIsi2tjOv0btnQC9p7PL+HArealU3rZW/y1uKupKjdFPGdD9+1kCQdVR+J5qi+udp5Bj22qfioTXd3BU2xWxDgxtHUB9Z8k4CFx6/dD46cz+1dlCtXi2HWvgsyUVdvL3rJJWP2yyzvsdn/3KmCj/uc1lYD7yvFsIInGzjRy8vCcwg2cNFNlNIuxZnXB4GONKUVp [...] 
\ No newline at end of file diff --git a/cassandra-analytics-core/src/test/resources/digest/file3.txt b/cassandra-analytics-core/src/test/resources/digest/file3.txt new file mode 100644 index 0000000..e20cf24 --- /dev/null +++ b/cassandra-analytics-core/src/test/resources/digest/file3.txt @@ -0,0 +1 @@ +Mfwzkymsh9jm1FZSEXzTDelmCU69gjd9fzrUQcQtfYxNS4rOZWBbZGvEXOzUumuF2fW6xCZ3o7ZZuO4cvL5ZqqO+fPT+CRlt4KwDeRXyZMApiXdMXiVdoRwaQXIjlIf10dEn2KMbenVb9ClZlvWV8JcOWZxoXHs+P/LJPeq06mYY4/SzdCJOQdzyTVWASsHaMtNRgcjEWjigvPsxctXPGX7Lp9Du7E9WRFm5xmoeJGgYslOI6aU54GYfDAavV5cIdwCHawbu7VH1aTDj3VImC+rHlUm+/ZCsczqpjJSe37KCd/SrL1vqlYe0zVf4WyX4R6zJCvmh0/A2XaRkMYeM9wxxAP7+dw43dGI7gceh+zWzyFk/vnZj1ZSvdrI38WgnC0NWP2r0ilvB1OQemSv6XnL2m5n92s0aRTiw6SxFYSSUJ4yu9kNudPdVWGhEVm9S1P3VWYvfsQsdIKwh9j26ascLZHJ0Si3Tdnsz3mqT/8sM [...] \ No newline at end of file diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/AbstractClusterUtils.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/AbstractClusterUtils.java index 29e949c..05dd7e2 100644 --- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/AbstractClusterUtils.java +++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/AbstractClusterUtils.java @@ -34,7 +34,7 @@ public class AbstractClusterUtils * @param nodeNumber the number of the node for which a configuration should be created * @return the instance configuration for the specified node number */ - public static InstanceConfig createInstanceConfig(AbstractCluster cluster, int nodeNumber) + public static InstanceConfig createInstanceConfig(AbstractCluster<?> cluster, int nodeNumber) { return cluster.createInstanceConfig(nodeNumber); } diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java 
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java index cdc0708..e9b9542 100644 --- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java +++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java @@ -53,6 +53,7 @@ import io.vertx.core.Vertx; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.IInstance; import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.distributed.impl.AbstractCluster; @@ -322,6 +323,29 @@ public abstract class SharedClusterIntegrationTestBase .collect(Collectors.joining(",")); } + /** + * Convenience method to query all data from the provided {@code table} at consistency level {@code LOCAL_QUORUM}. + * + * @param table the qualified Cassandra table name + * @return all the data queried from the table + */ + protected Object[][] queryAllData(QualifiedName table) + { + return queryAllData(table, ConsistencyLevel.LOCAL_QUORUM); + } + + /** + * Convenience method to query all data from the provided {@code table} at the specified consistency level. + * + * @param table the qualified Cassandra table name + * @param consistencyLevel the consistency level to use for querying the data + * @return all the data queried from the table + */ + protected Object[][] queryAllData(QualifiedName table, ConsistencyLevel consistencyLevel) + { + return cluster.coordinator(1).execute(String.format("SELECT * FROM %s;", table), consistencyLevel); + } + static class IntegrationTestModule extends AbstractModule { private final AbstractCluster<? 
extends IInstance> cluster; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersWriteTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersWriteTest.java index 3acbcb1..2650cd7 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersWriteTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersWriteTest.java @@ -20,15 +20,8 @@ package org.apache.cassandra.analytics; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.Arrays; -import java.util.Iterator; import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import java.util.stream.LongStream; import java.util.stream.Stream; import org.junit.jupiter.params.ParameterizedTest; @@ -37,7 +30,6 @@ import org.junit.jupiter.params.provider.MethodSource; import com.vdurmont.semver4j.Semver; import org.apache.cassandra.distributed.UpgradeableCluster; -import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.TokenSupplier; import org.apache.cassandra.distributed.shared.Versions; @@ -45,24 +37,16 @@ import org.apache.cassandra.sidecar.testing.JvmDTestSharedClassesPredicate; import org.apache.cassandra.sidecar.testing.QualifiedName; import org.apache.cassandra.spark.bulkwriter.WriterOptions; import org.apache.cassandra.testing.TestVersion; -import org.apache.spark.SparkContext; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function2; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.RowFactory; -import org.apache.spark.sql.SQLContext; 
import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.types.StructType; +import static org.apache.cassandra.analytics.DataGenerationUtils.generateCourseData; +import static org.apache.cassandra.analytics.SparkTestUtils.validateWrites; import static org.apache.cassandra.testing.TestUtils.DC1_RF1; import static org.apache.cassandra.testing.TestUtils.ROW_COUNT; import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName; -import static org.apache.spark.sql.types.DataTypes.BinaryType; -import static org.apache.spark.sql.types.DataTypes.LongType; -import static org.assertj.core.api.Assertions.assertThat; /** * Tests the bulk writer behavior when requiring quoted identifiers for keyspace, table name, and column names. @@ -83,47 +67,14 @@ class QuoteIdentifiersWriteTest extends SharedClusterSparkIntegrationTestBase void testQuoteIdentifiersBulkWrite(QualifiedName tableName) { SparkSession spark = getOrCreateSparkSession(); - SparkContext sc = spark.sparkContext(); - JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sc); - SQLContext sql = spark.sqlContext(); - - int parallelism = sc.defaultParallelism(); - JavaRDD<Row> rows = genDataset(javaSparkContext, ROW_COUNT, parallelism); - Dataset<Row> df = sql.createDataFrame(rows, writeSchema()); - + // Generates course data from and renames the dataframe columns to use case-sensitive and reserved + // words in the dataframe + Dataset<Row> df = generateCourseData(spark, ROW_COUNT).toDF("IdEnTiFiEr", // case-sensitive struct + "course", + "limit"); // limit is a reserved word in Cassandra bulkWriterDataFrameWriter(df, tableName).option(WriterOptions.QUOTE_IDENTIFIERS.name(), "true") .save(); - validateWrites(tableName, rows); - } - - void validateWrites(QualifiedName tableName, JavaRDD<Row> rowsWritten) - { - // build a set of entries read from Cassandra into a set - Set<String> actualEntries = 
Arrays.stream(cluster.coordinator(1) - .execute(String.format("SELECT * FROM %s;", tableName), ConsistencyLevel.LOCAL_QUORUM)) - .map((Object[] columns) -> String.format("%s:%s:%s", - new String(((ByteBuffer) columns[1]).array(), StandardCharsets.UTF_8), - columns[0], - columns[2])) - .collect(Collectors.toSet()); - - // Number of entries in Cassandra must match the original datasource - assertThat(actualEntries.size()).isEqualTo(rowsWritten.count()); - - // remove from actual entries to make sure that the data read is the same as the data written - rowsWritten.collect() - .forEach(row -> { - String key = String.format("%s:%d:%d", - new String((byte[]) row.get(0), StandardCharsets.UTF_8), - row.getLong(1), - row.getLong(2)); - assertThat(actualEntries.remove(key)).as(key + " is expected to exist in the actual entries") - .isTrue(); - }); - - // If this fails, it means there was more data in the database than we expected - assertThat(actualEntries).as("All entries are expected to be read from database") - .isEmpty(); + validateWrites(df.collectAsList(), queryAllData(tableName)); } static Stream<Arguments> testInputs() @@ -159,7 +110,7 @@ class QuoteIdentifiersWriteTest extends SharedClusterSparkIntegrationTestBase protected void initializeSchemaForTest() { String createTableStatement = "CREATE TABLE IF NOT EXISTS %s " + - "(\"IdEnTiFiEr\" bigint, course blob, \"limit\" bigint," + + "(\"IdEnTiFiEr\" int, course text, \"limit\" int," + " PRIMARY KEY(\"IdEnTiFiEr\"));"; TABLE_NAMES.forEach(name -> { @@ -167,28 +118,4 @@ class QuoteIdentifiersWriteTest extends SharedClusterSparkIntegrationTestBase createTestTable(name, createTableStatement); }); } - - static StructType writeSchema() - { - return new StructType() - .add("course", BinaryType, false) - .add("IdEnTiFiEr", LongType, false) // case-sensitive struct - .add("limit", LongType, false); // limit is a reserved word in Cassandra - } - - static JavaRDD<Row> genDataset(JavaSparkContext sc, int recordCount, Integer 
parallelism) - { - long recordsPerPartition = recordCount / parallelism; - long remainder = recordCount - (recordsPerPartition * parallelism); - List<Integer> seq = IntStream.range(0, parallelism).boxed().collect(Collectors.toList()); - return sc.parallelize(seq, parallelism).mapPartitionsWithIndex( - (Function2<Integer, Iterator<Integer>, Iterator<Row>>) (index, integerIterator) -> { - long firstRecordNumber = index * recordsPerPartition; - long recordsToGenerate = index.equals(parallelism) ? remainder : recordsPerPartition; - return LongStream.range(0, recordsToGenerate).mapToObj(offset -> { - long limit = firstRecordNumber + offset; - return RowFactory.create(("course-" + limit).getBytes(), limit, limit); - }).iterator(); - }, false); - } } diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java index 3697956..53806cb 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java @@ -19,7 +19,11 @@ package org.apache.cassandra.analytics; +import java.util.Arrays; +import java.util.List; +import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; import org.apache.cassandra.sidecar.testing.QualifiedName; import org.apache.cassandra.spark.KryoRegister; @@ -33,6 +37,8 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; +import static org.assertj.core.api.Assertions.assertThat; + /** * Helper methods for Spark tests */ @@ -127,4 +133,32 @@ public final class SparkTestUtils KryoRegister.setup(sparkConf); return sparkConf; } + + public static void validateWrites(List<Row> sourceData, Object[][] queriedData) + { + // build a set of entries read from Cassandra 
into a set + Set<String> actualEntries = Arrays.stream(queriedData) + .map((Object[] columns) -> String.format("%s:%s:%s", + columns[0], + columns[1], + columns[2])) + .collect(Collectors.toSet()); + + // Number of entries in Cassandra must match the original datasource + assertThat(actualEntries.size()).isEqualTo(sourceData.size()); + + // remove from actual entries to make sure that the data read is the same as the data written + sourceData.forEach(row -> { + String key = String.format("%d:%s:%d", + row.getInt(0), + row.getString(1), + row.getInt(2)); + assertThat(actualEntries.remove(key)).as(key + " is expected to exist in the actual entries") + .isTrue(); + }); + + // If this fails, it means there was more data in the database than we expected + assertThat(actualEntries).as("All entries are expected to be read from database") + .isEmpty(); + } } diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/WriterDigestIntegrationTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/WriterDigestIntegrationTest.java new file mode 100644 index 0000000..8589a19 --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/WriterDigestIntegrationTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.analytics; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import org.junit.jupiter.api.Test; + +import com.vdurmont.semver4j.Semver; +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.shared.Versions; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.spark.bulkwriter.WriterOptions; +import org.apache.cassandra.testing.TestVersion; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import static org.apache.cassandra.analytics.SparkTestUtils.validateWrites; +import static org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT; +import static org.apache.cassandra.testing.TestUtils.DC1_RF1; +import static org.apache.cassandra.testing.TestUtils.ROW_COUNT; +import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; + +/** + * Tests bulk writes with different digest options + */ +class WriterDigestIntegrationTest extends SharedClusterSparkIntegrationTestBase +{ + static final QualifiedName DEFAULT_DIGEST_TABLE = uniqueTestTableFullName("default_digest"); + static final QualifiedName MD5_DIGEST_TABLE = uniqueTestTableFullName("md5_digest"); + static final QualifiedName CORRUPT_SSTABLE_TABLE = 
uniqueTestTableFullName("corrupt_sstable"); + static final List<QualifiedName> TABLE_NAMES = Arrays.asList(DEFAULT_DIGEST_TABLE, MD5_DIGEST_TABLE, + CORRUPT_SSTABLE_TABLE); + Dataset<Row> df; + + @Test + void testDefaultDigest() + { + bulkWriterDataFrameWriter(df, DEFAULT_DIGEST_TABLE).save(); + validateWrites(df.collectAsList(), queryAllData(DEFAULT_DIGEST_TABLE)); + } + + @Test + void testMD5Digest() + { + SparkSession spark = getOrCreateSparkSession(); + Dataset<Row> df = DataGenerationUtils.generateCourseData(spark, ROW_COUNT); + bulkWriterDataFrameWriter(df, MD5_DIGEST_TABLE).option(WriterOptions.DIGEST.name(), "MD5").save(); + validateWrites(df.collectAsList(), queryAllData(MD5_DIGEST_TABLE)); + } + + @Test + void failsOnInvalidDigestOption() + { + assertThatIllegalArgumentException() + .isThrownBy(() -> bulkWriterDataFrameWriter(df, DEFAULT_DIGEST_TABLE).option(WriterOptions.DIGEST.name(), "invalid") + .save()) + .withMessageContaining("Key digest type with value invalid is not a valid Enum of type class org.apache.cassandra.spark.bulkwriter.DigestAlgorithms"); + } + + @Override + protected void beforeTestStart() + { + SparkSession spark = getOrCreateSparkSession(); + df = DataGenerationUtils.generateCourseData(spark, ROW_COUNT); + } + + @Override + protected UpgradeableCluster provisionCluster(TestVersion testVersion) throws IOException + { + // spin up a C* cluster using the in-jvm dtest + Versions versions = Versions.find(); + Versions.Version requestedVersion = versions.getLatest(new Semver(testVersion.version(), Semver.SemverType.LOOSE)); + + UpgradeableCluster.Builder clusterBuilder = + UpgradeableCluster.build(1) + .withDynamicPortAllocation(true) + .withVersion(requestedVersion) + .withDCs(1) + .withDataDirCount(1) + .withConfig(config -> config.with(Feature.NATIVE_PROTOCOL) + .with(Feature.GOSSIP) + .with(Feature.JMX)); + TokenSupplier tokenSupplier = TokenSupplier.evenlyDistributedTokens(1, clusterBuilder.getTokenCount()); + 
clusterBuilder.withTokenSupplier(tokenSupplier); + UpgradeableCluster cluster = clusterBuilder.createWithoutStarting(); + cluster.startup(); + return cluster; + } + + @Override + protected void initializeSchemaForTest() + { + TABLE_NAMES.forEach(name -> { + createTestKeyspace(name, DC1_RF1); + createTestTable(name, CREATE_TEST_TABLE_STATEMENT); + }); + } +} diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java index cd06425..b2fa27c 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java @@ -202,7 +202,7 @@ public final class MapUtils String value = options.get(lowerCaseKey(key)); try { - return value != null ? (T) Enum.valueOf(defaultValue.getClass(), value) : defaultValue; + return value != null ? (T) Enum.valueOf(defaultValue.getDeclaringClass(), value) : defaultValue; } catch (IllegalArgumentException exception) { diff --git a/scripts/build-sidecar.sh b/scripts/build-sidecar.sh index eb1767f..cc3f13f 100755 --- a/scripts/build-sidecar.sh +++ b/scripts/build-sidecar.sh @@ -24,7 +24,7 @@ else SCRIPT_DIR=$( dirname -- "$( readlink -f -- "$0"; )"; ) SIDECAR_REPO="${SIDECAR_REPO:-https://github.com/apache/cassandra-sidecar.git}" SIDECAR_BRANCH="${SIDECAR_BRANCH:-trunk}" - SIDECAR_COMMIT="${SIDECAR_COMMIT:-2eb3474d7037a2887bcd9dee1f64c2a36a7e8d26}" + SIDECAR_COMMIT="${SIDECAR_COMMIT:-2914d57f0428643b3a92b6af8f4da1b209d80c2a}" SIDECAR_JAR_DIR="$(dirname "${SCRIPT_DIR}/")/dependencies" SIDECAR_JAR_DIR=${CASSANDRA_DEP_DIR:-$SIDECAR_JAR_DIR} SIDECAR_BUILD_DIR="${SIDECAR_JAR_DIR}/sidecar-build" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org