This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dbbd211  CASSANDRA-19821: prevent double closing sstable writer (#72)
dbbd211 is described below

commit dbbd211cd420eb185d0579f16f5d46abc7bafeb4
Author: Yifan Cai <y...@apache.org>
AuthorDate: Fri Aug 9 15:47:21 2024 -0700

    CASSANDRA-19821: prevent double closing sstable writer (#72)
    
    Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSANDRA-19821
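    
    For context, a hedged sketch of the failure mode being fixed (the method
    names follow the diff below; the surrounding control flow and the
    writeRows helper are illustrative, not the exact production code):
    RecordWriter's failure path called StreamSession#cleanupOnFailure(),
    which closed the sstable writer that the normal shutdown path would then
    close a second time.
    
        // Illustrative only: names and structure are hypothetical.
        void writeAndClose(StreamSession session, SortedSSTableWriter writer,
                           BulkWriterContext context) throws Exception
        {
            try
            {
                writeRows(session);            // append rows, stream sstables
            }
            catch (Exception e)
            {
                session.cleanupOnFailure();    // closed the writer once...
                throw e;
            }
            finally
            {
                writer.close(context);         // ...and closed it again here
            }
        }
    
    The patch removes the cleanup call and makes SortedSSTableWriter#close()
    idempotent, so any residual second close becomes a no-op.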
---
 CHANGES.txt                                        |   1 +
 .../cassandra/spark/bulkwriter/RecordWriter.java   |  12 +-
 .../spark/bulkwriter/SortedSSTableWriter.java      |  22 +++-
 .../cassandra/spark/bulkwriter/StreamSession.java  |  49 ++-----
 .../bulkwriter/blobupload/BlobStreamSession.java   |   9 --
 .../spark/bulkwriter/SortedSSTableWriterTest.java  |   2 +-
 .../correctness/BulkWriteDiskCorruptionTest.java   | 146 +++++++++++++++++++++
 7 files changed, 181 insertions(+), 60 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 6c0e3a0..9a9e5ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Prevent double closing sstable writer (CASSANDRA-19821)
  * Stream sstable eagerly when bulk writing to reclaim local disk space sooner (CASSANDRA-19806)
  * Split the Cassandra type logic out from CassandraBridge into a separate module (CASSANDRA-19793)
  * Remove other uses of Apache Commons lang for hashcode, equality and random string generation (CASSANDRA-19791)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
index 5368003..3169cff 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
@@ -53,12 +53,12 @@ import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse;
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.cassandra.spark.bulkwriter.util.TaskContextUtils;
-import org.apache.cassandra.util.ThreadUtil;
 import org.apache.cassandra.spark.data.BridgeUdtValue;
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.CqlTable;
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.utils.DigestAlgorithm;
+import org.apache.cassandra.util.ThreadUtil;
 import org.apache.spark.TaskContext;
 import org.jetbrains.annotations.NotNull;
 import scala.Tuple2;
@@ -184,6 +184,10 @@ public class RecordWriter
             Range<BigInteger> currentRange = subRanges.get(currentRangeIndex);
             while (dataIterator.hasNext())
             {
+                if (streamSession != null)
+                {
+                    streamSession.throwIfLastStreamFailed();
+                }
                 Tuple2<DecoratedKey, Object[]> rowData = dataIterator.next();
                 BigInteger token = rowData._1().getToken();
                 // Advance to the next range that contains the token.
@@ -219,12 +223,6 @@ public class RecordWriter
                          taskContext.stageAttemptNumber(),
                          taskContext.attemptNumber());
 
-            // if streamSession is not closed/nullified. Clean it up here
-            if (streamSession != null)
-            {
-                streamSession.cleanupOnFailure();
-            }
-
             if (exception instanceof InterruptedException)
             {
                 Thread.currentThread().interrupt();
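
The two RecordWriter hunks above relocate failure handling: rather than cleaning up the stream session in the catch block, the writer now surfaces any asynchronous streaming failure before appending each row. A minimal sketch of the resulting loop shape, assuming the names from the diff (dataIterator and streamSession come from the surrounding method, which is not fully shown here):

    while (dataIterator.hasNext())
    {
        if (streamSession != null)
        {
            // Fail fast: rethrow a failure recorded by the background
            // streaming task instead of writing rows that can never land.
            streamSession.throwIfLastStreamFailed();
        }
        Tuple2<DecoratedKey, Object[]> rowData = dataIterator.next();
        // ... route rowData to the writer for the enclosing token range
    }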
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
index bd0e35a..e3b534c 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.spark.reader.RowData;
 import org.apache.cassandra.spark.reader.StreamScanner;
 import org.apache.cassandra.spark.utils.DigestAlgorithm;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * SSTableWriter that expects sorted data
@@ -77,6 +78,8 @@ public class SortedSSTableWriter
     private final Map<Path, Digest> overallFileDigests = new HashMap<>();
     private final DigestAlgorithm digestAlgorithm;
 
+    private volatile boolean isClosed = false;
+
     private int sstableCount = 0;
     private long rowCount = 0;
     private long bytesWritten = 0;
@@ -99,7 +102,7 @@ public class SortedSSTableWriter
 
         String lowestCassandraVersion = writerContext.cluster().getLowestCassandraVersion();
         String packageVersion = getPackageVersion(lowestCassandraVersion);
-        LOGGER.info("Running with version " + packageVersion);
+        LOGGER.info("Running with version {}", packageVersion);
 
         SchemaInfo schema = writerContext.schema();
         TableSchema tableSchema = schema.getTableSchema();
@@ -192,12 +195,18 @@ public class SortedSSTableWriter
         }
         bytesWritten += calculatedTotalSize(fileDigests.keySet());
         overallFileDigests.putAll(fileDigests);
-        validateSSTables(writerContext, dataFilePaths);
+        validateSSTables(writerContext, getOutDir(), dataFilePaths);
         return fileDigests;
     }
 
     public void close(BulkWriterContext writerContext) throws IOException
     {
+        if (isClosed)
+        {
+            LOGGER.info("Already closed");
+            return;
+        }
+        isClosed = true;
         cqlSSTableWriter.close();
         for (Path dataFile : getDataFileStream())
         {
@@ -214,17 +223,19 @@ public class SortedSSTableWriter
     @VisibleForTesting
     public void validateSSTables(@NotNull BulkWriterContext writerContext)
     {
-        validateSSTables(writerContext, null);
+        validateSSTables(writerContext, getOutDir(), null);
     }
 
     /**
      * Validate SSTables. If dataFilePaths is null, it finds all sstables under the output directory of the writer and validates them
+     *
+     * @param outputDirectory output directory of the sstable writer
      * @param writerContext bulk writer context
      * @param dataFilePaths paths of sstables (data file) to be validated. The argument is nullable.
      *                      When it is null, it validates all sstables under the output directory.
      */
     @VisibleForTesting
-    public void validateSSTables(@NotNull BulkWriterContext writerContext, Set<Path> dataFilePaths)
+    public void validateSSTables(@NotNull BulkWriterContext writerContext, @NotNull Path outputDirectory, @Nullable Set<Path> dataFilePaths)
     {
         // NOTE: If this current implementation of SS-tables' validation proves to be a performance issue,
         //       we will need to modify LocalDataLayer to allow scanning and compaction of single data file,
@@ -236,7 +247,6 @@ public class SortedSSTableWriter
             String schema = writerContext.schema().getTableSchema().createStatement;
             Partitioner partitioner = writerContext.cluster().getPartitioner();
             Set<String> udtStatements = writerContext.schema().getUserDefinedTypeStatements();
-            String directory = getOutDir().toString();
             LocalDataLayer layer = new LocalDataLayer(version,
                                                       partitioner,
                                                       keyspace,
@@ -245,7 +255,7 @@ public class SortedSSTableWriter
                                                       Collections.emptyList() /* requestedFeatures */,
                                                       false /* useSSTableInputStream */,
                                                       null /* statsClass */,
-                                                      directory);
+                                                      outputDirectory.toString());
             if (dataFilePaths != null)
             {
                 layer.setDataFilePaths(dataFilePaths);
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java
index 839b46d..d104534 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java
@@ -36,7 +36,6 @@ import java.util.stream.Collectors;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Range;
-import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -110,11 +109,20 @@ public abstract class StreamSession<T extends TransportContext>
         return tokenRange;
     }
 
-    public void addRow(BigInteger token, Map<String, Object> boundValues) throws IOException
+    /**
+     * Throw an exception if the last streaming task has failed
+     * @throws IOException if the last streaming task has failed
+     */
+    public void throwIfLastStreamFailed() throws IOException
     {
-        // exit early when sending the produced sstables has failed
-        rethrowIfLastStreamFailed();
+        if (lastStreamFailure.get() != null)
+        {
+            throw new IOException("Unexpected exception while streaming SSTables", lastStreamFailure.get());
+        }
+    }
 
+    public void addRow(BigInteger token, Map<String, Object> boundValues) throws IOException
+    {
         sstableWriter.addRow(token, boundValues);
     }
 
@@ -126,7 +134,6 @@ public abstract class StreamSession<T extends TransportContext>
     public Future<StreamResult> finalizeStreamAsync() throws IOException
     {
         isStreamFinalized = true;
-        rethrowIfLastStreamFailed();
         Preconditions.checkState(!sstableWriter.getTokenRange().isEmpty(), "Cannot stream empty SSTable");
         Preconditions.checkState(tokenRange.encloses(sstableWriter.getTokenRange()),
                                  "SSTable range %s should be enclosed in the partition range %s",
@@ -136,30 +143,6 @@ public abstract class StreamSession<T extends TransportContext>
         return executorService.submit(this::doFinalizeStream);
     }
 
-    /**
-     * Clean up any remaining files on disk when streaming is failed
-     */
-    public void cleanupOnFailure()
-    {
-        try
-        {
-            sstableWriter.close(writerContext);
-        }
-        catch (IOException e)
-        {
-            LOGGER.warn("[{}]: Failed to close sstable writer on streaming failure", sessionID, e);
-        }
-
-        try
-        {
-            FileUtils.deleteDirectory(sstableWriter.getOutDir().toFile());
-        }
-        catch (IOException e)
-        {
-            LOGGER.warn("[{}]: Failed to clean up the produced sstables on streaming failure", sessionID, e);
-        }
-    }
-
     protected boolean isStreamFinalized()
     {
         return isStreamFinalized;
@@ -180,14 +163,6 @@ public abstract class StreamSession<T extends TransportContext>
         return streamedFiles.contains(file);
     }
 
-    private void rethrowIfLastStreamFailed() throws IOException
-    {
-        if (lastStreamFailure.get() != null)
-        {
-            throw new IOException("Unexpected exception while streaming SSTables", lastStreamFailure.get());
-        }
-    }
-
     @VisibleForTesting
     List<RingInstance> getReplicas()
     {
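
throwIfLastStreamFailed() above is the old private rethrowIfLastStreamFailed() promoted to the public API, so RecordWriter can poll it once per row. The declaration of lastStreamFailure is not part of this diff; the sketch below assumes it is an AtomicReference<Throwable> written by the background streaming task and read on the writer thread (recordFailure is a hypothetical name for the writing side, not a method shown in this patch):

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicReference;

    private final AtomicReference<Throwable> lastStreamFailure = new AtomicReference<>();

    // Called by the background streaming task when a stream fails;
    // keeps only the first recorded failure.
    void recordFailure(Throwable t)
    {
        lastStreamFailure.compareAndSet(null, t);
    }

    // Called on the writer thread; converts the recorded failure into an IOException.
    public void throwIfLastStreamFailed() throws IOException
    {
        Throwable failure = lastStreamFailure.get();
        if (failure != null)
        {
            throw new IOException("Unexpected exception while streaming SSTables", failure);
        }
    }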
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSession.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSession.java
index 8ec1bcf..dd568df 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSession.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSession.java
@@ -211,15 +211,6 @@ public class BlobStreamSession extends StreamSession<TransportContext.CloudStora
         }
     }
 
-    @Override
-    public void cleanupOnFailure()
-    {
-        super.cleanupOnFailure();
-
-        // remove any remaining bundle
-        sstablesBundler.cleanupBundle(sessionID);
-    }
-
     void sendBundle(Bundle bundle, boolean hasRefreshedCredentials)
     {
         StorageCredentials writeCredentials = getStorageCredentialsFromSidecar();
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java
index f7d4584..e703394 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java
@@ -102,6 +102,6 @@ public class SortedSSTableWriterTest
         }
         // no exception should be thrown from both the validate methods
         tw.validateSSTables(writerContext);
-        tw.validateSSTables(writerContext, dataFilePaths);
+        tw.validateSSTables(writerContext, tw.getOutDir(), dataFilePaths);
     }
 }
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/correctness/BulkWriteDiskCorruptionTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/correctness/BulkWriteDiskCorruptionTest.java
new file mode 100644
index 0000000..35e6b6a
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/correctness/BulkWriteDiskCorruptionTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics.correctness;
+
+import java.io.RandomAccessFile;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.junit.jupiter.api.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.pool.TypePool;
+import net.jpountz.lz4.LZ4Exception;
+import org.apache.cassandra.analytics.DataGenerationUtils;
+import org.apache.cassandra.analytics.SharedClusterSparkIntegrationTestBase;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.spark.bulkwriter.BulkWriterContext;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT;
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
+import static org.apache.cassandra.testing.TestUtils.ROW_COUNT;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+public class BulkWriteDiskCorruptionTest extends SharedClusterSparkIntegrationTestBase
+{
+    private static final QualifiedName QUALIFIED_NAME = new QualifiedName(TEST_KEYSPACE, "test_write_disk_corruption");
+
+    static
+    {
+        // Intercepts SortedSSTableWriter#validateSSTables to corrupt a file on purpose.
+        // Install the class rebase as early as possible, before the JVM loads the class
+        BBHelperInterceptSortedSSTableWriterValidateSSTables.install();
+    }
+
+    @Test
+    void testDiskCorruption()
+    {
+        Map<String, String> writerOptions = new HashMap<>();
+
+        SparkSession spark = getOrCreateSparkSession();
+
+        // Generate some data
+        Dataset<Row> dfWrite = DataGenerationUtils.generateCourseData(spark, ROW_COUNT);
+
+        // Write the data using Bulk Writer
+        try
+        {
+            bulkWriterDataFrameWriter(dfWrite, QUALIFIED_NAME, writerOptions).save();
+            fail("Bulk write should fail");
+        }
+        catch (Exception ex)
+        {
+            assertThat(ex)
+            .isExactlyInstanceOf(RuntimeException.class)
+            .hasMessageContaining("Bulk Write to Cassandra has failed")
+            .rootCause()
+            .isExactlyInstanceOf(LZ4Exception.class)
+            .hasMessageContaining("Malformed input");
+        }
+    }
+
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
+        createTestTable(QUALIFIED_NAME, CREATE_TEST_TABLE_STATEMENT);
+    }
+
+    @Override
+    protected ClusterBuilderConfiguration testClusterConfiguration()
+    {
+        return super.testClusterConfiguration()
+                    .nodesPerDc(1);
+    }
+
+    public static class BBHelperInterceptSortedSSTableWriterValidateSSTables
+    {
+        public static void install()
+        {
+            TypePool typePool = TypePool.Default.ofSystemLoader();
+            new ByteBuddy()
+            .rebase(typePool.describe("org.apache.cassandra.spark.bulkwriter.SortedSSTableWriter").resolve(),
+                    ClassFileLocator.ForClassLoader.ofSystemLoader())
+            .method(named("validateSSTables").and(takesArguments(BulkWriterContext.class, Path.class, Set.class)))
+            .intercept(MethodDelegation.to(BBHelperInterceptSortedSSTableWriterValidateSSTables.class))
+            .make()
+            .load(BBHelperInterceptSortedSSTableWriterValidateSSTables.class.getClassLoader(), ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        @SuppressWarnings("unused")
+        public static void validateSSTables(BulkWriterContext context,
+                                            Path outputDirectory,
+                                            Set<Path> dataFilePaths,
+                                            @SuperCall Callable<?> orig) throws Exception
+        {
+            try (DirectoryStream<Path> stream = Files.newDirectoryStream(outputDirectory, "*Data.db"))
+            {
+                Path dataFile = stream.iterator().next();
+                try (RandomAccessFile file = new RandomAccessFile(dataFile.toFile(), "rw"))
+                {
+                    file.seek(file.length() / 2);
+                    file.writeChars("THIS IS CORRUPT DATA AND SHOULD NOT BE READABLE");
+                }
+                catch (Exception e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+            orig.call();
+        }
+    }
+}
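
The test works because SSTable data components are compressed (LZ4 by default), so overwriting bytes in the middle of a Data.db file corrupts a compressed chunk and the validation read path fails with an LZ4Exception complaining about malformed input. The corruption step, extracted as a standalone sketch (corruptMiddle is an illustrative name, not a method in this repository):

    import java.io.RandomAccessFile;
    import java.nio.file.Path;

    // Overwrite bytes in the middle of a file so any compressed or
    // checksummed reader fails when it reaches that region.
    static void corruptMiddle(Path file) throws Exception
    {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw"))
        {
            raf.seek(raf.length() / 2);
            raf.writeChars("THIS IS CORRUPT DATA AND SHOULD NOT BE READABLE");
        }
    }

The ByteBuddy helper resolves SortedSSTableWriter through a TypePool so the rebase is installed before the JVM loads the class, then delegates back to the original validateSSTables via the @SuperCall handle, so the corrupted file still flows through the real validation logic.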


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org
