This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8e6c527a CASSANALYTICS-19: Fix double digesting (#105)
8e6c527a is described below

commit 8e6c527add29d61b5ee4a0b176c8d3b8d2bcb3c0
Author: Yifan Cai <y...@apache.org>
AuthorDate: Mon Apr 14 16:17:08 2025 -0700

    CASSANALYTICS-19: Fix double digesting (#105)
    
    Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSANALYTICS-19
---
 CHANGES.txt                                        |  1 +
 .../cassandra/spark/bulkwriter/BulkSparkConf.java  |  8 ++-
 .../spark/bulkwriter/cloudstorage/Bundle.java      | 21 +++++--
 .../cloudstorage/CloudStorageStreamSession.java    |  4 +-
 .../bulkwriter/cloudstorage/SSTableCollector.java  | 15 +++++
 .../bulkwriter/cloudstorage/SSTableLister.java     | 22 ++++++++
 .../bulkwriter/cloudstorage/SSTablesBundler.java   | 12 ++++
 .../spark/bulkwriter/BulkSparkConfTest.java        | 15 +++++
 .../spark/bulkwriter/cloudstorage/BundleTest.java  | 66 +++++++++++++++++++++-
 .../CloudStorageStreamSessionTest.java             |  5 ++
 .../bulkwriter/cloudstorage/SSTableListerTest.java | 42 ++++++++++++++
 .../cloudstorage/SSTablesBundlerTest.java          |  6 ++
 12 files changed, 209 insertions(+), 8 deletions(-)
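
Before this patch, the S3_COMPAT bulk writer digested every SSTable component twice: the SSTable writer computed a digest for each file when preparing it to send, and Bundle then re-hashed the same file via IOUtils.xxhash32 while building the bundle manifest. The patch threads the writer's already-computed Map<Path, Digest> through SSTablesBundler and its SSTableCollector into Bundle, so each file is hashed exactly once. A minimal sketch of the new wiring, using the APIs added below (the class and variable names are illustrative, not the exact production call sites):

    import java.nio.file.Path;
    import java.util.Map;

    import org.apache.cassandra.spark.bulkwriter.cloudstorage.SSTablesBundler;
    import org.apache.cassandra.spark.common.Digest;

    final class SingleDigestFlowSketch
    {
        // 'writerDigests' stands in for the map returned by
        // sstableWriter.prepareSStablesToSend(...) in CloudStorageStreamSession
        static void bundleWithoutRehashing(SSTablesBundler bundler, Path outDir, Map<Path, Digest> writerDigests)
        {
            bundler.includeDirectory(outDir);          // discover sstable components on disk
            bundler.includeFileDigests(writerDigests); // reuse the digests the writer already computed
            bundler.finish();                          // Bundle later reads digests from the map; no re-hashing
        }
    }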

diff --git a/CHANGES.txt b/CHANGES.txt
index f29ef75d..74e482ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Bulk writer in S3_COMPAT mode calculates file digest twice (CASSANALYTICS-19)
  * Add Apache Cassandra Sidecar CDC implementation (CASSANALYTICS-7)
  * Upgrade sidecar client dependency to 0.1.0 (CASSANALYTICS-16)
  * Remove JDK8 support (CASSANALYTICS-17)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index fc870957..1adec0e3 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -186,7 +186,6 @@ public class BulkSparkConf implements Serializable
         this.truststoreBase64Encoded = MapUtils.getOrDefault(options, WriterOptions.TRUSTSTORE_BASE64_ENCODED.name(), null);
         this.truststoreType = MapUtils.getOrDefault(options, WriterOptions.TRUSTSTORE_TYPE.name(), null);
         this.writeMode = MapUtils.getEnumOption(options, WriterOptions.WRITE_MODE.name(), WriteMode.INSERT, "write mode");
-        this.digestAlgorithmSupplier = digestAlgorithmSupplierFromOptions(options);
         // For backwards-compatibility with port settings, use writer option if available,
         // else fall back to props, and then default if neither specified
         this.useOpenSsl = getBoolean(USE_OPENSSL, true);
@@ -228,6 +227,7 @@ public class BulkSparkConf implements Serializable
         this.configuredJobId = MapUtils.getOrDefault(options, WriterOptions.JOB_ID.name(), null);
         this.coordinatedWriteConfJson = MapUtils.getOrDefault(options, WriterOptions.COORDINATED_WRITE_CONFIG.name(), null);
         this.coordinatedWriteConf = buildCoordinatedWriteConf(dataTransportInfo.getTransport());
+        this.digestAlgorithmSupplier = digestAlgorithmSupplierFromOptions(dataTransport, options);
         validateEnvironment();
     }
 
@@ -238,8 +238,12 @@ public class BulkSparkConf implements Serializable
      * @return the configured {@link DigestAlgorithmSupplier}
      */
     @NotNull
-    protected DigestAlgorithmSupplier digestAlgorithmSupplierFromOptions(Map<String, String> options)
+    protected DigestAlgorithmSupplier digestAlgorithmSupplierFromOptions(DataTransport dataTransport, Map<String, String> options)
     {
+        if (dataTransport == DataTransport.S3_COMPAT)
+        {
+            return DigestAlgorithms.XXHASH32;
+        }
         return MapUtils.getEnumOption(options, WriterOptions.DIGEST.name(), DigestAlgorithms.XXHASH32, "digest type");
     }
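
With the change above, the S3_COMPAT transport always uses XXHash32 digests: the supplier is now resolved after the data transport is known, and the DIGEST writer option only takes effect on the non-cloud-storage path. A hedged illustration of the resulting behaviour, mirroring the new BulkSparkConfTest case further down (the default-options setup is borrowed from that test):

    // Sketch: assumes the remaining required writer options are present,
    // as provided by copyDefaultOptions() in BulkSparkConfTest
    Map<String, String> options = copyDefaultOptions();
    options.put(WriterOptions.DATA_TRANSPORT.name(), DataTransport.S3_COMPAT.name());
    options.put(WriterOptions.DIGEST.name(), DigestAlgorithms.MD5.name()); // no effect in S3_COMPAT

    BulkSparkConf conf = new BulkSparkConf(sparkConf, options);
    // conf.digestAlgorithmSupplier is DigestAlgorithms.XXHASH32 regardless of the DIGEST option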
 
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/Bundle.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/Bundle.java
index 7e853c08..1580918e 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/Bundle.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/Bundle.java
@@ -25,14 +25,16 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.FileUtils;
 
 import org.apache.cassandra.spark.bulkwriter.cloudstorage.SSTableCollector.SSTableFilesAndRange;
-import org.apache.cassandra.spark.bulkwriter.util.IOUtils;
 import org.apache.cassandra.spark.common.DataObjectBuilder;
+import org.apache.cassandra.spark.common.Digest;
 
 /**
  * Bundle represents a set of SSTables bundled, as per bundle size set by clients through writer option.
@@ -149,6 +151,7 @@ public class Bundle
         private long bundleUncompressedSize;
         private long bundleCompressedSize;
         private BundleNameGenerator bundleNameGenerator;
+        private Map<Path, Digest> fileDigests;
 
         Builder()
         {
@@ -176,6 +179,11 @@ public class Bundle
             return with(b -> b.bundleNameGenerator = bundleNameGenerator);
         }
 
+        public Builder fileDigests(Map<Path, Digest> fileDigests)
+        {
+            return with(b -> b.fileDigests = fileDigests);
+        }
+
         /**
          * Set the sequence of the bundle. It should be monotonically increasing
          * @param bundleSequence sequence of the bundle
@@ -211,7 +219,7 @@ public class Bundle
             {
                 prepareBuild();
             }
-            catch (IOException ioe)
+            catch (Exception ioe)
             {
                 throw new RuntimeException("Unable to produce bundle manifest", ioe);
             }
@@ -238,21 +246,26 @@ public class Bundle
 
         private void populateBundleManifestAndPersist() throws IOException
         {
+            int totalFiles = 0;
             for (SSTableFilesAndRange sstable : sourceSSTables)
             {
+                totalFiles += sstable.files.size();
                 // all SSTable components related to one SSTable moved under same bundle
                 BundleManifest.Entry manifestEntry = new BundleManifest.Entry(sstable.summary);
                 for (Path componentPath : sstable.files)
                 {
-                    String checksum = IOUtils.xxhash32(componentPath);
+                    Digest digest = Objects.requireNonNull(fileDigests.get(componentPath), () -> "No digest found for file: " + componentPath);
                     Path targetPath = bundleDirectory.resolve(componentPath.getFileName());
                     // link the original files to the bundle dir to avoid copying data
                     Files.createLink(targetPath, componentPath);
-                    manifestEntry.addComponentChecksum(componentPath.getFileName().toString(), checksum);
+                    manifestEntry.addComponentChecksum(componentPath.getFileName().toString(), digest.value());
                 }
                 addManifestEntry(manifestEntry);
             }
 
+            Preconditions.checkState(totalFiles == fileDigests.size(),
+                                     "SSTable files: %s does not match with 
the size of fileDigests: %s", totalFiles, fileDigests.size());
+
             bundleManifest.persistTo(bundleDirectory.resolve(Bundle.MANIFEST_FILE_NAME));
         }
 
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSession.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSession.java
index c26f7416..83c85ad7 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSession.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSession.java
@@ -117,6 +117,7 @@ public class CloudStorageStreamSession extends StreamSession<TransportContext.Cl
             try
             {
                 Map<Path, Digest> fileDigests = sstableWriter.prepareSStablesToSend(writerContext, sstables);
+                sstablesBundler.includeFileDigests(fileDigests);
                 // sstablesBundler keeps track of the known files. No need to record the streamed files.
                 // group the files by sstable (unique) basename and add to bundler
                 fileDigests.keySet()
@@ -159,7 +160,8 @@ public class CloudStorageStreamSession extends StreamSession<TransportContext.Cl
     protected StreamResult doFinalizeStream()
     {
         sstablesBundler.includeDirectory(sstableWriter.getOutDir());
-
+        // Note: there are likely more sstables produced when closing the writer. Include and merge with the full file digest map collected by the writer.
+        sstablesBundler.includeFileDigests(sstableWriter.fileDigestMap());
         sstablesBundler.finish();
 
         // last stream produces no bundle, and there is no bundle before that (as tracked by bundleCount)
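
Closing the writer can flush SSTables that never went through the per-batch send path, so their digests exist only in the writer's complete map; merging that map in doFinalizeStream fills the gap. The merge is safe to overlap with the earlier per-batch calls because SSTableLister backs includeFileDigests with Map.putAll (see the next file), and re-adding an already-known path maps it to the identical digest. Condensed finalize sequence, as in the hunk above:

    sstablesBundler.includeDirectory(sstableWriter.getOutDir());
    // re-adding an already-included path is harmless: same path, same digest
    sstablesBundler.includeFileDigests(sstableWriter.fileDigestMap());
    sstablesBundler.finish();
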
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableCollector.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableCollector.java
index aba8556f..1e7b830e 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableCollector.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableCollector.java
@@ -23,9 +23,11 @@ import java.nio.file.Path;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.cassandra.bridge.SSTableSummary;
+import org.apache.cassandra.spark.common.Digest;
 
 /**
  * Collect SSTables from listing the included directories
@@ -44,6 +46,19 @@ public interface SSTableCollector
      */
     void includeSSTable(List<Path> sstableComponents);
 
+    /**
+     * Include the digests of the files
+     * @param fileDigests digest of the files
+     */
+    void includeFileDigests(Map<Path, Digest> fileDigests);
+
+    /**
+     * Get the file digests of the input files
+     * @param files files to gather the digests
+     * @return file digests of the input files
+     */
+    Map<Path, Digest> fileDigests(Set<Path> files);
+
     /**
      * @return total size of all sstables included
      */
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableLister.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableLister.java
index b18181e7..2897956d 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableLister.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableLister.java
@@ -42,10 +42,12 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.bridge.SSTableSummary;
+import org.apache.cassandra.spark.common.Digest;
 import org.apache.cassandra.spark.data.FileSystemSSTable;
 import org.apache.cassandra.spark.data.QualifiedTableName;
 import org.apache.cassandra.spark.data.SSTable;
 import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.parquet.Preconditions;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -65,6 +67,7 @@ public class SSTableLister implements SSTableCollector
     private final Queue<SSTableFilesAndRange> sstables;
     private final Set<Path> sstableDirectories;
     private final Set<Path> knownFiles;
+    private final Map<Path, Digest> fileDigests;
     private long totalSize;
 
     public SSTableLister(QualifiedTableName qualifiedTableName, CassandraBridge bridge)
@@ -73,6 +76,7 @@ public class SSTableLister implements SSTableCollector
         this.bridge = bridge;
         this.sstables = new LinkedBlockingQueue<>();
         this.sstableDirectories = new HashSet<>();
+        this.fileDigests = new HashMap<>();
         this.knownFiles = new HashSet<>();
     }
 
@@ -99,6 +103,24 @@ public class SSTableLister implements SSTableCollector
         sstables.add(sstableAndRange);
     }
 
+    @Override
+    public void includeFileDigests(Map<Path, Digest> fileDigests)
+    {
+        this.fileDigests.putAll(fileDigests);
+    }
+
+    @Override
+    public Map<Path, Digest> fileDigests(Set<Path> files)
+    {
+        Map<Path, Digest> result = new HashMap<>();
+        for (Path file : files)
+        {
+            Preconditions.checkState(fileDigests.containsKey(file), "File not found in the fileDigests map. File: %s", file);
+            result.put(file, fileDigests.get(file));
+        }
+        return result;
+    }
+
     @Override
     public long totalSize()
     {
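
The collector's digest contract is deliberately strict: fileDigests(files) fails fast when any requested component has no recorded digest (Preconditions.checkState throws IllegalStateException), so a missing digest surfaces at bundling time rather than producing a manifest with silent gaps. A small hedged usage sketch against the implementation above ('lister' and the path variables are illustrative):

    // assume 'dataFile' was registered with the lister, 'strayFile' was not
    lister.includeFileDigests(Collections.singletonMap(dataFile, dataFileDigest));

    lister.fileDigests(Collections.singleton(dataFile));  // => {dataFile -> dataFileDigest}
    lister.fileDigests(Collections.singleton(strayFile)); // => throws IllegalStateException
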
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTablesBundler.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTablesBundler.java
index 3ecbae04..947f8095 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTablesBundler.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTablesBundler.java
@@ -23,8 +23,10 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.stream.Stream;
 import java.util.zip.ZipEntry;
@@ -34,6 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.spark.bulkwriter.cloudstorage.SSTableCollector.SSTableFilesAndRange;
+import org.apache.cassandra.spark.common.Digest;
 
 /**
  * {@link SSTablesBundler} bundles SSTables in the output directory provided by
@@ -112,6 +115,11 @@ public class SSTablesBundler implements Iterator<Bundle>
         collector.includeSSTable(sstableComponents);
     }
 
+    public void includeFileDigests(Map<Path, Digest> fileDigests)
+    {
+        collector.includeFileDigests(fileDigests);
+    }
+
     public void finish()
     {
         reachedEnd = true;
@@ -140,6 +148,7 @@ public class SSTablesBundler implements Iterator<Bundle>
     private Bundle computeNext() throws IOException
     {
         List<SSTableFilesAndRange> sstableFiles = new ArrayList<>();
+        Map<Path, Digest> fileDigests = new HashMap<>();
         long size = 0;
         while (!collector.isEmpty())
         {
@@ -158,6 +167,8 @@ public class SSTablesBundler implements Iterator<Bundle>
             else
             {
                 sstableFiles.add(sstable);
+                // include all the digests of the files in this sstable
+                fileDigests.putAll(collector.fileDigests(sstable.files));
                 collector.consumeOne();
             }
         }
@@ -169,6 +180,7 @@ public class SSTablesBundler implements Iterator<Bundle>
                      .bundleStagingDirectory(bundleStagingDir)
                      .sourceSSTables(sstableFiles)
                      .bundleNameGenerator(bundleNameGenerator)
+                     .fileDigests(fileDigests)
                      .build();
     }
 
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
index 23394dc1..bec89bad 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
@@ -341,6 +341,21 @@ class BulkSparkConfTest
         assertThat(cluster2.localDc()).isEqualTo("dc1");
     }
 
+    @Test
+    void testDigestAlgorithmSupplierIsXXHashForS3CompactMode()
+    {
+        Map<String, String> options = copyDefaultOptions();
+        options.put(WriterOptions.DATA_TRANSPORT.name(), DataTransport.S3_COMPAT.name());
+        BulkSparkConf conf = new BulkSparkConf(sparkConf, options);
+        assertThat(conf.digestAlgorithmSupplier).isSameAs(DigestAlgorithms.XXHASH32);
+
+        options.put(WriterOptions.DIGEST.name(), DigestAlgorithms.MD5.name());
+        conf = new BulkSparkConf(sparkConf, options);
+        assertThat(conf.digestAlgorithmSupplier)
+        .describedAs("Even if DIGEST option is set, the digest algorithm is XXHash32 for S3_COMPAT mode")
+        .isSameAs(DigestAlgorithms.XXHASH32);
+    }
+
     private Map<String, String> copyDefaultOptions()
     {
        TreeMap<String, String> map = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/BundleTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/BundleTest.java
index d6fabf38..3be69787 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/BundleTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/BundleTest.java
@@ -19,20 +19,26 @@
 
 package org.apache.cassandra.spark.bulkwriter.cloudstorage;
 
+import java.io.ByteArrayOutputStream;
 import java.io.FileInputStream;
 import java.math.BigInteger;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.cassandra.bridge.SSTableSummary;
 import org.apache.cassandra.spark.bulkwriter.cloudstorage.SSTableCollector.SSTableFilesAndRange;
+import org.apache.cassandra.spark.common.Digest;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -57,11 +63,20 @@ class BundleTest
             sourceSSTables.add(mockSSTableFilesAndRange(componentCount, 100));
             totalSize += 100;
         }
+        Map<Path, Digest> fileDigests = new HashMap<>();
+        for (SSTableFilesAndRange sstable : sourceSSTables)
+        {
+            for (Path file : sstable.files)
+            {
+                fileDigests.put(file, mockDigest(file.getFileName().toString()));
+            }
+        }
         Bundle bundle = Bundle.builder()
                               .bundleSequence(0)
                               .sourceSSTables(sourceSSTables)
                               .bundleNameGenerator(new BundleNameGenerator("jobId", "sessionId"))
                               .bundleStagingDirectory(stagingDir)
+                              .fileDigests(fileDigests)
                               .build();
         assertEquals(totalSize, bundle.bundleUncompressedSize);
         assertEquals(BigInteger.ONE, bundle.firstToken);
@@ -70,10 +85,22 @@ class BundleTest
         assertTrue(Files.exists(bundle.bundleFile));
         ZipInputStream zis = new ZipInputStream(new FileInputStream(bundle.bundleFile.toFile()));
         int acutalFilesCount = 0;
-        while (zis.getNextEntry() != null)
+        ZipEntry entry = null;
+        ObjectMapper objectMapper = new ObjectMapper();
+        boolean hasManifest = false;
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        while ((entry = zis.getNextEntry()) != null)
         {
             acutalFilesCount++;
+            if (entry.getName().endsWith("manifest.json"))
+            {
+                hasManifest = true;
+                zis.transferTo(baos);
+            }
         }
+        assertTrue(hasManifest);
+        Map manifest = objectMapper.readValue(baos.toByteArray(), Map.class);
+        assertManifestEntries(manifest, sourceSSTables);
         // the extra file (+ 1) is the manifest file
         assertEquals(sstableCount * componentCount + 1, acutalFilesCount);
 
@@ -84,6 +111,25 @@ class BundleTest
         assertEquals(0, filesCount);
     }
 
+    private void assertManifestEntries(Map manifest, List<SSTableFilesAndRange> sourceSSTables)
+    {
+        Map<String, String> digests = new HashMap<>();
+        manifest.values().forEach(value -> {
+            Map<String, String> componentsChecksum = (Map<String, String>) ((Map<String, Object>) value).get("components_checksum");
+            digests.putAll(componentsChecksum);
+        });
+
+        for (SSTableFilesAndRange sstable : sourceSSTables)
+        {
+            for (Path file : sstable.files)
+            {
+                String fileName = file.getFileName().toString();
+                assertEquals(digests.getOrDefault(fileName, "File: " + fileName + " is not found in manifest"), fileName,
+                             "The digest in the manifest.json does not match with the filename (test configures filename as digest)");
+            }
+        }
+    }
+
    private SSTableFilesAndRange mockSSTableFilesAndRange(int fileCount, long size) throws Exception
     {
        SSTableSummary summary = new SSTableSummary(BigInteger.ONE, BigInteger.TEN,
@@ -95,4 +141,22 @@ class BundleTest
         }
         return new SSTableFilesAndRange(summary, paths, size);
     }
+
+    private Digest mockDigest(String checksum)
+    {
+        return new Digest()
+        {
+            @Override
+            public String value()
+            {
+                return checksum;
+            }
+
+            @Override
+            public o.a.c.sidecar.client.shaded.common.request.data.Digest toSidecarDigest()
+            {
+                return null;
+            }
+        };
+    }
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSessionTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSessionTest.java
index 7888d2bc..987b5e12 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSessionTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSessionTest.java
@@ -60,6 +60,7 @@ import org.apache.cassandra.spark.bulkwriter.TransportContext;
 import org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
+import org.apache.cassandra.spark.common.Digest;
 import org.apache.cassandra.spark.data.FileSystemSSTable;
 import org.apache.cassandra.spark.data.QualifiedTableName;
 import org.apache.cassandra.spark.exception.SidecarApiCallException;
@@ -69,6 +70,7 @@ import org.apache.cassandra.spark.transports.storage.extensions.StorageTransport
 import org.apache.cassandra.spark.utils.TemporaryDirectory;
 import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm;
 
+import static org.apache.cassandra.spark.bulkwriter.cloudstorage.SSTableListerTest.calculateFileDigests;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@@ -124,8 +126,11 @@ class CloudStorageStreamSessionTest
             Path outputDir = tempDir.path();
             FileUtils.copyDirectory(sourceDir.toFile(), outputDir.toFile());
 
+            Map<Path, Digest> fileDigests = calculateFileDigests(outputDir);
+
             CassandraBridge bridge = generateBridge(outputDir);
             SSTableLister ssTableLister = new SSTableLister(new QualifiedTableName("ks", "table1"), bridge);
+            ssTableLister.includeFileDigests(fileDigests);
             SSTablesBundler ssTablesBundler = new SSTablesBundler(outputDir, ssTableLister, nameGenerator, 5 * 1024);
             ssTablesBundler.includeDirectory(outputDir);
             ssTablesBundler.finish();
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableListerTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableListerTest.java
index 598d55b7..6083a852 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableListerTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableListerTest.java
@@ -27,21 +27,30 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 
 import org.junit.jupiter.api.Test;
 
 import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.bridge.SSTableSummary;
+import org.apache.cassandra.spark.bulkwriter.DigestAlgorithms;
+import org.apache.cassandra.spark.common.Digest;
 import org.apache.cassandra.spark.data.FileSystemSSTable;
 import org.apache.cassandra.spark.data.QualifiedTableName;
+import org.apache.cassandra.spark.utils.DigestAlgorithm;
 import org.apache.cassandra.spark.utils.TemporaryDirectory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -56,6 +65,7 @@ class SSTableListerTest
     {
         SSTableLister sstableLister = setupSSTableLister();
         sstableLister.includeDirectory(outputDir);
+        sstableLister.includeFileDigests(calculateFileDigests(outputDir));
         List<SSTableLister.SSTableFilesAndRange> sstables = new ArrayList<>();
         // 10196 is the total size of files in /data/ks/table1-ea3b3e6b-0d78-4913-89f2-15fcf98711d0
         // If this line fails, maybe something has been changed in the folder.
@@ -86,6 +96,14 @@ class SSTableListerTest
         assertTrue(range2Files.contains(outputDir.resolve("na-2-big-Summary.db")));
         assertTrue(range2Files.contains(outputDir.resolve("na-2-big-Statistics.db")));
         assertTrue(range2Files.contains(outputDir.resolve("na-2-big-TOC.txt")));
+
+        for (SSTableLister.SSTableFilesAndRange sstable : sstables)
+        {
+            for (Path file : sstable.files)
+            {
+                assertNotNull(sstableLister.fileDigests(Collections.singleton(file)), "Digest for file should exist. file: " + file);
+            }
+        }
     }
 
     @Test
@@ -165,4 +183,28 @@ class SSTableListerTest
         when(bridge.getSSTableSummary("ks", "table1", 
ssTable2)).thenReturn(summary2);
         return new SSTableLister(new QualifiedTableName("ks", "table1"), 
bridge);
     }
+
+
+    static Map<Path, Digest> calculateFileDigests(Path dir)
+    {
+        DigestAlgorithm digester = DigestAlgorithms.XXHASH32.get();
+        Map<Path, Digest> result = new HashMap<>();
+        try (Stream<Path> files = Files.walk(dir))
+        {
+            Iterator<Path> it = files.iterator();
+            while (it.hasNext())
+            {
+                Path file = it.next();
+                if (Files.isRegularFile(file))
+                {
+                    result.put(file, digester.calculateFileDigest(file));
+                }
+            }
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        return result;
+    }
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTablesBundlerTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTablesBundlerTest.java
index 838c45a9..f9d92279 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTablesBundlerTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTablesBundlerTest.java
@@ -38,10 +38,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.bridge.SSTableSummary;
 import org.apache.cassandra.spark.bulkwriter.util.IOUtils;
+import org.apache.cassandra.spark.common.Digest;
 import org.apache.cassandra.spark.data.FileSystemSSTable;
 import org.apache.cassandra.spark.data.QualifiedTableName;
 import org.apache.cassandra.spark.utils.TemporaryDirectory;
 
+import static org.apache.cassandra.spark.bulkwriter.cloudstorage.SSTableListerTest.calculateFileDigests;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -63,9 +65,11 @@ class SSTablesBundlerTest
             Path sourceDir = Paths.get(getClass().getResource("/data/ks/table1-ea3b3e6b-0d78-4913-89f2-15fcf98711d0").toURI());
             Path outputDir = tempDir.path();
             FileUtils.copyDirectory(sourceDir.toFile(), outputDir.toFile());
+            Map<Path, Digest> fileDigests = calculateFileDigests(outputDir);
 
             CassandraBridge bridge = mockCassandraBridge(outputDir);
             SSTableLister ssTableLister = new SSTableLister(new QualifiedTableName("ks", "table1"), bridge);
+            ssTableLister.includeFileDigests(fileDigests);
             SSTablesBundler ssTablesBundler = new SSTablesBundler(outputDir, ssTableLister, nameGenerator, 5 * 1024);
             ssTablesBundler.includeDirectory(outputDir);
             ssTablesBundler.finish();
@@ -94,8 +98,10 @@ class SSTablesBundlerTest
             Path outputDir = tempDir.path();
             FileUtils.copyDirectory(sourceDir.toFile(), outputDir.toFile());
 
+            Map<Path, Digest> fileDigests = calculateFileDigests(outputDir);
             CassandraBridge bridge = mockCassandraBridge(outputDir);
             SSTableLister writerOutputAnalyzer = new SSTableLister(new QualifiedTableName("ks", "table1"), bridge);
+            writerOutputAnalyzer.includeFileDigests(fileDigests);
             SSTablesBundler ssTablesBundler = new SSTablesBundler(outputDir, writerOutputAnalyzer, nameGenerator, 5 * 1024);
             ssTablesBundler.includeDirectory(outputDir);
             ssTablesBundler.finish();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org
