yifan-c commented on code in PR #98:
URL:
https://github.com/apache/cassandra-analytics/pull/98#discussion_r1943398707
##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/TypeConverter.java:
##########
@@ -23,6 +23,8 @@
public interface TypeConverter
{
+ TypeConverter STUB = (cqlType, value, isFrozen) -> value;
Review Comment:
My understanding of the term stub is a placeholder.
The function is more like `Function.identity`, which returns the input
as-is.
Maybe we name it `IDENTITY` too?
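To illustrate the naming point, here is a minimal sketch. `SimpleTypeConverter` and its `String` type parameter are hypothetical simplifications of `TypeConverter`, used only to keep the example self-contained.

```java
// Hypothetical, simplified stand-in for TypeConverter; the real interface takes a CQL type, not a String.
@FunctionalInterface
interface SimpleTypeConverter
{
    // Behaves like Function.identity(): the value is returned untouched, so IDENTITY describes it better than STUB.
    SimpleTypeConverter IDENTITY = (cqlType, value, isFrozen) -> value;

    Object convert(String cqlType, Object value, boolean isFrozen);
}
```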
##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/ReplicationFactor.java:
##########
@@ -96,6 +98,12 @@ public static ReplicationStrategy getEnum(String value)
}
}
+ public static ReplicationFactor simple(int rf)
Review Comment:
nit: how about `simpleStrategy`?
##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowIterator.java:
##########
@@ -58,6 +65,55 @@ protected RowIterator(CellIterator it,
this.builder = newBuilder(decorator);
}
+ /**
+ * @param it CellIterator cell iterate instance to iterate
over each cell in the data.
+ * @param stats stats instance to emit KPIs
+ * @param requiredColumns optional column filter to select a subset of
columns, if null then all columns are returned.
+ * @return a basic RowIterator that transforms Object[] to a Map keyed on
column name
+ */
+ public static RowIterator<Map<String, Object>> rowMapIterator(CellIterator
it, Stats stats, @Nullable String[] requiredColumns)
+ {
+ final CqlTable cqlTable = it.cqlTable();
+ final boolean hasProjectedValueColumns = it.hasProjectedValueColumns();
Review Comment:
please remove the `final`s
##########
cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java:
##########
@@ -443,6 +445,198 @@ public ByteBuffer uncompress(ByteBuffer input) throws
IOException
public abstract CompressionUtil compressionUtil();
+ // additional SSTable utils methods
+
+ /**
+ * @param keyspace keyspace name
+ * @param table table name
+ * @param ssTable SSTable instance
+ * @return last repair time for a given SSTable by reading the
Statistics.db file.
+ * @throws IOException
+ */
+ public abstract long lastRepairTime(String keyspace, String table, SSTable
ssTable) throws IOException;
+
+ /**
+ * @param ssTable SSTable instance
+ * @param partitioner Cassandra partitioner
+ * @param minIndexInterval minIndexInterval configured in the TableMetaData
+ * @param maxIndexInterval maxIndexInterval configured in the TableMetadata
+ * @return the first and last token by attempting to read from the
Summary.db file first then failing back to the Index.db.
+ * @throws IOException
+ */
+ public abstract Pair<BigInteger, BigInteger> firstLastToken(SSTable
ssTable,
+ Partitioner
partitioner,
+ int
minIndexInterval,
+ int
maxIndexInterval) throws IOException;
+
+ /**
+ * @param ssTable SSTable instance
+ * @param minIndexInterval minIndexInterval configured in the TableMetaData
+ * @param partitioner Cassandra partitioner
+ * @param maxIndexInterval maxIndexInterval configured in the TableMetadata
+ * @param ranges a list of token ranges
+ * @return a list boolean value if corresponding token range in `ranges`
list parameter overlaps with the SSTable.
+ * The SSTable may or may not contain data for the range.
+ */
+ public abstract List<Boolean> overlaps(SSTable ssTable,
+ Partitioner partitioner,
+ int minIndexInterval,
+ int maxIndexInterval,
+ List<TokenRange> ranges) throws
IOException;
Review Comment:
This seems out of scope if the SSTable's token range is already exposed,
since a client can easily check overlap against its own list of ranges.
I am not strongly against adding it, though.
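A rough sketch of the client-side check I have in mind, assuming the first and last tokens are available as `BigInteger`s. `OverlapCheck` and `ClosedRange` are hypothetical names standing in for the project's `TokenRange`; this is not the PR's implementation.

```java
import java.math.BigInteger;
import java.util.List;
import java.util.stream.Collectors;

final class OverlapCheck
{
    // Hypothetical closed token range [start, end]; stands in for the project's TokenRange.
    static final class ClosedRange
    {
        final BigInteger start;
        final BigInteger end;

        ClosedRange(BigInteger start, BigInteger end)
        {
            this.start = start;
            this.end = end;
        }

        // Two closed ranges overlap when each one starts no later than the other ends.
        boolean overlaps(ClosedRange other)
        {
            return start.compareTo(other.end) <= 0 && other.start.compareTo(end) <= 0;
        }
    }

    // Returns one boolean per input range, in input order.
    static List<Boolean> overlaps(BigInteger first, BigInteger last, List<ClosedRange> ranges)
    {
        ClosedRange sstableRange = new ClosedRange(first, last);
        return ranges.stream().map(sstableRange::overlaps).collect(Collectors.toList());
    }
}
```

As with the method in the PR, a `true` only means the range intersects the SSTable's first/last token span; the SSTable may still hold no data for it.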
##########
cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java:
##########
@@ -443,6 +445,198 @@ public ByteBuffer uncompress(ByteBuffer input) throws
IOException
public abstract CompressionUtil compressionUtil();
+ // additional SSTable utils methods
+
+ /**
+ * @param keyspace keyspace name
+ * @param table table name
+ * @param ssTable SSTable instance
+ * @return last repair time for a given SSTable by reading the
Statistics.db file.
+ * @throws IOException
+ */
+ public abstract long lastRepairTime(String keyspace, String table, SSTable
ssTable) throws IOException;
+
+ /**
+ * @param ssTable SSTable instance
+ * @param partitioner Cassandra partitioner
+ * @param minIndexInterval minIndexInterval configured in the TableMetaData
+ * @param maxIndexInterval maxIndexInterval configured in the TableMetadata
+ * @return the first and last token by attempting to read from the
Summary.db file first then failing back to the Index.db.
+ * @throws IOException
+ */
+ public abstract Pair<BigInteger, BigInteger> firstLastToken(SSTable
ssTable,
+ Partitioner
partitioner,
+ int
minIndexInterval,
+ int
maxIndexInterval) throws IOException;
+
+ /**
+ * @param ssTable SSTable instance
+ * @param minIndexInterval minIndexInterval configured in the TableMetaData
+ * @param partitioner Cassandra partitioner
+ * @param maxIndexInterval maxIndexInterval configured in the TableMetadata
+ * @param ranges a list of token ranges
+ * @return a list boolean value if corresponding token range in `ranges`
list parameter overlaps with the SSTable.
+ * The SSTable may or may not contain data for the range.
+ */
+ public abstract List<Boolean> overlaps(SSTable ssTable,
+ Partitioner partitioner,
+ int minIndexInterval,
+ int maxIndexInterval,
+ List<TokenRange> ranges) throws
IOException;
+
+ /**
+ * @param partitioner Cassandra partitioner
+ * @param keyspace Cassandra keyspace
+ * @param createTableStmt CQL table create statement
+ * @param partitionKeys list of
+ * @return list of tokens corresponding to each input `partitionKeys`
+ */
+ public List<BigInteger> toTokens(Partitioner partitioner, String keyspace,
String createTableStmt, List<List<String>> partitionKeys)
Review Comment:
Nit: I wonder if we can expose some sort of `Tokenizer` instead, to
convert a `ByteBuffer` to a token? The tokenizer can then be composed with
`encodePartitionKey` to handle the string input.
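Roughly what I am picturing, as a sketch only. The `Tokenizer` interface and its `compose` method are hypothetical and do not exist in the PR; the encoder argument stands in for something like `encodePartitionKey`.

```java
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.function.Function;

// Hypothetical interface; not part of the PR.
@FunctionalInterface
interface Tokenizer
{
    // Maps an encoded partition key to its token.
    BigInteger toToken(ByteBuffer partitionKey);

    // Composes with any encoder so callers can pass other input types, e.g. a list of string key parts.
    default <T> Function<T, BigInteger> compose(Function<T, ByteBuffer> encoder)
    {
        return input -> toToken(encoder.apply(input));
    }
}
```

With this shape the bridge would only need one abstract method per partitioner, and the string-based overload becomes a composition on the caller's side.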
##########
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java:
##########
@@ -264,6 +276,244 @@ public CompressionUtil compressionUtil()
return CompressionUtilImplementation.INSTANCE;
}
+ @Override
+ public long lastRepairTime(String keyspace, String table, SSTable ssTable)
throws IOException
+ {
+ Map<MetadataType, MetadataComponent> componentMap =
ReaderUtils.deserializeStatsMetadata(keyspace, table, ssTable,
EnumSet.of(MetadataType.STATS));
+ StatsMetadata statsMetadata = (StatsMetadata)
componentMap.get(MetadataType.STATS);
+ if (statsMetadata == null)
+ {
+ throw new IllegalStateException("Could not read StatsMetadata");
+ }
+ return statsMetadata.repairedAt;
+ }
+
+ @Override
+ public Pair<BigInteger, BigInteger> firstLastToken(SSTable ssTable,
Partitioner partitioner, int minIndexInterval, int maxIndexInterval) throws
IOException
+ {
+ IPartitioner iPartitioner = getPartitioner(partitioner);
+
+ // attempt Summary.db file first
+ Pair<BigInteger, BigInteger> firstAndLast = null;
+ try
+ {
+ SummaryDbUtils.Summary summary =
SummaryDbUtils.readSummary(ssTable, iPartitioner, minIndexInterval,
maxIndexInterval);
+ if (summary != null)
+ {
+ BigInteger first =
TokenUtils.tokenToBigInteger(summary.first().getToken());
+ BigInteger last =
TokenUtils.tokenToBigInteger(summary.last().getToken());
+ firstAndLast = Pair.of(first, last);
+ }
+ }
+ catch (IOException e)
+ {
+ // this can happen if the minIndexInterval and maxIndexInterval do
not match the expected serialized in the Summary.db file
+ LOGGER.warn("IOException reading Summary.db file", e);
+ }
+
+ if (firstAndLast == null)
+ {
+ // try Index.db file
+ LOGGER.warn("Failed to read Summary.db file, falling back to
Index.db file");
+ Pair<DecoratedKey, DecoratedKey> keys =
ReaderUtils.keysFromIndex(iPartitioner, ssTable);
+ Token first = keys.left.getToken();
+ Token last = keys.right.getToken();
+ if (first == null || last == null)
+ {
+ throw new RuntimeException("Unable to extract first and last
keys from Summary.db or Index.db");
+ }
+ firstAndLast = Pair.of(TokenUtils.tokenToBigInteger(first),
TokenUtils.tokenToBigInteger(last));
+ }
+
+ return firstAndLast;
+ }
+
+ @Override
+ public List<Boolean> overlaps(SSTable ssTable,
+ Partitioner partitioner,
+ int minIndexInterval,
+ int maxIndexInterval,
+ List<TokenRange> ranges) throws IOException
+ {
+ Pair<BigInteger, BigInteger> firstAndLastKeys =
firstLastToken(ssTable, partitioner, minIndexInterval, maxIndexInterval);
+ TokenRange sstableRange = TokenRange.closed(firstAndLastKeys.left,
firstAndLastKeys.right);
+ return ranges.stream()
+ .map(range -> range.isConnected(sstableRange))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public List<BigInteger> toTokens(Partitioner partitioner, @NotNull
List<ByteBuffer> partitionKeys)
+ {
+ IPartitioner iPartitioner = getPartitioner(partitioner);
+ return partitionKeys
+ .stream()
+ .map(key -> {
+ DecoratedKey decoratedKey = iPartitioner.decorateKey(key);
+ return
TokenUtils.tokenToBigInteger(decoratedKey.getToken());
+ })
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public List<ByteBuffer> encodePartitionKeys(Partitioner partitioner,
String keyspace, String createTableStmt, List<List<String>> keys)
+ {
+ CqlTable table = new SchemaBuilder(createTableStmt, keyspace,
ReplicationFactor.simple(1), partitioner).build();
+ return keys.stream().map(key -> buildPartitionKey(table,
key)).collect(Collectors.toList());
+ }
+
+ @Override
+ public List<Boolean> maybeContains(Partitioner partitioner,
+ String keyspace,
+ String table,
+ SSTable ssTable,
+ List<ByteBuffer> partitionKeys) throws
IOException
+ {
+ if (partitionKeys.isEmpty())
+ {
+ return List.of();
+ }
+ IPartitioner iPartitioner = getPartitioner(partitioner);
+ Descriptor descriptor = ReaderUtils.constructDescriptor(keyspace,
table, ssTable);
+ return maybeContains(descriptor, ssTable,
partitionKeys.stream().map(iPartitioner::decorateKey).collect(Collectors.toList()));
+ }
+
+ private List<Boolean> maybeContains(Descriptor descriptor, SSTable
ssTable, Collection<DecoratedKey> partitionKeys) throws IOException
+ {
+ if (partitionKeys.isEmpty())
+ {
+ return List.of();
+ }
+ BloomFilter filter = ReaderUtils.readFilter(ssTable, descriptor);
Review Comment:
`BloomFilter` is closeable, but it is never closed here.
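Something along these lines would release it on every path. This is a sketch: `FilterLookup` and `CloseableFilter` are hypothetical stand-ins, not the Cassandra `BloomFilter` API.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class FilterLookup
{
    // Hypothetical stand-in for a closeable Bloom filter that may hold off-heap memory.
    interface CloseableFilter extends AutoCloseable
    {
        boolean isPresent(ByteBuffer key);

        @Override
        void close();
    }

    static List<Boolean> maybeContains(CloseableFilter filter, List<ByteBuffer> keys)
    {
        // try-with-resources closes the filter on both the normal and the exceptional path
        try (CloseableFilter f = filter)
        {
            // Evaluate eagerly so no lookup runs after the filter has been closed
            List<Boolean> result = new ArrayList<>(keys.size());
            for (ByteBuffer key : keys)
            {
                result.add(f.isPresent(key));
            }
            return result;
        }
    }
}
```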
##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ByteBufferUtils.java:
##########
@@ -272,6 +285,28 @@ public static ByteBuffer buildPartitionKey(List<CqlField>
partitionKeys, Object.
return build(false, buffers);
}
+ public static Map<String, Object> decodePartitionKey(CqlTable table,
ByteBuffer partitionKey)
Review Comment:
Add a test for it?
##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowIterator.java:
##########
@@ -58,6 +65,55 @@ protected RowIterator(CellIterator it,
this.builder = newBuilder(decorator);
}
+ /**
+ * @param it CellIterator cell iterate instance to iterate
over each cell in the data.
+ * @param stats stats instance to emit KPIs
+ * @param requiredColumns optional column filter to select a subset of
columns, if null then all columns are returned.
+ * @return a basic RowIterator that transforms Object[] to a Map keyed on
column name
+ */
+ public static RowIterator<Map<String, Object>> rowMapIterator(CellIterator
it, Stats stats, @Nullable String[] requiredColumns)
+ {
+ final CqlTable cqlTable = it.cqlTable();
+ final boolean hasProjectedValueColumns = it.hasProjectedValueColumns();
+ return new RowIterator<>(it, stats, requiredColumns,
Function.identity())
+ {
+ @Override
+ public PartialRowBuilder<Map<String, Object>> newPartialBuilder()
+ {
+ Objects.requireNonNull(requiredColumns, "requiredColumns must
be non-null to use PartialRowBuilder");
Review Comment:
nit: can you move the validation inside the `PartialRowBuilder#constructor`?
The same validation is done both here and in `AbstractSparkRowIterator`.
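For example, as a sketch with a hypothetical trimmed-down builder (`PartialBuilderSketch` is not the real `PartialRowBuilder`; only the placement of the null check matters):

```java
import java.util.Objects;

// Hypothetical, trimmed-down builder; only the placement of the null check is the point.
class PartialBuilderSketch
{
    private final String[] requiredColumns;

    PartialBuilderSketch(String[] requiredColumns)
    {
        // Validate once here so every caller gets the same check and the same message
        this.requiredColumns = Objects.requireNonNull(requiredColumns,
                                                      "requiredColumns must be non-null to use PartialRowBuilder");
    }

    int columnCount()
    {
        return requiredColumns.length;
    }
}
```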
##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ByteBufferUtils.java:
##########
@@ -91,6 +94,16 @@ public static byte[] getArray(ByteBuffer buffer)
return bytes;
}
+ public static String readShortLengthString(ByteBuffer buf)
+ {
+ int len = buf.getShort();
+ byte[] ar = new byte[len];
+ buf.get(ar);
+ String str = new String(ar, StandardCharsets.UTF_8);
+ buf.get();
Review Comment:
Why consume an extra byte and throw it away? That is not part of reading a
string; it is specific to decoding `CompositeType` bytes.
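One way to separate the two concerns, as a sketch. `readCompositeComponent` is a hypothetical helper name, and masking the length as unsigned is my own addition rather than something in the PR.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class CompositeDecoding
{
    // Reads a 2-byte length followed by that many UTF-8 bytes, and nothing else.
    static String readShortLengthString(ByteBuffer buf)
    {
        int len = buf.getShort() & 0xFFFF; // assumption: treat the length as unsigned
        byte[] bytes = new byte[len];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Hypothetical helper: each CompositeType component is <length><value><end-of-component byte>.
    static String readCompositeComponent(ByteBuffer buf)
    {
        String value = readShortLengthString(buf);
        buf.get(); // skip the end-of-component byte that trails each component
        return value;
    }
}
```

The generic string reader then stays reusable, and the extra byte is consumed only where the composite layout is known.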
##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowIterator.java:
##########
@@ -58,6 +65,55 @@ protected RowIterator(CellIterator it,
this.builder = newBuilder(decorator);
}
+ /**
+ * @param it CellIterator cell iterate instance to iterate
over each cell in the data.
+ * @param stats stats instance to emit KPIs
+ * @param requiredColumns optional column filter to select a subset of
columns, if null then all columns are returned.
+ * @return a basic RowIterator that transforms Object[] to a Map keyed on
column name
+ */
+ public static RowIterator<Map<String, Object>> rowMapIterator(CellIterator
it, Stats stats, @Nullable String[] requiredColumns)
+ {
+ final CqlTable cqlTable = it.cqlTable();
+ final boolean hasProjectedValueColumns = it.hasProjectedValueColumns();
+ return new RowIterator<>(it, stats, requiredColumns,
Function.identity())
+ {
+ @Override
+ public PartialRowBuilder<Map<String, Object>> newPartialBuilder()
+ {
+ Objects.requireNonNull(requiredColumns, "requiredColumns must
be non-null to use PartialRowBuilder");
+ Map<String, Integer> columnIndex = IntStream.range(0,
requiredColumns.length)
+ .boxed()
+
.collect(Collectors.toMap(i -> requiredColumns[i], Function.identity()));
+
+ return new PartialRowBuilder<>(
+ requiredColumns, it.cqlTable(), hasProjectedValueColumns,
+ (valueArray) -> {
+ Map<String, Object> row = new
HashMap<>(requiredColumns.length);
+ for (String field : requiredColumns)
+ {
+ row.put(field, valueArray[columnIndex.get(field)]);
+ }
Review Comment:
The `columnIndex` map is probably not needed if you populate the row this
way.
```suggestion
for (int i = 0; i < requiredColumns.length; i++)
{
String field = requiredColumns[i];
row.put(field, valueArray[i]);
}
```
##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PartitionKeyFilter.java:
##########
@@ -33,7 +33,7 @@
import org.apache.cassandra.spark.utils.ByteBufferUtils;
import org.jetbrains.annotations.NotNull;
-public final class PartitionKeyFilter implements Serializable
+public final class PartitionKeyFilter implements Serializable,
Comparable<PartitionKeyFilter>
Review Comment:
The semantics of comparing two filters are unclear to me. Can you
explain?
And why remove the `filter` method?
##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java:
##########
@@ -388,4 +388,16 @@ public static Range<BigInteger> range(BigInteger start,
BigInteger end)
{
return Range.openClosed(start, end);
}
+
+ public static void createDirectory(Path directory)
Review Comment:
Just to mention that the same functionality is already provided by JUnit.
But I am fine with defining our own testing utility.
##########
cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java:
##########
@@ -443,6 +445,198 @@ public ByteBuffer uncompress(ByteBuffer input) throws
IOException
public abstract CompressionUtil compressionUtil();
+ // additional SSTable utils methods
+
+ /**
+ * @param keyspace keyspace name
+ * @param table table name
+ * @param ssTable SSTable instance
+ * @return last repair time for a given SSTable by reading the
Statistics.db file.
+ * @throws IOException
+ */
+ public abstract long lastRepairTime(String keyspace, String table, SSTable
ssTable) throws IOException;
+
+ /**
+ * @param ssTable SSTable instance
+ * @param partitioner Cassandra partitioner
+ * @param minIndexInterval minIndexInterval configured in the TableMetaData
+ * @param maxIndexInterval maxIndexInterval configured in the TableMetadata
+ * @return the first and last token by attempting to read from the
Summary.db file first then failing back to the Index.db.
+ * @throws IOException
+ */
+ public abstract Pair<BigInteger, BigInteger> firstLastToken(SSTable
ssTable,
Review Comment:
Naming is hard. I was wondering which token is "the first of the
last tokens".
Anyway, there is already a method that exposes
`org.apache.cassandra.bridge.SSTableSummary`. This new method is somewhat
redundant given
`org.apache.cassandra.bridge.CassandraBridgeImplementation#getSSTableSummary`.
Probably merge them, or make it an overload.
##########
cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java:
##########
@@ -443,6 +445,198 @@ public ByteBuffer uncompress(ByteBuffer input) throws
IOException
public abstract CompressionUtil compressionUtil();
+ // additional SSTable utils methods
+
+ /**
+ * @param keyspace keyspace name
+ * @param table table name
+ * @param ssTable SSTable instance
+ * @return last repair time for a given SSTable by reading the
Statistics.db file.
+ * @throws IOException
+ */
+ public abstract long lastRepairTime(String keyspace, String table, SSTable
ssTable) throws IOException;
+
+ /**
+ * @param ssTable SSTable instance
+ * @param partitioner Cassandra partitioner
+ * @param minIndexInterval minIndexInterval configured in the TableMetaData
+ * @param maxIndexInterval maxIndexInterval configured in the TableMetadata
+ * @return the first and last token by attempting to read from the
Summary.db file first then failing back to the Index.db.
+ * @throws IOException
+ */
+ public abstract Pair<BigInteger, BigInteger> firstLastToken(SSTable
ssTable,
+ Partitioner
partitioner,
+ int
minIndexInterval,
+ int
maxIndexInterval) throws IOException;
+
+ /**
+ * @param ssTable SSTable instance
+ * @param minIndexInterval minIndexInterval configured in the TableMetaData
+ * @param partitioner Cassandra partitioner
+ * @param maxIndexInterval maxIndexInterval configured in the TableMetadata
+ * @param ranges a list of token ranges
+ * @return a list boolean value if corresponding token range in `ranges`
list parameter overlaps with the SSTable.
+ * The SSTable may or may not contain data for the range.
+ */
+ public abstract List<Boolean> overlaps(SSTable ssTable,
+ Partitioner partitioner,
+ int minIndexInterval,
+ int maxIndexInterval,
+ List<TokenRange> ranges) throws
IOException;
+
+ /**
+ * @param partitioner Cassandra partitioner
+ * @param keyspace Cassandra keyspace
+ * @param createTableStmt CQL table create statement
+ * @param partitionKeys list of
+ * @return list of tokens corresponding to each input `partitionKeys`
+ */
+ public List<BigInteger> toTokens(Partitioner partitioner, String keyspace,
String createTableStmt, List<List<String>> partitionKeys)
+ {
+ return toTokens(partitioner, encodePartitionKeys(partitioner,
keyspace, createTableStmt, partitionKeys));
+ }
+
+ /**
+ * @param partitioner Cassandra partitioner
+ * @param partitionKeys list of encoded partition keys
+ * @return list of tokens corresponding to each input `partitionKeys`
+ */
+ public abstract List<BigInteger> toTokens(Partitioner partitioner,
@NotNull List<ByteBuffer> partitionKeys);
+
+ /**
+ * @param partitioner Cassandra partitioner
+ * @param keyspace keyspace name
+ * @param createTableStmt CQL create table statement
+ * @param partitionKey partition key
+ * @return encoded ByteBuffer for the input `partitionKey`
+ */
+ public ByteBuffer encodePartitionKey(Partitioner partitioner, String
keyspace, String createTableStmt, List<String> partitionKey)
+ {
+ return encodePartitionKeys(partitioner, keyspace, createTableStmt,
Collections.singletonList(partitionKey)).get(0);
+ }
+
+ /**
+ * @param partitioner Cassandra partitioner
+ * @param keyspace keyspace name
+ * @param createTableStmt CQL create table statement
+ * @param partitionKeys list of partition keys
+ * @return a list encoded ByteBuffers corresponding to the partition keys
input in `partitionKeys`
+ */
+ public abstract List<ByteBuffer> encodePartitionKeys(Partitioner
partitioner, String keyspace, String createTableStmt, List<List<String>>
partitionKeys);
+
+ /**
+ * @param partitioner Cassandra partitioner
+ * @param keyspace keyspace name
+ * @param table table name
+ * @param ssTable SSTable instance
+ * @param partitionKeys list of partition keys
+ * @return list of booleans returning true if an SSTable might contain a
partition key
+ * (might return false-positives but never false-negatives),
+ * corresponding to the partition keys input in `partitionKeys`.
+ * @throws IOException
+ */
+ public abstract List<Boolean> maybeContains(Partitioner partitioner,
Review Comment:
Can you add a test?
Another concern is that `maybeContains` and `contains` can be
confused. Their implementations differ completely, and they have different
performance characteristics. I wonder if it would be clearer to just return
a `BloomFilter` object here and let the client check with the `BloomFilter`.
The `BloomFilter` would be a new class defined in the bridge subproject, not
the one vended from the Cassandra library. It could simply be an extension of
`Predicate<ByteBuffer>`.
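A minimal sketch of that idea. The interface and the `mightContain` method name are hypothetical, and this is the bridge-level type I am proposing, not the Cassandra library class.

```java
import java.nio.ByteBuffer;
import java.util.function.Predicate;

// Hypothetical bridge-level type, not the class from the Cassandra library.
@FunctionalInterface
interface BloomFilter extends Predicate<ByteBuffer>
{
    // true: the key might be present (false positives are possible); false: the key is definitely absent.
    boolean mightContain(ByteBuffer partitionKey);

    @Override
    default boolean test(ByteBuffer partitionKey)
    {
        return mightContain(partitionKey);
    }
}
```

Because it is a `Predicate<ByteBuffer>`, a client could pass it straight to `Stream#filter` over encoded partition keys, and the probabilistic nature of the check is visible in the type name rather than hidden behind `maybeContains`.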