This is an automated email from the ASF dual-hosted git repository.
lukasz-antoniak pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new c877942c CASSANALYTICS-167: Regenerate bloom filters for
CQLSSTableWriter Patch by Lukasz Antoniak; reviewed by Jon Haddad, Yifan Cai
for CASSANALYTICS-167
c877942c is described below
commit c877942c96eaae3da37ea1af557dfd9719095357
Author: Lukasz Antoniak <[email protected]>
AuthorDate: Wed May 27 09:41:33 2026 +0200
CASSANALYTICS-167: Regenerate bloom filters for CQLSSTableWriter
Patch by Lukasz Antoniak; reviewed by Jon Haddad, Yifan Cai for
CASSANALYTICS-167
---
CHANGES.txt | 1 +
.../spark/bulkwriter/SortedSSTableWriter.java | 87 +++++++++-----
.../cassandra/spark/data/LocalDataLayer.java | 3 +-
.../NonValidatingTestSortedSSTableWriter.java | 8 ++
.../spark/bulkwriter/SortedSSTableWriterTest.java | 131 +++++++++++++++++++++
.../spark/bulkwriter/TableSchemaTestCommon.java | 3 +-
.../apache/cassandra/bridge/CassandraBridge.java | 15 +++
.../bridge/CassandraBridgeImplementation.java | 62 ++++++++++
.../bridge/CassandraBridgeImplementation.java | 61 ++++++++++
.../org/apache/cassandra/utils/FilterDbUtils.java | 57 +++++++++
10 files changed, 398 insertions(+), 30 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index ec3409ea..58d4cf96 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.5.0
-----
+ * Regenerate bloom filters for CQLSSTableWriter (CASSANALYTICS-167)
* Avoid Spark 4 partitioning warnings during bulk reads (CASSANALYTICS-171)
* Spark 4.0 Support (CASSANALYTICS-34)
* Add IAM credential support for S3 storage transport (CASSANALYTICS-155)
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
index 3d09be47..877aafa1 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
@@ -43,12 +43,14 @@ import org.apache.cassandra.bridge.CassandraVersionFeatures;
import org.apache.cassandra.bridge.SSTableDescriptor;
import org.apache.cassandra.spark.common.Digest;
import org.apache.cassandra.spark.common.SSTables;
+import org.apache.cassandra.spark.data.FileSystemSSTable;
import org.apache.cassandra.spark.data.FileType;
import org.apache.cassandra.spark.data.LocalDataLayer;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.reader.RowData;
import org.apache.cassandra.spark.reader.StreamScanner;
import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
+import org.apache.cassandra.spark.stats.BufferingInputStreamStats;
import org.apache.cassandra.spark.utils.DigestAlgorithm;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -227,6 +229,9 @@ public class SortedSSTableWriter
};
Set<Path> dataFilePaths = new HashSet<>();
Map<Path, Digest> fileDigests = new HashMap<>();
+ // FIXME: CQLSSTableWriter may produce an incomplete Filter.db file, so it is
rebuilt manually (see CASSANDRA-21423).
+ // Rebuild Filter.db files before calculating their digests.
+ rebuildFilterComponents(writerContext, sstableFilter);
try (DirectoryStream<Path> stream =
Files.newDirectoryStream(getOutDir(), sstableFilter))
{
for (Path path : stream)
@@ -281,15 +286,21 @@ public class SortedSSTableWriter
// Only process new SSTables produced during final flush
DirectoryStream.Filter<Path> unhashedFilter = path ->
!hashedFiles.contains(path);
- for (Path dataFile : getDataFileStream(unhashedFilter))
+ // FIXME: CQLSSTableWriter may produce an incomplete Filter.db file, so it is
rebuilt manually (see CASSANDRA-21423).
+ rebuildFilterComponents(writerContext, unhashedFilter);
+
+ try (DirectoryStream<Path> dataFileStream =
getDataFileStream(unhashedFilter))
{
- // NOTE: We calculate file hashes before re-reading so that we
know what we hashed
- // is what we validated. Then we send these along with the
files and the
- // receiving end re-hashes the files to make sure they still
match.
- Map<Path, Digest> newFileDigests =
calculateFileDigestMap(dataFile);
- overallFileDigests.putAll(newFileDigests);
- newlyHashedFiles.addAll(newFileDigests.keySet());
- sstableCount += 1;
+ for (Path dataFile : dataFileStream)
+ {
+ // NOTE: We calculate file hashes before re-reading so that we
know what we hashed
+ // is what we validated. Then we send these along with
the files and the
+ // receiving end re-hashes the files to make sure they
still match.
+ Map<Path, Digest> newFileDigests =
calculateFileDigestMap(dataFile);
+ overallFileDigests.putAll(newFileDigests);
+ newlyHashedFiles.addAll(newFileDigests.keySet());
+ sstableCount += 1;
+ }
}
// Only calculate size for newly hashed files, not all files in
overallFileDigests
// (previously hashed files may have been deleted by
prepareSStablesToSend)
@@ -297,6 +308,21 @@ public class SortedSSTableWriter
validateSSTables(writerContext);
}
+ protected void rebuildFilterComponents(@NotNull BulkWriterContext
writerContext,
+ @NotNull
DirectoryStream.Filter<Path> filter) throws IOException
+ {
+ LocalDataLayer layer = buildLocalDataLayer(writerContext, getOutDir(),
null);
+ try (DirectoryStream<Path> dataFileStream = getDataFileStream(filter))
+ {
+ for (Path dataFile : dataFileStream)
+ {
+ FileSystemSSTable ssTable = new FileSystemSSTable(dataFile,
false, BufferingInputStreamStats::doNothingStats);
+ writerContext.bridge().rebuildBloomFilter(layer.partitioner(),
layer.cqlTable(), ssTable, getOutDir());
+ LOGGER.debug("Rebuilt bloom filter for sstable {}", dataFile);
+ }
+ }
+ }
+
@VisibleForTesting
public void validateSSTables(@NotNull BulkWriterContext writerContext)
{
@@ -319,26 +345,7 @@ public class SortedSSTableWriter
// and then validate all of them in parallel threads
try
{
- CassandraVersion version =
CassandraBridgeFactory.getCassandraVersion(writerContext.cluster().getLowestCassandraVersion());
- String keyspace =
writerContext.job().qualifiedTableName().keyspace();
- String schema =
writerContext.schema().getTableSchema().createStatement;
- Partitioner partitioner = writerContext.cluster().getPartitioner();
- Set<String> udtStatements =
writerContext.schema().getUserDefinedTypeStatements();
- LocalDataLayer layer = new LocalDataLayer(version,
- partitioner,
- keyspace,
- schema,
- udtStatements,
- Collections.emptyList()
/* requestedFeatures */,
- false /*
useSSTableInputStream */,
- null /* statsClass */,
-
SSTableTimeRangeFilter.ALL,
-
outputDirectory.toString());
- if (dataFilePaths != null)
- {
- layer.setDataFilePaths(dataFilePaths);
- }
-
+ LocalDataLayer layer = buildLocalDataLayer(writerContext,
outputDirectory, dataFilePaths);
try (StreamScanner<RowData> scanner =
layer.openCompactionScanner(partitionId, Collections.emptyList(), null))
{
while (scanner.next())
@@ -354,6 +361,30 @@ public class SortedSSTableWriter
}
}
+ private LocalDataLayer buildLocalDataLayer(@NotNull BulkWriterContext
writerContext, @NotNull Path outputDirectory, @Nullable Set<Path> dataFilePaths)
+ {
+ CassandraVersion version =
CassandraBridgeFactory.getCassandraVersion(writerContext.cluster().getLowestCassandraVersion());
+ String keyspace = writerContext.job().qualifiedTableName().keyspace();
+ String schema =
writerContext.schema().getTableSchema().createStatement;
+ Partitioner partitioner = writerContext.cluster().getPartitioner();
+ Set<String> udtStatements =
writerContext.schema().getUserDefinedTypeStatements();
+ LocalDataLayer layer = new LocalDataLayer(version,
+ partitioner,
+ keyspace,
+ schema,
+ udtStatements,
+ Collections.emptyList() /*
requestedFeatures */,
+ false /*
useSSTableInputStream */,
+ null /* statsClass */,
+ SSTableTimeRangeFilter.ALL,
+ outputDirectory.toString());
+ if (dataFilePaths != null)
+ {
+ layer.setDataFilePaths(dataFilePaths);
+ }
+ return layer;
+ }
+
private DirectoryStream<Path>
getDataFileStream(DirectoryStream.Filter<Path> filter) throws IOException
{
// Combine the data file filter with the provided filter
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
index 64b57e1c..0a30cde0 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
@@ -69,7 +69,8 @@ import static
org.apache.cassandra.spark.utils.CqlUtils.isTimeRangeFilterSupport
import static
org.apache.cassandra.spark.utils.FilterUtils.parseSSTableTimeRangeFilter;
/**
- * Basic DataLayer implementation to read SSTables from local file system.
Mostly used for testing.
+ * Basic DataLayer implementation to read SSTables from local file system.
+ * Mostly used for testing, but also for validating SSTables during bulk data
insertion.
*/
@SuppressWarnings({"unused", "WeakerAccess"})
public class LocalDataLayer extends DataLayer implements Serializable
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSortedSSTableWriter.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSortedSSTableWriter.java
index 0498b7e8..39a4daf6 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSortedSSTableWriter.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSortedSSTableWriter.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.spark.bulkwriter;
+import java.nio.file.DirectoryStream;
import java.nio.file.Path;
import org.apache.cassandra.spark.utils.DigestAlgorithm;
@@ -36,4 +37,11 @@ public class NonValidatingTestSortedSSTableWriter extends
SortedSSTableWriter
{
// Skip validation for these tests
}
+
+ @Override
+ protected void rebuildFilterComponents(@NotNull BulkWriterContext
writerContext,
+ @NotNull
DirectoryStream.Filter<Path> filter)
+ {
+ // Skip rebuild for these tests
+ }
}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java
index 271c760d..2bd5b563 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java
@@ -19,17 +19,23 @@
package org.apache.cassandra.spark.bulkwriter;
+import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
+import java.nio.ByteBuffer;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -37,6 +43,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.jupiter.api.AfterAll;
@@ -45,12 +52,20 @@ import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
+import org.apache.cassandra.bridge.BloomFilter;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
import org.apache.cassandra.bridge.CassandraVersion;
import org.apache.cassandra.bridge.CassandraVersionFeatures;
import org.apache.cassandra.bridge.SSTableDescriptor;
import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
import org.apache.cassandra.spark.common.Digest;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.FileSystemSSTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.stats.BufferingInputStreamStats;
import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm;
import org.jetbrains.annotations.NotNull;
@@ -148,6 +163,122 @@ public class SortedSSTableWriterTest
tw.validateSSTables(writerContext, tw.getOutDir(), dataFilePaths);
}
+ @ParameterizedTest
+ @MethodSource("supportedVersions")
+ public void testBloomFilterRebuild(String version) throws IOException
+ {
+ int rowCount = 50_000;
+ CassandraBridge bridge = CassandraBridgeFactory.get(version);
+ MockBulkWriterContext writerContext = new
MockBulkWriterContext(tokenRangeMapping, version,
ConsistencyLevel.CL.LOCAL_QUORUM);
+ Partitioner partitioner = writerContext.getPartitioner();
+ CqlTable cqlTable =
bridge.buildSchema(writerContext.schema().getTableSchema().createStatement,
+
writerContext.qualifiedTableName().keyspace(),
+ new
ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy,
+
ImmutableMap.of("replication_factor", 1)),
+ partitioner,
+ Collections.emptySet());
+ SortedMap<BigInteger, List<String>> sortedKeys = new TreeMap<>();
+ for (int i = 0; i < rowCount; ++i)
+ {
+ List<String> keys = ImmutableList.of(String.valueOf(i), "1");
+ AbstractMap.SimpleEntry<ByteBuffer, BigInteger> partitionKey =
bridge.getPartitionKey(cqlTable, partitioner, keys);
+ sortedKeys.put(partitionKey.getValue(), keys);
+ }
+
+ SortedSSTableWriter tw = new SortedSSTableWriter(writerContext,
tmpDir, new XXHash32DigestAlgorithm(), 1);
+ List<SSTableDescriptor> allSSTables = new ArrayList<>();
+ tw.setSSTablesProducedListener(allSSTables::addAll);
+ for (BigInteger token : sortedKeys.keySet())
+ {
+ List<String> partitionKey = sortedKeys.get(token);
+ tw.addRow(token,
+ ImmutableMap.of("id",
Integer.parseInt(partitionKey.get(0)),
+ "date",
Integer.parseInt(partitionKey.get(1)),
+ "course", "foo", "marks", 1));
+ }
+ tw.close(writerContext);
+
+ assertThat(allSSTables).hasSize(1);
+
+ Set<Path> filterFilePaths = new HashSet<>();
+ try (DirectoryStream<Path> filterFileStream =
Files.newDirectoryStream(tw.getOutDir(), "*-Filter.db"))
+ {
+ filterFileStream.forEach(filterFilePaths::add);
+ }
+
+ assertThat(filterFilePaths).hasSize(1);
+
+ Path filterFile = filterFilePaths.iterator().next();
+ String dataFileName = filterFile.toFile().getName().replace("-Filter",
"-Data");
+ Path dataFilePath = filterFile.getParent().resolve(dataFileName);
+ FileSystemSSTable ssTable = new FileSystemSSTable(dataFilePath, false,
BufferingInputStreamStats::doNothingStats);
+
+ BloomFilter bloomFilter = bridge.openBloomFilter(partitioner,
+
writerContext.qualifiedTableName().keyspace(),
+
writerContext.qualifiedTableName().table(),
+ ssTable);
+
+ // second column is always set to 1 when inserting data
+ List<ByteBuffer> searchKeys = bridge.encodePartitionKeys(partitioner,
+
writerContext.qualifiedTableName().keyspace(),
+
writerContext.schema().getTableSchema().createStatement,
+
ImmutableList.of(ImmutableList.of("1", "1"), ImmutableList.of("7", "2")));
+
+ assertThat(bloomFilter.mightContain(searchKeys.get(0))).isTrue();
+ // Flaky assertion: bloom filters can return false positives, but since
we are using a limited data set,
+ // this is unlikely to happen.
+ assertThat(bloomFilter.doesNotContain(searchKeys.get(1))).isTrue();
+ }
+
+ @ParameterizedTest
+ @MethodSource("supportedVersions")
+ public void testBloomFilterRebuildErrorHandling(String version) throws
IOException
+ {
+ MockBulkWriterContext writerContext = new
MockBulkWriterContext(tokenRangeMapping, version,
ConsistencyLevel.CL.LOCAL_QUORUM);
+ SortedSSTableWriter tw = new SortedSSTableWriter(writerContext,
tmpDir, new XXHash32DigestAlgorithm(), 1)
+ {
+ protected void rebuildFilterComponents(@NotNull BulkWriterContext
writerContext,
+ @NotNull
DirectoryStream.Filter<Path> filter) throws IOException
+ {
+ // temporarily move index file to simulate error in bloom
filter rebuild process
+ try (DirectoryStream<Path> indexFileStream =
Files.newDirectoryStream(getOutDir(), "*.db"))
+ {
+ indexFileStream.forEach(indexFilePath -> {
+ for (String indexSuffix :
Arrays.asList("Partitions.db", "Index.db"))
+ {
+ if
(indexFilePath.toFile().getName().endsWith(indexSuffix))
+ {
+ File indexFile = indexFilePath.toFile();
+ boolean moved = indexFile.renameTo(new
File(indexFile.getAbsolutePath() + "_hidden"));
+ assertThat(moved).isTrue();
+ }
+ }
+ });
+ }
+ super.rebuildFilterComponents(writerContext, filter);
+ // move the index files back
+ try (DirectoryStream<Path> hiddenFileStream =
Files.newDirectoryStream(getOutDir(), "*_hidden"))
+ {
+ hiddenFileStream.forEach(hiddenFilePath -> {
+ File hiddenFile = hiddenFilePath.toFile();
+ boolean moved = hiddenFile.renameTo(new
File(hiddenFile.getParent(), hiddenFile.getName().replace("_hidden", "")));
+ assertThat(moved).isTrue();
+ });
+ }
+ }
+ };
+ List<SSTableDescriptor> allSSTables = new ArrayList<>();
+ tw.setSSTablesProducedListener(allSSTables::addAll);
+ tw.addRow(BigInteger.ONE, ImmutableMap.of("id", 1, "date", 1,
"course", "foo", "marks", 1));
+ tw.close(writerContext);
+ assertThat(allSSTables).hasSize(1);
+ // verify that bloom filter was not created
+ try (DirectoryStream<Path> filterFileStream =
Files.newDirectoryStream(tw.getOutDir(), "*Filter.db"))
+ {
+ assertThat(filterFileStream.iterator().hasNext()).isFalse();
+ }
+ }
+
/**
* Tests the race condition fix between prepareSStablesToSend (called from
background threads)
* and close (called from the main thread). This test exercises
CASSANALYTICS-107.
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
index 9025d882..701ca337 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.cassandra.bridge.CassandraBridge;
@@ -467,7 +468,7 @@ public final class TableSchemaTestCommon
String clusteringKey = primaryColumns.stream()
.map(this::maybeQuoteIdentifierIfRequested)
.collect(Collectors.joining(","));
- return "PRIMARY KEY (" + partitionKey + clusteringKey + ")";
+ return "PRIMARY KEY (" + partitionKey +
(StringUtils.isNotBlank(clusteringKey) ? ("," + clusteringKey) : "") + ")";
}
@Override
diff --git
a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
index 6bbf7f14..67c5bc7a 100644
---
a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
+++
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
@@ -556,6 +556,21 @@ public abstract class CassandraBridge
@NotNull String table,
@NotNull SSTable ssTable)
throws IOException;
+ /**
+ * Recreates the bloom filter for a given SSTable based on its index file.
+ * The old {@code Filter.db} file is overwritten if present.
+ *
+ * @param partitioner Cassandra partitioner
+ * @param cqltable CQL table
+ * @param ssTable SSTable instance
+ * @param directory Directory where SSTable files are present
+ * @throws IOException
+ */
+ public abstract void rebuildBloomFilter(@NotNull Partitioner partitioner,
+ @NotNull CqlTable cqltable,
+ @NotNull SSTable ssTable,
+ @NotNull Path directory) throws
IOException;
+
/**
* @param partitioner Cassandra partitioner
* @param keyspace keyspace name
diff --git
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
index 88d5c6df..5dd64855 100644
---
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
+++
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
@@ -71,12 +71,14 @@ import org.apache.cassandra.io.sstable.CQLSSTableWriter;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.SSTableTombstoneWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.bti.BtiReaderUtils;
import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileOutputStreamPlus;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
@@ -115,6 +117,8 @@ import org.apache.cassandra.util.CompressionUtil;
import org.apache.cassandra.util.IntWrapper;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.CompressionUtilImplementation;
+import org.apache.cassandra.utils.FilterDbUtils;
+import org.apache.cassandra.utils.IFilter;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.TokenUtils;
import org.jetbrains.annotations.NotNull;
@@ -348,6 +352,64 @@ public class CassandraBridgeImplementation extends
CassandraBridge
};
}
+ @Override
+ public void rebuildBloomFilter(@NotNull Partitioner partitioner,
+ @NotNull CqlTable cqltable,
+ @NotNull SSTable ssTable,
+ @NotNull Path directory) throws IOException
+ {
+ String keyspace = cqltable.keyspace();
+ String table = cqltable.table();
+ IPartitioner iPartitioner = getPartitioner(partitioner);
+ SchemaBuilder schemaBuilder = new SchemaBuilder(cqltable, partitioner);
+ TableMetadata tableMetadata = schemaBuilder.tableMetaData();
+
+ if (tableMetadata.params.bloomFilterFpChance == 1.0)
+ {
+ return; // bloom filter has been disabled for the table
+ }
+
+ Descriptor descriptor = ReaderUtils.constructDescriptor(keyspace,
table, ssTable);
+ File filterFile = new File(directory,
descriptor.fileFor(SSTableFormat.Components.FILTER).name());
+ try (IFilter filter = FilterDbUtils.buildBloomFilter(cqltable,
ssTable, tableMetadata))
+ {
+ Function<ByteBuffer, Boolean> tracker = bytes -> {
+ DecoratedKey key = iPartitioner.decorateKey(bytes);
+ filter.add(key);
+ return false;
+ };
+ if (ssTable.isBtiFormat())
+ {
+ BtiReaderUtils.readPrimaryIndex(ssTable, iPartitioner,
descriptor,
+
tableMetadata.params.crcCheckChance, tracker);
+ }
+ else
+ {
+ try (InputStream primaryIndex =
ssTable.openPrimaryIndexStream())
+ {
+ if (primaryIndex == null)
+ {
+ throw new IOException("Could not read Index.db file");
+ }
+ ReaderUtils.readPrimaryIndex(primaryIndex, tracker);
+ }
+ }
+
+ try (FileOutputStreamPlus stream =
filterFile.newOutputStream(File.WriteMode.OVERWRITE))
+ {
+ filter.serialize(stream, descriptor.version.hasOldBfFormat());
+ stream.flush();
+ stream.sync();
+ }
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Failed to rebuild bloom filter for sstable {}/{}",
directory, ssTable.getDataFileName(), e);
+ // Remove potentially corrupted bloom filter. It will be rebuilt
by Cassandra during sstable import.
+ Files.deleteIfExists(filterFile.toPath());
+ }
+ }
+
private BloomFilter openBloomFilter(Descriptor descriptor, SSTable
ssTable) throws IOException
{
return ReaderUtils.readFilter(ssTable, descriptor);
diff --git
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
index 943a3b76..5a8f9ed8 100644
---
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
+++
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.bridge;
import java.io.File;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
@@ -44,6 +45,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -68,6 +70,7 @@ import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.compress.LZ4Compressor;
import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.SSTableTombstoneWriter;
@@ -75,6 +78,8 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
@@ -111,7 +116,11 @@ import org.apache.cassandra.tools.Util;
import org.apache.cassandra.util.CompressionUtil;
import org.apache.cassandra.util.IntWrapper;
import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.BloomFilterSerializer;
import org.apache.cassandra.utils.CompressionUtilImplementation;
+import org.apache.cassandra.utils.FilterDbUtils;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.SyncUtil;
import org.apache.cassandra.utils.TokenUtils;
import org.apache.cassandra.utils.UUIDGen;
import org.jetbrains.annotations.NotNull;
@@ -324,6 +333,58 @@ public class CassandraBridgeImplementation extends
CassandraBridge
return keys.stream().map(key -> buildPartitionKey(table,
key)).collect(Collectors.toList());
}
+ @Override
+ public void rebuildBloomFilter(@NotNull Partitioner partitioner,
+ @NotNull CqlTable cqltable,
+ @NotNull SSTable ssTable,
+ @NotNull Path directory) throws IOException
+ {
+ String keyspace = cqltable.keyspace();
+ String table = cqltable.table();
+ IPartitioner iPartitioner = getPartitioner(partitioner);
+ SchemaBuilder schemaBuilder = new SchemaBuilder(cqltable, partitioner);
+ TableMetadata tableMetadata = schemaBuilder.tableMetaData();
+
+ if (tableMetadata.params.bloomFilterFpChance == 1.0)
+ {
+ return; // bloom filter has been disabled for the table
+ }
+
+ Descriptor descriptor = ReaderUtils.constructDescriptor(keyspace,
table, ssTable);
+ File filterFile = new File(directory.toFile(),
descriptor.relativeFilenameFor(Component.FILTER));
+ try (IFilter filter = FilterDbUtils.buildBloomFilter(cqltable,
ssTable, tableMetadata))
+ {
+ Function<ByteBuffer, Boolean> tracker = bytes -> {
+ DecoratedKey key = iPartitioner.decorateKey(bytes);
+ filter.add(key);
+ return false;
+ };
+
+ try (InputStream primaryIndex = ssTable.openPrimaryIndexStream())
+ {
+ if (primaryIndex == null)
+ {
+ throw new IOException("Could not read Index.db file");
+ }
+ ReaderUtils.readPrimaryIndex(primaryIndex, tracker);
+ }
+
+ try (FileOutputStream fos = new FileOutputStream(filterFile,
false);
+ DataOutputStreamPlus stream = new
BufferedDataOutputStreamPlus(fos))
+ {
+ BloomFilterSerializer.serialize((BloomFilter) filter, stream);
+ stream.flush();
+ SyncUtil.sync(fos);
+ }
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Failed to rebuild bloom filter for sstable {}/{}",
directory, ssTable.getDataFileName(), e);
+ // Remove potentially corrupted bloom filter. It will be rebuilt
by Cassandra during sstable import.
+ Files.deleteIfExists(filterFile.toPath());
+ }
+ }
+
@Override
public org.apache.cassandra.bridge.BloomFilter openBloomFilter(Partitioner
partitioner,
String
keyspace,
diff --git
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/utils/FilterDbUtils.java
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/utils/FilterDbUtils.java
new file mode 100644
index 00000000..d8fc46fa
--- /dev/null
+++
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/utils/FilterDbUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.SSTable;
+import org.apache.cassandra.spark.reader.ReaderUtils;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Helper methods for managing bloom filters.
+ */
+public class FilterDbUtils
+{
+ private FilterDbUtils()
+ {
+ throw new IllegalStateException(getClass() + " is static utility class
and shall not be instantiated");
+ }
+
+ public static IFilter buildBloomFilter(@NotNull CqlTable cqltable,
+ @NotNull SSTable ssTable,
+ @NotNull TableMetadata
tableMetadata) throws IOException
+ {
+ String keyspace = cqltable.keyspace();
+ String table = cqltable.table();
+
+ Map<MetadataType, MetadataComponent> componentMap =
ReaderUtils.deserializeStatsMetadata(keyspace, table, ssTable,
EnumSet.of(MetadataType.STATS));
+ StatsMetadata statsMetadata = (StatsMetadata)
componentMap.get(MetadataType.STATS);
+
+ return FilterFactory.getFilter(statsMetadata.totalRows,
tableMetadata.params.bloomFilterFpChance);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]