This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 17a38d3  CASSANALYTICS-4: Add CassandraBridge helper APIs that can be used by external tooling (#98)
17a38d3 is described below

commit 17a38d33321003f1bbfd3ba73617014424df1416
Author: jberragan <[email protected]>
AuthorDate: Mon Feb 10 15:35:16 2025 -0800

    CASSANALYTICS-4: Add CassandraBridge helper APIs that can be used by external tooling (#98)
    
    Patch by James Berragan; Reviewed by Francisco Guerrero, Yifan Cai for CASSANALYTICS-4
---
 CHANGES.txt                                        |   1 +
 .../cassandra/spark/data/ReplicationFactor.java    |   8 +
 .../apache/cassandra/spark/data/TypeConverter.java |   2 +
 .../spark/sparksql/PartialRowBuilder.java          |   2 +
 .../cassandra/spark/sparksql/RowIterator.java      |  49 +++
 .../spark/sparksql/filters/PartitionKeyFilter.java |  13 +-
 .../spark/sparksql/filters/PruneColumnFilter.java  |   9 +
 .../cassandra/spark/utils/ByteBufferUtils.java     |  10 +
 .../apache/cassandra/spark/utils/RandomUtils.java  |  21 ++
 .../cassandra/spark/utils/test/TestSSTable.java    |  11 +-
 .../spark/sparksql/AbstractSparkRowIterator.java   |   3 +-
 .../java/org/apache/cassandra/spark/TestUtils.java |  12 +
 .../java/org/apache/cassandra/spark/Tester.java    |   2 +-
 .../spark/reader/CassandraBridgeUtilTests.java     | 331 +++++++++++++++++++++
 .../org/apache/cassandra/bridge/BloomFilter.java   |  35 +--
 .../apache/cassandra/bridge/CassandraBridge.java   | 207 +++++++++++++
 .../org/apache/cassandra/bridge/Tokenizer.java     |  33 +-
 .../bridge/CassandraBridgeImplementation.java      | 258 +++++++++++++++-
 .../apache/cassandra/spark/reader/IndexReader.java |   2 +-
 .../apache/cassandra/spark/reader/ReaderUtils.java | 169 ++++++++---
 .../cassandra/spark/reader/SSTableCache.java       |   3 +-
 .../cassandra/spark/reader/SSTableReader.java      |  46 +--
 .../cassandra/spark/reader/SummaryDbUtils.java     |  10 +-
 .../cassandra/spark/reader/ReaderUtilsTests.java   |  10 +-
 .../cassandra/spark/reader/SSTableCacheTests.java  |   2 +-
 .../cassandra/spark/reader/SSTableReaderTests.java |  24 +-
 26 files changed, 1103 insertions(+), 170 deletions(-)
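
For orientation, a minimal sketch (not part of this commit) of how external tooling might call one of the new helper APIs; the bridge instance and SSTable are assumed to be obtained elsewhere, and the 128/2048 index intervals below are an assumption mirroring Cassandra's common defaults:

    import java.io.IOException;
    import java.util.List;

    import org.apache.cassandra.bridge.CassandraBridge;
    import org.apache.cassandra.bridge.TokenRange;
    import org.apache.cassandra.spark.data.SSTable;
    import org.apache.cassandra.spark.data.partitioner.Partitioner;

    final class OverlapSketch
    {
        // Returns one boolean per input range: true when that range intersects
        // the SSTable's [firstToken, lastToken] token span.
        static List<Boolean> overlapping(CassandraBridge bridge, SSTable ssTable,
                                         Partitioner partitioner, List<TokenRange> ranges) throws IOException
        {
            return bridge.overlaps(ssTable, partitioner, 128, 2048, ranges);
        }
    }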

diff --git a/CHANGES.txt b/CHANGES.txt
index b941a3c..2c98bc2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Add CassandraBridge helper APIs that can be used by external tooling (CASSANALYTICS-4)
  * Refactor to decouple RowIterator and CellIterator from Spark so bulk reads can be performed outside of Spark (CASSANDRA-20259)
  * CEP-44 Kafka integration for Cassandra CDC using Sidecar (CASSANDRA-19962)
  * Expose detailed bulk write failure message for better insight (CASSANDRA-20066)
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/ReplicationFactor.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/ReplicationFactor.java
index ce85f57..0b9b682 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/ReplicationFactor.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/ReplicationFactor.java
@@ -25,6 +25,8 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import java.util.Objects;
+
+import com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,6 +98,12 @@ public class ReplicationFactor implements Serializable
         }
     }
 
+    public static ReplicationFactor simpleStrategy(int rf)
+    {
+        return new ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy,
+                                     ImmutableMap.of("replication_factor", rf));
+    }
+
     @NotNull
     private final ReplicationStrategy replicationStrategy;
     @NotNull
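
A minimal sketch of the new factory above; this is equivalent to the CQL replication map {'class': 'SimpleStrategy', 'replication_factor': 3}:

    import org.apache.cassandra.spark.data.ReplicationFactor;

    final class SimpleStrategySketch
    {
        // Builds a SimpleStrategy ReplicationFactor with RF=3 in one call,
        // instead of constructing the strategy enum and options map by hand.
        static ReplicationFactor rf3()
        {
            return ReplicationFactor.simpleStrategy(3);
        }
    }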
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/TypeConverter.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/TypeConverter.java
index b190479..4e885af 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/TypeConverter.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/TypeConverter.java
@@ -23,6 +23,8 @@ import org.jetbrains.annotations.NotNull;
 
 public interface TypeConverter
 {
+    TypeConverter IDENTITY = (cqlType, value, isFrozen) -> value;
+
     /**
      * Converts deserialized Cassandra Java value to desired equivalent type.
      * E.g. SparkSQL uses `org.apache.spark.unsafe.types.UTF8String` to wrap strings.
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/PartialRowBuilder.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/PartialRowBuilder.java
index 6df672c..f492f26 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/PartialRowBuilder.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/PartialRowBuilder.java
@@ -22,6 +22,7 @@ package org.apache.cassandra.spark.sparksql;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -51,6 +52,7 @@ public class PartialRowBuilder<T> extends FullRowBuilder<T>
                              Function<Object[], T> rowBuilder)
     {
         super(table, hasProjectedValueColumns, rowBuilder);
+        Objects.requireNonNull(requiredSchema, "requiredColumns must be non-null to use PartialRowBuilder");
         this.requiredSchema = requiredSchema;
         this.columnIndex = new HashMap<>(requiredSchema.length);
         for (int i = 0; i < requiredSchema.length; i++)
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowIterator.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowIterator.java
index 6d3382c..7b5369d 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowIterator.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowIterator.java
@@ -20,9 +20,13 @@
 package org.apache.cassandra.spark.sparksql;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.function.Function;
 
 import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.data.CqlTable;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -58,6 +62,51 @@ public abstract class RowIterator<T>
         this.builder = newBuilder(decorator);
     }
 
+    /**
+     * @param it              CellIterator instance used to iterate over each cell in the data
+     * @param stats           stats instance to emit KPIs
+     * @param requiredColumns optional column filter to select a subset of columns; if null, all columns are returned
+     * @return a basic RowIterator that transforms Object[] to a Map keyed on column name
+     */
+    public static RowIterator<Map<String, Object>> rowMapIterator(CellIterator it, Stats stats, @Nullable String[] requiredColumns)
+    {
+        CqlTable cqlTable = it.cqlTable();
+        boolean hasProjectedValueColumns = it.hasProjectedValueColumns();
+        return new RowIterator<Map<String, Object>>(it, stats, requiredColumns, Function.identity())
+        {
+            @SuppressWarnings("DataFlowIssue") // requiredColumns null checked in PartialRowBuilder constructor
+            @Override
+            public PartialRowBuilder<Map<String, Object>> newPartialBuilder()
+            {
+                return new PartialRowBuilder<>(
+                requiredColumns, it.cqlTable(), hasProjectedValueColumns,
+                (valueArray) -> {
+                    Map<String, Object> row = new HashMap<>(requiredColumns.length);
+                    for (int i = 0; i < requiredColumns.length; i++)
+                    {
+                        row.put(requiredColumns[i], valueArray[i]);
+                    }
+                    return row;
+                });
+            }
+
+            @Override
+            public FullRowBuilder<Map<String, Object>> newFullRowBuilder()
+            {
+                return new FullRowBuilder<>(
+                cqlTable, hasProjectedValueColumns,
+                (valueArray) -> {
+                    Map<String, Object> row = new HashMap<>(cqlTable.numFields());
+                    for (CqlField field : cqlTable.fields())
+                    {
+                        row.put(field.name(), valueArray[field.position()]);
+                    }
+                    return row;
+                });
+            }
+        };
+    }
+
     @NotNull
     protected RowBuilder<T> newBuilder(Function<RowBuilder<T>, RowBuilder<T>> decorator)
     {
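
A minimal sketch of using the new rowMapIterator factory above; the CellIterator and Stats instances are assumed to be obtained elsewhere, and passing null for requiredColumns selects all columns:

    import java.util.Map;

    import org.apache.cassandra.analytics.stats.Stats;
    import org.apache.cassandra.spark.sparksql.CellIterator;
    import org.apache.cassandra.spark.sparksql.RowIterator;

    final class RowMapSketch
    {
        // Builds a RowIterator that yields each row as a Map keyed by column
        // name; the full-row builder is used because no column filter is given.
        static RowIterator<Map<String, Object>> allColumns(CellIterator cells, Stats stats)
        {
            return RowIterator.rowMapIterator(cells, stats, null);
        }
    }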
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PartitionKeyFilter.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PartitionKeyFilter.java
index 6d04850..59a4a76 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PartitionKeyFilter.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PartitionKeyFilter.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.bridge.TokenRange;
 import org.apache.cassandra.spark.utils.ByteBufferUtils;
 import org.jetbrains.annotations.NotNull;
 
-public final class PartitionKeyFilter implements Serializable
+public final class PartitionKeyFilter implements Serializable, Comparable<PartitionKeyFilter>
 {
     @NotNull
     private BigInteger token;
@@ -141,4 +141,15 @@ public final class PartitionKeyFilter implements Serializable
         return filterKey.equals(that.filterKey)
                && token.equals(that.token);
     }
+
+    @Override
+    public int compareTo(@NotNull PartitionKeyFilter other)
+    {
+        int compare = token.compareTo(other.token);
+        if (compare == 0)
+        {
+            return key().compareTo(other.key());
+        }
+        return compare;
+    }
 }
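
With Comparable now implemented, filters can be ordered deterministically; a minimal sketch:

    import java.util.Collections;
    import java.util.List;

    import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;

    final class FilterSortSketch
    {
        // Sorts primarily by token; ties are broken by the serialized
        // partition key, per the compareTo added above.
        static void sortByToken(List<PartitionKeyFilter> filters)
        {
            Collections.sort(filters);
        }
    }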
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PruneColumnFilter.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PruneColumnFilter.java
index c032659..e46e1a3 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PruneColumnFilter.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PruneColumnFilter.java
@@ -19,9 +19,12 @@
 
 package org.apache.cassandra.spark.sparksql.filters;
 
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Set;
 
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Prune column push-down filter to skip reading columns that are not needed
@@ -35,6 +38,12 @@ public class PruneColumnFilter
         this.requiredColumns = requiredColumns;
     }
 
+    @Nullable
+    public static PruneColumnFilter of(@Nullable String[] requiredColumns)
+    {
+        return requiredColumns == null ? null : new PruneColumnFilter(new HashSet<>(Arrays.asList(requiredColumns)));
+    }
+
     public Set<String> requiredColumns()
     {
         return requiredColumns;
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ByteBufferUtils.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ByteBufferUtils.java
index adb0eab..fd4ece2 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ByteBufferUtils.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ByteBufferUtils.java
@@ -91,6 +91,16 @@ public final class ByteBufferUtils
         return bytes;
     }
 
+    public static String readShortLengthCompositeTypeString(ByteBuffer buf)
+    {
+        int len = buf.getShort();
+        byte[] ar = new byte[len];
+        buf.get(ar);
+        String str = new String(ar, StandardCharsets.UTF_8);
+        buf.get(); // CompositeType pads with a null byte at the end
+        return str;
+    }
+
     /**
      * Decode ByteBuffer into String using provided CharsetDecoder.
      *
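
A minimal sketch of decoding a multi-column partition key with the new helper above, mirroring the new test's three-component case; the buffer is assumed to be a CompositeType-encoded key:

    import java.nio.ByteBuffer;

    import org.apache.cassandra.spark.utils.ByteBufferUtils;

    final class CompositeKeySketch
    {
        // Each component is short-length-prefixed and followed by a null
        // end-of-component byte, which the helper consumes before returning.
        static String[] threeComponents(ByteBuffer compositeKey)
        {
            String first = ByteBufferUtils.readShortLengthCompositeTypeString(compositeKey);
            String second = ByteBufferUtils.readShortLengthCompositeTypeString(compositeKey);
            String third = ByteBufferUtils.readShortLengthCompositeTypeString(compositeKey);
            return new String[] {first, second, third};
        }
    }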
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/RandomUtils.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/RandomUtils.java
index c59e11e..5827b44 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/RandomUtils.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/RandomUtils.java
@@ -23,6 +23,7 @@ import java.math.BigInteger;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 import java.util.stream.IntStream;
 
@@ -119,6 +120,26 @@ public final class RandomUtils
         return randomAlphanumeric(RandomUtils.nextInt(minLengthInclusive, maxLengthExclusive));
     }
 
+    public static String randomAlphanumeric(Set<String> alreadyExist)
+    {
+        return randomAlphanumeric(alreadyExist, 32);
+    }
+
+    public static String randomAlphanumeric(Set<String> alreadyExist, int length)
+    {
+        String str = randomAlphanumeric(length);
+        while (alreadyExist.contains(str))
+        {
+            str = randomAlphanumeric(length);
+        }
+        return str;
+    }
+
+    public static String randomAlphanumeric()
+    {
+        return randomAlphanumeric(32);
+    }
+
     public static String randomAlphanumeric(int length)
     {
         StringBuilder sb = new StringBuilder(length);
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/test/TestSSTable.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/test/TestSSTable.java
index d7e42c4..6e9d6a4 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/test/TestSSTable.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/test/TestSSTable.java
@@ -98,7 +98,7 @@ public final class TestSSTable extends SSTable
     @Override
     protected InputStream openInputStream(FileType fileType)
     {
-        Path filePath = FileType.resolveComponentFile(fileType, dataFile);
+        Path filePath = fileComponentPath(fileType);
         try
         {
             return filePath != null ? new BufferedInputStream(new FileInputStream(filePath.toFile())) : null;
@@ -109,14 +109,19 @@ public final class TestSSTable extends SSTable
         }
     }
 
+    public Path fileComponentPath(FileType fileType)
+    {
+        return FileType.resolveComponentFile(fileType, dataFile);
+    }
+
     public long length(FileType fileType)
     {
-        return IOUtils.size(FileType.resolveComponentFile(fileType, dataFile));
+        return IOUtils.size(fileComponentPath(fileType));
     }
 
     public boolean isMissing(FileType fileType)
     {
-        return FileType.resolveComponentFile(fileType, dataFile) == null;
+        return fileComponentPath(fileType) == null;
     }
 
     @Override
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java
index 85fd17b..632e73f 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.spark.sparksql;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.Objects;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -87,10 +86,10 @@ abstract class AbstractSparkRowIterator<T> extends RowIterator<T>
      */
     public abstract T rowBuilder(Object[] valueArray);
 
+    @SuppressWarnings("DataFlowIssue") // Null check performed within PartialRowBuilder constructor
     @Override
     public PartialRowBuilder<T> newPartialBuilder()
     {
-        Objects.requireNonNull(requiredColumns, "requiredColumns must be non-null for PartialRowBuilder");
         return new PartialRowBuilder<>(requiredColumns, it.cqlTable(), it.hasProjectedValueColumns(), this::rowBuilder);
     }
 
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java
index f85be6c..865de42 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java
@@ -388,4 +388,16 @@ public final class TestUtils extends CommonTestUtils
     {
         return Range.openClosed(start, end);
     }
+
+    public static void createDirectory(Path directory)
+    {
+        try
+        {
+            Files.createDirectory(directory);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/Tester.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/Tester.java
index 1fa6eb0..34767f2 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/Tester.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/Tester.java
@@ -59,7 +59,7 @@ import static org.quicktheories.generators.SourceDSL.arbitrary;
 
 public final class Tester
 {
-    static final int DEFAULT_NUM_ROWS = 200;
+    public static final int DEFAULT_NUM_ROWS = 200;
 
     @NotNull
     private final List<CassandraVersion> versions;
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/CassandraBridgeUtilTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/CassandraBridgeUtilTests.java
new file mode 100644
index 0000000..0ae7825
--- /dev/null
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/CassandraBridgeUtilTests.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.reader;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.cassandra.bridge.BloomFilter;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.SSTableSummary;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.spark.TestUtils;
+import org.apache.cassandra.spark.data.FileType;
+import org.apache.cassandra.spark.data.SSTable;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.ByteBufferUtils;
+import org.apache.cassandra.spark.utils.test.TestSSTable;
+import org.apache.cassandra.spark.utils.test.TestSchema;
+
+import static org.apache.cassandra.spark.Tester.DEFAULT_NUM_ROWS;
+import static org.apache.cassandra.spark.utils.ByteBufferUtils.readShortLengthCompositeTypeString;
+import static org.apache.cassandra.spark.utils.RandomUtils.randomAlphanumeric;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.quicktheories.QuickTheory.qt;
+
+public class CassandraBridgeUtilTests
+{
+    @TempDir
+    private static Path tempPath;
+
+    @Test
+    public void testLastRepairTime()
+    {
+        runTest((partitioner, bridge, schema, testDir) -> {
+            writeSSTable(partitioner, bridge, schema, testDir,
+                         (writer) -> IntStream.range(0, DEFAULT_NUM_ROWS)
+                                              .forEach(i -> writer.write(randomAlphanumeric(), randomAlphanumeric(), randomAlphanumeric()))
+            );
+
+            try
+            {
+                assertEquals(0, bridge.lastRepairTime("ks", "tb", TestSSTable.firstIn(testDir)));
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    @Test
+    public void testBuildPartitionKey()
+    {
+        runTest((partitioner, bridge, schema, testDir) -> {
+            ByteBuffer pk1 = bridge.encodePartitionKey(partitioner, "ks", schema.createStatement, Collections.singletonList("a"));
+            assertEquals("a", ByteBufferUtils.string(pk1));
+            String str1 = randomAlphanumeric();
+            ByteBuffer pk2 = bridge.encodePartitionKey(partitioner, "ks", schema.createStatement, Collections.singletonList(str1));
+            assertEquals(str1, ByteBufferUtils.string(pk2));
+            List<String> keys = Arrays.asList(randomAlphanumeric(), randomAlphanumeric(), randomAlphanumeric());
+
+            TestSchema threePkSchema = TestSchema.builder(bridge)
+                                                 .withPartitionKey("a", bridge.text())
+                                                 .withPartitionKey("b", bridge.text())
+                                                 .withPartitionKey("c", bridge.text())
+                                                 .withColumn("d", bridge.bigint())
+                                                 .withColumn("e", bridge.aInt())
+                                                 .build();
+            ByteBuffer pk3 = bridge.encodePartitionKey(partitioner, "ks", threePkSchema.createStatement, keys);
+            assertEquals(keys.get(0), readShortLengthCompositeTypeString(pk3));
+            assertEquals(keys.get(1), readShortLengthCompositeTypeString(pk3));
+            assertEquals(keys.get(2), readShortLengthCompositeTypeString(pk3));
+        });
+    }
+
+    @Test
+    public void testOverlaps()
+    {
+        runTest((partitioner, bridge, schema, testDir) -> {
+            // write SSTable
+            List<String> keys = IntStream.range(0, 3).mapToObj(i -> randomAlphanumeric()).collect(Collectors.toList());
+            List<ByteBuffer> buffers = bridge.encodePartitionKeys(
+            partitioner,
+            schema.keyspace,
+            schema.createStatement,
+            keys.stream()
+                .map(Collections::singletonList)
+                .collect(Collectors.toList())
+            );
+            List<BigInteger> sortedTokens = buffers
+                                            .stream()
+                                            .map(partitioner::hash)
+                                            .sorted()
+                                            .collect(Collectors.toList());
+            assertTrue(TestSSTable.allIn(testDir).isEmpty());
+            writeSSTable(partitioner, bridge, schema, testDir,
+                         (writer) -> keys.forEach(key -> writer.write(key, randomAlphanumeric(), randomAlphanumeric())));
+            assertEquals(1, TestSSTable.allIn(testDir).size());
+
+            TestSSTable ssTable = (TestSSTable) TestSSTable.firstIn(testDir);
+            List<TokenRange> testRanges = Arrays.asList(
+            TokenRange.openClosed(sortedTokens.get(0), sortedTokens.get(0)),
+            TokenRange.closed(sortedTokens.get(0), sortedTokens.get(1)),
+            TokenRange.closed(sortedTokens.get(0), sortedTokens.get(2)),
+            TokenRange.openClosed(sortedTokens.get(1), sortedTokens.get(1)),
+            TokenRange.closed(sortedTokens.get(1), sortedTokens.get(2)),
+            TokenRange.openClosed(sortedTokens.get(2), sortedTokens.get(2).add(BigInteger.ONE)) // out of range
+            );
+
+            Path summaryDbFile = ssTable.fileComponentPath(FileType.SUMMARY);
+            for (int j = 0; j < 2; j++) // loop around after deleting Summary.db file to verify we can check using Index.db file
+            {
+                SSTableSummary summary = bridge.getSSTableSummary(partitioner, ssTable, 128, 256);
+                assertEquals(sortedTokens.get(0), summary.firstToken);
+                assertEquals(sortedTokens.get(2), summary.lastToken);
+                TokenRange sstableRange = TokenRange.closed(summary.firstToken, summary.lastToken);
+                for (BigInteger token : sortedTokens)
+                {
+                    assertTrue(sstableRange.contains(token));
+                }
+                List<Boolean> result = bridge.overlaps(ssTable, partitioner, 128, 256, testRanges);
+
+                assertEquals(testRanges.size(), result.size());
+                for (int i = 0; i < result.size(); i++)
+                {
+                    assertEquals(i < result.size() - 1, result.get(i));
+                }
+
+                // delete Summary.db file and check we can read the Index.db file too
+                Files.deleteIfExists(summaryDbFile);
+            }
+        });
+    }
+
+    @Test
+    public void testContains()
+    {
+        runTest((partitioner, bridge, schema, testDir) -> {
+            // write SSTable
+            Set<String> keys = IntStream.range(0, 25).mapToObj(i -> randomAlphanumeric()).collect(Collectors.toSet());
+            List<ByteBuffer> buffers = bridge.encodePartitionKeys(
+            partitioner,
+            schema.keyspace,
+            schema.createStatement,
+            keys.stream()
+                .map(Collections::singletonList)
+                .collect(Collectors.toList())
+            );
+            assertTrue(TestSSTable.allIn(testDir).isEmpty());
+            writeSSTable(partitioner, bridge, schema, testDir,
+                         (writer) -> keys.forEach(key -> writer.write(key, randomAlphanumeric(), randomAlphanumeric())));
+            assertEquals(1, TestSSTable.allIn(testDir).size());
+
+            TestSSTable ssTable = (TestSSTable) TestSSTable.firstIn(testDir);
+
+            // should return all positives for the keys contained in the SSTable
+            BloomFilter filter = bridge.openBloomFilter(partitioner, schema.keyspace, schema.table, ssTable);
+            List<Boolean> result = buffers.stream().map(filter::mightContain).collect(Collectors.toList());
+            assertEquals(result.size(), buffers.size());
+            assertTrue(result.stream().allMatch(boolValue -> boolValue));
+
+            // random keys should return some negatives for keys not contained in the SSTable
+            List<String> otherKeys = IntStream.range(0, DEFAULT_NUM_ROWS).mapToObj(i -> randomAlphanumeric(keys)).collect(Collectors.toList());
+            List<ByteBuffer> randomBuffers = bridge.encodePartitionKeys(
+            partitioner,
+            schema.keyspace,
+            schema.createStatement,
+            otherKeys.stream()
+                     .map(Collections::singletonList)
+                     .collect(Collectors.toList())
+            );
+            BloomFilter randomBloomFilter = bridge.openBloomFilter(partitioner, schema.keyspace, schema.table, ssTable);
+            List<Boolean> randomResult = randomBuffers.stream().map(randomBloomFilter::mightContain).collect(Collectors.toList());
+            assertEquals(randomResult.size(), otherKeys.size());
+            assertTrue(randomResult.stream().anyMatch(boolValue -> !boolValue));
+
+            // perform exact contains query to confirm expected keys exist and random keys don't
+            assertTrue(bridge.contains(partitioner, schema.keyspace, schema.table, ssTable, buffers).stream().allMatch(aBoolean -> aBoolean));
+            assertTrue(bridge.contains(partitioner, schema.keyspace, schema.table, ssTable, randomBuffers).stream().noneMatch(aBoolean -> aBoolean));
+            List<ByteBuffer> allKeys = new ArrayList<>();
+            allKeys.addAll(buffers);
+            allKeys.addAll(randomBuffers);
+            List<Boolean> exactResult = bridge.contains(partitioner, schema.keyspace, schema.table, ssTable, allKeys);
+            assertEquals(allKeys.size(), exactResult.size());
+            for (int i = 0; i < exactResult.size(); i++)
+            {
+                boolean contains = exactResult.get(i);
+                assertEquals(i < buffers.size(), contains);
+                if (filter.doesNotContain(allKeys.get(i)))
+                {
+                    // if bloom filter returns true for `doesNotContain` then contains should always be false
+                    assertFalse(contains);
+                }
+                if (contains)
+                {
+                    // bloom filter should always return true if SSTable contains key
+                    assertTrue(filter.mightContain(allKeys.get(i)));
+                }
+            }
+        });
+    }
+
+    @Test
+    public void testReadKeys()
+    {
+        runTest(
+        (partitioner, bridge, schema, testDir) -> {
+            Map<String, String> expected = IntStream.range(0, DEFAULT_NUM_ROWS)
+                                                    .mapToObj(i -> randomAlphanumeric(32))
+                                                    .collect(Collectors.toMap(Function.identity(), i -> randomAlphanumeric(32)));
+            writeSSTable(partitioner, bridge, schema, testDir, (writer) -> expected.forEach((key, value) -> writer.write(key, value, value)));
+
+            List<SSTable> all = TestSSTable.allIn(testDir);
+            Set<SSTable> ssTables = new HashSet<>(all);
+            Map<String, Map<String, Object>> actual = new HashMap<>(DEFAULT_NUM_ROWS);
+            bridge.readStringPartitionKeys(partitioner,
+                                           schema.keyspace,
+                                           schema.createStatement,
+                                           ssTables,
+                                           null,
+                                           expected.keySet().stream().map(Collections::singletonList).collect(Collectors.toList()),
+                                           null,
+                                           (row) -> actual.put(row.get("a").toString(), row)
+            );
+            assertEquals(expected.size(), actual.size());
+            for (Map.Entry<String, String> entry : expected.entrySet())
+            {
+                Map<String, Object> actualRow = actual.get(entry.getKey());
+                assertNotNull(actualRow);
+                assertEquals(actualRow.get("b"), entry.getValue());
+                assertEquals(actualRow.get("c"), entry.getValue());
+            }
+        }
+        );
+    }
+
+    /* test utils */
+
+    private void writeSSTable(Partitioner partitioner, CassandraBridge bridge, TestSchema schema, Path testDir, Consumer<CassandraBridge.Writer> writer)
+    {
+        bridge.writeSSTable(partitioner,
+                            schema.keyspace,
+                            schema.table,
+                            testDir,
+                            schema.createStatement,
+                            schema.insertStatement,
+                            writer);
+    }
+
+    private interface TestRunnable
+    {
+        void run(Partitioner partitioner, CassandraBridge bridge, TestSchema schema, Path testDir) throws IOException;
+    }
+
+    private static void runTest(TestRunnable test)
+    {
+        qt().forAll(TestUtils.partitioners(), TestUtils.bridges())
+            .checkAssert(
+            ((partitioner, bridge) -> {
+                TestSchema schema = TestSchema.builder(bridge)
+                                              .withPartitionKey("a", bridge.text())
+                                              .withColumn("b", bridge.text())
+                                              .withColumn("c", bridge.text())
+                                              .build();
+
+                String testId = UUID.randomUUID().toString();
+                Path testDir = tempPath.resolve(testId);
+                try
+                {
+                    TestUtils.createDirectory(testDir);
+                    test.run(partitioner, bridge, schema, testDir);
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                finally
+                {
+                    try
+                    {
+                        FileUtils.deleteDirectory(testDir.toFile());
+                    }
+                    catch (IOException ignore)
+                    {
+                    }
+                }
+            }
+            )
+            );
+    }
+}
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PruneColumnFilter.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/BloomFilter.java
similarity index 56%
copy from cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PruneColumnFilter.java
copy to cassandra-bridge/src/main/java/org/apache/cassandra/bridge/BloomFilter.java
index c032659..7ab1332 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PruneColumnFilter.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/BloomFilter.java
@@ -17,36 +17,27 @@
  * under the License.
  */
 
-package org.apache.cassandra.spark.sparksql.filters;
+package org.apache.cassandra.bridge;
 
-import java.util.Set;
-
-import org.jetbrains.annotations.NotNull;
+import java.nio.ByteBuffer;
+import java.util.function.Predicate;
 
 /**
- * Prune column push-down filter to skip reading columns that are not needed
+ * Version-independent interface to front the bloom filter.
  */
-public class PruneColumnFilter
+public interface BloomFilter extends Predicate<ByteBuffer>
 {
-    private final Set<String> requiredColumns;
-
-    public PruneColumnFilter(@NotNull Set<String> requiredColumns)
-    {
-        this.requiredColumns = requiredColumns;
-    }
-
-    public Set<String> requiredColumns()
-    {
-        return requiredColumns;
-    }
-
-    public int size()
+    /**
+     * @param partitionKey serialized partition key
+     * @return true if the SSTable might contain the given partition key; may return false-positives but never false-negatives
+     */
+    default boolean mightContain(ByteBuffer partitionKey)
     {
-        return requiredColumns.size();
+        return test(partitionKey);
     }
 
-    public boolean includeColumn(String columnName)
+    default boolean doesNotContain(ByteBuffer partitionKey)
     {
-        return requiredColumns.contains(columnName);
+        return !mightContain(partitionKey);
     }
 }
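
A minimal sketch of the new interface in use; because mightContain can return false positives but never false negatives, a doesNotContain answer of true is definitive. The bridge and SSTable instances here are assumptions:

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.cassandra.bridge.BloomFilter;
    import org.apache.cassandra.bridge.CassandraBridge;
    import org.apache.cassandra.spark.data.SSTable;
    import org.apache.cassandra.spark.data.partitioner.Partitioner;

    final class BloomFilterSketch
    {
        // Cheap pre-check before an exact contains() lookup: a true result
        // here means the key is definitely absent from the SSTable.
        static boolean definitelyAbsent(CassandraBridge bridge, Partitioner partitioner,
                                        String keyspace, String table, SSTable ssTable,
                                        ByteBuffer partitionKey) throws IOException
        {
            BloomFilter filter = bridge.openBloomFilter(partitioner, keyspace, table, ssTable);
            return filter.doesNotContain(partitionKey);
        }
    }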
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
index 62d1d81..0701ae2 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
@@ -46,6 +46,7 @@ import com.google.common.collect.ImmutableMap;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
+import org.apache.cassandra.spark.data.BasicSupplier;
 import org.apache.cassandra.spark.data.CassandraTypes;
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.CqlTable;
@@ -401,6 +402,11 @@ public abstract class CassandraBridge
                                                      @NotNull String table,
                                                      @NotNull SSTable ssTable);
 
+    public abstract SSTableSummary getSSTableSummary(@NotNull Partitioner partitioner,
+                                                     @NotNull SSTable ssTable,
+                                                     int minIndexInterval,
+                                                     int maxIndexInterval);
+
     // Version-Specific Test Utility Methods
 
     @VisibleForTesting
@@ -443,6 +449,207 @@ public abstract class CassandraBridge
 
     public abstract CompressionUtil compressionUtil();
 
+    // additional SSTable utils methods
+
+    /**
+     * @param keyspace keyspace name
+     * @param table    table name
+     * @param ssTable  SSTable instance
+     * @return last repair time for a given SSTable by reading the Statistics.db file
+     * @throws IOException if an I/O error occurs while reading the Statistics.db file
+     */
+    public abstract long lastRepairTime(@NotNull String keyspace,
+                                        @NotNull String table,
+                                        @NotNull SSTable ssTable) throws IOException;
+
+    /**
+     * @param ssTable          SSTable instance
+     * @param minIndexInterval minIndexInterval configured in the TableMetadata
+     * @param partitioner      Cassandra partitioner
+     * @param maxIndexInterval maxIndexInterval configured in the TableMetadata
+     * @param ranges           a list of token ranges
+     * @return a list of booleans, one per token range in `ranges`, true if that range overlaps with the SSTable.
+     * The SSTable may or may not contain data for the range.
+     */
+    public abstract List<Boolean> overlaps(@NotNull SSTable ssTable,
+                                           @NotNull Partitioner partitioner,
+                                           int minIndexInterval,
+                                           int maxIndexInterval,
+                                           @NotNull List<TokenRange> ranges) throws IOException;
+
+    /**
+     * @param partitioner     Cassandra partitioner
+     * @param keyspace        Cassandra keyspace
+     * @param createTableStmt CQL table create statement
+     * @param partitionKeys   list of partition keys; each inner list holds the string values for one partition key
+     * @return list of tokens corresponding to each input `partitionKeys`
+     */
+    public List<BigInteger> toTokens(@NotNull Partitioner partitioner,
+                                     @NotNull String keyspace,
+                                     @NotNull String createTableStmt,
+                                     @NotNull List<List<String>> partitionKeys)
+    {
+        return toTokens(partitioner, encodePartitionKeys(partitioner, keyspace, createTableStmt, partitionKeys));
+    }
+
+    /**
+     * @param partitioner   Cassandra partitioner
+     * @param partitionKeys list of encoded partition keys
+     * @return list of tokens corresponding to each input `partitionKeys`
+     */
+    public List<BigInteger> toTokens(@NotNull Partitioner partitioner,
+                                     @NotNull List<ByteBuffer> partitionKeys)
+    {
+        Tokenizer tokenizer = tokenizer(partitioner);
+        return partitionKeys
+               .stream()
+               .map(tokenizer::toToken)
+               .collect(Collectors.toList());
+    }
+
+    /**
+     * @param partitioner Cassandra partitioner
+     * @return a Tokenizer instance for the provided Partitioner that maps a partition key to the token.
+     */
+    public abstract Tokenizer tokenizer(@NotNull Partitioner partitioner);
+
+    /**
+     * @param partitioner     Cassandra partitioner
+     * @param keyspace        keyspace name
+     * @param createTableStmt CQL create table statement
+     * @param partitionKey    partition key
+     * @return encoded ByteBuffer for the input `partitionKey`
+     */
+    public ByteBuffer encodePartitionKey(@NotNull Partitioner partitioner,
+                                         @NotNull String keyspace,
+                                         @NotNull String createTableStmt,
+                                         @NotNull List<String> partitionKey)
+    {
+        return encodePartitionKeys(partitioner, keyspace, createTableStmt, Collections.singletonList(partitionKey)).get(0);
+    }
+
+    /**
+     * @param partitioner     Cassandra partitioner
+     * @param keyspace        keyspace name
+     * @param createTableStmt CQL create table statement
+     * @param partitionKeys   list of partition keys
+     * @return a list of encoded ByteBuffers corresponding to the partition keys input in `partitionKeys`
+     */
+    public abstract List<ByteBuffer> encodePartitionKeys(@NotNull Partitioner partitioner,
+                                                         @NotNull String keyspace,
+                                                         @NotNull String createTableStmt,
+                                                         @NotNull List<List<String>> partitionKeys);
+
+    /**
+     * @param partitioner   Cassandra partitioner
+     * @param keyspace      keyspace name
+     * @param table         table name
+     * @param ssTable       SSTable instance
+     * @return version-independent BloomFilter instance to answer if an SSTable might contain a partition key
+     * (might return false-positives but never false-negatives)
+     * @throws IOException if an I/O error occurs while reading the bloom filter
+     */
+    public abstract BloomFilter openBloomFilter(@NotNull Partitioner partitioner,
+                                                @NotNull String keyspace,
+                                                @NotNull String table,
+                                                @NotNull SSTable ssTable) throws IOException;
+
+    /**
+     * @param partitioner   Cassandra partitioner
+     * @param keyspace      keyspace name
+     * @param table         table name
+     * @param ssTable       SSTable instance
+     * @param partitionKeys list of partition keys
+     * @return list of booleans, one per key in `partitionKeys`, true if the SSTable contains that partition key
+     * @throws IOException if an I/O error occurs while reading the SSTable
+     */
+    public abstract List<Boolean> contains(@NotNull Partitioner partitioner,
+                                           @NotNull String keyspace,
+                                           @NotNull String table,
+                                           @NotNull SSTable ssTable,
+                                           @NotNull List<ByteBuffer> partitionKeys) throws IOException;
+
+    /**
+     * Convenience method around `readPartitionKeys` to accept partition keys as string values and encode them with the correct types.
+     *
+     * @param partitioner Cassandra partitioner
+     * @param keyspace    keyspace name
+     * @param createStmt  create table CQL statement
+     * @param ssTables    set of SSTables to read
+     * @param rowConsumer Consumer interface to consume rows as they are read, to avoid buffering all rows in memory
+     * @throws IOException if an I/O error occurs while reading the SSTables
+     */
+    public void readStringPartitionKeys(@NotNull Partitioner partitioner,
+                                        @NotNull String keyspace,
+                                        @NotNull String createStmt,
+                                        @NotNull Set<SSTable> ssTables,
+                                        @NotNull Consumer<Map<String, Object>> rowConsumer) throws IOException
+    {
+        readStringPartitionKeys(partitioner, keyspace, createStmt, ssTables, null, null, null, rowConsumer);
+    }
+
+    /**
+     * Convenience method around `readPartitionKeys` to accept partition keys as string values and encode them with the correct types.
+     *
+     * @param partitioner       Cassandra partitioner
+     * @param keyspace          keyspace name
+     * @param createStmt        create table CQL statement
+     * @param ssTables          set of SSTables to read
+     * @param tokenRange        optional token range to restrict the bulk read to
+     * @param partitionKeys     list of partition keys; when a partition key has multiple columns, the values must be correctly ordered in the inner list
+     * @param pruneColumnFilter optional filter to select a subset of columns; this can improve performance by skipping over large blobs or unneeded columns
+     * @param rowConsumer       Consumer interface to consume rows as they are read, to avoid buffering all rows in memory
+     * @throws IOException if an I/O error occurs while reading the SSTables
+     */
+    public void readStringPartitionKeys(@NotNull Partitioner partitioner,
+                                        @NotNull String keyspace,
+                                        @NotNull String createStmt,
+                                        @NotNull Set<SSTable> ssTables,
+                                        @Nullable TokenRange tokenRange,
+                                        @Nullable List<List<String>> partitionKeys,
+                                        @Nullable String[] pruneColumnFilter,
+                                        @NotNull Consumer<Map<String, Object>> rowConsumer) throws IOException
+    {
+        readPartitionKeys(partitioner,
+                          keyspace,
+                          createStmt,
+                          new BasicSupplier(ssTables),
+                          tokenRange,
+                          partitionKeys == null ? null : encodePartitionKeys(partitioner, keyspace, createStmt, partitionKeys),
+                          pruneColumnFilter,
+                          rowConsumer);
+    }
+
+    public void readPartitionKeys(@NotNull Partitioner partitioner,
+                                  @NotNull String keyspace,
+                                  @NotNull String createStmt,
+                                  @NotNull Set<SSTable> ssTables,
+                                  @NotNull Consumer<Map<String, Object>> rowConsumer) throws IOException
+    {
+        readPartitionKeys(partitioner, keyspace, createStmt, ssTables, null, null, null, rowConsumer);
+    }
+
+    public void readPartitionKeys(@NotNull Partitioner partitioner,
+                                  @NotNull String keyspace,
+                                  @NotNull String createStmt,
+                                  @NotNull Set<SSTable> ssTables,
+                                  @Nullable TokenRange tokenRange,
+                                  @Nullable List<ByteBuffer> partitionKeys,
+                                  @Nullable String[] pruneColumnFilter,
+                                  @NotNull Consumer<Map<String, Object>> rowConsumer) throws IOException
+    {
+        readPartitionKeys(partitioner, keyspace, createStmt, new BasicSupplier(ssTables), tokenRange, partitionKeys, pruneColumnFilter, rowConsumer);
+    }
+
+    public abstract void readPartitionKeys(@NotNull Partitioner partitioner,
+                                           @NotNull String keyspace,
+                                           @NotNull String createStmt,
+                                           @NotNull SSTablesSupplier ssTables,
+                                           @Nullable TokenRange tokenRange,
+                                           @Nullable List<ByteBuffer> partitionKeys,
+                                           @Nullable String[] pruneColumnFilter,
+                                           @NotNull Consumer<Map<String, Object>> rowConsumer) throws IOException;
+
     // Kryo/Java (De-)Serialization
 
     public abstract void kryoRegister(Kryo kryo);
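
A minimal sketch of a point lookup through the new bulk-read API above; the bridge, SSTable set, and create statement are assumed to come from elsewhere, and rows are streamed to the consumer rather than buffered:

    import java.io.IOException;
    import java.util.List;
    import java.util.Set;

    import org.apache.cassandra.bridge.CassandraBridge;
    import org.apache.cassandra.spark.data.SSTable;
    import org.apache.cassandra.spark.data.partitioner.Partitioner;

    final class ReadKeysSketch
    {
        // Looks up the given string partition keys across the SSTables; null
        // token range and null column filter mean no restriction on either.
        static void printMatchingRows(CassandraBridge bridge, Partitioner partitioner,
                                      String keyspace, String createStmt,
                                      Set<SSTable> ssTables, List<List<String>> keys) throws IOException
        {
            bridge.readStringPartitionKeys(partitioner, keyspace, createStmt, ssTables,
                                           null, keys, null,
                                           row -> System.out.println(row));
        }
    }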
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PruneColumnFilter.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/Tokenizer.java
similarity index 54%
copy from cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PruneColumnFilter.java
copy to cassandra-bridge/src/main/java/org/apache/cassandra/bridge/Tokenizer.java
index c032659..90c2ddd 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PruneColumnFilter.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/Tokenizer.java
@@ -17,36 +17,15 @@
  * under the License.
  */
 
-package org.apache.cassandra.spark.sparksql.filters;
+package org.apache.cassandra.bridge;
 
-import java.util.Set;
-
-import org.jetbrains.annotations.NotNull;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
 
 /**
- * Prune column push-down filter to skip reading columns that are not needed
+ * Interface that converts a partition key to a BigInteger token.
  */
-public class PruneColumnFilter
+public interface Tokenizer
 {
-    private final Set<String> requiredColumns;
-
-    public PruneColumnFilter(@NotNull Set<String> requiredColumns)
-    {
-        this.requiredColumns = requiredColumns;
-    }
-
-    public Set<String> requiredColumns()
-    {
-        return requiredColumns;
-    }
-
-    public int size()
-    {
-        return requiredColumns.size();
-    }
-
-    public boolean includeColumn(String columnName)
-    {
-        return requiredColumns.contains(columnName);
-    }
+    BigInteger toToken(ByteBuffer partitionKey);
 }
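
A minimal sketch pairing this interface with the bridge's tokenizer() factory; the tokenizer can be reused across many keys:

    import java.math.BigInteger;
    import java.nio.ByteBuffer;

    import org.apache.cassandra.bridge.CassandraBridge;
    import org.apache.cassandra.bridge.Tokenizer;
    import org.apache.cassandra.spark.data.partitioner.Partitioner;

    final class TokenizerSketch
    {
        // Maps an already-encoded partition key to its BigInteger token.
        static BigInteger tokenOf(CassandraBridge bridge, Partitioner partitioner, ByteBuffer key)
        {
            Tokenizer tokenizer = bridge.tokenizer(partitioner);
            return tokenizer.toToken(key);
        }
    }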
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
index e6d6db9..18d6694 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.bridge;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
@@ -32,21 +33,29 @@ import java.nio.file.Path;
 import java.util.AbstractMap;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
+import org.apache.cassandra.analytics.reader.common.IndexIterator;
+import org.apache.cassandra.analytics.stats.Stats;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
@@ -62,6 +71,9 @@ import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.SSTableTombstoneWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
@@ -72,9 +84,10 @@ import org.apache.cassandra.spark.data.CqlType;
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.data.SSTable;
 import org.apache.cassandra.spark.data.SSTablesSupplier;
-import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.data.TypeConverter;
 import org.apache.cassandra.spark.data.complex.CqlTuple;
 import org.apache.cassandra.spark.data.complex.CqlUdt;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.cassandra.spark.reader.CompactionStreamScanner;
 import org.apache.cassandra.spark.reader.IndexEntry;
 import org.apache.cassandra.spark.reader.IndexReader;
@@ -83,18 +96,20 @@ import org.apache.cassandra.spark.reader.RowData;
 import org.apache.cassandra.spark.reader.SchemaBuilder;
 import org.apache.cassandra.spark.reader.StreamScanner;
 import org.apache.cassandra.spark.reader.SummaryDbUtils;
-import org.apache.cassandra.analytics.reader.common.IndexIterator;
+import org.apache.cassandra.spark.sparksql.CellIterator;
+import org.apache.cassandra.spark.sparksql.RowIterator;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
 import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
-import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.utils.Pair;
 import org.apache.cassandra.spark.utils.SparkClassLoaderOverride;
 import org.apache.cassandra.spark.utils.TimeProvider;
 import org.apache.cassandra.tools.JsonTransformer;
 import org.apache.cassandra.tools.Util;
 import org.apache.cassandra.util.CompressionUtil;
+import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.CompressionUtilImplementation;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.TokenUtils;
 import org.apache.cassandra.utils.UUIDGen;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -102,13 +117,9 @@ import org.jetbrains.annotations.Nullable;
 @SuppressWarnings("unused")
 public class CassandraBridgeImplementation extends CassandraBridge
 {
-    private final Map<Class<?>, Serializer<?>> kryoSerializers;
-
-    public static void main(String[] args)
-    {
-        System.out.println(UUIDGen.unixTimestamp(UUID.fromString("ac3f2f40-8637-11ef-a12b-2d1e10948b4b")));
+    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraBridgeImplementation.class);
 
-    }
+    private final Map<Class<?>, Serializer<?>> kryoSerializers;
 
     static
     {
@@ -264,6 +275,206 @@ public class CassandraBridgeImplementation extends CassandraBridge
         return CompressionUtilImplementation.INSTANCE;
     }
 
+    @Override
+    public long lastRepairTime(String keyspace, String table, SSTable ssTable) throws IOException
+    {
+        Map<MetadataType, MetadataComponent> componentMap = ReaderUtils.deserializeStatsMetadata(keyspace, table, ssTable, EnumSet.of(MetadataType.STATS));
+        StatsMetadata statsMetadata = (StatsMetadata) componentMap.get(MetadataType.STATS);
+        if (statsMetadata == null)
+        {
+            throw new IllegalStateException("Could not read StatsMetadata");
+        }
+        return statsMetadata.repairedAt;
+    }
+
+    @Override
+    public List<Boolean> overlaps(SSTable ssTable,
+                                  Partitioner partitioner,
+                                  int minIndexInterval,
+                                  int maxIndexInterval,
+                                  List<TokenRange> ranges)
+    {
+        SSTableSummary summary = getSSTableSummary(partitioner, ssTable, minIndexInterval, maxIndexInterval);
+        TokenRange sstableRange = TokenRange.closed(summary.firstToken, summary.lastToken);
+        return ranges.stream()
+                     .map(range -> range.isConnected(sstableRange))
+                     .collect(Collectors.toList());
+    }
+
+    @Override
+    public Tokenizer tokenizer(Partitioner partitioner)
+    {
+        IPartitioner iPartitioner = getPartitioner(partitioner);
+        return partitionKey -> {
+            DecoratedKey decoratedKey = iPartitioner.decorateKey(partitionKey);
+            return TokenUtils.tokenToBigInteger(decoratedKey.getToken());
+        };
+    }
+
+    @Override
+    public List<ByteBuffer> encodePartitionKeys(Partitioner partitioner, String keyspace, String createTableStmt, List<List<String>> keys)
+    {
+        CqlTable table = new SchemaBuilder(createTableStmt, keyspace, ReplicationFactor.simpleStrategy(1), partitioner).build();
+        return keys.stream().map(key -> buildPartitionKey(table, key)).collect(Collectors.toList());
+    }
+
+    @Override
+    public org.apache.cassandra.bridge.BloomFilter openBloomFilter(Partitioner partitioner,
+                                                                   String keyspace,
+                                                                   String table,
+                                                                   SSTable ssTable) throws IOException
+    {
+        IPartitioner iPartitioner = getPartitioner(partitioner);
+        Descriptor descriptor = ReaderUtils.constructDescriptor(keyspace, table, ssTable);
+        // closing `SharedCloseableImpl` instances is known to cause SIGSEGV errors
+        BloomFilter filter = openBloomFilter(descriptor, ssTable);
+        return partitionKey -> {
+            DecoratedKey decoratedKey = iPartitioner.decorateKey(partitionKey);
+            return filter.isPresent(decoratedKey);
+        };
+    }
+
+    private BloomFilter openBloomFilter(Descriptor descriptor, SSTable ssTable) throws IOException
+    {
+        return ReaderUtils.readFilter(ssTable, descriptor);
+    }
+
+    @Override
+    public List<Boolean> contains(Partitioner partitioner, String keyspace, String table, SSTable ssTable, List<ByteBuffer> partitionKeys) throws IOException
+    {
+        if (partitionKeys.isEmpty())
+        {
+            return Collections.emptyList();
+        }
+
+        IPartitioner iPartitioner = getPartitioner(partitioner);
+        List<DecoratedKey> decoratedKeys = partitionKeys.stream().map(iPartitioner::decorateKey).collect(Collectors.toList());
+        Descriptor descriptor = ReaderUtils.constructDescriptor(keyspace, table, ssTable);
+        BloomFilter filter = openBloomFilter(descriptor, ssTable);
+        List<Boolean> result = decoratedKeys.stream().map(filter::isPresent).collect(Collectors.toList());
+        if (result.stream().noneMatch(found -> found))
+        {
+            // no matches in the bloom filter, so we can exit early
+            return result;
+        }
+
+        // sorted by token with index into original partitionKeys list
+        List<Pair<BigInteger, Integer>> sortedByTokens = IntStream.range(0, decoratedKeys.size())
+                                                                  .mapToObj(idx -> {
+                                                                      DecoratedKey key = decoratedKeys.get(idx);
+                                                                      BigInteger token = TokenUtils.tokenToBigInteger(key.getToken());
+                                                                      return Pair.of(token, idx);
+                                                                  })
+                                                                  .sorted(Comparator.comparing(Pair::getLeft))
+                                                                  .collect(Collectors.toList());
+        try (InputStream primaryIndex = ssTable.openPrimaryIndexStream())
+        {
+            if (primaryIndex == null)
+            {
+                throw new IOException("Could not read Index.db file");
+            }
+
+            final int[] position = new int[]{0};
+            ReaderUtils.readPrimaryIndex(primaryIndex, (buffer) -> {
+                DecoratedKey key = iPartitioner.decorateKey(buffer);
+                BigInteger token = TokenUtils.tokenToBigInteger(key.getToken());
+
+                Pair<BigInteger, Integer> current = sortedByTokens.get(position[0]);
+                int compare = token.compareTo(current.getLeft());
+                while (compare > 0)
+                {
+                    // we passed without finding the key
+                    result.set(current.getRight(), false);
+                    position[0]++;
+                    if (position[0] >= decoratedKeys.size())
+                    {
+                        // we have checked all the keys, so we can exit early
+                        return true;
+                    }
+                    current = sortedByTokens.get(position[0]);
+                    compare = token.compareTo(current.getLeft());
+                }
+
+                ByteBuffer currentKey = partitionKeys.get(current.getRight());
+                if (compare == 0 && buffer.equals(currentKey))  // token and key match
+                {
+                    result.set(current.getRight(), true);
+                    position[0]++;
+                }
+
+                // we have checked all the keys, so we can exit early
+                return position[0] >= decoratedKeys.size();
+            });
+
+            // mark as false any keys we didn't reach
+            IntStream.range(position[0], sortedByTokens.size())
+                     .forEach(i -> result.set(sortedByTokens.get(i).getRight(), false));
+        }
+
+        return result;
+    }
+
+    @Override
+    public void readPartitionKeys(Partitioner partitioner,
+                                  String keyspace,
+                                  String createStmt,
+                                  SSTablesSupplier ssTables,
+                                  @Nullable TokenRange tokenRange,
+                                  @Nullable List<ByteBuffer> partitionKeys,
+                                  @Nullable String[] requiredColumns,
+                                  Consumer<Map<String, Object>> rowConsumer) throws IOException
+    {
+        IPartitioner iPartitioner = getPartitioner(partitioner);
+        SchemaBuilder schemaBuilder = new SchemaBuilder(createStmt, keyspace, ReplicationFactor.simpleStrategy(1), partitioner);
+        TableMetadata metadata = schemaBuilder.tableMetaData();
+        CqlTable table = schemaBuilder.build();
+        List<BigInteger> tokens = partitionKeys == null ? Collections.emptyList() : toTokens(partitioner, partitionKeys);
+        List<PartitionKeyFilter> partitionKeyFilters = partitionKeys == null ? Collections.emptyList() :
+                                                       IntStream
+                                                       .range(0, partitionKeys.size())
+                                                       .mapToObj(i -> PartitionKeyFilter.create(partitionKeys.get(i), tokens.get(i)))
+                                                       .sorted()
+                                                       .collect(Collectors.toList());
+
+        try (CellIterator it = new CellIterator(0,
+                                                table,
+                                                Stats.DoNothingStats.INSTANCE,
+                                                TypeConverter.IDENTITY,
+                                                partitionKeyFilters,
+                                                (t) -> PruneColumnFilter.of(requiredColumns),
+                                                (partitionId1, partitionKeyFilters1, columnFilter1) ->
+                                                new CompactionStreamScanner(
+                                                metadata,
+                                                partitioner,
+                                                TimeProvider.DEFAULT,
+                                                ssTables.openAll((ssTable, isRepairPrimary) ->
+                                                                 org.apache.cassandra.spark.reader.SSTableReader.builder(metadata, ssTable)
+                                                                                                                .withPartitionKeyFilters(partitionKeyFilters1)
+                                                                                                                .build())
+                                                ))
+        {
+            @Override
+            public boolean isInPartition(int partitionId, BigInteger token, ByteBuffer partitionKey)
+            {
+                return true;
+            }
+
+            @Override
+            public boolean equals(CqlField field, Object obj1, Object obj2)
+            {
+                return Objects.equals(obj1, obj2);
+            }
+        })
+        {
+            RowIterator<Map<String, Object>> rowIterator = RowIterator.rowMapIterator(it, Stats.DoNothingStats.INSTANCE, requiredColumns);
+
+            while (rowIterator.next())
+            {
+                rowConsumer.accept(rowIterator.get());
+            }
+        }
+    }
+
     @Override
     public synchronized void writeSSTable(Partitioner partitioner,
                                           String keyspace,
@@ -336,15 +547,32 @@ public class CassandraBridgeImplementation extends CassandraBridge
         {
             throw new RuntimeException("Could not create table metadata needed for reading SSTable summaries for keyspace: " + keyspace);
         }
+        return getSSTableSummary(metadata.partitioner, ssTable, metadata.params.minIndexInterval, metadata.params.maxIndexInterval);
+    }
+
+    @Override
+    public SSTableSummary getSSTableSummary(@NotNull Partitioner partitioner,
+                                            @NotNull SSTable ssTable,
+                                            int minIndexInterval,
+                                            int maxIndexInterval)
+    {
+        return getSSTableSummary(getPartitioner(partitioner), ssTable, minIndexInterval, maxIndexInterval);
+    }
+
+    protected SSTableSummary getSSTableSummary(@NotNull IPartitioner partitioner,
+                                               @NotNull SSTable ssTable,
+                                               int minIndexInterval,
+                                               int maxIndexInterval)
+    {
         try
         {
-            SummaryDbUtils.Summary summary = SummaryDbUtils.readSummary(metadata, ssTable);
-            Pair<DecoratedKey, DecoratedKey> keys = Pair.create(summary.first(), summary.last());
-            if (keys.left == null || keys.right == null)
+            SummaryDbUtils.Summary summary = SummaryDbUtils.readSummary(ssTable, partitioner, minIndexInterval, maxIndexInterval);
+            Pair<DecoratedKey, DecoratedKey> keys = summary == null ? null : Pair.of(summary.first(), summary.last());
+            if (summary == null)
             {
-                keys = ReaderUtils.keysFromIndex(metadata, ssTable);
+                keys = ReaderUtils.keysFromIndex(partitioner, ssTable);
             }
-            if (keys.left == null || keys.right == null)
+            if (keys == null)
             {
                 throw new RuntimeException("Could not load SSTable first or 
last tokens for SSTable: " + ssTable.getDataFileName());
             }
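
The helper APIs above are the ones intended for external tooling: they let a caller serialize partition keys, probe Bloom filters, and confirm key membership without a Spark session. A minimal, hypothetical sketch of such a caller follows (not part of this patch); the bridge and SSTable handles are assumed to be obtained elsewhere, and the keyspace, table, and CREATE TABLE statement are placeholders.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.cassandra.bridge.CassandraBridge;
    import org.apache.cassandra.spark.data.SSTable;
    import org.apache.cassandra.spark.data.partitioner.Partitioner;

    final class BridgeToolingSketch
    {
        // Checks which of two partition keys exist in a single SSTable
        static List<Boolean> probe(CassandraBridge bridge, SSTable ssTable) throws IOException
        {
            String keyspace = "ks";                                                 // placeholder
            String createStmt = "CREATE TABLE ks.tbl (id int PRIMARY KEY, v text)"; // placeholder

            // Serialize CQL key values into partition-key buffers using the table schema
            List<ByteBuffer> keys = bridge.encodePartitionKeys(Partitioner.Murmur3Partitioner,
                                                               keyspace,
                                                               createStmt,
                                                               Arrays.asList(Arrays.asList("1"), Arrays.asList("2")));

            // contains() consults the Bloom filter first, so the common
            // negative case never touches Index.db
            return bridge.contains(Partitioner.Murmur3Partitioner, keyspace, "tbl", ssTable, keys);
        }
    }
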
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java
index 0626479..1af3208 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java
@@ -64,7 +64,7 @@ public class IndexReader implements IIndexReader
         long startTimeNanos = now;
         try
         {
-            File file = SSTableReader.constructFilename(metadata.keyspace, metadata.name, ssTable.getDataFileName());
+            File file = ReaderUtils.constructFilename(metadata.keyspace, metadata.name, ssTable.getDataFileName());
             Descriptor descriptor = Descriptor.fromFilename(file);
             Version version = descriptor.version;
 
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
index 1e7de7f..3c84591 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.spark.reader;
 
 import java.io.DataInputStream;
 import java.io.EOFException;
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -29,15 +30,17 @@ import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.EnumMap;
 import java.util.EnumSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.zip.CRC32;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.ClusteringPrefix;
 import org.apache.cassandra.db.DecoratedKey;
@@ -62,13 +65,14 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.spark.data.SSTable;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.utils.ByteBufferUtils;
+import org.apache.cassandra.spark.utils.Pair;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.BloomFilterSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.TokenUtils;
 import org.apache.cassandra.utils.vint.VIntCoding;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
 
@@ -83,6 +87,36 @@ public final class ReaderUtils extends TokenUtils
           .orElseThrow(() -> new RuntimeException("Could not find 
SerializationHeader.Component constructor"));
     public static final ByteBuffer SUPER_COLUMN_MAP_COLUMN = 
ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
+    public static Descriptor constructDescriptor(@NotNull String keyspace, @NotNull String table, @NotNull SSTable ssTable)
+    {
+        File file = ReaderUtils.constructFilename(keyspace, table, ssTable.getDataFileName());
+        return Descriptor.fromFilename(file);
+    }
+
+    /**
+     * Constructs full file path for a given combination of keyspace, table, and data file name,
+     * while adjusting for data files with non-standard names prefixed with keyspace and table
+     *
+     * @param keyspace Name of the keyspace
+     * @param table    Name of the table
+     * @param filename Name of the data file
+     * @return A full file path, adjusted for non-standard file names
+     */
+    @VisibleForTesting
+    @NotNull
+    public static File constructFilename(@NotNull String keyspace, @NotNull String table, @NotNull String filename)
+    {
+        String[] components = filename.split("-");
+        if (components.length == 6
+            && components[0].equals(keyspace)
+            && components[1].equals(table))
+        {
+            filename = filename.substring(keyspace.length() + table.length() + 2);
+        }
+
+        return new File(String.format("./%s/%s", keyspace, table), filename);
+    }
+
     static
     {
         SERIALIZATION_HEADER.setAccessible(true);
@@ -166,23 +200,30 @@ public final class ReaderUtils extends TokenUtils
         return CompositeType.build(ByteBufferAccessor.instance, isStatic, values);
     }
 
+    @Nullable
     public static Pair<DecoratedKey, DecoratedKey> keysFromIndex(@NotNull TableMetadata metadata,
                                                                  @NotNull SSTable ssTable) throws IOException
+    {
+        return keysFromIndex(metadata.partitioner, ssTable);
+    }
+
+    @Nullable
+    public static Pair<DecoratedKey, DecoratedKey> keysFromIndex(@NotNull IPartitioner partitioner,
+                                                                 @NotNull SSTable ssTable) throws IOException
     {
         try (InputStream primaryIndex = ssTable.openPrimaryIndexStream())
         {
             if (primaryIndex != null)
             {
-                IPartitioner partitioner = metadata.partitioner;
-                Pair<ByteBuffer, ByteBuffer> keys = readPrimaryIndex(primaryIndex, true, Collections.emptyList());
-                return Pair.create(partitioner.decorateKey(keys.left), partitioner.decorateKey(keys.right));
+                Pair<ByteBuffer, ByteBuffer> keys = primaryIndexReadFirstAndLastKey(primaryIndex);
+                return Pair.of(partitioner.decorateKey(keys.left), partitioner.decorateKey(keys.right));
             }
         }
-        return Pair.create(null, null);
+        return null;
     }
 
-    static boolean anyFilterKeyInIndex(@NotNull SSTable ssTable,
-                                       @NotNull List<PartitionKeyFilter> filters) throws IOException
+    public static boolean anyFilterKeyInIndex(@NotNull SSTable ssTable,
+                                              @NotNull List<PartitionKeyFilter> filters) throws IOException
     {
         if (filters.isEmpty())
         {
@@ -193,23 +234,37 @@ public final class ReaderUtils extends TokenUtils
         {
             if (primaryIndex != null)
             {
-                Pair<ByteBuffer, ByteBuffer> keys = readPrimaryIndex(primaryIndex, false, filters);
-                if (keys.left != null || keys.right != null)
-                {
-                    return false;
-                }
+                return primaryIndexContainsAnyKey(primaryIndex, filters);
             }
         }
-        return true;
+
+        return true; // could not read primary index, so to be safe assume it contains the keys
     }
 
-    static Map<MetadataType, MetadataComponent> deserializeStatsMetadata(SSTable ssTable,
-                                                                         Descriptor descriptor) throws IOException
+    public static Map<MetadataType, MetadataComponent> deserializeStatsMetadata(String keyspace,
+                                                                                String table,
+                                                                                SSTable ssTable,
+                                                                                EnumSet<MetadataType> selectedTypes) throws IOException
+    {
+        return deserializeStatsMetadata(ssTable, selectedTypes, constructDescriptor(keyspace, table, ssTable));
+    }
+
+    public static Map<MetadataType, MetadataComponent> deserializeStatsMetadata(SSTable ssTable,
+                                                                                Descriptor descriptor) throws IOException
+    {
+        return deserializeStatsMetadata(ssTable,
+                                        EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS, MetadataType.HEADER),
+                                        descriptor);
+    }
+
+    public static Map<MetadataType, MetadataComponent> deserializeStatsMetadata(SSTable ssTable,
+                                                                                EnumSet<MetadataType> selectedTypes,
+                                                                                Descriptor descriptor) throws IOException
     {
         try (InputStream statsStream = ssTable.openStatsStream())
         {
             return deserializeStatsMetadata(statsStream,
-                                            
EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS, MetadataType.HEADER),
+                                            selectedTypes,
                                             descriptor);
         }
     }
@@ -395,39 +450,69 @@ public final class ReaderUtils extends TokenUtils
         return TypeParser.parse(UTF8Type.instance.compose(ByteBufferUtil.readWithVIntLength(in)));
     }
 
+    public static Pair<ByteBuffer, ByteBuffer> primaryIndexReadFirstAndLastKey(@NotNull InputStream primaryIndex) throws IOException
+    {
+        ByteBuffer[] firstAndLast = new ByteBuffer[]{null, null};
+        readPrimaryIndex(primaryIndex, (buffer) -> {
+            if (firstAndLast[0] == null)
+            {
+                firstAndLast[0] = buffer;
+            }
+            firstAndLast[1] = buffer;
+            return false; // never exit early
+        });
+        return Pair.of(firstAndLast[0], firstAndLast[1]);
+    }
+
+    /**
+     * Reads the primary Index.db file, returning true and exiting early if it contains any of the PartitionKeyFilter keys
+     *
+     * @param primaryIndex input stream for Index.db file
+     * @param filters      list of filters to search for
+     * @return true if Index.db file contains any of the keys
+     * @throws IOException if the Index.db file cannot be read
+     */
+    public static boolean primaryIndexContainsAnyKey(@NotNull InputStream primaryIndex,
+                                                     @NotNull List<PartitionKeyFilter> filters) throws IOException
+    {
+        final boolean[] result = new boolean[]{false};
+        readPrimaryIndex(primaryIndex, (buffer) -> {
+            boolean anyMatch = filters.stream().anyMatch(filter -> filter.matches(buffer));
+            if (anyMatch)
+            {
+                result[0] = true;
+                return true; // exit early, we found at least one key
+            }
+            return false;
+        });
+        return result[0];
+    }
+
     /**
-     * Read primary Index.db file, read through all partitions to get first and last partition key
+     * Reads the primary Index.db file, passing each partition key buffer to the tracker
      *
      * @param primaryIndex input stream for Index.db file
+     * @param tracker      consumes each key buffer and returns true to exit early, otherwise reading continues
-     * @return pair of first and last decorated keys
-     * @throws IOException
+     * @throws IOException if the Index.db file cannot be read
      */
-    @SuppressWarnings("InfiniteLoopStatement")
-    static Pair<ByteBuffer, ByteBuffer> readPrimaryIndex(@NotNull InputStream primaryIndex,
-                                                         boolean readFirstLastKey,
-                                                         @NotNull List<PartitionKeyFilter> filters) throws IOException
+    public static void readPrimaryIndex(@NotNull InputStream primaryIndex,
+                                        @NotNull Function<ByteBuffer, Boolean> tracker) throws IOException
     {
-        ByteBuffer firstKey = null;
-        ByteBuffer lastKey = null;
         try (DataInputStream dis = new DataInputStream(primaryIndex))
         {
-            byte[] last = null;
             try
             {
                 while (true)
                 {
                     int length = dis.readUnsignedShort();
-                    byte[] buffer = new byte[length];
-                    dis.readFully(buffer);
-                    if (firstKey == null)
+                    byte[] array = new byte[length];
+                    dis.readFully(array);
+                    ByteBuffer buffer = ByteBuffer.wrap(array);
+                    if (tracker.apply(buffer))
                     {
-                        firstKey = ByteBuffer.wrap(buffer);
-                    }
-                    last = buffer;
-                    ByteBuffer key = ByteBuffer.wrap(last);
-                    if (!readFirstLastKey && filters.stream().anyMatch(filter -> filter.filter(key)))
-                    {
-                        return Pair.create(null, null);
+                        // exit early if tracker returns true
+                        return;
                     }
 
                     // Read position and skip promoted index
@@ -437,14 +522,7 @@ public final class ReaderUtils extends TokenUtils
             catch (EOFException ignored)
             {
             }
-
-            if (last != null)
-            {
-                lastKey = ByteBuffer.wrap(last);
-            }
         }
-
-        return Pair.create(firstKey, lastKey);
     }
 
     static void skipRowIndexEntry(DataInputStream dis) throws IOException
@@ -505,7 +583,12 @@ public final class ReaderUtils extends TokenUtils
         }
     }
 
-    static BloomFilter readFilter(@NotNull SSTable ssTable, boolean hasOldBfFormat) throws IOException
+    public static BloomFilter readFilter(@NotNull SSTable ssTable, Descriptor descriptor) throws IOException
+    {
+        return readFilter(ssTable, descriptor.version.hasOldBfFormat());
+    }
+
+    public static BloomFilter readFilter(@NotNull SSTable ssTable, boolean hasOldBfFormat) throws IOException
     {
         try (InputStream filterStream = ssTable.openFilterStream())
         {
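
The tracker-based readPrimaryIndex above replaces the old dual-purpose readPrimaryIndex(stream, readFirstLastKey, filters): callers now supply a Function<ByteBuffer, Boolean> that receives each partition key and returns true to stop the scan. A hypothetical tracker for illustration only (not part of this patch); the Index.db stream would come from SSTable#openPrimaryIndexStream().

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.cassandra.spark.reader.ReaderUtils;

    final class IndexKeyCounter
    {
        // Counts the partition keys in an Index.db stream by never exiting early
        static int countKeys(InputStream primaryIndex) throws IOException
        {
            AtomicInteger count = new AtomicInteger();
            ReaderUtils.readPrimaryIndex(primaryIndex, buffer -> {
                count.incrementAndGet();
                return false; // false = keep scanning; true would stop the scan early
            });
            return count.get();
        }
    }
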
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java
index 6e9079e..e4c3ebc 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java
@@ -40,9 +40,9 @@ import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.spark.data.FileType;
 import org.apache.cassandra.spark.data.SSTable;
+import org.apache.cassandra.spark.utils.Pair;
 import org.apache.cassandra.spark.utils.ThrowableUtils;
 import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.Pair;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -116,6 +116,7 @@ public class SSTableCache
         return get(summary, ssTable, () -> SummaryDbUtils.readSummary(metadata, ssTable));
     }
 
+    @Nullable
     public Pair<DecoratedKey, DecoratedKey> keysFromIndex(@NotNull TableMetadata metadata,
                                                           @NotNull SSTable ssTable) throws IOException
     {
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
index 085bc4b..8f821d3 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.spark.reader;
 
 import java.io.DataInputStream;
 import java.io.EOFException;
-import java.io.File;
 import java.io.IOError;
 import java.io.IOException;
 import java.math.BigInteger;
@@ -40,7 +39,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Streams;
 import org.slf4j.Logger;
@@ -84,9 +82,9 @@ import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
 import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
 import org.apache.cassandra.analytics.stats.Stats;
 import org.apache.cassandra.spark.utils.ByteBufferUtils;
+import org.apache.cassandra.spark.utils.Pair;
 import org.apache.cassandra.spark.utils.ThrowableUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Pair;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -156,9 +154,12 @@ public class SSTableReader implements SparkSSTableReader, Scannable
             return this;
         }
 
-        public Builder withPartitionKeyFilters(@NotNull Collection<PartitionKeyFilter> partitionKeyFilters)
+        public Builder withPartitionKeyFilters(@Nullable Collection<PartitionKeyFilter> partitionKeyFilters)
         {
-            this.partitionKeyFilters.addAll(partitionKeyFilters);
+            if (partitionKeyFilters != null)
+            {
+                this.partitionKeyFilters.addAll(partitionKeyFilters);
+            }
             return this;
         }
 
@@ -243,25 +244,24 @@ public class SSTableReader implements SparkSSTableReader, Scannable
         this.isRepaired = isRepaired;
         this.sparkRangeFilter = sparkRangeFilter;
 
-        File file = constructFilename(metadata.keyspace, metadata.name, ssTable.getDataFileName());
-        Descriptor descriptor = Descriptor.fromFilename(file);
+        Descriptor descriptor = ReaderUtils.constructDescriptor(metadata.keyspace, metadata.name, ssTable);
         this.version = descriptor.version;
 
         SummaryDbUtils.Summary summary = null;
-        Pair<DecoratedKey, DecoratedKey> keys = Pair.create(null, null);
+        Pair<DecoratedKey, DecoratedKey> keys = null;
         try
         {
             now = System.nanoTime();
             summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable);
             stats.readSummaryDb(ssTable, System.nanoTime() - now);
-            keys = Pair.create(summary.first(), summary.last());
+            keys = Pair.of(summary.first(), summary.last());
         }
         catch (IOException exception)
         {
             LOGGER.warn("Failed to read Summary.db file ssTable='{}'", 
ssTable, exception);
         }
 
-        if (keys.left == null || keys.right == null)
+        if (keys == null)
         {
             LOGGER.warn("Could not load first and last key from Summary.db 
file, so attempting Index.db fileName={}",
                         ssTable.getDataFileName());
@@ -270,7 +270,7 @@ public class SSTableReader implements SparkSSTableReader, Scannable
             stats.readIndexDb(ssTable, System.nanoTime() - now);
         }
 
-        if (keys.left == null || keys.right == null)
+        if (keys == null)
         {
             throw new IOException("Could not load SSTable first or last 
tokens");
         }
@@ -406,30 +406,6 @@ public class SSTableReader implements SparkSSTableReader, Scannable
         this.openedNanos = System.nanoTime();
     }
 
-    /**
-     * Constructs full file path for a given combination of keyspace, table, and data file name,
-     * while adjusting for data files with non-standard names prefixed with keyspace and table
-     *
-     * @param keyspace Name of the keyspace
-     * @param table    Name of the table
-     * @param filename Name of the data file
-     * @return A full file path, adjusted for non-standard file names
-     */
-    @VisibleForTesting
-    @NotNull
-    static File constructFilename(@NotNull String keyspace, @NotNull String table, @NotNull String filename)
-    {
-        String[] components = filename.split("-");
-        if (components.length == 6
-                && components[0].equals(keyspace)
-                && components[1].equals(table))
-        {
-            filename = filename.substring(keyspace.length() + table.length() + 2);
-        }
-
-        return new File(String.format("./%s/%s", keyspace, table), filename);
-    }
-
     private static Map<ByteBuffer, DroppedColumn> buildDroppedColumns(String keyspace,
                                                                       String table,
                                                                       SSTable ssTable,
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java
index 0a2df42..a7cc98d 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.spark.data.SSTable;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Helper methods for reading the Summary.db SSTable file component
@@ -75,10 +76,16 @@ public final class SummaryDbUtils
     }
 
     public static Summary readSummary(@NotNull TableMetadata metadata, @NotNull SSTable ssTable) throws IOException
+    {
+        return readSummary(ssTable, metadata.partitioner, metadata.params.minIndexInterval, metadata.params.maxIndexInterval);
+    }
+
+    @Nullable
+    public static Summary readSummary(@NotNull SSTable ssTable, IPartitioner partitioner, int minIndexInterval, int maxIndexInterval) throws IOException
     {
         try (InputStream in = ssTable.openSummaryStream())
         {
-            return readSummary(in, metadata.partitioner, metadata.params.minIndexInterval, metadata.params.maxIndexInterval);
+            return readSummary(in, partitioner, minIndexInterval, maxIndexInterval);
         }
     }
 
@@ -92,6 +99,7 @@ public final class SummaryDbUtils
      * @return Summary object
      * @throws IOException io exception
      */
+    @Nullable
     static Summary readSummary(InputStream summaryStream,
                                IPartitioner partitioner,
                                int minIndexInterval,
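
With readSummary now @Nullable, callers are expected to handle a missing Summary.db component by falling back to the primary index, mirroring getSSTableSummary above. A sketch (hypothetical caller; ssTable and partitioner are assumed to be in scope, and 128/2048 stand in for the table's min/max index intervals):

    SummaryDbUtils.Summary summary = SummaryDbUtils.readSummary(ssTable, partitioner, 128, 2048);
    Pair<DecoratedKey, DecoratedKey> keys = summary == null
                                            ? ReaderUtils.keysFromIndex(partitioner, ssTable) // may also be null
                                            : Pair.of(summary.first(), summary.last());
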
diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java
index c3f4fa0..b9c2eb8 100644
--- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java
+++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java
@@ -55,10 +55,10 @@ import org.apache.cassandra.spark.data.FileType;
 import org.apache.cassandra.spark.data.SSTable;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
+import org.apache.cassandra.spark.utils.Pair;
 import org.apache.cassandra.spark.utils.TemporaryDirectory;
 import org.apache.cassandra.spark.utils.test.TestSSTable;
 import org.apache.cassandra.spark.utils.test.TestSchema;
-import org.apache.cassandra.utils.Pair;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -98,7 +98,7 @@ public class ReaderUtilsTests
 
                    String dataFile = TestSSTable.firstIn(directory.path()).getDataFileName();
                    Descriptor descriptor = Descriptor.fromFilename(
-                            new File(String.format("./%s/%s", schema.keyspace, schema.table), dataFile));
+                    new File(String.format("./%s/%s", schema.keyspace, schema.table), dataFile));
                    Path statsFile = TestSSTable.firstIn(directory.path(), FileType.STATISTICS);
 
                    // Deserialize stats metadata and verify components match expected values
@@ -178,9 +178,9 @@ public class ReaderUtilsTests
                     Pair<DecoratedKey, DecoratedKey> indexKeys;
                    try (InputStream in = new BufferedInputStream(Files.newInputStream(indexFile)))
                    {
-                        Pair<ByteBuffer, ByteBuffer> keys = ReaderUtils.readPrimaryIndex(in, true, Collections.emptyList());
-                        indexKeys = Pair.create(Murmur3Partitioner.instance.decorateKey(keys.left),
-                                                Murmur3Partitioner.instance.decorateKey(keys.right));
+                        Pair<ByteBuffer, ByteBuffer> keys = ReaderUtils.primaryIndexReadFirstAndLastKey(in);
+                        indexKeys = Pair.of(Murmur3Partitioner.instance.decorateKey(keys.left),
+                                            Murmur3Partitioner.instance.decorateKey(keys.right));
                     }
                     assertNotNull(indexKeys);
                     assertEquals(indexKeys.left, summaryKeys.first());
diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java
index 4ffe403..ec40c7b 100644
--- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java
+++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java
@@ -37,11 +37,11 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.data.SSTable;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.Pair;
 import org.apache.cassandra.spark.utils.TemporaryDirectory;
 import org.apache.cassandra.spark.utils.test.TestSSTable;
 import org.apache.cassandra.spark.utils.test.TestSchema;
 import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.Pair;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java
index 4c9a402..282b36e 100644
--- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java
+++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java
@@ -76,11 +76,11 @@ import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
 import org.apache.cassandra.analytics.stats.Stats;
 import org.apache.cassandra.spark.utils.ByteBufferUtils;
+import org.apache.cassandra.spark.utils.Pair;
 import org.apache.cassandra.spark.utils.TemporaryDirectory;
 import org.apache.cassandra.spark.utils.Throwing;
 import org.apache.cassandra.spark.utils.test.TestSSTable;
 import org.apache.cassandra.spark.utils.test.TestSchema;
-import org.apache.cassandra.utils.Pair;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -276,9 +276,9 @@ public class SSTableReaderTests
                     Pair<DecoratedKey, DecoratedKey> firstAndLast;
                    try (InputStream is = new BufferedInputStream(new FileInputStream(indexFile.toFile())))
                    {
-                        Pair<ByteBuffer, ByteBuffer> keys = ReaderUtils.readPrimaryIndex(is, true, Collections.emptyList());
-                        firstAndLast = Pair.create(BRIDGE.getPartitioner(partitioner).decorateKey(keys.left),
-                                                   BRIDGE.getPartitioner(partitioner).decorateKey(keys.right));
+                        Pair<ByteBuffer, ByteBuffer> keys = ReaderUtils.primaryIndexReadFirstAndLastKey(is);
+                        firstAndLast = Pair.of(BRIDGE.getPartitioner(partitioner).decorateKey(keys.left),
+                                               BRIDGE.getPartitioner(partitioner).decorateKey(keys.right));
                    }
                    BigInteger first = ReaderUtils.tokenToBigInteger(firstAndLast.left.getToken());
                    BigInteger last = ReaderUtils.tokenToBigInteger(firstAndLast.right.getToken());
@@ -753,25 +753,25 @@ public class SSTableReaderTests
     {
         // Standard SSTable data file name
         assertEquals(new File("./keyspace/table/na-1-big-Data.db"),
-                              SSTableReader.constructFilename("keyspace", "table", "na-1-big-Data.db"));
+                     ReaderUtils.constructFilename("keyspace", "table", "na-1-big-Data.db"));
 
         // Non-standard SSTable data file name
         assertEquals(new File("./keyspace/table/na-1-big-Data.db"),
-                              SSTableReader.constructFilename("keyspace", "table", "keyspace-table-na-1-big-Data.db"));
+                     ReaderUtils.constructFilename("keyspace", "table", "keyspace-table-na-1-big-Data.db"));
 
         // Malformed SSTable data file names
         assertEquals(new File("./keyspace/table/keyspace-table-qwerty-na-1-big-Data.db"),
-                              SSTableReader.constructFilename("keyspace", "table", "keyspace-table-qwerty-na-1-big-Data.db"));
+                     ReaderUtils.constructFilename("keyspace", "table", "keyspace-table-qwerty-na-1-big-Data.db"));
         assertEquals(new File("./keyspace/table/keyspace-qwerty-na-1-big-Data.db"),
-                              SSTableReader.constructFilename("keyspace", "table", "keyspace-qwerty-na-1-big-Data.db"));
+                     ReaderUtils.constructFilename("keyspace", "table", "keyspace-qwerty-na-1-big-Data.db"));
         assertEquals(new File("./keyspace/table/qwerty-table-na-1-big-Data.db"),
-                              SSTableReader.constructFilename("keyspace", "table", "qwerty-table-na-1-big-Data.db"));
+                     ReaderUtils.constructFilename("keyspace", "table", "qwerty-table-na-1-big-Data.db"));
         assertEquals(new File("./keyspace/table/keyspace-na-1-big-Data.db"),
-                              SSTableReader.constructFilename("keyspace", "table", "keyspace-na-1-big-Data.db"));
+                     ReaderUtils.constructFilename("keyspace", "table", "keyspace-na-1-big-Data.db"));
         assertEquals(new File("./keyspace/table/table-na-1-big-Data.db"),
-                              SSTableReader.constructFilename("keyspace", "table", "table-na-1-big-Data.db"));
+                     ReaderUtils.constructFilename("keyspace", "table", "table-na-1-big-Data.db"));
         assertEquals(new File("./keyspace/table/qwerty.db"),
-                              SSTableReader.constructFilename("keyspace", "table", "qwerty.db"));
+                     ReaderUtils.constructFilename("keyspace", "table", "qwerty.db"));
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
