This is an automated email from the ASF dual-hosted git repository.

lukasz-antoniak pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c877942c CASSANALYTICS-167: Regenerate bloom filters for 
CQLSSTableWriter Patch by Lukasz Antoniak; reviewed by Jon Haddad, Yifan Cai 
for CASSANALYTICS-167
c877942c is described below

commit c877942c96eaae3da37ea1af557dfd9719095357
Author: Lukasz Antoniak <[email protected]>
AuthorDate: Wed May 27 09:41:33 2026 +0200

    CASSANALYTICS-167: Regenerate bloom filters for CQLSSTableWriter
    Patch by Lukasz Antoniak; reviewed by Jon Haddad, Yifan Cai for 
CASSANALYTICS-167
---
 CHANGES.txt                                        |   1 +
 .../spark/bulkwriter/SortedSSTableWriter.java      |  87 +++++++++-----
 .../cassandra/spark/data/LocalDataLayer.java       |   3 +-
 .../NonValidatingTestSortedSSTableWriter.java      |   8 ++
 .../spark/bulkwriter/SortedSSTableWriterTest.java  | 131 +++++++++++++++++++++
 .../spark/bulkwriter/TableSchemaTestCommon.java    |   3 +-
 .../apache/cassandra/bridge/CassandraBridge.java   |  15 +++
 .../bridge/CassandraBridgeImplementation.java      |  62 ++++++++++
 .../bridge/CassandraBridgeImplementation.java      |  61 ++++++++++
 .../org/apache/cassandra/utils/FilterDbUtils.java  |  57 +++++++++
 10 files changed, 398 insertions(+), 30 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index ec3409ea..58d4cf96 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.5.0
 -----
+ * Regenerate bloom filters for CQLSSTableWriter (CASSANALYTICS-167)
  * Avoid Spark 4 partitioning warnings during bulk reads (CASSANALYTICS-171)
  * Spark 4.0 Support (CASSANALYTICS-34)
  * Add IAM credential support for S3 storage transport (CASSANALYTICS-155)
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
index 3d09be47..877aafa1 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
@@ -43,12 +43,14 @@ import org.apache.cassandra.bridge.CassandraVersionFeatures;
 import org.apache.cassandra.bridge.SSTableDescriptor;
 import org.apache.cassandra.spark.common.Digest;
 import org.apache.cassandra.spark.common.SSTables;
+import org.apache.cassandra.spark.data.FileSystemSSTable;
 import org.apache.cassandra.spark.data.FileType;
 import org.apache.cassandra.spark.data.LocalDataLayer;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.cassandra.spark.reader.RowData;
 import org.apache.cassandra.spark.reader.StreamScanner;
 import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
+import org.apache.cassandra.spark.stats.BufferingInputStreamStats;
 import org.apache.cassandra.spark.utils.DigestAlgorithm;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -227,6 +229,9 @@ public class SortedSSTableWriter
         };
         Set<Path> dataFilePaths = new HashSet<>();
         Map<Path, Digest> fileDigests = new HashMap<>();
+        // FIXME: CQLSSTableWriter may produce incomplete Filter.db file, 
rebuilding it manually (see CASSANDRA-21423).
+        // rebuild Filter.db files before calculating their digest
+        rebuildFilterComponents(writerContext, sstableFilter);
         try (DirectoryStream<Path> stream = 
Files.newDirectoryStream(getOutDir(), sstableFilter))
         {
             for (Path path : stream)
@@ -281,15 +286,21 @@ public class SortedSSTableWriter
         // Only process new SSTables produced during final flush
         DirectoryStream.Filter<Path> unhashedFilter = path -> 
!hashedFiles.contains(path);
 
-        for (Path dataFile : getDataFileStream(unhashedFilter))
+        // FIXME: CQLSSTableWriter may produce incomplete Filter.db file, 
rebuilding it manually (see CASSANDRA-21423).
+        rebuildFilterComponents(writerContext, unhashedFilter);
+
+        try (DirectoryStream<Path> dataFileStream = 
getDataFileStream(unhashedFilter))
         {
-            // NOTE: We calculate file hashes before re-reading so that we 
know what we hashed
-            //       is what we validated. Then we send these along with the 
files and the
-            //       receiving end re-hashes the files to make sure they still 
match.
-            Map<Path, Digest> newFileDigests = 
calculateFileDigestMap(dataFile);
-            overallFileDigests.putAll(newFileDigests);
-            newlyHashedFiles.addAll(newFileDigests.keySet());
-            sstableCount += 1;
+            for (Path dataFile : dataFileStream)
+            {
+                // NOTE: We calculate file hashes before re-reading so that we 
know what we hashed
+                //       is what we validated. Then we send these along with 
the files and the
+                //       receiving end re-hashes the files to make sure they 
still match.
+                Map<Path, Digest> newFileDigests = 
calculateFileDigestMap(dataFile);
+                overallFileDigests.putAll(newFileDigests);
+                newlyHashedFiles.addAll(newFileDigests.keySet());
+                sstableCount += 1;
+            }
         }
         // Only calculate size for newly hashed files, not all files in 
overallFileDigests
         // (previously hashed files may have been deleted by 
prepareSStablesToSend)
@@ -297,6 +308,21 @@ public class SortedSSTableWriter
         validateSSTables(writerContext);
     }
 
+    protected void rebuildFilterComponents(@NotNull BulkWriterContext 
writerContext,
+                                           @NotNull 
DirectoryStream.Filter<Path> filter) throws IOException
+    {
+        LocalDataLayer layer = buildLocalDataLayer(writerContext, getOutDir(), 
null);
+        try (DirectoryStream<Path> dataFileStream = getDataFileStream(filter))
+        {
+            for (Path dataFile : dataFileStream)
+            {
+                FileSystemSSTable ssTable = new FileSystemSSTable(dataFile, 
false, BufferingInputStreamStats::doNothingStats);
+                writerContext.bridge().rebuildBloomFilter(layer.partitioner(), 
layer.cqlTable(), ssTable, getOutDir());
+                LOGGER.debug("Rebuilt bloom filter for sstable {}", dataFile);
+            }
+        }
+    }
+
     @VisibleForTesting
     public void validateSSTables(@NotNull BulkWriterContext writerContext)
     {
@@ -319,26 +345,7 @@ public class SortedSSTableWriter
         //       and then validate all of them in parallel threads
         try
         {
-            CassandraVersion version = 
CassandraBridgeFactory.getCassandraVersion(writerContext.cluster().getLowestCassandraVersion());
-            String keyspace = 
writerContext.job().qualifiedTableName().keyspace();
-            String schema = 
writerContext.schema().getTableSchema().createStatement;
-            Partitioner partitioner = writerContext.cluster().getPartitioner();
-            Set<String> udtStatements = 
writerContext.schema().getUserDefinedTypeStatements();
-            LocalDataLayer layer = new LocalDataLayer(version,
-                                                      partitioner,
-                                                      keyspace,
-                                                      schema,
-                                                      udtStatements,
-                                                      Collections.emptyList() 
/* requestedFeatures */,
-                                                      false /* 
useSSTableInputStream */,
-                                                      null /* statsClass */,
-                                                      
SSTableTimeRangeFilter.ALL,
-                                                      
outputDirectory.toString());
-            if (dataFilePaths != null)
-            {
-                layer.setDataFilePaths(dataFilePaths);
-            }
-
+            LocalDataLayer layer = buildLocalDataLayer(writerContext, 
outputDirectory, dataFilePaths);
             try (StreamScanner<RowData> scanner = 
layer.openCompactionScanner(partitionId, Collections.emptyList(), null))
             {
                 while (scanner.next())
@@ -354,6 +361,30 @@ public class SortedSSTableWriter
         }
     }
 
+    private LocalDataLayer buildLocalDataLayer(@NotNull BulkWriterContext 
writerContext, @NotNull Path outputDirectory, @Nullable Set<Path> dataFilePaths)
+    {
+        CassandraVersion version = 
CassandraBridgeFactory.getCassandraVersion(writerContext.cluster().getLowestCassandraVersion());
+        String keyspace = writerContext.job().qualifiedTableName().keyspace();
+        String schema = 
writerContext.schema().getTableSchema().createStatement;
+        Partitioner partitioner = writerContext.cluster().getPartitioner();
+        Set<String> udtStatements = 
writerContext.schema().getUserDefinedTypeStatements();
+        LocalDataLayer layer = new LocalDataLayer(version,
+                                                  partitioner,
+                                                  keyspace,
+                                                  schema,
+                                                  udtStatements,
+                                                  Collections.emptyList() /* 
requestedFeatures */,
+                                                  false /* 
useSSTableInputStream */,
+                                                  null /* statsClass */,
+                                                  SSTableTimeRangeFilter.ALL,
+                                                  outputDirectory.toString());
+        if (dataFilePaths != null)
+        {
+            layer.setDataFilePaths(dataFilePaths);
+        }
+        return layer;
+    }
+
     private DirectoryStream<Path> 
getDataFileStream(DirectoryStream.Filter<Path> filter) throws IOException
     {
         // Combine the data file filter with the provided filter
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
index 64b57e1c..0a30cde0 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
@@ -69,7 +69,8 @@ import static 
org.apache.cassandra.spark.utils.CqlUtils.isTimeRangeFilterSupport
 import static 
org.apache.cassandra.spark.utils.FilterUtils.parseSSTableTimeRangeFilter;
 
 /**
- * Basic DataLayer implementation to read SSTables from local file system. 
Mostly used for testing.
+ * Basic DataLayer implementation to read SSTables from local file system.
+ * Mostly used for testing, but also for validating SSTables during bulk data 
insertion.
  */
 @SuppressWarnings({"unused", "WeakerAccess"})
 public class LocalDataLayer extends DataLayer implements Serializable
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSortedSSTableWriter.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSortedSSTableWriter.java
index 0498b7e8..39a4daf6 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSortedSSTableWriter.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSortedSSTableWriter.java
@@ -19,6 +19,7 @@
 
 package org.apache.cassandra.spark.bulkwriter;
 
+import java.nio.file.DirectoryStream;
 import java.nio.file.Path;
 
 import org.apache.cassandra.spark.utils.DigestAlgorithm;
@@ -36,4 +37,11 @@ public class NonValidatingTestSortedSSTableWriter extends 
SortedSSTableWriter
     {
         // Skip validation for these tests
     }
+
+    @Override
+    protected void rebuildFilterComponents(@NotNull BulkWriterContext 
writerContext,
+                                           @NotNull 
DirectoryStream.Filter<Path> filter)
+    {
+        // Skip rebuild for these tests
+    }
 }
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java
index 271c760d..2bd5b563 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java
@@ -19,17 +19,23 @@
 
 package org.apache.cassandra.spark.bulkwriter;
 
+import java.io.File;
 import java.io.IOException;
 import java.math.BigInteger;
+import java.nio.ByteBuffer;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -37,6 +43,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.jupiter.api.AfterAll;
@@ -45,12 +52,20 @@ import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import org.apache.cassandra.bridge.BloomFilter;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
 import org.apache.cassandra.bridge.CassandraVersion;
 import org.apache.cassandra.bridge.CassandraVersionFeatures;
 import org.apache.cassandra.bridge.SSTableDescriptor;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.cassandra.spark.common.Digest;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.FileSystemSSTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.stats.BufferingInputStreamStats;
 import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm;
 import org.jetbrains.annotations.NotNull;
 
@@ -148,6 +163,122 @@ public class SortedSSTableWriterTest
         tw.validateSSTables(writerContext, tw.getOutDir(), dataFilePaths);
     }
 
+    @ParameterizedTest
+    @MethodSource("supportedVersions")
+    public void testBloomFilterRebuild(String version) throws IOException
+    {
+        int rowCount = 50_000;
+        CassandraBridge bridge = CassandraBridgeFactory.get(version);
+        MockBulkWriterContext writerContext = new 
MockBulkWriterContext(tokenRangeMapping, version, 
ConsistencyLevel.CL.LOCAL_QUORUM);
+        Partitioner partitioner = writerContext.getPartitioner();
+        CqlTable cqlTable = 
bridge.buildSchema(writerContext.schema().getTableSchema().createStatement,
+                                               
writerContext.qualifiedTableName().keyspace(),
+                                               new 
ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy,
+                                                                     
ImmutableMap.of("replication_factor", 1)),
+                                               partitioner,
+                                               Collections.emptySet());
+        SortedMap<BigInteger, List<String>> sortedKeys = new TreeMap<>();
+        for (int i = 0; i < rowCount; ++i)
+        {
+            List<String> keys = ImmutableList.of(String.valueOf(i), "1");
+            AbstractMap.SimpleEntry<ByteBuffer, BigInteger> partitionKey = 
bridge.getPartitionKey(cqlTable, partitioner, keys);
+            sortedKeys.put(partitionKey.getValue(), keys);
+        }
+
+        SortedSSTableWriter tw = new SortedSSTableWriter(writerContext, 
tmpDir, new XXHash32DigestAlgorithm(), 1);
+        List<SSTableDescriptor> allSSTables = new ArrayList<>();
+        tw.setSSTablesProducedListener(allSSTables::addAll);
+        for (BigInteger token : sortedKeys.keySet())
+        {
+            List<String> partitionKey = sortedKeys.get(token);
+            tw.addRow(token,
+                      ImmutableMap.of("id", 
Integer.parseInt(partitionKey.get(0)),
+                                      "date", 
Integer.parseInt(partitionKey.get(1)),
+                                      "course", "foo", "marks", 1));
+        }
+        tw.close(writerContext);
+
+        assertThat(allSSTables).hasSize(1);
+
+        Set<Path> filterFilePaths = new HashSet<>();
+        try (DirectoryStream<Path> filterFileStream = 
Files.newDirectoryStream(tw.getOutDir(), "*-Filter.db"))
+        {
+            filterFileStream.forEach(filterFilePaths::add);
+        }
+
+        assertThat(filterFilePaths).hasSize(1);
+
+        Path filterFile = filterFilePaths.iterator().next();
+        String dataFileName = filterFile.toFile().getName().replace("-Filter", 
"-Data");
+        Path dataFilePath = filterFile.getParent().resolve(dataFileName);
+        FileSystemSSTable ssTable = new FileSystemSSTable(dataFilePath, false, 
BufferingInputStreamStats::doNothingStats);
+
+        BloomFilter bloomFilter = bridge.openBloomFilter(partitioner,
+                                                         
writerContext.qualifiedTableName().keyspace(),
+                                                         
writerContext.qualifiedTableName().table(),
+                                                         ssTable);
+
+        // second column is always set to 1 when inserting data
+        List<ByteBuffer> searchKeys = bridge.encodePartitionKeys(partitioner,
+                                                                 
writerContext.qualifiedTableName().keyspace(),
+                                                                 
writerContext.schema().getTableSchema().createStatement,
+                                                                 
ImmutableList.of(ImmutableList.of("1", "1"), ImmutableList.of("7", "2")));
+
+        assertThat(bloomFilter.mightContain(searchKeys.get(0))).isTrue();
+        // Flaky assertion: bloom filters can answer false positive, but since 
we are using limited data set,
+        // it is unlikely to happen.
+        assertThat(bloomFilter.doesNotContain(searchKeys.get(1))).isTrue();
+    }
+
+    @ParameterizedTest
+    @MethodSource("supportedVersions")
+    public void testBloomFilterRebuildErrorHandling(String version) throws 
IOException
+    {
+        MockBulkWriterContext writerContext = new 
MockBulkWriterContext(tokenRangeMapping, version, 
ConsistencyLevel.CL.LOCAL_QUORUM);
+        SortedSSTableWriter tw = new SortedSSTableWriter(writerContext, 
tmpDir, new XXHash32DigestAlgorithm(), 1)
+        {
+            protected void rebuildFilterComponents(@NotNull BulkWriterContext 
writerContext,
+                                                   @NotNull 
DirectoryStream.Filter<Path> filter) throws IOException
+            {
+                // temporarily move index file to simulate error in bloom 
filter rebuild process
+                try (DirectoryStream<Path> indexFileStream = 
Files.newDirectoryStream(getOutDir(), "*.db"))
+                {
+                    indexFileStream.forEach(indexFilePath -> {
+                        for (String indexSuffix : 
Arrays.asList("Partitions.db", "Index.db"))
+                        {
+                            if 
(indexFilePath.toFile().getName().endsWith(indexSuffix))
+                            {
+                                File indexFile = indexFilePath.toFile();
+                                boolean moved = indexFile.renameTo(new 
File(indexFile.getAbsolutePath() + "_hidden"));
+                                assertThat(moved).isTrue();
+                            }
+                        }
+                    });
+                }
+                super.rebuildFilterComponents(writerContext, filter);
+                // move the index files back
+                try (DirectoryStream<Path> hiddenFileStream = 
Files.newDirectoryStream(getOutDir(), "*_hidden"))
+                {
+                    hiddenFileStream.forEach(hiddenFilePath -> {
+                        File hiddenFile = hiddenFilePath.toFile();
+                        boolean moved = hiddenFile.renameTo(new 
File(hiddenFile.getParent(), hiddenFile.getName().replace("_hidden", "")));
+                        assertThat(moved).isTrue();
+                    });
+                }
+            }
+        };
+        List<SSTableDescriptor> allSSTables = new ArrayList<>();
+        tw.setSSTablesProducedListener(allSSTables::addAll);
+        tw.addRow(BigInteger.ONE, ImmutableMap.of("id", 1, "date", 1, 
"course", "foo", "marks", 1));
+        tw.close(writerContext);
+        assertThat(allSSTables).hasSize(1);
+        // verify that bloom filter was not created
+        try (DirectoryStream<Path> filterFileStream = 
Files.newDirectoryStream(tw.getOutDir(), "*Filter.db"))
+        {
+            assertThat(filterFileStream.iterator().hasNext()).isFalse();
+        }
+    }
+
     /**
      * Tests the race condition fix between prepareSStablesToSend (called from 
background threads)
      * and close (called from the main thread). This test exercises 
CASSANALYTICS-107.
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
index 9025d882..701ca337 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.cassandra.bridge.CassandraBridge;
@@ -467,7 +468,7 @@ public final class TableSchemaTestCommon
             String clusteringKey = primaryColumns.stream()
                                                  
.map(this::maybeQuoteIdentifierIfRequested)
                                                  
.collect(Collectors.joining(","));
-            return "PRIMARY KEY (" + partitionKey + clusteringKey + ")";
+            return "PRIMARY KEY (" + partitionKey + 
(StringUtils.isNotBlank(clusteringKey) ? ("," + clusteringKey) : "") + ")";
         }
 
         @Override
diff --git 
a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
 
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
index 6bbf7f14..67c5bc7a 100644
--- 
a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
+++ 
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
@@ -556,6 +556,21 @@ public abstract class CassandraBridge
                                                 @NotNull String table,
                                                 @NotNull SSTable ssTable) 
throws IOException;
 
+    /**
+     * Recreates bloom filter for a given SSTable based on index file.
+     * Old {@code Filter.db} file is overwritten if present.
+     *
+     * @param partitioner Cassandra partitioner
+     * @param cqltable    CQL table
+     * @param ssTable     SSTable instance
+     * @param directory   Directory where SSTable files are present
+     * @throws IOException
+     */
+    public abstract void rebuildBloomFilter(@NotNull Partitioner partitioner,
+                                            @NotNull CqlTable cqltable,
+                                            @NotNull SSTable ssTable,
+                                            @NotNull Path directory) throws 
IOException;
+
     /**
      * @param partitioner   Cassandra partitioner
      * @param keyspace      keyspace name
diff --git 
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
 
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
index 88d5c6df..5dd64855 100644
--- 
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
+++ 
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
@@ -71,12 +71,14 @@ import org.apache.cassandra.io.sstable.CQLSSTableWriter;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.SSTableTombstoneWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.bti.BtiReaderUtils;
 import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileOutputStreamPlus;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
@@ -115,6 +117,8 @@ import org.apache.cassandra.util.CompressionUtil;
 import org.apache.cassandra.util.IntWrapper;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.CompressionUtilImplementation;
+import org.apache.cassandra.utils.FilterDbUtils;
+import org.apache.cassandra.utils.IFilter;
 import org.apache.cassandra.utils.TimeUUID;
 import org.apache.cassandra.utils.TokenUtils;
 import org.jetbrains.annotations.NotNull;
@@ -348,6 +352,64 @@ public class CassandraBridgeImplementation extends 
CassandraBridge
         };
     }
 
+    @Override
+    public void rebuildBloomFilter(@NotNull Partitioner partitioner,
+                                   @NotNull CqlTable cqltable,
+                                   @NotNull SSTable ssTable,
+                                   @NotNull Path directory) throws IOException
+    {
+        String keyspace = cqltable.keyspace();
+        String table = cqltable.table();
+        IPartitioner iPartitioner = getPartitioner(partitioner);
+        SchemaBuilder schemaBuilder = new SchemaBuilder(cqltable, partitioner);
+        TableMetadata tableMetadata = schemaBuilder.tableMetaData();
+
+        if (tableMetadata.params.bloomFilterFpChance == 1.0)
+        {
+            return; // bloom filter has been disabled for the table
+        }
+
+        Descriptor descriptor = ReaderUtils.constructDescriptor(keyspace, 
table, ssTable);
+        File filterFile = new File(directory, 
descriptor.fileFor(SSTableFormat.Components.FILTER).name());
+        try (IFilter filter = FilterDbUtils.buildBloomFilter(cqltable, 
ssTable, tableMetadata))
+        {
+            Function<ByteBuffer, Boolean> tracker = bytes -> {
+                DecoratedKey key = iPartitioner.decorateKey(bytes);
+                filter.add(key);
+                return false;
+            };
+            if (ssTable.isBtiFormat())
+            {
+                BtiReaderUtils.readPrimaryIndex(ssTable, iPartitioner, 
descriptor,
+                                                
tableMetadata.params.crcCheckChance, tracker);
+            }
+            else
+            {
+                try (InputStream primaryIndex = 
ssTable.openPrimaryIndexStream())
+                {
+                    if (primaryIndex == null)
+                    {
+                        throw new IOException("Could not read Index.db file");
+                    }
+                    ReaderUtils.readPrimaryIndex(primaryIndex, tracker);
+                }
+            }
+
+            try (FileOutputStreamPlus stream = 
filterFile.newOutputStream(File.WriteMode.OVERWRITE))
+            {
+                filter.serialize(stream, descriptor.version.hasOldBfFormat());
+                stream.flush();
+                stream.sync();
+            }
+        }
+        catch (Exception e)
+        {
+            LOGGER.error("Failed to rebuild bloom filter for sstable {}/{}", 
directory, ssTable.getDataFileName(), e);
+            // Remove potentially corrupted bloom filter. It will be rebuilt 
by Cassandra during sstable import.
+            Files.deleteIfExists(filterFile.toPath());
+        }
+    }
+
     private BloomFilter openBloomFilter(Descriptor descriptor, SSTable 
ssTable) throws IOException
     {
         return ReaderUtils.readFilter(ssTable, descriptor);
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
index 943a3b76..5a8f9ed8 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
+++ 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.bridge;
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
@@ -44,6 +45,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -68,6 +70,7 @@ import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.compress.LZ4Compressor;
 import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.SSTableTombstoneWriter;
@@ -75,6 +78,8 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
@@ -111,7 +116,11 @@ import org.apache.cassandra.tools.Util;
 import org.apache.cassandra.util.CompressionUtil;
 import org.apache.cassandra.util.IntWrapper;
 import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.BloomFilterSerializer;
 import org.apache.cassandra.utils.CompressionUtilImplementation;
+import org.apache.cassandra.utils.FilterDbUtils;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.SyncUtil;
 import org.apache.cassandra.utils.TokenUtils;
 import org.apache.cassandra.utils.UUIDGen;
 import org.jetbrains.annotations.NotNull;
@@ -324,6 +333,58 @@ public class CassandraBridgeImplementation extends 
CassandraBridge
         return keys.stream().map(key -> buildPartitionKey(table, 
key)).collect(Collectors.toList());
     }
 
+    @Override
+    public void rebuildBloomFilter(@NotNull Partitioner partitioner,
+                                   @NotNull CqlTable cqltable,
+                                   @NotNull SSTable ssTable,
+                                   @NotNull Path directory) throws IOException
+    {
+        String keyspace = cqltable.keyspace();
+        String table = cqltable.table();
+        IPartitioner iPartitioner = getPartitioner(partitioner);
+        SchemaBuilder schemaBuilder = new SchemaBuilder(cqltable, partitioner);
+        TableMetadata tableMetadata = schemaBuilder.tableMetaData();
+
+        if (tableMetadata.params.bloomFilterFpChance == 1.0)
+        {
+            return; // bloom filter has been disabled for the table
+        }
+
+        Descriptor descriptor = ReaderUtils.constructDescriptor(keyspace, 
table, ssTable);
+        File filterFile = new File(directory.toFile(), 
descriptor.relativeFilenameFor(Component.FILTER));
+        try (IFilter filter = FilterDbUtils.buildBloomFilter(cqltable, 
ssTable, tableMetadata))
+        {
+            Function<ByteBuffer, Boolean> tracker = bytes -> {
+                DecoratedKey key = iPartitioner.decorateKey(bytes);
+                filter.add(key);
+                return false;
+            };
+
+            try (InputStream primaryIndex = ssTable.openPrimaryIndexStream())
+            {
+                if (primaryIndex == null)
+                {
+                    throw new IOException("Could not read Index.db file");
+                }
+                ReaderUtils.readPrimaryIndex(primaryIndex, tracker);
+            }
+
+            try (FileOutputStream fos = new FileOutputStream(filterFile, 
false);
+                 DataOutputStreamPlus stream = new 
BufferedDataOutputStreamPlus(fos))
+            {
+                BloomFilterSerializer.serialize((BloomFilter) filter, stream);
+                stream.flush();
+                SyncUtil.sync(fos);
+            }
+        }
+        catch (Exception e)
+        {
+            LOGGER.error("Failed to rebuild bloom filter for sstable {}/{}", 
directory, ssTable.getDataFileName(), e);
+            // Remove potentially corrupted bloom filter. It will be rebuilt 
by Cassandra during sstable import.
+            Files.deleteIfExists(filterFile.toPath());
+        }
+    }
+
     @Override
     public org.apache.cassandra.bridge.BloomFilter openBloomFilter(Partitioner 
partitioner,
                                                                    String 
keyspace,
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/utils/FilterDbUtils.java
 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/utils/FilterDbUtils.java
new file mode 100644
index 00000000..d8fc46fa
--- /dev/null
+++ 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/utils/FilterDbUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.SSTable;
+import org.apache.cassandra.spark.reader.ReaderUtils;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Helper methods for managing bloom filters.
+ */
+public class FilterDbUtils
+{
+    private FilterDbUtils()
+    {
+        throw new IllegalStateException(getClass() + " is static utility class 
and shall not be instantiated");
+    }
+
+    public static IFilter buildBloomFilter(@NotNull CqlTable cqltable,
+                                           @NotNull SSTable ssTable,
+                                           @NotNull TableMetadata 
tableMetadata) throws IOException
+    {
+        String keyspace = cqltable.keyspace();
+        String table = cqltable.table();
+
+        Map<MetadataType, MetadataComponent> componentMap = 
ReaderUtils.deserializeStatsMetadata(keyspace, table, ssTable, 
EnumSet.of(MetadataType.STATS));
+        StatsMetadata statsMetadata = (StatsMetadata) 
componentMap.get(MetadataType.STATS);
+
+        return FilterFactory.getFilter(statsMetadata.totalRows, 
tableMetadata.params.bloomFilterFpChance);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to