yifan-c commented on code in PR #98:
URL: https://github.com/apache/cassandra-analytics/pull/98#discussion_r1943398707


##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/TypeConverter.java:
##########
@@ -23,6 +23,8 @@
 
 public interface TypeConverter
 {
+    TypeConverter STUB = (cqlType, value, isFrozen) -> value;

Review Comment:
   My understanding of the term "stub" is that it means a placeholder. 
   
   The function is more like `Function.identity`, which returns the input as-is. 
   
   Maybe we name it `IDENTITY` too? 
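   A minimal sketch of the proposed rename (the interface shape is assumed from the diff; `Object` stands in for the real CQL type parameter to stay self-contained). Like `java.util.function.Function.identity()`, the converter returns its input unchanged, so `IDENTITY` describes it better than `STUB`:
   
   ```java
   // Sketch only: a stand-in for the TypeConverter interface from the PR,
   // showing the proposed IDENTITY naming.
   interface TypeConverter
   {
       // the real interface takes (CqlField.CqlType, Object, boolean); Object is used here for self-containment
       Object convert(Object cqlType, Object value, boolean isFrozen);
   
       // proposed rename of STUB: the converter is an identity function, not a placeholder
       TypeConverter IDENTITY = (cqlType, value, isFrozen) -> value;
   }
   
   public class TypeConverterIdentityDemo
   {
       public static void main(String[] args)
       {
           Object value = "unchanged";
           // IDENTITY hands back the exact same reference it was given
           System.out.println(TypeConverter.IDENTITY.convert(null, value, false) == value); // true
       }
   }
   ```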



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/ReplicationFactor.java:
##########
@@ -96,6 +98,12 @@ public static ReplicationStrategy getEnum(String value)
         }
     }
 
+    public static ReplicationFactor simple(int rf)

Review Comment:
   nit: how about `simpleStrategy`? 



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowIterator.java:
##########
@@ -58,6 +65,55 @@ protected RowIterator(CellIterator it,
         this.builder = newBuilder(decorator);
     }
 
+    /**
+     * @param it              CellIterator instance used to iterate over each cell in the data
+     * @param stats           stats instance to emit KPIs
+     * @param requiredColumns optional column filter to select a subset of columns; if null then all columns are returned
+     * @return a basic RowIterator that transforms Object[] to a Map keyed on column name
+     */
+    public static RowIterator<Map<String, Object>> rowMapIterator(CellIterator it, Stats stats, @Nullable String[] requiredColumns)
+    {
+        final CqlTable cqlTable = it.cqlTable();
+        final boolean hasProjectedValueColumns = it.hasProjectedValueColumns();

Review Comment:
   please remove the `final`s



##########
cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java:
##########
@@ -443,6 +445,198 @@ public ByteBuffer uncompress(ByteBuffer input) throws IOException
 
     public abstract CompressionUtil compressionUtil();
 
+    // additional SSTable utils methods
+
+    /**
+     * @param keyspace keyspace name
+     * @param table    table name
+     * @param ssTable  SSTable instance
+     * @return last repair time for a given SSTable by reading the Statistics.db file
+     * @throws IOException
+     */
+    public abstract long lastRepairTime(String keyspace, String table, SSTable ssTable) throws IOException;
+
+    /**
+     * @param ssTable          SSTable instance
+     * @param partitioner      Cassandra partitioner
+     * @param minIndexInterval minIndexInterval configured in the TableMetadata
+     * @param maxIndexInterval maxIndexInterval configured in the TableMetadata
+     * @return the first and last token, read from the Summary.db file first, falling back to the Index.db file
+     * @throws IOException
+     */
+    public abstract Pair<BigInteger, BigInteger> firstLastToken(SSTable ssTable,
+                                                                Partitioner partitioner,
+                                                                int minIndexInterval,
+                                                                int maxIndexInterval) throws IOException;
+
+    /**
+     * @param ssTable          SSTable instance
+     * @param minIndexInterval minIndexInterval configured in the TableMetadata
+     * @param partitioner      Cassandra partitioner
+     * @param maxIndexInterval maxIndexInterval configured in the TableMetadata
+     * @param ranges           a list of token ranges
+     * @return a list of boolean values indicating whether the corresponding token range in `ranges` overlaps with the SSTable.
+     * The SSTable may or may not contain data for the range.
+     */
+    public abstract List<Boolean> overlaps(SSTable ssTable,
+                                           Partitioner partitioner,
+                                           int minIndexInterval,
+                                           int maxIndexInterval,
+                                           List<TokenRange> ranges) throws IOException;

Review Comment:
   This seems out of scope: if the range of the SSTable is already exposed, the client should be able to check quite easily against the list of ranges. But I am not strongly against adding it. 



##########
cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java:
##########
@@ -443,6 +445,198 @@ public ByteBuffer uncompress(ByteBuffer input) throws IOException
 
     public abstract CompressionUtil compressionUtil();
 
+    // additional SSTable utils methods
+
+    /**
+     * @param keyspace keyspace name
+     * @param table    table name
+     * @param ssTable  SSTable instance
+     * @return last repair time for a given SSTable by reading the Statistics.db file
+     * @throws IOException
+     */
+    public abstract long lastRepairTime(String keyspace, String table, SSTable ssTable) throws IOException;
+
+    /**
+     * @param ssTable          SSTable instance
+     * @param partitioner      Cassandra partitioner
+     * @param minIndexInterval minIndexInterval configured in the TableMetadata
+     * @param maxIndexInterval maxIndexInterval configured in the TableMetadata
+     * @return the first and last token, read from the Summary.db file first, falling back to the Index.db file
+     * @throws IOException
+     */
+    public abstract Pair<BigInteger, BigInteger> firstLastToken(SSTable ssTable,
+                                                                Partitioner partitioner,
+                                                                int minIndexInterval,
+                                                                int maxIndexInterval) throws IOException;
+
+    /**
+     * @param ssTable          SSTable instance
+     * @param minIndexInterval minIndexInterval configured in the TableMetadata
+     * @param partitioner      Cassandra partitioner
+     * @param maxIndexInterval maxIndexInterval configured in the TableMetadata
+     * @param ranges           a list of token ranges
+     * @return a list of boolean values indicating whether the corresponding token range in `ranges` overlaps with the SSTable.
+     * The SSTable may or may not contain data for the range.
+     */
+    public abstract List<Boolean> overlaps(SSTable ssTable,
+                                           Partitioner partitioner,
+                                           int minIndexInterval,
+                                           int maxIndexInterval,
+                                           List<TokenRange> ranges) throws IOException;
+
+    /**
+     * @param partitioner     Cassandra partitioner
+     * @param keyspace        Cassandra keyspace
+     * @param createTableStmt CQL table create statement
+     * @param partitionKeys   list of partition keys
+     * @return list of tokens corresponding to each input `partitionKeys`
+     */
+    public List<BigInteger> toTokens(Partitioner partitioner, String keyspace, String createTableStmt, List<List<String>> partitionKeys)

Review Comment:
   Nit: I wonder if we could expose some sort of `Tokenizer` instead, to convert a ByteBuffer to a token? The tokenizer could then be composed with `encodePartitionKey` to handle the string input.
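   A sketch of that composition (all names here are hypothetical; the real tokenizer would delegate to the Cassandra partitioner rather than the toy byte-sum used for illustration):
   
   ```java
   import java.math.BigInteger;
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;
   import java.util.Arrays;
   import java.util.List;
   import java.util.function.Function;
   import java.util.stream.Collectors;
   
   public class TokenizerDemo
   {
       // a Tokenizer maps an encoded partition key to its token
       interface Tokenizer extends Function<ByteBuffer, BigInteger>
       {
       }
   
       // stand-in tokenizer for illustration: sums the key bytes (NOT a real partitioner hash)
       static final Tokenizer SUM_BYTES = key -> {
           BigInteger token = BigInteger.ZERO;
           for (int i = key.position(); i < key.limit(); i++)
           {
               token = token.add(BigInteger.valueOf(key.get(i) & 0xFF));
           }
           return token;
       };
   
       // toTokens becomes a one-line composition of the key encoder and the tokenizer
       static List<BigInteger> toTokens(Function<List<String>, ByteBuffer> encoder,
                                        Tokenizer tokenizer,
                                        List<List<String>> partitionKeys)
       {
           return partitionKeys.stream().map(encoder.andThen(tokenizer)).collect(Collectors.toList());
       }
   
       public static void main(String[] args)
       {
           // trivial encoder standing in for encodePartitionKey
           Function<List<String>, ByteBuffer> encoder =
               key -> ByteBuffer.wrap(String.join(":", key).getBytes(StandardCharsets.UTF_8));
           List<BigInteger> tokens = toTokens(encoder, SUM_BYTES,
                                              Arrays.asList(Arrays.asList("a"), Arrays.asList("b")));
           System.out.println(tokens.size()); // 2
       }
   }
   ```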



##########
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java:
##########
@@ -264,6 +276,244 @@ public CompressionUtil compressionUtil()
         return CompressionUtilImplementation.INSTANCE;
     }
 
+    @Override
+    public long lastRepairTime(String keyspace, String table, SSTable ssTable) throws IOException
+    {
+        Map<MetadataType, MetadataComponent> componentMap = ReaderUtils.deserializeStatsMetadata(keyspace, table, ssTable, EnumSet.of(MetadataType.STATS));
+        StatsMetadata statsMetadata = (StatsMetadata) componentMap.get(MetadataType.STATS);
+        if (statsMetadata == null)
+        {
+            throw new IllegalStateException("Could not read StatsMetadata");
+        }
+        return statsMetadata.repairedAt;
+    }
+
+    @Override
+    public Pair<BigInteger, BigInteger> firstLastToken(SSTable ssTable, Partitioner partitioner, int minIndexInterval, int maxIndexInterval) throws IOException
+    {
+        IPartitioner iPartitioner = getPartitioner(partitioner);
+
+        // attempt Summary.db file first
+        Pair<BigInteger, BigInteger> firstAndLast = null;
+        try
+        {
+            SummaryDbUtils.Summary summary = SummaryDbUtils.readSummary(ssTable, iPartitioner, minIndexInterval, maxIndexInterval);
+            if (summary != null)
+            {
+                BigInteger first = TokenUtils.tokenToBigInteger(summary.first().getToken());
+                BigInteger last = TokenUtils.tokenToBigInteger(summary.last().getToken());
+                firstAndLast = Pair.of(first, last);
+            }
+        }
+        catch (IOException e)
+        {
+            // this can happen if the minIndexInterval and maxIndexInterval do not match the values serialized in the Summary.db file
+            LOGGER.warn("IOException reading Summary.db file", e);
+        }
+
+        if (firstAndLast == null)
+        {
+            // try Index.db file
+            LOGGER.warn("Failed to read Summary.db file, falling back to Index.db file");
+            Pair<DecoratedKey, DecoratedKey> keys = ReaderUtils.keysFromIndex(iPartitioner, ssTable);
+            Token first = keys.left.getToken();
+            Token last = keys.right.getToken();
+            if (first == null || last == null)
+            {
+                throw new RuntimeException("Unable to extract first and last keys from Summary.db or Index.db");
+            }
+            firstAndLast = Pair.of(TokenUtils.tokenToBigInteger(first), TokenUtils.tokenToBigInteger(last));
+        }
+
+        return firstAndLast;
+    }
+
+    @Override
+    public List<Boolean> overlaps(SSTable ssTable,
+                                  Partitioner partitioner,
+                                  int minIndexInterval,
+                                  int maxIndexInterval,
+                                  List<TokenRange> ranges) throws IOException
+    {
+        Pair<BigInteger, BigInteger> firstAndLastKeys = firstLastToken(ssTable, partitioner, minIndexInterval, maxIndexInterval);
+        TokenRange sstableRange = TokenRange.closed(firstAndLastKeys.left, firstAndLastKeys.right);
+        return ranges.stream()
+                     .map(range -> range.isConnected(sstableRange))
+                     .collect(Collectors.toList());
+    }
+
+    @Override
+    public List<BigInteger> toTokens(Partitioner partitioner, @NotNull List<ByteBuffer> partitionKeys)
+    {
+        IPartitioner iPartitioner = getPartitioner(partitioner);
+        return partitionKeys
+               .stream()
+               .map(key -> {
+                   DecoratedKey decoratedKey = iPartitioner.decorateKey(key);
+                   return TokenUtils.tokenToBigInteger(decoratedKey.getToken());
+               })
+               .collect(Collectors.toList());
+    }
+
+    @Override
+    public List<ByteBuffer> encodePartitionKeys(Partitioner partitioner, String keyspace, String createTableStmt, List<List<String>> keys)
+    {
+        CqlTable table = new SchemaBuilder(createTableStmt, keyspace, ReplicationFactor.simple(1), partitioner).build();
+        return keys.stream().map(key -> buildPartitionKey(table, key)).collect(Collectors.toList());
+    }
+
+    @Override
+    public List<Boolean> maybeContains(Partitioner partitioner,
+                                       String keyspace,
+                                       String table,
+                                       SSTable ssTable,
+                                       List<ByteBuffer> partitionKeys) throws IOException
+    {
+        if (partitionKeys.isEmpty())
+        {
+            return List.of();
+        }
+        IPartitioner iPartitioner = getPartitioner(partitioner);
+        Descriptor descriptor = ReaderUtils.constructDescriptor(keyspace, table, ssTable);
+        return maybeContains(descriptor, ssTable, partitionKeys.stream().map(iPartitioner::decorateKey).collect(Collectors.toList()));
+    }
+
+    private List<Boolean> maybeContains(Descriptor descriptor, SSTable ssTable, Collection<DecoratedKey> partitionKeys) throws IOException
+    {
+        if (partitionKeys.isEmpty())
+        {
+            return List.of();
+        }
+        BloomFilter filter = ReaderUtils.readFilter(ssTable, descriptor);

Review Comment:
   `BloomFilter` is `Closeable`, and it is not closed here. 
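   A sketch of the fix: close the filter with try-with-resources. `StubBloomFilter` below is a hypothetical stand-in for Cassandra's `BloomFilter`, whose `close()` releases off-heap memory, which is why leaking it matters.
   
   ```java
   import java.io.Closeable;
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   
   public class CloseFilterDemo
   {
       // hypothetical stand-in for the real BloomFilter
       static final class StubBloomFilter implements Closeable
       {
           boolean closed;
   
           boolean isPresent(ByteBuffer key)
           {
               return true; // always "maybe present" for the sketch
           }
   
           @Override
           public void close()
           {
               closed = true; // the real filter frees off-heap memory here
           }
       }
   
       static List<Boolean> maybeContains(StubBloomFilter filter, List<ByteBuffer> partitionKeys)
       {
           // try-with-resources guarantees close() runs even if a lookup throws
           try (StubBloomFilter f = filter)
           {
               List<Boolean> result = new ArrayList<>(partitionKeys.size());
               for (ByteBuffer key : partitionKeys)
               {
                   result.add(f.isPresent(key));
               }
               return result;
           }
       }
   
       public static void main(String[] args)
       {
           StubBloomFilter filter = new StubBloomFilter();
           maybeContains(filter, Collections.singletonList(ByteBuffer.wrap(new byte[]{1})));
           System.out.println(filter.closed); // true
       }
   }
   ```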



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ByteBufferUtils.java:
##########
@@ -272,6 +285,28 @@ public static ByteBuffer buildPartitionKey(List<CqlField> partitionKeys, Object.
         return build(false, buffers);
     }
 
+    public static Map<String, Object> decodePartitionKey(CqlTable table, ByteBuffer partitionKey)

Review Comment:
   Add a test for it? 



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowIterator.java:
##########
@@ -58,6 +65,55 @@ protected RowIterator(CellIterator it,
         this.builder = newBuilder(decorator);
     }
 
+    /**
+    /**
+     * @param it              CellIterator instance used to iterate over each cell in the data
+     * @param stats           stats instance to emit KPIs
+     * @param requiredColumns optional column filter to select a subset of columns; if null then all columns are returned
+     * @return a basic RowIterator that transforms Object[] to a Map keyed on column name
+     */
+    public static RowIterator<Map<String, Object>> rowMapIterator(CellIterator it, Stats stats, @Nullable String[] requiredColumns)
+    {
+        final CqlTable cqlTable = it.cqlTable();
+        final boolean hasProjectedValueColumns = it.hasProjectedValueColumns();
+        return new RowIterator<>(it, stats, requiredColumns, Function.identity())
+        {
+            @Override
+            public PartialRowBuilder<Map<String, Object>> newPartialBuilder()
+            {
+                Objects.requireNonNull(requiredColumns, "requiredColumns must be non-null to use PartialRowBuilder");

Review Comment:
   nit: can you move the validation inside the `PartialRowBuilder` constructor? The same validation is done both here and in `AbstractSparkRowIterator`.



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ByteBufferUtils.java:
##########
@@ -91,6 +94,16 @@ public static byte[] getArray(ByteBuffer buffer)
         return bytes;
     }
 
+    public static String readShortLengthString(ByteBuffer buf)
+    {
+        int len = buf.getShort();
+        byte[] ar = new byte[len];
+        buf.get(ar);
+        String str = new String(ar, StandardCharsets.UTF_8);
+        buf.get();

Review Comment:
   Why consume an extra byte and throw it away? That is not part of reading a string; it is specific to decoding `CompositeType` bytes.



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowIterator.java:
##########
@@ -58,6 +65,55 @@ protected RowIterator(CellIterator it,
         this.builder = newBuilder(decorator);
     }
 
+    /**
+     * @param it              CellIterator instance used to iterate over each cell in the data
+     * @param stats           stats instance to emit KPIs
+     * @param requiredColumns optional column filter to select a subset of columns; if null then all columns are returned
+     * @return a basic RowIterator that transforms Object[] to a Map keyed on column name
+     */
+    public static RowIterator<Map<String, Object>> rowMapIterator(CellIterator it, Stats stats, @Nullable String[] requiredColumns)
+    {
+        final CqlTable cqlTable = it.cqlTable();
+        final boolean hasProjectedValueColumns = it.hasProjectedValueColumns();
+        return new RowIterator<>(it, stats, requiredColumns, Function.identity())
+        {
+            @Override
+            public PartialRowBuilder<Map<String, Object>> newPartialBuilder()
+            {
+                Objects.requireNonNull(requiredColumns, "requiredColumns must be non-null to use PartialRowBuilder");
+                Map<String, Integer> columnIndex = IntStream.range(0, requiredColumns.length)
+                                                            .boxed()
+                                                            .collect(Collectors.toMap(i -> requiredColumns[i], Function.identity()));
+
+                return new PartialRowBuilder<>(
+                requiredColumns, it.cqlTable(), hasProjectedValueColumns,
+                (valueArray) -> {
+                    Map<String, Object> row = new HashMap<>(requiredColumns.length);
+                    for (String field : requiredColumns)
+                    {
+                        row.put(field, valueArray[columnIndex.get(field)]);
+                    }

Review Comment:
   The `columnIndex` map is probably not needed if you populate the row this way. 
   
   ```suggestion
                       for (int i = 0; i < requiredColumns.length; i++)
                       {
                           String field = requiredColumns[i];
                           row.put(field, valueArray[i]);
                       }
   ```



##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/filters/PartitionKeyFilter.java:
##########
@@ -33,7 +33,7 @@
 import org.apache.cassandra.spark.utils.ByteBufferUtils;
 import org.jetbrains.annotations.NotNull;
 
-public final class PartitionKeyFilter implements Serializable
+public final class PartitionKeyFilter implements Serializable, Comparable<PartitionKeyFilter>

Review Comment:
   It is unclear to me what the semantics of comparing two filters are. Can you explain? 
   And why remove the `filter` method?



##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java:
##########
@@ -388,4 +388,16 @@ public static Range<BigInteger> range(BigInteger start, BigInteger end)
     {
         return Range.openClosed(start, end);
     }
+
+    public static void createDirectory(Path directory)

Review Comment:
   Just to mention that the same functionality is already provided by JUnit. 
   But I am fine with defining our own testing utility. 



##########
cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java:
##########
@@ -443,6 +445,198 @@ public ByteBuffer uncompress(ByteBuffer input) throws IOException
 
     public abstract CompressionUtil compressionUtil();
 
+    // additional SSTable utils methods
+
+    /**
+     * @param keyspace keyspace name
+     * @param table    table name
+     * @param ssTable  SSTable instance
+     * @return last repair time for a given SSTable by reading the Statistics.db file
+     * @throws IOException
+     */
+    public abstract long lastRepairTime(String keyspace, String table, SSTable ssTable) throws IOException;
+
+    /**
+     * @param ssTable          SSTable instance
+     * @param partitioner      Cassandra partitioner
+     * @param minIndexInterval minIndexInterval configured in the TableMetadata
+     * @param maxIndexInterval maxIndexInterval configured in the TableMetadata
+     * @return the first and last token, read from the Summary.db file first, falling back to the Index.db file
+     * @throws IOException
+     */
+    public abstract Pair<BigInteger, BigInteger> firstLastToken(SSTable ssTable,

Review Comment:
   Naming is hard.. I was wondering what the token that is "the first of the last tokens" would be..
   
   Anyway, there is already a method that exposes `org.apache.cassandra.bridge.SSTableSummary`, so this new method is somewhat redundant in the presence of `org.apache.cassandra.bridge.CassandraBridgeImplementation#getSSTableSummary`. 
   Probably merge them, or make it an overloaded method.  



##########
cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java:
##########
@@ -443,6 +445,198 @@ public ByteBuffer uncompress(ByteBuffer input) throws IOException
 
     public abstract CompressionUtil compressionUtil();
 
+    // additional SSTable utils methods
+
+    /**
+     * @param keyspace keyspace name
+     * @param table    table name
+     * @param ssTable  SSTable instance
+     * @return last repair time for a given SSTable by reading the Statistics.db file
+     * @throws IOException
+     */
+    public abstract long lastRepairTime(String keyspace, String table, SSTable ssTable) throws IOException;
+
+    /**
+     * @param ssTable          SSTable instance
+     * @param partitioner      Cassandra partitioner
+     * @param minIndexInterval minIndexInterval configured in the TableMetadata
+     * @param maxIndexInterval maxIndexInterval configured in the TableMetadata
+     * @return the first and last token, read from the Summary.db file first, falling back to the Index.db file
+     * @throws IOException
+     */
+    public abstract Pair<BigInteger, BigInteger> firstLastToken(SSTable ssTable,
+                                                                Partitioner partitioner,
+                                                                int minIndexInterval,
+                                                                int maxIndexInterval) throws IOException;
+
+    /**
+     * @param ssTable          SSTable instance
+     * @param minIndexInterval minIndexInterval configured in the TableMetadata
+     * @param partitioner      Cassandra partitioner
+     * @param maxIndexInterval maxIndexInterval configured in the TableMetadata
+     * @param ranges           a list of token ranges
+     * @return a list of boolean values indicating whether the corresponding token range in `ranges` overlaps with the SSTable.
+     * The SSTable may or may not contain data for the range.
+     */
+    public abstract List<Boolean> overlaps(SSTable ssTable,
+                                           Partitioner partitioner,
+                                           int minIndexInterval,
+                                           int maxIndexInterval,
+                                           List<TokenRange> ranges) throws IOException;
+
+    /**
+     * @param partitioner     Cassandra partitioner
+     * @param keyspace        Cassandra keyspace
+     * @param createTableStmt CQL table create statement
+     * @param partitionKeys   list of partition keys
+     * @return list of tokens corresponding to each input `partitionKeys`
+     */
+    public List<BigInteger> toTokens(Partitioner partitioner, String keyspace, String createTableStmt, List<List<String>> partitionKeys)
+    {
+        return toTokens(partitioner, encodePartitionKeys(partitioner, keyspace, createTableStmt, partitionKeys));
+    }
+
+    /**
+     * @param partitioner   Cassandra partitioner
+     * @param partitionKeys list of encoded partition keys
+     * @return list of tokens corresponding to each input `partitionKeys`
+     */
+    public abstract List<BigInteger> toTokens(Partitioner partitioner, @NotNull List<ByteBuffer> partitionKeys);
+
+    /**
+     * @param partitioner     Cassandra partitioner
+     * @param keyspace        keyspace name
+     * @param createTableStmt CQL create table statement
+     * @param partitionKey    partition key
+     * @return encoded ByteBuffer for the input `partitionKey`
+     */
+    public ByteBuffer encodePartitionKey(Partitioner partitioner, String keyspace, String createTableStmt, List<String> partitionKey)
+    {
+        return encodePartitionKeys(partitioner, keyspace, createTableStmt, Collections.singletonList(partitionKey)).get(0);
+    }
+
+    /**
+     * @param partitioner     Cassandra partitioner
+     * @param keyspace        keyspace name
+     * @param createTableStmt CQL create table statement
+     * @param partitionKeys   list of partition keys
+     * @return a list of encoded ByteBuffers corresponding to the partition keys input in `partitionKeys`
+     */
+    public abstract List<ByteBuffer> encodePartitionKeys(Partitioner partitioner, String keyspace, String createTableStmt, List<List<String>> partitionKeys);
+
+    /**
+     * @param partitioner   Cassandra partitioner
+     * @param keyspace      keyspace name
+     * @param table         table name
+     * @param ssTable       SSTable instance
+     * @param partitionKeys list of partition keys
+     * @return list of booleans, true if the SSTable might contain the corresponding partition key
+     * (might return false positives but never false negatives)
+     * @throws IOException
+     */
+    public abstract List<Boolean> maybeContains(Partitioner partitioner,

Review Comment:
   Can you add a test? 
   
   Another concern I have is that `maybeContains` and `contains` can be confusing. Their implementations differ completely, and they have different performance characteristics. I am wondering if it would be clearer to just return a `BloomFilter` object here and let the client check with the `BloomFilter`. 
   
   The `BloomFilter` would be a new class defined in the bridge subproject, not the one vended from the Cassandra library. It could simply be an extension of `Predicate<ByteBuffer>`. 
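   A sketch of that bridge-owned abstraction (all names hypothetical): the filter is a `Predicate<ByteBuffer>`, so clients run the membership check themselves and get predicate composition for free.
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.function.Predicate;
   
   public class BridgeBloomFilterDemo
   {
       // hypothetical bridge-level BloomFilter, independent of the Cassandra library class
       interface BridgeBloomFilter extends Predicate<ByteBuffer>
       {
           // true = the SSTable may contain the key; false = it definitely does not
           @Override
           boolean test(ByteBuffer partitionKey);
       }
   
       // trivial implementation standing in for a filter deserialized from Filter.db
       static final BridgeBloomFilter NON_EMPTY_KEYS = key -> key.remaining() > 0;
   
       public static void main(String[] args)
       {
           System.out.println(NON_EMPTY_KEYS.test(ByteBuffer.wrap(new byte[]{1}))); // true
           // Predicate composition comes for free, e.g. negating or and-ing filters:
           System.out.println(NON_EMPTY_KEYS.negate().test(ByteBuffer.allocate(0))); // true
       }
   }
   ```
   
   With this shape, `maybeContains` on the bridge could shrink to "read and return the filter", and the differing performance characteristics of a bloom-filter check versus a real `contains` lookup become visible at the call site.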



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
