This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 29d1d521 CASSANALYTICS-107: Fix race condition in DirectStreamSession#onSSTablesProduced and SortedSStableWriter#close (#162)
29d1d521 is described below
commit 29d1d5218495f393c49d90153b5124081af436ff
Author: Yifan Cai <[email protected]>
AuthorDate: Tue Dec 16 09:07:27 2025 -0800
CASSANALYTICS-107: Fix race condition in DirectStreamSession#onSSTablesProduced and SortedSStableWriter#close (#162)
Patch by Yifan Cai; Reviewed by Lukasz Antoniak for CASSANALYTICS-107
---
CHANGES.txt | 1 +
.../spark/bulkwriter/DirectStreamSession.java | 17 +-
.../spark/bulkwriter/SortedSSTableWriter.java | 116 +++++++--
.../cassandra/spark/bulkwriter/StreamSession.java | 27 ++-
.../spark/bulkwriter/SortedSSTableWriterTest.java | 262 ++++++++++++++++++++-
.../bridge/SSTableWriterImplementation.java | 3 +
6 files changed, 385 insertions(+), 41 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index ee7da68c..94e62135 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.3.0
-----
+ * Fix race condition in DirectStreamSession#onSSTablesProduced and SortedSStableWriter#close (CASSANALYTICS-107)
* Address LZ4 vulnerability (CVE-2025-12183) (CASSANALYTICS-109)
* Add TimeRangeFilter to filter out SSTables outside given time window (CASSANALYTICS-102)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSession.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSession.java
index dbae672e..7af61709 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSession.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSession.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.spark.bulkwriter;
-import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.file.DirectoryStream;
@@ -37,7 +36,6 @@ import java.util.stream.Collectors;
import com.google.common.base.Preconditions;
import com.google.common.collect.Range;
-import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,7 +74,9 @@ public class DirectStreamSession extends StreamSession<TransportContext.DirectDa
return;
}
- // send sstables asynchronously
+ // Send sstables asynchronously.
+ // SAFETY: sstableWriter.prepareSStablesToSend() is synchronized and can be called
+ // concurrently with close() from the RecordWriter thread.
executorService.submit(() -> {
try
{
@@ -178,16 +178,7 @@ public class DirectStreamSession extends StreamSession<TransportContext.DirectDa
finally
{
// Clean up SSTable files once the task is complete
- File tempDir = sstableWriter.getOutDir().toFile();
- LOGGER.info("[{}]: Removing temporary files after stream session from {}", sessionID, tempDir);
- try
- {
- FileUtils.deleteDirectory(tempDir);
- }
- catch (IOException exception)
- {
- LOGGER.warn("[{}]: Failed to delete temporary directory {}", sessionID, tempDir, exception);
- }
+ cleanupSSTables(LOGGER);
}
}
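
The race fixed in this file reduces to a producer thread hashing freshly flushed files while the owning thread closes the writer. A minimal sketch of the synchronization pattern, assuming a toy Writer stand-in rather than the real SortedSSTableWriter API:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // Simplified stand-in for SortedSSTableWriter: prepare() and close() share
    // one monitor, so the background "send" can never interleave with close().
    class WriterSketch
    {
        private final Map<String, String> digests = new HashMap<>();
        private boolean closed = false;

        synchronized Map<String, String> prepare(String sstable)
        {
            if (closed)
            {
                // Too late; close() already hashed the remaining files.
                return Collections.emptyMap();
            }
            return Collections.singletonMap(sstable, digests.computeIfAbsent(sstable, s -> "digest-of-" + s));
        }

        synchronized void close()
        {
            if (closed)
            {
                return; // idempotent, as in the patched close()
            }
            closed = true;
            digests.computeIfAbsent("final-flush-sstable", s -> "digest-of-" + s);
        }

        public static void main(String[] args) throws InterruptedException
        {
            WriterSketch writer = new WriterSketch();
            ExecutorService executor = Executors.newSingleThreadExecutor();
            // Background thread: the async send submitted above.
            executor.submit(() -> writer.prepare("nb-1-big"));
            // Owning thread: may close concurrently; the shared monitor serializes them.
            writer.close();
            executor.shutdown();
            executor.awaitTermination(10, TimeUnit.SECONDS);
        }
    }
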
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
index bbea732e..3d09be47 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
@@ -63,6 +63,21 @@ import org.jetbrains.annotations.Nullable;
* entire partition, i.e. repartitionAndSortWithinPartitions. By doing so, it eliminates the nice property of the
* output sstable being globally sorted and non-overlapping.
* Unless you can think of a better use case, we should stick with this SortedSSTableWriter
+ * <br>
+ * <p>Threading Model:</p>
+ * This class has limited thread-safety guarantees:
+ * <ul>
+ * <li>{@link #addRow(BigInteger, Map)} and {@link #close(BulkWriterContext)} MUST be called from the same thread
+ * (typically the RecordWriter thread). These methods are NOT synchronized and must not be called concurrently.</li>
+ * <li>{@link #prepareSStablesToSend(BulkWriterContext, Set)} MAY be called concurrently from background threads
+ * (via {@link StreamSession}'s executor service) and is synchronized to protect shared state.</li>
+ * <li>{@link #close(BulkWriterContext)} is synchronized to prevent races with concurrent
+ * {@link #prepareSStablesToSend(BulkWriterContext, Set)} calls.</li>
+ * <li>Getter methods ({@link #rowCount()}, {@link #bytesWritten()}, {@link #sstableCount()}) may return stale
+ * values if called concurrently with {@link #prepareSStablesToSend(BulkWriterContext, Set)} or
+ * {@link #close(BulkWriterContext)}. They are only guaranteed accurate after {@link #close(BulkWriterContext)} completes.</li>
+ * </ul>
*/
@SuppressWarnings("WeakerAccess")
public class SortedSSTableWriter
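
The getter caveat in the list above is plain Java memory-model behavior: without a lock or a volatile there is no happens-before edge, so a concurrent reader may observe stale counters. A generic, self-contained illustration (not project code):

    import java.util.concurrent.TimeUnit;

    // Generic illustration of the stale-read caveat: `count` is updated by one
    // thread without synchronization; only the volatile `done` write/read (and
    // join) establish the happens-before edge that makes it safely visible.
    class StaleReadSketch
    {
        private static long count = 0;           // like rowCount: no volatile, no lock
        private static volatile boolean done = false;

        public static void main(String[] args) throws InterruptedException
        {
            Thread writer = new Thread(() -> {
                for (int i = 0; i < 1_000_000; i++)
                {
                    count++;                      // single-writer updates, as in addRow()
                }
                done = true;                      // volatile write publishes `count` too
            });
            writer.start();
            while (!done)
            {
                // Reading `count` here could return any stale value; if `done`
                // were not volatile, this loop might never see the update at all.
                TimeUnit.MILLISECONDS.sleep(1);
            }
            writer.join();
            // After the volatile read of `done` and the join, `count` is safely
            // visible: the analogue of "accurate only after close() completes".
            System.out.println(count);            // prints 1000000
        }
    }
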
@@ -74,15 +89,17 @@ public class SortedSSTableWriter
private final Path outDir;
private final org.apache.cassandra.bridge.SSTableWriter cqlSSTableWriter;
private final int partitionId;
- private BigInteger minToken = null;
- private BigInteger maxToken = null;
- private final Map<Path, Digest> overallFileDigests = new HashMap<>();
private final DigestAlgorithm digestAlgorithm;
- private volatile boolean isClosed = false;
+ // Fields accessed only from the RecordWriter thread (addRow/close caller)
+ private BigInteger minToken = null;
+ private BigInteger maxToken = null;
+ private long rowCount = 0;
+ // Fields protected by synchronization - accessed from both RecordWriter thread and executor threads
+ private final Map<Path, Digest> overallFileDigests = new HashMap<>();
+ private boolean isClosed = false;
private int sstableCount = 0;
- private long rowCount = 0;
private long bytesWritten = 0;
public SortedSSTableWriter(org.apache.cassandra.bridge.SSTableWriter tableWriter, Path outDir,
@@ -125,6 +142,11 @@ public class SortedSSTableWriter
/**
* Add a row to be written.
+ * <p>
+ * <b>Threading:</b> This method MUST be called from the same thread that calls {@link #close(BulkWriterContext)}
+ * (typically the RecordWriter thread). It is NOT thread-safe and must not be called concurrently with any other
+ * method on this instance.
+ *
* @param token the hashed token of the row's partition key.
* The value must be monotonically increasing in the subsequent calls.
* @param boundValues bound values of the columns in the row
@@ -171,11 +193,37 @@ public class SortedSSTableWriter
return sstableCount;
}
- public Map<Path, Digest> prepareSStablesToSend(@NotNull BulkWriterContext writerContext, Set<SSTableDescriptor> sstables) throws IOException
+ /**
+ * Prepares a set of SSTables to be sent to replicas by calculating digests and validating them.
+ * <p>
+ * This method is called when SSTables are produced during the write process (before final close).
+ * It processes newly-produced SSTables, calculates their file digests, validates them, and updates
+ * the internal counters.
+ * <p>
+ * <b>Threading:</b> This method is thread-safe and may be called concurrently from background threads
+ * (e.g., from {@link DirectStreamSession#onSSTablesProduced(Set)} via the executor service).
+ * It is synchronized to protect shared state ({@code overallFileDigests}, {@code sstableCount},
+ * {@code bytesWritten}) from concurrent access with {@link #close(BulkWriterContext)}.
+ *
+ * @param writerContext the bulk writer context
+ * @param sstables the set of SSTable descriptors to prepare
+ * @return a map of file paths to their digests, or an empty map if the writer is already closed
+ * @throws IOException if an I/O error occurs
+ */
+ public synchronized Map<Path, Digest> prepareSStablesToSend(@NotNull BulkWriterContext writerContext, Set<SSTableDescriptor> sstables) throws IOException
{
+ // If the writer is already closed, return empty map
+ // The remaining SSTables will be handled by sendRemainingSSTables()
+ if (isClosed)
+ {
+ LOGGER.debug("Writer is already closed, returning empty digest map. Remaining SSTables will be handled by sendRemainingSSTables()");
+ return Collections.emptyMap();
+ }
+
+ // Filter for SSTables that match the requested descriptors AND haven't been hashed yet
DirectoryStream.Filter<Path> sstableFilter = path -> {
SSTableDescriptor baseName = SSTables.getSSTableDescriptor(path);
- return sstables.contains(baseName);
+ return sstables.contains(baseName) && !overallFileDigests.containsKey(path);
};
Set<Path> dataFilePaths = new HashSet<>();
Map<Path, Digest> fileDigests = new HashMap<>();
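
The two guards added above, the early return once closed and the !overallFileDigests.containsKey(path) check, make repeated invocations safe and non-duplicating. A self-contained sketch of the hash-at-most-once directory scan; the file name and digest string are placeholders, not the real XXHash pipeline:

    import java.io.IOException;
    import java.nio.file.DirectoryStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.HashMap;
    import java.util.Map;

    // Sketch: hash each *-Data.db file at most once across repeated scans,
    // mirroring the "requested AND not yet hashed" filter in the patch.
    class DedupeScanSketch
    {
        private final Map<Path, String> digests = new HashMap<>();

        Map<Path, String> scanOnce(Path dir) throws IOException
        {
            Map<Path, String> newlyHashed = new HashMap<>();
            DirectoryStream.Filter<Path> filter = path ->
                    path.getFileName().toString().endsWith("Data.db")
                    && !digests.containsKey(path);   // skip already-hashed files
            try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, filter))
            {
                for (Path file : stream)
                {
                    String digest = "digest-of-" + file.getFileName(); // placeholder digest
                    digests.put(file, digest);
                    newlyHashed.put(file, digest);
                }
            }
            return newlyHashed;
        }

        public static void main(String[] args) throws IOException
        {
            Path dir = Files.createTempDirectory("dedupe-scan");
            Files.createFile(dir.resolve("nb-1-big-Data.db"));
            DedupeScanSketch sketch = new DedupeScanSketch();
            System.out.println(sketch.scanOnce(dir)); // hashes the file
            System.out.println(sketch.scanOnce(dir)); // empty: nothing new to hash
        }
    }
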
@@ -200,7 +248,23 @@ public class SortedSSTableWriter
return fileDigests;
}
- public void close(BulkWriterContext writerContext) throws IOException
+ /**
+ * Closes this writer, flushes any remaining data, calculates digests, and validates all SSTables.
+ * <p>
+ * This method performs the final flush of the SSTable writer, processes any SSTables that were not
+ * already handled by {@link #prepareSStablesToSend(BulkWriterContext, Set)}, calculates their digests,
+ * and validates all written SSTables.
+ * <p>
+ * <b>Threading:</b> This method MUST be called from the same thread that calls {@link #addRow(BigInteger, Map)}
+ * (typically the RecordWriter thread). It is synchronized to prevent races with concurrent
+ * {@link #prepareSStablesToSend(BulkWriterContext, Set)} calls from background threads.
+ * <p>
+ * This method is idempotent - calling it multiple times will return early after the first call completes.
+ *
+ * @param writerContext the bulk writer context
+ * @throws IOException if an I/O error occurs during closing
+ */
+ public synchronized void close(BulkWriterContext writerContext) throws IOException
{
if (isClosed)
{
@@ -209,24 +273,27 @@ public class SortedSSTableWriter
}
isClosed = true;
cqlSSTableWriter.close();
- for (Path dataFile : getDataFileStream())
+
+ Set<Path> hashedFiles = new HashSet<>(overallFileDigests.keySet());
+ Set<Path> newlyHashedFiles = new HashSet<>();
+
+ // Filter out SSTables that were already hashed during production (via prepareSStablesToSend)
+ // Only process new SSTables produced during final flush
+ DirectoryStream.Filter<Path> unhashedFilter = path -> !hashedFiles.contains(path);
+
+ for (Path dataFile : getDataFileStream(unhashedFilter))
{
// NOTE: We calculate file hashes before re-reading so that we know what we hashed
// is what we validated. Then we send these along with the files and the
// receiving end re-hashes the files to make sure they still match.
-
- // Skip hash calculation for SSTables that were already hashed during production
- // (via prepareSStablesToSend). Only hash new SSTables produced during final flush.
- boolean alreadyHashed = overallFileDigests.keySet()
- .stream()
- .anyMatch(path -> SSTables.getSSTableBaseName(path).equals(SSTables.getSSTableBaseName(dataFile)));
- if (!alreadyHashed)
- {
- overallFileDigests.putAll(calculateFileDigestMap(dataFile));
- }
+ Map<Path, Digest> newFileDigests = calculateFileDigestMap(dataFile);
+ overallFileDigests.putAll(newFileDigests);
+ newlyHashedFiles.addAll(newFileDigests.keySet());
sstableCount += 1;
}
- bytesWritten += calculatedTotalSize(overallFileDigests.keySet());
+ // Only calculate size for newly hashed files, not all files in overallFileDigests
+ // (previously hashed files may have been deleted by prepareSStablesToSend)
+ bytesWritten += calculatedTotalSize(newlyHashedFiles);
validateSSTables(writerContext);
}
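
The accounting change matters because files hashed during production may already have been streamed and deleted, so sizing them again would throw NoSuchFileException. A minimal sketch of summing sizes over only the newly hashed set (file names are illustrative):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.HashSet;
    import java.util.Set;

    // Sketch: accumulate bytesWritten from newly hashed files only, so files
    // that were hashed, streamed, and deleted earlier are never re-stat'ed.
    class SizeAccountingSketch
    {
        public static void main(String[] args) throws IOException
        {
            Path dir = Files.createTempDirectory("size-accounting");
            Path early = Files.write(dir.resolve("early-Data.db"), new byte[128]);
            Path late = Files.write(dir.resolve("late-Data.db"), new byte[256]);

            long bytesWritten = Files.size(early);  // counted when it was prepared
            Files.delete(early);                    // streamed and cleaned up

            // At close time, only the newly hashed set is sized; `early` is
            // absent from it, so no NoSuchFileException can occur.
            Set<Path> newlyHashed = new HashSet<>();
            newlyHashed.add(late);
            for (Path path : newlyHashed)
            {
                bytesWritten += Files.size(path);
            }
            System.out.println(bytesWritten); // 384
        }
    }
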
@@ -287,9 +354,14 @@ public class SortedSSTableWriter
}
}
- private DirectoryStream<Path> getDataFileStream() throws IOException
+ private DirectoryStream<Path> getDataFileStream(DirectoryStream.Filter<Path> filter) throws IOException
{
- return Files.newDirectoryStream(getOutDir(), "*Data.db");
+ // Combine the data file filter with the provided filter
+ DirectoryStream.Filter<Path> combinedFilter = path -> {
+ String fileName = path.getFileName().toString();
+ return fileName.endsWith("Data.db") && filter.accept(path);
+ };
+ return Files.newDirectoryStream(getOutDir(), combinedFilter);
}
private Map<Path, Digest> calculateFileDigestMap(Path dataFile) throws IOException
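
DirectoryStream.Filter has no built-in combinators, which is why the patch composes the suffix check and the caller's filter in one lambda. An equivalent generic and() helper, shown as a sketch:

    import java.io.IOException;
    import java.nio.file.DirectoryStream;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    // Sketch: a tiny and() combinator for DirectoryStream.Filter, equivalent
    // to the inline combinedFilter lambda in getDataFileStream(filter).
    final class FilterComposition
    {
        static DirectoryStream.Filter<Path> and(DirectoryStream.Filter<Path> first,
                                                DirectoryStream.Filter<Path> second)
        {
            // accept(Path) may throw IOException; the lambda matches the
            // functional interface, so any exception simply propagates.
            return path -> first.accept(path) && second.accept(path);
        }

        static DirectoryStream.Filter<Path> dataFiles()
        {
            return path -> path.getFileName().toString().endsWith("Data.db");
        }

        public static void main(String[] args) throws IOException
        {
            DirectoryStream.Filter<Path> combined = and(dataFiles(), path -> true);
            System.out.println(combined.accept(Paths.get("nb-1-big-Data.db")));  // true
            System.out.println(combined.accept(Paths.get("nb-1-big-Index.db"))); // false
        }
    }
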
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java
index 88a97f78..9a05dca6 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.spark.bulkwriter;
+import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.file.Path;
@@ -36,6 +37,7 @@ import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Range;
+import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -139,7 +141,16 @@ public abstract class StreamSession<T extends TransportContext>
"SSTable range %s should be enclosed in the partition range %s",
sstableWriter.getTokenRange(), tokenRange);
// close the writer before finalizing stream
- sstableWriter.close(writerContext);
+ try
+ {
+ sstableWriter.close(writerContext);
+ }
+ catch (IOException ioe)
+ {
+ // clean up the sstables and rethrow on I/O errors when closing
+ cleanupSSTables(LOGGER);
+ throw ioe;
+ }
return executorService.submit(this::doFinalizeStream);
}
@@ -186,4 +197,18 @@ public abstract class StreamSession<T extends TransportContext>
Collections.shuffle(replicasForTokenRange);
return replicasForTokenRange;
}
+
+ protected void cleanupSSTables(Logger logger)
+ {
+ File tempDir = sstableWriter.getOutDir().toFile();
+ logger.info("[{}]: Removing temporary files after stream session from {}", sessionID, tempDir);
+ try
+ {
+ FileUtils.deleteDirectory(tempDir);
+ }
+ catch (IOException exception)
+ {
+ logger.warn("[{}]: Failed to delete temporary directory {}", sessionID, tempDir, exception);
+ }
+ }
}
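
The surrounding changes apply the usual clean-up-then-rethrow pattern: on an I/O failure during close the temp directory is removed before the exception propagates, and the same helper runs after a successful stream. A JDK-only sketch of the pattern, with Files.walk standing in for the commons-io FileUtils.deleteDirectory call used above:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Comparator;
    import java.util.stream.Stream;

    // Sketch of the cleanup-then-rethrow pattern from stopStream/cleanupSSTables,
    // with a JDK-only recursive delete standing in for FileUtils.deleteDirectory.
    class CleanupOnErrorSketch
    {
        static void closeOrCleanup(Path tempDir, boolean failClose) throws IOException
        {
            try
            {
                if (failClose)
                {
                    throw new IOException("simulated close failure");
                }
            }
            catch (IOException ioe)
            {
                deleteRecursively(tempDir); // best-effort cleanup before rethrowing
                throw ioe;
            }
        }

        static void deleteRecursively(Path dir) throws IOException
        {
            try (Stream<Path> paths = Files.walk(dir))
            {
                // Deepest paths first so directories are empty when deleted.
                paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                    try
                    {
                        Files.delete(path);
                    }
                    catch (IOException e)
                    {
                        // Swallow, matching the warn-and-continue behavior above.
                    }
                });
            }
        }

        public static void main(String[] args) throws IOException
        {
            Path tempDir = Files.createTempDirectory("stream-session");
            Files.createFile(tempDir.resolve("nb-1-big-Data.db"));
            try
            {
                closeOrCleanup(tempDir, true);
            }
            catch (IOException expected)
            {
                System.out.println("rethrown: " + expected.getMessage());
            }
            System.out.println("cleaned: " + Files.notExists(tempDir)); // true
        }
    }
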
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java
index f1c8ea74..6328f125 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java
@@ -28,11 +28,18 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.io.TempDir;
@@ -44,15 +51,17 @@ import org.apache.cassandra.bridge.CassandraVersionFeatures;
import org.apache.cassandra.bridge.SSTableDescriptor;
import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
+import org.apache.cassandra.spark.common.Digest;
import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
public class SortedSSTableWriterTest
{
private static String previousMbeanState;
- public static Iterable<Object[]> data()
+ public static Iterable<Object[]> supportedVersions()
{
return Arrays.stream(CassandraVersion.supportedVersions())
.map(version -> new Object[]{version})
@@ -86,7 +95,7 @@ public class SortedSSTableWriterTest
private Path tmpDir;
@ParameterizedTest
- @MethodSource("data")
+ @MethodSource("supportedVersions")
public void canCreateWriterForVersion(String version) throws IOException
{
MockBulkWriterContext writerContext = new MockBulkWriterContext(tokenRangeMapping, version, ConsistencyLevel.CL.LOCAL_QUORUM);
@@ -102,10 +111,19 @@ public class SortedSSTableWriterTest
{
case 40:
case 41:
- assertThat(baseFileName).isEqualTo("nb-1-big");
+ // Format is "nb-<generation>-big"
+ assertThat(baseFileName).matches("nb-\\d+-big");
break;
case 50:
- assertThat(baseFileName).isEqualTo("big".equals(CassandraVersion.sstableFormat()) ? "oa-2-big" : "da-2-bti");
+ // Format is "oa-<generation>-big" or "da-<generation>-bti"
+ if ("big".equals(CassandraVersion.sstableFormat()))
+ {
+ assertThat(baseFileName).matches("oa-\\d+-big");
+ }
+ else
+ {
+ assertThat(baseFileName).matches("da-\\d+-bti");
+ }
break;
default:
throw new UnsupportedOperationException("Unsupported version: " + version);
@@ -116,11 +134,245 @@ public class SortedSSTableWriterTest
dataFileStream.forEach(dataFilePath -> {
dataFilePaths.add(dataFilePath);
assertThat(SSTables.cassandraVersionFromTable(dataFilePath).getMajorVersion())
- .isEqualTo(CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(version).getMajorVersion());
+ .isEqualTo(CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(version).getMajorVersion());
});
}
// no exception should be thrown from both the validate methods
tw.validateSSTables(writerContext);
tw.validateSSTables(writerContext, tw.getOutDir(), dataFilePaths);
}
+
+ /**
+ * Tests the race condition fix between prepareSStablesToSend (called from background threads)
+ * and close (called from the main thread). This test exercises CASSANALYTICS-107.
+ *
+ * This test focuses on verifying thread safety when:
+ * 1. prepareSStablesToSend is called repeatedly from a background thread
+ * 2. close is called from the main thread
+ * 3. Both methods access shared state concurrently
+ */
+ @ParameterizedTest
+ @MethodSource("supportedVersions")
+ public void testConcurrentPrepareSStablesToSendAndClose(String version) throws Exception
+ {
+ MockBulkWriterContext writerContext = new MockBulkWriterContext(tokenRangeMapping, version, ConsistencyLevel.CL.LOCAL_QUORUM);
+
+ // First, create real SSTables that will be used to simulate the race
+ // These SSTables will be in tmpDir and represent the "intermediate flush" scenario
+ List<SSTableDescriptor> existingSSTables = mockSSTableProduced(writerContext);
+
+ // Verify we have SSTables to work with
+ assertThat(existingSSTables).as("Should have produced SSTables").isNotEmpty();
+
+ // Now create the writer that will be tested for the race condition
+ // It will share the same directory (tmpDir) where the SSTables already exist, i.e. already have sstables produced
+ SortedSSTableWriter testWriter = new SortedSSTableWriter(writerContext, tmpDir, new XXHash32DigestAlgorithm(), 1);
+ testWriter.setSSTablesProducedListener(x -> {});
+
+ // Add a row to the test writer (this will produce another SSTable on close)
+ testWriter.addRow(BigInteger.valueOf(100), ImmutableMap.of("id", 2, "date", 2, "course", "test2", "marks", 200));
+
+ // Simulate the race: prepareSStablesToSend in background thread processing existing SSTables,
+ // while close is called in main thread
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ try
+ {
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch completionLatch = new CountDownLatch(2);
+
+ // Background thread: repeatedly call prepareSStablesToSend with the existing SSTables
+ // This simulates DirectStreamSession#onSSTablesProduced processing an intermediate flush
+ Future<?> prepareFuture = executor.submit(() -> {
+ Uninterruptibles.awaitUninterruptibly(startLatch);
+ try
+ {
+ // Repeatedly call prepareSStablesToSend to increase chance of race
+ for (int i = 0; i < 50; i++)
+ {
+ try
+ {
+ // Use the existing SSTables to simulate real scenario
+ testWriter.prepareSStablesToSend(writerContext, new HashSet<>(existingSSTables));
+ Thread.yield();
+ }
+ catch (IOException e)
+ {
+ // IOException is acceptable (e.g., files already processed)
+ // But ConcurrentModificationException would indicate a threading bug
+ String message = e.getMessage();
+ if (message != null && message.toLowerCase().contains("concurrent"))
+ {
+ throw new RuntimeException("Thread safety violation detected", e);
+ }
+ }
+ }
+ }
+ finally
+ {
+ completionLatch.countDown();
+ }
+ });
+
+ // Main thread: call close
+ Future<?> closeFuture = executor.submit(() -> {
+ Uninterruptibles.awaitUninterruptibly(startLatch);
+ try
+ {
+ // Small delay to let prepareSStablesToSend start
+ Uninterruptibles.sleepUninterruptibly(5, TimeUnit.MILLISECONDS);
+ testWriter.close(writerContext);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("close failed", e);
+ }
+ finally
+ {
+ completionLatch.countDown();
+ }
+ });
+
+ // Start both operations concurrently
+ startLatch.countDown();
+
+ // Wait for both to complete
+ assertThat(completionLatch.await(30, TimeUnit.SECONDS))
+ .as("Both operations should complete within timeout")
+ .isTrue();
+
+ // Verify neither future threw an exception
+ prepareFuture.get(5, TimeUnit.SECONDS);
+ closeFuture.get(5, TimeUnit.SECONDS);
+
+ // Verify the writer is in a consistent state
+ assertThat(testWriter.sstableCount()).isEqualTo(2);
+ assertThat(testWriter.bytesWritten()).isGreaterThan(0);
+
+ // Verify file digests map is not corrupted
+ assertThat(testWriter.fileDigestMap()).isNotEmpty();
+
+ // Verify SSTables can still be validated (no data corruption)
+ testWriter.validateSSTables(writerContext);
+ }
+ finally
+ {
+ executor.shutdown();
+ assertThat(executor.awaitTermination(10, TimeUnit.SECONDS))
+ .as("Executor should terminate cleanly")
+ .isTrue();
+ }
+ }
+
+ /**
+ * Tests the scenario where prepareSStablesToSend is called with produced SSTables,
+ * those files are then deleted (simulating DirectStreamSession behavior), and then
+ * close() is called. This verifies that close() only calculates bytesWritten for
+ * newly produced files, not the already-processed (and deleted) ones.
+ * This test exercises CASSANALYTICS-107.
+ */
+ @ParameterizedTest
+ @MethodSource("supportedVersions")
+ public void testBytesWrittenWithDeletedFiles(String version) throws Exception
+ {
+ MockBulkWriterContext writerContext = new MockBulkWriterContext(tokenRangeMapping, version, ConsistencyLevel.CL.LOCAL_QUORUM);
+
+ // Create initial SSTables to simulate intermediate flush
+ List<SSTableDescriptor> existingSSTables = mockSSTableProduced(writerContext);
+ assertThat(existingSSTables).as("Should have produced SSTables").isNotEmpty();
+
+ // Create a new writer that will process the existing SSTables
+ SortedSSTableWriter writer = new SortedSSTableWriter(writerContext, tmpDir, new XXHash32DigestAlgorithm(), 1);
+ writer.setSSTablesProducedListener(x -> {});
+
+ // Add a row to produce another SSTable on close (use higher token to maintain order)
+ writer.addRow(BigInteger.valueOf(100), ImmutableMap.of("id", 2, "date", 2, "course", "test2", "marks", 200));
+
+ // Call prepareSStablesToSend with the existing SSTables
+ Map<Path, Digest> processedFiles = writer.prepareSStablesToSend(writerContext, new HashSet<>(existingSSTables));
+ assertThat(processedFiles).as("Should have processed existing SSTables").isNotEmpty();
+
+ long bytesAfterPrepare = writer.bytesWritten();
+
+ // Delete the files that were processed (simulating DirectStreamSession behavior)
+ for (Path path : processedFiles.keySet())
+ {
+ Files.deleteIfExists(path);
+ }
+
+ // Now close - this should only count NEW files, not trying to re-count deleted ones
+ // No NoSuchFileException should be thrown
+ assertThatNoException().isThrownBy(() -> writer.close(writerContext));
+
+ // Verify bytesWritten increased (from close processing new files)
+ assertThat(writer.bytesWritten())
+ .as("bytesWritten should have increased after close")
+ .isGreaterThanOrEqualTo(bytesAfterPrepare);
+
+ assertThat(writer.sstableCount()).as("Should have correct sstable count").isEqualTo(2);
+ }
+
+ /**
+ * Tests that prepareSStablesToSend returns an empty map when called after close().
+ * This verifies that the method properly guards against being called after the writer is closed,
+ * which would otherwise cause double-counting of SSTables and bytes.
+ */
+ @ParameterizedTest
+ @MethodSource("supportedVersions")
+ public void testPrepareSStablesToSendAfterClose(String version) throws Exception
+ {
+ MockBulkWriterContext writerContext = new MockBulkWriterContext(tokenRangeMapping, version, ConsistencyLevel.CL.LOCAL_QUORUM);
+
+ // Create a writer and add a row
+ SortedSSTableWriter writer = new SortedSSTableWriter(writerContext, tmpDir, new XXHash32DigestAlgorithm(), 1);
+ writer.setSSTablesProducedListener(x -> {});
+
+ writer.addRow(BigInteger.valueOf(100), ImmutableMap.of("id", 2, "date", 2, "course", "test2", "marks", 200));
+
+ // Close the writer first
+ writer.close(writerContext);
+
+ // Record the state after close
+ int sstableCountAfterClose = writer.sstableCount();
+ long bytesWrittenAfterClose = writer.bytesWritten();
+ int fileDigestCountAfterClose = writer.fileDigestMap().size();
+
+ // Try to call prepareSStablesToSend after close - it should return empty map
+ Map<Path, Digest> result = writer.prepareSStablesToSend(writerContext, new HashSet<>());
+
+ // Verify it returned an empty map
+ assertThat(result)
+ .as("prepareSStablesToSend should return empty map when called after
close")
+ .isEmpty();
+
+ // Verify that counters were NOT incremented (no double-counting)
+ assertThat(writer.sstableCount())
+ .as("sstableCount should not change when prepareSStablesToSend is
called after close")
+ .isEqualTo(sstableCountAfterClose);
+
+ assertThat(writer.bytesWritten())
+ .as("bytesWritten should not change when prepareSStablesToSend is
called after close")
+ .isEqualTo(bytesWrittenAfterClose);
+
+ assertThat(writer.fileDigestMap().size())
+ .as("fileDigestMap size should not change when prepareSStablesToSend
is called after close")
+ .isEqualTo(fileDigestCountAfterClose);
+ }
+
+ /**
+ * Helper method to create initial SSTables for testing.
+ * This simulates the scenario where SSTables are produced during intermediate flushes, i.e. prepareSStablesToSend
+ */
+ private List<SSTableDescriptor> mockSSTableProduced(MockBulkWriterContext writerContext) throws IOException
+ {
+ SortedSSTableWriter initialWriter = new SortedSSTableWriter(writerContext, tmpDir, new XXHash32DigestAlgorithm(), 1);
+ List<SSTableDescriptor> producedSSTables = new ArrayList<>();
+ initialWriter.setSSTablesProducedListener(producedSSTables::addAll);
+
+ // Write a row to produce real SSTables
+ // Use token BigInteger.ONE for consistency
+ initialWriter.addRow(BigInteger.ONE, ImmutableMap.of("id", 1, "date", 1, "course", "test1", "marks", 100));
+ initialWriter.close(writerContext);
+
+ return producedSSTables;
+ }
}
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
index a6f9783e..c923b702 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
@@ -27,6 +27,8 @@ import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import javax.annotation.concurrent.NotThreadSafe;
+
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.config.Config;
@@ -38,6 +40,7 @@ import org.apache.cassandra.io.sstable.CQLSSTableWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.jetbrains.annotations.NotNull;
+@NotThreadSafe // The underlying CQLSSTableWriter is not thread-safe
public class SSTableWriterImplementation implements SSTableWriter
{
static