This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 17a38d3 CASSANALYTICS-4: Add CassandraBridge helper APIs that can be
used by external tooling (#98)
17a38d3 is described below
commit 17a38d33321003f1bbfd3ba73617014424df1416
Author: jberragan <[email protected]>
AuthorDate: Mon Feb 10 15:35:16 2025 -0800
CASSANALYTICS-4: Add CassandraBridge helper APIs that can be used by
external tooling (#98)
Patch by James Berragan; Reviewed by Francisco Guerrero, Yifan Cai for
CASSANALYTICS-4
---
CHANGES.txt | 1 +
.../cassandra/spark/data/ReplicationFactor.java | 8 +
.../apache/cassandra/spark/data/TypeConverter.java | 2 +
.../spark/sparksql/PartialRowBuilder.java | 2 +
.../cassandra/spark/sparksql/RowIterator.java | 49 +++
.../spark/sparksql/filters/PartitionKeyFilter.java | 13 +-
.../spark/sparksql/filters/PruneColumnFilter.java | 9 +
.../cassandra/spark/utils/ByteBufferUtils.java | 10 +
.../apache/cassandra/spark/utils/RandomUtils.java | 21 ++
.../cassandra/spark/utils/test/TestSSTable.java | 11 +-
.../spark/sparksql/AbstractSparkRowIterator.java | 3 +-
.../java/org/apache/cassandra/spark/TestUtils.java | 12 +
.../java/org/apache/cassandra/spark/Tester.java | 2 +-
.../spark/reader/CassandraBridgeUtilTests.java | 331 +++++++++++++++++++++
.../org/apache/cassandra/bridge/BloomFilter.java | 35 +--
.../apache/cassandra/bridge/CassandraBridge.java | 207 +++++++++++++
.../org/apache/cassandra/bridge/Tokenizer.java | 33 +-
.../bridge/CassandraBridgeImplementation.java | 258 +++++++++++++++-
.../apache/cassandra/spark/reader/IndexReader.java | 2 +-
.../apache/cassandra/spark/reader/ReaderUtils.java | 169 ++++++++---
.../cassandra/spark/reader/SSTableCache.java | 3 +-
.../cassandra/spark/reader/SSTableReader.java | 46 +--
.../cassandra/spark/reader/SummaryDbUtils.java | 10 +-
.../cassandra/spark/reader/ReaderUtilsTests.java | 10 +-
.../cassandra/spark/reader/SSTableCacheTests.java | 2 +-
.../cassandra/spark/reader/SSTableReaderTests.java | 24 +-
26 files changed, 1103 insertions(+), 170 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index b941a3c..2c98bc2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.0.0
+ * Add CassandraBridge helper APIs that can be used by external tooling
(CASSANALYTICS-4)
* Refactor to decouple RowIterator and CellIterator from Spark so bulk reads
can be performed outside of Spark (CASSANDRA-20259)
* CEP-44 Kafka integration for Cassandra CDC using Sidecar (CASSANDRA-19962)
* Expose detailed bulk write failure message for better insight
(CASSANDRA-20066)
diff --git
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/ReplicationFactor.java
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/ReplicationFactor.java
index ce85f57..0b9b682 100644
---
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/ReplicationFactor.java
+++
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/ReplicationFactor.java
@@ -25,6 +25,8 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
+
+import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,6 +98,12 @@ public class ReplicationFactor implements Serializable
}
}
+ public static ReplicationFactor simpleStrategy(int rf)
+ {
+ return new
ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy,
+ ImmutableMap.of("replication_factor",
rf));
+ }
+
@NotNull
private final ReplicationStrategy replicationStrategy;
@NotNull
diff --git
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/TypeConverter.java
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/TypeConverter.java
index b190479..4e885af 100644
---
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/TypeConverter.java
+++
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/TypeConverter.java
@@ -23,6 +23,8 @@ import org.jetbrains.annotations.NotNull;
public interface TypeConverter
{
+ TypeConverter IDENTITY = (cqlType, value, isFrozen) -> value;
+
/**
* Converts deserialized Cassandra Java value to desired equivalent type.
* E.g. SparkSQL uses `org.apache.spark.unsafe.types.UTF8String` to wrap
strings.
diff --git
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/PartialRowBuilder.java
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/PartialRowBuilder.java
index 6df672c..f492f26 100644
---
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/PartialRowBuilder.java
+++
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/PartialRowBuilder.java
@@ -22,6 +22,7 @@ package org.apache.cassandra.spark.sparksql;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -51,6 +52,7 @@ public class PartialRowBuilder<T> extends FullRowBuilder<T>
Function<Object[], T> rowBuilder)
{
super(table, hasProjectedValueColumns, rowBuilder);
+ Objects.requireNonNull(requiredSchema, "requiredColumns must be
non-null to use PartialRowBuilder");
this.requiredSchema = requiredSchema;
this.columnIndex = new HashMap<>(requiredSchema.length);
for (int i = 0; i < requiredSchema.length; i++)
diff --git
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowIterator.java
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowIterator.java
index 6d3382c..7b5369d 100644
---
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowIterator.java
+++
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowIterator.java
@@ -20,9 +20,13 @@
package org.apache.cassandra.spark.sparksql;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.function.Function;
import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.data.CqlTable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -58,6 +62,51 @@ public abstract class RowIterator<T>
this.builder = newBuilder(decorator);
}
+ /**
+ * @param it CellIterator instance used to iterate
over each cell in the data.
+ * @param stats stats instance to emit KPIs
+ * @param requiredColumns optional column filter to select a subset of
columns, if null then all columns are returned.
+ * @return a basic RowIterator that transforms Object[] to a Map keyed on
column name
+ */
+ public static RowIterator<Map<String, Object>> rowMapIterator(CellIterator
it, Stats stats, @Nullable String[] requiredColumns)
+ {
+ CqlTable cqlTable = it.cqlTable();
+ boolean hasProjectedValueColumns = it.hasProjectedValueColumns();
+ return new RowIterator<Map<String, Object>>(it, stats,
requiredColumns, Function.identity())
+ {
+ @SuppressWarnings("DataFlowIssue") // requiredColumns null checked
in PartialRowBuilder constructor
+ @Override
+ public PartialRowBuilder<Map<String, Object>> newPartialBuilder()
+ {
+ return new PartialRowBuilder<>(
+ requiredColumns, it.cqlTable(), hasProjectedValueColumns,
+ (valueArray) -> {
+ Map<String, Object> row = new
HashMap<>(requiredColumns.length);
+ for (int i = 0; i < requiredColumns.length; i++)
+ {
+ row.put(requiredColumns[i], valueArray[i]);
+ }
+ return row;
+ });
+ }
+
+ @Override
+ public FullRowBuilder<Map<String, Object>> newFullRowBuilder()
+ {
+ return new FullRowBuilder<>(
+ cqlTable, hasProjectedValueColumns,
+ (valueArray) -> {
+ Map<String, Object> row = new
HashMap<>(cqlTable.numFields());
+ for (CqlField field : cqlTable.fields())
+ {
+ row.put(field.name(), valueArray[field.position()]);
+ }
+ return row;
+ });
+ }
+ };
+ }
+
@NotNull
protected RowBuilder<T> newBuilder(Function<RowBuilder<T>, RowBuilder<T>>
decorator)
{
diff --git
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PartitionKeyFilter.java
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PartitionKeyFilter.java
index 6d04850..59a4a76 100644
---
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PartitionKeyFilter.java
+++
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PartitionKeyFilter.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.bridge.TokenRange;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
import org.jetbrains.annotations.NotNull;
-public final class PartitionKeyFilter implements Serializable
+public final class PartitionKeyFilter implements Serializable,
Comparable<PartitionKeyFilter>
{
@NotNull
private BigInteger token;
@@ -141,4 +141,15 @@ public final class PartitionKeyFilter implements
Serializable
return filterKey.equals(that.filterKey)
&& token.equals(that.token);
}
+
+ @Override
+ public int compareTo(@NotNull PartitionKeyFilter other)
+ {
+ int compare = token.compareTo(other.token);
+ if (compare == 0)
+ {
+ return key().compareTo(other.key());
+ }
+ return compare;
+ }
}
diff --git
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PruneColumnFilter.java
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PruneColumnFilter.java
index c032659..e46e1a3 100644
---
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PruneColumnFilter.java
+++
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PruneColumnFilter.java
@@ -19,9 +19,12 @@
package org.apache.cassandra.spark.sparksql.filters;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.Set;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Prune column push-down filter to skip reading columns that are not needed
@@ -35,6 +38,12 @@ public class PruneColumnFilter
this.requiredColumns = requiredColumns;
}
+ @Nullable
+ public static PruneColumnFilter of(@Nullable String[] requiredColumns)
+ {
+ return requiredColumns == null ? null : new PruneColumnFilter(new
HashSet<>(Arrays.asList(requiredColumns)));
+ }
+
public Set<String> requiredColumns()
{
return requiredColumns;
diff --git
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ByteBufferUtils.java
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ByteBufferUtils.java
index adb0eab..fd4ece2 100644
---
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ByteBufferUtils.java
+++
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ByteBufferUtils.java
@@ -91,6 +91,16 @@ public final class ByteBufferUtils
return bytes;
}
+ public static String readShortLengthCompositeTypeString(ByteBuffer buf)
+ {
+ int len = buf.getShort();
+ byte[] ar = new byte[len];
+ buf.get(ar);
+ String str = new String(ar, StandardCharsets.UTF_8);
+ buf.get(); // CompositeType pads with a null byte at the end
+ return str;
+ }
+
/**
* Decode ByteBuffer into String using provided CharsetDecoder.
*
diff --git
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/RandomUtils.java
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/RandomUtils.java
index c59e11e..5827b44 100644
---
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/RandomUtils.java
+++
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/RandomUtils.java
@@ -23,6 +23,7 @@ import java.math.BigInteger;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Random;
+import java.util.Set;
import java.util.UUID;
import java.util.stream.IntStream;
@@ -119,6 +120,26 @@ public final class RandomUtils
return randomAlphanumeric(RandomUtils.nextInt(minLengthInclusive,
maxLengthExclusive));
}
+ public static String randomAlphanumeric(Set<String> alreadyExist)
+ {
+ return randomAlphanumeric(alreadyExist, 32);
+ }
+
+ public static String randomAlphanumeric(Set<String> alreadyExist, int
length)
+ {
+ String str = randomAlphanumeric(length);
+ while (alreadyExist.contains(str))
+ {
+ str = randomAlphanumeric(length);
+ }
+ return str;
+ }
+
+ public static String randomAlphanumeric()
+ {
+ return randomAlphanumeric(32);
+ }
+
public static String randomAlphanumeric(int length)
{
StringBuilder sb = new StringBuilder(length);
diff --git
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/test/TestSSTable.java
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/test/TestSSTable.java
index d7e42c4..6e9d6a4 100644
---
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/test/TestSSTable.java
+++
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/test/TestSSTable.java
@@ -98,7 +98,7 @@ public final class TestSSTable extends SSTable
@Override
protected InputStream openInputStream(FileType fileType)
{
- Path filePath = FileType.resolveComponentFile(fileType, dataFile);
+ Path filePath = fileComponentPath(fileType);
try
{
return filePath != null ? new BufferedInputStream(new
FileInputStream(filePath.toFile())) : null;
@@ -109,14 +109,19 @@ public final class TestSSTable extends SSTable
}
}
+ public Path fileComponentPath(FileType fileType)
+ {
+ return FileType.resolveComponentFile(fileType, dataFile);
+ }
+
public long length(FileType fileType)
{
- return IOUtils.size(FileType.resolveComponentFile(fileType, dataFile));
+ return IOUtils.size(fileComponentPath(fileType));
}
public boolean isMissing(FileType fileType)
{
- return FileType.resolveComponentFile(fileType, dataFile) == null;
+ return fileComponentPath(fileType) == null;
}
@Override
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java
index 85fd17b..632e73f 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.spark.sparksql;
import java.util.Arrays;
import java.util.List;
-import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -87,10 +86,10 @@ abstract class AbstractSparkRowIterator<T> extends
RowIterator<T>
*/
public abstract T rowBuilder(Object[] valueArray);
+ @SuppressWarnings("DataFlowIssue") // Null check performed within
PartialRowBuilder constructor
@Override
public PartialRowBuilder<T> newPartialBuilder()
{
- Objects.requireNonNull(requiredColumns, "requiredColumns must be
non-null for PartialRowBuilder");
return new PartialRowBuilder<>(requiredColumns, it.cqlTable(),
it.hasProjectedValueColumns(), this::rowBuilder);
}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java
index f85be6c..865de42 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java
@@ -388,4 +388,16 @@ public final class TestUtils extends CommonTestUtils
{
return Range.openClosed(start, end);
}
+
+ public static void createDirectory(Path directory)
+ {
+ try
+ {
+ Files.createDirectory(directory);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/Tester.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/Tester.java
index 1fa6eb0..34767f2 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/Tester.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/Tester.java
@@ -59,7 +59,7 @@ import static
org.quicktheories.generators.SourceDSL.arbitrary;
public final class Tester
{
- static final int DEFAULT_NUM_ROWS = 200;
+ public static final int DEFAULT_NUM_ROWS = 200;
@NotNull
private final List<CassandraVersion> versions;
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/CassandraBridgeUtilTests.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/CassandraBridgeUtilTests.java
new file mode 100644
index 0000000..0ae7825
--- /dev/null
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/CassandraBridgeUtilTests.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.reader;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.cassandra.bridge.BloomFilter;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.SSTableSummary;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.spark.TestUtils;
+import org.apache.cassandra.spark.data.FileType;
+import org.apache.cassandra.spark.data.SSTable;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.ByteBufferUtils;
+import org.apache.cassandra.spark.utils.test.TestSSTable;
+import org.apache.cassandra.spark.utils.test.TestSchema;
+
+import static org.apache.cassandra.spark.Tester.DEFAULT_NUM_ROWS;
+import static
org.apache.cassandra.spark.utils.ByteBufferUtils.readShortLengthCompositeTypeString;
+import static org.apache.cassandra.spark.utils.RandomUtils.randomAlphanumeric;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.quicktheories.QuickTheory.qt;
+
+public class CassandraBridgeUtilTests
+{
+ @TempDir
+ private static Path tempPath;
+
+ @Test
+ public void testLastRepairTime()
+ {
+ runTest((partitioner, bridge, schema, testDir) -> {
+ writeSSTable(partitioner, bridge, schema, testDir,
+ (writer) -> IntStream.range(0, DEFAULT_NUM_ROWS)
+ .forEach(i ->
writer.write(randomAlphanumeric(), randomAlphanumeric(), randomAlphanumeric()))
+ );
+
+ try
+ {
+ assertEquals(0, bridge.lastRepairTime("ks", "tb",
TestSSTable.firstIn(testDir)));
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Test
+ public void testBuildPartitionKey()
+ {
+ runTest((partitioner, bridge, schema, testDir) -> {
+ ByteBuffer pk1 = bridge.encodePartitionKey(partitioner, "ks",
schema.createStatement, Collections.singletonList("a"));
+ assertEquals("a", ByteBufferUtils.string(pk1));
+ String str1 = randomAlphanumeric();
+ ByteBuffer pk2 = bridge.encodePartitionKey(partitioner, "ks",
schema.createStatement, Collections.singletonList(str1));
+ assertEquals(str1, ByteBufferUtils.string(pk2));
+ List<String> keys = Arrays.asList(randomAlphanumeric(),
randomAlphanumeric(), randomAlphanumeric());
+
+ TestSchema threePkSchema = TestSchema.builder(bridge)
+ .withPartitionKey("a",
bridge.text())
+ .withPartitionKey("b",
bridge.text())
+ .withPartitionKey("c",
bridge.text())
+ .withColumn("d",
bridge.bigint())
+ .withColumn("e",
bridge.aInt())
+ .build();
+ ByteBuffer pk3 = bridge.encodePartitionKey(partitioner, "ks",
threePkSchema.createStatement, keys);
+ assertEquals(keys.get(0), readShortLengthCompositeTypeString(pk3));
+ assertEquals(keys.get(1), readShortLengthCompositeTypeString(pk3));
+ assertEquals(keys.get(2), readShortLengthCompositeTypeString(pk3));
+ });
+ }
+
+ @Test
+ public void testOverlaps()
+ {
+ runTest((partitioner, bridge, schema, testDir) -> {
+ // write SSTable
+ List<String> keys = IntStream.range(0, 3).mapToObj(i ->
randomAlphanumeric()).collect(Collectors.toList());
+ List<ByteBuffer> buffers = bridge.encodePartitionKeys(
+ partitioner,
+ schema.keyspace,
+ schema.createStatement,
+ keys.stream()
+ .map(Collections::singletonList)
+ .collect(Collectors.toList())
+ );
+ List<BigInteger> sortedTokens = buffers
+ .stream()
+ .map(partitioner::hash)
+ .sorted()
+ .collect(Collectors.toList());
+ assertTrue(TestSSTable.allIn(testDir).isEmpty());
+ writeSSTable(partitioner, bridge, schema, testDir,
+ (writer) -> keys.forEach(key -> writer.write(key,
randomAlphanumeric(), randomAlphanumeric())));
+ assertEquals(1, TestSSTable.allIn(testDir).size());
+
+ TestSSTable ssTable = (TestSSTable) TestSSTable.firstIn(testDir);
+ List<TokenRange> testRanges = Arrays.asList(
+ TokenRange.openClosed(sortedTokens.get(0), sortedTokens.get(0)),
+ TokenRange.closed(sortedTokens.get(0), sortedTokens.get(1)),
+ TokenRange.closed(sortedTokens.get(0), sortedTokens.get(2)),
+ TokenRange.openClosed(sortedTokens.get(1), sortedTokens.get(1)),
+ TokenRange.closed(sortedTokens.get(1), sortedTokens.get(2)),
+ TokenRange.openClosed(sortedTokens.get(2),
sortedTokens.get(2).add(BigInteger.ONE)) // out of range
+ );
+
+ Path summaryDbFile = ssTable.fileComponentPath(FileType.SUMMARY);
+ for (int j = 0; j < 2; j++) // loop around after deleting
Summary.db file to verify we can check using Index.db file
+ {
+ SSTableSummary summary = bridge.getSSTableSummary(partitioner,
ssTable, 128, 256);
+ assertEquals(sortedTokens.get(0), summary.firstToken);
+ assertEquals(sortedTokens.get(2), summary.lastToken);
+ TokenRange sstableRange =
TokenRange.closed(summary.firstToken, summary.lastToken);
+ for (BigInteger token : sortedTokens)
+ {
+ assertTrue(sstableRange.contains(token));
+ }
+ List<Boolean> result = bridge.overlaps(ssTable, partitioner,
128, 256, testRanges);
+
+ assertEquals(testRanges.size(), result.size());
+ for (int i = 0; i < result.size(); i++)
+ {
+ assertEquals(i < result.size() - 1, result.get(i));
+ }
+
+ // delete Summary.db file and check we can read the Index.db
file too
+ Files.deleteIfExists(summaryDbFile);
+ }
+ });
+ }
+
+ @Test
+ public void testContains()
+ {
+ runTest((partitioner, bridge, schema, testDir) -> {
+ // write SSTable
+ Set<String> keys = IntStream.range(0, 25).mapToObj(i ->
randomAlphanumeric()).collect(Collectors.toSet());
+ List<ByteBuffer> buffers = bridge.encodePartitionKeys(
+ partitioner,
+ schema.keyspace,
+ schema.createStatement,
+ keys.stream()
+ .map(Collections::singletonList)
+ .collect(Collectors.toList())
+ );
+ assertTrue(TestSSTable.allIn(testDir).isEmpty());
+ writeSSTable(partitioner, bridge, schema, testDir,
+ (writer) -> keys.forEach(key -> writer.write(key,
randomAlphanumeric(), randomAlphanumeric())));
+ assertEquals(1, TestSSTable.allIn(testDir).size());
+
+ TestSSTable ssTable = (TestSSTable) TestSSTable.firstIn(testDir);
+
+ // should return all positives for the keys contained in the
SSTable
+ BloomFilter filter = bridge.openBloomFilter(partitioner,
schema.keyspace, schema.table, ssTable);
+ List<Boolean> result =
buffers.stream().map(filter::mightContain).collect(Collectors.toList());
+ assertEquals(result.size(), buffers.size());
+ assertTrue(result.stream().allMatch(boolValue -> boolValue));
+
+ // random keys should return some negatives for keys not contained
in the SSTable
+ List<String> otherKeys = IntStream.range(0,
DEFAULT_NUM_ROWS).mapToObj(i ->
randomAlphanumeric(keys)).collect(Collectors.toList());
+ List<ByteBuffer> randomBuffers = bridge.encodePartitionKeys(
+ partitioner,
+ schema.keyspace,
+ schema.createStatement,
+ otherKeys.stream()
+ .map(Collections::singletonList)
+ .collect(Collectors.toList())
+ );
+ BloomFilter randomBloomFilter =
bridge.openBloomFilter(partitioner, schema.keyspace, schema.table, ssTable);
+ List<Boolean> randomResult =
randomBuffers.stream().map(randomBloomFilter::mightContain).collect(Collectors.toList());
+ assertEquals(randomResult.size(), otherKeys.size());
+ assertTrue(randomResult.stream().anyMatch(boolValue ->
!boolValue));
+
+ // perform exact contains query to confirm expected keys exist and
random keys don't
+ assertTrue(bridge.contains(partitioner, schema.keyspace,
schema.table, ssTable, buffers).stream().allMatch(aBoolean -> aBoolean));
+ assertTrue(bridge.contains(partitioner, schema.keyspace,
schema.table, ssTable, randomBuffers).stream().noneMatch(aBoolean -> aBoolean));
+ List<ByteBuffer> allKeys = new ArrayList<>();
+ allKeys.addAll(buffers);
+ allKeys.addAll(randomBuffers);
+ List<Boolean> exactResult = bridge.contains(partitioner,
schema.keyspace, schema.table, ssTable, allKeys);
+ assertEquals(allKeys.size(), exactResult.size());
+ for (int i = 0; i < exactResult.size(); i++)
+ {
+ boolean contains = exactResult.get(i);
+ assertEquals(i < buffers.size(), contains);
+ if (filter.doesNotContain(allKeys.get(i)))
+ {
+ // if bloom filter returns true for `doesNotContain` then
contains should always be false
+ assertFalse(contains);
+ }
+ if (contains)
+ {
+ // bloom filter should always return true if SSTable
contains key
+ assertTrue(filter.mightContain(allKeys.get(i)));
+ }
+ }
+ });
+ }
+
+ @Test
+ public void testReadKeys()
+ {
+ runTest(
+ (partitioner, bridge, schema, testDir) -> {
+ Map<String, String> expected = IntStream.range(0, DEFAULT_NUM_ROWS)
+ .mapToObj(i ->
randomAlphanumeric(32))
+
.collect(Collectors.toMap(Function.identity(), i -> randomAlphanumeric(32)));
+ writeSSTable(partitioner, bridge, schema, testDir, (writer) ->
expected.forEach((key, value) -> writer.write(key, value, value)));
+
+ List<SSTable> all = TestSSTable.allIn(testDir);
+ Set<SSTable> ssTables = new HashSet<>(all);
+ Map<String, Map<String, Object>> actual = new
HashMap<>(DEFAULT_NUM_ROWS);
+ bridge.readStringPartitionKeys(partitioner,
+ schema.keyspace,
+ schema.createStatement,
+ ssTables,
+ null,
+
expected.keySet().stream().map(Collections::singletonList).collect(Collectors.toList()),
+ null,
+ (row) ->
actual.put(row.get("a").toString(), row)
+ );
+ assertEquals(expected.size(), actual.size());
+ for (Map.Entry<String, String> entry : expected.entrySet())
+ {
+ Map<String, Object> actualRow = actual.get(entry.getKey());
+ assertNotNull(actualRow);
+ assertEquals(actualRow.get("b"), entry.getValue());
+ assertEquals(actualRow.get("c"), entry.getValue());
+ }
+ }
+ );
+ }
+
+ /* test utils */
+
+ private void writeSSTable(Partitioner partitioner, CassandraBridge bridge,
TestSchema schema, Path testDir, Consumer<CassandraBridge.Writer> writer)
+ {
+ bridge.writeSSTable(partitioner,
+ schema.keyspace,
+ schema.table,
+ testDir,
+ schema.createStatement,
+ schema.insertStatement,
+ writer);
+ }
+
+ private interface TestRunable
+ {
+ void run(Partitioner partitioner, CassandraBridge bridge, TestSchema
schema, Path testDir) throws IOException;
+ }
+
+ private static void runTest(TestRunable test)
+ {
+ qt().forAll(TestUtils.partitioners(), TestUtils.bridges())
+ .checkAssert(
+ ((partitioner, bridge) -> {
+ TestSchema schema = TestSchema.builder(bridge)
+ .withPartitionKey("a",
bridge.text())
+ .withColumn("b", bridge.text())
+ .withColumn("c", bridge.text())
+ .build();
+
+ String testId = UUID.randomUUID().toString();
+ Path testDir = tempPath.resolve(testId);
+ try
+ {
+ TestUtils.createDirectory(testDir);
+ test.run(partitioner, bridge, schema, testDir);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ try
+ {
+ FileUtils.deleteDirectory(testDir.toFile());
+ }
+ catch (IOException ignore)
+ {
+ }
+ }
+ }
+ )
+ );
+ }
+}
diff --git
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PruneColumnFilter.java
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/BloomFilter.java
similarity index 56%
copy from
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PruneColumnFilter.java
copy to
cassandra-bridge/src/main/java/org/apache/cassandra/bridge/BloomFilter.java
index c032659..7ab1332 100644
---
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PruneColumnFilter.java
+++
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/BloomFilter.java
@@ -17,36 +17,27 @@
* under the License.
*/
-package org.apache.cassandra.spark.sparksql.filters;
+package org.apache.cassandra.bridge;
-import java.util.Set;
-
-import org.jetbrains.annotations.NotNull;
+import java.nio.ByteBuffer;
+import java.util.function.Predicate;
/**
- * Prune column push-down filter to skip reading columns that are not needed
+ * Version independent interface to front bloom filter.
*/
-public class PruneColumnFilter
+public interface BloomFilter extends Predicate<ByteBuffer>
{
- private final Set<String> requiredColumns;
-
- public PruneColumnFilter(@NotNull Set<String> requiredColumns)
- {
- this.requiredColumns = requiredColumns;
- }
-
- public Set<String> requiredColumns()
- {
- return requiredColumns;
- }
-
- public int size()
+ /**
+ * @param partitionKey serialized partition key.
+ * @return true if SSTable might contain a given partition key, might
return false-positives but never false-negatives.
+ */
+ default boolean mightContain(ByteBuffer partitionKey)
{
- return requiredColumns.size();
+ return test(partitionKey);
}
- public boolean includeColumn(String columnName)
+ default boolean doesNotContain(ByteBuffer partitionKey)
{
- return requiredColumns.contains(columnName);
+ return !mightContain(partitionKey);
}
}
diff --git
a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
index 62d1d81..0701ae2 100644
---
a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
+++
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
@@ -46,6 +46,7 @@ import com.google.common.collect.ImmutableMap;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
+import org.apache.cassandra.spark.data.BasicSupplier;
import org.apache.cassandra.spark.data.CassandraTypes;
import org.apache.cassandra.spark.data.CqlField;
import org.apache.cassandra.spark.data.CqlTable;
@@ -401,6 +402,11 @@ public abstract class CassandraBridge
@NotNull String table,
@NotNull SSTable ssTable);
+ public abstract SSTableSummary getSSTableSummary(@NotNull Partitioner
partitioner,
+ @NotNull SSTable ssTable,
+ int minIndexInterval,
+ int maxIndexInterval);
+
// Version-Specific Test Utility Methods
@VisibleForTesting
@@ -443,6 +449,207 @@ public abstract class CassandraBridge
public abstract CompressionUtil compressionUtil();
+ // additional SSTable utils methods
+
+ /**
+ * @param keyspace keyspace name
+ * @param table table name
+ * @param ssTable SSTable instance
+ * @return last repair time for a given SSTable by reading the
Statistics.db file.
+     * @throws IOException if an I/O error occurs while reading the Statistics.db file
+ */
+ public abstract long lastRepairTime(@NotNull String keyspace,
+ @NotNull String table,
+ @NotNull SSTable ssTable) throws
IOException;
+
+ /**
+ * @param ssTable SSTable instance
+ * @param minIndexInterval minIndexInterval configured in the TableMetaData
+ * @param partitioner Cassandra partitioner
+ * @param maxIndexInterval maxIndexInterval configured in the TableMetadata
+ * @param ranges a list of token ranges
+     * @return a list of boolean values, each true if the corresponding token range in the
`ranges` list parameter overlaps with the SSTable's token range.
+ * The SSTable may or may not contain data for the range.
+ */
+ public abstract List<Boolean> overlaps(@NotNull SSTable ssTable,
+ @NotNull Partitioner partitioner,
+ int minIndexInterval,
+ int maxIndexInterval,
+ @NotNull List<TokenRange> ranges)
throws IOException;
+
+ /**
+ * @param partitioner Cassandra partitioner
+ * @param keyspace Cassandra keyspace
+ * @param createTableStmt CQL table create statement
+     * @param partitionKeys list of partition keys, each given as a list of string values (one per partition key column)
+ * @return list of tokens corresponding to each input `partitionKeys`
+ */
+ public List<BigInteger> toTokens(@NotNull Partitioner partitioner,
+ @NotNull String keyspace,
+ @NotNull String createTableStmt,
+ @NotNull List<List<String>> partitionKeys)
+ {
+ return toTokens(partitioner, encodePartitionKeys(partitioner,
keyspace, createTableStmt, partitionKeys));
+ }
+
+ /**
+ * @param partitioner Cassandra partitioner
+ * @param partitionKeys list of encoded partition keys
+ * @return list of tokens corresponding to each input `partitionKeys`
+ */
+ public List<BigInteger> toTokens(@NotNull Partitioner partitioner,
+ @NotNull List<ByteBuffer> partitionKeys)
+ {
+ Tokenizer tokenizer = tokenizer(partitioner);
+ return partitionKeys
+ .stream()
+ .map(tokenizer::toToken)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * @param partitioner Cassandra partitioner
+ * @return a Tokenizer instance for the provided Partitioner that maps a
partition key to the token.
+ */
+ public abstract Tokenizer tokenizer(@NotNull Partitioner partitioner);
+
+ /**
+ * @param partitioner Cassandra partitioner
+ * @param keyspace keyspace name
+ * @param createTableStmt CQL create table statement
+ * @param partitionKey partition key
+ * @return encoded ByteBuffer for the input `partitionKey`
+ */
+ public ByteBuffer encodePartitionKey(@NotNull Partitioner partitioner,
+ @NotNull String keyspace,
+ @NotNull String createTableStmt,
+ @NotNull List<String> partitionKey)
+ {
+ return encodePartitionKeys(partitioner, keyspace, createTableStmt,
Collections.singletonList(partitionKey)).get(0);
+ }
+
+ /**
+ * @param partitioner Cassandra partitioner
+ * @param keyspace keyspace name
+ * @param createTableStmt CQL create table statement
+ * @param partitionKeys list of partition keys
+     * @return a list of encoded ByteBuffers corresponding to the partition keys
input in `partitionKeys`
+ */
+ public abstract List<ByteBuffer> encodePartitionKeys(@NotNull Partitioner
partitioner,
+ @NotNull String
keyspace,
+ @NotNull String
createTableStmt,
+ @NotNull
List<List<String>> partitionKeys);
+
+ /**
+ * @param partitioner Cassandra partitioner
+ * @param keyspace keyspace name
+ * @param table table name
+ * @param ssTable SSTable instance
+ * @return version independent BloomFilter instance to answer if SSTable
might contain a partition key
+ * (might return false-positives but never false-negatives)
+ * @throws IOException
+ */
+ public abstract BloomFilter openBloomFilter(@NotNull Partitioner
partitioner,
+ @NotNull String keyspace,
+ @NotNull String table,
+ @NotNull SSTable ssTable)
throws IOException;
+
+ /**
+ * @param partitioner Cassandra partitioner
+ * @param keyspace keyspace name
+ * @param table table name
+ * @param ssTable SSTable instance
+ * @param partitionKeys list of partition keys
+     * @return list of booleans, each true if the SSTable contains the
corresponding partition key input in `partitionKeys`.
+ * @throws IOException
+ */
+ public abstract List<Boolean> contains(@NotNull Partitioner partitioner,
+ @NotNull String keyspace,
+ @NotNull String table,
+ @NotNull SSTable ssTable,
+ @NotNull List<ByteBuffer>
partitionKeys) throws IOException;
+
+ /**
+ * Convenience method around `readPartitionKeys` to accept partition keys
as string values and encode with the correct types.
+ *
+ * @param partitioner Cassandra partitioner
+ * @param keyspace keyspace name
+ * @param createStmt create table CQL statement
+ * @param ssTables set of SSTables to read
+ * @param rowConsumer Consumer interface to consume rows as they are read
to avoid buffering all rows in memory for consumption.
+ * @throws IOException
+ */
+ public void readStringPartitionKeys(@NotNull Partitioner partitioner,
+ @NotNull String keyspace,
+ @NotNull String createStmt,
+ @NotNull Set<SSTable> ssTables,
+ @NotNull Consumer<Map<String, Object>>
rowConsumer) throws IOException
+ {
+ readStringPartitionKeys(partitioner, keyspace, createStmt, ssTables,
null, null, null, rowConsumer);
+ }
+
+ /**
+ * Convenience method around `readPartitionKeys` to accept partition keys
as string values and encode with the correct types.
+ *
+ * @param partitioner Cassandra partitioner
+ * @param keyspace keyspace name
+ * @param createStmt create table CQL statement
+ * @param ssTables set of SSTables to read
+ * @param tokenRange optional token range to limit the bulk read to
a restricted token range.
+ * @param partitionKeys list of partition keys, if more than one
partition keys they must be correctly ordered in the inner list.
+ * @param pruneColumnFilter optional filter to select a subset of columns,
this can offer performance improvement if skipping over large blobs or columns.
+ * @param rowConsumer Consumer interface to consume rows as they are
read to avoid buffering all rows in memory for consumption.
+ * @throws IOException
+ */
+ public void readStringPartitionKeys(@NotNull Partitioner partitioner,
+ @NotNull String keyspace,
+ @NotNull String createStmt,
+ @NotNull Set<SSTable> ssTables,
+ @Nullable TokenRange tokenRange,
+ @Nullable List<List<String>>
partitionKeys,
+ @Nullable String[] pruneColumnFilter,
+ @NotNull Consumer<Map<String, Object>>
rowConsumer) throws IOException
+ {
+ readPartitionKeys(partitioner,
+ keyspace,
+ createStmt,
+ new BasicSupplier(ssTables),
+ tokenRange,
+ partitionKeys == null ? null :
encodePartitionKeys(partitioner, keyspace, createStmt, partitionKeys),
+ pruneColumnFilter,
+ rowConsumer);
+ }
+
+ public void readPartitionKeys(@NotNull Partitioner partitioner,
+ @NotNull String keyspace,
+ @NotNull String createStmt,
+ @NotNull Set<SSTable> ssTables,
+ @NotNull Consumer<Map<String, Object>>
rowConsumer) throws IOException
+ {
+ readPartitionKeys(partitioner, keyspace, createStmt, ssTables, null,
null, null, rowConsumer);
+ }
+
+ public void readPartitionKeys(@NotNull Partitioner partitioner,
+ @NotNull String keyspace,
+ @NotNull String createStmt,
+ @NotNull Set<SSTable> ssTables,
+ @Nullable TokenRange tokenRange,
+ @Nullable List<ByteBuffer> partitionKeys,
+ @Nullable String[] pruneColumnFilter,
+ @NotNull Consumer<Map<String, Object>>
rowConsumer) throws IOException
+ {
+ readPartitionKeys(partitioner, keyspace, createStmt, new
BasicSupplier(ssTables), tokenRange, partitionKeys, pruneColumnFilter,
rowConsumer);
+ }
+
+ public abstract void readPartitionKeys(@NotNull Partitioner partitioner,
+ @NotNull String keyspace,
+ @NotNull String createStmt,
+ @NotNull SSTablesSupplier ssTables,
+ @Nullable TokenRange tokenRange,
+ @Nullable List<ByteBuffer>
partitionKeys,
+ @Nullable String[]
pruneColumnFilter,
+ @NotNull Consumer<Map<String,
Object>> rowConsumer) throws IOException;
+
// Kryo/Java (De-)Serialization
public abstract void kryoRegister(Kryo kryo);
diff --git
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PruneColumnFilter.java
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/Tokenizer.java
similarity index 54%
copy from
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PruneColumnFilter.java
copy to
cassandra-bridge/src/main/java/org/apache/cassandra/bridge/Tokenizer.java
index c032659..90c2ddd 100644
---
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PruneColumnFilter.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/Tokenizer.java
@@ -17,36 +17,15 @@
* under the License.
*/
-package org.apache.cassandra.spark.sparksql.filters;
+package org.apache.cassandra.bridge;
-import java.util.Set;
-
-import org.jetbrains.annotations.NotNull;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
/**
- * Prune column push-down filter to skip reading columns that are not needed
+ * Interface that converts a partition key to a BigInteger token.
*/
-public class PruneColumnFilter
+public interface Tokenizer
{
- private final Set<String> requiredColumns;
-
- public PruneColumnFilter(@NotNull Set<String> requiredColumns)
- {
- this.requiredColumns = requiredColumns;
- }
-
- public Set<String> requiredColumns()
- {
- return requiredColumns;
- }
-
- public int size()
- {
- return requiredColumns.size();
- }
-
- public boolean includeColumn(String columnName)
- {
- return requiredColumns.contains(columnName);
- }
+ BigInteger toToken(ByteBuffer partitionKey);
}
diff --git
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
index e6d6db9..18d6694 100644
---
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
+++
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.bridge;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
@@ -32,21 +33,29 @@ import java.nio.file.Path;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
+import org.apache.cassandra.analytics.reader.common.IndexIterator;
+import org.apache.cassandra.analytics.stats.Stats;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
@@ -62,6 +71,9 @@ import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.SSTableTombstoneWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
@@ -72,9 +84,10 @@ import org.apache.cassandra.spark.data.CqlType;
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.data.SSTable;
import org.apache.cassandra.spark.data.SSTablesSupplier;
-import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.data.TypeConverter;
import org.apache.cassandra.spark.data.complex.CqlTuple;
import org.apache.cassandra.spark.data.complex.CqlUdt;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.reader.CompactionStreamScanner;
import org.apache.cassandra.spark.reader.IndexEntry;
import org.apache.cassandra.spark.reader.IndexReader;
@@ -83,18 +96,20 @@ import org.apache.cassandra.spark.reader.RowData;
import org.apache.cassandra.spark.reader.SchemaBuilder;
import org.apache.cassandra.spark.reader.StreamScanner;
import org.apache.cassandra.spark.reader.SummaryDbUtils;
-import org.apache.cassandra.analytics.reader.common.IndexIterator;
+import org.apache.cassandra.spark.sparksql.CellIterator;
+import org.apache.cassandra.spark.sparksql.RowIterator;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
-import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.utils.Pair;
import org.apache.cassandra.spark.utils.SparkClassLoaderOverride;
import org.apache.cassandra.spark.utils.TimeProvider;
import org.apache.cassandra.tools.JsonTransformer;
import org.apache.cassandra.tools.Util;
import org.apache.cassandra.util.CompressionUtil;
+import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.CompressionUtilImplementation;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.TokenUtils;
import org.apache.cassandra.utils.UUIDGen;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -102,13 +117,9 @@ import org.jetbrains.annotations.Nullable;
@SuppressWarnings("unused")
public class CassandraBridgeImplementation extends CassandraBridge
{
- private final Map<Class<?>, Serializer<?>> kryoSerializers;
-
- public static void main(String[] args)
- {
-
System.out.println(UUIDGen.unixTimestamp(UUID.fromString("ac3f2f40-8637-11ef-a12b-2d1e10948b4b")));
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CassandraBridgeImplementation.class);
- }
+ private final Map<Class<?>, Serializer<?>> kryoSerializers;
static
{
@@ -264,6 +275,206 @@ public class CassandraBridgeImplementation extends
CassandraBridge
return CompressionUtilImplementation.INSTANCE;
}
+ @Override
+ public long lastRepairTime(String keyspace, String table, SSTable ssTable)
throws IOException
+ {
+ Map<MetadataType, MetadataComponent> componentMap =
ReaderUtils.deserializeStatsMetadata(keyspace, table, ssTable,
EnumSet.of(MetadataType.STATS));
+ StatsMetadata statsMetadata = (StatsMetadata)
componentMap.get(MetadataType.STATS);
+ if (statsMetadata == null)
+ {
+ throw new IllegalStateException("Could not read StatsMetadata");
+ }
+ return statsMetadata.repairedAt;
+ }
+
+ @Override
+ public List<Boolean> overlaps(SSTable ssTable,
+ Partitioner partitioner,
+ int minIndexInterval,
+ int maxIndexInterval,
+ List<TokenRange> ranges)
+ {
+ SSTableSummary summary = getSSTableSummary(partitioner, ssTable,
minIndexInterval, maxIndexInterval);
+ TokenRange sstableRange = TokenRange.closed(summary.firstToken,
summary.lastToken);
+ return ranges.stream()
+ .map(range -> range.isConnected(sstableRange))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public Tokenizer tokenizer(Partitioner partitioner)
+ {
+ IPartitioner iPartitioner = getPartitioner(partitioner);
+ return partitionKey -> {
+ DecoratedKey decoratedKey = iPartitioner.decorateKey(partitionKey);
+ return TokenUtils.tokenToBigInteger(decoratedKey.getToken());
+ };
+ }
+
+ @Override
+ public List<ByteBuffer> encodePartitionKeys(Partitioner partitioner,
String keyspace, String createTableStmt, List<List<String>> keys)
+ {
+ CqlTable table = new SchemaBuilder(createTableStmt, keyspace,
ReplicationFactor.simpleStrategy(1), partitioner).build();
+ return keys.stream().map(key -> buildPartitionKey(table,
key)).collect(Collectors.toList());
+ }
+
+ @Override
+ public org.apache.cassandra.bridge.BloomFilter openBloomFilter(Partitioner
partitioner,
+ String
keyspace,
+ String
table,
+ SSTable
ssTable) throws IOException
+ {
+ IPartitioner iPartitioner = getPartitioner(partitioner);
+ Descriptor descriptor = ReaderUtils.constructDescriptor(keyspace,
table, ssTable);
+ // closing `SharedCloseableImpl` instances is known to cause SIGSEGV
errors
+ BloomFilter filter = openBloomFilter(descriptor, ssTable);
+ return partitionKey -> {
+ DecoratedKey decoratedKey = iPartitioner.decorateKey(partitionKey);
+ return filter.isPresent(decoratedKey);
+ };
+ }
+
+ private BloomFilter openBloomFilter(Descriptor descriptor, SSTable
ssTable) throws IOException
+ {
+ return ReaderUtils.readFilter(ssTable, descriptor);
+ }
+
+ @Override
+ public List<Boolean> contains(Partitioner partitioner, String keyspace,
String table, SSTable ssTable, List<ByteBuffer> partitionKeys) throws
IOException
+ {
+ if (partitionKeys.isEmpty())
+ {
+ return Collections.emptyList();
+ }
+
+ IPartitioner iPartitioner = getPartitioner(partitioner);
+ List<DecoratedKey> decoratedKeys =
partitionKeys.stream().map(iPartitioner::decorateKey).collect(Collectors.toList());
+ Descriptor descriptor = ReaderUtils.constructDescriptor(keyspace,
table, ssTable);
+ BloomFilter filter = openBloomFilter(descriptor, ssTable);
+ List<Boolean> result =
decoratedKeys.stream().map(filter::isPresent).collect(Collectors.toList());
+ if (result.stream().noneMatch(found -> found))
+ {
+ // no matches in the bloom filter, so we can exit early
+ return result;
+ }
+
+ // sorted by token with index into original partitionKeys list
+ List<Pair<BigInteger, Integer>> sortedByTokens = IntStream.range(0,
decoratedKeys.size())
+
.mapToObj(idx -> {
+
DecoratedKey key = decoratedKeys.get(idx);
+
BigInteger token = TokenUtils.tokenToBigInteger(key.getToken());
+ return
Pair.of(token, idx);
+ })
+
.sorted(Comparator.comparing(Pair::getLeft))
+
.collect(Collectors.toList());
+ try (InputStream primaryIndex = ssTable.openPrimaryIndexStream())
+ {
+ if (primaryIndex == null)
+ {
+ throw new IOException("Could not read Index.db file");
+ }
+
+ final int[] position = new int[]{0};
+ ReaderUtils.readPrimaryIndex(primaryIndex, (buffer) -> {
+ DecoratedKey key = iPartitioner.decorateKey(buffer);
+ BigInteger token =
TokenUtils.tokenToBigInteger(key.getToken());
+
+ Pair<BigInteger, Integer> current =
sortedByTokens.get(position[0]);
+ int compare = token.compareTo(current.getLeft());
+ while (compare > 0)
+ {
+ // we passed without finding the key
+ result.set(current.getRight(), false);
+ position[0]++;
+ if (position[0] >= decoratedKeys.size())
+ {
+ // if we've found all the keys we can exit early
+ return true;
+ }
+ current = sortedByTokens.get(position[0]);
+ compare = token.compareTo(current.getLeft());
+ }
+
+ ByteBuffer currentKey = partitionKeys.get(current.getRight());
+ if (compare == 0 && buffer.equals(currentKey)) // token and
key matches
+ {
+ result.set(current.getRight(), true);
+ position[0]++;
+ }
+
+ // if we've found all the keys we can exit early
+ return position[0] >= decoratedKeys.size();
+ });
+
+ // mark as false any keys we didn't reach
+ IntStream.range(position[0], sortedByTokens.size())
+ .forEach(i ->
result.set(sortedByTokens.get(i).getRight(), false));
+ }
+
+ return result;
+ }
+
+ @Override
+ public void readPartitionKeys(Partitioner partitioner,
+ String keyspace,
+ String createStmt,
+ SSTablesSupplier ssTables,
+ @Nullable TokenRange tokenRange,
+ @Nullable List<ByteBuffer> partitionKeys,
+ @Nullable String[] requiredColumns,
+ Consumer<Map<String, Object>> rowConsumer)
throws IOException
+ {
+ IPartitioner iPartitioner = getPartitioner(partitioner);
+ SchemaBuilder schemaBuilder = new SchemaBuilder(createStmt, keyspace,
ReplicationFactor.simpleStrategy(1), partitioner);
+ TableMetadata metadata = schemaBuilder.tableMetaData();
+ CqlTable table = schemaBuilder.build();
+ List<BigInteger> tokens = partitionKeys == null ?
Collections.emptyList() : toTokens(partitioner, partitionKeys);
+ List<PartitionKeyFilter> partitionKeyFilters = partitionKeys == null ?
Collections.emptyList() :
+ IntStream
+ .range(0,
partitionKeys.size())
+ .mapToObj(i ->
PartitionKeyFilter.create(partitionKeys.get(i), tokens.get(i)))
+ .sorted()
+
.collect(Collectors.toList());
+
+ try (CellIterator it = new CellIterator(0,
+ table,
+ Stats.DoNothingStats.INSTANCE,
+ TypeConverter.IDENTITY,
+ partitionKeyFilters,
+ (t) ->
PruneColumnFilter.of(requiredColumns),
+ (partitionId1,
partitionKeyFilters1, columnFilter1) ->
+ new CompactionStreamScanner(
+ metadata,
+ partitioner,
+ TimeProvider.DEFAULT,
+ ssTables.openAll((ssTable,
isRepairPrimary) ->
+
org.apache.cassandra.spark.reader.SSTableReader.builder(metadata, ssTable)
+
.withPartitionKeyFilters(partitionKeyFilters1)
+
.build())
+ ))
+ {
+ @Override
+ public boolean isInPartition(int partitionId, BigInteger token,
ByteBuffer partitionKey)
+ {
+ return true;
+ }
+
+ @Override
+ public boolean equals(CqlField field, Object obj1, Object obj2)
+ {
+ return Objects.equals(obj1, obj2);
+ }
+ })
+ {
+ RowIterator<Map<String, Object>> rowIterator =
RowIterator.rowMapIterator(it, Stats.DoNothingStats.INSTANCE, requiredColumns);
+
+ while (rowIterator.next())
+ {
+ rowConsumer.accept(rowIterator.get());
+ }
+ }
+ }
+
@Override
public synchronized void writeSSTable(Partitioner partitioner,
String keyspace,
@@ -336,15 +547,32 @@ public class CassandraBridgeImplementation extends
CassandraBridge
{
throw new RuntimeException("Could not create table metadata needed
for reading SSTable summaries for keyspace: " + keyspace);
}
+ return getSSTableSummary(metadata.partitioner, ssTable,
metadata.params.minIndexInterval, metadata.params.maxIndexInterval);
+ }
+
+ @Override
+ public SSTableSummary getSSTableSummary(@NotNull Partitioner partitioner,
+ @NotNull SSTable ssTable,
+ int minIndexInterval,
+ int maxIndexInterval)
+ {
+ return getSSTableSummary(getPartitioner(partitioner), ssTable,
minIndexInterval, maxIndexInterval);
+ }
+
+ protected SSTableSummary getSSTableSummary(@NotNull IPartitioner
partitioner,
+ @NotNull SSTable ssTable,
+ int minIndexInterval,
+ int maxIndexInterval)
+ {
try
{
- SummaryDbUtils.Summary summary =
SummaryDbUtils.readSummary(metadata, ssTable);
- Pair<DecoratedKey, DecoratedKey> keys =
Pair.create(summary.first(), summary.last());
- if (keys.left == null || keys.right == null)
+ SummaryDbUtils.Summary summary =
SummaryDbUtils.readSummary(ssTable, partitioner, minIndexInterval,
maxIndexInterval);
+ Pair<DecoratedKey, DecoratedKey> keys = summary == null ? null :
Pair.of(summary.first(), summary.last());
+ if (summary == null)
{
- keys = ReaderUtils.keysFromIndex(metadata, ssTable);
+ keys = ReaderUtils.keysFromIndex(partitioner, ssTable);
}
- if (keys.left == null || keys.right == null)
+ if (keys == null)
{
throw new RuntimeException("Could not load SSTable first or
last tokens for SSTable: " + ssTable.getDataFileName());
}
diff --git
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java
index 0626479..1af3208 100644
---
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java
+++
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java
@@ -64,7 +64,7 @@ public class IndexReader implements IIndexReader
long startTimeNanos = now;
try
{
- File file = SSTableReader.constructFilename(metadata.keyspace,
metadata.name, ssTable.getDataFileName());
+ File file = ReaderUtils.constructFilename(metadata.keyspace,
metadata.name, ssTable.getDataFileName());
Descriptor descriptor = Descriptor.fromFilename(file);
Version version = descriptor.version;
diff --git
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
index 1e7de7f..3c84591 100644
---
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
+++
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.spark.reader;
import java.io.DataInputStream;
import java.io.EOFException;
+import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
@@ -29,15 +30,17 @@ import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.ClusteringPrefix;
import org.apache.cassandra.db.DecoratedKey;
@@ -62,13 +65,14 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.spark.data.SSTable;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
+import org.apache.cassandra.spark.utils.Pair;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.BloomFilterSerializer;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.TokenUtils;
import org.apache.cassandra.utils.vint.VIntCoding;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
@@ -83,6 +87,36 @@ public final class ReaderUtils extends TokenUtils
.orElseThrow(() -> new RuntimeException("Could not find
SerializationHeader.Component constructor"));
public static final ByteBuffer SUPER_COLUMN_MAP_COLUMN =
ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ public static Descriptor constructDescriptor(@NotNull String keyspace,
@NotNull String table, @NotNull SSTable ssTable)
+ {
+ File file = ReaderUtils.constructFilename(keyspace, table,
ssTable.getDataFileName());
+ return Descriptor.fromFilename(file);
+ }
+
+ /**
+ * Constructs full file path for a given combination of keyspace, table,
and data file name,
+ * while adjusting for data files with non-standard names prefixed with
keyspace and table
+ *
+ * @param keyspace Name of the keyspace
+ * @param table Name of the table
+ * @param filename Name of the data file
+ * @return A full file path, adjusted for non-standard file names
+ */
+ @VisibleForTesting
+ @NotNull
+ public static File constructFilename(@NotNull String keyspace, @NotNull
String table, @NotNull String filename)
+ {
+ String[] components = filename.split("-");
+ if (components.length == 6
+ && components[0].equals(keyspace)
+ && components[1].equals(table))
+ {
+ filename = filename.substring(keyspace.length() + table.length() +
2);
+ }
+
+ return new File(String.format("./%s/%s", keyspace, table), filename);
+ }
+
static
{
SERIALIZATION_HEADER.setAccessible(true);
@@ -166,23 +200,30 @@ public final class ReaderUtils extends TokenUtils
return CompositeType.build(ByteBufferAccessor.instance, isStatic,
values);
}
+ @Nullable
public static Pair<DecoratedKey, DecoratedKey> keysFromIndex(@NotNull
TableMetadata metadata,
@NotNull
SSTable ssTable) throws IOException
+ {
+ return keysFromIndex(metadata.partitioner, ssTable);
+ }
+
+ @Nullable
+ public static Pair<DecoratedKey, DecoratedKey> keysFromIndex(@NotNull
IPartitioner partitioner,
+ @NotNull
SSTable ssTable) throws IOException
{
try (InputStream primaryIndex = ssTable.openPrimaryIndexStream())
{
if (primaryIndex != null)
{
- IPartitioner partitioner = metadata.partitioner;
- Pair<ByteBuffer, ByteBuffer> keys =
readPrimaryIndex(primaryIndex, true, Collections.emptyList());
- return Pair.create(partitioner.decorateKey(keys.left),
partitioner.decorateKey(keys.right));
+ Pair<ByteBuffer, ByteBuffer> keys =
primaryIndexReadFirstAndLastKey(primaryIndex);
+ return Pair.of(partitioner.decorateKey(keys.left),
partitioner.decorateKey(keys.right));
}
}
- return Pair.create(null, null);
+ return null;
}
- static boolean anyFilterKeyInIndex(@NotNull SSTable ssTable,
- @NotNull List<PartitionKeyFilter>
filters) throws IOException
+ public static boolean anyFilterKeyInIndex(@NotNull SSTable ssTable,
+ @NotNull
List<PartitionKeyFilter> filters) throws IOException
{
if (filters.isEmpty())
{
@@ -193,23 +234,37 @@ public final class ReaderUtils extends TokenUtils
{
if (primaryIndex != null)
{
- Pair<ByteBuffer, ByteBuffer> keys =
readPrimaryIndex(primaryIndex, false, filters);
- if (keys.left != null || keys.right != null)
- {
- return false;
- }
+ return primaryIndexContainsAnyKey(primaryIndex, filters);
}
}
- return true;
+
+ return true; // could not read primary index, so to be safe assume it
contains the keys
}
- static Map<MetadataType, MetadataComponent>
deserializeStatsMetadata(SSTable ssTable,
-
Descriptor descriptor) throws IOException
+ public static Map<MetadataType, MetadataComponent>
deserializeStatsMetadata(String keyspace,
+
String table,
+
SSTable ssTable,
+
EnumSet<MetadataType> selectedTypes) throws IOException
+ {
+ return deserializeStatsMetadata(ssTable, selectedTypes,
constructDescriptor(keyspace, table, ssTable));
+ }
+
+ public static Map<MetadataType, MetadataComponent>
deserializeStatsMetadata(SSTable ssTable,
+
Descriptor descriptor) throws IOException
+ {
+ return deserializeStatsMetadata(ssTable,
+ EnumSet.of(MetadataType.VALIDATION,
MetadataType.STATS, MetadataType.HEADER),
+ descriptor);
+ }
+
+ public static Map<MetadataType, MetadataComponent>
deserializeStatsMetadata(SSTable ssTable,
+
EnumSet<MetadataType> selectedTypes,
+
Descriptor descriptor) throws IOException
{
try (InputStream statsStream = ssTable.openStatsStream())
{
return deserializeStatsMetadata(statsStream,
-
EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS, MetadataType.HEADER),
+ selectedTypes,
descriptor);
}
}
@@ -395,39 +450,69 @@ public final class ReaderUtils extends TokenUtils
return
TypeParser.parse(UTF8Type.instance.compose(ByteBufferUtil.readWithVIntLength(in)));
}
+ public static Pair<ByteBuffer, ByteBuffer>
primaryIndexReadFirstAndLastKey(@NotNull InputStream primaryIndex) throws
IOException
+ {
+ ByteBuffer[] firstAndLast = new ByteBuffer[]{null, null};
+ readPrimaryIndex(primaryIndex, (buffer) -> {
+ if (firstAndLast[0] == null)
+ {
+ firstAndLast[0] = buffer;
+ }
+ firstAndLast[1] = buffer;
+ return false; // never exit early
+ });
+ return Pair.of(firstAndLast[0], firstAndLast[1]);
+ }
+
+ /**
+ * Reads primary Index.db file returning true and exiting early if it
contains any of the PartitionKeyFilter
+ *
+ * @param primaryIndex input stream for Index.db file
+ * @param filters list of filters to search for
+ * @return true if Index.db file contains any of the keys
+ * @throws IOException
+ */
+ public static boolean primaryIndexContainsAnyKey(@NotNull InputStream
primaryIndex,
+ @NotNull
List<PartitionKeyFilter> filters) throws IOException
+ {
+ final boolean[] result = new boolean[]{false};
+ readPrimaryIndex(primaryIndex, (buffer) -> {
+ boolean anyMatch = filters.stream().anyMatch(filter ->
filter.matches(buffer));
+ if (anyMatch)
+ {
+ result[0] = true;
+ return true; // exit early, we found at least one key
+ }
+ return false;
+ });
+ return result[0];
+ }
+
/**
- * Read primary Index.db file, read through all partitions to get first
and last partition key
+ * Read primary Index.db file
*
* @param primaryIndex input stream for Index.db file
+     * @param tracker tracker that consumes each key buffer and returns
true to exit early, otherwise reading of the primary index continues
* @return pair of first and last decorated keys
* @throws IOException
*/
- @SuppressWarnings("InfiniteLoopStatement")
- static Pair<ByteBuffer, ByteBuffer> readPrimaryIndex(@NotNull InputStream
primaryIndex,
- boolean
readFirstLastKey,
- @NotNull
List<PartitionKeyFilter> filters) throws IOException
+ public static void readPrimaryIndex(@NotNull InputStream primaryIndex,
+ @NotNull Function<ByteBuffer, Boolean>
tracker) throws IOException
{
- ByteBuffer firstKey = null;
- ByteBuffer lastKey = null;
try (DataInputStream dis = new DataInputStream(primaryIndex))
{
- byte[] last = null;
try
{
while (true)
{
int length = dis.readUnsignedShort();
- byte[] buffer = new byte[length];
- dis.readFully(buffer);
- if (firstKey == null)
+ byte[] array = new byte[length];
+ dis.readFully(array);
+ ByteBuffer buffer = ByteBuffer.wrap(array);
+ if (tracker.apply(buffer))
{
- firstKey = ByteBuffer.wrap(buffer);
- }
- last = buffer;
- ByteBuffer key = ByteBuffer.wrap(last);
- if (!readFirstLastKey && filters.stream().anyMatch(filter
-> filter.filter(key)))
- {
- return Pair.create(null, null);
+ // exit early if tracker returns true
+ return;
}
// Read position and skip promoted index
@@ -437,14 +522,7 @@ public final class ReaderUtils extends TokenUtils
catch (EOFException ignored)
{
}
-
- if (last != null)
- {
- lastKey = ByteBuffer.wrap(last);
- }
}
-
- return Pair.create(firstKey, lastKey);
}
static void skipRowIndexEntry(DataInputStream dis) throws IOException
@@ -505,7 +583,12 @@ public final class ReaderUtils extends TokenUtils
}
}
- static BloomFilter readFilter(@NotNull SSTable ssTable, boolean
hasOldBfFormat) throws IOException
+ public static BloomFilter readFilter(@NotNull SSTable ssTable, Descriptor
descriptor) throws IOException
+ {
+ return readFilter(ssTable, descriptor.version.hasOldBfFormat());
+ }
+
+ public static BloomFilter readFilter(@NotNull SSTable ssTable, boolean
hasOldBfFormat) throws IOException
{
try (InputStream filterStream = ssTable.openFilterStream())
{
diff --git
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java
index 6e9079e..e4c3ebc 100644
---
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java
+++
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java
@@ -40,9 +40,9 @@ import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.spark.data.FileType;
import org.apache.cassandra.spark.data.SSTable;
+import org.apache.cassandra.spark.utils.Pair;
import org.apache.cassandra.spark.utils.ThrowableUtils;
import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.Pair;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -116,6 +116,7 @@ public class SSTableCache
return get(summary, ssTable, () ->
SummaryDbUtils.readSummary(metadata, ssTable));
}
+ @Nullable
public Pair<DecoratedKey, DecoratedKey> keysFromIndex(@NotNull
TableMetadata metadata,
@NotNull SSTable
ssTable) throws IOException
{
diff --git
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
index 085bc4b..8f821d3 100644
---
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
+++
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.spark.reader;
import java.io.DataInputStream;
import java.io.EOFException;
-import java.io.File;
import java.io.IOError;
import java.io.IOException;
import java.math.BigInteger;
@@ -40,7 +39,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
import org.slf4j.Logger;
@@ -84,9 +82,9 @@ import
org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
import org.apache.cassandra.analytics.stats.Stats;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
+import org.apache.cassandra.spark.utils.Pair;
import org.apache.cassandra.spark.utils.ThrowableUtils;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Pair;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -156,9 +154,12 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
return this;
}
- public Builder withPartitionKeyFilters(@NotNull
Collection<PartitionKeyFilter> partitionKeyFilters)
+ public Builder withPartitionKeyFilters(@Nullable
Collection<PartitionKeyFilter> partitionKeyFilters)
{
- this.partitionKeyFilters.addAll(partitionKeyFilters);
+ if (partitionKeyFilters != null)
+ {
+ this.partitionKeyFilters.addAll(partitionKeyFilters);
+ }
return this;
}
@@ -243,25 +244,24 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
this.isRepaired = isRepaired;
this.sparkRangeFilter = sparkRangeFilter;
- File file = constructFilename(metadata.keyspace, metadata.name,
ssTable.getDataFileName());
- Descriptor descriptor = Descriptor.fromFilename(file);
+ Descriptor descriptor =
ReaderUtils.constructDescriptor(metadata.keyspace, metadata.name, ssTable);
this.version = descriptor.version;
SummaryDbUtils.Summary summary = null;
- Pair<DecoratedKey, DecoratedKey> keys = Pair.create(null, null);
+ Pair<DecoratedKey, DecoratedKey> keys = null;
try
{
now = System.nanoTime();
summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable);
stats.readSummaryDb(ssTable, System.nanoTime() - now);
- keys = Pair.create(summary.first(), summary.last());
+ keys = Pair.of(summary.first(), summary.last());
}
catch (IOException exception)
{
LOGGER.warn("Failed to read Summary.db file ssTable='{}'",
ssTable, exception);
}
- if (keys.left == null || keys.right == null)
+ if (keys == null)
{
LOGGER.warn("Could not load first and last key from Summary.db
file, so attempting Index.db fileName={}",
ssTable.getDataFileName());
@@ -270,7 +270,7 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
stats.readIndexDb(ssTable, System.nanoTime() - now);
}
- if (keys.left == null || keys.right == null)
+ if (keys == null)
{
throw new IOException("Could not load SSTable first or last
tokens");
}
@@ -406,30 +406,6 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
this.openedNanos = System.nanoTime();
}
- /**
- * Constructs full file path for a given combination of keyspace, table,
and data file name,
- * while adjusting for data files with non-standard names prefixed with
keyspace and table
- *
- * @param keyspace Name of the keyspace
- * @param table Name of the table
- * @param filename Name of the data file
- * @return A full file path, adjusted for non-standard file names
- */
- @VisibleForTesting
- @NotNull
- static File constructFilename(@NotNull String keyspace, @NotNull String
table, @NotNull String filename)
- {
- String[] components = filename.split("-");
- if (components.length == 6
- && components[0].equals(keyspace)
- && components[1].equals(table))
- {
- filename = filename.substring(keyspace.length() + table.length() +
2);
- }
-
- return new File(String.format("./%s/%s", keyspace, table), filename);
- }
-
private static Map<ByteBuffer, DroppedColumn> buildDroppedColumns(String
keyspace,
String
table,
SSTable
ssTable,
diff --git
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java
index 0a2df42..a7cc98d 100644
---
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java
+++
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.spark.data.SSTable;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Helper methods for reading the Summary.db SSTable file component
@@ -75,10 +76,16 @@ public final class SummaryDbUtils
}
public static Summary readSummary(@NotNull TableMetadata metadata,
@NotNull SSTable ssTable) throws IOException
+ {
+ return readSummary(ssTable, metadata.partitioner,
metadata.params.minIndexInterval, metadata.params.maxIndexInterval);
+ }
+
+ @Nullable
+ public static Summary readSummary(@NotNull SSTable ssTable, IPartitioner
partitioner, int minIndexInterval, int maxIndexInterval) throws IOException
{
try (InputStream in = ssTable.openSummaryStream())
{
- return readSummary(in, metadata.partitioner,
metadata.params.minIndexInterval, metadata.params.maxIndexInterval);
+ return readSummary(in, partitioner, minIndexInterval,
maxIndexInterval);
}
}
@@ -92,6 +99,7 @@ public final class SummaryDbUtils
* @return Summary object
* @throws IOException io exception
*/
+ @Nullable
static Summary readSummary(InputStream summaryStream,
IPartitioner partitioner,
int minIndexInterval,
diff --git
a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java
b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java
index c3f4fa0..b9c2eb8 100644
---
a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java
+++
b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java
@@ -55,10 +55,10 @@ import org.apache.cassandra.spark.data.FileType;
import org.apache.cassandra.spark.data.SSTable;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
+import org.apache.cassandra.spark.utils.Pair;
import org.apache.cassandra.spark.utils.TemporaryDirectory;
import org.apache.cassandra.spark.utils.test.TestSSTable;
import org.apache.cassandra.spark.utils.test.TestSchema;
-import org.apache.cassandra.utils.Pair;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -98,7 +98,7 @@ public class ReaderUtilsTests
String dataFile =
TestSSTable.firstIn(directory.path()).getDataFileName();
Descriptor descriptor = Descriptor.fromFilename(
- new File(String.format("./%s/%s", schema.keyspace,
schema.table), dataFile));
+ new File(String.format("./%s/%s", schema.keyspace,
schema.table), dataFile));
Path statsFile = TestSSTable.firstIn(directory.path(),
FileType.STATISTICS);
// Deserialize stats meta data and verify components match
expected values
@@ -178,9 +178,9 @@ public class ReaderUtilsTests
Pair<DecoratedKey, DecoratedKey> indexKeys;
try (InputStream in = new
BufferedInputStream(Files.newInputStream(indexFile)))
{
- Pair<ByteBuffer, ByteBuffer> keys =
ReaderUtils.readPrimaryIndex(in, true, Collections.emptyList());
- indexKeys =
Pair.create(Murmur3Partitioner.instance.decorateKey(keys.left),
-
Murmur3Partitioner.instance.decorateKey(keys.right));
+ Pair<ByteBuffer, ByteBuffer> keys =
ReaderUtils.primaryIndexReadFirstAndLastKey(in);
+ indexKeys =
Pair.of(Murmur3Partitioner.instance.decorateKey(keys.left),
+
Murmur3Partitioner.instance.decorateKey(keys.right));
}
assertNotNull(indexKeys);
assertEquals(indexKeys.left, summaryKeys.first());
diff --git
a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java
b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java
index 4ffe403..ec40c7b 100644
---
a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java
+++
b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java
@@ -37,11 +37,11 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.data.SSTable;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.Pair;
import org.apache.cassandra.spark.utils.TemporaryDirectory;
import org.apache.cassandra.spark.utils.test.TestSSTable;
import org.apache.cassandra.spark.utils.test.TestSchema;
import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.Pair;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
diff --git
a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java
b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java
index 4c9a402..282b36e 100644
---
a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java
+++
b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java
@@ -76,11 +76,11 @@ import
org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
import org.apache.cassandra.analytics.stats.Stats;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
+import org.apache.cassandra.spark.utils.Pair;
import org.apache.cassandra.spark.utils.TemporaryDirectory;
import org.apache.cassandra.spark.utils.Throwing;
import org.apache.cassandra.spark.utils.test.TestSSTable;
import org.apache.cassandra.spark.utils.test.TestSchema;
-import org.apache.cassandra.utils.Pair;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -276,9 +276,9 @@ public class SSTableReaderTests
Pair<DecoratedKey, DecoratedKey> firstAndLast;
try (InputStream is = new BufferedInputStream(new
FileInputStream(indexFile.toFile())))
{
- Pair<ByteBuffer, ByteBuffer> keys =
ReaderUtils.readPrimaryIndex(is, true, Collections.emptyList());
- firstAndLast =
Pair.create(BRIDGE.getPartitioner(partitioner).decorateKey(keys.left),
-
BRIDGE.getPartitioner(partitioner).decorateKey(keys.right));
+ Pair<ByteBuffer, ByteBuffer> keys =
ReaderUtils.primaryIndexReadFirstAndLastKey(is);
+ firstAndLast =
Pair.of(BRIDGE.getPartitioner(partitioner).decorateKey(keys.left),
+
BRIDGE.getPartitioner(partitioner).decorateKey(keys.right));
}
BigInteger first =
ReaderUtils.tokenToBigInteger(firstAndLast.left.getToken());
BigInteger last =
ReaderUtils.tokenToBigInteger(firstAndLast.right.getToken());
@@ -753,25 +753,25 @@ public class SSTableReaderTests
{
// Standard SSTable data file name
assertEquals(new File("./keyspace/table/na-1-big-Data.db"),
- SSTableReader.constructFilename("keyspace",
"table", "na-1-big-Data.db"));
+ ReaderUtils.constructFilename("keyspace", "table",
"na-1-big-Data.db"));
// Non-standard SSTable data file name
assertEquals(new File("./keyspace/table/na-1-big-Data.db"),
- SSTableReader.constructFilename("keyspace",
"table", "keyspace-table-na-1-big-Data.db"));
+ ReaderUtils.constructFilename("keyspace", "table",
"keyspace-table-na-1-big-Data.db"));
// Malformed SSTable data file names
assertEquals(new
File("./keyspace/table/keyspace-table-qwerty-na-1-big-Data.db"),
- SSTableReader.constructFilename("keyspace",
"table", "keyspace-table-qwerty-na-1-big-Data.db"));
+ ReaderUtils.constructFilename("keyspace", "table",
"keyspace-table-qwerty-na-1-big-Data.db"));
assertEquals(new
File("./keyspace/table/keyspace-qwerty-na-1-big-Data.db"),
- SSTableReader.constructFilename("keyspace",
"table", "keyspace-qwerty-na-1-big-Data.db"));
+ ReaderUtils.constructFilename("keyspace", "table",
"keyspace-qwerty-na-1-big-Data.db"));
assertEquals(new
File("./keyspace/table/qwerty-table-na-1-big-Data.db"),
- SSTableReader.constructFilename("keyspace",
"table", "qwerty-table-na-1-big-Data.db"));
+ ReaderUtils.constructFilename("keyspace", "table",
"qwerty-table-na-1-big-Data.db"));
assertEquals(new File("./keyspace/table/keyspace-na-1-big-Data.db"),
- SSTableReader.constructFilename("keyspace",
"table", "keyspace-na-1-big-Data.db"));
+ ReaderUtils.constructFilename("keyspace", "table",
"keyspace-na-1-big-Data.db"));
assertEquals(new File("./keyspace/table/table-na-1-big-Data.db"),
- SSTableReader.constructFilename("keyspace",
"table", "table-na-1-big-Data.db"));
+ ReaderUtils.constructFilename("keyspace", "table",
"table-na-1-big-Data.db"));
assertEquals(new File("./keyspace/table/qwerty.db"),
- SSTableReader.constructFilename("keyspace",
"table", "qwerty.db"));
+ ReaderUtils.constructFilename("keyspace", "table",
"qwerty.db"));
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]