This is an automated email from the ASF dual-hosted git repository.
saranyak pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3f36653d CASSANALYTICS-102: Add TimeRangeFilter to filter out SSTables outside given time window (#155)
3f36653d is described below
commit 3f36653d68abcbcea1e41de877dabe1493bc067f
Author: Saranya Krishnakumar <[email protected]>
AuthorDate: Thu Nov 20 09:45:54 2025 -0800
CASSANALYTICS-102: Add TimeRangeFilter to filter out SSTables outside given time window (#155)
patch by Saranya Krishnakumar; reviewed by Yifan Cai, for CASSANALYTICS-102
---
CHANGES.txt | 1 +
.../cassandra/spark/sparksql/CellIterator.java | 12 +-
.../sparksql/filters/SSTableTimeRangeFilter.java | 147 ++++++++++++
.../org/apache/cassandra/spark/utils/CqlUtils.java | 36 ++-
.../filters/SSTableTimeRangeFilterTest.java | 141 ++++++++++++
.../org/apache/cassandra/spark/KryoRegister.java | 2 +
.../spark/bulkwriter/SortedSSTableWriter.java | 2 +
.../cassandra/spark/data/CassandraDataLayer.java | 32 ++-
.../apache/cassandra/spark/data/ClientConfig.java | 17 ++
.../org/apache/cassandra/spark/data/DataLayer.java | 18 +-
.../cassandra/spark/data/LocalDataLayer.java | 31 +++
.../spark/sparksql/SparkCellIterator.java | 1 +
.../apache/cassandra/spark/utils/FilterUtils.java | 19 ++
.../apache/cassandra/spark/SSTableReaderTests.java | 102 +++++++++
.../cassandra/spark/data/ClientConfigTests.java | 76 +++++++
.../cassandra/spark/data/LocalDataLayerTests.java | 121 ++++++++++
.../spark/data/PartitionedDataLayerTests.java | 4 +-
.../spark/reader/CassandraBridgeUtilTests.java | 2 +
.../spark/sparksql/SparkRowIteratorTests.java | 4 +-
.../apache/cassandra/spark/utils/CqlUtilsTest.java | 36 +++
.../BulkReaderFilteringIntegrationTest.java | 246 +++++++++++++++++++++
.../apache/cassandra/bridge/CassandraBridge.java | 44 ++--
.../bridge/CassandraBridgeImplementation.java | 8 +-
.../cassandra/spark/reader/SSTableReader.java | 33 +++
.../bridge/CassandraBridgeImplementation.java | 8 +-
.../cassandra/spark/reader/SSTableReader.java | 33 +++
26 files changed, 1144 insertions(+), 32 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 0f2bab39..56c088f4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.2.0
-----
+ * Add TimeRangeFilter to filter out SSTables outside given time window (CASSANALYTICS-102)
* Generated distribution artifacts fix (CASSANALYTICS-105)
 * Fix SSTable descriptor mismatch preventing newly produced SSTables from being uploaded (CASSANALYTICS-98)
* Expose SidecarCdc builders and interfaces (CASSANALYTICS-94)
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/CellIterator.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/CellIterator.java
index 971b545e..59707a6a 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/CellIterator.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/CellIterator.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.spark.reader.RowData;
import org.apache.cassandra.spark.reader.StreamScanner;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -76,13 +77,15 @@ public abstract class CellIterator implements Iterator<Cell>, AutoCloseable
     public interface ScannerSupplier
     {
         /**
-         * @param partitionId arbitrary id uniquely identifying this partiton of the bulk read
-         * @param partitionKeyFilters list of partition key filters to push-down,
-         * @param columnFilter optional column filter to only read certain columns
+         * @param partitionId            arbitrary id uniquely identifying this partition of the bulk read
+         * @param partitionKeyFilters    list of partition key filters to push-down
+         * @param sstableTimeRangeFilter sstable time range filter to filter based on min and max timestamp
+         * @param columnFilter           optional column filter to only read certain columns
         * @return a StreamScanner to iterate over each cell of the data.
         */
        StreamScanner<RowData> get(int partitionId,
                                   @NotNull List<PartitionKeyFilter> partitionKeyFilters,
+                                  @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                                   @Nullable PruneColumnFilter columnFilter);
    }
@@ -91,6 +94,7 @@ public abstract class CellIterator implements Iterator<Cell>, AutoCloseable
                         Stats stats,
                         TypeConverter typeConverter,
                         @NotNull List<PartitionKeyFilter> partitionKeyFilters,
+                        @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                         Function<CqlTable, PruneColumnFilter> columnFilterSupplier,
                         ScannerSupplier scannerSupplier)
    {
@@ -116,7 +120,7 @@ public abstract class CellIterator implements Iterator<Cell>, AutoCloseable
        // Open compaction scanner
        startTimeNanos = System.nanoTime();
        previousTimeNanos = startTimeNanos;
-       scanner = scannerSupplier.get(partitionId, partitionKeyFilters, columnFilter);
+       scanner = scannerSupplier.get(partitionId, partitionKeyFilters, sstableTimeRangeFilter, columnFilter);
long openTimeNanos = System.nanoTime() - startTimeNanos;
LOGGER.info("Opened CompactionScanner runtimeNanos={}", openTimeNanos);
stats.openedCompactionScanner(openTimeNanos);
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/SSTableTimeRangeFilter.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/SSTableTimeRangeFilter.java
new file mode 100644
index 00000000..6dd48413
--- /dev/null
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/SSTableTimeRangeFilter.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.sparksql.filters;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import com.google.common.collect.Range;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * {@link SSTableTimeRangeFilter} filters out SSTables based on their timestamps, represented in microseconds.
+ * Uses Google Guava's Range internally for storing the time range.
+ */
+public class SSTableTimeRangeFilter implements Serializable
+{
+ public static final SSTableTimeRangeFilter ALL
+ = new SSTableTimeRangeFilter(Range.closed(0L, Long.MAX_VALUE));
+
+    /**
+     * {@code timeRange} is a range of timestamp values represented in microseconds. Supports only a closed range.
+     */
+ private final Range<Long> timeRange;
+ private final int hashcode;
+
+    /**
+     * Creates a {@link SSTableTimeRangeFilter} with the given time {@link Range} of timestamp values represented in
+     * microseconds.
+     */
+ private SSTableTimeRangeFilter(Range<Long> timeRange)
+ {
+ this(timeRange, Objects.hash(timeRange));
+ }
+
+ // for serialization
+ private SSTableTimeRangeFilter(Range<Long> timeRange, int hashcode)
+ {
+ this.timeRange = timeRange;
+ this.hashcode = hashcode;
+ }
+
+ /**
+ * Returns the underlying Range.
+ *
+ * @return the time range of timestamp values in microseconds.
+ */
+ @NotNull
+ public Range<Long> range()
+ {
+ return timeRange;
+ }
+
+    /**
+     * Determines whether an SSTable with the given min and max timestamps overlaps with the filter. The SSTable is
+     * included if it overlaps with the filter's time range.
+     *
+     * @param startMicros SSTable min timestamp in microseconds (inclusive)
+     * @param endMicros   SSTable max timestamp in microseconds (inclusive)
+     * @return true if the SSTable should be included, false if it should be omitted.
+     */
+ public boolean overlaps(long startMicros, long endMicros)
+ {
+ // Creates a closed range with startMicros and endMicros
+ Range<Long> sstableTimeRange = Range.closed(startMicros, endMicros);
+
+ // Check if ranges are connected (overlap or adjacent)
+ return timeRange.isConnected(sstableTimeRange);
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("TimeRangeFilter%s", timeRange.toString());
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (!(o instanceof SSTableTimeRangeFilter))
+ {
+ return false;
+ }
+ SSTableTimeRangeFilter that = (SSTableTimeRangeFilter) o;
+ return timeRange.equals(that.timeRange);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return hashcode;
+ }
+
+    /**
+     * Creates a {@link SSTableTimeRangeFilter} for a specific time range.
+     *
+     * @param startMicros the start timestamp in microseconds (inclusive)
+     * @param endMicros   the end timestamp in microseconds (inclusive)
+     * @return {@link SSTableTimeRangeFilter} with both start and end timestamps
+     */
+    @NotNull
+    public static SSTableTimeRangeFilter create(long startMicros, long endMicros)
+    {
+        return new SSTableTimeRangeFilter(Range.closed(startMicros, endMicros));
+    }
+
+ // Kryo
+
+    public static class Serializer extends com.esotericsoftware.kryo.Serializer<SSTableTimeRangeFilter>
+    {
+        public SSTableTimeRangeFilter read(Kryo kryo, Input in, Class<SSTableTimeRangeFilter> type)
+        {
+            return new SSTableTimeRangeFilter(Range.closed(in.readLong(), in.readLong()), in.readInt());
+        }
+
+ public void write(Kryo kryo, Output out, SSTableTimeRangeFilter object)
+ {
+ out.writeLong(object.timeRange.lowerEndpoint());
+ out.writeLong(object.timeRange.upperEndpoint());
+ out.writeInt(object.hashcode);
+ }
+ }
+}
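
For a sense of the new API, here is a minimal usage sketch (timestamp values are hypothetical; both bounds are inclusive microseconds, and an SSTable is kept whenever its [min, max] timestamp range touches the filter range):

    // Keep only SSTables whose [minTimestamp, maxTimestamp] touches [1000, 2000].
    SSTableTimeRangeFilter filter = SSTableTimeRangeFilter.create(1000L, 2000L);

    filter.overlaps(500L, 1000L);   // true: shares the inclusive boundary at 1000
    filter.overlaps(1500L, 1800L);  // true: fully inside the window
    filter.overlaps(2500L, 3000L);  // false: entirely after the window

    // ALL spans [0, Long.MAX_VALUE] and therefore never filters anything out.
    SSTableTimeRangeFilter.ALL.overlaps(0L, 42L); // true
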
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java
index e694db67..9d989be5 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java
@@ -57,6 +57,7 @@ public final class CqlUtils
     private static final Pattern ESCAPED_WHITESPACE_PATTERN = Pattern.compile("(\\\\r|\\\\n|\\\\r\\n)+");
     private static final Pattern NEWLINE_PATTERN = Pattern.compile("\n");
     private static final Pattern ESCAPED_DOUBLE_BACKSLASH = Pattern.compile("\\\\");
+    private static final Pattern COMPACTION_STRATEGY_PATTERN = Pattern.compile("compaction\\s*=\\s*\\{\\s*'class'\\s*:\\s*'([^']+)'");
private CqlUtils()
{
@@ -147,7 +148,7 @@ public final class CqlUtils
{
String keyspace = matcher.group(1);
String table = matcher.group(2);
-            createStmts.put(TableIdentifier.of(keyspace, table), extractCleanedTableSchema(cleaned, keyspace, table));
+            createStmts.put(TableIdentifier.of(keyspace, table), extractCleanedTableSchema(cleaned, keyspace, table, false));
}
return createStmts;
}
@@ -179,19 +180,20 @@ public final class CqlUtils
     public static String extractTableSchema(@NotNull String schemaStr, @NotNull String keyspace, @NotNull String table)
     {
-        return extractCleanedTableSchema(cleanCql(schemaStr), keyspace, table);
+        return extractCleanedTableSchema(cleanCql(schemaStr), keyspace, table, false);
}
     public static String extractCleanedTableSchema(@NotNull String createStatementToClean,
                                                    @NotNull String keyspace,
-                                                   @NotNull String table)
+                                                   @NotNull String table,
+                                                   boolean withTableProps)
     {
         Pattern pattern = Pattern.compile(String.format("CREATE TABLE (IF NOT EXISTS)? ?\"?%s?\"?\\.{1}\"?%s\"?[^;]*;", keyspace, table));
         Matcher matcher = pattern.matcher(createStatementToClean);
         if (matcher.find())
         {
             String fullSchema = createStatementToClean.substring(matcher.start(0), matcher.end(0));
-            String redactedSchema = removeTableProps(fullSchema);
+            String redactedSchema = withTableProps ? fullSchema : removeTableProps(fullSchema);
             String clustering = extractClustering(fullSchema);
             String separator = " WITH ";
             if (clustering != null)
@@ -268,4 +270,30 @@ public final class CqlUtils
}
return indexCount;
}
+
+ /**
+ * Extracts the compaction strategy used from table schema.
+ *
+ * @param tableSchema table schema
+ * @return the compaction strategy, or null if not found
+ */
+ public static String extractCompactionStrategy(@NotNull String tableSchema)
+ {
+ Matcher matcher = COMPACTION_STRATEGY_PATTERN.matcher(tableSchema);
+ if (matcher.find())
+ {
+ return matcher.group(1);
+ }
+ return null;
+ }
+
+    /**
+     * Time range filter is only supported for TimeWindowCompactionStrategy.
+     *
+     * @return true if the strategy is TimeWindowCompactionStrategy, false otherwise
+     */
+    public static boolean isTimeRangeFilterSupported(String compactionStrategy)
+    {
+        return compactionStrategy == null || compactionStrategy.endsWith("TimeWindowCompactionStrategy");
+    }
}
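
A short sketch of how the two new helpers compose (the CREATE TABLE string below is illustrative):

    // COMPACTION_STRATEGY_PATTERN captures the 'class' value of the compaction map.
    String schema = "CREATE TABLE ks.t (id uuid PRIMARY KEY) WITH compaction = "
                  + "{'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'}";
    String strategy = CqlUtils.extractCompactionStrategy(schema);
    // strategy == "org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy"
    CqlUtils.isTimeRangeFilterSupported(strategy); // true

    // Note: a null strategy (no compaction clause found) is also treated as supported.
    CqlUtils.isTimeRangeFilterSupported(null); // true
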
diff --git a/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/sparksql/filters/SSTableTimeRangeFilterTest.java b/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/sparksql/filters/SSTableTimeRangeFilterTest.java
new file mode 100644
index 00000000..52e2312f
--- /dev/null
+++ b/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/sparksql/filters/SSTableTimeRangeFilterTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.sparksql.filters;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Unit tests for {@link SSTableTimeRangeFilter}
+ */
+class SSTableTimeRangeFilterTest
+{
+ @Test
+ void testCreation()
+ {
+        SSTableTimeRangeFilter filter = SSTableTimeRangeFilter.create(100L, 200L);
+ assertThat(filter.range().hasLowerBound()).isTrue();
+ assertThat(filter.range().hasUpperBound()).isTrue();
+ assertThat(filter.range().lowerEndpoint()).isEqualTo(100);
+ assertThat(filter.range().upperEndpoint()).isEqualTo(200);
+ }
+
+ @Test
+ void testThrowsExceptionWhenStartGreaterThanEnd()
+ {
+ assertThatThrownBy(() -> SSTableTimeRangeFilter.create(200L, 100L))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid range: [200‥100]");
+ }
+
+ @Test
+ void testFiltering()
+ {
+        SSTableTimeRangeFilter sstableRange = SSTableTimeRangeFilter.create(100L, 200L);
+ assertThat(sstableRange.overlaps(0L, 50L)).isFalse();
+ assertThat(sstableRange.overlaps(0L, 100L)).isTrue();
+ assertThat(sstableRange.overlaps(250L, 300L)).isFalse();
+ assertThat(sstableRange.overlaps(200L, 300L)).isTrue();
+ assertThat(sstableRange.overlaps(120L, 180L)).isTrue();
+ assertThat(sstableRange.overlaps(50L, 150L)).isTrue();
+ assertThat(sstableRange.overlaps(150L, 250L)).isTrue();
+ assertThat(sstableRange.overlaps(50L, 250L)).isTrue();
+ }
+
+ @Test
+ void testToString()
+ {
+        SSTableTimeRangeFilter boundedRangeFilter = SSTableTimeRangeFilter.create(100L, 200L);
+        assertThat(boundedRangeFilter.toString()).isEqualTo("TimeRangeFilter[100‥200]");
+
+        assertThat(SSTableTimeRangeFilter.ALL.toString()).isEqualTo("TimeRangeFilter[0‥9223372036854775807]");
+ }
+
+ @Test
+ void testEquals()
+ {
+        SSTableTimeRangeFilter filter1 = SSTableTimeRangeFilter.create(100L, 200L);
+        SSTableTimeRangeFilter filter2 = SSTableTimeRangeFilter.create(100L, 200L);
+        SSTableTimeRangeFilter filter4 = SSTableTimeRangeFilter.create(100L, 300L);
+
+ assertThat(filter1).isEqualTo(filter2);
+ assertThat(filter1).isNotEqualTo(filter4);
+ }
+
+ @Test
+ void testHashCode()
+ {
+        SSTableTimeRangeFilter filter1 = SSTableTimeRangeFilter.create(100L, 200L);
+        SSTableTimeRangeFilter filter2 = SSTableTimeRangeFilter.create(100L, 200L);
+        SSTableTimeRangeFilter filter3 = SSTableTimeRangeFilter.create(100L, 300L);
+
+ assertThat(filter1.hashCode()).isEqualTo(filter2.hashCode());
+ assertThat(filter1.hashCode()).isNotEqualTo(filter3.hashCode());
+ }
+
+ @Test
+ void testStartEndSerialization() throws Exception
+ {
+        SSTableTimeRangeFilter original = SSTableTimeRangeFilter.create(100L, 200L);
+ ByteArrayOutputStream baos = serialize(original);
+ SSTableTimeRangeFilter deserialized = deserialize(baos);
+ assertThat(deserialized).isEqualTo(original);
+ assertThat(deserialized.range().hasLowerBound()).isTrue();
+ assertThat(deserialized.range().lowerEndpoint()).isEqualTo(100L);
+ assertThat(deserialized.range().hasUpperBound()).isTrue();
+ assertThat(deserialized.range().upperEndpoint()).isEqualTo(200L);
+ }
+
+ @Test
+ void testEmptyFilter()
+ {
+ SSTableTimeRangeFilter emptyFilter = SSTableTimeRangeFilter.ALL;
+
+ assertThat(emptyFilter.overlaps(0L, 100L)).isTrue();
+ assertThat(emptyFilter.overlaps(100L, 200L)).isTrue();
+ assertThat(emptyFilter.overlaps(1000L, 2000L)).isTrue();
+ assertThat(emptyFilter.overlaps(0L, 0L)).isTrue();
+ }
+
+    private ByteArrayOutputStream serialize(SSTableTimeRangeFilter filter) throws Exception
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(filter);
+ oos.close();
+ return baos;
+ }
+
+    private SSTableTimeRangeFilter deserialize(ByteArrayOutputStream baos) throws Exception
+    {
+        ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+        ObjectInputStream ois = new ObjectInputStream(bais);
+        SSTableTimeRangeFilter deserialized = (SSTableTimeRangeFilter) ois.readObject();
+ ois.close();
+ return deserialized;
+ }
+}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/KryoRegister.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/KryoRegister.java
index a259d23c..f5311216 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/KryoRegister.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/KryoRegister.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
import org.apache.cassandra.spark.data.partitioner.CassandraRing;
import org.apache.cassandra.spark.data.partitioner.TokenPartitioner;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
import org.apache.spark.SparkConf;
import org.apache.spark.serializer.KryoRegistrator;
import org.jetbrains.annotations.NotNull;
@@ -73,6 +74,7 @@ public class KryoRegister implements KryoRegistrator
         KRYO_SERIALIZERS.put(CassandraDataLayer.class, new CassandraDataLayer.Serializer());
         KRYO_SERIALIZERS.put(BigNumberConfigImpl.class, new BigNumberConfigImpl.Serializer());
         KRYO_SERIALIZERS.put(SslConfig.class, new SslConfig.Serializer());
+        KRYO_SERIALIZERS.put(SSTableTimeRangeFilter.class, new SSTableTimeRangeFilter.Serializer());
}
     public static <T> void addSerializer(@NotNull Class<T> type, @NotNull Serializer<T> serializer)
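
For context, a minimal sketch of the round trip this registration enables (standalone Kryo setup shown for illustration; in production the registrator above wires the serializer into the Spark conf):

    Kryo kryo = new Kryo();
    kryo.register(SSTableTimeRangeFilter.class, new SSTableTimeRangeFilter.Serializer());

    SSTableTimeRangeFilter filter = SSTableTimeRangeFilter.create(1000L, 2000L);
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (Output output = new Output(bytes))
    {
        kryo.writeObject(output, filter); // Serializer writes lower endpoint, upper endpoint, hashcode
    }
    try (Input input = new Input(bytes.toByteArray()))
    {
        // Reads the two endpoints and the cached hashcode back into an equal filter
        SSTableTimeRangeFilter roundTripped = kryo.readObject(input, SSTableTimeRangeFilter.class);
    }
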
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
index 7cdaca01..bbea732e 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.spark.data.LocalDataLayer;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.reader.RowData;
import org.apache.cassandra.spark.reader.StreamScanner;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
import org.apache.cassandra.spark.utils.DigestAlgorithm;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -264,6 +265,7 @@ public class SortedSSTableWriter
                                                      Collections.emptyList() /* requestedFeatures */,
                                                      false /* useSSTableInputStream */,
                                                      null /* statsClass */,
+                                                     SSTableTimeRangeFilter.ALL,
                                                      outputDirectory.toString());
if (dataFilePaths != null)
{
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 67b06dd6..b6e84c6f 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -88,6 +88,7 @@ import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.data.partitioner.TokenPartitioner;
import org.apache.cassandra.spark.sparksql.LastModifiedTimestampDecorator;
import org.apache.cassandra.spark.sparksql.RowBuilder;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
import org.apache.cassandra.spark.utils.CqlUtils;
import org.apache.cassandra.spark.utils.ReaderTimeProvider;
import org.apache.cassandra.spark.utils.ScalaFunctions;
@@ -103,6 +104,7 @@ import org.apache.spark.util.ShutdownHookManager;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static org.apache.cassandra.spark.utils.CqlUtils.isTimeRangeFilterSupported;
 import static org.apache.cassandra.spark.utils.Properties.NODE_STATUS_NOT_CONSIDERED;
 public class CassandraDataLayer extends PartitionedDataLayer implements StartupValidatable, Serializable
@@ -144,6 +146,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
protected transient SidecarClient sidecar;
private SslConfig sslConfig;
+ private SSTableTimeRangeFilter sstableTimeRangeFilter;
@VisibleForTesting
transient Map<String, SidecarInstance> sidecarInstanceMap;
@@ -167,6 +170,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
this.useIncrementalRepair = options.useIncrementalRepair();
this.lastModifiedTimestampField = options.lastModifiedTimestampField();
this.requestedFeatures = options.requestedFeatures();
+ this.sstableTimeRangeFilter = options.sstableTimeRangeFilter;
}
// For serialization
@@ -193,7 +197,8 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
@Nullable String lastModifiedTimestampField,
List<SchemaFeature> requestedFeatures,
@NotNull Map<String, ReplicationFactor> rfMap,
- TimeProvider timeProvider)
+ TimeProvider timeProvider,
+ SSTableTimeRangeFilter sstableTimeRangeFilter)
{
super(consistencyLevel, datacenter);
this.snapshotName = snapshotName;
@@ -219,6 +224,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
}
this.rfMap = rfMap;
this.timeProvider = timeProvider;
+ this.sstableTimeRangeFilter = sstableTimeRangeFilter;
this.maybeQuoteKeyspaceAndTable();
this.initSidecarClient();
this.initInstanceMap();
@@ -292,6 +298,17 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         int indexCount = CqlUtils.extractIndexCount(fullSchema, keyspace, table);
         Set<String> udts = CqlUtils.extractUdts(fullSchema, keyspace);
         ReplicationFactor replicationFactor = CqlUtils.extractReplicationFactor(fullSchema, keyspace);
+
+        String tableSchemaWithProps = CqlUtils.extractCleanedTableSchema(fullSchema, keyspace, table, true);
+        String compactionStrategy = CqlUtils.extractCompactionStrategy(tableSchemaWithProps);
+        if (sstableTimeRangeFilter != null
+            && sstableTimeRangeFilter != SSTableTimeRangeFilter.ALL
+            && !isTimeRangeFilterSupported(compactionStrategy))
+        {
+            throw new UnsupportedOperationException("SSTableTimeRangeFilter is only supported with TimeWindowCompactionStrategy. " +
+                                                    "Current compaction strategy is: " + compactionStrategy);
+        }
+
         rfMap = Map.of(keyspace, replicationFactor);
         CompletableFuture<Integer> sizingFuture = CompletableFuture.supplyAsync(
             () -> getSizing(ringFuture, replicationFactor, options).getEffectiveNumberOfCores(),
@@ -497,6 +514,13 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
return null;
}
+ @NotNull
+ @Override
+ public SSTableTimeRangeFilter sstableTimeRangeFilter()
+ {
+ return sstableTimeRangeFilter;
+ }
+
@Override
public CqlTable cqlTable()
{
@@ -748,6 +772,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
}
this.rfMap = (Map<String, ReplicationFactor>) in.readObject();
this.timeProvider = new ReaderTimeProvider(in.readInt());
+ this.sstableTimeRangeFilter = (SSTableTimeRangeFilter) in.readObject();
this.maybeQuoteKeyspaceAndTable();
this.initSidecarClient();
this.initInstanceMap();
@@ -793,6 +818,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
}
out.writeObject(this.rfMap);
out.writeInt(timeProvider.referenceEpochInSeconds());
+ out.writeObject(this.sstableTimeRangeFilter);
}
     private static void writeNullable(ObjectOutputStream out, @Nullable String string) throws IOException
@@ -868,6 +894,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
kryo.writeObject(out, listWrapper);
kryo.writeObject(out, dataLayer.rfMap);
out.writeInt(dataLayer.timeProvider.referenceEpochInSeconds());
+ kryo.writeObject(out, dataLayer.sstableTimeRangeFilter);
}
@SuppressWarnings("unchecked")
@@ -909,7 +936,8 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
in.readString(),
kryo.readObject(in, SchemaFeaturesListWrapper.class).toList(),
kryo.readObject(in, HashMap.class),
- new ReaderTimeProvider(in.readInt()));
+ new ReaderTimeProvider(in.readInt()),
+ kryo.readObject(in, SSTableTimeRangeFilter.class));
}
// Wrapper only used internally for Kryo serialization/deserialization
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java
index 800efb9e..4ca6c921 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/ClientConfig.java
@@ -33,11 +33,13 @@ import org.apache.cassandra.bridge.BigNumberConfigImpl;
import org.apache.cassandra.spark.config.SchemaFeature;
import org.apache.cassandra.spark.config.SchemaFeatureSet;
import org.apache.cassandra.spark.data.partitioner.ConsistencyLevel;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
import org.apache.cassandra.spark.utils.MapUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
 import static org.apache.cassandra.spark.data.CassandraDataLayer.aliasLastModifiedTimestamp;
+import static org.apache.cassandra.spark.utils.FilterUtils.parseSSTableTimeRangeFilter;
public class ClientConfig
{
@@ -81,6 +83,14 @@ public class ClientConfig
     public static final String ENABLE_EXPANSION_SHRINK_CHECK_KEY = "enableExpansionShrinkCheck";
     public static final String SIDECAR_PORT = "sidecar_port";
     public static final String QUOTE_IDENTIFIERS = "quote_identifiers";
+    /**
+     * {@code sstable_start_timestamp_micros} and {@code sstable_end_timestamp_micros} define a time range filter
+     * for SSTable selection. Both timestamps are represented in microseconds and both bounds are always inclusive
+     * (closed range).
+     */
+    public static final String SSTABLE_START_TIMESTAMP_MICROS = "sstable_start_timestamp_micros";
+    public static final String SSTABLE_END_TIMESTAMP_MICROS = "sstable_end_timestamp_micros";
+
public static final int DEFAULT_SIDECAR_PORT = 9043;
protected String sidecarContactPoints;
@@ -107,6 +117,7 @@ public class ClientConfig
protected Boolean enableExpansionShrinkCheck;
protected int sidecarPort;
protected boolean quoteIdentifiers;
+ protected SSTableTimeRangeFilter sstableTimeRangeFilter;
protected ClientConfig(Map<String, String> options)
{
@@ -138,6 +149,7 @@ public class ClientConfig
this.requestedFeatures = initRequestedFeatures(options);
        this.sidecarPort = MapUtils.getInt(options, SIDECAR_PORT, DEFAULT_SIDECAR_PORT);
        this.quoteIdentifiers = MapUtils.getBoolean(options, QUOTE_IDENTIFIERS, false);
+ this.sstableTimeRangeFilter = parseSSTableTimeRangeFilter(options);
}
protected String parseSidecarContactPoints(Map<String, String> options)
@@ -277,6 +289,11 @@ public class ClientConfig
return quoteIdentifiers;
}
+ public SSTableTimeRangeFilter sstableTimeRangeFilter()
+ {
+ return sstableTimeRangeFilter;
+ }
+
public static ClientConfig create(Map<String, String> options)
{
return new ClientConfig(options);
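
For illustration, the two new keys ride along with the existing bulk-reader options (other required connection options are omitted here; values are hypothetical):

    Map<String, String> options = new HashMap<>(); // plus the usual required reader options
    options.put(ClientConfig.SSTABLE_START_TIMESTAMP_MICROS, "1000"); // inclusive lower bound, microseconds
    options.put(ClientConfig.SSTABLE_END_TIMESTAMP_MICROS, "2000");   // inclusive upper bound, microseconds

    ClientConfig config = ClientConfig.create(options);
    config.sstableTimeRangeFilter(); // TimeRangeFilter[1000‥2000]
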
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/DataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/DataLayer.java
index 6878f869..394913ac 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/DataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/DataLayer.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
import org.apache.cassandra.spark.utils.TimeProvider;
import org.apache.spark.sql.sources.EqualTo;
import org.apache.spark.sql.sources.Filter;
@@ -198,6 +199,17 @@ public abstract class DataLayer implements Serializable
return null;
}
+ /**
+     * Returns {@link SSTableTimeRangeFilter} to filter out SSTables based on min and max timestamp.
+ *
+ * @return {@link SSTableTimeRangeFilter}
+ */
+ @NotNull
+ public SSTableTimeRangeFilter sstableTimeRangeFilter()
+ {
+ return SSTableTimeRangeFilter.ALL;
+ }
+
/**
     * DataLayer implementation should provide an ExecutorService for doing blocking I/O
* when opening SSTable readers.
@@ -224,9 +236,9 @@ public abstract class DataLayer implements Serializable
*/
public abstract String jobId();
-    public StreamScanner openCompactionScanner(int partitionId, List<PartitionKeyFilter> partitionKeyFilters)
+    public StreamScanner openCompactionScanner(int partitionId, List<PartitionKeyFilter> partitionKeyFilters, SSTableTimeRangeFilter sstableTimeRangeFilter)
     {
-        return openCompactionScanner(partitionId, partitionKeyFilters, null);
+        return openCompactionScanner(partitionId, partitionKeyFilters, sstableTimeRangeFilter, null);
     }
/**
@@ -262,6 +274,7 @@ public abstract class DataLayer implements Serializable
*/
     public StreamScanner<RowData> openCompactionScanner(int partitionId,
                                                         List<PartitionKeyFilter> partitionKeyFilters,
+                                                        SSTableTimeRangeFilter sstableTimeRangeFilter,
                                                         @Nullable PruneColumnFilter columnFilter)
     {
List<PartitionKeyFilter> filtersInRange;
@@ -279,6 +292,7 @@ public abstract class DataLayer implements Serializable
                                            sstables(partitionId, sparkRangeFilter, filtersInRange),
                                            sparkRangeFilter,
                                            filtersInRange,
+                                           sstableTimeRangeFilter,
                                            columnFilter,
                                            timeProvider(),
                                            readIndexOffset(),
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
index bef35240..a6d44904 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
@@ -59,12 +59,17 @@ import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
+import org.apache.cassandra.spark.utils.CqlUtils;
import org.apache.cassandra.spark.utils.Throwing;
import org.apache.cassandra.spark.utils.TimeProvider;
import org.apache.parquet.Strings;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static org.apache.cassandra.spark.utils.CqlUtils.isTimeRangeFilterSupported;
+import static org.apache.cassandra.spark.utils.FilterUtils.parseSSTableTimeRangeFilter;
+
/**
 * Basic DataLayer implementation to read SSTables from local file system. Mostly used for testing.
*/
@@ -83,6 +88,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
private boolean useBufferingInputStream;
private String[] paths;
private int minimumReplicasPerMutation = 1;
+ private SSTableTimeRangeFilter sstableTimeRangeFilter;
private Set<Path> dataFilePaths = null;
@Nullable
@@ -178,6 +184,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
SchemaFeatureSet.initializeFromOptions(options),
getBoolean(options, lowerCaseKey("useBufferingInputStream"),
getBoolean(options, lowerCaseKey("useSSTableInputStream"), false)),
options.get(lowerCaseKey("statsClass")),
+ parseSSTableTimeRangeFilter(options),
getOrThrow(options, lowerCaseKey("dirs")).split(","));
}
@@ -194,6 +201,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
Collections.emptyList(),
false,
null,
+ SSTableTimeRangeFilter.ALL,
paths);
}
@@ -211,6 +219,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
Collections.emptyList(),
false,
null,
+ SSTableTimeRangeFilter.ALL,
paths);
}
@@ -223,6 +232,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
@NotNull List<SchemaFeature> requestedFeatures,
boolean useBufferingInputStream,
@Nullable String statsClass,
+                          @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
String... paths)
{
this.bridge = CassandraBridgeFactory.get(version);
@@ -237,6 +247,14 @@ public class LocalDataLayer extends DataLayer implements Serializable
this.requestedFeatures = requestedFeatures;
this.useBufferingInputStream = useBufferingInputStream;
this.statsClass = statsClass;
+ this.sstableTimeRangeFilter = sstableTimeRangeFilter;
+        String compactionStrategy = CqlUtils.extractCompactionStrategy(cqlTable.createStatement());
+        if (sstableTimeRangeFilter != SSTableTimeRangeFilter.ALL
+            && !isTimeRangeFilterSupported(compactionStrategy))
+        {
+            throw new UnsupportedOperationException("SSTableTimeRangeFilter is only supported with TimeWindowCompactionStrategy. " +
+                                                    "Current compaction strategy is: " + compactionStrategy);
+        }
this.paths = paths;
this.dataFilePaths = new HashSet<>();
}
@@ -249,6 +267,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
@NotNull List<SchemaFeature> requestedFeatures,
boolean useBufferingInputStream,
@Nullable String statsClass,
+                          @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
String... paths)
{
this.bridge = CassandraBridgeFactory.get(version);
@@ -258,6 +277,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
this.requestedFeatures = requestedFeatures;
this.useBufferingInputStream = useBufferingInputStream;
this.statsClass = statsClass;
+ this.sstableTimeRangeFilter = sstableTimeRangeFilter;
this.paths = paths;
}
@@ -291,6 +311,13 @@ public class LocalDataLayer extends DataLayer implements Serializable
return true;
}
+ @NotNull
+ @Override
+ public SSTableTimeRangeFilter sstableTimeRangeFilter()
+ {
+ return sstableTimeRangeFilter;
+ }
+
@Override
public TimeProvider timeProvider()
{
@@ -429,6 +456,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
{
out.writeUTF(this.statsClass);
}
+ out.writeObject(this.sstableTimeRangeFilter);
out.writeObject(this.paths);
out.writeInt(this.minimumReplicasPerMutation);
}
@@ -445,6 +473,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
.collect(Collectors.toList());
this.useBufferingInputStream = in.readBoolean();
this.statsClass = in.readBoolean() ? in.readUTF() : null;
+ this.sstableTimeRangeFilter = (SSTableTimeRangeFilter) in.readObject();
this.paths = (String[]) in.readObject();
this.minimumReplicasPerMutation = in.readInt();
}
@@ -465,6 +494,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
.toArray(String[]::new));
out.writeBoolean(object.useBufferingInputStream);
out.writeString(object.statsClass);
+ kryo.writeObject(out, object.sstableTimeRangeFilter);
kryo.writeObject(out, object.paths);
out.writeInt(object.minimumReplicasPerMutation);
}
@@ -482,6 +512,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
.collect(Collectors.toList()),
in.readBoolean(),
in.readString(),
+ kryo.readObject(in, SSTableTimeRangeFilter.class),
kryo.readObject(in, String[].class)
).withMinimumReplicasPerMutation(in.readInt());
}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
index e5d164ca..35fbc162 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
@@ -53,6 +53,7 @@ public class SparkCellIterator extends CellIterator
dataLayer.stats(),
dataLayer.typeConverter(),
partitionKeyFilters,
+ dataLayer.sstableTimeRangeFilter(),
(cqlTable) -> buildColumnFilter(requiredSchema, cqlTable),
dataLayer::openCompactionScanner);
this.dataLayer = dataLayer;
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/FilterUtils.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/FilterUtils.java
index 05e4812c..66acb812 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/FilterUtils.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/FilterUtils.java
@@ -31,10 +31,14 @@ import java.util.stream.Collectors;
import com.google.common.base.Preconditions;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
import org.apache.spark.sql.sources.EqualTo;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.In;
+import static org.apache.cassandra.spark.data.ClientConfig.SSTABLE_END_TIMESTAMP_MICROS;
+import static org.apache.cassandra.spark.data.ClientConfig.SSTABLE_START_TIMESTAMP_MICROS;
+
public final class FilterUtils
{
private FilterUtils()
@@ -111,4 +115,19 @@ public final class FilterUtils
}
}
}
+
+ /**
+     * Parses a {@link SSTableTimeRangeFilter} from the Spark options.
+ */
+    public static SSTableTimeRangeFilter parseSSTableTimeRangeFilter(Map<String, String> options)
+    {
+        if (!options.containsKey(SSTABLE_START_TIMESTAMP_MICROS) && !options.containsKey(SSTABLE_END_TIMESTAMP_MICROS))
+        {
+            return SSTableTimeRangeFilter.ALL;
+        }
+
+        long startTimestamp = MapUtils.getLong(options, SSTABLE_START_TIMESTAMP_MICROS, 0L);
+        long endTimestamp = MapUtils.getLong(options, SSTABLE_END_TIMESTAMP_MICROS, Long.MAX_VALUE);
+        return SSTableTimeRangeFilter.create(startTimestamp, endTimestamp);
+    }
}
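
A quick sketch of the defaulting behavior (values illustrative): when only one bound is supplied, the other falls back to 0 or Long.MAX_VALUE.

    Map<String, String> options = new HashMap<>();
    FilterUtils.parseSSTableTimeRangeFilter(options); // SSTableTimeRangeFilter.ALL: neither key present

    options.put(ClientConfig.SSTABLE_START_TIMESTAMP_MICROS, "5000");
    FilterUtils.parseSSTableTimeRangeFilter(options); // TimeRangeFilter[5000‥9223372036854775807]
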
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java
index d8b04f73..1bec25ea 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.spark.data.FileType;
import org.apache.cassandra.spark.reader.RowData;
import org.apache.cassandra.spark.reader.StreamScanner;
import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
import org.apache.cassandra.spark.utils.TimeProvider;
import org.apache.cassandra.spark.utils.test.TestSchema;
@@ -105,6 +106,7 @@ public class SSTableReaderTests
ssTableSupplier,
null,
Collections.emptyList(),
+                                                             SSTableTimeRangeFilter.ALL,
null,
navigatableTimeProvider,
false,
@@ -143,4 +145,104 @@ public class SSTableReaderTests
.forAll(TestUtils.partitioners())
.checkAssert(partitioner -> runTest(partitioner, bridgeForTest, test));
}
+
+ // TimeRangeFilter Tests
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
+    public void testTimeRangeFilterSkipsSSTablesOutsideRange(CassandraBridge bridge)
+    {
+        // Use a reference time 5 days in the past
+        long fiveDaysAgoMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()) - TimeUnit.DAYS.toMicros(5);
+
+        // Create time range filter that excludes the SSTable (range from 5 days ago to 4 days ago)
+        SSTableTimeRangeFilter excludingFilter = SSTableTimeRangeFilter.create(
+                fiveDaysAgoMicros,                             // 5 days ago
+                fiveDaysAgoMicros + TimeUnit.DAYS.toMicros(1)  // 4 days ago
+        );
+
+        // Should read 0 rows because SSTable is outside time range
+        testSSTableFiltering(bridge, excludingFilter, 0);
+    }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
+    public void testTimeRangeFilterIncludesSSTablesWithinRange(CassandraBridge bridge)
+    {
+        // Use a reference time 5 days in the past
+        long fiveDaysAgoMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())
+                                 - TimeUnit.DAYS.toMicros(5);
+
+        // Create time range filter from 5 days ago to 1 day in future - will include current SSTable
+        SSTableTimeRangeFilter includingFilter = SSTableTimeRangeFilter.create(
+                fiveDaysAgoMicros,                             // 5 days ago
+                fiveDaysAgoMicros + TimeUnit.DAYS.toMicros(6)  // 1 day in future
+        );
+
+        // Should read all 10 rows because SSTable timestamp is within the filter range
+        testSSTableFiltering(bridge, includingFilter, 10);
+    }
+
+ @ParameterizedTest
+ @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
+ public void testNoTimeRangeFilterReadsAllSSTables(CassandraBridge bridge)
+ {
+ // Should read all 10 rows with ALL filter
+ testSSTableFiltering(bridge, SSTableTimeRangeFilter.ALL, 10);
+ }
+
+ private void testSSTableFiltering(CassandraBridge bridgeForTest,
+ SSTableTimeRangeFilter filter,
+ int expectedRowCount)
+ {
+ TestRunnable test = (partitioner, dir, bridgeInTest) -> {
+            TestSchema schema = TestSchema.builder(bridgeInTest)
+                                          .withPartitionKey("a", bridgeInTest.aInt())
+                                          .withColumn("b", bridgeInTest.aInt())
+                                          .build();
+
+ // Write SSTable with data
+ schema.writeSSTable(dir, bridgeInTest, partitioner, writer -> {
+ for (int i = 0; i < 10; i++)
+ {
+ writer.write(i, i);
+ }
+ });
+
+ assertThat(countSSTables(dir)).isEqualTo(1);
+
+            CqlTable table = schema.buildTable();
+            TestDataLayer dataLayer = new TestDataLayer(bridgeInTest,
+                                                        getFileType(dir, FileType.DATA).collect(Collectors.toList()), table);
+            BasicSupplier ssTableSupplier = new BasicSupplier(dataLayer.listSSTables().collect(Collectors.toSet()));
+
+ int rowCount = 0;
+            try (StreamScanner<RowData> scanner = bridgeInTest.getCompactionScanner(
+                    table, partitioner, ssTableSupplier, null, Collections.emptyList(),
+                    filter, null, TimeProvider.DEFAULT,
+                    false, false, Stats.DoNothingStats.INSTANCE))
+ {
+ RowData rowData = scanner.data();
+ while (scanner.next())
+ {
+ scanner.advanceToNextColumn();
+
+ ByteBuffer colBuf = rowData.getColumnName();
+                    String colName = ByteBufferUtils.string(ByteBufferUtils.readBytesWithShortLength(colBuf));
+ colBuf.get();
+ if (StringUtils.isEmpty(colName))
+ {
+ continue;
+ }
+
+ rowCount++;
+ }
+ }
+
+ assertThat(rowCount).isEqualTo(expectedRowCount);
+ };
+
+ qt().forAll(TestUtils.partitioners())
+            .checkAssert(partitioner -> runTest(partitioner, bridgeForTest, test));
+ }
}
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java
index f9f6b614..b42e33d2 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/ClientConfigTests.java
@@ -23,10 +23,14 @@ import java.util.HashMap;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
+
+import static com.google.common.collect.BoundType.CLOSED;
 import static org.apache.cassandra.spark.data.ClientConfig.SNAPSHOT_TTL_PATTERN;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -132,4 +136,76 @@ class ClientConfigTests
clearSnapshot,
clearSnapshotStrategyOption);
}
+
+ @Test
+ void testEmptyTimeRangeFilterWhenNoTimestampsProvided()
+ {
+        Map<String, String> options = new HashMap<>(REQUIRED_CLIENT_CONFIG_OPTIONS);
+ ClientConfig clientConfig = ClientConfig.create(options);
+ SSTableTimeRangeFilter filter = clientConfig.sstableTimeRangeFilter();
+ assertThat(filter).isEqualTo(SSTableTimeRangeFilter.ALL);
+ }
+
+ @Test
+ void testTimeRangeFilterWithBothTimestamps()
+ {
+        Map<String, String> options = new HashMap<>(REQUIRED_CLIENT_CONFIG_OPTIONS);
+ options.put("sstable_start_timestamp_micros", "1000");
+ options.put("sstable_end_timestamp_micros", "2000");
+ ClientConfig clientConfig = ClientConfig.create(options);
+ SSTableTimeRangeFilter filter = clientConfig.sstableTimeRangeFilter();
+
+ assertThat(filter).isNotNull();
+ assertThat(filter.range().hasLowerBound()).isTrue();
+ assertThat(filter.range().hasUpperBound()).isTrue();
+ assertThat(filter.range().lowerEndpoint()).isEqualTo(1000L);
+ assertThat(filter.range().upperEndpoint()).isEqualTo(2000L);
+ assertThat(filter.range().lowerBoundType()).isEqualTo(CLOSED);
+ assertThat(filter.range().upperBoundType()).isEqualTo(CLOSED);
+ assertThat(clientConfig.sstableTimeRangeFilter()).isNotNull();
+        assertThat(clientConfig.sstableTimeRangeFilter().range().lowerEndpoint()).isEqualTo(1000L);
+        assertThat(clientConfig.sstableTimeRangeFilter().range().upperEndpoint()).isEqualTo(2000L);
+ }
+
+ @Test
+ void testTimeRangeFilterWithOnlyStartTimestamp()
+ {
+        Map<String, String> options = new HashMap<>(REQUIRED_CLIENT_CONFIG_OPTIONS);
+ options.put("sstable_start_timestamp_micros", "5000");
+ ClientConfig clientConfig = ClientConfig.create(options);
+ SSTableTimeRangeFilter filter = clientConfig.sstableTimeRangeFilter();
+
+ assertThat(filter).isNotNull();
+ assertThat(filter.range().hasLowerBound()).isTrue();
+ assertThat(filter.range().lowerEndpoint()).isEqualTo(5000L);
+ assertThat(filter.range().hasUpperBound()).isTrue();
+ assertThat(filter.range().upperEndpoint()).isEqualTo(Long.MAX_VALUE);
+ }
+
+ @Test
+ void testTimeRangeFilterWithOnlyEndTimestamp()
+ {
+        Map<String, String> options = new HashMap<>(REQUIRED_CLIENT_CONFIG_OPTIONS);
+ options.put("sstable_end_timestamp_micros", "3000");
+ ClientConfig clientConfig = ClientConfig.create(options);
+ SSTableTimeRangeFilter filter = clientConfig.sstableTimeRangeFilter();
+
+ assertThat(filter).isNotNull();
+ assertThat(filter.range().hasLowerBound()).isTrue();
+ assertThat(filter.range().hasUpperBound()).isTrue();
+ assertThat(filter.range().lowerEndpoint()).isEqualTo(0L);
+ assertThat(filter.range().upperEndpoint()).isEqualTo(3000L);
+ }
+
+ @Test
+ void testTimeRangeFilterValidatesRangeOrdering()
+ {
+        Map<String, String> options = new HashMap<>(REQUIRED_CLIENT_CONFIG_OPTIONS);
+ options.put("sstable_start_timestamp_micros", "2000");
+ options.put("sstable_end_timestamp_micros", "1000");
+
+ assertThatThrownBy(() -> ClientConfig.create(options))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid range: [2000‥1000]");
+ }
}
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/LocalDataLayerTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/LocalDataLayerTests.java
index 863d6c57..f465f033 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/LocalDataLayerTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/LocalDataLayerTests.java
@@ -19,13 +19,19 @@
package org.apache.cassandra.spark.data;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
@@ -35,9 +41,12 @@ import org.apache.cassandra.bridge.CassandraBridge;
import org.apache.cassandra.bridge.CassandraVersion;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.reader.SchemaTests;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static com.google.common.collect.BoundType.CLOSED;
public class LocalDataLayerTests extends VersionRunner
{
@@ -88,4 +97,116 @@ public class LocalDataLayerTests extends VersionRunner
assertThat(dataLayer2).isEqualTo(dataLayer1);
assertThat(dataLayer2.hashCode()).isEqualTo(dataLayer1.hashCode());
}
+
+ @ParameterizedTest
+ @MethodSource("org.apache.cassandra.spark.data.VersionRunner#bridges")
+ public void testTimeRangeFilterFromOptions(CassandraBridge bridge)
+ {
+ String schemaWithTWCS = schemaWithTWCS();
+
+ Map<String, String> options = new HashMap<>();
+ options.put("version", bridge.getVersion().name());
+ options.put("partitioner", Partitioner.Murmur3Partitioner.name());
+ options.put("keyspace", "test_keyspace");
+ options.put("createstmt", schemaWithTWCS);
+ options.put("dirs", "/tmp/data1,/tmp/data2");
+ options.put("sstable_start_timestamp_micros", "1000");
+ options.put("sstable_end_timestamp_micros", "2000");
+
+ LocalDataLayer dataLayer = LocalDataLayer.from(options);
+
+ SSTableTimeRangeFilter filter = dataLayer.sstableTimeRangeFilter();
+ assertThat(filter.range().lowerEndpoint()).isEqualTo(1000L);
+ assertThat(filter.range().upperEndpoint()).isEqualTo(2000L);
+ }
+
+ @ParameterizedTest
+ @MethodSource("org.apache.cassandra.spark.data.VersionRunner#bridges")
+ public void testTimeRangeFilterNotSupportedWithLCS(CassandraBridge bridge)
+ {
+        String schemaWithLeveledCompaction = "CREATE TABLE test_keyspace.test_table (\n"
+                                           + "  id uuid,\n"
+                                           + "  value text,\n"
+                                           + "  PRIMARY KEY(id)\n"
+                                           + ") WITH compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}";
+
+ CassandraVersion version = bridge.getVersion();
+        SSTableTimeRangeFilter filter = SSTableTimeRangeFilter.create(1000L, 2000L);
+
+ assertThatThrownBy(() -> new LocalDataLayer(
+ version,
+ Partitioner.Murmur3Partitioner,
+ "test_keyspace",
+ schemaWithLeveledCompaction,
+ Collections.emptySet(),
+ Collections.emptyList(),
+ false,
+ null,
+ filter,
+ "/tmp/data1", "/tmp/data2"
+ ))
+ .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessageContaining("SSTableTimeRangeFilter is only supported with TimeWindowCompactionStrategy. " +
+                              "Current compaction strategy is: org.apache.cassandra.db.compaction.LeveledCompactionStrategy");
+ }
+
+ @ParameterizedTest
+ @MethodSource("org.apache.cassandra.spark.data.VersionRunner#bridges")
+    public void testSerializationWithTimeRangeFilter(CassandraBridge bridge) throws Exception
+ {
+        // Use TimeWindowCompactionStrategy since time range filters are only supported with TWCS
+ String schemaWithTWCS = schemaWithTWCS();
+
+ CassandraVersion version = bridge.getVersion();
+        SSTableTimeRangeFilter filter = SSTableTimeRangeFilter.create(1000L, 2000L);
+ LocalDataLayer dataLayer = new LocalDataLayer(
+ version,
+ Partitioner.Murmur3Partitioner,
+ "test_keyspace",
+ schemaWithTWCS,
+ Collections.emptySet(),
+ Collections.emptyList(),
+ false,
+ null,
+ filter,
+ "/tmp/data1", "/tmp/data2"
+ );
+
+ ByteArrayOutputStream baos = serialize(dataLayer);
+ LocalDataLayer deserialized = deserialize(baos);
+
+        SSTableTimeRangeFilter deserializedFilter = deserialized.sstableTimeRangeFilter();
+        assertThat(deserializedFilter).isEqualTo(filter);
+        assertThat(deserializedFilter.range().lowerEndpoint()).isEqualTo(1000L);
+        assertThat(deserializedFilter.range().upperEndpoint()).isEqualTo(2000L);
+        assertThat(deserializedFilter.range().lowerBoundType()).isEqualTo(CLOSED);
+        assertThat(deserializedFilter.range().upperBoundType()).isEqualTo(CLOSED);
+ }
+
+    private ByteArrayOutputStream serialize(LocalDataLayer dataLayer) throws Exception
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(dataLayer);
+ oos.close();
+ return baos;
+ }
+
+    private LocalDataLayer deserialize(ByteArrayOutputStream baos) throws Exception
+    {
+        ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ ObjectInputStream ois = new ObjectInputStream(bais);
+ LocalDataLayer deserialized = (LocalDataLayer) ois.readObject();
+ ois.close();
+ return deserialized;
+ }
+
+ private String schemaWithTWCS()
+ {
+        return "CREATE TABLE test_keyspace.test_table2 (\n"
+             + "  id uuid,\n"
+             + "  value text,\n"
+             + "  PRIMARY KEY(id)\n"
+             + ") WITH compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'}";
+ }
}
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/PartitionedDataLayerTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/PartitionedDataLayerTests.java
index b1169963..2768b146 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/PartitionedDataLayerTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/PartitionedDataLayerTests.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.spark.data.partitioner.TokenPartitioner;
import org.apache.cassandra.spark.reader.EmptyStreamScanner;
import org.apache.cassandra.spark.reader.StreamScanner;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
import org.apache.cassandra.spark.utils.test.TestSchema;
import org.apache.spark.TaskContext;
@@ -303,7 +304,8 @@ public class PartitionedDataLayerTests extends VersionRunner
// Filter does not fall in spark token range
StreamScanner scanner = dataLayer.openCompactionScanner(partitionId,
-                                                                Collections.singletonList(filterOutsideRange));
+                                                                Collections.singletonList(filterOutsideRange),
+                                                                SSTableTimeRangeFilter.ALL);
assertThat(scanner).isInstanceOf(EmptyStreamScanner.class);
}
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/CassandraBridgeUtilTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/CassandraBridgeUtilTests.java
index df059167..b9cb8449 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/CassandraBridgeUtilTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/CassandraBridgeUtilTests.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.spark.TestUtils;
import org.apache.cassandra.spark.data.FileType;
import org.apache.cassandra.spark.data.SSTable;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
import org.apache.cassandra.spark.utils.test.TestSSTable;
import org.apache.cassandra.spark.utils.test.TestSchema;
@@ -261,6 +262,7 @@ public class CassandraBridgeUtilTests
null,
expected.keySet().stream().map(Collections::singletonList).collect(Collectors.toList()),
null,
+ SSTableTimeRangeFilter.ALL,
(row) ->
actual.put(row.get("a").toString(), row)
);
assertThat(actual).hasSameSizeAs(expected);
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/sparksql/SparkRowIteratorTests.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/sparksql/SparkRowIteratorTests.java
index d9ca8b18..e13aeb52 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/sparksql/SparkRowIteratorTests.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/sparksql/SparkRowIteratorTests.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.spark.reader.RowData;
import org.apache.cassandra.spark.reader.StreamScanner;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
import org.apache.cassandra.spark.utils.test.TestSchema;
@@ -189,6 +190,7 @@ public class SparkRowIteratorTests
when(dataLayer.stats()).thenReturn(Stats.DoNothingStats.INSTANCE);
when(dataLayer.requestedFeatures()).thenCallRealMethod();
when(dataLayer.typeConverter()).thenReturn(typeConverter);
+        when(dataLayer.sstableTimeRangeFilter()).thenReturn(SSTableTimeRangeFilter.ALL);
// Mock scanner
StreamScanner scanner = mock(StreamScanner.class);
@@ -251,7 +253,7 @@ public class SparkRowIteratorTests
return true;
}).when(scanner).next();
-        when(dataLayer.openCompactionScanner(anyInt(), anyListOf(PartitionKeyFilter.class), any())).thenReturn(scanner);
+        when(dataLayer.openCompactionScanner(anyInt(), anyListOf(PartitionKeyFilter.class), any(), any())).thenReturn(scanner);
// Use SparkRowIterator and verify values match expected
SparkRowIterator it = new SparkRowIterator(0, dataLayer);
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java
index 17b7d3ce..5bb363d9 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java
@@ -43,6 +43,8 @@ import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.data.VersionRunner;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import static org.apache.cassandra.spark.utils.CqlUtils.extractCompactionStrategy;
+import static org.apache.cassandra.spark.utils.CqlUtils.isTimeRangeFilterSupported;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -797,6 +799,40 @@ public class CqlUtilsTest extends VersionRunner
}
+    @Test
+    public void testTimeRangeFilterSupported()
+    {
+        String schemaWithTWCS = "CREATE TABLE k.t (a int PRIMARY KEY, b int) " +
+                                "WITH compaction = { 'class' : 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'};";
+        String twcsCompaction = extractCompactionStrategy(schemaWithTWCS);
+        assertThat(twcsCompaction).isEqualTo("org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy");
+        assertThat(isTimeRangeFilterSupported(twcsCompaction)).isTrue();
+
+        String schemaWithTWCSNoSpace = "CREATE TABLE k.t (a int PRIMARY KEY, b int) " +
+                                       "WITH compaction = {'class':'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'};";
+        String twcsCompactionNoSpace = extractCompactionStrategy(schemaWithTWCSNoSpace);
+        assertThat(twcsCompactionNoSpace).isEqualTo("org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy");
+        assertThat(isTimeRangeFilterSupported(twcsCompactionNoSpace)).isTrue();
+
+        String schemaWithLCS = "CREATE TABLE k.t (a int PRIMARY KEY, b int) " +
+                               "WITH compaction = { 'class' : 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'};";
+        String lcsCompaction = extractCompactionStrategy(schemaWithLCS);
+        assertThat(lcsCompaction).isEqualTo("org.apache.cassandra.db.compaction.LeveledCompactionStrategy");
+        assertThat(isTimeRangeFilterSupported(lcsCompaction)).isFalse();
+
+        String schemaWithSTCS = "CREATE TABLE k.t (a int PRIMARY KEY, b int) " +
+                                "WITH compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'};";
+        String stcsCompaction = extractCompactionStrategy(schemaWithSTCS);
+        assertThat(isTimeRangeFilterSupported(stcsCompaction)).isFalse();
+
+        String schemaNoCompaction = "CREATE TABLE k.t (a int PRIMARY KEY, b int);";
+        String nullCompaction = extractCompactionStrategy(schemaNoCompaction);
+        assertThat(nullCompaction).isNull();
+        assertThat(isTimeRangeFilterSupported(nullCompaction)).isTrue();
+
+        assertThat(isTimeRangeFilterSupported("")).isFalse();
+    }
+
private static String loadFullSchemaSample() throws IOException
{
Path fullSchemaSampleFile =
ResourceUtils.writeResourceToPath(CqlUtilsTest.class.getClassLoader(),
tempPath, "cql/fullSchema.cql");
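Taken together, the cases above pin down the guard: TWCS is accepted (with or without spaces in the compaction map), LCS and STCS are rejected, a schema with no compaction clause (null strategy) is treated as supported, and an empty strategy string is rejected. A hedged sketch of that guard using the two static imports added above; the class and method names are illustrative, and the exception text mirrors the integration test further below:

    import static org.apache.cassandra.spark.utils.CqlUtils.extractCompactionStrategy;
    import static org.apache.cassandra.spark.utils.CqlUtils.isTimeRangeFilterSupported;

    public final class TimeRangeFilterGuard
    {
        // Sketch only: reject a time range filter unless the create statement declares
        // TimeWindowCompactionStrategy. Per the unit test above, a null strategy
        // (no compaction clause extracted) is treated as supported.
        public static void validate(String createStmt)
        {
            String strategy = extractCompactionStrategy(createStmt);
            if (!isTimeRangeFilterSupported(strategy))
            {
                throw new UnsupportedOperationException(
                    "SSTableTimeRangeFilter is only supported with TimeWindowCompactionStrategy. " +
                    "Current compaction strategy is: " + strategy);
            }
        }
    }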
diff --git
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderFilteringIntegrationTest.java
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderFilteringIntegrationTest.java
new file mode 100644
index 00000000..31758a46
--- /dev/null
+++
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderFilteringIntegrationTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import static org.apache.cassandra.spark.data.ClientConfig.SSTABLE_END_TIMESTAMP_MICROS;
+import static org.apache.cassandra.spark.data.ClientConfig.SSTABLE_START_TIMESTAMP_MICROS;
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Integration test for various filters used during bulk reading.
+ */
+class BulkReaderFilteringIntegrationTest extends SharedClusterSparkIntegrationTestBase
+{
+ static final int DATA_SIZE = 1000;
+
+ QualifiedName twcsTable = uniqueTestTableFullName(TEST_KEYSPACE);
+ QualifiedName lcsTable = uniqueTestTableFullName(TEST_KEYSPACE);
+
+    // Use a base timestamp that is 10 minutes in the past
+    static final long BASE_TIMESTAMP_MILLIS = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(10);
+
+    // Separate each batch by 2 minutes to ensure they go into different TWCS windows (1 minute window size)
+    static final long EARLY_TIMESTAMP_MICROS = TimeUnit.MILLISECONDS.toMicros(BASE_TIMESTAMP_MILLIS);
+    static final long MIDDLE_TIMESTAMP_MICROS = TimeUnit.MILLISECONDS.toMicros(BASE_TIMESTAMP_MILLIS + TimeUnit.MINUTES.toMillis(2));
+    static final long LATE_TIMESTAMP_MICROS = TimeUnit.MILLISECONDS.toMicros(BASE_TIMESTAMP_MILLIS + TimeUnit.MINUTES.toMillis(4));
+
+ @Test
+ void testReadAllDataWithoutTimeRangeFilter()
+ {
+ // Read all data without any time range filter
+ Map<String, String> timeRangeOptions = Map.of();
+ int expectedDataSize = DATA_SIZE * 3; // all 3 SSTables read
+        Set<Long> expectedSSTableTimestamps = Set.of(EARLY_TIMESTAMP_MICROS, MIDDLE_TIMESTAMP_MICROS, LATE_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, expectedDataSize, expectedSSTableTimestamps);
+ }
+
+ @Test
+ void testTimeRangeFilterWithStartBoundInclusive()
+ {
+ // Read data starting MIDDLE_TIMESTAMP
+        Map<String, String> timeRangeOptions = Map.of(SSTABLE_START_TIMESTAMP_MICROS, Long.valueOf(MIDDLE_TIMESTAMP_MICROS).toString());
+        int expectedDataSize = DATA_SIZE * 2; // 2 SSTables read
+        Set<Long> expectedSSTableTimestamps = Set.of(MIDDLE_TIMESTAMP_MICROS, LATE_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, expectedDataSize, expectedSSTableTimestamps);
+ }
+
+ @Test
+ void testTimeRangeFilterWithStartBoundExclusive()
+ {
+        Map<String, String> timeRangeOptions = Map.of(SSTABLE_START_TIMESTAMP_MICROS, Long.valueOf(LATE_TIMESTAMP_MICROS + 1).toString());
+        Set<Long> expectedSSTableTimestamps = Set.of(LATE_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, DATA_SIZE, expectedSSTableTimestamps); // 1 SSTable read
+ }
+
+ @Test
+ void testTimeRangeFilterWithEndBoundInclusive()
+ {
+ // Read data ending with MIDDLE_TIMESTAMP inclusive
+        Map<String, String> timeRangeOptions = Map.of(SSTABLE_END_TIMESTAMP_MICROS, Long.valueOf(MIDDLE_TIMESTAMP_MICROS).toString());
+        int expectedDataSize = DATA_SIZE * 2; // 2 SSTables read
+        Set<Long> expectedSSTableTimestamps = Set.of(EARLY_TIMESTAMP_MICROS, MIDDLE_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, expectedDataSize, expectedSSTableTimestamps);
+ }
+
+ @Test
+ void testTimeRangeFilterWithEndBoundExclusive()
+ {
+ // Read data ending with MIDDLE_TIMESTAMP exclusive
+        Map<String, String> timeRangeOptions = Map.of(SSTABLE_END_TIMESTAMP_MICROS, Long.valueOf(MIDDLE_TIMESTAMP_MICROS - 1).toString());
+        Set<Long> expectedSSTableTimestamps = Set.of(EARLY_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, DATA_SIZE, expectedSSTableTimestamps); // 1 SSTable read
+ }
+
+ @Test
+ void testTimeRangeFilterWithStartAndEndBound()
+ {
+        Map<String, String> timeRangeOptions = Map.of(SSTABLE_START_TIMESTAMP_MICROS, Long.valueOf(MIDDLE_TIMESTAMP_MICROS).toString(),
+                                                      SSTABLE_END_TIMESTAMP_MICROS, Long.valueOf(LATE_TIMESTAMP_MICROS - 1).toString());
+        Set<Long> expectedSSTableTimestamps = Set.of(MIDDLE_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, DATA_SIZE, expectedSSTableTimestamps); // 1 SSTable read
+ }
+
+ @Test
+ void testTimeRangeFilterWithStartAndEndBoundExclusive()
+ {
+        Map<String, String> timeRangeOptions = Map.of(SSTABLE_START_TIMESTAMP_MICROS, Long.valueOf(EARLY_TIMESTAMP_MICROS + 1).toString(),
+                                                      SSTABLE_END_TIMESTAMP_MICROS, Long.valueOf(LATE_TIMESTAMP_MICROS - 1).toString());
+        int expectedDataSize = DATA_SIZE * 2; // 2 SSTables read
+        Set<Long> expectedSSTableTimestamps = Set.of(EARLY_TIMESTAMP_MICROS, MIDDLE_TIMESTAMP_MICROS);
+        runTimeRangeFilterTest(timeRangeOptions, expectedDataSize, expectedSSTableTimestamps);
+ }
+
+ @Test
+ void testTimeRangeFilterNonOverlappingBound()
+ {
+        Map<String, String> timeRangeOptions = Map.of(SSTABLE_END_TIMESTAMP_MICROS, Long.valueOf(EARLY_TIMESTAMP_MICROS - 1).toString());
+        Dataset<Row> data = bulkReaderDataFrame(twcsTable, timeRangeOptions).load();
+
+        List<Row> rows = data.collectAsList();
+        assertThat(rows.size()).isEqualTo(0); // no data read
+ }
+
+ @Test
+ void testTimeRangeFilterWithoutTWCS()
+ {
+        // Attempting to use a time range filter with a non-TWCS table should throw an exception
+        Map<String, String> timeRangeOptions = Map.of(
+            SSTABLE_START_TIMESTAMP_MICROS, Long.valueOf(EARLY_TIMESTAMP_MICROS).toString(),
+            SSTABLE_END_TIMESTAMP_MICROS, Long.valueOf(LATE_TIMESTAMP_MICROS).toString()
+        );
+
+        assertThatThrownBy(() -> {
+            Dataset<Row> data = bulkReaderDataFrame(lcsTable, timeRangeOptions).load();
+            data.collectAsList();
+        })
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage("SSTableTimeRangeFilter is only supported with TimeWindowCompactionStrategy. " +
+                    "Current compaction strategy is: org.apache.cassandra.db.compaction.LeveledCompactionStrategy");
+ }
+
+    private void runTimeRangeFilterTest(Map<String, String> timeRangeOptions,
+                                        int expectedDataSize,
+                                        Set<Long> expectedTimestamps)
+    {
+        Dataset<Row> data = bulkReaderDataFrame(twcsTable, timeRangeOptions).load();
+
+        List<Row> rows = data.collectAsList();
+        assertThat(rows.size()).isEqualTo(expectedDataSize);
+
+        Set<Long> allTimestamps = rows.stream()
+                                      .map(row -> row.getLong(2))
+                                      .collect(Collectors.toSet());
+
+        assertThat(expectedTimestamps.size()).isEqualTo(allTimestamps.size());
+        assertThat(expectedTimestamps).containsAll(allTimestamps);
+ }
+
+ @Override
+ protected void initializeSchemaForTest()
+ {
+ createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
+ IInstance instance = cluster.getFirstRunningInstance();
+ ICoordinator coordinator = instance.coordinator();
+
+ // Initialize schema for SSTable time range filtering
+
+        // Create a table with the TWCS compaction strategy and a 1-minute compaction window
+        createTestTable(twcsTable, "CREATE TABLE IF NOT EXISTS %s (" +
+                                   "  id text PRIMARY KEY," +
+                                   "  data text," +
+                                   "  timestamp bigint" +
+                                   ") WITH compaction = {" +
+                                   "  'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'," +
+                                   "  'compaction_window_size': '1'," +
+                                   "  'compaction_window_unit': 'MINUTES'" +
+                                   "};");
+
+        createTestTable(lcsTable, "CREATE TABLE IF NOT EXISTS %s (" +
+                                  "  id text PRIMARY KEY," +
+                                  "  data text" +
+                                  ") WITH compaction = {" +
+                                  "  'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'" +
+                                  "};");
+        for (int i = 0; i < 10; i++)
+        {
+            String query = String.format("INSERT INTO %s (id, data) VALUES ('%s', 'data_%s')", lcsTable, i, "data" + i);
+            coordinator.execute(query, ConsistencyLevel.ALL);
+        }
+        instance.nodetool("flush", TEST_KEYSPACE, lcsTable.table());
+
+        // Create 3 SSTables in 3 time windows, each SSTable created 2 minutes apart
+        // Insert early data with early timestamps
+        for (int i = 0; i < DATA_SIZE; i++)
+        {
+            long timestamp = EARLY_TIMESTAMP_MICROS + i;
+            String query = String.format("INSERT INTO %s (id, data, timestamp) VALUES ('%s', 'data_%s', %d) USING TIMESTAMP %d",
+                                         twcsTable, i, "data" + i, EARLY_TIMESTAMP_MICROS, timestamp);
+            coordinator.execute(query, ConsistencyLevel.ALL);
+        }
+
+        // Flush to create the first SSTable
+        instance.nodetool("flush", TEST_KEYSPACE, twcsTable.table());
+
+        // Insert middle data with middle timestamps
+        for (int i = 0; i < DATA_SIZE; i++)
+        {
+            int id = DATA_SIZE + i;
+            long timestamp = MIDDLE_TIMESTAMP_MICROS + i;
+            String query = String.format("INSERT INTO %s (id, data, timestamp) VALUES ('%s', 'data_%s', %d) USING TIMESTAMP %d",
+                                         twcsTable, id, "data" + id, MIDDLE_TIMESTAMP_MICROS, timestamp);
+            coordinator.execute(query, ConsistencyLevel.ALL);
+        }
+
+        // Flush to create the second SSTable
+        instance.nodetool("flush", TEST_KEYSPACE, twcsTable.table());
+
+        // Insert late data with late timestamps
+        for (int i = 0; i < DATA_SIZE; i++)
+        {
+            int id = DATA_SIZE * 2 + i;
+            long timestamp = LATE_TIMESTAMP_MICROS + i;
+            String query = String.format("INSERT INTO %s (id, data, timestamp) VALUES ('%s', 'data_%s', %d) USING TIMESTAMP %d",
+                                         twcsTable, id, "data" + id, LATE_TIMESTAMP_MICROS, timestamp);
+            coordinator.execute(query, ConsistencyLevel.ALL);
+        }
+
+        // Flush to create the third SSTable
+        instance.nodetool("flush", TEST_KEYSPACE, twcsTable.table());
+ }
+}
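The two option keys exercised above are the only public knobs this patch adds; the rest is wiring. A hedged sketch of how a job might set them on the bulk reader; the data source format string and the keyspace/table option names follow the project's existing bulk reader conventions and are illustrative here, while the SSTABLE_*_TIMESTAMP_MICROS constants are the real ClientConfig additions:

    import java.util.concurrent.TimeUnit;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    import static org.apache.cassandra.spark.data.ClientConfig.SSTABLE_END_TIMESTAMP_MICROS;
    import static org.apache.cassandra.spark.data.ClientConfig.SSTABLE_START_TIMESTAMP_MICROS;

    public final class TimeWindowedRead
    {
        public static void main(String[] args)
        {
            // Sketch only: restrict the bulk read to SSTables whose stats metadata
            // overlaps the last hour; values are microseconds, matching the option names
            long endMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis());
            long startMicros = endMicros - TimeUnit.HOURS.toMicros(1);

            Dataset<Row> df = SparkSession.active()
                                          .read()
                                          .format("org.apache.cassandra.spark.sparksql.CassandraDataSource")
                                          .option("keyspace", "test_keyspace")
                                          .option("table", "test_table")
                                          .option(SSTABLE_START_TIMESTAMP_MICROS, Long.toString(startMicros))
                                          .option(SSTABLE_END_TIMESTAMP_MICROS, Long.toString(endMicros))
                                          .load();
            df.show();
        }
    }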
diff --git
a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
index 0701ae28..6bbf7f14 100644
---
a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
+++
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
@@ -61,6 +61,7 @@ import
org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
import org.apache.cassandra.spark.utils.TimeProvider;
import org.apache.cassandra.util.CompressionUtil;
import org.jetbrains.annotations.NotNull;
@@ -88,6 +89,7 @@ public abstract class CassandraBridge
                                               @NotNull SSTablesSupplier ssTables,
                                               @Nullable SparkRangeFilter sparkRangeFilter,
                                               @NotNull Collection<PartitionKeyFilter> partitionKeyFilters,
+                                              @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
@Nullable
PruneColumnFilter columnFilter,
@NotNull
TimeProvider timeProvider,
boolean
readIndexOffset,
@@ -572,33 +574,37 @@ public abstract class CassandraBridge
/**
* Convenience method around `readPartitionKeys` to accept partition keys
as string values and encode with the correct types.
*
-     * @param partitioner Cassandra partitioner
-     * @param keyspace    keyspace name
-     * @param createStmt  create table CQL statement
-     * @param ssTables    set of SSTables to read
-     * @param rowConsumer Consumer interface to consume rows as they are read to avoid buffering all rows in memory for consumption.
+     * @param partitioner            Cassandra partitioner
+     * @param keyspace               keyspace name
+     * @param createStmt             create table CQL statement
+     * @param ssTables               set of SSTables to read
+     * @param sstableTimeRangeFilter SSTable time range filter for filtering out SSTables based on min and max timestamps
+     * @param rowConsumer            Consumer interface to consume rows as they are read to avoid buffering all rows in memory for consumption.
* @throws IOException
*/
     public void readStringPartitionKeys(@NotNull Partitioner partitioner,
                                         @NotNull String keyspace,
                                         @NotNull String createStmt,
                                         @NotNull Set<SSTable> ssTables,
+                                        @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                                         @NotNull Consumer<Map<String, Object>> rowConsumer) throws IOException
     {
-        readStringPartitionKeys(partitioner, keyspace, createStmt, ssTables, null, null, null, rowConsumer);
+        readStringPartitionKeys(partitioner, keyspace, createStmt, ssTables, null, null, null, sstableTimeRangeFilter, rowConsumer);
}
/**
* Convenience method around `readPartitionKeys` to accept partition keys
as string values and encode with the correct types.
*
-     * @param partitioner       Cassandra partitioner
-     * @param keyspace          keyspace name
-     * @param createStmt        create table CQL statement
-     * @param ssTables          set of SSTables to read
-     * @param tokenRange        optional token range to limit the bulk read to a restricted token range.
-     * @param partitionKeys     list of partition keys, if more than one partition keys they must be correctly ordered in the inner list.
-     * @param pruneColumnFilter optional filter to select a subset of columns, this can offer performance improvement if skipping over large blobs or columns.
-     * @param rowConsumer       Consumer interface to consume rows as they are read to avoid buffering all rows in memory for consumption.
+     * @param partitioner            Cassandra partitioner
+     * @param keyspace               keyspace name
+     * @param createStmt             create table CQL statement
+     * @param ssTables               set of SSTables to read
+     * @param tokenRange             optional token range to limit the bulk read to a restricted token range.
+     * @param partitionKeys          list of partition keys; if there is more than one partition key, they must be correctly ordered in the inner list.
+     * @param pruneColumnFilter      optional filter to select a subset of columns; this can offer a performance improvement if skipping over large blobs or columns.
+     * @param sstableTimeRangeFilter SSTable time range filter; filters out SSTables that do not overlap the given time range
+     * @param rowConsumer            Consumer interface to consume rows as they are read to avoid buffering all rows in memory for consumption.
* @throws IOException
*/
public void readStringPartitionKeys(@NotNull Partitioner partitioner,
@@ -608,6 +614,7 @@ public abstract class CassandraBridge
                                         @Nullable TokenRange tokenRange,
                                         @Nullable List<List<String>> partitionKeys,
                                         @Nullable String[] pruneColumnFilter,
+                                        @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                                         @NotNull Consumer<Map<String, Object>> rowConsumer) throws IOException
{
readPartitionKeys(partitioner,
@@ -617,6 +624,7 @@ public abstract class CassandraBridge
tokenRange,
partitionKeys == null ? null :
encodePartitionKeys(partitioner, keyspace, createStmt, partitionKeys),
pruneColumnFilter,
+ sstableTimeRangeFilter,
rowConsumer);
}
@@ -624,9 +632,10 @@ public abstract class CassandraBridge
@NotNull String keyspace,
@NotNull String createStmt,
@NotNull Set<SSTable> ssTables,
+                                  @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                                   @NotNull Consumer<Map<String, Object>> rowConsumer) throws IOException
     {
-        readPartitionKeys(partitioner, keyspace, createStmt, ssTables, null, null, null, rowConsumer);
+        readPartitionKeys(partitioner, keyspace, createStmt, ssTables, null, null, null, sstableTimeRangeFilter, rowConsumer);
}
public void readPartitionKeys(@NotNull Partitioner partitioner,
@@ -636,9 +645,11 @@ public abstract class CassandraBridge
@Nullable TokenRange tokenRange,
@Nullable List<ByteBuffer> partitionKeys,
@Nullable String[] pruneColumnFilter,
+                                  @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                                   @NotNull Consumer<Map<String, Object>> rowConsumer) throws IOException
     {
-        readPartitionKeys(partitioner, keyspace, createStmt, new BasicSupplier(ssTables), tokenRange, partitionKeys, pruneColumnFilter, rowConsumer);
+        readPartitionKeys(partitioner, keyspace, createStmt, new BasicSupplier(ssTables), tokenRange, partitionKeys,
+                          pruneColumnFilter, sstableTimeRangeFilter, rowConsumer);
}
public abstract void readPartitionKeys(@NotNull Partitioner partitioner,
@@ -648,6 +659,7 @@ public abstract class CassandraBridge
                                            @Nullable TokenRange tokenRange,
                                            @Nullable List<ByteBuffer> partitionKeys,
                                            @Nullable String[] pruneColumnFilter,
+                                           @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                                            @NotNull Consumer<Map<String, Object>> rowConsumer) throws IOException;
// Kryo/Java (De-)Serialization
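A hedged sketch of calling the new string-keyed convenience overload above; the wrapper class, the keyspace, and the "id" column name are illustrative, while the signature matches the overload in this diff:

    import java.io.IOException;
    import java.util.Set;

    import org.apache.cassandra.bridge.CassandraBridge;
    import org.apache.cassandra.spark.data.SSTable;
    import org.apache.cassandra.spark.data.partitioner.Partitioner;
    import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;

    public final class PartitionKeyDump
    {
        // Sketch only: stream partition keys from a set of SSTables, skipping any
        // SSTable whose [minTimestamp, maxTimestamp] stats window misses 1000..2000
        public static void dump(CassandraBridge bridge, String createStmt, Set<SSTable> ssTables) throws IOException
        {
            SSTableTimeRangeFilter filter = SSTableTimeRangeFilter.create(1000L, 2000L);
            bridge.readStringPartitionKeys(Partitioner.Murmur3Partitioner,
                                           "test_keyspace",
                                           createStmt,
                                           ssTables,
                                           filter,
                                           row -> System.out.println(row.get("id")));
        }
    }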
diff --git
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
index 1653bab8..88d5c6df 100644
---
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
+++
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
@@ -105,6 +105,7 @@ import org.apache.cassandra.spark.sparksql.RowIterator;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
import org.apache.cassandra.spark.utils.Pair;
import org.apache.cassandra.spark.utils.SparkClassLoaderOverride;
import org.apache.cassandra.spark.utils.TimeProvider;
@@ -198,6 +199,7 @@ public class CassandraBridgeImplementation extends
CassandraBridge
                                               @NotNull SSTablesSupplier ssTables,
                                               @Nullable SparkRangeFilter sparkRangeFilter,
                                               @NotNull Collection<PartitionKeyFilter> partitionKeyFilters,
+                                              @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
@Nullable
PruneColumnFilter columnFilter,
@NotNull TimeProvider
timeProvider,
boolean readIndexOffset,
@@ -211,6 +213,7 @@ public class CassandraBridgeImplementation extends
CassandraBridge
return
org.apache.cassandra.spark.reader.SSTableReader.builder(metadata, ssTable)
.withSparkRangeFilter(sparkRangeFilter)
.withPartitionKeyFilters(partitionKeyFilters)
+                            .withTimeRangeFilter(sstableTimeRangeFilter)
.withColumnFilter(columnFilter)
.withReadIndexOffset(readIndexOffset)
.withStats(stats)
@@ -443,6 +446,7 @@ public class CassandraBridgeImplementation extends
CassandraBridge
@Nullable TokenRange tokenRange,
@Nullable List<ByteBuffer> partitionKeys,
@Nullable String[] requiredColumns,
+                                  @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                                   Consumer<Map<String, Object>> rowConsumer) throws IOException
{
IPartitioner iPartitioner = getPartitioner(partitioner);
@@ -462,8 +466,9 @@ public class CassandraBridgeImplementation extends
CassandraBridge
Stats.DoNothingStats.INSTANCE,
TypeConverter.IDENTITY,
partitionKeyFilters,
+ sstableTimeRangeFilter,
                                              (t) -> PruneColumnFilter.of(requiredColumns),
-                                             (partitionId1, partitionKeyFilters1, columnFilter1) ->
+                                             (partitionId1, partitionKeyFilters1, timeRangeFilter1, columnFilter1) ->
new CompactionStreamScanner(
metadata,
partitioner,
@@ -471,6 +476,7 @@ public class CassandraBridgeImplementation extends
CassandraBridge
ssTables.openAll((ssTable,
isRepairPrimary) ->
org.apache.cassandra.spark.reader.SSTableReader.builder(metadata, ssTable)
.withPartitionKeyFilters(partitionKeyFilters1)
+                                                        .withTimeRangeFilter(timeRangeFilter1)
.build())
))
{
diff --git
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
index 03ab569d..3dbe44d1 100644
---
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
+++
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
@@ -83,6 +83,7 @@ import
org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
import org.apache.cassandra.spark.utils.Pair;
import org.apache.cassandra.spark.utils.ThrowableUtils;
@@ -118,6 +119,8 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
@NotNull
private final List<PartitionKeyFilter> partitionKeyFilters;
@NotNull
+ private final SSTableTimeRangeFilter sstableTimeRangeFilter;
+ @NotNull
private final Stats stats;
@Nullable
private Long startOffset = null;
@@ -143,6 +146,8 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
SparkRangeFilter sparkRangeFilter = null;
@NotNull
final List<PartitionKeyFilter> partitionKeyFilters = new ArrayList<>();
+ @NotNull
+        SSTableTimeRangeFilter sstableTimeRangeFilter = SSTableTimeRangeFilter.ALL;
Builder(@NotNull TableMetadata metadata, @NotNull SSTable ssTable)
{
@@ -171,6 +176,15 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
return this;
}
+        public Builder withTimeRangeFilter(@Nullable SSTableTimeRangeFilter sstableTimeRangeFilter)
+ {
+ if (sstableTimeRangeFilter != null)
+ {
+ this.sstableTimeRangeFilter = sstableTimeRangeFilter;
+ }
+ return this;
+ }
+
public Builder withColumnFilter(@Nullable PruneColumnFilter
columnFilter)
{
this.columnFilter = columnFilter;
@@ -213,6 +227,7 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
ssTable,
sparkRangeFilter,
partitionKeyFilters,
+ sstableTimeRangeFilter,
columnFilter,
readIndexOffset,
stats,
@@ -232,6 +247,7 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
@NotNull SSTable ssTable,
@Nullable SparkRangeFilter sparkRangeFilter,
@NotNull List<PartitionKeyFilter> partitionKeyFilters,
+                         @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
@Nullable PruneColumnFilter columnFilter,
boolean readIndexOffset,
@NotNull Stats stats,
@@ -304,6 +320,7 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
header = null;
helper = null;
this.metadata = null;
+ this.sstableTimeRangeFilter = SSTableTimeRangeFilter.ALL;
return;
}
@@ -330,6 +347,7 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
header = null;
helper = null;
this.metadata = null;
+ this.sstableTimeRangeFilter = SSTableTimeRangeFilter.ALL;
return;
}
}
@@ -348,6 +366,21 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
}
this.statsMetadata = (StatsMetadata)
componentMap.get(MetadataType.STATS);
+        if (!sstableTimeRangeFilter.overlaps(statsMetadata.minTimestamp, statsMetadata.maxTimestamp))
+        {
+            LOGGER.info("Ignoring SSTableReader with minTimestamp={} maxTimestamp={}, does not overlap with filter {}",
+                        this.statsMetadata.minTimestamp, this.statsMetadata.maxTimestamp, sstableTimeRangeFilter);
+            header = null;
+            helper = null;
+            this.metadata = null;
+            this.sstableTimeRangeFilter = SSTableTimeRangeFilter.ALL;
+            return;
+        }
+        else
+        {
+            this.sstableTimeRangeFilter = sstableTimeRangeFilter;
+        }
+
SerializationHeader.Component headerComp =
(SerializationHeader.Component) componentMap.get(MetadataType.HEADER);
if (headerComp == null)
{
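The skip decision above reduces to a closed-interval overlap test between the filter's range and the SSTable's [minTimestamp, maxTimestamp] taken from the STATS metadata component; both filter bounds are CLOSED per the serialization test earlier in this patch. A sketch of the predicate under that assumption (the class and method names are illustrative):

    public final class ClosedIntervalOverlap
    {
        // Sketch only: overlap of two closed intervals. An SSTable is read iff its
        // [minTimestamp, maxTimestamp] stats window intersects [filterStart, filterEnd];
        // otherwise the reader above bails out before opening any data component
        static boolean overlaps(long filterStart, long filterEnd, long minTimestamp, long maxTimestamp)
        {
            return minTimestamp <= filterEnd && maxTimestamp >= filterStart;
        }
    }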
diff --git
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
index 008f62f3..943a3b76 100644
---
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
+++
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
@@ -102,6 +102,7 @@ import org.apache.cassandra.spark.sparksql.RowIterator;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
import org.apache.cassandra.spark.utils.Pair;
import org.apache.cassandra.spark.utils.SparkClassLoaderOverride;
import org.apache.cassandra.spark.utils.TimeProvider;
@@ -195,6 +196,7 @@ public class CassandraBridgeImplementation extends
CassandraBridge
                                               @NotNull SSTablesSupplier ssTables,
                                               @Nullable SparkRangeFilter sparkRangeFilter,
                                               @NotNull Collection<PartitionKeyFilter> partitionKeyFilters,
+                                              @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
@Nullable
PruneColumnFilter columnFilter,
@NotNull TimeProvider
timeProvider,
boolean readIndexOffset,
@@ -208,6 +210,7 @@ public class CassandraBridgeImplementation extends
CassandraBridge
return
org.apache.cassandra.spark.reader.SSTableReader.builder(metadata, ssTable)
.withSparkRangeFilter(sparkRangeFilter)
.withPartitionKeyFilters(partitionKeyFilters)
+                            .withTimeRangeFilter(sstableTimeRangeFilter)
.withColumnFilter(columnFilter)
.withReadIndexOffset(readIndexOffset)
.withStats(stats)
@@ -425,6 +428,7 @@ public class CassandraBridgeImplementation extends
CassandraBridge
@Nullable TokenRange tokenRange,
@Nullable List<ByteBuffer> partitionKeys,
@Nullable String[] requiredColumns,
+                                  @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
                                   Consumer<Map<String, Object>> rowConsumer) throws IOException
{
IPartitioner iPartitioner = getPartitioner(partitioner);
@@ -444,8 +448,9 @@ public class CassandraBridgeImplementation extends
CassandraBridge
Stats.DoNothingStats.INSTANCE,
TypeConverter.IDENTITY,
partitionKeyFilters,
+ sstableTimeRangeFilter,
                                              (t) -> PruneColumnFilter.of(requiredColumns),
-                                             (partitionId1, partitionKeyFilters1, columnFilter1) ->
+                                             (partitionId1, partitionKeyFilters1, timeRangeFilter1, columnFilter1) ->
new CompactionStreamScanner(
metadata,
partitioner,
@@ -453,6 +458,7 @@ public class CassandraBridgeImplementation extends
CassandraBridge
ssTables.openAll((ssTable,
isRepairPrimary) ->
org.apache.cassandra.spark.reader.SSTableReader.builder(metadata, ssTable)
.withPartitionKeyFilters(partitionKeyFilters1)
+                                                        .withTimeRangeFilter(timeRangeFilter1)
.build())
))
{
diff --git
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
index 210df479..350657e7 100644
---
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
+++
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
@@ -81,6 +81,7 @@ import
org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
import org.apache.cassandra.spark.utils.Pair;
import org.apache.cassandra.spark.utils.ThrowableUtils;
@@ -116,6 +117,8 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
@NotNull
private final List<PartitionKeyFilter> partitionKeyFilters;
@NotNull
+ private final SSTableTimeRangeFilter sstableTimeRangeFilter;
+ @NotNull
private final Stats stats;
@Nullable
private Long startOffset = null;
@@ -141,6 +144,8 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
SparkRangeFilter sparkRangeFilter = null;
@NotNull
final List<PartitionKeyFilter> partitionKeyFilters = new ArrayList<>();
+ @NotNull
+        SSTableTimeRangeFilter sstableTimeRangeFilter = SSTableTimeRangeFilter.ALL;
Builder(@NotNull TableMetadata metadata, @NotNull SSTable ssTable)
{
@@ -163,6 +168,15 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
return this;
}
+        public Builder withTimeRangeFilter(@Nullable SSTableTimeRangeFilter sstableTimeRangeFilter)
+ {
+ if (sstableTimeRangeFilter != null)
+ {
+ this.sstableTimeRangeFilter = sstableTimeRangeFilter;
+ }
+ return this;
+ }
+
public Builder withPartitionKeyFilter(@NotNull PartitionKeyFilter
partitionKeyFilter)
{
partitionKeyFilters.add(partitionKeyFilter);
@@ -211,6 +225,7 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
ssTable,
sparkRangeFilter,
partitionKeyFilters,
+ sstableTimeRangeFilter,
columnFilter,
readIndexOffset,
stats,
@@ -230,6 +245,7 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
@NotNull SSTable ssTable,
@Nullable SparkRangeFilter sparkRangeFilter,
@NotNull List<PartitionKeyFilter> partitionKeyFilters,
+                         @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
@Nullable PruneColumnFilter columnFilter,
boolean readIndexOffset,
@NotNull Stats stats,
@@ -299,6 +315,7 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
header = null;
helper = null;
this.metadata = null;
+ this.sstableTimeRangeFilter = SSTableTimeRangeFilter.ALL;
return;
}
@@ -325,6 +342,7 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
header = null;
helper = null;
this.metadata = null;
+ this.sstableTimeRangeFilter = SSTableTimeRangeFilter.ALL;
return;
}
}
@@ -343,6 +361,21 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
}
this.statsMetadata = (StatsMetadata)
componentMap.get(MetadataType.STATS);
+        if (!sstableTimeRangeFilter.overlaps(statsMetadata.minTimestamp, statsMetadata.maxTimestamp))
+        {
+            LOGGER.info("Ignoring SSTableReader with minTimestamp={} maxTimestamp={}, does not overlap with filter {}",
+                        this.statsMetadata.minTimestamp, this.statsMetadata.maxTimestamp, sstableTimeRangeFilter);
+            header = null;
+            helper = null;
+            this.metadata = null;
+            this.sstableTimeRangeFilter = SSTableTimeRangeFilter.ALL;
+            return;
+        }
+        else
+        {
+            this.sstableTimeRangeFilter = sstableTimeRangeFilter;
+        }
+
SerializationHeader.Component headerComp =
(SerializationHeader.Component) componentMap.get(MetadataType.HEADER);
if (headerComp == null)
{