This is an automated email from the ASF dual-hosted git repository.
lukasz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new e5bfe221 CASSANALYTICS-121: Assign data file start offset based on BTI index
e5bfe221 is described below
commit e5bfe221dfa58c6e4882485468d1a84e6e0f3d68
Author: Lukasz Antoniak <[email protected]>
AuthorDate: Mon Feb 16 11:45:34 2026 +0100
CASSANALYTICS-121: Assign data file start offset based on BTI index
Patched by Lukasz Antoniak; Reviewed by Yifan Cai for CASSANALYTICS-121
---
CHANGES.txt | 1 +
.../io/sstable/format/bti/BtiReaderUtils.java | 64 +++++++++++++++++++++-
.../cassandra/spark/reader/SSTableReader.java | 24 ++++++--
.../cassandra/spark/reader/IndexOffsetTests.java | 15 ++++-
.../cassandra/spark/reader/IndexOffsetTests.java | 15 ++++-
.../org/apache/cassandra/utils/TokenUtils.java | 15 +++++
6 files changed, 121 insertions(+), 13 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 7588c8b8..6b973c39 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.3.0
-----
+ * Assign data file start offset based on BTI index (CASSANALYTICS-121)
* Quote identifiers option must be set to true if ttl has mixed case column
name (CASSANALYTICS-120)
* Fix ByteBuffer flip() in StreamBuffer.copyBytes() causing data corruption
(CASSANALYTICS-116)
* Fix race condition in DirectStreamSession#onSSTablesProduced and
SortedSStableWriter#close (CASSANALYTICS-107)
diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/format/bti/BtiReaderUtils.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/format/bti/BtiReaderUtils.java
index 367a41b7..444610bf 100644
--- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/format/bti/BtiReaderUtils.java
+++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/format/bti/BtiReaderUtils.java
@@ -24,19 +24,28 @@ import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.bridge.TokenRange;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileHandle;
@@ -53,6 +62,7 @@ import
org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
import org.apache.cassandra.spark.utils.streaming.BufferingInputStream;
import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.TokenUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -60,6 +70,8 @@ import static
org.apache.cassandra.spark.reader.BigIndexReader.calculateCompress
public class BtiReaderUtils
{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BtiReaderUtils.class);
+
private static final Set<Component> indexComponents =
ImmutableSet.of(BtiFormat.Components.DATA,
BtiFormat.Components.PARTITION_INDEX,
BtiFormat.Components.ROW_INDEX);
@@ -109,6 +121,53 @@ public class BtiReaderUtils
return exists.get();
}
+ @Nullable
+ public static Long startOffsetInDataFile(@NotNull SSTable ssTable,
+ @NotNull TableMetadata metadata,
+ @NotNull Descriptor descriptor,
+ @NotNull TokenRange tokenRange)
+ {
+ final AtomicReference<Long> offset = new AtomicReference<>(null);
+
+ Token tokenStart = TokenUtils.bigIntegerToToken(metadata.partitioner,
tokenRange.lowerEndpoint());
+ Token tokenEnd = TokenUtils.bigIntegerToToken(metadata.partitioner,
tokenRange.upperEndpoint());
+ Range<Token> range = new Range<>(tokenStart, tokenEnd);
+
+ try
+ {
+ withPartitionIndex(ssTable, descriptor, metadata, true, false,
(dataFileHandle, partitionFileHandle, rowFileHandle, partitionIndex) -> {
+ TableMetadataRef metadataRef =
TableMetadataRef.forOfflineTools(metadata);
+ BtiTableReader btiTableReader = new
BtiTableReader.Builder(descriptor)
+ .setDataFile(dataFileHandle)
+
.setPartitionIndex(partitionIndex)
+ .setComponents(indexComponents)
+
.setTableMetadataRef(metadataRef)
+
.setFilter(FilterFactory.AlwaysPresent)
+ .build(null, false, false);
+ try
+ {
+ List<SSTableReader.PartitionPositionBounds> positions =
+
btiTableReader.getPositionsForRanges(Collections.singletonList(range));
+ if (!positions.isEmpty())
+ {
+ // we should receive zero or one position
+ offset.set(positions.get(0).lowerPosition);
+ }
+ }
+ finally
+ {
+ btiTableReader.selfRef().release();
+ }
+ });
+ }
+ catch (IOException e)
+ {
+ LOGGER.warn("Failed to lookup start offset for token range {} in
sstable {}",
+ tokenRange, ssTable, e);
+ }
+ return offset.get();
+ }
+
public static void consumePrimaryIndex(@NotNull SSTable ssTable,
@NotNull TableMetadata metadata,
@NotNull Descriptor descriptor,
@@ -240,9 +299,8 @@ public class BtiReaderUtils
@NotNull BtiPartitionIndexConsumer
consumer) throws IOException
{
File file = new File(sstable.getDataFileName());
- CompressionMetadata compression = getCompressionMetadata(sstable,
crcCheckChance, descriptor);
-
- try (FileHandle dataFileHandle = loadDataFile ? createFileHandle(file,
+ try (CompressionMetadata compression = getCompressionMetadata(sstable,
crcCheckChance, descriptor);
+ FileHandle dataFileHandle = loadDataFile ? createFileHandle(file,
sstable.openDataStream(),
sstable.length(FileType.DATA),
compression) : null;
diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
index 3dbe44d1..cd8be1f0 100644
--- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
+++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
@@ -65,6 +65,7 @@ import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.bti.BtiReaderUtils;
import org.apache.cassandra.io.sstable.indexsummary.IndexSummary;
import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
@@ -429,11 +430,26 @@ public class SSTableReader implements SparkSSTableReader,
Scannable
buildColumnFilter(metadata,
columnFilter));
this.metadata = metadata;
- if (readIndexOffset && summary != null)
+ if (readIndexOffset)
{
- SummaryDbUtils.Summary finalSummary = summary;
- extractRange(sparkRangeFilter, partitionKeyFilters)
- .ifPresent(range -> readOffsets(finalSummary.summary(),
range));
+ if (summary != null)
+ {
+ // BIG format
+ SummaryDbUtils.Summary finalSummary = summary;
+ extractRange(sparkRangeFilter, partitionKeyFilters)
+ .ifPresent(range ->
readOffsets(finalSummary.summary(), range));
+ }
+ else
+ {
+ // BTI format
+ extractRange(sparkRangeFilter, partitionKeyFilters)
+ .ifPresent(range -> {
+ startOffset =
BtiReaderUtils.startOffsetInDataFile(ssTable,
+
this.metadata,
+
descriptor,
+
range);
+ });
+ }
}
else
{
diff --git a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java
index b2b7751f..316a9a1c 100644
--- a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java
+++ b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java
@@ -27,6 +27,7 @@ import java.util.Collection;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.lang.mutable.MutableLong;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,7 +111,8 @@ public class IndexOffsetTests
LOGGER.info("Testing index offsets numKeys={}
sparkPartitions={} partitioner={} enableCompression={}",
numKeys, ranges.size(), partitioner.name(),
enableCompression);
- MutableInt skipped = new MutableInt(0);
+ MutableInt skippedPartitions = new MutableInt(0);
+ MutableLong skippedDataOffsets = new MutableLong(0);
int[] counts = new int[numKeys];
for (TokenRange range : ranges)
{
@@ -120,7 +122,12 @@ public class IndexOffsetTests
{
public void
skippedPartition(ByteBuffer key, BigInteger token)
{
-
skipped.add(1);
+
skippedPartitions.add(1);
+ }
+
+ public void
skippedDataDbStartOffset(long length)
+ {
+
skippedDataOffsets.add(length);
}
})
.build();
@@ -180,8 +187,10 @@ public class IndexOffsetTests
index++;
}
+
assertThat(skippedDataOffsets.longValue()).isGreaterThan(0);
+
LOGGER.info("Success skippedKeys={} partitioner={}",
- skipped.intValue(), partitioner.name());
+ skippedPartitions.intValue(),
partitioner.name());
}
catch (IOException exception)
{
diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java
index b2b7751f..316a9a1c 100644
--- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java
+++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java
@@ -27,6 +27,7 @@ import java.util.Collection;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.lang.mutable.MutableLong;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,7 +111,8 @@ public class IndexOffsetTests
LOGGER.info("Testing index offsets numKeys={}
sparkPartitions={} partitioner={} enableCompression={}",
numKeys, ranges.size(), partitioner.name(),
enableCompression);
- MutableInt skipped = new MutableInt(0);
+ MutableInt skippedPartitions = new MutableInt(0);
+ MutableLong skippedDataOffsets = new MutableLong(0);
int[] counts = new int[numKeys];
for (TokenRange range : ranges)
{
@@ -120,7 +122,12 @@ public class IndexOffsetTests
{
public void
skippedPartition(ByteBuffer key, BigInteger token)
{
-
skipped.add(1);
+
skippedPartitions.add(1);
+ }
+
+ public void
skippedDataDbStartOffset(long length)
+ {
+
skippedDataOffsets.add(length);
}
})
.build();
@@ -180,8 +187,10 @@ public class IndexOffsetTests
index++;
}
+
assertThat(skippedDataOffsets.longValue()).isGreaterThan(0);
+
LOGGER.info("Success skippedKeys={} partitioner={}",
- skipped.intValue(), partitioner.name());
+ skippedPartitions.intValue(),
partitioner.name());
}
catch (IOException exception)
{
diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/utils/TokenUtils.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/utils/TokenUtils.java
index beef1aa8..da84bca4 100644
--- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/utils/TokenUtils.java
+++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/utils/TokenUtils.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.utils;
import java.math.BigInteger;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Token;
@@ -46,6 +47,20 @@ public class TokenUtils
throw new UnsupportedOperationException("Unexpected token type: " +
token.getClass().getName());
}
+ public static Token bigIntegerToToken(IPartitioner partitioner, BigInteger
token)
+ {
+ if (partitioner instanceof Murmur3Partitioner)
+ {
+ return new Murmur3Partitioner.LongToken(token.longValue());
+ }
+ if (partitioner instanceof RandomPartitioner)
+ {
+ return new RandomPartitioner.BigIntegerToken(token);
+ }
+
+ throw new UnsupportedOperationException("Unexpected partitioner type:
" + partitioner.getClass().getName());
+ }
+
public static long tokenToLong(final Token token)
{
if (token instanceof Murmur3Partitioner.LongToken)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]