This is an automated email from the ASF dual-hosted git repository.

saranyak pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3f36653d CASSANALYTICS-102: Add TimeRangeFilter to filter out SSTables outside given time window (#155)
3f36653d is described below

commit 3f36653d68abcbcea1e41de877dabe1493bc067f
Author: Saranya Krishnakumar <[email protected]>
AuthorDate: Thu Nov 20 09:45:54 2025 -0800

    CASSANALYTICS-102: Add TimeRangeFilter to filter out SSTables outside given time window (#155)
    
    patch by Saranya Krishnakumar; reviewed by Yifan Cai for CASSANALYTICS-102
---
 CHANGES.txt                                        |   1 +
 .../cassandra/spark/sparksql/CellIterator.java     |  12 +-
 .../sparksql/filters/SSTableTimeRangeFilter.java   | 147 ++++++++++++
 .../org/apache/cassandra/spark/utils/CqlUtils.java |  36 ++-
 .../filters/SSTableTimeRangeFilterTest.java        | 141 ++++++++++++
 .../org/apache/cassandra/spark/KryoRegister.java   |   2 +
 .../spark/bulkwriter/SortedSSTableWriter.java      |   2 +
 .../cassandra/spark/data/CassandraDataLayer.java   |  32 ++-
 .../apache/cassandra/spark/data/ClientConfig.java  |  17 ++
 .../org/apache/cassandra/spark/data/DataLayer.java |  18 +-
 .../cassandra/spark/data/LocalDataLayer.java       |  31 +++
 .../spark/sparksql/SparkCellIterator.java          |   1 +
 .../apache/cassandra/spark/utils/FilterUtils.java  |  19 ++
 .../apache/cassandra/spark/SSTableReaderTests.java | 102 +++++++++
 .../cassandra/spark/data/ClientConfigTests.java    |  76 +++++++
 .../cassandra/spark/data/LocalDataLayerTests.java  | 121 ++++++++++
 .../spark/data/PartitionedDataLayerTests.java      |   4 +-
 .../spark/reader/CassandraBridgeUtilTests.java     |   2 +
 .../spark/sparksql/SparkRowIteratorTests.java      |   4 +-
 .../apache/cassandra/spark/utils/CqlUtilsTest.java |  36 +++
 .../BulkReaderFilteringIntegrationTest.java        | 246 +++++++++++++++++++++
 .../apache/cassandra/bridge/CassandraBridge.java   |  44 ++--
 .../bridge/CassandraBridgeImplementation.java      |   8 +-
 .../cassandra/spark/reader/SSTableReader.java      |  33 +++
 .../bridge/CassandraBridgeImplementation.java      |   8 +-
 .../cassandra/spark/reader/SSTableReader.java      |  33 +++
 26 files changed, 1144 insertions(+), 32 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 0f2bab39..56c088f4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.2.0
 -----
+ * Add TimeRangeFilter to filter out SSTables outside given time window (CASSANALYTICS-102)
  * Generated distribution artifacts fix (CASSANALYTICS-105)
  * Fix SSTable descriptor mismatch preventing newly produced SSTables from being uploaded (CASSANALYTICS-98)
  * Expose SidecarCdc builders and interfaces (CASSANALYTICS-94)
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/CellIterator.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/CellIterator.java
index 971b545e..59707a6a 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/CellIterator.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/CellIterator.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.spark.reader.RowData;
 import org.apache.cassandra.spark.reader.StreamScanner;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
 import org.apache.cassandra.spark.utils.ByteBufferUtils;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -76,13 +77,15 @@ public abstract class CellIterator implements Iterator<Cell>, AutoCloseable
     public interface ScannerSupplier
     {
         /**
-         * @param partitionId         arbitrary id uniquely identifying this partiton of the bulk read
-         * @param partitionKeyFilters list of partition key filters to push-down,
-         * @param columnFilter        optional column filter to only read certain columns
+         * @param partitionId               arbitrary id uniquely identifying this partition of the bulk read
+         * @param partitionKeyFilters       list of partition key filters to push down
+         * @param sstableTimeRangeFilter    sstable time range filter to filter based on min and max timestamp
+         * @param columnFilter              optional column filter to only read certain columns
          * @return a StreamScanner to iterate over each cell of the data.
          */
         StreamScanner<RowData> get(int partitionId,
                                    @NotNull List<PartitionKeyFilter> partitionKeyFilters,
+                                   @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                                    @Nullable PruneColumnFilter columnFilter);
     }
 
@@ -91,6 +94,7 @@ public abstract class CellIterator implements Iterator<Cell>, AutoCloseable
                         Stats stats,
                         TypeConverter typeConverter,
                         @NotNull List<PartitionKeyFilter> partitionKeyFilters,
+                        @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                        Function<CqlTable, PruneColumnFilter> columnFilterSupplier,
                         ScannerSupplier scannerSupplier)
     {
@@ -116,7 +120,7 @@ public abstract class CellIterator implements Iterator<Cell>, AutoCloseable
         // Open compaction scanner
         startTimeNanos = System.nanoTime();
         previousTimeNanos = startTimeNanos;
-        scanner = scannerSupplier.get(partitionId, partitionKeyFilters, columnFilter);
+        scanner = scannerSupplier.get(partitionId, partitionKeyFilters, sstableTimeRangeFilter, columnFilter);
         long openTimeNanos = System.nanoTime() - startTimeNanos;
         LOGGER.info("Opened CompactionScanner runtimeNanos={}", openTimeNanos);
         stats.openedCompactionScanner(openTimeNanos);
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/SSTableTimeRangeFilter.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/SSTableTimeRangeFilter.java
new file mode 100644
index 00000000..6dd48413
--- /dev/null
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/SSTableTimeRangeFilter.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.sparksql.filters;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import com.google.common.collect.Range;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * {@link SSTableTimeRangeFilter} to filter out SSTables based on timestamp in microseconds.
+ * Uses Google Guava's Range internally for storing time range.
+ */
+public class SSTableTimeRangeFilter implements Serializable
+{
+    public static final SSTableTimeRangeFilter ALL
+    = new SSTableTimeRangeFilter(Range.closed(0L, Long.MAX_VALUE));
+
+    /**
+     * {@code timeRange} is the range of timestamp values represented in microseconds. Only closed ranges are supported.
+     */
+    private final Range<Long> timeRange;
+    private final int hashcode;
+
+    /**
+     * Creates a {@link SSTableTimeRangeFilter} with the given time {@link Range} of timestamp values represented in
+     * microseconds.
+     */
+    private SSTableTimeRangeFilter(Range<Long> timeRange)
+    {
+        this(timeRange, Objects.hash(timeRange));
+    }
+
+    // for serialization
+    private SSTableTimeRangeFilter(Range<Long> timeRange, int hashcode)
+    {
+        this.timeRange = timeRange;
+        this.hashcode = hashcode;
+    }
+
+    /**
+     * Returns the underlying Range.
+     *
+     * @return the time range of timestamp values in microseconds.
+     */
+    @NotNull
+    public Range<Long> range()
+    {
+        return timeRange;
+    }
+
+    /**
+     * Determines whether an SSTable with the given min and max timestamps overlaps with the filter. The SSTable
+     * is included if it overlaps with the filter's time range.
+     *
+     * @param startMicros   SSTable min timestamp in microseconds (inclusive)
+     * @param endMicros     SSTable max timestamp in microseconds (inclusive)
+     * @return true if the SSTable should be included, false if it should be omitted.
+     */
+    public boolean overlaps(long startMicros, long endMicros)
+    {
+        // Creates a closed range with startMicros and endMicros
+        Range<Long> sstableTimeRange = Range.closed(startMicros, endMicros);
+
+        // Check if ranges are connected (overlap or adjacent)
+        return timeRange.isConnected(sstableTimeRange);
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("TimeRangeFilter%s", timeRange.toString());
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (!(o instanceof SSTableTimeRangeFilter))
+        {
+            return false;
+        }
+        SSTableTimeRangeFilter that = (SSTableTimeRangeFilter) o;
+        return timeRange.equals(that.timeRange);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return hashcode;
+    }
+
+    /**
+     * Creates a {@link SSTableTimeRangeFilter} for a specific time range.
+     *
+     * @param startMicros the start timestamp in microseconds (inclusive)
+     * @param endMicros   the end timestamp in microseconds (inclusive)
+     * @return {@link SSTableTimeRangeFilter} with both start and end timestamps
+     */
+    @NotNull
+    public static SSTableTimeRangeFilter create(long startMicros, long endMicros)
+    {
+        return new SSTableTimeRangeFilter(Range.closed(startMicros, endMicros));
+    }
+
+    // Kryo
+
+    public static class Serializer extends com.esotericsoftware.kryo.Serializer<SSTableTimeRangeFilter>
+    {
+        public SSTableTimeRangeFilter read(Kryo kryo, Input in, Class<SSTableTimeRangeFilter> type)
+        {
+            return new SSTableTimeRangeFilter(Range.closed(in.readLong(), in.readLong()), in.readInt());
+        }
+
+        public void write(Kryo kryo, Output out, SSTableTimeRangeFilter object)
+        {
+            out.writeLong(object.timeRange.lowerEndpoint());
+            out.writeLong(object.timeRange.upperEndpoint());
+            out.writeInt(object.hashcode);
+        }
+    }
+}
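
For illustration, a minimal sketch of the filter API added above (example-only code, not part of the patch; the demo class name is hypothetical):

    import com.google.common.collect.Range;
    import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;

    public class TimeRangeFilterDemo  // hypothetical demo class
    {
        public static void main(String[] args)
        {
            // Closed range [1000, 2000] in microseconds; both bounds are inclusive
            SSTableTimeRangeFilter filter = SSTableTimeRangeFilter.create(1000L, 2000L);

            // An SSTable is kept when its [minTimestamp, maxTimestamp] interval
            // intersects the filter range; touching an endpoint counts as overlap
            System.out.println(filter.overlaps(500L, 1000L));   // true: shares the lower bound
            System.out.println(filter.overlaps(2500L, 3000L));  // false: entirely after the range

            // ALL spans [0, Long.MAX_VALUE], so it keeps every SSTable
            Range<Long> everything = SSTableTimeRangeFilter.ALL.range();
            System.out.println(everything);  // [0‥9223372036854775807]
        }
    }
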
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java
index e694db67..9d989be5 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java
@@ -57,6 +57,7 @@ public final class CqlUtils
     private static final Pattern ESCAPED_WHITESPACE_PATTERN = Pattern.compile("(\\\\r|\\\\n|\\\\r\\n)+");
     private static final Pattern NEWLINE_PATTERN = Pattern.compile("\n");
     private static final Pattern ESCAPED_DOUBLE_BACKSLASH = Pattern.compile("\\\\");
+    private static final Pattern COMPACTION_STRATEGY_PATTERN = Pattern.compile("compaction\\s*=\\s*\\{\\s*'class'\\s*:\\s*'([^']+)'");
 
     private CqlUtils()
     {
@@ -147,7 +148,7 @@ public final class CqlUtils
         {
             String keyspace = matcher.group(1);
             String table = matcher.group(2);
-            createStmts.put(TableIdentifier.of(keyspace, table), extractCleanedTableSchema(cleaned, keyspace, table));
+            createStmts.put(TableIdentifier.of(keyspace, table), extractCleanedTableSchema(cleaned, keyspace, table, false));
         }
         return createStmts;
     }
@@ -179,19 +180,20 @@ public final class CqlUtils
 
     public static String extractTableSchema(@NotNull String schemaStr, @NotNull String keyspace, @NotNull String table)
     {
-        return extractCleanedTableSchema(cleanCql(schemaStr), keyspace, table);
+        return extractCleanedTableSchema(cleanCql(schemaStr), keyspace, table, false);
     }
 
     public static String extractCleanedTableSchema(@NotNull String createStatementToClean,
                                                    @NotNull String keyspace,
-                                                   @NotNull String table)
+                                                   @NotNull String table,
+                                                   boolean withTableProps)
     {
         Pattern pattern = Pattern.compile(String.format("CREATE TABLE (IF NOT EXISTS)? ?\"?%s?\"?\\.{1}\"?%s\"?[^;]*;", keyspace, table));
         Matcher matcher = pattern.matcher(createStatementToClean);
         if (matcher.find())
         {
             String fullSchema = createStatementToClean.substring(matcher.start(0), matcher.end(0));
-            String redactedSchema = removeTableProps(fullSchema);
+            String redactedSchema = withTableProps ? fullSchema : removeTableProps(fullSchema);
             String clustering = extractClustering(fullSchema);
             String separator = " WITH ";
             if (clustering != null)
@@ -268,4 +270,30 @@ public final class CqlUtils
         }
         return indexCount;
     }
+
+    /**
+     * Extracts the compaction strategy used from table schema.
+     *
+     * @param tableSchema table schema
+     * @return the compaction strategy, or null if not found
+     */
+    public static String extractCompactionStrategy(@NotNull String tableSchema)
+    {
+        Matcher matcher = COMPACTION_STRATEGY_PATTERN.matcher(tableSchema);
+        if (matcher.find())
+        {
+            return matcher.group(1);
+        }
+        return null;
+    }
+
+    /**
+     * Time range filter is only supported for TimeWindowCompactionStrategy.
+     *
+     * @return true if the strategy is TimeWindowCompactionStrategy, false otherwise
+     */
+    public static boolean isTimeRangeFilterSupported(String compactionStrategy)
+    {
+        return compactionStrategy == null || compactionStrategy.endsWith("TimeWindowCompactionStrategy");
+    }
 }
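
For illustration, a minimal sketch of the two new helpers (the CREATE TABLE string below is a hypothetical TWCS schema, not taken from the patch):

    // COMPACTION_STRATEGY_PATTERN captures the class name inside compaction = {'class': '...'}
    String schema = "CREATE TABLE ks.tbl (k int PRIMARY KEY, v int) WITH compaction = "
                  + "{'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'}";
    String strategy = CqlUtils.extractCompactionStrategy(schema);
    // strategy == "org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy"

    // true for TWCS; note it is also true when no strategy could be extracted (null)
    boolean supported = CqlUtils.isTimeRangeFilterSupported(strategy);
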
diff --git a/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/sparksql/filters/SSTableTimeRangeFilterTest.java b/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/sparksql/filters/SSTableTimeRangeFilterTest.java
new file mode 100644
index 00000000..52e2312f
--- /dev/null
+++ b/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/sparksql/filters/SSTableTimeRangeFilterTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.sparksql.filters;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Unit tests for {@link SSTableTimeRangeFilter}
+ */
+class SSTableTimeRangeFilterTest
+{
+    @Test
+    void testCreation()
+    {
+        SSTableTimeRangeFilter filter = SSTableTimeRangeFilter.create(100L, 200L);
+        assertThat(filter.range().hasLowerBound()).isTrue();
+        assertThat(filter.range().hasUpperBound()).isTrue();
+        assertThat(filter.range().lowerEndpoint()).isEqualTo(100);
+        assertThat(filter.range().upperEndpoint()).isEqualTo(200);
+    }
+
+    @Test
+    void testThrowsExceptionWhenStartGreaterThanEnd()
+    {
+        assertThatThrownBy(() -> SSTableTimeRangeFilter.create(200L, 100L))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Invalid range: [200‥100]");
+    }
+
+    @Test
+    void testFiltering()
+    {
+        SSTableTimeRangeFilter sstableRange = SSTableTimeRangeFilter.create(100L, 200L);
+        assertThat(sstableRange.overlaps(0L, 50L)).isFalse();
+        assertThat(sstableRange.overlaps(0L, 100L)).isTrue();
+        assertThat(sstableRange.overlaps(250L, 300L)).isFalse();
+        assertThat(sstableRange.overlaps(200L, 300L)).isTrue();
+        assertThat(sstableRange.overlaps(120L, 180L)).isTrue();
+        assertThat(sstableRange.overlaps(50L, 150L)).isTrue();
+        assertThat(sstableRange.overlaps(150L, 250L)).isTrue();
+        assertThat(sstableRange.overlaps(50L, 250L)).isTrue();
+    }
+
+    @Test
+    void testToString()
+    {
+        SSTableTimeRangeFilter boundedRangeFilter = SSTableTimeRangeFilter.create(100L, 200L);
+        assertThat(boundedRangeFilter.toString()).isEqualTo("TimeRangeFilter[100‥200]");
+
+        assertThat(SSTableTimeRangeFilter.ALL.toString()).isEqualTo("TimeRangeFilter[0‥9223372036854775807]");
+    }
+
+    @Test
+    void testEquals()
+    {
+        SSTableTimeRangeFilter filter1 = SSTableTimeRangeFilter.create(100L, 200L);
+        SSTableTimeRangeFilter filter2 = SSTableTimeRangeFilter.create(100L, 200L);
+        SSTableTimeRangeFilter filter4 = SSTableTimeRangeFilter.create(100L, 300L);
+
+        assertThat(filter1).isEqualTo(filter2);
+        assertThat(filter1).isNotEqualTo(filter4);
+    }
+
+    @Test
+    void testHashCode()
+    {
+        SSTableTimeRangeFilter filter1 = SSTableTimeRangeFilter.create(100L, 200L);
+        SSTableTimeRangeFilter filter2 = SSTableTimeRangeFilter.create(100L, 200L);
+        SSTableTimeRangeFilter filter3 = SSTableTimeRangeFilter.create(100L, 300L);
+
+        assertThat(filter1.hashCode()).isEqualTo(filter2.hashCode());
+        assertThat(filter1.hashCode()).isNotEqualTo(filter3.hashCode());
+    }
+
+    @Test
+    void testStartEndSerialization() throws Exception
+    {
+        SSTableTimeRangeFilter original = SSTableTimeRangeFilter.create(100L, 200L);
+        ByteArrayOutputStream baos = serialize(original);
+        SSTableTimeRangeFilter deserialized = deserialize(baos);
+        assertThat(deserialized).isEqualTo(original);
+        assertThat(deserialized.range().hasLowerBound()).isTrue();
+        assertThat(deserialized.range().lowerEndpoint()).isEqualTo(100L);
+        assertThat(deserialized.range().hasUpperBound()).isTrue();
+        assertThat(deserialized.range().upperEndpoint()).isEqualTo(200L);
+    }
+
+    @Test
+    void testEmptyFilter()
+    {
+        SSTableTimeRangeFilter emptyFilter = SSTableTimeRangeFilter.ALL;
+
+        assertThat(emptyFilter.overlaps(0L, 100L)).isTrue();
+        assertThat(emptyFilter.overlaps(100L, 200L)).isTrue();
+        assertThat(emptyFilter.overlaps(1000L, 2000L)).isTrue();
+        assertThat(emptyFilter.overlaps(0L, 0L)).isTrue();
+    }
+
+    private ByteArrayOutputStream serialize(SSTableTimeRangeFilter filter) throws Exception
+    {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(filter);
+        oos.close();
+        return baos;
+    }
+
+    private SSTableTimeRangeFilter deserialize(ByteArrayOutputStream baos) throws Exception
+    {
+        ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+        ObjectInputStream ois = new ObjectInputStream(bais);
+        SSTableTimeRangeFilter deserialized = (SSTableTimeRangeFilter) ois.readObject();
+        ois.close();
+        return deserialized;
+    }
+}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/KryoRegister.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/KryoRegister.java
index a259d23c..f5311216 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/KryoRegister.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/KryoRegister.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
 import org.apache.cassandra.spark.data.partitioner.CassandraRing;
 import org.apache.cassandra.spark.data.partitioner.TokenPartitioner;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
 import org.apache.spark.SparkConf;
 import org.apache.spark.serializer.KryoRegistrator;
 import org.jetbrains.annotations.NotNull;
@@ -73,6 +74,7 @@ public class KryoRegister implements KryoRegistrator
         KRYO_SERIALIZERS.put(CassandraDataLayer.class, new CassandraDataLayer.Serializer());
         KRYO_SERIALIZERS.put(BigNumberConfigImpl.class, new BigNumberConfigImpl.Serializer());
         KRYO_SERIALIZERS.put(SslConfig.class, new SslConfig.Serializer());
+        KRYO_SERIALIZERS.put(SSTableTimeRangeFilter.class, new SSTableTimeRangeFilter.Serializer());
     }
 
     public static <T> void addSerializer(@NotNull Class<T> type, @NotNull Serializer<T> serializer)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
index 7cdaca01..bbea732e 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.spark.data.LocalDataLayer;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.cassandra.spark.reader.RowData;
 import org.apache.cassandra.spark.reader.StreamScanner;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
 import org.apache.cassandra.spark.utils.DigestAlgorithm;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -264,6 +265,7 @@ public class SortedSSTableWriter
                                                       Collections.emptyList() /* requestedFeatures */,
                                                       false /* useSSTableInputStream */,
                                                       null /* statsClass */,
+                                                      SSTableTimeRangeFilter.ALL,
                                                       outputDirectory.toString());
             if (dataFilePaths != null)
             {
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 67b06dd6..b6e84c6f 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -88,6 +88,7 @@ import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.cassandra.spark.data.partitioner.TokenPartitioner;
 import org.apache.cassandra.spark.sparksql.LastModifiedTimestampDecorator;
 import org.apache.cassandra.spark.sparksql.RowBuilder;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
 import org.apache.cassandra.spark.utils.CqlUtils;
 import org.apache.cassandra.spark.utils.ReaderTimeProvider;
 import org.apache.cassandra.spark.utils.ScalaFunctions;
@@ -103,6 +104,7 @@ import org.apache.spark.util.ShutdownHookManager;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.cassandra.spark.utils.CqlUtils.isTimeRangeFilterSupported;
 import static org.apache.cassandra.spark.utils.Properties.NODE_STATUS_NOT_CONSIDERED;
 
 public class CassandraDataLayer extends PartitionedDataLayer implements StartupValidatable, Serializable
@@ -144,6 +146,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
     protected transient SidecarClient sidecar;
 
     private SslConfig sslConfig;
+    private SSTableTimeRangeFilter sstableTimeRangeFilter;
 
     @VisibleForTesting
     transient Map<String, SidecarInstance> sidecarInstanceMap;
@@ -167,6 +170,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         this.useIncrementalRepair = options.useIncrementalRepair();
         this.lastModifiedTimestampField = options.lastModifiedTimestampField();
         this.requestedFeatures = options.requestedFeatures();
+        this.sstableTimeRangeFilter = options.sstableTimeRangeFilter;
     }
 
     // For serialization
@@ -193,7 +197,8 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
                                  @Nullable String lastModifiedTimestampField,
                                  List<SchemaFeature> requestedFeatures,
                                  @NotNull Map<String, ReplicationFactor> rfMap,
-                                 TimeProvider timeProvider)
+                                 TimeProvider timeProvider,
+                                 SSTableTimeRangeFilter sstableTimeRangeFilter)
     {
         super(consistencyLevel, datacenter);
         this.snapshotName = snapshotName;
@@ -219,6 +224,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         }
         this.rfMap = rfMap;
         this.timeProvider = timeProvider;
+        this.sstableTimeRangeFilter = sstableTimeRangeFilter;
         this.maybeQuoteKeyspaceAndTable();
         this.initSidecarClient();
         this.initInstanceMap();
@@ -292,6 +298,17 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         int indexCount = CqlUtils.extractIndexCount(fullSchema, keyspace, table);
         Set<String> udts = CqlUtils.extractUdts(fullSchema, keyspace);
         ReplicationFactor replicationFactor = CqlUtils.extractReplicationFactor(fullSchema, keyspace);
+
+        String tableSchemaWithProps = CqlUtils.extractCleanedTableSchema(fullSchema, keyspace, table, true);
+        String compactionStrategy = CqlUtils.extractCompactionStrategy(tableSchemaWithProps);
+        if (sstableTimeRangeFilter != null
+            && sstableTimeRangeFilter != SSTableTimeRangeFilter.ALL
+            && !isTimeRangeFilterSupported(compactionStrategy))
+        {
+            throw new UnsupportedOperationException("SSTableTimeRangeFilter is only supported with TimeWindowCompactionStrategy. " +
+                                                    "Current compaction strategy is: " + compactionStrategy);
+        }
+
         rfMap = Map.of(keyspace, replicationFactor);
         CompletableFuture<Integer> sizingFuture = CompletableFuture.supplyAsync(
         () -> getSizing(ringFuture, replicationFactor, options).getEffectiveNumberOfCores(),
@@ -497,6 +514,13 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         return null;
     }
 
+    @NotNull
+    @Override
+    public SSTableTimeRangeFilter sstableTimeRangeFilter()
+    {
+        return sstableTimeRangeFilter;
+    }
+
     @Override
     public CqlTable cqlTable()
     {
@@ -748,6 +772,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         }
         this.rfMap = (Map<String, ReplicationFactor>) in.readObject();
         this.timeProvider = new ReaderTimeProvider(in.readInt());
+        this.sstableTimeRangeFilter = (SSTableTimeRangeFilter) in.readObject();
         this.maybeQuoteKeyspaceAndTable();
         this.initSidecarClient();
         this.initInstanceMap();
@@ -793,6 +818,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         }
         out.writeObject(this.rfMap);
         out.writeInt(timeProvider.referenceEpochInSeconds());
+        out.writeObject(this.sstableTimeRangeFilter);
     }
 
     private static void writeNullable(ObjectOutputStream out, @Nullable String string) throws IOException
@@ -868,6 +894,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
             kryo.writeObject(out, listWrapper);
             kryo.writeObject(out, dataLayer.rfMap);
             out.writeInt(dataLayer.timeProvider.referenceEpochInSeconds());
+            kryo.writeObject(out, dataLayer.sstableTimeRangeFilter);
         }
 
         @SuppressWarnings("unchecked")
@@ -909,7 +936,8 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
             in.readString(),
             kryo.readObject(in, SchemaFeaturesListWrapper.class).toList(),
             kryo.readObject(in, HashMap.class),
-            new ReaderTimeProvider(in.readInt()));
+            new ReaderTimeProvider(in.readInt()),
+            kryo.readObject(in, SSTableTimeRangeFilter.class));
         }
 
         // Wrapper only used internally for Kryo serialization/deserialization
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java
index 800efb9e..4ca6c921 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java
@@ -33,11 +33,13 @@ import org.apache.cassandra.bridge.BigNumberConfigImpl;
 import org.apache.cassandra.spark.config.SchemaFeature;
 import org.apache.cassandra.spark.config.SchemaFeatureSet;
 import org.apache.cassandra.spark.data.partitioner.ConsistencyLevel;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
 import org.apache.cassandra.spark.utils.MapUtils;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.cassandra.spark.data.CassandraDataLayer.aliasLastModifiedTimestamp;
+import static org.apache.cassandra.spark.utils.FilterUtils.parseSSTableTimeRangeFilter;
 
 public class ClientConfig
 {
@@ -81,6 +83,14 @@ public class ClientConfig
     public static final String ENABLE_EXPANSION_SHRINK_CHECK_KEY = "enableExpansionShrinkCheck";
     public static final String SIDECAR_PORT = "sidecar_port";
     public static final String QUOTE_IDENTIFIERS = "quote_identifiers";
+    /**
+     * {@code sstable_start_timestamp_micros} and {@code sstable_end_timestamp_micros} define a time range filter
+     * for SSTable selection. Both timestamps are represented in microseconds and both bounds are always inclusive
+     * (closed range).
+     */
+    public static final String SSTABLE_START_TIMESTAMP_MICROS = "sstable_start_timestamp_micros";
+    public static final String SSTABLE_END_TIMESTAMP_MICROS = "sstable_end_timestamp_micros";
+
     public static final int DEFAULT_SIDECAR_PORT = 9043;
 
     protected String sidecarContactPoints;
@@ -107,6 +117,7 @@ public class ClientConfig
     protected Boolean enableExpansionShrinkCheck;
     protected int sidecarPort;
     protected boolean quoteIdentifiers;
+    protected SSTableTimeRangeFilter sstableTimeRangeFilter;
 
     protected ClientConfig(Map<String, String> options)
     {
@@ -138,6 +149,7 @@ public class ClientConfig
         this.requestedFeatures = initRequestedFeatures(options);
         this.sidecarPort = MapUtils.getInt(options, SIDECAR_PORT, DEFAULT_SIDECAR_PORT);
         this.quoteIdentifiers = MapUtils.getBoolean(options, QUOTE_IDENTIFIERS, false);
+        this.sstableTimeRangeFilter = parseSSTableTimeRangeFilter(options);
     }
 
     protected String parseSidecarContactPoints(Map<String, String> options)
@@ -277,6 +289,11 @@ public class ClientConfig
         return quoteIdentifiers;
     }
 
+    public SSTableTimeRangeFilter sstableTimeRangeFilter()
+    {
+        return sstableTimeRangeFilter;
+    }
+
     public static ClientConfig create(Map<String, String> options)
     {
         return new ClientConfig(options);
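
For illustration, a minimal sketch of supplying the new options through ClientConfig (the other reader options normally required in the map are elided here):

    Map<String, String> options = new HashMap<>();  // plus the usual required reader options
    options.put(ClientConfig.SSTABLE_START_TIMESTAMP_MICROS, "1000");
    options.put(ClientConfig.SSTABLE_END_TIMESTAMP_MICROS, "2000");

    ClientConfig config = ClientConfig.create(options);
    SSTableTimeRangeFilter filter = config.sstableTimeRangeFilter();  // closed range [1000‥2000]
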
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/DataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/DataLayer.java
index 6878f869..394913ac 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/DataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/DataLayer.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
 import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
 import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
 import org.apache.cassandra.spark.utils.TimeProvider;
 import org.apache.spark.sql.sources.EqualTo;
 import org.apache.spark.sql.sources.Filter;
@@ -198,6 +199,17 @@ public abstract class DataLayer implements Serializable
         return null;
     }
 
+    /**
+     * Returns {@link SSTableTimeRangeFilter} to filter out SSTables based on min and max timestamp.
+     *
+     * @return {@link SSTableTimeRangeFilter}
+     */
+    @NotNull
+    public SSTableTimeRangeFilter sstableTimeRangeFilter()
+    {
+        return SSTableTimeRangeFilter.ALL;
+    }
+
     /**
      * DataLayer implementation should provide an ExecutorService for doing blocking I/O
      * when opening SSTable readers.
@@ -224,9 +236,9 @@ public abstract class DataLayer implements Serializable
      */
     public abstract String jobId();
 
-    public StreamScanner openCompactionScanner(int partitionId, List<PartitionKeyFilter> partitionKeyFilters)
+    public StreamScanner openCompactionScanner(int partitionId, List<PartitionKeyFilter> partitionKeyFilters, SSTableTimeRangeFilter sstableTimeRangeFilter)
     {
-        return openCompactionScanner(partitionId, partitionKeyFilters, null);
+        return openCompactionScanner(partitionId, partitionKeyFilters, sstableTimeRangeFilter, null);
     }
 
     /**
@@ -262,6 +274,7 @@ public abstract class DataLayer implements Serializable
      */
     public StreamScanner<RowData> openCompactionScanner(int partitionId,
                                                         List<PartitionKeyFilter> partitionKeyFilters,
+                                                        SSTableTimeRangeFilter sstableTimeRangeFilter,
                                                         @Nullable PruneColumnFilter columnFilter)
     {
         List<PartitionKeyFilter> filtersInRange;
@@ -279,6 +292,7 @@ public abstract class DataLayer implements Serializable
                                             sstables(partitionId, sparkRangeFilter, filtersInRange),
                                              sparkRangeFilter,
                                              filtersInRange,
+                                             sstableTimeRangeFilter,
                                              columnFilter,
                                              timeProvider(),
                                              readIndexOffset(),
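
For illustration, a minimal sketch of the widened scanner entry point (a hypothetical helper method; the surrounding class, imports, and the partitionId/filters inputs are assumed):

    static StreamScanner<RowData> openScanner(DataLayer layer,
                                              int partitionId,
                                              List<PartitionKeyFilter> filters)
    {
        // sstableTimeRangeFilter() returns SSTableTimeRangeFilter.ALL unless a
        // subclass (e.g. CassandraDataLayer, LocalDataLayer) overrides it
        return layer.openCompactionScanner(partitionId, filters, layer.sstableTimeRangeFilter(), null);
    }
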
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
index bef35240..a6d44904 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
@@ -59,12 +59,17 @@ import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
 import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
+import org.apache.cassandra.spark.utils.CqlUtils;
 import org.apache.cassandra.spark.utils.Throwing;
 import org.apache.cassandra.spark.utils.TimeProvider;
 import org.apache.parquet.Strings;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.cassandra.spark.utils.CqlUtils.isTimeRangeFilterSupported;
+import static org.apache.cassandra.spark.utils.FilterUtils.parseSSTableTimeRangeFilter;
+
 /**
  * Basic DataLayer implementation to read SSTables from local file system. Mostly used for testing.
  */
@@ -83,6 +88,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
     private boolean useBufferingInputStream;
     private String[] paths;
     private int minimumReplicasPerMutation = 1;
+    private SSTableTimeRangeFilter sstableTimeRangeFilter;
     private Set<Path> dataFilePaths = null;
 
     @Nullable
@@ -178,6 +184,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
                 SchemaFeatureSet.initializeFromOptions(options),
                 getBoolean(options, lowerCaseKey("useBufferingInputStream"), getBoolean(options, lowerCaseKey("useSSTableInputStream"), false)),
                 options.get(lowerCaseKey("statsClass")),
+                parseSSTableTimeRangeFilter(options),
                 getOrThrow(options, lowerCaseKey("dirs")).split(","));
     }
 
@@ -194,6 +201,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
              Collections.emptyList(),
              false,
              null,
+             SSTableTimeRangeFilter.ALL,
              paths);
     }
 
@@ -211,6 +219,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
              Collections.emptyList(),
              false,
              null,
+             SSTableTimeRangeFilter.ALL,
              paths);
     }
 
@@ -223,6 +232,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
                           @NotNull List<SchemaFeature> requestedFeatures,
                           boolean useBufferingInputStream,
                           @Nullable String statsClass,
+                          @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                           String... paths)
     {
         this.bridge = CassandraBridgeFactory.get(version);
@@ -237,6 +247,14 @@ public class LocalDataLayer extends DataLayer implements Serializable
         this.requestedFeatures = requestedFeatures;
         this.useBufferingInputStream = useBufferingInputStream;
         this.statsClass = statsClass;
+        this.sstableTimeRangeFilter = sstableTimeRangeFilter;
+        String compactionStrategy = CqlUtils.extractCompactionStrategy(cqlTable.createStatement());
+        if (sstableTimeRangeFilter != SSTableTimeRangeFilter.ALL
+            && !isTimeRangeFilterSupported(compactionStrategy))
+        {
+            throw new UnsupportedOperationException("SSTableTimeRangeFilter is only supported with TimeWindowCompactionStrategy. " +
+                                                    "Current compaction strategy is: " + compactionStrategy);
+        }
         this.paths = paths;
         this.dataFilePaths = new HashSet<>();
     }
@@ -249,6 +267,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
                            @NotNull List<SchemaFeature> requestedFeatures,
                            boolean useBufferingInputStream,
                            @Nullable String statsClass,
+                           @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                            String... paths)
     {
         this.bridge = CassandraBridgeFactory.get(version);
@@ -258,6 +277,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
         this.requestedFeatures = requestedFeatures;
         this.useBufferingInputStream = useBufferingInputStream;
         this.statsClass = statsClass;
+        this.sstableTimeRangeFilter = sstableTimeRangeFilter;
         this.paths = paths;
     }
 
@@ -291,6 +311,13 @@ public class LocalDataLayer extends DataLayer implements Serializable
         return true;
     }
 
+    @NotNull
+    @Override
+    public SSTableTimeRangeFilter sstableTimeRangeFilter()
+    {
+        return sstableTimeRangeFilter;
+    }
+
     @Override
     public TimeProvider timeProvider()
     {
@@ -429,6 +456,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
         {
             out.writeUTF(this.statsClass);
         }
+        out.writeObject(this.sstableTimeRangeFilter);
         out.writeObject(this.paths);
         out.writeInt(this.minimumReplicasPerMutation);
     }
@@ -445,6 +473,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
                                        .collect(Collectors.toList());
         this.useBufferingInputStream = in.readBoolean();
         this.statsClass = in.readBoolean() ? in.readUTF() : null;
+        this.sstableTimeRangeFilter = (SSTableTimeRangeFilter) in.readObject();
         this.paths = (String[]) in.readObject();
         this.minimumReplicasPerMutation = in.readInt();
     }
@@ -465,6 +494,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
                                                          .toArray(String[]::new));
             out.writeBoolean(object.useBufferingInputStream);
             out.writeString(object.statsClass);
+            kryo.writeObject(out, object.sstableTimeRangeFilter);
             kryo.writeObject(out, object.paths);
             out.writeInt(object.minimumReplicasPerMutation);
         }
@@ -482,6 +512,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
                           .collect(Collectors.toList()),
                     in.readBoolean(),
                     in.readString(),
+                    kryo.readObject(in, SSTableTimeRangeFilter.class),
                     kryo.readObject(in, String[].class)
             ).withMinimumReplicasPerMutation(in.readInt());
         }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
index e5d164ca..35fbc162 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
@@ -53,6 +53,7 @@ public class SparkCellIterator extends CellIterator
               dataLayer.stats(),
               dataLayer.typeConverter(),
               partitionKeyFilters,
+              dataLayer.sstableTimeRangeFilter(),
               (cqlTable) -> buildColumnFilter(requiredSchema, cqlTable),
               dataLayer::openCompactionScanner);
         this.dataLayer = dataLayer;
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/FilterUtils.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/FilterUtils.java
index 05e4812c..66acb812 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/FilterUtils.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/FilterUtils.java
@@ -31,10 +31,14 @@ import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
 
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
 import org.apache.spark.sql.sources.EqualTo;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.sources.In;
 
+import static org.apache.cassandra.spark.data.ClientConfig.SSTABLE_END_TIMESTAMP_MICROS;
+import static org.apache.cassandra.spark.data.ClientConfig.SSTABLE_START_TIMESTAMP_MICROS;
+
 public final class FilterUtils
 {
     private FilterUtils()
@@ -111,4 +115,19 @@ public final class FilterUtils
             }
         }
     }
+
+    /**
+     * Parses {@link SSTableTimeRangeFilter} from spark options.
+     */
+    public static SSTableTimeRangeFilter parseSSTableTimeRangeFilter(Map<String, String> options)
+    {
+        if (!options.containsKey(SSTABLE_START_TIMESTAMP_MICROS) && !options.containsKey(SSTABLE_END_TIMESTAMP_MICROS))
+        {
+            return SSTableTimeRangeFilter.ALL;
+        }
+
+        long startTimestamp = MapUtils.getLong(options, SSTABLE_START_TIMESTAMP_MICROS, 0L);
+        long endTimestamp = MapUtils.getLong(options, SSTABLE_END_TIMESTAMP_MICROS, Long.MAX_VALUE);
+        return SSTableTimeRangeFilter.create(startTimestamp, endTimestamp);
+    }
 }
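
For illustration, a minimal sketch of the parsing behavior above (example-only code):

    Map<String, String> options = new HashMap<>();
    // Neither option present: the filter degenerates to SSTableTimeRangeFilter.ALL
    SSTableTimeRangeFilter all = FilterUtils.parseSSTableTimeRangeFilter(options);

    // Only a start bound: the missing end defaults to Long.MAX_VALUE
    options.put("sstable_start_timestamp_micros", "1000");
    SSTableTimeRangeFilter openEnded = FilterUtils.parseSSTableTimeRangeFilter(options);
    // openEnded.range() == [1000‥9223372036854775807]
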
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java
index d8b04f73..1bec25ea 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.spark.data.FileType;
 import org.apache.cassandra.spark.reader.RowData;
 import org.apache.cassandra.spark.reader.StreamScanner;
 import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
 import org.apache.cassandra.spark.utils.ByteBufferUtils;
 import org.apache.cassandra.spark.utils.TimeProvider;
 import org.apache.cassandra.spark.utils.test.TestSchema;
@@ -105,6 +106,7 @@ public class SSTableReaderTests
                                                                               ssTableSupplier,
                                                                               null,
                                                                               Collections.emptyList(),
+                                                                              SSTableTimeRangeFilter.ALL,
                                                                               null,
                                                                               navigatableTimeProvider,
                                                                               false,
@@ -143,4 +145,104 @@ public class SSTableReaderTests
         .forAll(TestUtils.partitioners())
         .checkAssert(partitioner -> runTest(partitioner, bridgeForTest, test));
     }
+
+    // TimeRangeFilter Tests
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
+    public void testTimeRangeFilterSkipsSSTablesOutsideRange(CassandraBridge bridge)
+    {
+        // Use a reference time 5 days in the past
+        long fiveDaysAgoMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()) - TimeUnit.DAYS.toMicros(5);
+
+        // Create time range filter that excludes the SSTable (range from 5 days ago to 4 days ago)
+        SSTableTimeRangeFilter excludingFilter = SSTableTimeRangeFilter.create(
+            fiveDaysAgoMicros,                                              // 5 days ago
+            fiveDaysAgoMicros + TimeUnit.DAYS.toMicros(1)                   // 4 days ago
+        );
+
+        // Should read 0 rows because SSTable is outside time range
+        testSSTableFiltering(bridge, excludingFilter, 0);
+    }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
+    public void testTimeRangeFilterIncludesSSTablesWithinRange(CassandraBridge bridge)
+    {
+        // Use a reference time 5 days in the past
+        long fiveDaysAgoMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())
+                                 - TimeUnit.DAYS.toMicros(5);
+
+        // Create time range filter from 5 days ago to 1 day in future - will include current SSTable
+        SSTableTimeRangeFilter includingFilter = SSTableTimeRangeFilter.create(
+        fiveDaysAgoMicros,                                              // 5 days ago
+        fiveDaysAgoMicros + TimeUnit.DAYS.toMicros(6)                   // 1 day in future
+        );
+
+        // Should read all 10 rows because SSTable timestamp is within the filter range
+        testSSTableFiltering(bridge, includingFilter, 10);
+    }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
+    public void testNoTimeRangeFilterReadsAllSSTables(CassandraBridge bridge)
+    {
+        // Should read all 10 rows with ALL filter
+        testSSTableFiltering(bridge, SSTableTimeRangeFilter.ALL, 10);
+    }
+
+    private void testSSTableFiltering(CassandraBridge bridgeForTest,
+                                      SSTableTimeRangeFilter filter,
+                                      int expectedRowCount)
+    {
+        TestRunnable test = (partitioner, dir, bridgeInTest) -> {
+            TestSchema schema = TestSchema.builder(bridgeInTest)
+                                          .withPartitionKey("a", bridgeInTest.aInt())
+                                          .withColumn("b", bridgeInTest.aInt())
+                                          .build();
+
+            // Write SSTable with data
+            schema.writeSSTable(dir, bridgeInTest, partitioner, writer -> {
+                for (int i = 0; i < 10; i++)
+                {
+                    writer.write(i, i);
+                }
+            });
+
+            assertThat(countSSTables(dir)).isEqualTo(1);
+
+            CqlTable table = schema.buildTable();
+            TestDataLayer dataLayer = new TestDataLayer(bridgeInTest,
+                                                        getFileType(dir, FileType.DATA).collect(Collectors.toList()), table);
+            BasicSupplier ssTableSupplier = new BasicSupplier(dataLayer.listSSTables().collect(Collectors.toSet()));
+
+            int rowCount = 0;
+            try (StreamScanner<RowData> scanner = bridgeInTest.getCompactionScanner(
+            table, partitioner, ssTableSupplier, null, Collections.emptyList(),
+            filter, null, TimeProvider.DEFAULT,
+            false, false, Stats.DoNothingStats.INSTANCE))
+            {
+                RowData rowData = scanner.data();
+                while (scanner.next())
+                {
+                    scanner.advanceToNextColumn();
+
+                    ByteBuffer colBuf = rowData.getColumnName();
+                    String colName = ByteBufferUtils.string(ByteBufferUtils.readBytesWithShortLength(colBuf));
+                    colBuf.get();
+                    if (StringUtils.isEmpty(colName))
+                    {
+                        continue;
+                    }
+
+                    rowCount++;
+                }
+            }
+
+            assertThat(rowCount).isEqualTo(expectedRowCount);
+        };
+
+        qt().forAll(TestUtils.partitioners())
+            .checkAssert(partitioner -> runTest(partitioner, bridgeForTest, test));
+    }
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java
index f9f6b614..b42e33d2 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java
@@ -23,10 +23,14 @@ import java.util.HashMap;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
+
+import static com.google.common.collect.BoundType.CLOSED;
 import static org.apache.cassandra.spark.data.ClientConfig.SNAPSHOT_TTL_PATTERN;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -132,4 +136,76 @@ class ClientConfigTests
                                                        clearSnapshot,
                                                        clearSnapshotStrategyOption);
     }
+
+    @Test
+    void testEmptyTimeRangeFilterWhenNoTimestampsProvided()
+    {
+        Map<String, String> options = new HashMap<>(REQUIRED_CLIENT_CONFIG_OPTIONS);
+        ClientConfig clientConfig = ClientConfig.create(options);
+        SSTableTimeRangeFilter filter = clientConfig.sstableTimeRangeFilter();
+        assertThat(filter).isEqualTo(SSTableTimeRangeFilter.ALL);
+    }
+
+    @Test
+    void testTimeRangeFilterWithBothTimestamps()
+    {
+        Map<String, String> options = new HashMap<>(REQUIRED_CLIENT_CONFIG_OPTIONS);
+        options.put("sstable_start_timestamp_micros", "1000");
+        options.put("sstable_end_timestamp_micros", "2000");
+        ClientConfig clientConfig = ClientConfig.create(options);
+        SSTableTimeRangeFilter filter = clientConfig.sstableTimeRangeFilter();
+
+        assertThat(filter).isNotNull();
+        assertThat(filter.range().hasLowerBound()).isTrue();
+        assertThat(filter.range().hasUpperBound()).isTrue();
+        assertThat(filter.range().lowerEndpoint()).isEqualTo(1000L);
+        assertThat(filter.range().upperEndpoint()).isEqualTo(2000L);
+        assertThat(filter.range().lowerBoundType()).isEqualTo(CLOSED);
+        assertThat(filter.range().upperBoundType()).isEqualTo(CLOSED);
+        assertThat(clientConfig.sstableTimeRangeFilter()).isNotNull();
+        assertThat(clientConfig.sstableTimeRangeFilter().range().lowerEndpoint()).isEqualTo(1000L);
+        assertThat(clientConfig.sstableTimeRangeFilter().range().upperEndpoint()).isEqualTo(2000L);
+    }
+
+    @Test
+    void testTimeRangeFilterWithOnlyStartTimestamp()
+    {
+        Map<String, String> options = new HashMap<>(REQUIRED_CLIENT_CONFIG_OPTIONS);
+        options.put("sstable_start_timestamp_micros", "5000");
+        ClientConfig clientConfig = ClientConfig.create(options);
+        SSTableTimeRangeFilter filter = clientConfig.sstableTimeRangeFilter();
+
+        assertThat(filter).isNotNull();
+        assertThat(filter.range().hasLowerBound()).isTrue();
+        assertThat(filter.range().lowerEndpoint()).isEqualTo(5000L);
+        assertThat(filter.range().hasUpperBound()).isTrue();
+        assertThat(filter.range().upperEndpoint()).isEqualTo(Long.MAX_VALUE);
+    }
+
+    @Test
+    void testTimeRangeFilterWithOnlyEndTimestamp()
+    {
+        Map<String, String> options = new HashMap<>(REQUIRED_CLIENT_CONFIG_OPTIONS);
+        options.put("sstable_end_timestamp_micros", "3000");
+        ClientConfig clientConfig = ClientConfig.create(options);
+        SSTableTimeRangeFilter filter = clientConfig.sstableTimeRangeFilter();
+
+        assertThat(filter).isNotNull();
+        assertThat(filter.range().hasLowerBound()).isTrue();
+        assertThat(filter.range().hasUpperBound()).isTrue();
+        assertThat(filter.range().lowerEndpoint()).isEqualTo(0L);
+        assertThat(filter.range().upperEndpoint()).isEqualTo(3000L);
+    }
+
+    @Test
+    void testTimeRangeFilterValidatesRangeOrdering()
+    {
+        Map<String, String> options = new HashMap<>(REQUIRED_CLIENT_CONFIG_OPTIONS);
+        options.put("sstable_start_timestamp_micros", "2000");
+        options.put("sstable_end_timestamp_micros", "1000");
+
+        assertThatThrownBy(() -> ClientConfig.create(options))
+            .isInstanceOf(IllegalArgumentException.class)
+            .hasMessageContaining("Invalid range: [2000‥1000]");
+    }
 }
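
For reference, the two option keys exercised above are plain reader options. A minimal, hypothetical usage sketch follows; the data source format name and the session setup are illustrative assumptions, only the two timestamp option keys come from ClientConfig:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public final class TimeRangeReadSketch
    {
        public static void main(String[] args)
        {
            SparkSession spark = SparkSession.builder()
                                             .appName("time-range-read-sketch")
                                             .master("local[*]")
                                             .getOrCreate();
            // Hypothetical bulk-reader invocation; keyspace/table values and the
            // format name are illustrative, the two timestamp options are the new keys.
            Dataset<Row> df = spark.read()
                                   .format("org.apache.cassandra.spark.sparksql.CassandraDataSource")
                                   .option("keyspace", "test_keyspace")
                                   .option("table", "test_table")
                                   .option("sstable_start_timestamp_micros", "1000")
                                   .option("sstable_end_timestamp_micros", "2000")
                                   .load();
            df.show();
        }
    }
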
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/LocalDataLayerTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/LocalDataLayerTests.java
index 863d6c57..f465f033 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/LocalDataLayerTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/LocalDataLayerTests.java
@@ -19,13 +19,19 @@
 
 package org.apache.cassandra.spark.data;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.stream.Stream;
 
 import org.junit.jupiter.params.ParameterizedTest;
@@ -35,9 +41,12 @@ import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.bridge.CassandraVersion;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.cassandra.spark.reader.SchemaTests;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
 import org.apache.cassandra.spark.utils.ByteBufferUtils;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static com.google.common.collect.BoundType.CLOSED;
 
 public class LocalDataLayerTests extends VersionRunner
 {
@@ -88,4 +97,116 @@ public class LocalDataLayerTests extends VersionRunner
         assertThat(dataLayer2).isEqualTo(dataLayer1);
         assertThat(dataLayer2.hashCode()).isEqualTo(dataLayer1.hashCode());
     }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.spark.data.VersionRunner#bridges")
+    public void testTimeRangeFilterFromOptions(CassandraBridge bridge)
+    {
+        String schemaWithTWCS = schemaWithTWCS();
+
+        Map<String, String> options = new HashMap<>();
+        options.put("version", bridge.getVersion().name());
+        options.put("partitioner", Partitioner.Murmur3Partitioner.name());
+        options.put("keyspace", "test_keyspace");
+        options.put("createstmt", schemaWithTWCS);
+        options.put("dirs", "/tmp/data1,/tmp/data2");
+        options.put("sstable_start_timestamp_micros", "1000");
+        options.put("sstable_end_timestamp_micros", "2000");
+
+        LocalDataLayer dataLayer = LocalDataLayer.from(options);
+
+        SSTableTimeRangeFilter filter = dataLayer.sstableTimeRangeFilter();
+        assertThat(filter.range().lowerEndpoint()).isEqualTo(1000L);
+        assertThat(filter.range().upperEndpoint()).isEqualTo(2000L);
+    }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.spark.data.VersionRunner#bridges")
+    public void testTimeRangeFilterNotSupportedWithLCS(CassandraBridge bridge)
+    {
+        String schemaWithLeveledCompaction = "CREATE TABLE test_keyspace.test_table (\n"
+                                           + "    id uuid,\n"
+                                           + "    value text,\n"
+                                           + "    PRIMARY KEY(id)\n"
+                                           + ") WITH compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}";
+
+        CassandraVersion version = bridge.getVersion();
+        SSTableTimeRangeFilter filter = SSTableTimeRangeFilter.create(1000L, 2000L);
+
+        assertThatThrownBy(() -> new LocalDataLayer(
+            version,
+            Partitioner.Murmur3Partitioner,
+            "test_keyspace",
+            schemaWithLeveledCompaction,
+            Collections.emptySet(),
+            Collections.emptyList(),
+            false,
+            null,
+            filter,
+            "/tmp/data1", "/tmp/data2"
+        ))
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessageContaining("SSTableTimeRangeFilter is only supported with TimeWindowCompactionStrategy. " +
+                              "Current compaction strategy is: org.apache.cassandra.db.compaction.LeveledCompactionStrategy");
+    }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.spark.data.VersionRunner#bridges")
+    public void testSerializationWithTimeRangeFilter(CassandraBridge bridge) throws Exception
+    {
+        // Use TimeWindowCompactionStrategy since time range filters are only supported with TWCS
+        String schemaWithTWCS = schemaWithTWCS();
+
+        CassandraVersion version = bridge.getVersion();
+        SSTableTimeRangeFilter filter = SSTableTimeRangeFilter.create(1000L, 2000L);
+        LocalDataLayer dataLayer = new LocalDataLayer(
+            version,
+            Partitioner.Murmur3Partitioner,
+            "test_keyspace",
+            schemaWithTWCS,
+            Collections.emptySet(),
+            Collections.emptyList(),
+            false,
+            null,
+            filter,
+            "/tmp/data1", "/tmp/data2"
+        );
+
+        ByteArrayOutputStream baos = serialize(dataLayer);
+        LocalDataLayer deserialized = deserialize(baos);
+
+        SSTableTimeRangeFilter deserializedFilter = deserialized.sstableTimeRangeFilter();
+        assertThat(deserializedFilter).isEqualTo(filter);
+        assertThat(deserializedFilter.range().lowerEndpoint()).isEqualTo(1000L);
+        assertThat(deserializedFilter.range().upperEndpoint()).isEqualTo(2000L);
+        assertThat(deserializedFilter.range().lowerBoundType()).isEqualTo(CLOSED);
+        assertThat(deserializedFilter.range().upperBoundType()).isEqualTo(CLOSED);
+    }
+
+    private ByteArrayOutputStream serialize(LocalDataLayer dataLayer) throws Exception
+    {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(dataLayer);
+        oos.close();
+        return baos;
+    }
+
+    private LocalDataLayer deserialize(ByteArrayOutputStream baos) throws Exception
+    {
+        ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+        ObjectInputStream ois = new ObjectInputStream(bais);
+        LocalDataLayer deserialized = (LocalDataLayer) ois.readObject();
+        ois.close();
+        return deserialized;
+    }
+
+    private String schemaWithTWCS()
+    {
+        return "CREATE TABLE test_keyspace.test_table2 (\n"
+               + "    id uuid,\n"
+               + "    value text,\n"
+               + "    PRIMARY KEY(id)\n"
+               + ") WITH compaction = {'class': 
'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'}";
+    }
 }
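
The assertions above pin down the filter's shape: a closed Guava Range<Long>, with defaults of 0 and Long.MAX_VALUE when only one bound is given. A self-contained sketch of the overlap semantics those bounds imply; this reconstruction is an assumption based on the tests, the real check lives in SSTableTimeRangeFilter.overlaps:

    import com.google.common.collect.Range;

    public final class TimeRangeOverlapSketch
    {
        // Keep an SSTable when its [minTimestamp, maxTimestamp] span intersects the
        // closed filter range; mirrors what SSTableTimeRangeFilter.overlaps appears
        // to do, reconstructed here from the tests (an assumption, not the real code).
        static boolean overlaps(Range<Long> filter, long minTimestamp, long maxTimestamp)
        {
            return filter.isConnected(Range.closed(minTimestamp, maxTimestamp))
                   && !filter.intersection(Range.closed(minTimestamp, maxTimestamp)).isEmpty();
        }

        public static void main(String[] args)
        {
            Range<Long> filter = Range.closed(1000L, 2000L);
            System.out.println(overlaps(filter, 500L, 999L));   // false: ends before the window
            System.out.println(overlaps(filter, 1500L, 3000L)); // true: straddles the upper bound
            System.out.println(overlaps(filter, 2000L, 5000L)); // true: closed bounds include 2000
        }
    }
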
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/PartitionedDataLayerTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/PartitionedDataLayerTests.java
index b1169963..2768b146 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/PartitionedDataLayerTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/PartitionedDataLayerTests.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.spark.data.partitioner.TokenPartitioner;
 import org.apache.cassandra.spark.reader.EmptyStreamScanner;
 import org.apache.cassandra.spark.reader.StreamScanner;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
 import org.apache.cassandra.spark.utils.test.TestSchema;
 import org.apache.spark.TaskContext;
 
@@ -303,7 +304,8 @@ public class PartitionedDataLayerTests extends VersionRunner
 
         // Filter does not fall in spark token range
         StreamScanner scanner = dataLayer.openCompactionScanner(partitionId,
-                                                                Collections.singletonList(filterOutsideRange));
+                                                                Collections.singletonList(filterOutsideRange),
+                                                                SSTableTimeRangeFilter.ALL);
         assertThat(scanner).isInstanceOf(EmptyStreamScanner.class);
     }
 
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/CassandraBridgeUtilTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/CassandraBridgeUtilTests.java
index df059167..b9cb8449 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/CassandraBridgeUtilTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/CassandraBridgeUtilTests.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.spark.TestUtils;
 import org.apache.cassandra.spark.data.FileType;
 import org.apache.cassandra.spark.data.SSTable;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
 import org.apache.cassandra.spark.utils.ByteBufferUtils;
 import org.apache.cassandra.spark.utils.test.TestSSTable;
 import org.apache.cassandra.spark.utils.test.TestSchema;
@@ -261,6 +262,7 @@ public class CassandraBridgeUtilTests
                                            null,
                                            expected.keySet().stream().map(Collections::singletonList).collect(Collectors.toList()),
                                            null,
+                                           SSTableTimeRangeFilter.ALL,
                                            (row) -> actual.put(row.get("a").toString(), row)
             );
             assertThat(actual).hasSameSizeAs(expected);
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/sparksql/SparkRowIteratorTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/sparksql/SparkRowIteratorTests.java
index d9ca8b18..e13aeb52 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/sparksql/SparkRowIteratorTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/sparksql/SparkRowIteratorTests.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.spark.reader.RowData;
 import org.apache.cassandra.spark.reader.StreamScanner;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
 import org.apache.cassandra.spark.utils.ByteBufferUtils;
 import org.apache.cassandra.spark.utils.test.TestSchema;
 
@@ -189,6 +190,7 @@ public class SparkRowIteratorTests
         when(dataLayer.stats()).thenReturn(Stats.DoNothingStats.INSTANCE);
         when(dataLayer.requestedFeatures()).thenCallRealMethod();
         when(dataLayer.typeConverter()).thenReturn(typeConverter);
+        when(dataLayer.sstableTimeRangeFilter()).thenReturn(SSTableTimeRangeFilter.ALL);
 
         // Mock scanner
         StreamScanner scanner = mock(StreamScanner.class);
@@ -251,7 +253,7 @@ public class SparkRowIteratorTests
             return true;
         }).when(scanner).next();
 
-        when(dataLayer.openCompactionScanner(anyInt(), anyListOf(PartitionKeyFilter.class), any())).thenReturn(scanner);
+        when(dataLayer.openCompactionScanner(anyInt(), anyListOf(PartitionKeyFilter.class), any(), any())).thenReturn(scanner);
 
         // Use SparkRowIterator and verify values match expected
         SparkRowIterator it = new SparkRowIterator(0, dataLayer);
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java
index 17b7d3ce..5bb363d9 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java
@@ -43,6 +43,8 @@ import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.data.VersionRunner;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
 
+import static org.apache.cassandra.spark.utils.CqlUtils.extractCompactionStrategy;
+import static org.apache.cassandra.spark.utils.CqlUtils.isTimeRangeFilterSupported;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -797,6 +799,40 @@ public class CqlUtilsTest extends VersionRunner
     }
 
 
+    @Test
+    public void testTimeRangeFilterSupported()
+    {
+        String schemaWithTWCS = "CREATE TABLE k.t (a int PRIMARY KEY, b int) " 
+
+                                "WITH compaction = { 'class' : 
'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'};";
+        String twcsCompaction = extractCompactionStrategy(schemaWithTWCS);
+        
assertThat(twcsCompaction).isEqualTo("org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy");
+        assertThat(isTimeRangeFilterSupported(twcsCompaction)).isTrue();
+
+        String schemaWithTWCSNoSpace = "CREATE TABLE k.t (a int PRIMARY KEY, b 
int) " +
+                                "WITH compaction = 
{'class':'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'};";
+        String twcsCompactionNoSpace = 
extractCompactionStrategy(schemaWithTWCSNoSpace);
+        
assertThat(twcsCompactionNoSpace).isEqualTo("org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy");
+        assertThat(isTimeRangeFilterSupported(twcsCompactionNoSpace)).isTrue();
+
+        String schemaWithLCS = "CREATE TABLE k.t (a int PRIMARY KEY, b int) " +
+                               "WITH compaction = { 'class' : 
'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'};";
+        String lcsCompaction = extractCompactionStrategy(schemaWithLCS);
+        
assertThat(lcsCompaction).isEqualTo("org.apache.cassandra.db.compaction.LeveledCompactionStrategy");
+        assertThat(isTimeRangeFilterSupported(lcsCompaction)).isFalse();
+
+        String schemaWithSTCS = "CREATE TABLE k.t (a int PRIMARY KEY, b int) " 
+
+                                "WITH compaction = {'class': 
'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'};";
+        String stcsCompaction = extractCompactionStrategy(schemaWithSTCS);
+        assertThat(isTimeRangeFilterSupported(stcsCompaction)).isFalse();
+
+        String schemaNoCompaction = "CREATE TABLE k.t (a int PRIMARY KEY, b 
int);";
+        String nullCompaction = extractCompactionStrategy(schemaNoCompaction);
+        assertThat(nullCompaction).isNull();
+        assertThat(isTimeRangeFilterSupported(nullCompaction)).isTrue();
+
+        assertThat(isTimeRangeFilterSupported("")).isFalse();
+    }
+
     private static String loadFullSchemaSample() throws IOException
     {
         Path fullSchemaSampleFile = ResourceUtils.writeResourceToPath(CqlUtilsTest.class.getClassLoader(), tempPath, "cql/fullSchema.cql");
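
From the assertions above, the predicate treats a missing compaction clause (null) as supported while an empty string is not. A sketch consistent with those assertions; the real logic is in CqlUtils, so this reconstruction is an assumption:

    public final class CompactionSupportSketch
    {
        // Reconstructed from the test's assertions only: null (no compaction clause
        // extracted) passes, anything else must name TimeWindowCompactionStrategy.
        static boolean isTimeRangeFilterSupportedSketch(String compactionClass)
        {
            return compactionClass == null
                   || compactionClass.endsWith("TimeWindowCompactionStrategy");
        }

        public static void main(String[] args)
        {
            System.out.println(isTimeRangeFilterSupportedSketch("org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy")); // true
            System.out.println(isTimeRangeFilterSupportedSketch("org.apache.cassandra.db.compaction.LeveledCompactionStrategy"));    // false
            System.out.println(isTimeRangeFilterSupportedSketch(null)); // true
            System.out.println(isTimeRangeFilterSupportedSketch(""));   // false
        }
    }
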
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderFilteringIntegrationTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderFilteringIntegrationTest.java
new file mode 100644
index 00000000..31758a46
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderFilteringIntegrationTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import static org.apache.cassandra.spark.data.ClientConfig.SSTABLE_END_TIMESTAMP_MICROS;
+import static org.apache.cassandra.spark.data.ClientConfig.SSTABLE_START_TIMESTAMP_MICROS;
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Integration test for various filters used during bulk reading.
+ */
+class BulkReaderFilteringIntegrationTest extends SharedClusterSparkIntegrationTestBase
+{
+    static final int DATA_SIZE = 1000;
+
+    QualifiedName twcsTable = uniqueTestTableFullName(TEST_KEYSPACE);
+    QualifiedName lcsTable = uniqueTestTableFullName(TEST_KEYSPACE);
+
+    // Use a base timestamp that is 10 minutes in the past
+    static final long BASE_TIMESTAMP_MILLIS = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(10);
+
+    // Separate each batch by 2 minutes to ensure they go into different TWCS windows (1 minute window size)
+    static final long EARLY_TIMESTAMP_MICROS = TimeUnit.MILLISECONDS.toMicros(BASE_TIMESTAMP_MILLIS);
+    static final long MIDDLE_TIMESTAMP_MICROS = TimeUnit.MILLISECONDS.toMicros(BASE_TIMESTAMP_MILLIS + TimeUnit.MINUTES.toMillis(2));
+    static final long LATE_TIMESTAMP_MICROS = TimeUnit.MILLISECONDS.toMicros(BASE_TIMESTAMP_MILLIS + TimeUnit.MINUTES.toMillis(4));
+
+    @Test
+    void testReadAllDataWithoutTimeRangeFilter()
+    {
+        // Read all data without any time range filter
+        Map<String, String> timeRangeOptions = Map.of();
+        int expectedDataSize = DATA_SIZE * 3; // all 3 SSTables read
+        Set<Long> expectedSSTableTimestamps = Set.of(EARLY_TIMESTAMP_MICROS, 
MIDDLE_TIMESTAMP_MICROS, LATE_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, expectedDataSize, 
expectedSSTableTimestamps);
+    }
+
+    @Test
+    void testTimeRangeFilterWithStartBoundInclusive()
+    {
+        // Read data starting at MIDDLE_TIMESTAMP (inclusive)
+        Map<String, String> timeRangeOptions = Map.of(SSTABLE_START_TIMESTAMP_MICROS, Long.valueOf(MIDDLE_TIMESTAMP_MICROS).toString());
+        int expectedDataSize = DATA_SIZE * 2; // 2 SSTables read
+        Set<Long> expectedSSTableTimestamps = Set.of(MIDDLE_TIMESTAMP_MICROS, LATE_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, expectedDataSize, expectedSSTableTimestamps);
+    }
+
+    @Test
+    void testTimeRangeFilterWithStartBoundExclusive()
+    {
+        Map<String, String> timeRangeOptions = 
Map.of(SSTABLE_START_TIMESTAMP_MICROS, Long.valueOf(LATE_TIMESTAMP_MICROS + 
1).toString());
+        Set<Long> expectedSSTableTimestamps = Set.of(LATE_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, DATA_SIZE, 
expectedSSTableTimestamps); // 1 SSTables read
+    }
+
+    @Test
+    void testTimeRangeFilterWithEndBoundInclusive()
+    {
+        // Read data ending with MIDDLE_TIMESTAMP inclusive
+        Map<String, String> timeRangeOptions = 
Map.of(SSTABLE_END_TIMESTAMP_MICROS, 
Long.valueOf(MIDDLE_TIMESTAMP_MICROS).toString());
+        int expectedDataSize = DATA_SIZE * 2; // 2 SSTables read
+        Set<Long> expectedSSTableTimestamps = Set.of(EARLY_TIMESTAMP_MICROS, 
MIDDLE_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, expectedDataSize, 
expectedSSTableTimestamps);
+    }
+
+    @Test
+    void testTimeRangeFilterWithEndBoundExclusive()
+    {
+        // Read data ending with MIDDLE_TIMESTAMP exclusive
+        Map<String, String> timeRangeOptions = 
Map.of(SSTABLE_END_TIMESTAMP_MICROS, Long.valueOf(MIDDLE_TIMESTAMP_MICROS - 
1).toString());
+        Set<Long> expectedSSTableTimestamps = Set.of(EARLY_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, DATA_SIZE, 
expectedSSTableTimestamps); // 1 SSTables read
+    }
+
+    @Test
+    void testTimeRangeFilterWithStartAndEndBound()
+    {
+        Map<String, String> timeRangeOptions = 
Map.of(SSTABLE_START_TIMESTAMP_MICROS, 
Long.valueOf(MIDDLE_TIMESTAMP_MICROS).toString(),
+                                                      
SSTABLE_END_TIMESTAMP_MICROS, Long.valueOf(LATE_TIMESTAMP_MICROS - 
1).toString());
+        Set<Long> expectedSSTableTimestamps = Set.of(MIDDLE_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, DATA_SIZE, 
expectedSSTableTimestamps); // 1 SSTables read
+    }
+
+    @Test
+    void testTimeRangeFilterWithStartAndEndBoundExclusive()
+    {
+        Map<String, String> timeRangeOptions = 
Map.of(SSTABLE_START_TIMESTAMP_MICROS, Long.valueOf(EARLY_TIMESTAMP_MICROS + 
1).toString(),
+                                                      
SSTABLE_END_TIMESTAMP_MICROS, Long.valueOf(LATE_TIMESTAMP_MICROS - 
1).toString());
+        int expectedDataSize = DATA_SIZE * 2; // 2 SSTables read
+        Set<Long> expectedSSTableTimestamps = Set.of(EARLY_TIMESTAMP_MICROS, 
MIDDLE_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, expectedDataSize, 
expectedSSTableTimestamps);
+    }
+
+    @Test
+    void testTimeRangeFilterNonOverlappingBound()
+    {
+        Map<String, String> timeRangeOptions = 
Map.of(SSTABLE_END_TIMESTAMP_MICROS, Long.valueOf(EARLY_TIMESTAMP_MICROS - 
1).toString());
+        Dataset<Row> data = bulkReaderDataFrame(twcsTable, 
timeRangeOptions).load();
+
+        List<Row> rows = data.collectAsList();
+        assertThat(rows.size()).isEqualTo(0); // no data read
+    }
+
+    @Test
+    void testTimeRangeFilterWithoutTWCS()
+    {
+        // Attempting to use a time range filter with a non-TWCS table should throw an exception
+        Map<String, String> timeRangeOptions = Map.of(
+            SSTABLE_START_TIMESTAMP_MICROS, Long.valueOf(EARLY_TIMESTAMP_MICROS).toString(),
+            SSTABLE_END_TIMESTAMP_MICROS, Long.valueOf(LATE_TIMESTAMP_MICROS).toString()
+        );
+
+        assertThatThrownBy(() -> {
+            Dataset<Row> data = bulkReaderDataFrame(lcsTable, 
timeRangeOptions).load();
+            data.collectAsList();
+        })
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage("SSTableTimeRangeFilter is only supported with 
TimeWindowCompactionStrategy. " +
+                    "Current compaction strategy is: 
org.apache.cassandra.db.compaction.LeveledCompactionStrategy");
+    }
+
+    private void runTimeRangeFilterTest(Map<String, String> timeRangeOptions,
+                                        int expectedDataSize,
+                                        Set<Long> expectedTimestamps)
+    {
+        Dataset<Row> data = bulkReaderDataFrame(twcsTable, 
timeRangeOptions).load();
+
+        List<Row> rows = data.collectAsList();
+        assertThat(rows.size()).isEqualTo(expectedDataSize);
+
+        Set<Long> allTimestamps = rows.stream()
+                                      .map(row -> row.getLong(2))
+                                      .collect(Collectors.toSet());
+
+        assertThat(expectedTimestamps.size()).isEqualTo(allTimestamps.size());
+        assertThat(expectedTimestamps).containsAll(allTimestamps);
+    }
+
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
+        IInstance instance = cluster.getFirstRunningInstance();
+        ICoordinator coordinator = instance.coordinator();
+
+        // Initialize schema for SSTable time range filtering
+
+        // Create a table with TWCS compaction strategy and a 1-minute compaction window
+        createTestTable(twcsTable, "CREATE TABLE IF NOT EXISTS %s (" +
+                                   "    id text PRIMARY KEY," +
+                                   "    data text," +
+                                   "    timestamp bigint" +
+                                   ") WITH compaction = {" +
+                                   "    'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'," +
+                                   "    'compaction_window_size': '1'," +
+                                   "    'compaction_window_unit': 'MINUTES'" +
+                                   "};");
+
+        createTestTable(lcsTable, "CREATE TABLE IF NOT EXISTS %s (" +
+                                  "    id text PRIMARY KEY," +
+                                  "    data text" +
+                                  ") WITH compaction = {" +
+                                  "    'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'" +
+                                  "};");
+        for (int i = 0; i < 10; i++)
+        {
+            String query = String.format("INSERT INTO %s (id, data) VALUES ('%s', 'data_%s')", lcsTable, i, "data" + i);
+            coordinator.execute(query, ConsistencyLevel.ALL);
+        }
+        instance.nodetool("flush", TEST_KEYSPACE, lcsTable.table());
+
+        // Create 3 SSTables in 3 time windows, each batch written 2 minutes apart
+        // Insert early data with early timestamps
+        for (int i = 0; i < DATA_SIZE; i++)
+        {
+            long timestamp = EARLY_TIMESTAMP_MICROS + i;
+            String query = String.format("INSERT INTO %s (id, data, timestamp) VALUES ('%s', 'data_%s', %d) USING TIMESTAMP %d",
+                                         twcsTable, i, "data" + i, EARLY_TIMESTAMP_MICROS, timestamp);
+            coordinator.execute(query, ConsistencyLevel.ALL);
+        }
+
+        // Flush to create first SSTable
+        instance.nodetool("flush", TEST_KEYSPACE, twcsTable.table());
+
+        // Insert middle data with middle timestamps
+        for (int i = 0; i < DATA_SIZE; i++)
+        {
+            int id = DATA_SIZE + i;
+            long timestamp = MIDDLE_TIMESTAMP_MICROS + i;
+            String query = String.format("INSERT INTO %s (id, data, timestamp) 
VALUES ('%s', 'data_%s', %d) USING TIMESTAMP %d",
+                                         twcsTable, id, "data" + id, 
MIDDLE_TIMESTAMP_MICROS, timestamp);
+            coordinator.execute(query, ConsistencyLevel.ALL);
+        }
+
+        // Flush to create second SSTable
+        instance.nodetool("flush", TEST_KEYSPACE, twcsTable.table());
+
+        // Insert late data with late timestamps
+        for (int i = 0; i < DATA_SIZE; i++)
+        {
+            int id = DATA_SIZE * 2 + i;
+            long timestamp = LATE_TIMESTAMP_MICROS + i;
+            String query = String.format("INSERT INTO %s (id, data, timestamp) 
VALUES ('%s', 'data_%s', %d) USING TIMESTAMP %d",
+                                         twcsTable, id, "data" + id, 
LATE_TIMESTAMP_MICROS, timestamp);
+            coordinator.execute(query, ConsistencyLevel.ALL);
+        }
+
+        // Flush to create third SSTable
+        instance.nodetool("flush", TEST_KEYSPACE, twcsTable.table());
+    }
+}
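
One subtlety worth noting in the exclusive-bound tests above: filtering happens at SSTable granularity, so a start bound of LATE_TIMESTAMP_MICROS + 1 still returns all DATA_SIZE rows of the late batch, because that SSTable's timestamps span [LATE, LATE + DATA_SIZE - 1] and therefore overlap the filter. A small worked sketch of that arithmetic, with stand-in values:

    public final class SSTableGranularitySketch
    {
        public static void main(String[] args)
        {
            long late = 4_000_000L;     // stand-in for LATE_TIMESTAMP_MICROS
            long dataSize = 1000L;      // DATA_SIZE rows written at late, late + 1, ...
            long sstableMin = late;
            long sstableMax = late + dataSize - 1;
            long filterStart = late + 1;
            // SSTable-level check: keep the file when [min, max] overlaps [start, +inf)
            boolean kept = sstableMax >= filterStart;
            System.out.println(kept);   // true: the whole SSTable, all 1000 rows, is read
        }
    }
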
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
index 0701ae28..6bbf7f14 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
@@ -61,6 +61,7 @@ import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
 import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
 import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
 import org.apache.cassandra.spark.utils.TimeProvider;
 import org.apache.cassandra.util.CompressionUtil;
 import org.jetbrains.annotations.NotNull;
@@ -88,6 +89,7 @@ public abstract class CassandraBridge
                                                                 @NotNull SSTablesSupplier ssTables,
                                                                 @Nullable SparkRangeFilter sparkRangeFilter,
                                                                 @NotNull Collection<PartitionKeyFilter> partitionKeyFilters,
+                                                                @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                                                                 @Nullable PruneColumnFilter columnFilter,
                                                                 @NotNull TimeProvider timeProvider,
                                                                 boolean readIndexOffset,
@@ -572,33 +574,37 @@ public abstract class CassandraBridge
     /**
      * Convenience method around `readPartitionKeys` to accept partition keys as string values and encode with the correct types.
      *
-     * @param partitioner Cassandra partitioner
-     * @param keyspace    keyspace name
-     * @param createStmt  create table CQL statement
-     * @param ssTables    set of SSTables to read
-     * @param rowConsumer Consumer interface to consume rows as they are read to avoid buffering all rows in memory for consumption.
+     * @param partitioner               Cassandra partitioner
+     * @param keyspace                  keyspace name
+     * @param createStmt                create table CQL statement
+     * @param ssTables                  set of SSTables to read
+     * @param sstableTimeRangeFilter    SSTable time range filter for filtering out SSTables based on min and max timestamps
+     * @param rowConsumer               Consumer interface to consume rows as they are read to avoid buffering all rows in memory for consumption.
      * @throws IOException
      */
     public void readStringPartitionKeys(@NotNull Partitioner partitioner,
                                         @NotNull String keyspace,
                                         @NotNull String createStmt,
                                         @NotNull Set<SSTable> ssTables,
+                                        @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                                         @NotNull Consumer<Map<String, Object>> rowConsumer) throws IOException
     {
-        readStringPartitionKeys(partitioner, keyspace, createStmt, ssTables, null, null, null, rowConsumer);
+        readStringPartitionKeys(partitioner, keyspace, createStmt, ssTables, null, null, null, sstableTimeRangeFilter, rowConsumer);
     }
 
     /**
      * Convenience method around `readPartitionKeys` to accept partition keys as string values and encode with the correct types.
      *
-     * @param partitioner       Cassandra partitioner
-     * @param keyspace          keyspace name
-     * @param createStmt        create table CQL statement
-     * @param ssTables          set of SSTables to read
-     * @param tokenRange        optional token range to limit the bulk read to a restricted token range.
-     * @param partitionKeys     list of partition keys, if more than one partition keys they must be correctly ordered in the inner list.
-     * @param pruneColumnFilter optional filter to select a subset of columns, this can offer performance improvement if skipping over large blobs or columns.
-     * @param rowConsumer       Consumer interface to consume rows as they are read to avoid buffering all rows in memory for consumption.
+     * @param partitioner               Cassandra partitioner
+     * @param keyspace                  keyspace name
+     * @param createStmt                create table CQL statement
+     * @param ssTables                  set of SSTables to read
+     * @param tokenRange                optional token range to limit the bulk read to a restricted token range.
+     * @param partitionKeys             list of partition keys; if more than one partition key, they must be correctly ordered in the inner list.
+     * @param pruneColumnFilter         optional filter to select a subset of columns, this can offer performance
+     *                                  improvement if skipping over large blobs or columns.
+     * @param sstableTimeRangeFilter    SSTable time range filter, filters out SSTables not overlapping the given time range
+     * @param rowConsumer               Consumer interface to consume rows as they are read to avoid buffering all rows in memory for consumption.
      * @throws IOException
      */
     public void readStringPartitionKeys(@NotNull Partitioner partitioner,
@@ -608,6 +614,7 @@ public abstract class CassandraBridge
                                         @Nullable TokenRange tokenRange,
                                         @Nullable List<List<String>> 
partitionKeys,
                                         @Nullable String[] pruneColumnFilter,
+                                        @NotNull SSTableTimeRangeFilter 
sstableTimeRangeFilter,
                                         @NotNull Consumer<Map<String, Object>> 
rowConsumer) throws IOException
     {
         readPartitionKeys(partitioner,
@@ -617,6 +624,7 @@ public abstract class CassandraBridge
                           tokenRange,
                           partitionKeys == null ? null : encodePartitionKeys(partitioner, keyspace, createStmt, partitionKeys),
                           pruneColumnFilter,
+                          sstableTimeRangeFilter,
                           rowConsumer);
     }
 
@@ -624,9 +632,10 @@ public abstract class CassandraBridge
                                   @NotNull String keyspace,
                                   @NotNull String createStmt,
                                   @NotNull Set<SSTable> ssTables,
+                                  @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                                   @NotNull Consumer<Map<String, Object>> rowConsumer) throws IOException
     {
-        readPartitionKeys(partitioner, keyspace, createStmt, ssTables, null, null, null, rowConsumer);
+        readPartitionKeys(partitioner, keyspace, createStmt, ssTables, null, null, null, sstableTimeRangeFilter, rowConsumer);
     }
 
     public void readPartitionKeys(@NotNull Partitioner partitioner,
@@ -636,9 +645,11 @@ public abstract class CassandraBridge
                                   @Nullable TokenRange tokenRange,
                                   @Nullable List<ByteBuffer> partitionKeys,
                                   @Nullable String[] pruneColumnFilter,
+                                  @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                                   @NotNull Consumer<Map<String, Object>> rowConsumer) throws IOException
     {
-        readPartitionKeys(partitioner, keyspace, createStmt, new BasicSupplier(ssTables), tokenRange, partitionKeys, pruneColumnFilter, rowConsumer);
+        readPartitionKeys(partitioner, keyspace, createStmt, new BasicSupplier(ssTables), tokenRange, partitionKeys,
+                          pruneColumnFilter, sstableTimeRangeFilter, rowConsumer);
     }
 
     public abstract void readPartitionKeys(@NotNull Partitioner partitioner,
@@ -648,6 +659,7 @@ public abstract class CassandraBridge
                                            @Nullable TokenRange tokenRange,
                                            @Nullable List<ByteBuffer> 
partitionKeys,
                                            @Nullable String[] 
pruneColumnFilter,
+                                           @NotNull SSTableTimeRangeFilter 
sstableTimeRangeFilter,
                                            @NotNull Consumer<Map<String, 
Object>> rowConsumer) throws IOException;
 
     // Kryo/Java (De-)Serialization
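
A hypothetical call site for the widened convenience overload, assuming a bridge instance, a create-table statement, and a set of SSTables are already in hand; only the new SSTableTimeRangeFilter argument comes from this change:

    import java.io.IOException;
    import java.util.Set;

    import org.apache.cassandra.bridge.CassandraBridge;
    import org.apache.cassandra.spark.data.SSTable;
    import org.apache.cassandra.spark.data.partitioner.Partitioner;
    import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;

    public final class ReadWithTimeRangeSketch
    {
        // Hedged sketch: bridge, createStmt and ssTables are assumed to come from
        // the caller; only the new SSTableTimeRangeFilter argument is from this change.
        static void read(CassandraBridge bridge, String createStmt, Set<SSTable> ssTables) throws IOException
        {
            bridge.readStringPartitionKeys(Partitioner.Murmur3Partitioner,
                                           "test_keyspace",
                                           createStmt,
                                           ssTables,
                                           SSTableTimeRangeFilter.create(1000L, 2000L),
                                           row -> System.out.println(row));
        }
    }
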
diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
index 1653bab8..88d5c6df 100644
--- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
+++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
@@ -105,6 +105,7 @@ import org.apache.cassandra.spark.sparksql.RowIterator;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
 import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
 import org.apache.cassandra.spark.utils.Pair;
 import org.apache.cassandra.spark.utils.SparkClassLoaderOverride;
 import org.apache.cassandra.spark.utils.TimeProvider;
@@ -198,6 +199,7 @@ public class CassandraBridgeImplementation extends CassandraBridge
                                                        @NotNull SSTablesSupplier ssTables,
                                                        @Nullable SparkRangeFilter sparkRangeFilter,
                                                        @NotNull Collection<PartitionKeyFilter> partitionKeyFilters,
+                                                       @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                                                        @Nullable PruneColumnFilter columnFilter,
                                                        @NotNull TimeProvider timeProvider,
                                                        boolean readIndexOffset,
@@ -211,6 +213,7 @@ public class CassandraBridgeImplementation extends CassandraBridge
             return org.apache.cassandra.spark.reader.SSTableReader.builder(metadata, ssTable)
                                                                   .withSparkRangeFilter(sparkRangeFilter)
                                                                   .withPartitionKeyFilters(partitionKeyFilters)
+                                                                  .withTimeRangeFilter(sstableTimeRangeFilter)
                                                                   .withColumnFilter(columnFilter)
                                                                   .withReadIndexOffset(readIndexOffset)
                                                                   .withStats(stats)
@@ -443,6 +446,7 @@ public class CassandraBridgeImplementation extends CassandraBridge
                                   @Nullable TokenRange tokenRange,
                                   @Nullable List<ByteBuffer> partitionKeys,
                                   @Nullable String[] requiredColumns,
+                                  @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                                   Consumer<Map<String, Object>> rowConsumer) throws IOException
     {
         IPartitioner iPartitioner = getPartitioner(partitioner);
@@ -462,8 +466,9 @@ public class CassandraBridgeImplementation extends CassandraBridge
                                                 Stats.DoNothingStats.INSTANCE,
                                                 TypeConverter.IDENTITY,
                                                 partitionKeyFilters,
+                                                sstableTimeRangeFilter,
                                                 (t) -> PruneColumnFilter.of(requiredColumns),
-                                                (partitionId1, partitionKeyFilters1, columnFilter1) ->
+                                                (partitionId1, partitionKeyFilters1, timeRangeFilter1, columnFilter1) ->
                                                 new CompactionStreamScanner(
                                                 metadata,
                                                 partitioner,
@@ -471,6 +476,7 @@ public class CassandraBridgeImplementation extends CassandraBridge
                                                 ssTables.openAll((ssTable, isRepairPrimary) ->
                                                                  org.apache.cassandra.spark.reader.SSTableReader.builder(metadata, ssTable)
                                                                                                                 .withPartitionKeyFilters(partitionKeyFilters1)
+                                                                                                                .withTimeRangeFilter(timeRangeFilter1)
                                                                                                                 .build())
                                                 ))
         {
diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
index 03ab569d..3dbe44d1 100644
--- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
+++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
@@ -83,6 +83,7 @@ import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
 import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
 import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
 import org.apache.cassandra.spark.utils.ByteBufferUtils;
 import org.apache.cassandra.spark.utils.Pair;
 import org.apache.cassandra.spark.utils.ThrowableUtils;
@@ -118,6 +119,8 @@ public class SSTableReader implements SparkSSTableReader, Scannable
     @NotNull
     private final List<PartitionKeyFilter> partitionKeyFilters;
     @NotNull
+    private final SSTableTimeRangeFilter sstableTimeRangeFilter;
+    @NotNull
     private final Stats stats;
     @Nullable
     private Long startOffset = null;
@@ -143,6 +146,8 @@ public class SSTableReader implements SparkSSTableReader, Scannable
         SparkRangeFilter sparkRangeFilter = null;
         @NotNull
         final List<PartitionKeyFilter> partitionKeyFilters = new ArrayList<>();
+        @NotNull
+        SSTableTimeRangeFilter sstableTimeRangeFilter = SSTableTimeRangeFilter.ALL;
 
         Builder(@NotNull TableMetadata metadata, @NotNull SSTable ssTable)
         {
@@ -171,6 +176,15 @@ public class SSTableReader implements SparkSSTableReader, Scannable
             return this;
         }
 
+        public Builder withTimeRangeFilter(@Nullable SSTableTimeRangeFilter sstableTimeRangeFilter)
+        {
+            if (sstableTimeRangeFilter != null)
+            {
+                this.sstableTimeRangeFilter = sstableTimeRangeFilter;
+            }
+            return this;
+        }
+
         public Builder withColumnFilter(@Nullable PruneColumnFilter columnFilter)
         {
             this.columnFilter = columnFilter;
@@ -213,6 +227,7 @@ public class SSTableReader implements SparkSSTableReader, Scannable
                                      ssTable,
                                      sparkRangeFilter,
                                      partitionKeyFilters,
+                                     sstableTimeRangeFilter,
                                      columnFilter,
                                      readIndexOffset,
                                      stats,
@@ -232,6 +247,7 @@ public class SSTableReader implements SparkSSTableReader, Scannable
                          @NotNull SSTable ssTable,
                          @Nullable SparkRangeFilter sparkRangeFilter,
                          @NotNull List<PartitionKeyFilter> partitionKeyFilters,
+                         @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                          @Nullable PruneColumnFilter columnFilter,
                          boolean readIndexOffset,
                          @NotNull Stats stats,
@@ -304,6 +320,7 @@ public class SSTableReader implements SparkSSTableReader, Scannable
             header = null;
             helper = null;
             this.metadata = null;
+            this.sstableTimeRangeFilter = SSTableTimeRangeFilter.ALL;
             return;
         }
 
@@ -330,6 +347,7 @@ public class SSTableReader implements SparkSSTableReader, Scannable
                 header = null;
                 helper = null;
                 this.metadata = null;
+                this.sstableTimeRangeFilter = SSTableTimeRangeFilter.ALL;
                 return;
             }
         }
@@ -348,6 +366,21 @@ public class SSTableReader implements SparkSSTableReader, Scannable
         }
 
         this.statsMetadata = (StatsMetadata) componentMap.get(MetadataType.STATS);
+        if (!sstableTimeRangeFilter.overlaps(statsMetadata.minTimestamp, statsMetadata.maxTimestamp))
+        {
+            LOGGER.info("Ignoring SSTableReader with minTimestamp={} maxTimestamp={}, does not overlap with filter {}",
+                        this.statsMetadata.minTimestamp, this.statsMetadata.maxTimestamp, sstableTimeRangeFilter);
+            header = null;
+            helper = null;
+            this.metadata = null;
+            this.sstableTimeRangeFilter = SSTableTimeRangeFilter.ALL;
+            return;
+        }
+        else
+        {
+            this.sstableTimeRangeFilter = sstableTimeRangeFilter;
+        }
+
         SerializationHeader.Component headerComp = (SerializationHeader.Component) componentMap.get(MetadataType.HEADER);
         if (headerComp == null)
         {
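
The same builder hook appears in the four-zero reader below. A short sketch of the intended effect; metadata and ssTable are assumed caller-supplied context, and the "empty reader" behaviour is paraphrased from the constructor above:

    import java.io.IOException;

    import org.apache.cassandra.schema.TableMetadata;
    import org.apache.cassandra.spark.data.SSTable;
    import org.apache.cassandra.spark.reader.SSTableReader;
    import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;

    public final class TimeRangeReaderSketch
    {
        // Sketch only: metadata and ssTable are assumed to come from the caller.
        static SSTableReader open(TableMetadata metadata, SSTable ssTable) throws IOException
        {
            // If the SSTable's [minTimestamp, maxTimestamp] stats do not overlap
            // [1000, 2000], the constructor bails out early (header/helper/metadata
            // are nulled) and the reader contributes no rows to the scan.
            return SSTableReader.builder(metadata, ssTable)
                                .withTimeRangeFilter(SSTableTimeRangeFilter.create(1000L, 2000L))
                                .build();
        }
    }
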
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
index 008f62f3..943a3b76 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
@@ -102,6 +102,7 @@ import org.apache.cassandra.spark.sparksql.RowIterator;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
 import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
 import org.apache.cassandra.spark.utils.Pair;
 import org.apache.cassandra.spark.utils.SparkClassLoaderOverride;
 import org.apache.cassandra.spark.utils.TimeProvider;
@@ -195,6 +196,7 @@ public class CassandraBridgeImplementation extends CassandraBridge
                                                        @NotNull SSTablesSupplier ssTables,
                                                        @Nullable SparkRangeFilter sparkRangeFilter,
                                                        @NotNull Collection<PartitionKeyFilter> partitionKeyFilters,
+                                                       @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                                                        @Nullable PruneColumnFilter columnFilter,
                                                        @NotNull TimeProvider timeProvider,
                                                        boolean readIndexOffset,
@@ -208,6 +210,7 @@ public class CassandraBridgeImplementation extends CassandraBridge
             return org.apache.cassandra.spark.reader.SSTableReader.builder(metadata, ssTable)
                                                                   .withSparkRangeFilter(sparkRangeFilter)
                                                                   .withPartitionKeyFilters(partitionKeyFilters)
+                                                                  .withTimeRangeFilter(sstableTimeRangeFilter)
                                                                   .withColumnFilter(columnFilter)
                                                                   .withReadIndexOffset(readIndexOffset)
                                                                   .withStats(stats)
@@ -425,6 +428,7 @@ public class CassandraBridgeImplementation extends CassandraBridge
                                   @Nullable TokenRange tokenRange,
                                   @Nullable List<ByteBuffer> partitionKeys,
                                   @Nullable String[] requiredColumns,
+                                  @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                                   Consumer<Map<String, Object>> rowConsumer) throws IOException
     {
         IPartitioner iPartitioner = getPartitioner(partitioner);
@@ -444,8 +448,9 @@ public class CassandraBridgeImplementation extends CassandraBridge
                                                 Stats.DoNothingStats.INSTANCE,
                                                 TypeConverter.IDENTITY,
                                                 partitionKeyFilters,
+                                                sstableTimeRangeFilter,
                                                 (t) -> PruneColumnFilter.of(requiredColumns),
-                                                (partitionId1, partitionKeyFilters1, columnFilter1) ->
+                                                (partitionId1, partitionKeyFilters1, timeRangeFilter1, columnFilter1) ->
                                                 new CompactionStreamScanner(
                                                 metadata,
                                                 partitioner,
@@ -453,6 +458,7 @@ public class CassandraBridgeImplementation extends CassandraBridge
                                                 ssTables.openAll((ssTable, isRepairPrimary) ->
                                                                  org.apache.cassandra.spark.reader.SSTableReader.builder(metadata, ssTable)
                                                                                                                 .withPartitionKeyFilters(partitionKeyFilters1)
+                                                                                                                .withTimeRangeFilter(timeRangeFilter1)
                                                                                                                 .build())
                                                 ))
         {
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
index 210df479..350657e7 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
@@ -81,6 +81,7 @@ import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
 import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
 import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
 import org.apache.cassandra.spark.utils.ByteBufferUtils;
 import org.apache.cassandra.spark.utils.Pair;
 import org.apache.cassandra.spark.utils.ThrowableUtils;
@@ -116,6 +117,8 @@ public class SSTableReader implements SparkSSTableReader, Scannable
     @NotNull
     private final List<PartitionKeyFilter> partitionKeyFilters;
     @NotNull
+    private final SSTableTimeRangeFilter sstableTimeRangeFilter;
+    @NotNull
     private final Stats stats;
     @Nullable
     private Long startOffset = null;
@@ -141,6 +144,8 @@ public class SSTableReader implements SparkSSTableReader, Scannable
         SparkRangeFilter sparkRangeFilter = null;
         @NotNull
         final List<PartitionKeyFilter> partitionKeyFilters = new ArrayList<>();
+        @NotNull
+        SSTableTimeRangeFilter sstableTimeRangeFilter = SSTableTimeRangeFilter.ALL;
 
         Builder(@NotNull TableMetadata metadata, @NotNull SSTable ssTable)
         {
@@ -163,6 +168,15 @@ public class SSTableReader implements SparkSSTableReader, Scannable
             return this;
         }
 
+        public Builder withTimeRangeFilter(@Nullable SSTableTimeRangeFilter sstableTimeRangeFilter)
+        {
+            if (sstableTimeRangeFilter != null)
+            {
+                this.sstableTimeRangeFilter = sstableTimeRangeFilter;
+            }
+            return this;
+        }
+
         public Builder withPartitionKeyFilter(@NotNull PartitionKeyFilter partitionKeyFilter)
         {
             partitionKeyFilters.add(partitionKeyFilter);
@@ -211,6 +225,7 @@ public class SSTableReader implements SparkSSTableReader, Scannable
                                      ssTable,
                                      sparkRangeFilter,
                                      partitionKeyFilters,
+                                     sstableTimeRangeFilter,
                                      columnFilter,
                                      readIndexOffset,
                                      stats,
@@ -230,6 +245,7 @@ public class SSTableReader implements SparkSSTableReader, Scannable
                          @NotNull SSTable ssTable,
                          @Nullable SparkRangeFilter sparkRangeFilter,
                          @NotNull List<PartitionKeyFilter> partitionKeyFilters,
+                         @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                          @Nullable PruneColumnFilter columnFilter,
                          boolean readIndexOffset,
                          @NotNull Stats stats,
@@ -299,6 +315,7 @@ public class SSTableReader implements SparkSSTableReader, Scannable
             header = null;
             helper = null;
             this.metadata = null;
+            this.sstableTimeRangeFilter = SSTableTimeRangeFilter.ALL;
             return;
         }
 
@@ -325,6 +342,7 @@ public class SSTableReader implements SparkSSTableReader, Scannable
                 header = null;
                 helper = null;
                 this.metadata = null;
+                this.sstableTimeRangeFilter = SSTableTimeRangeFilter.ALL;
                 return;
             }
         }
@@ -343,6 +361,21 @@ public class SSTableReader implements SparkSSTableReader, Scannable
         }
 
         this.statsMetadata = (StatsMetadata) componentMap.get(MetadataType.STATS);
+        if (!sstableTimeRangeFilter.overlaps(statsMetadata.minTimestamp, statsMetadata.maxTimestamp))
+        {
+            LOGGER.info("Ignoring SSTableReader with minTimestamp={} maxTimestamp={}, does not overlap with filter {}",
+                        this.statsMetadata.minTimestamp, this.statsMetadata.maxTimestamp, sstableTimeRangeFilter);
+            header = null;
+            helper = null;
+            this.metadata = null;
+            this.sstableTimeRangeFilter = SSTableTimeRangeFilter.ALL;
+            return;
+        }
+        else
+        {
+            this.sstableTimeRangeFilter = sstableTimeRangeFilter;
+        }
+
         SerializationHeader.Component headerComp = (SerializationHeader.Component) componentMap.get(MetadataType.HEADER);
         if (headerComp == null)
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]