This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 29d1d521 CASSANALYTICS-107: Fix race condition in DirectStreamSession#onSSTablesProduced and SortedSSTableWriter#close (#162)
29d1d521 is described below

commit 29d1d5218495f393c49d90153b5124081af436ff
Author: Yifan Cai <[email protected]>
AuthorDate: Tue Dec 16 09:07:27 2025 -0800

    CASSANALYTICS-107: Fix race condition in DirectStreamSession#onSSTablesProduced and SortedSSTableWriter#close (#162)
    
    Patch by Yifan Cai; Reviewed by Lukasz Antoniak for CASSANALYTICS-107
---
 CHANGES.txt                                        |   1 +
 .../spark/bulkwriter/DirectStreamSession.java      |  17 +-
 .../spark/bulkwriter/SortedSSTableWriter.java      | 116 +++++++--
 .../cassandra/spark/bulkwriter/StreamSession.java  |  27 ++-
 .../spark/bulkwriter/SortedSSTableWriterTest.java  | 262 ++++++++++++++++++++-
 .../bridge/SSTableWriterImplementation.java        |   3 +
 6 files changed, 385 insertions(+), 41 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index ee7da68c..94e62135 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.3.0
 -----
+ * Fix race condition in DirectStreamSession#onSSTablesProduced and SortedSSTableWriter#close (CASSANALYTICS-107)
  * Address LZ4 vulnerability (CVE-2025-12183) (CASSANALYTICS-109)
  * Add TimeRangeFilter to filter out SSTables outside given time window (CASSANALYTICS-102)
 
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSession.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSession.java
index dbae672e..7af61709 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSession.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSession.java
@@ -19,7 +19,6 @@
 
 package org.apache.cassandra.spark.bulkwriter;
 
-import java.io.File;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.nio.file.DirectoryStream;
@@ -37,7 +36,6 @@ import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Range;
-import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,7 +74,9 @@ public class DirectStreamSession extends StreamSession<TransportContext.DirectDa
             return;
         }
 
-        // send sstables asynchronously
+        // Send sstables asynchronously.
+        // SAFETY: sstableWriter.prepareSStablesToSend() is synchronized and can be called
+        // concurrently with close() from the RecordWriter thread.
         executorService.submit(() -> {
             try
             {
@@ -178,16 +178,7 @@ public class DirectStreamSession extends StreamSession<TransportContext.DirectDa
         finally
         {
             // Clean up SSTable files once the task is complete
-            File tempDir = sstableWriter.getOutDir().toFile();
-            LOGGER.info("[{}]: Removing temporary files after stream session from {}", sessionID, tempDir);
-            try
-            {
-                FileUtils.deleteDirectory(tempDir);
-            }
-            catch (IOException exception)
-            {
-                LOGGER.warn("[{}]: Failed to delete temporary directory {}", sessionID, tempDir, exception);
-            }
+            cleanupSSTables(LOGGER);
         }
     }
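
The SAFETY note above reduces to a small pattern: the producer callback hands digest calculation to an executor thread while the RecordWriter thread may close the writer at any moment, and both calls serialize on the writer's monitor. A minimal, self-contained sketch (class and method names here are hypothetical stand-ins, not the actual DirectStreamSession code):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class AsyncSendSketch
    {
        // Both writer methods are synchronized, so the executor thread and
        // the RecordWriter thread serialize on the writer's monitor.
        static class SyncWriter
        {
            synchronized void prepareToSend() { /* hash and send new sstables */ }
            synchronized void close()         { /* flush, hash the remainder */ }
        }

        private final ExecutorService executorService = Executors.newSingleThreadExecutor();
        private final SyncWriter writer = new SyncWriter();

        void onProduced()
        {
            executorService.submit(writer::prepareToSend); // background thread
        }

        void recordWriterDone()
        {
            writer.close(); // may run concurrently with a queued prepareToSend()
        }
    }
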
 
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
index bbea732e..3d09be47 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
@@ -63,6 +63,21 @@ import org.jetbrains.annotations.Nullable;
  * entire partition, i.e. repartitionAndSortWithinPartitions. By doing so, it eliminates the nice property of the
  * output sstable being globally sorted and non-overlapping.
  * Unless you can think of a better use case, we should stick with this SortedSSTableWriter
+ * <br>
+ * <p>Threading Model:</p>
+ * This class has limited thread-safety guarantees:
+ * <ul>
+ *   <li>{@link #addRow(BigInteger, Map)} and {@link #close(BulkWriterContext)} MUST be called from the same thread
+ *       (typically the RecordWriter thread). These methods are NOT synchronized and must not be called concurrently.</li>
+ *   <li>{@link #prepareSStablesToSend(BulkWriterContext, Set)} MAY be called concurrently from background threads
+ *       (via {@link StreamSession}'s executor service) and is synchronized to protect shared state.</li>
+ *   <li>{@link #close(BulkWriterContext)} is synchronized to prevent races with concurrent
+ *       {@link #prepareSStablesToSend(BulkWriterContext, Set)} calls.</li>
+ *   <li>Getter methods ({@link #rowCount()}, {@link #bytesWritten()}, {@link #sstableCount()}) may return stale
+ *       values if called concurrently with {@link #prepareSStablesToSend(BulkWriterContext, Set)} or
+ *       {@link #close(BulkWriterContext)}. They are only guaranteed accurate after {@link #close(BulkWriterContext)}
+ *       completes.</li>
+ * </ul>
  */
 @SuppressWarnings("WeakerAccess")
 public class SortedSSTableWriter
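
A hypothetical caller-side sketch of the contract documented above (only SortedSSTableWriter's documented methods are real; the wrapper class and row values are illustrative, mirroring the test fixtures):

    import java.io.IOException;
    import java.math.BigInteger;
    import com.google.common.collect.ImmutableMap;

    class RecordWriterSketch
    {
        // RecordWriter thread: addRow() and close() stay on this one thread
        void writeAndClose(SortedSSTableWriter writer, BulkWriterContext context) throws IOException
        {
            writer.addRow(BigInteger.ONE, ImmutableMap.of("id", 1, "date", 1, "course", "c1", "marks", 100));
            writer.close(context);          // synchronized against prepareSStablesToSend()
            long rows = writer.rowCount();  // guaranteed accurate only after close() returns
        }
    }
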
@@ -74,15 +89,17 @@ public class SortedSSTableWriter
     private final Path outDir;
     private final org.apache.cassandra.bridge.SSTableWriter cqlSSTableWriter;
     private final int partitionId;
-    private BigInteger minToken = null;
-    private BigInteger maxToken = null;
-    private final Map<Path, Digest> overallFileDigests = new HashMap<>();
     private final DigestAlgorithm digestAlgorithm;
 
-    private volatile boolean isClosed = false;
+    // Fields accessed only from the RecordWriter thread (addRow/close caller)
+    private BigInteger minToken = null;
+    private BigInteger maxToken = null;
+    private long rowCount = 0;
 
+    // Fields protected by synchronization - accessed from both RecordWriter thread and executor threads
+    private final Map<Path, Digest> overallFileDigests = new HashMap<>();
+    private boolean isClosed = false;
     private int sstableCount = 0;
-    private long rowCount = 0;
     private long bytesWritten = 0;
 
     public SortedSSTableWriter(org.apache.cassandra.bridge.SSTableWriter tableWriter, Path outDir,
@@ -125,6 +142,11 @@ public class SortedSSTableWriter
 
     /**
      * Add a row to be written.
+     * <p>
+     * <b>Threading:</b> This method MUST be called from the same thread that calls {@link #close(BulkWriterContext)}
+     * (typically the RecordWriter thread). It is NOT thread-safe and must not be called concurrently with any other
+     * method on this instance.
+     *
      * @param token the hashed token of the row's partition key.
      *              The value must be monotonically increasing in the subsequent calls.
      * @param boundValues bound values of the columns in the row
@@ -171,11 +193,37 @@ public class SortedSSTableWriter
         return sstableCount;
     }
 
-    public Map<Path, Digest> prepareSStablesToSend(@NotNull BulkWriterContext writerContext, Set<SSTableDescriptor> sstables) throws IOException
+    /**
+     * Prepares a set of SSTables to be sent to replicas by calculating digests and validating them.
+     * <p>
+     * This method is called when SSTables are produced during the write process (before final close).
+     * It processes newly-produced SSTables, calculates their file digests, validates them, and updates
+     * the internal counters.
+     * <p>
+     * <b>Threading:</b> This method is thread-safe and may be called concurrently from background threads
+     * (e.g., from {@link DirectStreamSession#onSSTablesProduced(Set)} via the executor service).
+     * It is synchronized to protect shared state ({@code overallFileDigests}, {@code sstableCount},
+     * {@code bytesWritten}) from concurrent access with {@link #close(BulkWriterContext)}.
+     *
+     * @param writerContext the bulk writer context
+     * @param sstables the set of SSTable descriptors to prepare
+     * @return a map of file paths to their digests, or an empty map if the writer is already closed
+     * @throws IOException if an I/O error occurs
+     */
+    public synchronized Map<Path, Digest> prepareSStablesToSend(@NotNull BulkWriterContext writerContext, Set<SSTableDescriptor> sstables) throws IOException
     {
+        // If the writer is already closed, return empty map
+        // The remaining SSTables will be handled by sendRemainingSSTables()
+        if (isClosed)
+        {
+            LOGGER.debug("Writer is already closed, returning empty digest map. Remaining SSTables will be handled by sendRemainingSSTables()");
+            return Collections.emptyMap();
+        }
+
+        // Filter for SSTables that match the requested descriptors AND haven't been hashed yet
         DirectoryStream.Filter<Path> sstableFilter = path -> {
             SSTableDescriptor baseName = SSTables.getSSTableDescriptor(path);
-            return sstables.contains(baseName);
+            return sstables.contains(baseName) && !overallFileDigests.containsKey(path);
         };
         Set<Path> dataFilePaths = new HashSet<>();
         Map<Path, Digest> fileDigests = new HashMap<>();
@@ -200,7 +248,23 @@ public class SortedSSTableWriter
         return fileDigests;
     }
 
-    public void close(BulkWriterContext writerContext) throws IOException
+    /**
+     * Closes this writer, flushes any remaining data, calculates digests, and validates all SSTables.
+     * <p>
+     * This method performs the final flush of the SSTable writer, processes any SSTables that were not
+     * already handled by {@link #prepareSStablesToSend(BulkWriterContext, Set)}, calculates their digests,
+     * and validates all written SSTables.
+     * <p>
+     * <b>Threading:</b> This method MUST be called from the same thread that calls {@link #addRow(BigInteger, Map)}
+     * (typically the RecordWriter thread). It is synchronized to prevent races with concurrent
+     * {@link #prepareSStablesToSend(BulkWriterContext, Set)} calls from background threads.
+     * <p>
+     * This method is idempotent - calling it multiple times will return early after the first call completes.
+     *
+     * @param writerContext the bulk writer context
+     * @throws IOException if an I/O error occurs during closing
+     */
+    public synchronized void close(BulkWriterContext writerContext) throws IOException
     {
         if (isClosed)
         {
@@ -209,24 +273,27 @@ public class SortedSSTableWriter
         }
         isClosed = true;
         cqlSSTableWriter.close();
-        for (Path dataFile : getDataFileStream())
+
+        Set<Path> hashedFiles = new HashSet<>(overallFileDigests.keySet());
+        Set<Path> newlyHashedFiles = new HashSet<>();
+
+        // Filter out SSTables that were already hashed during production (via prepareSStablesToSend)
+        // Only process new SSTables produced during final flush
+        DirectoryStream.Filter<Path> unhashedFilter = path -> !hashedFiles.contains(path);
+
+        for (Path dataFile : getDataFileStream(unhashedFilter))
         {
             // NOTE: We calculate file hashes before re-reading so that we know what we hashed
             //       is what we validated. Then we send these along with the files and the
             //       receiving end re-hashes the files to make sure they still match.
-
-            // Skip hash calculation for SSTables that were already hashed during production
-            // (via prepareSStablesToSend). Only hash new SSTables produced during final flush.
-            boolean alreadyHashed = overallFileDigests.keySet()
-                                                      .stream()
-                                                      .anyMatch(path -> SSTables.getSSTableBaseName(path).equals(SSTables.getSSTableBaseName(dataFile)));
-            if (!alreadyHashed)
-            {
-                overallFileDigests.putAll(calculateFileDigestMap(dataFile));
-            }
+            Map<Path, Digest> newFileDigests = calculateFileDigestMap(dataFile);
+            overallFileDigests.putAll(newFileDigests);
+            newlyHashedFiles.addAll(newFileDigests.keySet());
             sstableCount += 1;
         }
-        bytesWritten += calculatedTotalSize(overallFileDigests.keySet());
+        // Only calculate size for newly hashed files, not all files in overallFileDigests
+        // (previously hashed files may have been deleted by prepareSStablesToSend)
+        bytesWritten += calculatedTotalSize(newlyHashedFiles);
         validateSSTables(writerContext);
     }
 
@@ -287,9 +354,14 @@ public class SortedSSTableWriter
         }
     }
 
-    private DirectoryStream<Path> getDataFileStream() throws IOException
+    private DirectoryStream<Path> getDataFileStream(DirectoryStream.Filter<Path> filter) throws IOException
     {
-        return Files.newDirectoryStream(getOutDir(), "*Data.db");
+        // Combine the data file filter with the provided filter
+        DirectoryStream.Filter<Path> combinedFilter = path -> {
+            String fileName = path.getFileName().toString();
+            return fileName.endsWith("Data.db") && filter.accept(path);
+        };
+        return Files.newDirectoryStream(getOutDir(), combinedFilter);
     }
 
     private Map<Path, Digest> calculateFileDigestMap(Path dataFile) throws IOException
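
Condensed into a self-contained sketch, the fix above rests on two guards: the synchronized isClosed check turns late prepareSStablesToSend() calls into no-ops, and the per-path dedup keeps a file from being hashed and counted twice. (Simplified stand-in types: String digests instead of Digest, and no file I/O.)

    import java.nio.file.Path;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    class GuardedWriterSketch
    {
        private final Map<Path, String> overallFileDigests = new HashMap<>();
        private boolean isClosed = false;

        synchronized Map<Path, String> prepareToSend(Set<Path> produced)
        {
            if (isClosed)
            {
                return Collections.emptyMap();             // late call after close(): no-op
            }
            Map<Path, String> fresh = new HashMap<>();
            for (Path path : produced)
            {
                if (!overallFileDigests.containsKey(path)) // dedup: hash each file once
                {
                    fresh.put(path, "digest-of-" + path.getFileName());
                }
            }
            overallFileDigests.putAll(fresh);
            return fresh;
        }

        synchronized void close(Set<Path> allFiles)
        {
            if (isClosed)
            {
                return;                                    // idempotent
            }
            isClosed = true;
            // count only files not already handled by prepareToSend()
            for (Path path : allFiles)
            {
                overallFileDigests.computeIfAbsent(path, p -> "digest-of-" + p.getFileName());
            }
        }
    }
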
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java
index 88a97f78..9a05dca6 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java
@@ -19,6 +19,7 @@
 
 package org.apache.cassandra.spark.bulkwriter;
 
+import java.io.File;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.nio.file.Path;
@@ -36,6 +37,7 @@ import java.util.stream.Collectors;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Range;
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -139,7 +141,16 @@ public abstract class StreamSession<T extends TransportContext>
                                  "SSTable range %s should be enclosed in the partition range %s",
                                  sstableWriter.getTokenRange(), tokenRange);
         // close the writer before finalizing stream
-        sstableWriter.close(writerContext);
+        try
+        {
+            sstableWriter.close(writerContext);
+        }
+        catch (IOException ioe)
+        {
+            // clean up the sstables and rethrow on I/O errors when closing
+            cleanupSSTables(LOGGER);
+            throw ioe;
+        }
         return executorService.submit(this::doFinalizeStream);
     }
 
@@ -186,4 +197,18 @@ public abstract class StreamSession<T extends TransportContext>
         Collections.shuffle(replicasForTokenRange);
         return replicasForTokenRange;
     }
+
+    protected void cleanupSSTables(Logger logger)
+    {
+        File tempDir = sstableWriter.getOutDir().toFile();
+        logger.info("[{}]: Removing temporary files after stream session from {}", sessionID, tempDir);
+        try
+        {
+            FileUtils.deleteDirectory(tempDir);
+        }
+        catch (IOException exception)
+        {
+            logger.warn("[{}]: Failed to delete temporary directory {}", sessionID, tempDir, exception);
+        }
+    }
 }
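
The new failure handling in StreamSession reduces to: close the writer, and if the close itself fails with an I/O error, remove the temporary sstable directory before rethrowing so a failed task does not leak disk space. A hedged sketch of that shape (the helper name closeOrCleanup is hypothetical; commons-io's FileUtils.deleteQuietly stands in for the logged deleteDirectory call shown above):

    import java.io.IOException;
    import org.apache.commons.io.FileUtils;

    class CleanupSketch
    {
        void closeOrCleanup(SortedSSTableWriter writer, BulkWriterContext context) throws IOException
        {
            try
            {
                writer.close(context);
            }
            catch (IOException ioe)
            {
                // best-effort cleanup of the temp sstable directory, then rethrow
                FileUtils.deleteQuietly(writer.getOutDir().toFile());
                throw ioe;
            }
        }
    }
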
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java
index f1c8ea74..6328f125 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java
@@ -28,11 +28,18 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.validation.constraints.NotNull;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.io.TempDir;
@@ -44,15 +51,17 @@ import org.apache.cassandra.bridge.CassandraVersionFeatures;
 import org.apache.cassandra.bridge.SSTableDescriptor;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
+import org.apache.cassandra.spark.common.Digest;
 import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
 
 public class SortedSSTableWriterTest
 {
     private static String previousMbeanState;
 
-    public static Iterable<Object[]> data()
+    public static Iterable<Object[]> supportedVersions()
     {
         return Arrays.stream(CassandraVersion.supportedVersions())
                      .map(version -> new Object[]{version})
@@ -86,7 +95,7 @@ public class SortedSSTableWriterTest
     private Path tmpDir;
 
     @ParameterizedTest
-    @MethodSource("data")
+    @MethodSource("supportedVersions")
     public void canCreateWriterForVersion(String version) throws IOException
     {
         MockBulkWriterContext writerContext = new MockBulkWriterContext(tokenRangeMapping, version, ConsistencyLevel.CL.LOCAL_QUORUM);
@@ -102,10 +111,19 @@ public class SortedSSTableWriterTest
         {
             case 40:
             case 41:
-                assertThat(baseFileName).isEqualTo("nb-1-big");
+                // Format is "nb-<generation>-big"
+                assertThat(baseFileName).matches("nb-\\d+-big");
                 break;
             case 50:
-                assertThat(baseFileName).isEqualTo("big".equals(CassandraVersion.sstableFormat()) ? "oa-2-big" : "da-2-bti");
+                // Format is "oa-<generation>-big" or "da-<generation>-bti"
+                if ("big".equals(CassandraVersion.sstableFormat()))
+                {
+                    assertThat(baseFileName).matches("oa-\\d+-big");
+                }
+                else
+                {
+                    assertThat(baseFileName).matches("da-\\d+-bti");
+                }
                 break;
             default:
                 throw new UnsupportedOperationException("Unsupported version: " + version);
@@ -116,11 +134,245 @@ public class SortedSSTableWriterTest
             dataFileStream.forEach(dataFilePath -> {
                 dataFilePaths.add(dataFilePath);
                 assertThat(SSTables.cassandraVersionFromTable(dataFilePath).getMajorVersion())
-                        .isEqualTo(CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(version).getMajorVersion());
+                .isEqualTo(CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(version).getMajorVersion());
             });
         }
         // no exception should be thrown from either of the validate methods
         tw.validateSSTables(writerContext);
         tw.validateSSTables(writerContext, tw.getOutDir(), dataFilePaths);
     }
+
+    /**
+     * Tests the race condition fix between prepareSStablesToSend (called from background threads)
+     * and close (called from the main thread). This test exercises CASSANALYTICS-107.
+     *
+     * This test focuses on verifying thread safety when:
+     * 1. prepareSStablesToSend is called repeatedly from a background thread
+     * 2. close is called from the main thread
+     * 3. Both methods access shared state concurrently
+     */
+    @ParameterizedTest
+    @MethodSource("supportedVersions")
+    public void testConcurrentPrepareSStablesToSendAndClose(String version) throws Exception
+    {
+        MockBulkWriterContext writerContext = new MockBulkWriterContext(tokenRangeMapping, version, ConsistencyLevel.CL.LOCAL_QUORUM);
+
+        // First, create real SSTables that will be used to simulate the race
+        // These SSTables will be in tmpDir and represent the "intermediate flush" scenario
+        List<SSTableDescriptor> existingSSTables = mockSSTableProduced(writerContext);
+
+        // Verify we have SSTables to work with
+        assertThat(existingSSTables).as("Should have produced SSTables").isNotEmpty();
+
+        // Now create the writer that will be tested for the race condition
+        // It will share the same directory (tmpDir) where SSTables have already been produced
+        SortedSSTableWriter testWriter = new SortedSSTableWriter(writerContext, tmpDir, new XXHash32DigestAlgorithm(), 1);
+        testWriter.setSSTablesProducedListener(x -> {});
+
+        // Add a row to the test writer (this will produce another SSTable on close)
+        testWriter.addRow(BigInteger.valueOf(100), ImmutableMap.of("id", 2, "date", 2, "course", "test2", "marks", 200));
+
+        // Simulate the race: prepareSStablesToSend in background thread processing existing SSTables,
+        // while close is called in main thread
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        try
+        {
+            CountDownLatch startLatch = new CountDownLatch(1);
+            CountDownLatch completionLatch = new CountDownLatch(2);
+
+            // Background thread: repeatedly call prepareSStablesToSend with the existing SSTables
+            // This simulates DirectStreamSession#onSSTablesProduced processing an intermediate flush
+            Future<?> prepareFuture = executor.submit(() -> {
+                Uninterruptibles.awaitUninterruptibly(startLatch);
+                try
+                {
+                    // Repeatedly call prepareSStablesToSend to increase chance of race
+                    for (int i = 0; i < 50; i++)
+                    {
+                        try
+                        {
+                            // Use the existing SSTables to simulate real scenario
+                            testWriter.prepareSStablesToSend(writerContext, new HashSet<>(existingSSTables));
+                            Thread.yield();
+                        }
+                        catch (IOException e)
+                        {
+                            // IOException is acceptable (e.g., files already processed)
+                            // But ConcurrentModificationException would indicate a threading bug
+                            String message = e.getMessage();
+                            if (message != null && message.toLowerCase().contains("concurrent"))
+                            {
+                                throw new RuntimeException("Thread safety violation detected", e);
+                            }
+                        }
+                    }
+                }
+                finally
+                {
+                    completionLatch.countDown();
+                }
+            });
+
+            // Main thread: call close
+            Future<?> closeFuture = executor.submit(() -> {
+                Uninterruptibles.awaitUninterruptibly(startLatch);
+                try
+                {
+                    // Small delay to let prepareSStablesToSend start
+                    Uninterruptibles.sleepUninterruptibly(5, TimeUnit.MILLISECONDS);
+                    testWriter.close(writerContext);
+                }
+                catch (Exception e)
+                {
+                    throw new RuntimeException("close failed", e);
+                }
+                finally
+                {
+                    completionLatch.countDown();
+                }
+            });
+
+            // Start both operations concurrently
+            startLatch.countDown();
+
+            // Wait for both to complete
+            assertThat(completionLatch.await(30, TimeUnit.SECONDS))
+            .as("Both operations should complete within timeout")
+            .isTrue();
+
+            // Verify neither future threw an exception
+            prepareFuture.get(5, TimeUnit.SECONDS);
+            closeFuture.get(5, TimeUnit.SECONDS);
+
+            // Verify the writer is in a consistent state
+            assertThat(testWriter.sstableCount()).isEqualTo(2);
+            assertThat(testWriter.bytesWritten()).isGreaterThan(0);
+
+            // Verify file digests map is not corrupted
+            assertThat(testWriter.fileDigestMap()).isNotEmpty();
+
+            // Verify SSTables can still be validated (no data corruption)
+            testWriter.validateSSTables(writerContext);
+        }
+        finally
+        {
+            executor.shutdown();
+            assertThat(executor.awaitTermination(10, TimeUnit.SECONDS))
+            .as("Executor should terminate cleanly")
+            .isTrue();
+        }
+    }
+
+    /**
+     * Tests the scenario where prepareSStablesToSend is called with produced SSTables,
+     * those files are then deleted (simulating DirectStreamSession behavior), and then
+     * close() is called. This verifies that close() only calculates bytesWritten for
+     * newly produced files, not the already-processed (and deleted) ones.
+     * This test exercises CASSANALYTICS-107.
+     */
+    @ParameterizedTest
+    @MethodSource("supportedVersions")
+    public void testBytesWrittenWithDeletedFiles(String version) throws Exception
+    {
+        MockBulkWriterContext writerContext = new MockBulkWriterContext(tokenRangeMapping, version, ConsistencyLevel.CL.LOCAL_QUORUM);
+
+        // Create initial SSTables to simulate intermediate flush
+        List<SSTableDescriptor> existingSSTables = mockSSTableProduced(writerContext);
+        assertThat(existingSSTables).as("Should have produced SSTables").isNotEmpty();
+
+        // Create a new writer that will process the existing SSTables
+        SortedSSTableWriter writer = new SortedSSTableWriter(writerContext, tmpDir, new XXHash32DigestAlgorithm(), 1);
+        writer.setSSTablesProducedListener(x -> {});
+
+        // Add a row to produce another SSTable on close (use higher token to maintain order)
+        writer.addRow(BigInteger.valueOf(100), ImmutableMap.of("id", 2, "date", 2, "course", "test2", "marks", 200));
+
+        // Call prepareSStablesToSend with the existing SSTables
+        Map<Path, Digest> processedFiles = writer.prepareSStablesToSend(writerContext, new HashSet<>(existingSSTables));
+        assertThat(processedFiles).as("Should have processed existing SSTables").isNotEmpty();
+
+        long bytesAfterPrepare = writer.bytesWritten();
+
+        // Delete the files that were processed (simulating DirectStreamSession behavior)
+        for (Path path : processedFiles.keySet())
+        {
+            Files.deleteIfExists(path);
+        }
+
+        // Now close - this should only count NEW files, not re-count deleted ones
+        // No NoSuchFileException should be thrown
+        assertThatNoException().isThrownBy(() -> writer.close(writerContext));
+
+        // Verify bytesWritten increased (from close processing new files)
+        assertThat(writer.bytesWritten())
+        .as("bytesWritten should have increased after close")
+        .isGreaterThanOrEqualTo(bytesAfterPrepare);
+
+        assertThat(writer.sstableCount()).as("Should have correct sstable count").isEqualTo(2);
+    }
+
+    /**
+     * Tests that prepareSStablesToSend returns an empty map when called after close().
+     * This verifies that the method properly guards against being called after the writer is closed,
+     * which would otherwise cause double-counting of SSTables and bytes.
+     */
+    @ParameterizedTest
+    @MethodSource("supportedVersions")
+    public void testPrepareSStablesToSendAfterClose(String version) throws Exception
+    {
+        MockBulkWriterContext writerContext = new MockBulkWriterContext(tokenRangeMapping, version, ConsistencyLevel.CL.LOCAL_QUORUM);
+
+        // Create a writer and add a row
+        SortedSSTableWriter writer = new SortedSSTableWriter(writerContext, tmpDir, new XXHash32DigestAlgorithm(), 1);
+        writer.setSSTablesProducedListener(x -> {});
+
+        writer.addRow(BigInteger.valueOf(100), ImmutableMap.of("id", 2, "date", 2, "course", "test2", "marks", 200));
+
+        // Close the writer first
+        writer.close(writerContext);
+
+        // Record the state after close
+        int sstableCountAfterClose = writer.sstableCount();
+        long bytesWrittenAfterClose = writer.bytesWritten();
+        int fileDigestCountAfterClose = writer.fileDigestMap().size();
+
+        // Try to call prepareSStablesToSend after close - it should return an empty map
+        Map<Path, Digest> result = writer.prepareSStablesToSend(writerContext, new HashSet<>());
+
+        // Verify it returned an empty map
+        assertThat(result)
+        .as("prepareSStablesToSend should return empty map when called after 
close")
+        .isEmpty();
+
+        // Verify that counters were NOT incremented (no double-counting)
+        assertThat(writer.sstableCount())
+        .as("sstableCount should not change when prepareSStablesToSend is 
called after close")
+        .isEqualTo(sstableCountAfterClose);
+
+        assertThat(writer.bytesWritten())
+        .as("bytesWritten should not change when prepareSStablesToSend is 
called after close")
+        .isEqualTo(bytesWrittenAfterClose);
+
+        assertThat(writer.fileDigestMap().size())
+        .as("fileDigestMap size should not change when prepareSStablesToSend 
is called after close")
+        .isEqualTo(fileDigestCountAfterClose);
+    }
+
+    /**
+     * Helper method to create initial SSTables for testing.
+     * This simulates the scenario where SSTables are produced during intermediate flushes, i.e. prepareSStablesToSend
+     */
+    private List<SSTableDescriptor> mockSSTableProduced(MockBulkWriterContext writerContext) throws IOException
+    {
+        SortedSSTableWriter initialWriter = new SortedSSTableWriter(writerContext, tmpDir, new XXHash32DigestAlgorithm(), 1);
+        List<SSTableDescriptor> producedSSTables = new ArrayList<>();
+        initialWriter.setSSTablesProducedListener(producedSSTables::addAll);
+
+        // Write a row to produce real SSTables
+        // Use token BigInteger.ONE for consistency
+        initialWriter.addRow(BigInteger.ONE, ImmutableMap.of("id", 1, "date", 1, "course", "test1", "marks", 100));
+        initialWriter.close(writerContext);
+
+        return producedSSTables;
+    }
 }
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
index a6f9783e..c923b702 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
@@ -27,6 +27,8 @@ import java.util.Set;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.config.Config;
@@ -38,6 +40,7 @@ import org.apache.cassandra.io.sstable.CQLSSTableWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.jetbrains.annotations.NotNull;
 
+@NotThreadSafe // The underlying CQLSSTableWriter is not thread-safe
 public class SSTableWriterImplementation implements SSTableWriter
 {
     static


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
