This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
new 9ce86e0ff8 SAI result retriever is filtering too many rows
9ce86e0ff8 is described below
commit 9ce86e0ff8b6344b528a0640f9dafa23f97dd85a
Author: Mike Adamson <[email protected]>
AuthorDate: Tue Aug 8 17:07:01 2023 +0100
SAI result retriever is filtering too many rows
This patch fixes a bug in SegmentMetadata, which was
storing only the partition key, rather than the full
primary key, for a segment's min and max primary keys.
It also refactors PrimaryKey to remove the deferred
loading of PrimaryKeys by the PrimaryKeyMaps.
Patch by Mike Adamson; reviewed by Caleb Rackliffe and Andrés de la Peña
for CASSANDRA-18734
---
CHANGES.txt | 1 +
.../apache/cassandra/index/sai/IndexContext.java | 9 +-
.../cassandra/index/sai/StorageAttachedIndex.java | 1 +
.../index/sai/StorageAttachedIndexGroup.java | 2 +-
.../cassandra/index/sai/disk/PrimaryKeyMap.java | 3 +-
.../index/sai/disk/StorageAttachedIndexWriter.java | 3 +-
.../index/sai/disk/format/IndexDescriptor.java | 11 +-
.../index/sai/disk/v1/MemtableIndexWriter.java | 14 +-
.../index/sai/disk/v1/SkinnyPrimaryKeyMap.java | 40 +--
.../index/sai/disk/v1/WidePrimaryKeyMap.java | 37 +--
.../index/sai/disk/v1/keystore/KeyLookup.java | 6 +-
.../index/sai/disk/v1/segment/SegmentMetadata.java | 33 +-
.../index/sai/memory/TrieMemoryIndex.java | 3 +-
.../cassandra/index/sai/plan/QueryController.java | 13 +-
.../sai/plan/StorageAttachedIndexSearcher.java | 24 +-
.../cassandra/index/sai/utils/PrimaryKey.java | 368 ++++++++++++++-------
.../test/microbench/sai/KeyLookupBench.java | 4 +-
.../org/apache/cassandra/index/sai/SAITester.java | 4 +-
.../index/sai/disk/format/IndexDescriptorTest.java | 5 +-
.../sai/disk/v1/InvertedIndexSearcherTest.java | 10 +-
.../index/sai/disk/v1/SegmentFlushTest.java | 16 +-
.../index/sai/disk/v1/WideRowPrimaryKeyTest.java | 3 +-
.../v1/bbtree/BlockBalancedTreeIndexBuilder.java | 8 +-
.../index/sai/disk/v1/keystore/KeyLookupTest.java | 28 +-
.../sai/iterators/KeyRangeConcatIteratorTest.java | 6 +-
.../index/sai/iterators/LongIterator.java | 2 +-
.../AbstractInMemoryKeyRangeIteratorTester.java | 16 +-
.../PriorityInMemoryKeyRangeIteratorTest.java | 4 +-
.../index/sai/memory/TrieMemoryIndexTest.java | 1 +
.../index/sai/utils/AbstractPrimaryKeyTester.java | 15 +-
.../index/sai/utils/IndexInputLeakDetector.java | 2 +-
.../cassandra/index/sai/utils/PrimaryKeyTest.java | 61 +++-
32 files changed, 449 insertions(+), 304 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 953ca7f3dc..c65a1e6671 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.0-alpha2
+ * Fix SAI's SegmentMetadata min and max primary keys (CASSANDRA-18734)
* Remove commons-codec dependency (CASSANDRA-18772)
Merged from 4.1:
Merged from 4.0:
diff --git a/src/java/org/apache/cassandra/index/sai/IndexContext.java
b/src/java/org/apache/cassandra/index/sai/IndexContext.java
index ac33c837da..61eb844b02 100644
--- a/src/java/org/apache/cassandra/index/sai/IndexContext.java
+++ b/src/java/org/apache/cassandra/index/sai/IndexContext.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.index.sai.analyzer.AbstractAnalyzer;
import org.apache.cassandra.index.sai.disk.SSTableIndex;
import org.apache.cassandra.index.sai.disk.format.Version;
@@ -96,6 +97,7 @@ public class IndexContext
public IndexContext(String keyspace,
String table,
AbstractType<?> partitionKeyType,
+ IPartitioner partitioner,
ClusteringComparator clusteringComparator,
ColumnMetadata columnMetadata,
IndexTarget.Type indexType,
@@ -108,7 +110,7 @@ public class IndexContext
this.columnMetadata = Objects.requireNonNull(columnMetadata);
this.indexType = Objects.requireNonNull(indexType);
this.validator = TypeUtil.cellValueType(columnMetadata, indexType);
- this.primaryKeyFactory = new PrimaryKey.Factory(clusteringComparator);
+ this.primaryKeyFactory = new PrimaryKey.Factory(partitioner,
clusteringComparator);
this.indexMetadata = indexMetadata;
this.memtableIndexManager = indexMetadata == null ? null : new
MemtableIndexManager(this);
@@ -122,6 +124,11 @@ public class IndexContext
:
AbstractAnalyzer.fromOptions(getValidator(), indexMetadata.options);
}
+ public boolean hasClustering()
+ {
+ return clusteringComparator.size() > 0;
+ }
+
public AbstractType<?> keyValidator()
{
return partitionKeyType;
diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
index 805a6b65e1..a0cdf9c3ac 100644
--- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
+++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
@@ -178,6 +178,7 @@ public class StorageAttachedIndex implements Index
this.indexContext = new IndexContext(tableMetadata.keyspace,
tableMetadata.name,
tableMetadata.partitionKeyType,
+ tableMetadata.partitioner,
tableMetadata.comparator,
target.left,
target.right,
diff --git
a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java
b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java
index 7ce084d9db..de4c8b3570 100644
--- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java
+++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java
@@ -191,7 +191,7 @@ public class StorageAttachedIndexGroup implements
Index.Group, INotificationCons
@Override
public SSTableFlushObserver getFlushObserver(Descriptor descriptor,
LifecycleNewTracker tracker, TableMetadata tableMetadata)
{
- IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor,
tableMetadata.comparator);
+ IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor,
tableMetadata.partitioner, tableMetadata.comparator);
try
{
return
StorageAttachedIndexWriter.createFlushObserverWriter(indexDescriptor, indexes,
tracker);
diff --git a/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMap.java
b/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMap.java
index fa47b9f4a4..330acf6954 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMap.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMap.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
import java.io.IOException;
import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
@@ -36,7 +37,7 @@ public interface PrimaryKeyMap extends Closeable
* A factory for creating {@link PrimaryKeyMap} instances. Implementations
of this
* interface are expected to be threadsafe.
*/
- @NotThreadSafe
+ @ThreadSafe
interface Factory extends Closeable
{
/**
diff --git
a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
index f44a68bde1..f35ae67f93 100644
---
a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
+++
b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
@@ -247,7 +247,8 @@ public class StorageAttachedIndexWriter implements
SSTableFlushObserver
private void addRow(Row row) throws IOException,
InMemoryTrie.SpaceExhaustedException
{
- PrimaryKey primaryKey =
indexDescriptor.primaryKeyFactory.create(currentKey, row.clustering());
+ PrimaryKey primaryKey = indexDescriptor.hasClustering() ?
indexDescriptor.primaryKeyFactory.create(currentKey, row.clustering())
+ :
indexDescriptor.primaryKeyFactory.create(currentKey);
perSSTableWriter.nextRow(primaryKey);
rowMapping.add(primaryKey, sstableRowId);
diff --git
a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java
b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java
index 37364f6448..eda5cb7ed3 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.IndexValidation;
import org.apache.cassandra.index.sai.SSTableContext;
@@ -74,17 +75,17 @@ public class IndexDescriptor
public final ClusteringComparator clusteringComparator;
public final PrimaryKey.Factory primaryKeyFactory;
- private IndexDescriptor(Version version, Descriptor sstableDescriptor,
ClusteringComparator clusteringComparator)
+ private IndexDescriptor(Version version, Descriptor sstableDescriptor,
IPartitioner partitioner, ClusteringComparator clusteringComparator)
{
this.version = version;
this.sstableDescriptor = sstableDescriptor;
this.clusteringComparator = clusteringComparator;
- this.primaryKeyFactory = new PrimaryKey.Factory(clusteringComparator);
+ this.primaryKeyFactory = new PrimaryKey.Factory(partitioner,
clusteringComparator);
}
- public static IndexDescriptor create(Descriptor descriptor,
ClusteringComparator clusteringComparator)
+ public static IndexDescriptor create(Descriptor descriptor, IPartitioner
partitioner, ClusteringComparator clusteringComparator)
{
- return new IndexDescriptor(Version.LATEST, descriptor,
clusteringComparator);
+ return new IndexDescriptor(Version.LATEST, descriptor, partitioner,
clusteringComparator);
}
public static IndexDescriptor create(SSTableReader sstable)
@@ -93,6 +94,7 @@ public class IndexDescriptor
{
IndexDescriptor indexDescriptor = new IndexDescriptor(version,
sstable.descriptor,
+
sstable.getPartitioner(),
sstable.metadata().comparator);
if
(version.onDiskFormat().isPerSSTableIndexBuildComplete(indexDescriptor))
@@ -102,6 +104,7 @@ public class IndexDescriptor
}
return new IndexDescriptor(Version.LATEST,
sstable.descriptor,
+ sstable.getPartitioner(),
sstable.metadata().comparator);
}
diff --git
a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
index da57ac29e0..97e82ed805 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
@@ -27,7 +27,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.carrotsearch.hppc.LongArrayList;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.index.sai.IndexContext;
@@ -105,14 +104,11 @@ public class MemtableIndexWriter implements
PerColumnIndexWriter
return;
}
- final DecoratedKey minKey = rowMapping.minKey.partitionKey();
- final DecoratedKey maxKey = rowMapping.maxKey.partitionKey();
-
final Iterator<Pair<ByteComparable, LongArrayList>> iterator =
rowMapping.merge(memtable);
try (MemtableTermsIterator terms = new
MemtableTermsIterator(memtable.getMinTerm(), memtable.getMaxTerm(), iterator))
{
- long cellCount = flush(minKey, maxKey,
indexContext.getValidator(), terms, rowMapping.maxSSTableRowId);
+ long cellCount = flush(rowMapping.minKey, rowMapping.maxKey,
indexContext.getValidator(), terms, rowMapping.maxSSTableRowId);
indexDescriptor.createComponentOnDisk(IndexComponent.COLUMN_COMPLETION_MARKER,
indexContext);
@@ -135,8 +131,8 @@ public class MemtableIndexWriter implements
PerColumnIndexWriter
}
}
- private long flush(DecoratedKey minKey,
- DecoratedKey maxKey,
+ private long flush(PrimaryKey minKey,
+ PrimaryKey maxKey,
AbstractType<?> termComparator,
MemtableTermsIterator terms,
long maxSSTableRowId) throws IOException
@@ -175,8 +171,8 @@ public class MemtableIndexWriter implements
PerColumnIndexWriter
numRows,
terms.getMinSSTableRowId(),
terms.getMaxSSTableRowId(),
-
indexDescriptor.primaryKeyFactory.createPartitionKeyOnly(minKey),
-
indexDescriptor.primaryKeyFactory.createPartitionKeyOnly(maxKey),
+ minKey,
+ maxKey,
terms.getMinTerm(),
terms.getMaxTerm(),
indexMetas);
diff --git
a/src/java/org/apache/cassandra/index/sai/disk/v1/SkinnyPrimaryKeyMap.java
b/src/java/org/apache/cassandra/index/sai/disk/v1/SkinnyPrimaryKeyMap.java
index ad7e16751b..b535861290 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/SkinnyPrimaryKeyMap.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SkinnyPrimaryKeyMap.java
@@ -18,8 +18,11 @@
package org.apache.cassandra.index.sai.disk.v1;
-import org.apache.cassandra.db.BufferDecoratedKey;
-import org.apache.cassandra.db.Clustering;
+import java.io.IOException;
+import java.util.Arrays;
+import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
+
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.index.sai.disk.PrimaryKeyMap;
@@ -35,15 +38,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.Throwables;
-import org.apache.cassandra.utils.bytecomparable.ByteComparable;
-import org.apache.cassandra.utils.bytecomparable.ByteSource;
-import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
-
-import javax.annotation.concurrent.NotThreadSafe;
-import javax.annotation.concurrent.ThreadSafe;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
/**
* A {@link PrimaryKeyMap} for skinny tables (those with no clustering
columns).
@@ -130,7 +124,6 @@ public class SkinnyPrimaryKeyMap implements PrimaryKeyMap
protected final KeyLookup.Cursor partitionKeyCursor;
protected final IPartitioner partitioner;
protected final PrimaryKey.Factory primaryKeyFactory;
- protected final ByteBuffer tokenBuffer = ByteBuffer.allocate(Long.BYTES);
protected SkinnyPrimaryKeyMap(LongArray tokenArray,
LongArray partitionArray,
@@ -148,9 +141,7 @@ public class SkinnyPrimaryKeyMap implements PrimaryKeyMap
@Override
public PrimaryKey primaryKeyFromRowId(long sstableRowId)
{
- tokenBuffer.putLong(tokenArray.get(sstableRowId));
- tokenBuffer.rewind();
- return
primaryKeyFactory.createDeferred(partitioner.getTokenFactory().fromByteArray(tokenBuffer),
() -> supplier(sstableRowId));
+ return primaryKeyFactory.create(readPartitionKey(sstableRowId));
}
@Override
@@ -159,7 +150,9 @@ public class SkinnyPrimaryKeyMap implements PrimaryKeyMap
long rowId = tokenArray.indexOf(primaryKey.token().getLongValue());
// If the key is token only, the token is out of range, we are at the
end of our keys, or we have skipped a token
// we can return straight away.
- if (primaryKey.isTokenOnly() || rowId < 0 || rowId + 1 ==
tokenArray.length() || tokenArray.get(rowId) !=
primaryKey.token().getLongValue())
+ if (primaryKey.kind() == PrimaryKey.Kind.TOKEN ||
+ rowId < 0 ||
+ rowId + 1 == tokenArray.length() || tokenArray.get(rowId) !=
primaryKey.token().getLongValue())
return rowId;
// Otherwise we need to check for token collision.
return tokenCollisionDetection(primaryKey, rowId);
@@ -188,21 +181,8 @@ public class SkinnyPrimaryKeyMap implements PrimaryKeyMap
return rowId;
}
- protected PrimaryKey supplier(long sstableRowId)
- {
- return primaryKeyFactory.create(readPartitionKey(sstableRowId),
Clustering.EMPTY);
- }
-
protected DecoratedKey readPartitionKey(long sstableRowId)
{
- long partitionId = partitionArray.get(sstableRowId);
- ByteSource.Peekable peekable =
ByteSource.peekable(partitionKeyCursor.seekToPointId(partitionId).asComparableBytes(ByteComparable.Version.OSS50));
-
- byte[] keyBytes = ByteSourceInverse.getUnescapedBytes(peekable);
-
- assert keyBytes != null : "Primary key from map did not contain a
partition key";
-
- ByteBuffer decoratedKey = ByteBuffer.wrap(keyBytes);
- return new BufferDecoratedKey(partitioner.getToken(decoratedKey),
decoratedKey);
+ return
primaryKeyFactory.partitionKeyFromComparableBytes(partitionKeyCursor.seekToPointId(partitionArray.get(sstableRowId)));
}
}
diff --git
a/src/java/org/apache/cassandra/index/sai/disk/v1/WidePrimaryKeyMap.java
b/src/java/org/apache/cassandra/index/sai/disk/v1/WidePrimaryKeyMap.java
index 0016c49493..a93d6a7ff1 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/WidePrimaryKeyMap.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/WidePrimaryKeyMap.java
@@ -18,9 +18,13 @@
package org.apache.cassandra.index.sai.disk.v1;
+import java.io.IOException;
+import java.util.Arrays;
+import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
+
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.ClusteringComparator;
-import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.index.sai.disk.PrimaryKeyMap;
import org.apache.cassandra.index.sai.disk.format.IndexComponent;
@@ -33,13 +37,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.Throwables;
-import org.apache.cassandra.utils.bytecomparable.ByteComparable;
-import org.apache.cassandra.utils.bytecomparable.ByteSource;
-
-import javax.annotation.concurrent.NotThreadSafe;
-import javax.annotation.concurrent.ThreadSafe;
-import java.io.IOException;
-import java.util.Arrays;
/**
* An extension of the {@link SkinnyPrimaryKeyMap} for wide tables (those with
clustering columns).
@@ -125,6 +122,12 @@ public class WidePrimaryKeyMap extends SkinnyPrimaryKeyMap
this.clusteringKeyCursor = clusteringKeyCursor;
}
+ @Override
+ public PrimaryKey primaryKeyFromRowId(long sstableRowId)
+ {
+ return primaryKeyFactory.create(readPartitionKey(sstableRowId),
readClusteringKey(sstableRowId));
+ }
+
@Override
public long rowIdFromPrimaryKey(PrimaryKey primaryKey)
{
@@ -132,7 +135,7 @@ public class WidePrimaryKeyMap extends SkinnyPrimaryKeyMap
// If the key only has a token (initial range skip in the query), the
token is out of range,
// or we have skipped a token, return the rowId from the token array.
- if (primaryKey.isTokenOnly() || rowId < 0 || tokenArray.get(rowId) !=
primaryKey.token().getLongValue())
+ if (primaryKey.kind() == PrimaryKey.Kind.TOKEN || rowId < 0 ||
tokenArray.get(rowId) != primaryKey.token().getLongValue())
return rowId;
rowId = tokenCollisionDetection(primaryKey, rowId);
@@ -148,23 +151,9 @@ public class WidePrimaryKeyMap extends SkinnyPrimaryKeyMap
FileUtils.closeQuietly(clusteringKeyCursor);
}
- @Override
- protected PrimaryKey supplier(long sstableRowId)
- {
- return primaryKeyFactory.create(readPartitionKey(sstableRowId),
readClusteringKey(sstableRowId));
- }
-
private Clustering<?> readClusteringKey(long sstableRowId)
{
- ByteSource.Peekable peekable =
ByteSource.peekable(clusteringKeyCursor.seekToPointId(sstableRowId)
-
.asComparableBytes(ByteComparable.Version.OSS50));
-
- Clustering<?> clustering =
clusteringComparator.clusteringFromByteComparable(ByteBufferAccessor.instance,
v -> peekable);
-
- if (clustering == null)
- clustering = Clustering.EMPTY;
-
- return clustering;
+ return
primaryKeyFactory.clusteringFromByteComparable(clusteringKeyCursor.seekToPointId(sstableRowId));
}
// Returns the rowId of the next partition or the number of rows if
supplied rowId is in the last partition
diff --git
a/src/java/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookup.java
b/src/java/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookup.java
index 7f6f8f3b82..7b8808e89e 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookup.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookup.java
@@ -143,10 +143,10 @@ public class KeyLookup
* in these cases the internal buffer is cleared.
*
* @param pointId point id to lookup
- * @return The {@link ByteComparable} containing the key
+ * @return The {@link ByteSource} containing the key
* @throws IndexOutOfBoundsException if the target point id is less
than -1 or greater than the number of keys
*/
- public @Nonnull ByteComparable seekToPointId(long pointId)
+ public @Nonnull ByteSource seekToPointId(long pointId)
{
if (pointId < 0 || pointId >= keyLookupMeta.keyCount)
throw new
IndexOutOfBoundsException(String.format(INDEX_OUT_OF_BOUNDS, pointId,
keyLookupMeta.keyCount));
@@ -170,7 +170,7 @@ public class KeyLookup
updateCurrentBlockIndex(currentPointId);
}
- return ByteComparable.fixedLength(currentKey.bytes,
currentKey.offset, currentKey.length);
+ return ByteSource.fixedLength(currentKey.bytes, currentKey.offset,
currentKey.length);
}
/**
diff --git
a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentMetadata.java
b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentMetadata.java
index e9510f945b..28aa9b2e4d 100644
---
a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentMetadata.java
+++
b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentMetadata.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.index.sai.disk.v1.segment;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -30,12 +31,14 @@ import java.util.stream.Stream;
import com.google.common.collect.ImmutableMap;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.index.sai.disk.format.IndexComponent;
import org.apache.cassandra.index.sai.disk.v1.MetadataSource;
import org.apache.cassandra.index.sai.disk.v1.MetadataWriter;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+import org.apache.cassandra.utils.bytecomparable.ByteSource;
+import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.IndexOutput;
@@ -54,7 +57,7 @@ public class SegmentMetadata
/**
* Min and max sstable rowId in current segment.
- *
+ * <p>
* For index generated by compaction, minSSTableRowId is the same as
segmentRowIdOffset.
* But for flush, segmentRowIdOffset is taken from previous segment's
maxSSTableRowId.
*/
@@ -80,7 +83,7 @@ public class SegmentMetadata
/**
* Root, offset, length for each index structure in the segment.
- *
+ * <p>
* Note: postings block offsets are stored in terms dictionary, no need to
worry about its root.
*/
public final ComponentMetadataMap componentMetadatas;
@@ -118,8 +121,8 @@ public class SegmentMetadata
this.numRows = input.readLong();
this.minSSTableRowId = input.readLong();
this.maxSSTableRowId = input.readLong();
- this.minKey =
primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input)));
- this.maxKey =
primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input)));
+ this.minKey =
primaryKeyFactory.fromComparableBytes(ByteSource.fixedLength(readBytes(input)));
+ this.maxKey =
primaryKeyFactory.fromComparableBytes(ByteSource.fixedLength(readBytes(input)));
this.minTerm = readBytes(input);
this.maxTerm = readBytes(input);
this.componentMetadatas = new ComponentMetadataMap(input);
@@ -158,9 +161,10 @@ public class SegmentMetadata
output.writeLong(metadata.minSSTableRowId);
output.writeLong(metadata.maxSSTableRowId);
- Stream.of(metadata.minKey.partitionKey().getKey(),
- metadata.maxKey.partitionKey().getKey(),
- metadata.minTerm, metadata.maxTerm).forEach(bb ->
writeBytes(bb, output));
+
Stream.of(ByteSourceInverse.readBytes(metadata.minKey.asComparableBytes(ByteComparable.Version.OSS50)),
+
ByteSourceInverse.readBytes(metadata.maxKey.asComparableBytes(ByteComparable.Version.OSS50)))
+ .forEach(b -> writeBytes(b, output));
+ Stream.of(metadata.minTerm, metadata.maxTerm).forEach(bb ->
writeBytes(bb, output));
metadata.componentMetadatas.write(output);
}
@@ -195,6 +199,19 @@ public class SegmentMetadata
out.writeInt(bytes.length);
out.writeBytes(bytes, 0, bytes.length);
}
+ catch (IOException e)
+ {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ private static void writeBytes(byte[] bytes, IndexOutput out)
+ {
+ try
+ {
+ out.writeInt(bytes.length);
+ out.writeBytes(bytes, 0, bytes.length);
+ }
catch (IOException ioe)
{
throw new RuntimeException(ioe);
diff --git
a/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java
b/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java
index 0df665d930..f992301df9 100644
--- a/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java
+++ b/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java
@@ -97,7 +97,8 @@ public class TrieMemoryIndex
{
value = TypeUtil.asIndexBytes(value, validator);
analyzer.reset(value);
- final PrimaryKey primaryKey =
indexContext.keyFactory().create(key, clustering);
+ final PrimaryKey primaryKey = indexContext.hasClustering() ?
indexContext.keyFactory().create(key, clustering)
+ :
indexContext.keyFactory().create(key);
final long initialSizeOnHeap = data.sizeOnHeap();
final long initialSizeOffHeap = data.sizeOffHeap();
final long reducerHeapSize = primaryKeysReducer.heapAllocations();
diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
index bf435678ac..2d01983219 100644
--- a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
+++ b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
@@ -119,6 +119,7 @@ public class QueryController
return indexes.isEmpty() ? new IndexContext(cfs.metadata().keyspace,
cfs.metadata().name,
cfs.metadata().partitionKeyType,
+ cfs.getPartitioner(),
cfs.metadata().comparator,
expression.column(),
IndexTarget.Type.VALUES,
@@ -196,7 +197,7 @@ public class QueryController
* The query does not select the key if both of the following statements
are false:
* 1. The table associated with the query is not using clustering keys
* 2. The clustering index filter for the command wants the row.
- *
+ * <p>
* Item 2 is important in paged queries where the {@link
org.apache.cassandra.db.filter.ClusteringIndexSliceFilter} for
* subsequent paged queries may not select rows that are returned by the
index
* search because that is initially partition based.
@@ -206,7 +207,7 @@ public class QueryController
*/
public boolean doesNotSelect(PrimaryKey key)
{
- return !key.hasEmptyClustering() &&
!command.clusteringIndexFilter(key.partitionKey()).selects(key.clustering());
+ return key.kind() == PrimaryKey.Kind.WIDE &&
!command.clusteringIndexFilter(key.partitionKey()).selects(key.clustering());
}
// Note: This method assumes that the selects method has already been
called for the
@@ -215,7 +216,13 @@ public class QueryController
{
ClusteringIndexFilter clusteringIndexFilter =
command.clusteringIndexFilter(key.partitionKey());
- if (key.hasEmptyClustering())
+ assert cfs.metadata().comparator.size() == 0 &&
!key.kind().hasClustering ||
+ cfs.metadata().comparator.size() > 0 &&
key.kind().hasClustering :
+ "PrimaryKey " + key + " clustering does not match table. There
should be a clustering of size " + cfs.metadata().comparator.size();
+
+ // If we have skinny partitions or the key is for a static row then we
need to get the partition as
+ // requested by the original query.
+ if (cfs.metadata().comparator.size() == 0 || key.kind() ==
PrimaryKey.Kind.STATIC)
return clusteringIndexFilter;
else
return new
ClusteringIndexNamesFilter(FBUtilities.singleton(key.clustering(),
cfs.metadata().comparator),
diff --git
a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java
b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java
index adcf36d157..295189b023 100644
---
a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java
+++
b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java
@@ -72,7 +72,7 @@ public class StorageAttachedIndexSearcher implements
Index.Searcher
this.command = command;
this.queryContext = new QueryContext(command, executionQuotaMs);
this.queryController = new QueryController(cfs, command,
filterOperation, queryContext, tableQueryMetrics);
- this.keyFactory = new PrimaryKey.Factory(cfs.metadata().comparator);
+ this.keyFactory = new PrimaryKey.Factory(cfs.getPartitioner(),
cfs.metadata().comparator);
}
@Override
@@ -138,8 +138,8 @@ public class StorageAttachedIndexSearcher implements
Index.Searcher
this.queryContext = queryContext;
this.keyFactory = keyFactory;
- this.firstPrimaryKey =
keyFactory.createTokenOnly(queryController.mergeRange().left.getToken());
- this.lastPrimaryKey =
keyFactory.createTokenOnly(queryController.mergeRange().right.getToken());
+ this.firstPrimaryKey =
keyFactory.create(queryController.mergeRange().left.getToken());
+ this.lastPrimaryKey =
keyFactory.create(queryController.mergeRange().right.getToken());
}
@Override
@@ -298,7 +298,7 @@ public class StorageAttachedIndexSearcher implements
Index.Searcher
*/
private void skipTo(@Nonnull Token token)
{
- resultKeyIterator.skipTo(keyFactory.createTokenOnly(token));
+ resultKeyIterator.skipTo(keyFactory.create(token));
}
/**
@@ -380,6 +380,13 @@ public class StorageAttachedIndexSearcher implements
Index.Searcher
private static UnfilteredRowIterator
applyIndexFilter(UnfilteredRowIterator partition, FilterTree tree, QueryContext
queryContext)
{
Row staticRow = partition.staticRow();
+
+ // We want to short-circuit the filtering of the whole partition
if the static row
+ // satisfies the filter. If that is the case we just need to
return the whole partition.
+ queryContext.rowsFiltered++;
+ if (tree.isSatisfiedBy(partition.partitionKey(), staticRow,
staticRow))
+ return partition;
+
List<Unfiltered> clusters = new ArrayList<>();
while (partition.hasNext())
@@ -393,15 +400,6 @@ public class StorageAttachedIndexSearcher implements
Index.Searcher
}
}
- if (clusters.isEmpty())
- {
- queryContext.rowsFiltered++;
- if (tree.isSatisfiedBy(partition.partitionKey(), staticRow,
staticRow))
- {
- clusters.add(staticRow);
- }
- }
-
/*
* If {@code clusters} is empty, which means either all clustering
row and static row pairs failed,
* or static row and static row pair failed. In both cases,
we should not return any partition.
diff --git a/src/java/org/apache/cassandra/index/sai/utils/PrimaryKey.java
b/src/java/org/apache/cassandra/index/sai/utils/PrimaryKey.java
index fe541998e2..7918cbad0c 100644
--- a/src/java/org/apache/cassandra/index/sai/utils/PrimaryKey.java
+++ b/src/java/org/apache/cassandra/index/sai/utils/PrimaryKey.java
@@ -17,34 +17,58 @@
*/
package org.apache.cassandra.index.sai.utils;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
-import javax.annotation.Nullable;
-
+import org.apache.cassandra.db.BufferDecoratedKey;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import org.apache.cassandra.utils.bytecomparable.ByteSource;
+import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
/**
* Representation of the primary key for a row consisting of the {@link
DecoratedKey} and
* {@link Clustering} associated with a {@link
org.apache.cassandra.db.rows.Row}.
+ * The {@link Factory.TokenOnlyPrimaryKey} is used by the {@link
org.apache.cassandra.index.sai.plan.StorageAttachedIndexSearcher} to
+ * position the search within the query range.
*/
public interface PrimaryKey extends Comparable<PrimaryKey>, ByteComparable
{
+ /**
+ * See the javadoc for {@link #kind()} for how this enum is used.
+ */
+ enum Kind
+ {
+ TOKEN(false),
+ SKINNY(false),
+ WIDE(true),
+ STATIC(true);
+
+ public final boolean hasClustering;
+
+ Kind(boolean hasClustering)
+ {
+ this.hasClustering = hasClustering;
+ }
+ }
+
class Factory
{
+ private final IPartitioner partitioner;
private final ClusteringComparator clusteringComparator;
- public Factory(ClusteringComparator clusteringComparator)
+ public Factory(IPartitioner partitioner, ClusteringComparator
clusteringComparator)
{
+ this.partitioner = partitioner;
this.clusteringComparator = clusteringComparator;
}
@@ -54,18 +78,22 @@ public interface PrimaryKey extends Comparable<PrimaryKey>,
ByteComparable
* {@link Token} only primary keys are used for defining the partition
range
* of a query.
*/
- public PrimaryKey createTokenOnly(Token token)
+ public PrimaryKey create(Token token)
{
assert token != null : "Cannot create a primary key with a null
token";
return new TokenOnlyPrimaryKey(token);
}
- public PrimaryKey createPartitionKeyOnly(DecoratedKey partitionKey)
+ /**
+ * Create a {@link PrimaryKey} for tables without clustering columns
+ */
+ public PrimaryKey create(DecoratedKey partitionKey)
{
+ assert clusteringComparator.size() == 0 : "Cannot create a skinny
primary key for a table with clustering columns";
assert partitionKey != null : "Cannot create a primary key with a
null partition key";
- return new ImmutablePrimaryKey(partitionKey, null);
+ return new SkinnyPrimaryKey(partitionKey);
}
/**
@@ -74,66 +102,103 @@ public interface PrimaryKey extends
Comparable<PrimaryKey>, ByteComparable
*/
public PrimaryKey create(DecoratedKey partitionKey, Clustering<?>
clustering)
{
+ assert clusteringComparator.size() > 0 : "Cannot create a wide
primary key for a table without clustering columns";
assert partitionKey != null : "Cannot create a primary key with a
null partition key";
assert clustering != null : "Cannot create a primary key with a
null clustering";
- return new ImmutablePrimaryKey(partitionKey, clustering);
+ return clustering == Clustering.STATIC_CLUSTERING ? new
StaticPrimaryKey(partitionKey) : new WidePrimaryKey(partitionKey, clustering);
+ }
+
+ /**
+ * Create a {@link PrimaryKey} from a {@link ByteSource}. This should
only be used with {@link ByteSource} instances
+ * created by calls to {@link PrimaryKey#asComparableBytes(Version)}.
+ */
+ public PrimaryKey fromComparableBytes(ByteSource byteSource)
+ {
+ if (clusteringComparator.size() > 0)
+ {
+ ByteSource.Peekable peekable = ByteSource.peekable(byteSource);
+ DecoratedKey partitionKey =
partitionKeyFromComparableBytes(ByteSourceInverse.nextComponentSource(peekable));
+ Clustering<?> clustering =
clusteringFromByteComparable(ByteSourceInverse.nextComponentSource(peekable));
+ return create(partitionKey, clustering);
+ }
+ else
+ {
+ return create(partitionKeyFromComparableBytes(byteSource));
+ }
+ }
+
+ /**
+ * Create a {@link DecoratedKey} from a {@link ByteSource}. This is a
separate method because of its use by
+ * the {@link org.apache.cassandra.index.sai.disk.PrimaryKeyMap}
implementations to create partition keys.
+ */
+ public DecoratedKey partitionKeyFromComparableBytes(ByteSource
byteSource)
+ {
+ ByteBuffer decoratedKey =
ByteBuffer.wrap(ByteSourceInverse.getUnescapedBytes(ByteSource.peekable(byteSource)));
+ return new BufferDecoratedKey(partitioner.getToken(decoratedKey),
decoratedKey);
}
- public PrimaryKey createDeferred(Token token, Supplier<PrimaryKey>
primaryKeySupplier)
+ /**
+ * Create a {@link Clustering} from a {@link ByteSource}. This is a
separate method because of its use by
+ * the {@link
org.apache.cassandra.index.sai.disk.v1.WidePrimaryKeyMap} to create its
clustering keys.
+ */
+ public Clustering<?> clusteringFromByteComparable(ByteSource
byteSource)
{
- assert token != null : "Cannot create a deferred primary key with
a null token";
- assert primaryKeySupplier != null : "Cannot create a deferred
primary key with a null key supplier";
+ Clustering<?> clustering =
clusteringComparator.clusteringFromByteComparable(ByteBufferAccessor.instance,
v -> byteSource);
- return new MutablePrimaryKey(token, primaryKeySupplier);
+ // Clustering is null for static rows
+ return (clustering == null) ? Clustering.STATIC_CLUSTERING :
clustering;
}
- abstract class AbstractPrimaryKey implements PrimaryKey
+ class TokenOnlyPrimaryKey implements PrimaryKey
{
+ protected final Token token;
+
+ TokenOnlyPrimaryKey(Token token)
+ {
+ this.token = token;
+ }
+
@Override
- @SuppressWarnings("ConstantConditions")
- public ByteSource asComparableBytes(ByteComparable.Version version)
+ public Kind kind()
{
- ByteSource keyComparable =
ByteSource.of(partitionKey().getKey(), version);
- if (clusteringComparator.size() == 0)
- return keyComparable;
- // It is important that the
ClusteringComparator.asBytesComparable method is used
- // to maintain the correct clustering sort order
- ByteSource clusteringComparable = clustering() == null ||
- clustering().isEmpty() ? null
- :
clusteringComparator.asByteComparable(clustering())
-
.asComparableBytes(version);
- return ByteSource.withTerminator(version ==
ByteComparable.Version.LEGACY ? ByteSource.END_OF_STREAM
-
: ByteSource.TERMINATOR,
- keyComparable,
- clusteringComparable);
+ return Kind.TOKEN;
}
@Override
- @SuppressWarnings("ConstantConditions")
- public int compareTo(PrimaryKey o)
+ public Token token()
{
- int cmp = token().compareTo(o.token());
+ return token;
+ }
- // If the tokens don't match then we don't need to compare any
more of the key.
- // Otherwise, if either of the keys are token only we can only
compare tokens
- if ((cmp != 0) || isTokenOnly() || o.isTokenOnly())
- return cmp;
+ @Override
+ public DecoratedKey partitionKey()
+ {
+ throw new UnsupportedOperationException();
+ }
- // Next compare the partition keys. If they are not equal or
- // this is a single row partition key or there are no
- // clusterings then we can return the result of this without
- // needing to compare the clusterings
- cmp = partitionKey().compareTo(o.partitionKey());
- if (cmp != 0 || hasEmptyClustering() || o.hasEmptyClustering())
- return cmp;
- return clusteringComparator.compare(clustering(),
o.clustering());
+ @Override
+ public Clustering<?> clustering()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ByteSource asComparableBytes(Version version)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int compareTo(PrimaryKey o)
+ {
+ return token().compareTo(o.token());
}
@Override
public int hashCode()
{
- return Objects.hash(token(), partitionKey(), clustering(),
clusteringComparator);
+ return Objects.hash(token(), clusteringComparator);
}
@Override
@@ -145,165 +210,219 @@ public interface PrimaryKey extends
Comparable<PrimaryKey>, ByteComparable
}
@Override
- @SuppressWarnings("ConstantConditions")
public String toString()
{
- return isTokenOnly() ? String.format("PrimaryKey: { token: %s
}", token())
- : String.format("PrimaryKey: { token: %s,
partition: %s, clustering: %s:%s } ",
- token(),
- partitionKey(),
- clustering() == null ?
null : clustering().kind(),
- clustering() == null ?
null : Arrays.stream(clustering().getBufferArray())
-
.map(ByteBufferUtil::bytesToHex)
-
.collect(Collectors.joining(", ")));
+ return String.format("PrimaryKey: { token: %s }", token());
}
}
- class TokenOnlyPrimaryKey extends AbstractPrimaryKey
+ class SkinnyPrimaryKey extends TokenOnlyPrimaryKey
{
- private final Token token;
+ protected final DecoratedKey partitionKey;
- TokenOnlyPrimaryKey(Token token)
+ SkinnyPrimaryKey(DecoratedKey partitionKey)
{
- this.token = token;
+ super(partitionKey.getToken());
+ this.partitionKey = partitionKey;
}
@Override
- public boolean isTokenOnly()
+ public Kind kind()
{
- return true;
+ return Kind.SKINNY;
}
@Override
- public Token token()
+ public DecoratedKey partitionKey()
{
- return token;
+ return partitionKey;
}
@Override
- public DecoratedKey partitionKey()
+ public ByteSource asComparableBytes(Version version)
{
- throw new UnsupportedOperationException();
+ return ByteSource.of(partitionKey().getKey(), version);
}
@Override
- public Clustering<?> clustering()
+ public int compareTo(PrimaryKey o)
{
- throw new UnsupportedOperationException();
+ int cmp = super.compareTo(o);
+
+ // If the tokens don't match then we don't need to compare any
more of the key.
+ // Otherwise, if the other key is token only we can only
compare tokens
+ // This is used by the ResultRetriever to skip to the current
key range start position
+ // during result retrieval.
+ if ((cmp != 0) || o.kind() == Kind.TOKEN)
+ return cmp;
+
+ // Finally compare the partition keys
+ return partitionKey().compareTo(o.partitionKey());
}
@Override
- public ByteSource asComparableBytes(Version version)
+ public int hashCode()
{
- throw new UnsupportedOperationException();
+ return Objects.hash(token(), partitionKey(), Clustering.EMPTY,
clusteringComparator);
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("PrimaryKey: { token: %s, partition: %s
}", token(), partitionKey());
}
}
- class ImmutablePrimaryKey extends AbstractPrimaryKey
+ class StaticPrimaryKey extends SkinnyPrimaryKey
{
- private final Token token;
- private final DecoratedKey partitionKey;
- private final Clustering<?> clustering;
+ StaticPrimaryKey(DecoratedKey partitionKey)
+ {
+ super(partitionKey);
+ }
- ImmutablePrimaryKey(DecoratedKey partitionKey, Clustering<?>
clustering)
+ @Override
+ public Kind kind()
{
- this.token = partitionKey.getToken();
- this.partitionKey = partitionKey;
- this.clustering = clustering;
+ return Kind.STATIC;
}
@Override
- public Token token()
+ public Clustering<?> clustering()
{
- return token;
+ return Clustering.STATIC_CLUSTERING;
}
@Override
- public DecoratedKey partitionKey()
+ public ByteSource asComparableBytes(ByteComparable.Version version)
{
- return partitionKey;
+ ByteSource keyComparable =
ByteSource.of(partitionKey().getKey(), version);
+ // Static clustering cannot be serialized or made to a byte
comparable, so we use null as the component.
+ return ByteSource.withTerminator(version ==
ByteComparable.Version.LEGACY ? ByteSource.END_OF_STREAM
+
: ByteSource.TERMINATOR,
+ keyComparable,
+ null);
}
@Override
- public Clustering<?> clustering()
+ public int compareTo(PrimaryKey o)
{
- return clustering;
+ int cmp = super.compareTo(o);
+ if (cmp != 0 || o.kind() == Kind.TOKEN || o.kind() ==
Kind.SKINNY)
+ return cmp;
+ // The static clustering comes first in the sort order, so if
the other key has static clustering we
+ // are equal; otherwise we are less than the other
+ return o.kind() == Kind.STATIC ? 0 : -1;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(token(), partitionKey(),
Clustering.STATIC_CLUSTERING, clusteringComparator);
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("PrimaryKey: { token: %s, partition: %s,
clustering: STATIC } ", token(), partitionKey());
}
}
- class MutablePrimaryKey extends AbstractPrimaryKey
+ class WidePrimaryKey extends SkinnyPrimaryKey
{
- private final Token token;
- private final Supplier<PrimaryKey> primaryKeySupplier;
+ private final Clustering<?> clustering;
- private boolean notLoaded = true;
- private DecoratedKey partitionKey;
- private Clustering<?> clustering;
+ WidePrimaryKey(DecoratedKey partitionKey, Clustering<?> clustering)
+ {
+ super(partitionKey);
+ this.clustering = clustering;
+ }
- MutablePrimaryKey(Token token, Supplier<PrimaryKey>
primaryKeySupplier)
+ @Override
+ public Kind kind()
{
- this.token = token;
- this.primaryKeySupplier = primaryKeySupplier;
+ return Kind.WIDE;
}
@Override
- public Token token()
+ public Clustering<?> clustering()
{
- return token;
+ return clustering;
}
@Override
- public DecoratedKey partitionKey()
+ public ByteSource asComparableBytes(ByteComparable.Version version)
{
- loadDeferred();
- return partitionKey;
+ ByteSource keyComparable =
ByteSource.of(partitionKey().getKey(), version);
+ // It is important that the
ClusteringComparator.asBytesComparable method is used
+ // to maintain the correct clustering sort order.
+ ByteSource clusteringComparable =
clusteringComparator.asByteComparable(clustering()).asComparableBytes(version);
+ return ByteSource.withTerminator(version ==
ByteComparable.Version.LEGACY ? ByteSource.END_OF_STREAM
+
: ByteSource.TERMINATOR,
+ keyComparable,
+ clusteringComparable);
}
@Override
- public Clustering<?> clustering()
+ public int compareTo(PrimaryKey o)
{
- loadDeferred();
- return clustering;
+ int cmp = super.compareTo(o);
+ if (cmp != 0 || o.kind() == Kind.TOKEN || o.kind() ==
Kind.SKINNY)
+ return cmp;
+ // At this point we will be greater than other if it is static
+ if (o.kind() == Kind.STATIC)
+ return 1;
+ return clusteringComparator.compare(clustering(),
o.clustering());
}
- private void loadDeferred()
+ @Override
+ public int hashCode()
{
- if (notLoaded)
- {
- PrimaryKey deferredPrimaryKey = primaryKeySupplier.get();
- this.partitionKey = deferredPrimaryKey.partitionKey();
- this.clustering = deferredPrimaryKey.clustering();
- notLoaded = false;
- }
+ return Objects.hash(token(), partitionKey(), clustering(),
clusteringComparator);
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("PrimaryKey: { token: %s, partition: %s,
clustering: %s:%s } ",
+ token(),
+ partitionKey(),
+ clustering().kind(),
+
Arrays.stream(clustering().getBufferArray())
+ .map(ByteBufferUtil::bytesToHex)
+ .collect(Collectors.joining(", ")));
}
}
}
- default boolean isTokenOnly()
- {
- return false;
- }
+ /**
+ * Returns the {@link Kind} of the {@link PrimaryKey}. The {@link Kind} is
used locally in the {@link #compareTo(Object)}
+ * methods to determine how far the comparison needs to go between keys.
+ * <p>
+ * The {@link Kind} values have a categorization of {@code hasClustering}.
This indicates whether the key belongs to
+ * a table with clustering columns or not.
+ */
+ Kind kind();
+ /**
+ * Returns the {@link Token} component of the {@link PrimaryKey}
+ */
Token token();
- @Nullable
+ /**
+ * Returns the {@link DecoratedKey} representing the partition key of the
{@link PrimaryKey}.
+ * <p>
+ * Note: This cannot be null but some {@link PrimaryKey} implementations
can throw {@link UnsupportedOperationException}
+ * if they do not support partition keys.
+ */
DecoratedKey partitionKey();
- @Nullable
- Clustering<?> clustering();
-
/**
- * Return whether the primary key has an empty clustering or not.
- * By default, the clustering is empty if the internal clustering
- * is null or is empty.
- *
- * @return {@code true} if the clustering is empty, otherwise {@code false}
+ * Returns the {@link Clustering} representing the clustering component of
the {@link PrimaryKey}.
+ * <p>
+ * Note: This cannot be null but some {@link PrimaryKey} implementations
can throw {@link UnsupportedOperationException}
+ * if they do not support clustering columns.
*/
- @SuppressWarnings("ConstantConditions")
- default boolean hasEmptyClustering()
- {
- return clustering() == null || clustering().isEmpty();
- }
+ Clustering<?> clustering();
/**
* Returns the {@link PrimaryKey} as a {@link ByteSource} byte comparable
representation.
@@ -314,6 +433,7 @@ public interface PrimaryKey extends Comparable<PrimaryKey>,
ByteComparable
*
* @param version the {@link ByteComparable.Version} to use for the
implementation
* @return the {@code ByteSource} byte comparable.
+ * @throws UnsupportedOperationException for {@link PrimaryKey}
implementations that are not byte-comparable
*/
ByteSource asComparableBytes(ByteComparable.Version version);
}
diff --git
a/test/microbench/org/apache/cassandra/test/microbench/sai/KeyLookupBench.java
b/test/microbench/org/apache/cassandra/test/microbench/sai/KeyLookupBench.java
index 54f909efad..65bded3a45 100644
---
a/test/microbench/org/apache/cassandra/test/microbench/sai/KeyLookupBench.java
+++
b/test/microbench/org/apache/cassandra/test/microbench/sai/KeyLookupBench.java
@@ -117,13 +117,13 @@ public class KeyLookupBench
metadata.name,
Util.newUUIDGen().get());
- indexDescriptor = IndexDescriptor.create(descriptor,
metadata.comparator);
+ indexDescriptor = IndexDescriptor.create(descriptor,
metadata.partitioner, metadata.comparator);
CassandraRelevantProperties.SAI_SORTED_TERMS_PARTITION_BLOCK_SHIFT.setInt(partitionBlockShift);
CassandraRelevantProperties.SAI_SORTED_TERMS_CLUSTERING_BLOCK_SHIFT.setInt(clusteringBlockShift);
SSTableComponentsWriter writer = new
SSTableComponentsWriter(indexDescriptor);
- PrimaryKey.Factory factory = new
PrimaryKey.Factory(metadata.comparator);
+ PrimaryKey.Factory factory = new
PrimaryKey.Factory(metadata.partitioner, metadata.comparator);
PrimaryKey[] primaryKeys = new PrimaryKey[rows];
int partition = 0;
diff --git a/test/unit/org/apache/cassandra/index/sai/SAITester.java
b/test/unit/org/apache/cassandra/index/sai/SAITester.java
index c1080fb528..e043d6602f 100644
--- a/test/unit/org/apache/cassandra/index/sai/SAITester.java
+++ b/test/unit/org/apache/cassandra/index/sai/SAITester.java
@@ -71,6 +71,7 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.sai.disk.SSTableIndex;
import org.apache.cassandra.index.sai.disk.format.IndexComponent;
@@ -141,7 +142,7 @@ public abstract class SAITester extends CQLTester
public static final ClusteringComparator EMPTY_COMPARATOR = new
ClusteringComparator();
- public static final PrimaryKey.Factory TEST_FACTORY = new
PrimaryKey.Factory(EMPTY_COMPARATOR);
+ public static final PrimaryKey.Factory TEST_FACTORY = new
PrimaryKey.Factory(Murmur3Partitioner.instance, EMPTY_COMPARATOR);
@BeforeClass
public static void setUpClass()
@@ -254,6 +255,7 @@ public abstract class SAITester extends CQLTester
return new IndexContext("test_ks",
"test_cf",
UTF8Type.instance,
+ Murmur3Partitioner.instance,
new ClusteringComparator(),
ColumnMetadata.regularColumn("sai",
"internal", name, validator),
IndexTarget.Type.SIMPLE,
diff --git
a/test/unit/org/apache/cassandra/index/sai/disk/format/IndexDescriptorTest.java
b/test/unit/org/apache/cassandra/index/sai/disk/format/IndexDescriptorTest.java
index b6546c9551..264e217e70 100644
---
a/test/unit/org/apache/cassandra/index/sai/disk/format/IndexDescriptorTest.java
+++
b/test/unit/org/apache/cassandra/index/sai/disk/format/IndexDescriptorTest.java
@@ -31,6 +31,7 @@ import org.junit.rules.TemporaryFolder;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.SAITester;
import org.apache.cassandra.io.sstable.Descriptor;
@@ -68,7 +69,7 @@ public class IndexDescriptorTest
{
createFileOnDisk("-SAI+aa+GroupComplete.db");
- IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor,
SAITester.EMPTY_COMPARATOR);
+ IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor,
Murmur3Partitioner.instance, SAITester.EMPTY_COMPARATOR);
assertEquals(Version.AA, indexDescriptor.version);
assertTrue(indexDescriptor.hasComponent(IndexComponent.GROUP_COMPLETION_MARKER));
@@ -79,7 +80,7 @@ public class IndexDescriptorTest
{
createFileOnDisk("-SAI+aa+test_index+ColumnComplete.db");
- IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor,
SAITester.EMPTY_COMPARATOR);
+ IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor,
Murmur3Partitioner.instance, SAITester.EMPTY_COMPARATOR);
IndexContext indexContext = SAITester.createIndexContext("test_index",
UTF8Type.instance);
assertEquals(Version.AA, indexDescriptor.version);
diff --git
a/test/unit/org/apache/cassandra/index/sai/disk/v1/InvertedIndexSearcherTest.java
b/test/unit/org/apache/cassandra/index/sai/disk/v1/InvertedIndexSearcherTest.java
index 485d2b095a..dd3547ed79 100644
---
a/test/unit/org/apache/cassandra/index/sai/disk/v1/InvertedIndexSearcherTest.java
+++
b/test/unit/org/apache/cassandra/index/sai/disk/v1/InvertedIndexSearcherTest.java
@@ -62,12 +62,12 @@ public class InvertedIndexSearcherTest extends
SAIRandomizedTester
{
public static final PrimaryKeyMap TEST_PRIMARY_KEY_MAP = new
PrimaryKeyMap()
{
- private final PrimaryKey.Factory primaryKeyFactory = new
PrimaryKey.Factory(new ClusteringComparator());
+ private final PrimaryKey.Factory primaryKeyFactory = new
PrimaryKey.Factory(Murmur3Partitioner.instance, new ClusteringComparator());
@Override
public PrimaryKey primaryKeyFromRowId(long sstableRowId)
{
- return primaryKeyFactory.createTokenOnly(new
Murmur3Partitioner.LongToken(sstableRowId));
+ return primaryKeyFactory.create(new
Murmur3Partitioner.LongToken(sstableRowId));
}
@Override
@@ -127,7 +127,7 @@ public class InvertedIndexSearcherTest extends
SAIRandomizedTester
final int idxToSkip = numPostings - 7;
// tokens are equal to their corresponding row IDs
final long tokenToSkip =
termsEnum.get(t).right.get(idxToSkip);
- results.skipTo(SAITester.TEST_FACTORY.createTokenOnly(new
Murmur3Partitioner.LongToken(tokenToSkip)));
+ results.skipTo(SAITester.TEST_FACTORY.create(new
Murmur3Partitioner.LongToken(tokenToSkip)));
for (int p = idxToSkip; p < numPostings; ++p)
{
@@ -189,8 +189,8 @@ public class InvertedIndexSearcherTest extends
SAIRandomizedTester
size,
0,
Long.MAX_VALUE,
-
SAITester.TEST_FACTORY.createTokenOnly(DatabaseDescriptor.getPartitioner().getMinimumToken()),
-
SAITester.TEST_FACTORY.createTokenOnly(DatabaseDescriptor.getPartitioner().getMaximumToken()),
+
SAITester.TEST_FACTORY.create(DatabaseDescriptor.getPartitioner().getMinimumToken()),
+
SAITester.TEST_FACTORY.create(DatabaseDescriptor.getPartitioner().getMaximumToken()),
wrap(termsEnum.get(0).left),
wrap(termsEnum.get(terms - 1).left),
indexMetas);
diff --git
a/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java
b/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java
index 96d746df29..cbd74d8bef 100644
--- a/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.rows.BTreeRow;
import org.apache.cassandra.db.rows.BufferCell;
import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.SAITester;
import org.apache.cassandra.index.sai.disk.format.IndexComponent;
@@ -54,6 +55,7 @@ import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import static org.apache.cassandra.Util.dk;
@@ -75,6 +77,8 @@ public class SegmentFlushTest
public static void init()
{
DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+
StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance);
}
@After
@@ -106,6 +110,7 @@ public class SegmentFlushTest
{
Path tmpDir = Files.createTempDirectory("SegmentFlushTest");
IndexDescriptor indexDescriptor = IndexDescriptor.create(new
Descriptor(new File(tmpDir.toFile()), "ks", "cf", new
SequenceBasedSSTableId(1)),
+
Murmur3Partitioner.instance,
SAITester.EMPTY_COMPARATOR);
ColumnMetadata column = ColumnMetadata.regularColumn("sai",
"internal", "column", UTF8Type.instance);
@@ -114,6 +119,7 @@ public class SegmentFlushTest
IndexContext indexContext = new IndexContext("ks",
"cf",
UTF8Type.instance,
+
Murmur3Partitioner.instance,
new
ClusteringComparator(),
column,
IndexTarget.Type.SIMPLE,
@@ -127,13 +133,13 @@ public class SegmentFlushTest
DecoratedKey key1 = keys.get(0);
ByteBuffer term1 = UTF8Type.instance.decompose("a");
Row row1 = createRow(column, term1);
- writer.addRow(SAITester.TEST_FACTORY.create(key1, Clustering.EMPTY),
row1, sstableRowId1);
+ writer.addRow(SAITester.TEST_FACTORY.create(key1), row1,
sstableRowId1);
// expect a flush if exceed max rowId per segment
DecoratedKey key2 = keys.get(1);
ByteBuffer term2 = UTF8Type.instance.decompose("b");
Row row2 = createRow(column, term2);
- writer.addRow(SAITester.TEST_FACTORY.create(key2, Clustering.EMPTY),
row2, sstableRowId2);
+ writer.addRow(SAITester.TEST_FACTORY.create(key2), row2,
sstableRowId2);
writer.complete(Stopwatch.createStarted());
@@ -147,8 +153,8 @@ public class SegmentFlushTest
segmentRowIdOffset = sstableRowId1;
posting1 = 0;
posting2 = segments == 1 ? (int) (sstableRowId2 - segmentRowIdOffset)
: 0;
- minKey = SAITester.TEST_FACTORY.createTokenOnly(key1.getToken());
- maxKey = segments == 1 ?
SAITester.TEST_FACTORY.createTokenOnly(key2.getToken()) : minKey;
+ minKey = SAITester.TEST_FACTORY.create(key1.getToken());
+ maxKey = segments == 1 ?
SAITester.TEST_FACTORY.create(key2.getToken()) : minKey;
minTerm = term1;
maxTerm = segments == 1 ? term2 : term1;
numRows = segments == 1 ? 2 : 1;
@@ -160,7 +166,7 @@ public class SegmentFlushTest
segmentRowIdOffset = sstableRowId2;
posting1 = 0;
posting2 = 0;
- minKey = SAITester.TEST_FACTORY.createTokenOnly(key2.getToken());
+ minKey = SAITester.TEST_FACTORY.create(key2.getToken());
maxKey = minKey;
minTerm = term2;
maxTerm = term2;
diff --git
a/test/unit/org/apache/cassandra/index/sai/disk/v1/WideRowPrimaryKeyTest.java
b/test/unit/org/apache/cassandra/index/sai/disk/v1/WideRowPrimaryKeyTest.java
index 767ac6fa0f..9a6a4dcb15 100644
---
a/test/unit/org/apache/cassandra/index/sai/disk/v1/WideRowPrimaryKeyTest.java
+++
b/test/unit/org/apache/cassandra/index/sai/disk/v1/WideRowPrimaryKeyTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import org.junit.Test;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.index.sai.disk.PrimaryKeyMap;
import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
import org.apache.cassandra.index.sai.utils.AbstractPrimaryKeyTester;
@@ -42,7 +43,7 @@ public class WideRowPrimaryKeyTest extends
AbstractPrimaryKeyTester
SSTableComponentsWriter writer = new
SSTableComponentsWriter(indexDescriptor);
- PrimaryKey.Factory factory = new
PrimaryKey.Factory(compositePartitionMultipleClusteringAsc.comparator);
+ PrimaryKey.Factory factory = new
PrimaryKey.Factory(Murmur3Partitioner.instance,
compositePartitionMultipleClusteringAsc.comparator);
int rows = nextInt(1000, 10000);
PrimaryKey[] keys = new PrimaryKey[rows];
diff --git
a/test/unit/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeIndexBuilder.java
b/test/unit/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeIndexBuilder.java
index b69e5155c8..a62fe3991a 100644
---
a/test/unit/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeIndexBuilder.java
+++
b/test/unit/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeIndexBuilder.java
@@ -65,12 +65,12 @@ public class BlockBalancedTreeIndexBuilder
{
public static final PrimaryKeyMap TEST_PRIMARY_KEY_MAP = new
PrimaryKeyMap()
{
- private final PrimaryKey.Factory primaryKeyFactory = new
PrimaryKey.Factory(null);
+ private final PrimaryKey.Factory primaryKeyFactory = new
PrimaryKey.Factory(Murmur3Partitioner.instance, null);
@Override
public PrimaryKey primaryKeyFromRowId(long sstableRowId)
{
- return primaryKeyFactory.createTokenOnly(new
Murmur3Partitioner.LongToken(sstableRowId));
+ return primaryKeyFactory.create(new
Murmur3Partitioner.LongToken(sstableRowId));
}
@Override
@@ -122,8 +122,8 @@ public class BlockBalancedTreeIndexBuilder
minSegmentRowId,
maxSegmentRowId,
// min/max is unused for now
-
SAITester.TEST_FACTORY.createTokenOnly(Murmur3Partitioner.instance.decorateKey(UTF8Type.instance.fromString("a")).getToken()),
-
SAITester.TEST_FACTORY.createTokenOnly(Murmur3Partitioner.instance.decorateKey(UTF8Type.instance.fromString("b")).getToken()),
+
SAITester.TEST_FACTORY.create(Murmur3Partitioner.instance.decorateKey(UTF8Type.instance.fromString("a")).getToken()),
+
SAITester.TEST_FACTORY.create(Murmur3Partitioner.instance.decorateKey(UTF8Type.instance.fromString("b")).getToken()),
UTF8Type.instance.fromString("c"),
UTF8Type.instance.fromString("d"),
indexMetas);
diff --git
a/test/unit/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookupTest.java
b/test/unit/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookupTest.java
index bd522bde5b..92a31f87f7 100644
---
a/test/unit/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookupTest.java
+++
b/test/unit/org/apache/cassandra/index/sai/disk/v1/keystore/KeyLookupTest.java
@@ -29,7 +29,6 @@ import java.util.Map;
import org.junit.Before;
import org.junit.Test;
-import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UTF8Type;
@@ -75,7 +74,7 @@ public class KeyLookupTest extends SAIRandomizedTester
{
ByteBuffer buffer =
UTF8Type.instance.decompose(Integer.toString(x));
DecoratedKey partitionKey =
Murmur3Partitioner.instance.decorateKey(buffer);
- PrimaryKey primaryKey =
SAITester.TEST_FACTORY.create(partitionKey, Clustering.EMPTY);
+ PrimaryKey primaryKey =
SAITester.TEST_FACTORY.create(partitionKey);
primaryKeys.add(primaryKey);
}
@@ -316,35 +315,18 @@ public class KeyLookupTest extends SAIRandomizedTester
// iterate ascending
withKeyLookupCursor(cursor -> {
for (int x = 0; x < keys.size(); x++)
- {
- ByteComparable key = cursor.seekToPointId(x);
-
- byte[] bytes =
ByteSourceInverse.readBytes(key.asComparableBytes(ByteComparable.Version.OSS50));
-
- assertArrayEquals(keys.get(x), bytes);
- }
+ assertArrayEquals(keys.get(x),
ByteSourceInverse.readBytes(cursor.seekToPointId(x)));
});
// iterate ascending skipping blocks
withKeyLookupCursor(cursor -> {
for (int x = 0; x < keys.size(); x += 17)
- {
- ByteComparable key = cursor.seekToPointId(x);
-
- byte[] bytes =
ByteSourceInverse.readBytes(key.asComparableBytes(ByteComparable.Version.OSS50));
-
- assertArrayEquals(keys.get(x), bytes);
- }
+ assertArrayEquals(keys.get(x),
ByteSourceInverse.readBytes(cursor.seekToPointId(x)));
});
withKeyLookupCursor(cursor -> {
- ByteComparable key = cursor.seekToPointId(7);
- byte[] bytes =
ByteSourceInverse.readBytes(key.asComparableBytes(ByteComparable.Version.OSS50));
- assertArrayEquals(keys.get(7), bytes);
-
- key = cursor.seekToPointId(7);
- bytes =
ByteSourceInverse.readBytes(key.asComparableBytes(ByteComparable.Version.OSS50));
- assertArrayEquals(keys.get(7), bytes);
+ assertArrayEquals(keys.get(7),
ByteSourceInverse.readBytes(cursor.seekToPointId(7)));
+ assertArrayEquals(keys.get(7),
ByteSourceInverse.readBytes(cursor.seekToPointId(7)));
});
}
diff --git
a/test/unit/org/apache/cassandra/index/sai/iterators/KeyRangeConcatIteratorTest.java
b/test/unit/org/apache/cassandra/index/sai/iterators/KeyRangeConcatIteratorTest.java
index 6f1afe7eb9..821942588e 100644
---
a/test/unit/org/apache/cassandra/index/sai/iterators/KeyRangeConcatIteratorTest.java
+++
b/test/unit/org/apache/cassandra/index/sai/iterators/KeyRangeConcatIteratorTest.java
@@ -35,7 +35,7 @@ import static org.junit.Assert.assertTrue;
public class KeyRangeConcatIteratorTest extends AbstractKeyRangeIteratorTester
{
- PrimaryKey.Factory primaryKeyFactory = new PrimaryKey.Factory(null);
+ PrimaryKey.Factory primaryKeyFactory = new
PrimaryKey.Factory(Murmur3Partitioner.instance, null);
@Test
public void testValidation()
{
@@ -427,7 +427,7 @@ public class KeyRangeConcatIteratorTest extends AbstractKeyRangeIteratorTester
private String createErrorMessage(int max, int min)
{
return String.format(KeyRangeConcatIterator.MUST_BE_SORTED_ERROR,
- primaryKeyFactory.createTokenOnly(new
Murmur3Partitioner.LongToken(max)),
- primaryKeyFactory.createTokenOnly(new
Murmur3Partitioner.LongToken(min)));
+ primaryKeyFactory.create(new
Murmur3Partitioner.LongToken(max)),
+ primaryKeyFactory.create(new
Murmur3Partitioner.LongToken(min)));
}
}
diff --git a/test/unit/org/apache/cassandra/index/sai/iterators/LongIterator.java b/test/unit/org/apache/cassandra/index/sai/iterators/LongIterator.java
index 9a271f79d1..5c4122f99d 100644
--- a/test/unit/org/apache/cassandra/index/sai/iterators/LongIterator.java
+++ b/test/unit/org/apache/cassandra/index/sai/iterators/LongIterator.java
@@ -92,7 +92,7 @@ public class LongIterator extends KeyRangeIterator
public static PrimaryKey fromToken(long token)
{
- return SAITester.TEST_FACTORY.createTokenOnly(new
Murmur3Partitioner.LongToken(token));
+ return SAITester.TEST_FACTORY.create(new
Murmur3Partitioner.LongToken(token));
}
diff --git a/test/unit/org/apache/cassandra/index/sai/memory/AbstractInMemoryKeyRangeIteratorTester.java b/test/unit/org/apache/cassandra/index/sai/memory/AbstractInMemoryKeyRangeIteratorTester.java
index cbf5d05675..3c21941a2e 100644
--- a/test/unit/org/apache/cassandra/index/sai/memory/AbstractInMemoryKeyRangeIteratorTester.java
+++ b/test/unit/org/apache/cassandra/index/sai/memory/AbstractInMemoryKeyRangeIteratorTester.java
@@ -35,7 +35,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester
@Before
public void setup()
{
- primaryKeyFactory = new PrimaryKey.Factory(SAITester.EMPTY_COMPARATOR);
+ primaryKeyFactory = new
PrimaryKey.Factory(Murmur3Partitioner.instance, SAITester.EMPTY_COMPARATOR);
}
@Test
@@ -99,7 +99,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester
{
KeyRangeIterator iterator = makeIterator(1, 3, 1, 2, 3);
- iterator.skipTo(primaryKeyFactory.createTokenOnly(new
Murmur3Partitioner.LongToken(0)));
+ iterator.skipTo(primaryKeyFactory.create(new
Murmur3Partitioner.LongToken(0)));
assertIterator(iterator, 1, 2, 3);
}
@@ -109,7 +109,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester
{
KeyRangeIterator iterator = makeIterator(1, 3, 1, 2, 3);
- iterator.skipTo(primaryKeyFactory.createTokenOnly(new
Murmur3Partitioner.LongToken(1)));
+ iterator.skipTo(primaryKeyFactory.create(new
Murmur3Partitioner.LongToken(1)));
assertIterator(iterator, 1, 2, 3);
}
@@ -119,7 +119,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester
{
KeyRangeIterator iterator = makeIterator(1, 3, 1, 2, 3);
- iterator.skipTo(primaryKeyFactory.createTokenOnly(new
Murmur3Partitioner.LongToken(2)));
+ iterator.skipTo(primaryKeyFactory.create(new
Murmur3Partitioner.LongToken(2)));
assertIterator(iterator, 2, 3);
}
@@ -129,7 +129,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester
{
KeyRangeIterator iterator = makeIterator(1, 3, 1, 2, 3);
- iterator.skipTo(primaryKeyFactory.createTokenOnly(new
Murmur3Partitioner.LongToken(3)));
+ iterator.skipTo(primaryKeyFactory.create(new
Murmur3Partitioner.LongToken(3)));
assertIterator(iterator, 3);
}
@@ -139,7 +139,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester
{
KeyRangeIterator iterator = makeIterator(1, 3, 1, 2, 3);
- iterator.skipTo(primaryKeyFactory.createTokenOnly(new
Murmur3Partitioner.LongToken(4)));
+ iterator.skipTo(primaryKeyFactory.create(new
Murmur3Partitioner.LongToken(4)));
assertIterator(iterator);
}
@@ -149,7 +149,7 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester
{
KeyRangeIterator iterator = makeIterator(1, 3, 1, 1, 2, 2, 3, 3);
- iterator.skipTo(primaryKeyFactory.createTokenOnly(new
Murmur3Partitioner.LongToken(2)));
+ iterator.skipTo(primaryKeyFactory.create(new
Murmur3Partitioner.LongToken(2)));
assertIterator(iterator, 2, 3);
}
@@ -168,6 +168,6 @@ public abstract class AbstractInMemoryKeyRangeIteratorTester
protected PrimaryKey keyForToken(long token)
{
- return primaryKeyFactory.createTokenOnly(new
Murmur3Partitioner.LongToken(token));
+ return primaryKeyFactory.create(new
Murmur3Partitioner.LongToken(token));
}
}
diff --git a/test/unit/org/apache/cassandra/index/sai/memory/PriorityInMemoryKeyRangeIteratorTest.java b/test/unit/org/apache/cassandra/index/sai/memory/PriorityInMemoryKeyRangeIteratorTest.java
index 707db7f1e9..5864c79b55 100644
--- a/test/unit/org/apache/cassandra/index/sai/memory/PriorityInMemoryKeyRangeIteratorTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/memory/PriorityInMemoryKeyRangeIteratorTest.java
@@ -33,8 +33,8 @@ public class PriorityInMemoryKeyRangeIteratorTest extends AbstractInMemoryKeyRan
Arrays.stream(tokens).forEach(t -> queue.add(keyForToken(t)));
- return new
InMemoryKeyRangeIterator(primaryKeyFactory.createTokenOnly(new
Murmur3Partitioner.LongToken(minimumTokenValue)),
-
primaryKeyFactory.createTokenOnly(new
Murmur3Partitioner.LongToken(maximumTokenValue)),
+ return new InMemoryKeyRangeIterator(primaryKeyFactory.create(new
Murmur3Partitioner.LongToken(minimumTokenValue)),
+ primaryKeyFactory.create(new
Murmur3Partitioner.LongToken(maximumTokenValue)),
queue);
}
}
diff --git a/test/unit/org/apache/cassandra/index/sai/memory/TrieMemoryIndexTest.java b/test/unit/org/apache/cassandra/index/sai/memory/TrieMemoryIndexTest.java
index 9cf8ff5318..3277d79dc1 100644
--- a/test/unit/org/apache/cassandra/index/sai/memory/TrieMemoryIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/memory/TrieMemoryIndexTest.java
@@ -245,6 +245,7 @@ public class TrieMemoryIndexTest extends SAIRandomizedTester
indexContext = new IndexContext(table.keyspace,
table.name,
table.partitionKeyType,
+ table.partitioner,
table.comparator,
target.left,
target.right,
diff --git a/test/unit/org/apache/cassandra/index/sai/utils/AbstractPrimaryKeyTester.java b/test/unit/org/apache/cassandra/index/sai/utils/AbstractPrimaryKeyTester.java
index a67cf97d9a..be887bc64c 100644
--- a/test/unit/org/apache/cassandra/index/sai/utils/AbstractPrimaryKeyTester.java
+++ b/test/unit/org/apache/cassandra/index/sai/utils/AbstractPrimaryKeyTester.java
@@ -28,7 +28,6 @@ import org.apache.cassandra.db.marshal.ReversedType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
@@ -53,6 +52,13 @@ public class AbstractPrimaryKeyTester extends SAIRandomizedTester
.addClusteringColumn("ck1", UTF8Type.instance)
.build();
+ protected static final TableMetadata
simplePartitionStaticAndSingleClusteringAsc = TableMetadata.builder("test",
"test")
+
.partitioner(Murmur3Partitioner.instance)
+
.addPartitionKeyColumn("pk1", Int32Type.instance)
+
.addStaticColumn("sk1", Int32Type.instance)
+
.addClusteringColumn("ck1", UTF8Type.instance)
+
.build();
+
protected static final TableMetadata simplePartitionMultipleClusteringAsc
= TableMetadata.builder("test", "test")
.partitioner(Murmur3Partitioner.instance)
.addPartitionKeyColumn("pk1", Int32Type.instance)
@@ -118,13 +124,6 @@ public class AbstractPrimaryKeyTester extends SAIRandomizedTester
.addClusteringColumn("ck2",
ReversedType.getInstance(UTF8Type.instance))
.build();
- protected void assertByteComparison(PrimaryKey a, PrimaryKey b, int
expected)
- {
- assertEquals(expected, ByteComparable.compare(a::asComparableBytes,
- b::asComparableBytes,
-
ByteComparable.Version.OSS50));
- }
-
protected void assertCompareToAndEquals(PrimaryKey a, PrimaryKey b, int
expected)
{
if (expected > 0)
diff --git a/test/unit/org/apache/cassandra/index/sai/utils/IndexInputLeakDetector.java b/test/unit/org/apache/cassandra/index/sai/utils/IndexInputLeakDetector.java
index 00518bb972..35e556f763 100644
--- a/test/unit/org/apache/cassandra/index/sai/utils/IndexInputLeakDetector.java
+++ b/test/unit/org/apache/cassandra/index/sai/utils/IndexInputLeakDetector.java
@@ -41,7 +41,7 @@ public class IndexInputLeakDetector extends TestRuleAdapter
{
TrackingIndexFileUtils trackingIndexFileUtils = new
TrackingIndexFileUtils(sequentialWriterOption);
trackedIndexFileUtils.add(trackingIndexFileUtils);
- return IndexDescriptor.create(descriptor, tableMetadata.comparator);
+ return IndexDescriptor.create(descriptor, tableMetadata.partitioner,
tableMetadata.comparator);
}
@Override
diff --git a/test/unit/org/apache/cassandra/index/sai/utils/PrimaryKeyTest.java b/test/unit/org/apache/cassandra/index/sai/utils/PrimaryKeyTest.java
index 9b9f738a4e..c64372feab 100644
--- a/test/unit/org/apache/cassandra/index/sai/utils/PrimaryKeyTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/utils/PrimaryKeyTest.java
@@ -23,17 +23,18 @@ import java.util.Arrays;
import org.junit.Test;
import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.dht.Murmur3Partitioner;
public class PrimaryKeyTest extends AbstractPrimaryKeyTester
{
@Test
public void singlePartitionTest()
{
- PrimaryKey.Factory factory = new
PrimaryKey.Factory(simplePartition.comparator);
+ PrimaryKey.Factory factory = new
PrimaryKey.Factory(Murmur3Partitioner.instance, simplePartition.comparator);
int rows = nextInt(10, 100);
PrimaryKey[] keys = new PrimaryKey[rows];
for (int index = 0; index < rows; index++)
- keys[index] = factory.create(makeKey(simplePartition, index),
Clustering.EMPTY);
+ keys[index] = factory.create(makeKey(simplePartition, index));
Arrays.sort(keys);
@@ -43,11 +44,11 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
@Test
public void compositePartitionTest()
{
- PrimaryKey.Factory factory = new
PrimaryKey.Factory(compositePartition.comparator);
+ PrimaryKey.Factory factory = new
PrimaryKey.Factory(Murmur3Partitioner.instance, compositePartition.comparator);
int rows = nextInt(10, 100);
PrimaryKey[] keys = new PrimaryKey[rows];
for (int index = 0; index < rows; index++)
- keys[index] = factory.create(makeKey(compositePartition, index,
index + 1), Clustering.EMPTY);
+ keys[index] = factory.create(makeKey(compositePartition, index,
index + 1));
Arrays.sort(keys);
@@ -57,7 +58,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
@Test
public void simplePartitonSingleClusteringAscTest()
{
- PrimaryKey.Factory factory = new
PrimaryKey.Factory(simplePartitionSingleClusteringAsc.comparator);
+ PrimaryKey.Factory factory = new
PrimaryKey.Factory(Murmur3Partitioner.instance,
simplePartitionSingleClusteringAsc.comparator);
int rows = nextInt(10, 100);
PrimaryKey[] keys = new PrimaryKey[rows];
int partition = 0;
@@ -78,10 +79,40 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
compareToAndEqualsTests(factory, keys);
}
+ @Test
+ public void simplePartitonStaticAndSingleClusteringAscTest()
+ {
+ PrimaryKey.Factory factory = new
PrimaryKey.Factory(Murmur3Partitioner.instance,
simplePartitionStaticAndSingleClusteringAsc.comparator);
+ int rows = nextInt(10, 100);
+ PrimaryKey[] keys = new PrimaryKey[rows];
+ int partition = 0;
+ int clustering = 0;
+ for (int index = 0; index < rows; index++)
+ {
+ if (clustering == 0)
+ {
+ keys[index] =
factory.create(makeKey(simplePartitionSingleClusteringAsc, partition),
Clustering.STATIC_CLUSTERING);
+ clustering++;
+ }
+ else
+ keys[index] =
factory.create(makeKey(simplePartitionSingleClusteringAsc, partition),
+
makeClustering(simplePartitionSingleClusteringAsc,
Integer.toString(clustering++)));
+ if (clustering == 5)
+ {
+ clustering = 0;
+ partition++;
+ }
+ }
+
+ Arrays.sort(keys);
+
+ compareToAndEqualsTests(factory, keys);
+ }
+
@Test
public void simplePartitionMultipleClusteringAscTest()
{
- PrimaryKey.Factory factory = new
PrimaryKey.Factory(simplePartitionMultipleClusteringAsc.comparator);
+ PrimaryKey.Factory factory = new
PrimaryKey.Factory(Murmur3Partitioner.instance,
simplePartitionMultipleClusteringAsc.comparator);
int rows = nextInt(100, 1000);
PrimaryKey[] keys = new PrimaryKey[rows];
int partition = 0;
@@ -111,7 +142,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
@Test
public void simplePartitonSingleClusteringDescTest()
{
- PrimaryKey.Factory factory = new
PrimaryKey.Factory(simplePartitionSingleClusteringDesc.comparator);
+ PrimaryKey.Factory factory = new
PrimaryKey.Factory(Murmur3Partitioner.instance,
simplePartitionSingleClusteringDesc.comparator);
int rows = nextInt(10, 100);
PrimaryKey[] keys = new PrimaryKey[rows];
int partition = 0;
@@ -135,7 +166,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
@Test
public void simplePartitionMultipleClusteringDescTest()
{
- PrimaryKey.Factory factory = new
PrimaryKey.Factory(simplePartitionMultipleClusteringDesc.comparator);
+ PrimaryKey.Factory factory = new
PrimaryKey.Factory(Murmur3Partitioner.instance,
simplePartitionMultipleClusteringDesc.comparator);
int rows = nextInt(100, 1000);
PrimaryKey[] keys = new PrimaryKey[rows];
int partition = 0;
@@ -165,7 +196,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
@Test
public void compositePartitionSingleClusteringAscTest()
{
- PrimaryKey.Factory factory = new
PrimaryKey.Factory(compositePartitionSingleClusteringAsc.comparator);
+ PrimaryKey.Factory factory = new
PrimaryKey.Factory(Murmur3Partitioner.instance,
compositePartitionSingleClusteringAsc.comparator);
int rows = nextInt(10, 100);
PrimaryKey[] keys = new PrimaryKey[rows];
int partition = 0;
@@ -189,7 +220,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
@Test
public void compositePartitionMultipleClusteringAscTest()
{
- PrimaryKey.Factory factory = new
PrimaryKey.Factory(compositePartitionMultipleClusteringAsc.comparator);
+ PrimaryKey.Factory factory = new
PrimaryKey.Factory(Murmur3Partitioner.instance,
compositePartitionMultipleClusteringAsc.comparator);
int rows = nextInt(100, 1000);
PrimaryKey[] keys = new PrimaryKey[rows];
int partition = 0;
@@ -219,7 +250,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
@Test
public void compositePartitionSingleClusteringDescTest()
{
- PrimaryKey.Factory factory = new
PrimaryKey.Factory(compositePartitionSingleClusteringDesc.comparator);
+ PrimaryKey.Factory factory = new
PrimaryKey.Factory(Murmur3Partitioner.instance,
compositePartitionSingleClusteringDesc.comparator);
int rows = nextInt(10, 100);
PrimaryKey[] keys = new PrimaryKey[rows];
int partition = 0;
@@ -243,7 +274,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
@Test
public void compositePartitionMultipleClusteringDescTest()
{
- PrimaryKey.Factory factory = new
PrimaryKey.Factory(compositePartitionMultipleClusteringDesc.comparator);
+ PrimaryKey.Factory factory = new
PrimaryKey.Factory(Murmur3Partitioner.instance,
compositePartitionMultipleClusteringDesc.comparator);
int rows = nextInt(100, 1000);
PrimaryKey[] keys = new PrimaryKey[rows];
int partition = 0;
@@ -273,7 +304,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
@Test
public void simplePartitionMultipleClusteringMixedTest()
{
- PrimaryKey.Factory factory = new
PrimaryKey.Factory(simplePartitionMultipleClusteringMixed.comparator);
+ PrimaryKey.Factory factory = new
PrimaryKey.Factory(Murmur3Partitioner.instance,
simplePartitionMultipleClusteringMixed.comparator);
int rows = nextInt(100, 1000);
PrimaryKey[] keys = new PrimaryKey[rows];
int partition = 0;
@@ -303,7 +334,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
@Test
public void compositePartitionMultipleClusteringMixedTest()
{
- PrimaryKey.Factory factory = new
PrimaryKey.Factory(compositePartitionMultipleClusteringMixed.comparator);
+ PrimaryKey.Factory factory = new
PrimaryKey.Factory(Murmur3Partitioner.instance,
compositePartitionMultipleClusteringMixed.comparator);
int rows = nextInt(100, 1000);
PrimaryKey[] keys = new PrimaryKey[rows];
int partition = 0;
@@ -335,7 +366,7 @@ public class PrimaryKeyTest extends AbstractPrimaryKeyTester
for (int index = 0; index < keys.length - 1; index++)
{
PrimaryKey key = keys[index];
- PrimaryKey tokenOnlyKey = factory.createTokenOnly(key.token());
+ PrimaryKey tokenOnlyKey = factory.create(key.token());
assertCompareToAndEquals(tokenOnlyKey, key, 0);
assertCompareToAndEquals(key, key, 0);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org