This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
     new dbbd211  CASSANDRA-19821: prevent double closing sstable writer (#72)
dbbd211 is described below

commit dbbd211cd420eb185d0579f16f5d46abc7bafeb4
Author: Yifan Cai <y...@apache.org>
AuthorDate: Fri Aug 9 15:47:21 2024 -0700

    CASSANDRA-19821: prevent double closing sstable writer (#72)

    Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSANDRA-19821
---
 CHANGES.txt                                        |   1 +
 .../cassandra/spark/bulkwriter/RecordWriter.java   |  12 +-
 .../spark/bulkwriter/SortedSSTableWriter.java      |  22 +++-
 .../cassandra/spark/bulkwriter/StreamSession.java  |  49 ++-----
 .../bulkwriter/blobupload/BlobStreamSession.java   |   9 --
 .../spark/bulkwriter/SortedSSTableWriterTest.java  |   2 +-
 .../correctness/BulkWriteDiskCorruptionTest.java   | 146 +++++++++++++++++++++
 7 files changed, 181 insertions(+), 60 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 6c0e3a0..9a9e5ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Prevent double closing sstable writer (CASSANDRA-19821)
  * Stream sstable eagerly when bulk writing to reclaim local disk space sooner (CASSANDRA-19806)
  * Split the Cassandra type logic out from CassandraBridge into a separate module (CASSANDRA-19793)
  * Remove other uses of Apache Commons lang for hashcode, equality and random string generation (CASSANDRA-19791)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
index 5368003..3169cff 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
@@ -53,12 +53,12 @@ import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse;
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.cassandra.spark.bulkwriter.util.TaskContextUtils;
-import org.apache.cassandra.util.ThreadUtil;
 import org.apache.cassandra.spark.data.BridgeUdtValue;
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.CqlTable;
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.utils.DigestAlgorithm;
+import org.apache.cassandra.util.ThreadUtil;
 import org.apache.spark.TaskContext;
 import org.jetbrains.annotations.NotNull;
 import scala.Tuple2;
@@ -184,6 +184,10 @@ public class RecordWriter
         Range<BigInteger> currentRange = subRanges.get(currentRangeIndex);
         while (dataIterator.hasNext())
         {
+            if (streamSession != null)
+            {
+                streamSession.throwIfLastStreamFailed();
+            }
             Tuple2<DecoratedKey, Object[]> rowData = dataIterator.next();
             BigInteger token = rowData._1().getToken();
             // Advance to the next range that contains the token.
@@ -219,12 +223,6 @@ public class RecordWriter
                          taskContext.stageAttemptNumber(),
                          taskContext.attemptNumber());
 
-            // if streamSession is not closed/nullified. Clean it up here
-            if (streamSession != null)
-            {
-                streamSession.cleanupOnFailure();
-            }
-
             if (exception instanceof InterruptedException)
             {
                 Thread.currentThread().interrupt();
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
index bd0e35a..e3b534c 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.spark.reader.RowData;
 import org.apache.cassandra.spark.reader.StreamScanner;
 import org.apache.cassandra.spark.utils.DigestAlgorithm;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * SSTableWriter that expects sorted data
@@ -77,6 +78,8 @@
     private final Map<Path, Digest> overallFileDigests = new HashMap<>();
     private final DigestAlgorithm digestAlgorithm;
 
+    private volatile boolean isClosed = false;
+
     private int sstableCount = 0;
     private long rowCount = 0;
     private long bytesWritten = 0;
@@ -99,7 +102,7 @@
         String lowestCassandraVersion = writerContext.cluster().getLowestCassandraVersion();
         String packageVersion = getPackageVersion(lowestCassandraVersion);
 
-        LOGGER.info("Running with version " + packageVersion);
+        LOGGER.info("Running with version {}", packageVersion);
 
         SchemaInfo schema = writerContext.schema();
         TableSchema tableSchema = schema.getTableSchema();
@@ -192,12 +195,18 @@
         }
         bytesWritten += calculatedTotalSize(fileDigests.keySet());
         overallFileDigests.putAll(fileDigests);
-        validateSSTables(writerContext, dataFilePaths);
+        validateSSTables(writerContext, getOutDir(), dataFilePaths);
         return fileDigests;
     }
 
     public void close(BulkWriterContext writerContext) throws IOException
     {
+        if (isClosed)
+        {
+            LOGGER.info("Already closed");
+            return;
+        }
+        isClosed = true;
         cqlSSTableWriter.close();
         for (Path dataFile : getDataFileStream())
         {
@@ -214,17 +223,19 @@
     @VisibleForTesting
     public void validateSSTables(@NotNull BulkWriterContext writerContext)
     {
-        validateSSTables(writerContext, null);
+        validateSSTables(writerContext, getOutDir(), null);
     }
 
     /**
      * Validate SSTables. If dataFilePaths is null, it finds all sstables under the output directory of the writer and validates them
+     *
      * @param writerContext bulk writer context
+     * @param outputDirectory output directory of the sstable writer
      * @param dataFilePaths paths of sstables (data file) to be validated. The argument is nullable.
      *                      When it is null, it validates all sstables under the output directory.
      */
     @VisibleForTesting
-    public void validateSSTables(@NotNull BulkWriterContext writerContext, Set<Path> dataFilePaths)
+    public void validateSSTables(@NotNull BulkWriterContext writerContext, @NotNull Path outputDirectory, @Nullable Set<Path> dataFilePaths)
     {
         // NOTE: If this current implementation of SS-tables' validation proves to be a performance issue,
         //       we will need to modify LocalDataLayer to allow scanning and compaction of single data file,
@@ -236,7 +247,6 @@
         String schema = writerContext.schema().getTableSchema().createStatement;
         Partitioner partitioner = writerContext.cluster().getPartitioner();
         Set<String> udtStatements = writerContext.schema().getUserDefinedTypeStatements();
-        String directory = getOutDir().toString();
         LocalDataLayer layer = new LocalDataLayer(version,
                                                   partitioner,
                                                   keyspace,
@@ -245,7 +255,7 @@
                                                   Collections.emptyList() /* requestedFeatures */,
                                                   false /* useSSTableInputStream */,
                                                   null /* statsClass */,
-                                                  directory);
+                                                  outputDirectory.toString());
         if (dataFilePaths != null)
         {
             layer.setDataFilePaths(dataFilePaths);
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java
index 839b46d..d104534 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java
@@ -36,7 +36,6 @@ import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Range;
-import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,11 +109,20 @@ public abstract class StreamSession<T extends TransportContext>
         return tokenRange;
     }
 
-    public void addRow(BigInteger token, Map<String, Object> boundValues) throws IOException
+    /**
+     * Throw an exception if the last streaming task has failed
+     * @throws IOException if the last streaming task has failed
+     */
+    public void throwIfLastStreamFailed() throws IOException
     {
-        // exit early when sending the produced sstables has failed
-        rethrowIfLastStreamFailed();
+        if (lastStreamFailure.get() != null)
+        {
+            throw new IOException("Unexpected exception while streaming SSTables", lastStreamFailure.get());
+        }
+    }
 
+    public void addRow(BigInteger token, Map<String, Object> boundValues) throws IOException
+    {
         sstableWriter.addRow(token, boundValues);
     }
 
@@ -126,7 +134,6 @@
     public Future<StreamResult> finalizeStreamAsync() throws IOException
     {
         isStreamFinalized = true;
-        rethrowIfLastStreamFailed();
         Preconditions.checkState(!sstableWriter.getTokenRange().isEmpty(), "Cannot stream empty SSTable");
         Preconditions.checkState(tokenRange.encloses(sstableWriter.getTokenRange()),
                                  "SSTable range %s should be enclosed in the partition range %s",
@@ -136,30 +143,6 @@
         return executorService.submit(this::doFinalizeStream);
     }
 
-    /**
-     * Clean up any remaining files on disk when streaming is failed
-     */
-    public void cleanupOnFailure()
-    {
-        try
-        {
-            sstableWriter.close(writerContext);
-        }
-        catch (IOException e)
-        {
-            LOGGER.warn("[{}]: Failed to close sstable writer on streaming failure", sessionID, e);
-        }
-
-        try
-        {
-            FileUtils.deleteDirectory(sstableWriter.getOutDir().toFile());
-        }
-        catch (IOException e)
-        {
-            LOGGER.warn("[{}]: Failed to clean up the produced sstables on streaming failure", sessionID, e);
-        }
-    }
-
     protected boolean isStreamFinalized()
     {
         return isStreamFinalized;
@@ -180,14 +163,6 @@
         return streamedFiles.contains(file);
     }
 
-    private void rethrowIfLastStreamFailed() throws IOException
-    {
-        if (lastStreamFailure.get() != null)
-        {
-            throw new IOException("Unexpected exception while streaming SSTables", lastStreamFailure.get());
-        }
-    }
-
     @VisibleForTesting
     List<RingInstance> getReplicas()
     {
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSession.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSession.java
index 8ec1bcf..dd568df 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSession.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSession.java
@@ -211,15 +211,6 @@ public class BlobStreamSession extends StreamSession<TransportContext.CloudStora
         }
     }
 
-    @Override
-    public void cleanupOnFailure()
-    {
-        super.cleanupOnFailure();
-
-        // remove any remaining bundle
-        sstablesBundler.cleanupBundle(sessionID);
-    }
-
     void sendBundle(Bundle bundle, boolean hasRefreshedCredentials)
     {
         StorageCredentials writeCredentials = getStorageCredentialsFromSidecar();
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java
index f7d4584..e703394 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java
@@ -102,6 +102,6 @@
         }
         // no exception should be thrown from both the validate methods
         tw.validateSSTables(writerContext);
-        tw.validateSSTables(writerContext, dataFilePaths);
+        tw.validateSSTables(writerContext, tw.getOutDir(), dataFilePaths);
     }
 }
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/correctness/BulkWriteDiskCorruptionTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/correctness/BulkWriteDiskCorruptionTest.java
new file mode 100644
index 0000000..35e6b6a
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/correctness/BulkWriteDiskCorruptionTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics.correctness;
+
+import java.io.RandomAccessFile;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.junit.jupiter.api.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.pool.TypePool;
+import net.jpountz.lz4.LZ4Exception;
+import org.apache.cassandra.analytics.DataGenerationUtils;
+import org.apache.cassandra.analytics.SharedClusterSparkIntegrationTestBase;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.spark.bulkwriter.BulkWriterContext;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT;
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
+import static org.apache.cassandra.testing.TestUtils.ROW_COUNT;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+public class BulkWriteDiskCorruptionTest extends SharedClusterSparkIntegrationTestBase
+{
+    private static final QualifiedName QUALIFIED_NAME = new QualifiedName(TEST_KEYSPACE, "test_write_disk_corruption");
+
+    static
+    {
+        // Intercepts SortedSSTableWriter#validateSSTables to corrupt a data file on purpose.
+        // Install the rebased class as early as possible, before the JVM loads the original class
+        BBHelperInterceptSortedSSTableWriterValidateSSTables.install();
+    }
+
+    @Test
+    void testDiskCorruption()
+    {
+        Map<String, String> writerOptions = new HashMap<>();
+
+        SparkSession spark = getOrCreateSparkSession();
+
+        // Generate some data
+        Dataset<Row> dfWrite = DataGenerationUtils.generateCourseData(spark, ROW_COUNT);
+
+        // Write the data using Bulk Writer
+        try
+        {
+            bulkWriterDataFrameWriter(dfWrite, QUALIFIED_NAME, writerOptions).save();
+            fail("Bulk write should fail");
+        }
+        catch (Exception ex)
+        {
+            assertThat(ex)
+            .isExactlyInstanceOf(RuntimeException.class)
+            .hasMessageContaining("Bulk Write to Cassandra has failed")
+            .rootCause()
+            .isExactlyInstanceOf(LZ4Exception.class)
+            .hasMessageContaining("Malformed input");
+        }
+    }
+
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
+        createTestTable(QUALIFIED_NAME, CREATE_TEST_TABLE_STATEMENT);
+    }
+
+    @Override
+    protected ClusterBuilderConfiguration testClusterConfiguration()
+    {
+        return super.testClusterConfiguration()
+               .nodesPerDc(1);
+    }
+
+    public static class BBHelperInterceptSortedSSTableWriterValidateSSTables
+    {
+        public static void install()
+        {
+            TypePool typePool = TypePool.Default.ofSystemLoader();
+            new ByteBuddy()
+            .rebase(typePool.describe("org.apache.cassandra.spark.bulkwriter.SortedSSTableWriter").resolve(),
+                    ClassFileLocator.ForClassLoader.ofSystemLoader())
+            .method(named("validateSSTables").and(takesArguments(BulkWriterContext.class, Path.class, Set.class)))
+            .intercept(MethodDelegation.to(BBHelperInterceptSortedSSTableWriterValidateSSTables.class))
+            .make()
+            .load(BBHelperInterceptSortedSSTableWriterValidateSSTables.class.getClassLoader(), ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        @SuppressWarnings("unused")
+        public static void validateSSTables(BulkWriterContext context,
+                                            Path outputDirectory,
+                                            Set<Path> dataFilePaths,
+                                            @SuperCall Callable<?> orig) throws Exception
+        {
+            try (DirectoryStream<Path> stream = Files.newDirectoryStream(outputDirectory, "*Data.db"))
+            {
+                Path dataFile = stream.iterator().next();
+                try (RandomAccessFile file = new RandomAccessFile(dataFile.toFile(), "rw"))
+                {
+                    file.seek(file.length() / 2);
+                    file.writeChars("THIS IS CORRUPT DATA AND SHOULD NOT BE READABLE");
+                }
+                catch (Exception e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+            orig.call();
+        }
+    }
+}
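For readers who want the gist of the patch without walking the whole diff: the heart of the
fix is that SortedSSTableWriter.close() is now idempotent, guarded by the volatile isClosed
flag added above, so the eager-streaming path and the failure-handling path can no longer
close the writer twice. Below is a minimal sketch of that close-once pattern; the names
(CloseOnceWriter, doClose) are illustrative stand-ins, not classes from the patch.

import java.io.IOException;

// Sketch of a close-once guard in the style of the patch above.
public class CloseOnceWriter implements AutoCloseable
{
    // volatile so that a close() issued from another thread sees the flag;
    // the check-then-set below is not atomic, so this guards against
    // repeated close calls, not against two racing first calls.
    private volatile boolean isClosed = false;

    @Override
    public void close() throws IOException
    {
        if (isClosed)
        {
            return; // second and later calls are harmless no-ops
        }
        isClosed = true;
        doClose(); // release the underlying resources exactly once
    }

    private void doClose() throws IOException
    {
        // stand-in for the real work: flush buffers, close file handles, etc.
    }
}

If two threads could race on the first close(), an AtomicBoolean.compareAndSet(false, true)
guard would be the stricter choice; the simple volatile flag appears sufficient here because
the patch consolidates cleanup into a single code path.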