This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 8e6c527a CASSANALYTICS-19: Fix double digesting (#105)
8e6c527a is described below

commit 8e6c527add29d61b5ee4a0b176c8d3b8d2bcb3c0
Author: Yifan Cai <y...@apache.org>
AuthorDate: Mon Apr 14 16:17:08 2025 -0700

    CASSANALYTICS-19: Fix double digesting (#105)

    Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSANALYTICS-19
---
 CHANGES.txt                                        |  1 +
 .../cassandra/spark/bulkwriter/BulkSparkConf.java  |  8 ++-
 .../spark/bulkwriter/cloudstorage/Bundle.java      | 21 +++++--
 .../cloudstorage/CloudStorageStreamSession.java    |  4 +-
 .../bulkwriter/cloudstorage/SSTableCollector.java  | 15 +++++
 .../bulkwriter/cloudstorage/SSTableLister.java     | 22 ++++++++
 .../bulkwriter/cloudstorage/SSTablesBundler.java   | 12 ++++
 .../spark/bulkwriter/BulkSparkConfTest.java        | 15 +++++
 .../spark/bulkwriter/cloudstorage/BundleTest.java  | 66 +++++++++++++++++++++-
 .../CloudStorageStreamSessionTest.java             |  5 ++
 .../bulkwriter/cloudstorage/SSTableListerTest.java | 42 ++++++++++++++
 .../cloudstorage/SSTablesBundlerTest.java          |  6 ++
 12 files changed, 209 insertions(+), 8 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index f29ef75d..74e482ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Bulk writer in S3_COMPAT mode calculates file digest twice (CASSANALYTICS-19)
  * Add Apache Cassandra Sidecar CDC implementation (CASSANALYTICS-7)
  * Upgrade sidecar client dependency to 0.1.0 (CASSANALYTICS-16)
  * Remove JDK8 support (CASSANALYTICS-17)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index fc870957..1adec0e3 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -186,7 +186,6 @@ public class BulkSparkConf implements Serializable
         this.truststoreBase64Encoded = MapUtils.getOrDefault(options, WriterOptions.TRUSTSTORE_BASE64_ENCODED.name(), null);
         this.truststoreType = MapUtils.getOrDefault(options, WriterOptions.TRUSTSTORE_TYPE.name(), null);
         this.writeMode = MapUtils.getEnumOption(options, WriterOptions.WRITE_MODE.name(), WriteMode.INSERT, "write mode");
-        this.digestAlgorithmSupplier = digestAlgorithmSupplierFromOptions(options);
         // For backwards-compatibility with port settings, use writer option if available,
         // else fall back to props, and then default if neither specified
         this.useOpenSsl = getBoolean(USE_OPENSSL, true);
@@ -228,6 +227,7 @@
         this.configuredJobId = MapUtils.getOrDefault(options, WriterOptions.JOB_ID.name(), null);
         this.coordinatedWriteConfJson = MapUtils.getOrDefault(options, WriterOptions.COORDINATED_WRITE_CONFIG.name(), null);
         this.coordinatedWriteConf = buildCoordinatedWriteConf(dataTransportInfo.getTransport());
+        this.digestAlgorithmSupplier = digestAlgorithmSupplierFromOptions(dataTransport, options);
         validateEnvironment();
     }

@@ -238,8 +238,12 @@
      * @return the configured {@link DigestAlgorithmSupplier}
      */
    @NotNull
-    protected DigestAlgorithmSupplier digestAlgorithmSupplierFromOptions(Map<String, String> options)
+    protected DigestAlgorithmSupplier digestAlgorithmSupplierFromOptions(DataTransport dataTransport, Map<String, String> options)
     {
+        if (dataTransport == DataTransport.S3_COMPAT)
+        {
+            return DigestAlgorithms.XXHASH32;
+        }
         return MapUtils.getEnumOption(options, WriterOptions.DIGEST.name(), DigestAlgorithms.XXHASH32, "digest type");
     }

diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/Bundle.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/Bundle.java
index 7e853c08..1580918e 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/Bundle.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/Bundle.java
@@ -25,14 +25,16 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;

 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.FileUtils;

 import org.apache.cassandra.spark.bulkwriter.cloudstorage.SSTableCollector.SSTableFilesAndRange;
-import org.apache.cassandra.spark.bulkwriter.util.IOUtils;
 import org.apache.cassandra.spark.common.DataObjectBuilder;
+import org.apache.cassandra.spark.common.Digest;

 /**
  * Bundle represents a set of SSTables bundled, as per bundle size set by clients through writer option.
@@ -149,6 +151,7 @@ public class Bundle
         private long bundleUncompressedSize;
         private long bundleCompressedSize;
         private BundleNameGenerator bundleNameGenerator;
+        private Map<Path, Digest> fileDigests;

         Builder()
         {
@@ -176,6 +179,11 @@ public class Bundle
             return with(b -> b.bundleNameGenerator = bundleNameGenerator);
         }

+        public Builder fileDigests(Map<Path, Digest> fileDigests)
+        {
+            return with(b -> b.fileDigests = fileDigests);
+        }
+
         /**
          * Set the sequence of the bundle. It should be monotonically increasing
          * @param bundleSequence sequence of the bundle
@@ -211,7 +219,7 @@
             {
                 prepareBuild();
             }
-            catch (IOException ioe)
+            catch (Exception ioe)
             {
                 throw new RuntimeException("Unable to produce bundle manifest", ioe);
             }
@@ -238,21 +246,26 @@
     private void populateBundleManifestAndPersist() throws IOException
     {
+        int totalFiles = 0;
         for (SSTableFilesAndRange sstable : sourceSSTables)
         {
+            totalFiles += sstable.files.size();
             // all SSTable components related to one SSTable moved under same bundle
             BundleManifest.Entry manifestEntry = new BundleManifest.Entry(sstable.summary);
             for (Path componentPath : sstable.files)
             {
-                String checksum = IOUtils.xxhash32(componentPath);
+                Digest digest = Objects.requireNonNull(fileDigests.get(componentPath), () -> "No digest found for file: " + componentPath);
                 Path targetPath = bundleDirectory.resolve(componentPath.getFileName());
                 // link the original files to the bundle dir to avoid copying data
                 Files.createLink(targetPath, componentPath);
-                manifestEntry.addComponentChecksum(componentPath.getFileName().toString(), checksum);
+                manifestEntry.addComponentChecksum(componentPath.getFileName().toString(), digest.value());
             }
             addManifestEntry(manifestEntry);
         }
+        Preconditions.checkState(totalFiles == fileDigests.size(),
+                                 "SSTable files: %s does not match with the size of fileDigests: %s", totalFiles, fileDigests.size());
+
         bundleManifest.persistTo(bundleDirectory.resolve(Bundle.MANIFEST_FILE_NAME));
     }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSession.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSession.java
index c26f7416..83c85ad7 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSession.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSession.java
@@ -117,6 +117,7 @@ public class CloudStorageStreamSession extends StreamSession<TransportContext.Cl
         try
         {
             Map<Path, Digest> fileDigests = sstableWriter.prepareSStablesToSend(writerContext, sstables);
+            sstablesBundler.includeFileDigests(fileDigests);
             // sstablesBundler keeps track of the known files. No need to record the streamed files.
             // group the files by sstable (unique) basename and add to bundler
             fileDigests.keySet()
@@ -159,7 +160,8 @@
     protected StreamResult doFinalizeStream()
     {
         sstablesBundler.includeDirectory(sstableWriter.getOutDir());
-
+        // Note: there are likely more sstables produced when closing the writer. Include and merge with the full file digest map collected by the writer.
+        sstablesBundler.includeFileDigests(sstableWriter.fileDigestMap());
         sstablesBundler.finish();

         // last stream produces no bundle, and there is no bundle before that (as tracked by bundleCount)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableCollector.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableCollector.java
index aba8556f..1e7b830e 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableCollector.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableCollector.java
@@ -23,9 +23,11 @@ import java.nio.file.Path;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;

 import org.apache.cassandra.bridge.SSTableSummary;
+import org.apache.cassandra.spark.common.Digest;

 /**
  * Collect SSTables from listing the included directories
@@ -44,6 +46,19 @@
      */
     void includeSSTable(List<Path> sstableComponents);

+    /**
+     * Include the digests of the files
+     * @param fileDigests digest of the files
+     */
+    void includeFileDigests(Map<Path, Digest> fileDigests);
+
+    /**
+     * Get the file digests of the input files
+     * @param files files to gather the digests
+     * @return file digests of the input files
+     */
+    Map<Path, Digest> fileDigests(Set<Path> files);
+
     /**
      * @return total size of all sstables included
      */
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableLister.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableLister.java
index b18181e7..2897956d 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableLister.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableLister.java
@@ -42,10 +42,12 @@ import org.slf4j.LoggerFactory;

 import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.bridge.SSTableSummary;
+import org.apache.cassandra.spark.common.Digest;
 import org.apache.cassandra.spark.data.FileSystemSSTable;
 import org.apache.cassandra.spark.data.QualifiedTableName;
 import org.apache.cassandra.spark.data.SSTable;
 import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.parquet.Preconditions;
 import org.jetbrains.annotations.NotNull;

 /**
@@ -65,6 +67,7 @@ public class SSTableLister implements SSTableCollector
     private final Queue<SSTableFilesAndRange> sstables;
     private final Set<Path> sstableDirectories;
     private final Set<Path> knownFiles;
+    private final Map<Path, Digest> fileDigests;
     private long totalSize;

     public SSTableLister(QualifiedTableName qualifiedTableName, CassandraBridge bridge)
@@ -73,6 +76,7 @@
         this.bridge = bridge;
         this.sstables = new LinkedBlockingQueue<>();
         this.sstableDirectories = new HashSet<>();
+        this.fileDigests = new HashMap<>();
         this.knownFiles = new HashSet<>();
     }

@@ -99,6 +103,24 @@
         sstables.add(sstableAndRange);
     }

+    @Override
+    public void includeFileDigests(Map<Path, Digest> fileDigests)
+    {
+        this.fileDigests.putAll(fileDigests);
+    }
+
+    @Override
+    public Map<Path, Digest> fileDigests(Set<Path> files)
+    {
+        Map<Path, Digest> result = new HashMap<>();
+        for (Path file : files)
+        {
+            Preconditions.checkState(fileDigests.containsKey(file), "File not found in the fileDigests map. File: %s", file);
+            result.put(file, fileDigests.get(file));
+        }
+        return result;
+    }
+
     @Override
     public long totalSize()
     {
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTablesBundler.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTablesBundler.java
index 3ecbae04..947f8095 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTablesBundler.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTablesBundler.java
@@ -23,8 +23,10 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.stream.Stream;
 import java.util.zip.ZipEntry;
@@ -34,6 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import org.apache.cassandra.spark.bulkwriter.cloudstorage.SSTableCollector.SSTableFilesAndRange;
+import org.apache.cassandra.spark.common.Digest;

 /**
  * {@link SSTablesBundler} bundles SSTables in the output directory provided by
@@ -112,6 +115,11 @@ public class SSTablesBundler implements Iterator<Bundle>
         collector.includeSSTable(sstableComponents);
     }

+    public void includeFileDigests(Map<Path, Digest> fileDigests)
+    {
+        collector.includeFileDigests(fileDigests);
+    }
+
     public void finish()
     {
         reachedEnd = true;
@@ -140,6 +148,7 @@
     private Bundle computeNext() throws IOException
     {
         List<SSTableFilesAndRange> sstableFiles = new ArrayList<>();
+        Map<Path, Digest> fileDigests = new HashMap<>();
         long size = 0;
         while (!collector.isEmpty())
         {
@@ -158,6 +167,8 @@
             else
             {
                 sstableFiles.add(sstable);
+                // include all the digests of the files in this sstable
+                fileDigests.putAll(collector.fileDigests(sstable.files));
                 collector.consumeOne();
             }
         }
@@ -169,6 +180,7 @@
                      .bundleStagingDirectory(bundleStagingDir)
                      .sourceSSTables(sstableFiles)
                      .bundleNameGenerator(bundleNameGenerator)
+                     .fileDigests(fileDigests)
                      .build();
     }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
index 23394dc1..bec89bad 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
@@ -341,6 +341,21 @@ class BulkSparkConfTest
         assertThat(cluster2.localDc()).isEqualTo("dc1");
     }

+    @Test
+    void testDigestAlgorithmSupplierIsXXHashForS3CompactMode()
+    {
+        Map<String, String> options = copyDefaultOptions();
+        options.put(WriterOptions.DATA_TRANSPORT.name(), DataTransport.S3_COMPAT.name());
+        BulkSparkConf conf = new BulkSparkConf(sparkConf, options);
+        assertThat(conf.digestAlgorithmSupplier).isSameAs(DigestAlgorithms.XXHASH32);
+
+        options.put(WriterOptions.DIGEST.name(), DigestAlgorithms.MD5.name());
+        conf = new BulkSparkConf(sparkConf, options);
+        assertThat(conf.digestAlgorithmSupplier)
+            .describedAs("Even if DIGEST option is set, the digest algorithm is XXHash32 for S3_COMPAT mode")
+            .isSameAs(DigestAlgorithms.XXHASH32);
+    }
+
     private Map<String, String> copyDefaultOptions()
     {
         TreeMap<String, String> map = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/BundleTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/BundleTest.java
index d6fabf38..3be69787 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/BundleTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/BundleTest.java
@@ -19,20 +19,26 @@
 package org.apache.cassandra.spark.bulkwriter.cloudstorage;

+import java.io.ByteArrayOutputStream;
 import java.io.FileInputStream;
 import java.math.BigInteger;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;

 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;

+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.cassandra.bridge.SSTableSummary;
 import org.apache.cassandra.spark.bulkwriter.cloudstorage.SSTableCollector.SSTableFilesAndRange;
+import org.apache.cassandra.spark.common.Digest;

 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -57,11 +63,20 @@ class BundleTest
             sourceSSTables.add(mockSSTableFilesAndRange(componentCount, 100));
             totalSize += 100;
         }
+        Map<Path, Digest> fileDigests = new HashMap<>();
+        for (SSTableFilesAndRange sstable : sourceSSTables)
+        {
+            for (Path file : sstable.files)
+            {
+                fileDigests.put(file, mockDigest(file.getFileName().toString()));
+            }
+        }
         Bundle bundle = Bundle.builder()
                               .bundleSequence(0)
                               .sourceSSTables(sourceSSTables)
                               .bundleNameGenerator(new BundleNameGenerator("jobId", "sessionId"))
                               .bundleStagingDirectory(stagingDir)
+                              .fileDigests(fileDigests)
                               .build();
         assertEquals(totalSize, bundle.bundleUncompressedSize);
         assertEquals(BigInteger.ONE, bundle.firstToken);
@@ -70,10 +85,22 @@
         assertTrue(Files.exists(bundle.bundleFile));
         ZipInputStream zis = new ZipInputStream(new FileInputStream(bundle.bundleFile.toFile()));
         int acutalFilesCount = 0;
-        while (zis.getNextEntry() != null)
+        ZipEntry entry = null;
+        ObjectMapper objectMapper = new ObjectMapper();
+        boolean hasManifest = false;
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        while ((entry = zis.getNextEntry()) != null)
         {
             acutalFilesCount++;
+            if (entry.getName().endsWith("manifest.json"))
+            {
+                hasManifest = true;
+                zis.transferTo(baos);
+            }
         }
+        assertTrue(hasManifest);
+        Map manifest = objectMapper.readValue(baos.toByteArray(), Map.class);
+        assertManifestEntries(manifest, sourceSSTables);

         // the extra file (+ 1) is the manifest file
         assertEquals(sstableCount * componentCount + 1, acutalFilesCount);
@@ -84,6 +111,25 @@
         assertEquals(0, filesCount);
     }

+    private void assertManifestEntries(Map manifest, List<SSTableFilesAndRange> sourceSSTables)
+    {
+        Map<String, String> digests = new HashMap<>();
+        manifest.values().forEach(value -> {
+            Map<String, String> componentsChecksum = (Map<String, String>) ((Map<String, Object>) value).get("components_checksum");
+            digests.putAll(componentsChecksum);
+        });
+
+        for (SSTableFilesAndRange sstable : sourceSSTables)
+        {
+            for (Path file : sstable.files)
+            {
+                String fileName = file.getFileName().toString();
+                assertEquals(digests.getOrDefault(fileName, "File: " + fileName + " is not found in manifest"), fileName,
+                             "The digest in the manifest.json does not match with the filename (test configures filename as digest)");
+            }
+        }
+    }
+
     private SSTableFilesAndRange mockSSTableFilesAndRange(int fileCount, long size) throws Exception
     {
         SSTableSummary summary = new SSTableSummary(BigInteger.ONE, BigInteger.TEN,
@@ -95,4 +141,22 @@
         }
         return new SSTableFilesAndRange(summary, paths, size);
     }
+
+    private Digest mockDigest(String checksum)
+    {
+        return new Digest()
+        {
+            @Override
+            public String value()
+            {
+                return checksum;
+            }
+
+            @Override
+            public o.a.c.sidecar.client.shaded.common.request.data.Digest toSidecarDigest()
+            {
+                return null;
+            }
+        };
+    }
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSessionTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSessionTest.java
index 7888d2bc..987b5e12 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSessionTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSessionTest.java
@@ -60,6 +60,7 @@ import org.apache.cassandra.spark.bulkwriter.TransportContext;
 import org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
+import org.apache.cassandra.spark.common.Digest;
 import org.apache.cassandra.spark.data.FileSystemSSTable;
 import org.apache.cassandra.spark.data.QualifiedTableName;
 import org.apache.cassandra.spark.exception.SidecarApiCallException;
@@ -69,6 +70,7 @@ import org.apache.cassandra.spark.transports.storage.extensions.StorageTransport
 import org.apache.cassandra.spark.utils.TemporaryDirectory;
 import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm;

+import static org.apache.cassandra.spark.bulkwriter.cloudstorage.SSTableListerTest.calculateFileDigests;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@@ -124,8 +126,11 @@ class CloudStorageStreamSessionTest
         Path outputDir = tempDir.path();
         FileUtils.copyDirectory(sourceDir.toFile(), outputDir.toFile());

+        Map<Path, Digest> fileDigests = calculateFileDigests(outputDir);
+
         CassandraBridge bridge = generateBridge(outputDir);
         SSTableLister ssTableLister = new SSTableLister(new QualifiedTableName("ks", "table1"), bridge);
+        ssTableLister.includeFileDigests(fileDigests);
         SSTablesBundler ssTablesBundler = new SSTablesBundler(outputDir, ssTableLister, nameGenerator, 5 * 1024);
         ssTablesBundler.includeDirectory(outputDir);
         ssTablesBundler.finish();
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableListerTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableListerTest.java
index 598d55b7..6083a852 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableListerTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableListerTest.java
@@ -27,21 +27,30 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;

 import org.junit.jupiter.api.Test;

 import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.bridge.SSTableSummary;
+import org.apache.cassandra.spark.bulkwriter.DigestAlgorithms;
+import org.apache.cassandra.spark.common.Digest;
 import org.apache.cassandra.spark.data.FileSystemSSTable;
 import org.apache.cassandra.spark.data.QualifiedTableName;
+import org.apache.cassandra.spark.utils.DigestAlgorithm;
 import org.apache.cassandra.spark.utils.TemporaryDirectory;

 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -56,6 +65,7 @@ class SSTableListerTest
     {
         SSTableLister sstableLister = setupSSTableLister();
         sstableLister.includeDirectory(outputDir);
+        sstableLister.includeFileDigests(calculateFileDigests(outputDir));
         List<SSTableLister.SSTableFilesAndRange> sstables = new ArrayList<>();
         // 10196 is the total size of files in /data/ks/table1-ea3b3e6b-0d78-4913-89f2-15fcf98711d0
         // If this line fails, maybe something has been changed in the folder.
@@ -86,6 +96,14 @@
         assertTrue(range2Files.contains(outputDir.resolve("na-2-big-Summary.db")));
         assertTrue(range2Files.contains(outputDir.resolve("na-2-big-Statistics.db")));
         assertTrue(range2Files.contains(outputDir.resolve("na-2-big-TOC.txt")));
+
+        for (SSTableLister.SSTableFilesAndRange sstable : sstables)
+        {
+            for (Path file : sstable.files)
+            {
+                assertNotNull(sstableLister.fileDigests(Collections.singleton(file)), "Digest for file should exist. file: " + file);
file: " + file); + } + } } @Test @@ -165,4 +183,28 @@ class SSTableListerTest when(bridge.getSSTableSummary("ks", "table1", ssTable2)).thenReturn(summary2); return new SSTableLister(new QualifiedTableName("ks", "table1"), bridge); } + + + static Map<Path, Digest> calculateFileDigests(Path dir) + { + DigestAlgorithm digester = DigestAlgorithms.XXHASH32.get(); + Map<Path, Digest> result = new HashMap<>(); + try (Stream<Path> files = Files.walk(dir)) + { + Iterator<Path> it = files.iterator(); + while (it.hasNext()) + { + Path file = it.next(); + if (Files.isRegularFile(file)) + { + result.put(file, digester.calculateFileDigest(file)); + } + } + } + catch (IOException e) + { + throw new RuntimeException(e); + } + return result; + } } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTablesBundlerTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTablesBundlerTest.java index 838c45a9..f9d92279 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTablesBundlerTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTablesBundlerTest.java @@ -38,10 +38,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.cassandra.bridge.CassandraBridge; import org.apache.cassandra.bridge.SSTableSummary; import org.apache.cassandra.spark.bulkwriter.util.IOUtils; +import org.apache.cassandra.spark.common.Digest; import org.apache.cassandra.spark.data.FileSystemSSTable; import org.apache.cassandra.spark.data.QualifiedTableName; import org.apache.cassandra.spark.utils.TemporaryDirectory; +import static org.apache.cassandra.spark.bulkwriter.cloudstorage.SSTableListerTest.calculateFileDigests; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -63,9 +65,11 @@ class SSTablesBundlerTest Path sourceDir = Paths.get(getClass().getResource("/data/ks/table1-ea3b3e6b-0d78-4913-89f2-15fcf98711d0").toURI()); Path outputDir = tempDir.path(); FileUtils.copyDirectory(sourceDir.toFile(), outputDir.toFile()); + Map<Path, Digest> fileDigests = calculateFileDigests(outputDir); CassandraBridge bridge = mockCassandraBridge(outputDir); SSTableLister ssTableLister = new SSTableLister(new QualifiedTableName("ks", "table1"), bridge); + ssTableLister.includeFileDigests(fileDigests); SSTablesBundler ssTablesBundler = new SSTablesBundler(outputDir, ssTableLister, nameGenerator, 5 * 1024); ssTablesBundler.includeDirectory(outputDir); ssTablesBundler.finish(); @@ -94,8 +98,10 @@ class SSTablesBundlerTest Path outputDir = tempDir.path(); FileUtils.copyDirectory(sourceDir.toFile(), outputDir.toFile()); + Map<Path, Digest> fileDigests = calculateFileDigests(outputDir); CassandraBridge bridge = mockCassandraBridge(outputDir); SSTableLister writerOutputAnalyzer = new SSTableLister(new QualifiedTableName("ks", "table1"), bridge); + writerOutputAnalyzer.includeFileDigests(fileDigests); SSTablesBundler ssTablesBundler = new SSTablesBundler(outputDir, writerOutputAnalyzer, nameGenerator, 5 * 1024); ssTablesBundler.includeDirectory(outputDir); ssTablesBundler.finish(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org