Add row offset support to SASI Patch by Alex Petrov; reviewed by Pavel Yaskevich for CASSANDRA-11990
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7d857b46 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7d857b46 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7d857b46 Branch: refs/heads/trunk Commit: 7d857b46fb070548bf5e5f6ff81db588f08ec22a Parents: 3c95d47 Author: Alex Petrov <[email protected]> Authored: Sun Jun 26 18:21:08 2016 +0200 Committer: Pavel Yaskevich <[email protected]> Committed: Mon Sep 5 22:17:11 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnIndex.java | 6 +- .../apache/cassandra/index/sasi/KeyFetcher.java | 98 +++++++ .../apache/cassandra/index/sasi/SASIIndex.java | 11 +- .../cassandra/index/sasi/SASIIndexBuilder.java | 13 +- .../cassandra/index/sasi/SSTableIndex.java | 41 +-- .../cassandra/index/sasi/conf/ColumnIndex.java | 4 +- .../index/sasi/conf/view/RangeTermTree.java | 4 + .../sasi/disk/AbstractTokenTreeBuilder.java | 276 ++++++++++-------- .../cassandra/index/sasi/disk/Descriptor.java | 33 ++- .../sasi/disk/DynamicTokenTreeBuilder.java | 59 ++-- .../cassandra/index/sasi/disk/KeyOffsets.java | 115 ++++++++ .../cassandra/index/sasi/disk/OnDiskIndex.java | 12 +- .../index/sasi/disk/OnDiskIndexBuilder.java | 16 +- .../index/sasi/disk/PerSSTableIndexWriter.java | 13 +- .../cassandra/index/sasi/disk/RowKey.java | 108 +++++++ .../index/sasi/disk/StaticTokenTreeBuilder.java | 18 +- .../apache/cassandra/index/sasi/disk/Token.java | 10 +- .../cassandra/index/sasi/disk/TokenTree.java | 288 ++++++++++++------- .../index/sasi/disk/TokenTreeBuilder.java | 72 +++-- .../index/sasi/memory/IndexMemtable.java | 8 +- .../index/sasi/memory/KeyRangeIterator.java | 49 ++-- .../cassandra/index/sasi/memory/MemIndex.java | 4 +- .../index/sasi/memory/SkipListMemIndex.java | 12 +- .../index/sasi/memory/TrieMemIndex.java | 45 ++- .../index/sasi/plan/QueryController.java | 49 
++-- .../cassandra/index/sasi/plan/QueryPlan.java | 174 ++++++++--- .../io/sstable/format/SSTableFlushObserver.java | 5 + .../io/sstable/format/SSTableReader.java | 33 ++- .../io/sstable/format/big/BigTableWriter.java | 8 +- .../org/apache/cassandra/utils/obs/BitUtil.java | 2 +- test/data/legacy-sasi/on-disk-sa-int2.db | Bin 0 -> 12312 bytes .../cassandra/index/sasi/SASIIndexTest.java | 25 +- .../index/sasi/disk/KeyOffsetsTest.java | 48 ++++ .../index/sasi/disk/OnDiskIndexTest.java | 216 +++++++------- .../sasi/disk/PerSSTableIndexWriterTest.java | 112 ++++++-- .../index/sasi/disk/TokenTreeTest.java | 208 +++++++------- .../index/sasi/plan/OperationTest.java | 2 +- .../index/sasi/utils/KeyConverter.java | 69 +++++ .../index/sasi/utils/LongIterator.java | 8 +- .../sasi/utils/RangeUnionIteratorTest.java | 17 ++ 41 files changed, 1547 insertions(+), 745 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c466dfe..28c2d84 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Add row offset support to SASI (CASSANDRA-11990) * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567) * Add keep-alive to streaming (CASSANDRA-11841) * Tracing payload is passed through newSession(..) 
(CASSANDRA-11706) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/db/ColumnIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java index 1116cc2..1f39927 100644 --- a/src/java/org/apache/cassandra/db/ColumnIndex.java +++ b/src/java/org/apache/cassandra/db/ColumnIndex.java @@ -121,9 +121,10 @@ public class ColumnIndex { Row staticRow = iterator.staticRow(); + long startPosition = currentPosition(); UnfilteredSerializer.serializer.serializeStaticRow(staticRow, header, writer, version); if (!observers.isEmpty()) - observers.forEach((o) -> o.nextUnfilteredCluster(staticRow)); + observers.forEach((o) -> o.nextUnfilteredCluster(staticRow, startPosition)); } } @@ -232,6 +233,7 @@ public class ColumnIndex private void add(Unfiltered unfiltered) throws IOException { + final long origPos = writer.position(); long pos = currentPosition(); if (firstClustering == null) @@ -245,7 +247,7 @@ public class ColumnIndex // notify observers about each new row if (!observers.isEmpty()) - observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered)); + observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered, origPos)); lastClustering = unfiltered.clustering(); previousRowStart = pos; http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java b/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java new file mode 100644 index 0000000..80ee167 --- /dev/null +++ b/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.index.sasi; + +import java.io.IOException; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.index.sasi.disk.*; +import org.apache.cassandra.io.*; +import org.apache.cassandra.io.sstable.format.*; + + +public interface KeyFetcher +{ + public Clustering getClustering(long offset); + public DecoratedKey getPartitionKey(long offset); + + public RowKey getRowKey(long partitionOffset, long rowOffset); + + /** + * Fetches clustering and partition key from the sstable. + * + * Currently, clustering key is fetched from the data file of the sstable and partition key is + * read from the index file. Reading from index file helps us to warm up key cache in this case. 
+ */ + public static class SSTableKeyFetcher implements KeyFetcher + { + private final SSTableReader sstable; + + public SSTableKeyFetcher(SSTableReader reader) + { + sstable = reader; + } + + @Override + public Clustering getClustering(long offset) + { + try + { + return sstable.clusteringAt(offset); + } + catch (IOException e) + { + throw new FSReadError(new IOException("Failed to read clustering from " + sstable.descriptor, e), sstable.getFilename()); + } + } + + @Override + public DecoratedKey getPartitionKey(long offset) + { + try + { + return sstable.keyAt(offset); + } + catch (IOException e) + { + throw new FSReadError(new IOException("Failed to read key from " + sstable.descriptor, e), sstable.getFilename()); + } + } + + @Override + public RowKey getRowKey(long partitionOffset, long rowOffset) + { + if (rowOffset == KeyOffsets.NO_OFFSET) + return new RowKey(getPartitionKey(partitionOffset), null, sstable.metadata.comparator); + else + return new RowKey(getPartitionKey(partitionOffset), getClustering(rowOffset), sstable.metadata.comparator); + } + + public int hashCode() + { + return sstable.descriptor.hashCode(); + } + + public boolean equals(Object other) + { + return other instanceof SSTableKeyFetcher + && sstable.descriptor.equals(((SSTableKeyFetcher) other).sstable.descriptor); + } + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/SASIIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java index 4375964..65953a9 100644 --- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java @@ -46,6 +46,7 @@ import org.apache.cassandra.index.sasi.conf.ColumnIndex; import org.apache.cassandra.index.sasi.conf.IndexMode; import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder.Mode; 
import org.apache.cassandra.index.sasi.disk.PerSSTableIndexWriter; +import org.apache.cassandra.index.sasi.disk.RowKey; import org.apache.cassandra.index.sasi.plan.QueryPlan; import org.apache.cassandra.index.transactions.IndexTransaction; import org.apache.cassandra.io.sstable.Descriptor; @@ -182,6 +183,14 @@ public class SASIIndex implements Index, INotificationConsumer return getTruncateTask(FBUtilities.timestampMicros()); } + public Callable<?> getTruncateTask(Collection<SSTableReader> sstablesToRebuild) + { + return () -> { + index.dropData(sstablesToRebuild); + return null; + }; + } + public Callable<?> getTruncateTask(long truncatedAt) { return () -> { @@ -252,7 +261,7 @@ public class SASIIndex implements Index, INotificationConsumer public void insertRow(Row row) { if (isNewData()) - adjustMemtableSize(index.index(key, row), opGroup); + adjustMemtableSize(index.index(new RowKey(key, row.clustering(), baseCfs.getComparator()), row), opGroup); } public void updateRow(Row oldRow, Row newRow) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java index d50875a..d6706ea 100644 --- a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java @@ -94,16 +94,23 @@ class SASIIndexBuilder extends SecondaryIndexBuilder { RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ); dataFile.seek(indexEntry.position); - ByteBufferUtil.readWithShortLength(dataFile); // key + int staticOffset = ByteBufferUtil.readWithShortLength(dataFile).remaining(); // key try (SSTableIdentityIterator partition = SSTableIdentityIterator.create(sstable, dataFile, key)) { // if the row has statics attached, it has to be 
indexed separately if (cfs.metadata.hasStaticColumns()) - indexWriter.nextUnfilteredCluster(partition.staticRow()); + { + long staticPosition = indexEntry.position + staticOffset; + indexWriter.nextUnfilteredCluster(partition.staticRow(), staticPosition); + } + long position = dataFile.getPosition(); while (partition.hasNext()) - indexWriter.nextUnfilteredCluster(partition.next()); + { + indexWriter.nextUnfilteredCluster(partition.next(), position); + position = dataFile.getPosition(); + } } } catch (IOException ex) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java b/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java index c67c39c..f9c8abf 100644 --- a/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java @@ -18,28 +18,22 @@ package org.apache.cassandra.index.sasi; import java.io.File; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.index.sasi.conf.ColumnIndex; -import org.apache.cassandra.index.sasi.disk.OnDiskIndex; -import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder; +import org.apache.cassandra.index.sasi.disk.*; import org.apache.cassandra.index.sasi.disk.Token; import org.apache.cassandra.index.sasi.plan.Expression; import org.apache.cassandra.index.sasi.utils.RangeIterator; -import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.concurrent.Ref; import 
org.apache.commons.lang3.builder.HashCodeBuilder; -import com.google.common.base.Function; - public class SSTableIndex { private final ColumnIndex columnIndex; @@ -65,7 +59,7 @@ public class SSTableIndex sstable.getFilename(), columnIndex.getIndexName()); - this.index = new OnDiskIndex(indexFile, validator, new DecoratedKeyFetcher(sstable)); + this.index = new OnDiskIndex(indexFile, validator, new KeyFetcher.SSTableKeyFetcher(sstable)); } public OnDiskIndexBuilder.Mode mode() @@ -163,36 +157,5 @@ public class SSTableIndex return String.format("SSTableIndex(column: %s, SSTable: %s)", columnIndex.getColumnName(), sstable.descriptor); } - private static class DecoratedKeyFetcher implements Function<Long, DecoratedKey> - { - private final SSTableReader sstable; - - DecoratedKeyFetcher(SSTableReader reader) - { - sstable = reader; - } - - public DecoratedKey apply(Long offset) - { - try - { - return sstable.keyAt(offset); - } - catch (IOException e) - { - throw new FSReadError(new IOException("Failed to read key from " + sstable.descriptor, e), sstable.getFilename()); - } - } - - public int hashCode() - { - return sstable.descriptor.hashCode(); - } - public boolean equals(Object other) - { - return other instanceof DecoratedKeyFetcher - && sstable.descriptor.equals(((DecoratedKeyFetcher) other).sstable.descriptor); - } - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java index 0958113..459e5c3 100644 --- a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java @@ -30,7 +30,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.config.ColumnDefinition; import 
org.apache.cassandra.cql3.Operator; -import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Memtable; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.AsciiType; @@ -40,6 +39,7 @@ import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer; import org.apache.cassandra.index.sasi.conf.view.View; import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder; +import org.apache.cassandra.index.sasi.disk.RowKey; import org.apache.cassandra.index.sasi.disk.Token; import org.apache.cassandra.index.sasi.memory.IndexMemtable; import org.apache.cassandra.index.sasi.plan.Expression; @@ -99,7 +99,7 @@ public class ColumnIndex return keyValidator; } - public long index(DecoratedKey key, Row row) + public long index(RowKey key, Row row) { return getCurrentMemtable().index(key, getValueOf(column, row, FBUtilities.nowInSeconds())); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java b/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java index d6b4551..2775c29 100644 --- a/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java +++ b/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java @@ -19,6 +19,7 @@ package org.apache.cassandra.index.sasi.conf.view; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -46,6 +47,9 @@ public class RangeTermTree implements TermTree public Set<SSTableIndex> search(Expression e) { + if (e == null) + return Collections.emptySet(); + ByteBuffer minTerm = e.lower == null ? min : e.lower.value; ByteBuffer maxTerm = e.upper == null ? 
max : e.upper.value; http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java index 18994de..9245960 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java @@ -20,19 +20,18 @@ package org.apache.cassandra.index.sasi.disk; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; +import java.util.*; +import java.util.function.*; +import com.carrotsearch.hppc.LongArrayList; +import com.carrotsearch.hppc.cursors.LongCursor; +import com.carrotsearch.hppc.cursors.LongObjectCursor; +import org.apache.cassandra.dht.*; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; -import com.carrotsearch.hppc.LongArrayList; -import com.carrotsearch.hppc.LongSet; -import com.carrotsearch.hppc.cursors.LongCursor; - public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder { protected int numBlocks; @@ -65,7 +64,7 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder public int serializedSize() { if (numBlocks == 1) - return (BLOCK_HEADER_BYTES + ((int) tokenCount * 16)); + return BLOCK_HEADER_BYTES + ((int) tokenCount * LEAF_ENTRY_BYTES); else return numBlocks * BLOCK_BYTES; } @@ -112,6 +111,15 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder buffer.clear(); } + /** + * Tree node, + * + * B+-tree consists of root, interior nodes and leaves. 
Root can be either a node or a leaf. + * + * Depending on the concrete implementation of {@code TokenTreeBuilder} + * leaf can be partial or static (in case of {@code StaticTokenTreeBuilder} or dynamic in case + * of {@code DynamicTokenTreeBuilder} + */ protected abstract class Node { protected InteriorNode parent; @@ -179,8 +187,16 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder alignBuffer(buf, BLOCK_HEADER_BYTES); } + /** + * Shared header part, written for all node types: + * [ info byte ] [ token count ] [ min node token ] [ max node token ] + * [ 1b ] [ 2b (short) ] [ 8b (long) ] [ 8b (long) ] + **/ private abstract class Header { + /** + * Serializes the shared part of the header + */ public void serialize(ByteBuffer buf) { buf.put(infoByte()) @@ -192,6 +208,12 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder protected abstract byte infoByte(); } + /** + * In addition to shared header part, root header stores version information, + * overall token count and min/max tokens for the whole tree: + * [ magic ] [ overall token count ] [ min tree token ] [ max tree token ] + * [ 2b (short) ] [ 8b (long) ] [ 8b (long) ] [ 8b (long) ] + */ private class RootHeader extends Header { public void serialize(ByteBuffer buf) @@ -207,19 +229,21 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder { // if leaf, set leaf indicator and last leaf indicator (bits 0 & 1) // if not leaf, clear both bits - return (byte) ((isLeaf()) ? 3 : 0); + return isLeaf() ? 
ENTRY_TYPE_MASK : 0; } protected void writeMagic(ByteBuffer buf) { switch (Descriptor.CURRENT_VERSION) { - case Descriptor.VERSION_AB: + case ab: buf.putShort(AB_MAGIC); break; - - default: + case ac: + buf.putShort(AC_MAGIC); break; + default: + throw new RuntimeException("Unsupported version"); } } @@ -249,6 +273,12 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder } + /** + * Leaf consists of + * - header (format described in {@code Header} ) + * - data (format described in {@code LeafEntry}) + * - overflow collision entries, that hold {@value OVERFLOW_TRAILER_CAPACITY} of {@code RowOffset}. + */ protected abstract class Leaf extends Node { protected LongArrayList overflowCollisions; @@ -279,82 +309,98 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder protected abstract void serializeData(ByteBuffer buf); - protected LeafEntry createEntry(final long tok, final LongSet offsets) + protected LeafEntry createEntry(final long tok, final KeyOffsets offsets) { - int offsetCount = offsets.size(); + LongArrayList rawOffsets = new LongArrayList(offsets.size()); + + offsets.forEach(new Consumer<LongObjectCursor<long[]>>() + { + public void accept(LongObjectCursor<long[]> cursor) + { + for (long l : cursor.value) + { + rawOffsets.add(cursor.key); + rawOffsets.add(l); + } + } + }); + + int offsetCount = rawOffsets.size(); switch (offsetCount) { case 0: throw new AssertionError("no offsets for token " + tok); - case 1: - long offset = offsets.toArray()[0]; - if (offset > MAX_OFFSET) - throw new AssertionError("offset " + offset + " cannot be greater than " + MAX_OFFSET); - else if (offset <= Integer.MAX_VALUE) - return new SimpleLeafEntry(tok, offset); - else - return new FactoredOffsetLeafEntry(tok, offset); case 2: - long[] rawOffsets = offsets.toArray(); - if (rawOffsets[0] <= Integer.MAX_VALUE && rawOffsets[1] <= Integer.MAX_VALUE && - (rawOffsets[0] <= Short.MAX_VALUE || rawOffsets[1] <= Short.MAX_VALUE)) - return 
new PackedCollisionLeafEntry(tok, rawOffsets); - else - return createOverflowEntry(tok, offsetCount, offsets); + return new SimpleLeafEntry(tok, rawOffsets.get(0), rawOffsets.get(1)); default: - return createOverflowEntry(tok, offsetCount, offsets); + assert offsetCount % 2 == 0; + if (offsetCount == 4) + { + if (rawOffsets.get(0) < Integer.MAX_VALUE && rawOffsets.get(1) < Integer.MAX_VALUE && + rawOffsets.get(2) < Integer.MAX_VALUE && rawOffsets.get(3) < Integer.MAX_VALUE) + { + return new PackedCollisionLeafEntry(tok, (int)rawOffsets.get(0), (int) rawOffsets.get(1), + (int) rawOffsets.get(2), (int) rawOffsets.get(3)); + } + } + return createOverflowEntry(tok, offsetCount, rawOffsets); } } - private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongSet offsets) + private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongArrayList offsets) { if (overflowCollisions == null) - overflowCollisions = new LongArrayList(); + overflowCollisions = new LongArrayList(offsetCount); - LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) overflowCollisions.size(), (short) offsetCount); - for (LongCursor o : offsets) - { - if (overflowCollisions.size() == OVERFLOW_TRAILER_CAPACITY) - throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf"); - else - overflowCollisions.add(o.value); - } + int overflowCount = (overflowCollisions.size() + offsetCount) / 2; + if (overflowCount >= OVERFLOW_TRAILER_CAPACITY) + throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf, but had: " + overflowCount); + + LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) (overflowCollisions.size() / 2), (short) (offsetCount / 2)); + overflowCollisions.addAll(offsets); return entry; } + /** + * A leaf of the B+-Tree, that holds information about the row offset(s) for + * the current token. 
+ * + * Main 3 types of leaf entries are: + * 1) simple leaf entry: holding just a single row offset + * 2) packed collision leaf entry: holding two entries that would fit together into 16 bytes + * 3) overflow entry: only holds offset in overflow trailer and amount of entries belonging to this leaf + */ protected abstract class LeafEntry { protected final long token; abstract public EntryType type(); - abstract public int offsetData(); - abstract public short offsetExtra(); public LeafEntry(final long tok) { token = tok; } - public void serialize(ByteBuffer buf) - { - buf.putShort((short) type().ordinal()) - .putShort(offsetExtra()) - .putLong(token) - .putInt(offsetData()); - } + public abstract void serialize(ByteBuffer buf); } - - // assumes there is a single offset and the offset is <= Integer.MAX_VALUE + /** + * Simple leaf, that can store a single row offset, having the following format: + * + * [ type ] [ token ] [ partition offset ] [ row offset ] + * [ 2b (short) ] [ 8b (long) ] [ 8b (long) ] [ 8b (long) ] + */ protected class SimpleLeafEntry extends LeafEntry { - private final long offset; + private final long partitionOffset; + private final long rowOffset; - public SimpleLeafEntry(final long tok, final long off) + public SimpleLeafEntry(final long tok, final long partitionOffset, final long rowOffset) { super(tok); - offset = off; + this.partitionOffset = partitionOffset; + this.rowOffset = rowOffset; } public EntryType type() @@ -362,61 +408,38 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder return EntryType.SIMPLE; } - public int offsetData() - { - return (int) offset; - } - - public short offsetExtra() - { - return 0; - } - } - - // assumes there is a single offset and Integer.MAX_VALUE < offset <= MAX_OFFSET - // take the middle 32 bits of offset (or the top 32 when considering offset is max 48 bits) - // and store where offset is normally stored. 
take bottom 16 bits of offset and store in entry header - private class FactoredOffsetLeafEntry extends LeafEntry - { - private final long offset; - - public FactoredOffsetLeafEntry(final long tok, final long off) - { - super(tok); - offset = off; - } - - public EntryType type() - { - return EntryType.FACTORED; - } - - public int offsetData() - { - return (int) (offset >>> Short.SIZE); - } - - public short offsetExtra() + @Override + public void serialize(ByteBuffer buf) { - // exta offset is supposed to be an unsigned 16-bit integer - return (short) offset; + buf.putShort((short) type().ordinal()) + .putLong(token) + .putLong(partitionOffset) + .putLong(rowOffset); } } - // holds an entry with two offsets that can be packed in an int & a short - // the int offset is stored where offset is normally stored. short offset is - // stored in entry header - private class PackedCollisionLeafEntry extends LeafEntry + /** + * Packed collision entry, can store two offsets, if each one of their positions + * fit into 4 bytes. 
+ * [ type ] [ token ] [ partition offset 1 ] [ row offset 1] [ partition offset 2 ] [ row offset 2] + * [ 2b (short) ] [ 8b (long) ] [ 4b (int) ] [ 4b (int) ] [ 4b (int) ] [ 4b (int) ] + */ + protected class PackedCollisionLeafEntry extends LeafEntry { - private short smallerOffset; - private int largerOffset; + private final int partitionOffset1; + private final int rowOffset1; + private final int partitionOffset2; + private final int rowOffset2; - public PackedCollisionLeafEntry(final long tok, final long[] offs) + public PackedCollisionLeafEntry(final long tok, final int partitionOffset1, final int rowOffset1, + final int partitionOffset2, final int rowOffset2) { super(tok); + this.partitionOffset1 = partitionOffset1; + this.rowOffset1 = rowOffset1; + this.partitionOffset2 = partitionOffset2; + this.rowOffset2 = rowOffset2; - smallerOffset = (short) Math.min(offs[0], offs[1]); - largerOffset = (int) Math.max(offs[0], offs[1]); } public EntryType type() @@ -424,21 +447,27 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder return EntryType.PACKED; } - public int offsetData() - { - return largerOffset; - } - - public short offsetExtra() + @Override + public void serialize(ByteBuffer buf) { - return smallerOffset; - } - } - - // holds an entry with three or more offsets, or two offsets that cannot - // be packed into an int & a short. the index into the overflow list - // is stored where the offset is normally stored. the number of overflowed offsets - // for the entry is stored in the entry header + buf.putShort((short) type().ordinal()) + .putLong(token) + .putInt(partitionOffset1) + .putInt(rowOffset1) + .putInt(partitionOffset2) + .putInt(rowOffset2); + } + } + + /** + * Overflow collision entry, holds an entry with three or more offsets, or two offsets + * that cannot be packed into 16 bytes. 
+ * [ type ] [ token ] [ start index ] [ count ] + * [ 2b (short) ] [ 8b (long) ] [ 8b (long) ] [ 8b (long) ] + * + * - [ start index ] is a position of first item belonging to this leaf entry in the overflow trailer + * - [ count ] is the amount of items belonging to this leaf entry that are stored in the overflow trailer + */ private class OverflowCollisionLeafEntry extends LeafEntry { private final short startIndex; @@ -456,20 +485,23 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder return EntryType.OVERFLOW; } - public int offsetData() - { - return startIndex; - } - - public short offsetExtra() + @Override + public void serialize(ByteBuffer buf) { - return count; + buf.putShort((short) type().ordinal()) + .putLong(token) + .putLong(startIndex) + .putLong(count); } - } - } + /** + * Interior node consists of: + * - (interior node) header + * - tokens (serialized as longs, with count stored in header) + * - child offsets + */ protected class InteriorNode extends Node { protected List<Long> tokens = new ArrayList<>(TOKENS_PER_BLOCK); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java b/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java index 3aa6f14..3fa0f06 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java @@ -22,30 +22,29 @@ package org.apache.cassandra.index.sasi.disk; */ public class Descriptor { - public static final String VERSION_AA = "aa"; - public static final String VERSION_AB = "ab"; - public static final String CURRENT_VERSION = VERSION_AB; - public static final Descriptor CURRENT = new Descriptor(CURRENT_VERSION); - - public static class Version + public static enum Version { - public final String 
version; + aa, + ab, + ac + } - public Version(String version) - { - this.version = version; - } + public static final Version VERSION_AA = Version.aa; + public static final Version VERSION_AB = Version.ab; + public static final Version VERSION_AC = Version.ac; - public String toString() - { - return version; - } - } + public static final Version CURRENT_VERSION = Version.ac; + public static final Descriptor CURRENT = new Descriptor(CURRENT_VERSION); public final Version version; public Descriptor(String v) { - this.version = new Version(v); + this.version = Version.valueOf(v); + } + + public Descriptor(Version v) + { + this.version = v; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java index 2ddfd89..6e3e163 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java @@ -20,17 +20,14 @@ package org.apache.cassandra.index.sasi.disk; import java.nio.ByteBuffer; import java.util.*; -import org.apache.cassandra.utils.AbstractIterator; -import org.apache.cassandra.utils.Pair; - -import com.carrotsearch.hppc.LongOpenHashSet; -import com.carrotsearch.hppc.LongSet; -import com.carrotsearch.hppc.cursors.LongCursor; +import com.carrotsearch.hppc.cursors.LongObjectCursor; +import org.apache.cassandra.dht.*; +import org.apache.cassandra.utils.*; public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder { - private final SortedMap<Long, LongSet> tokens = new TreeMap<>(); + private final SortedMap<Long, KeyOffsets> tokens = new TreeMap<>(); public DynamicTokenTreeBuilder() {} @@ -40,54 +37,52 @@ public class DynamicTokenTreeBuilder 
extends AbstractTokenTreeBuilder add(data); } - public DynamicTokenTreeBuilder(SortedMap<Long, LongSet> data) + public DynamicTokenTreeBuilder(SortedMap<Long, KeyOffsets> data) { add(data); } - public void add(Long token, long keyPosition) + public void add(Long token, long partitionOffset, long rowOffset) { - LongSet found = tokens.get(token); + KeyOffsets found = tokens.get(token); if (found == null) - tokens.put(token, (found = new LongOpenHashSet(2))); + tokens.put(token, (found = new KeyOffsets(2))); - found.add(keyPosition); + found.put(partitionOffset, rowOffset); } - public void add(Iterator<Pair<Long, LongSet>> data) + public void add(Iterator<Pair<Long, KeyOffsets>> data) { while (data.hasNext()) { - Pair<Long, LongSet> entry = data.next(); - for (LongCursor l : entry.right) - add(entry.left, l.value); + Pair<Long, KeyOffsets> entry = data.next(); + for (LongObjectCursor<long[]> cursor : entry.right) + for (long l : cursor.value) + add(entry.left, cursor.key, l); } } - public void add(SortedMap<Long, LongSet> data) + public void add(SortedMap<Long, KeyOffsets> data) { - for (Map.Entry<Long, LongSet> newEntry : data.entrySet()) + for (Map.Entry<Long, KeyOffsets> newEntry : data.entrySet()) { - LongSet found = tokens.get(newEntry.getKey()); - if (found == null) - tokens.put(newEntry.getKey(), (found = new LongOpenHashSet(4))); - - for (LongCursor offset : newEntry.getValue()) - found.add(offset.value); + for (LongObjectCursor<long[]> cursor : newEntry.getValue()) + for (long l : cursor.value) + add(newEntry.getKey(), cursor.key, l); } } - public Iterator<Pair<Long, LongSet>> iterator() + public Iterator<Pair<Long, KeyOffsets>> iterator() { - final Iterator<Map.Entry<Long, LongSet>> iterator = tokens.entrySet().iterator(); - return new AbstractIterator<Pair<Long, LongSet>>() + final Iterator<Map.Entry<Long, KeyOffsets>> iterator = tokens.entrySet().iterator(); + return new AbstractIterator<Pair<Long, KeyOffsets>>() { - protected Pair<Long, LongSet> 
computeNext() + protected Pair<Long, KeyOffsets> computeNext() { if (!iterator.hasNext()) return endOfData(); - Map.Entry<Long, LongSet> entry = iterator.next(); + Map.Entry<Long, KeyOffsets> entry = iterator.next(); return Pair.create(entry.getKey(), entry.getValue()); } }; @@ -161,9 +156,9 @@ public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder private class DynamicLeaf extends Leaf { - private final SortedMap<Long, LongSet> tokens; + private final SortedMap<Long, KeyOffsets> tokens; - DynamicLeaf(SortedMap<Long, LongSet> data) + DynamicLeaf(SortedMap<Long, KeyOffsets> data) { super(data.firstKey(), data.lastKey()); tokens = data; @@ -181,7 +176,7 @@ public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder protected void serializeData(ByteBuffer buf) { - for (Map.Entry<Long, LongSet> entry : tokens.entrySet()) + for (Map.Entry<Long, KeyOffsets> entry : tokens.entrySet()) createEntry(entry.getKey(), entry.getValue()).serialize(buf); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java b/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java new file mode 100644 index 0000000..db849fe --- /dev/null +++ b/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.index.sasi.disk; + +import java.util.*; + +import org.apache.commons.lang3.ArrayUtils; + +import com.carrotsearch.hppc.LongObjectOpenHashMap; +import com.carrotsearch.hppc.cursors.LongObjectCursor; + +public class KeyOffsets extends LongObjectOpenHashMap<long[]> +{ + public static final long NO_OFFSET = Long.MIN_VALUE; + + public KeyOffsets() { + super(4); + } + + public KeyOffsets(int initialCapacity) { + super(initialCapacity); + } + + public void put(long currentPartitionOffset, long currentRowOffset) + { + if (containsKey(currentPartitionOffset)) + super.put(currentPartitionOffset, append(get(currentPartitionOffset), currentRowOffset)); + else + super.put(currentPartitionOffset, asArray(currentRowOffset)); + } + + public long[] put(long currentPartitionOffset, long[] currentRowOffset) + { + if (containsKey(currentPartitionOffset)) + return super.put(currentPartitionOffset, merge(get(currentPartitionOffset), currentRowOffset)); + else + return super.put(currentPartitionOffset, currentRowOffset); + } + + public boolean equals(Object obj) + { + if (!(obj instanceof KeyOffsets)) + return false; + + KeyOffsets other = (KeyOffsets) obj; + if (other.size() != this.size()) + return false; + + for (LongObjectCursor<long[]> cursor : this) + if (!Arrays.equals(cursor.value, other.get(cursor.key))) + return false; + + return true; + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder("KeyOffsets { "); + forEach((a, b) -> { + sb.append(a).append(": ").append(Arrays.toString(b)); + }); + 
sb.append(" }"); + return sb.toString(); + } + + // primitive array creation + public static long[] asArray(long... vals) + { + return vals; + } + + private static long[] merge(long[] arr1, long[] arr2) + { + long[] copy = new long[arr2.length]; + int written = 0; + for (long l : arr2) + { + if (!ArrayUtils.contains(arr1, l)) + copy[written++] = l; + } + + if (written == 0) + return arr1; + + long[] merged = new long[arr1.length + written]; + System.arraycopy(arr1, 0, merged, 0, arr1.length); + System.arraycopy(copy, 0, merged, arr1.length, written); + return merged; + } + + private static long[] append(long[] arr1, long v) + { + if (ArrayUtils.contains(arr1, v)) + return arr1; + else + return ArrayUtils.add(arr1, v); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java index 4d43cd9..70d24a7 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java @@ -22,8 +22,7 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.stream.Collectors; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.index.sasi.Term; +import org.apache.cassandra.index.sasi.*; import org.apache.cassandra.index.sasi.plan.Expression; import org.apache.cassandra.index.sasi.plan.Expression.Op; import org.apache.cassandra.index.sasi.utils.MappedBuffer; @@ -37,12 +36,12 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; 
import static org.apache.cassandra.index.sasi.disk.OnDiskBlock.SearchResult; +import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.TOKEN_BYTES; public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable { @@ -106,7 +105,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable protected final long indexSize; protected final boolean hasMarkedPartials; - protected final Function<Long, DecoratedKey> keyFetcher; + protected final KeyFetcher keyFetcher; protected final String indexPath; @@ -116,7 +115,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable protected final ByteBuffer minTerm, maxTerm, minKey, maxKey; @SuppressWarnings("resource") - public OnDiskIndex(File index, AbstractType<?> cmp, Function<Long, DecoratedKey> keyReader) + public OnDiskIndex(File index, AbstractType<?> cmp, KeyFetcher keyReader) { keyFetcher = keyReader; @@ -635,6 +634,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable { final long blockEnd = FBUtilities.align(content.position(), OnDiskIndexBuilder.BLOCK_SIZE); + // ([int] -1 for sparse, offset for non-sparse) if (isSparse()) return new PrefetchedTokensIterator(getSparseTokens()); @@ -658,7 +658,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable NavigableMap<Long, Token> individualTokens = new TreeMap<>(); for (int i = 0; i < size; i++) { - Token token = perBlockIndex.get(content.getLong(ptrOffset + 1 + (8 * i)), keyFetcher); + Token token = perBlockIndex.get(content.getLong(ptrOffset + 1 + TOKEN_BYTES * i), keyFetcher); assert token != null; individualTokens.put(token.get(), token); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java 
b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java index 4946f06..b6e2da5 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.*; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.dht.*; import org.apache.cassandra.index.sasi.plan.Expression.Op; import org.apache.cassandra.index.sasi.sa.IndexedTerm; import org.apache.cassandra.index.sasi.sa.IntegralSA; @@ -37,7 +38,6 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import com.carrotsearch.hppc.LongArrayList; -import com.carrotsearch.hppc.LongSet; import com.carrotsearch.hppc.ShortArrayList; import com.google.common.annotations.VisibleForTesting; @@ -163,7 +163,7 @@ public class OnDiskIndexBuilder this.marksPartials = marksPartials; } - public OnDiskIndexBuilder add(ByteBuffer term, DecoratedKey key, long keyPosition) + public OnDiskIndexBuilder add(ByteBuffer term, DecoratedKey key, long partitionOffset, long rowOffset) { if (term.remaining() >= MAX_TERM_SIZE) { @@ -183,16 +183,16 @@ public class OnDiskIndexBuilder estimatedBytes += 64 + 48 + term.remaining(); } - tokens.add((Long) key.getToken().getTokenValue(), keyPosition); + tokens.add((Long) key.getToken().getTokenValue(), partitionOffset, rowOffset); // calculate key range (based on actual key values) for current index minKey = (minKey == null || keyComparator.compare(minKey, key.getKey()) > 0) ? key.getKey() : minKey; maxKey = (maxKey == null || keyComparator.compare(maxKey, key.getKey()) < 0) ? key.getKey() : maxKey; - // 60 ((boolean(1)*4) + (long(8)*4) + 24) bytes for the LongOpenHashSet created when the keyPosition was added - // + 40 bytes for the TreeMap.Entry + 8 bytes for the token (key). 
+ // 84 ((boolean(1)*4) + (long(8)*4) + 24 + 24) bytes for the LongObjectOpenHashMap<long[]> created + // when the keyPosition was added + 40 bytes for the TreeMap.Entry + 8 bytes for the token (key). // in the case of hash collision for the token we may overestimate but this is extremely rare - estimatedBytes += 60 + 40 + 8; + estimatedBytes += 84 + 40 + 8; return this; } @@ -569,7 +569,7 @@ public class OnDiskIndexBuilder } } - private static class MutableDataBlock extends MutableBlock<InMemoryDataTerm> + private class MutableDataBlock extends MutableBlock<InMemoryDataTerm> { private static final int MAX_KEYS_SPARSE = 5; @@ -651,7 +651,7 @@ public class OnDiskIndexBuilder { term.serialize(buffer); buffer.writeByte((byte) keys.getTokenCount()); - for (Pair<Long, LongSet> key : keys) + for (Pair<Long, KeyOffsets> key : keys) buffer.writeLong(key.left); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java index 9fa4e87..c204883 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java @@ -109,7 +109,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver currentKeyPosition = curPosition; } - public void nextUnfilteredCluster(Unfiltered unfiltered) + public void nextUnfilteredCluster(Unfiltered unfiltered, long currentRowOffset) { if (!unfiltered.isRow()) return; @@ -129,10 +129,15 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver if (index == null) indexes.put(column, (index = newIndex(columnIndex))); - index.add(value.duplicate(), currentKey, currentKeyPosition); + index.add(value.duplicate(), currentKey, 
currentKeyPosition, currentRowOffset); }); } + public void nextUnfilteredCluster(Unfiltered unfilteredCluster) + { + throw new UnsupportedOperationException("SASI Index does not support direct row access."); + } + public void complete() { if (isComplete) @@ -197,7 +202,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver this.currentBuilder = newIndexBuilder(); } - public void add(ByteBuffer term, DecoratedKey key, long keyPosition) + public void add(ByteBuffer term, DecoratedKey key, long partitionOffset, long rowOffset) { if (term.remaining() == 0) return; @@ -235,7 +240,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver } } - currentBuilder.add(token, key, keyPosition); + currentBuilder.add(token, key, partitionOffset, rowOffset); isAdded = true; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java b/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java new file mode 100644 index 0000000..518ad27 --- /dev/null +++ b/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.index.sasi.disk; + +import java.util.*; +import java.util.stream.*; + +import org.apache.commons.lang.builder.HashCodeBuilder; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.utils.*; + +/** + * Primary key of the found row, a combination of the Partition Key + * and clustering that belongs to the row. + */ +public class RowKey implements Comparable<RowKey> +{ + + public final DecoratedKey decoratedKey; + public final Clustering clustering; + + private final ClusteringComparator comparator; + + public RowKey(DecoratedKey primaryKey, Clustering clustering, ClusteringComparator comparator) + { + this.decoratedKey = primaryKey; + this.clustering = clustering; + this.comparator = comparator; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + RowKey rowKey = (RowKey) o; + + if (decoratedKey != null ? !decoratedKey.equals(rowKey.decoratedKey) : rowKey.decoratedKey != null) + return false; + return clustering != null ? 
clustering.equals(rowKey.clustering) : rowKey.clustering == null; + } + + public int hashCode() + { + return new HashCodeBuilder().append(decoratedKey).append(clustering).hashCode(); + } + + public int compareTo(RowKey other) + { + int cmp = this.decoratedKey.compareTo(other.decoratedKey); + if (cmp == 0 && clustering != null) + { + // Both clustering and rows should match + if (clustering.kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING || other.clustering.kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING) + return 0; + + return comparator.compare(this.clustering, other.clustering); + } + else + { + return cmp; + } + } + + public static RowKeyComparator COMPARATOR = new RowKeyComparator(); + + public String toString(CFMetaData metadata) + { + return String.format("RowKey: { pk : %s, clustering: %s}", + metadata.getKeyValidator().getString(decoratedKey.getKey()), + clustering.toString(metadata)); + } + + @Override + public String toString() + { + return String.format("RowKey: { pk : %s, clustering: %s}", + ByteBufferUtil.bytesToHex(decoratedKey.getKey()), + String.join(",", Arrays.stream(clustering.getRawValues()).map(ByteBufferUtil::bytesToHex).collect(Collectors.toList()))); + } + + private static class RowKeyComparator implements Comparator<RowKey> + { + public int compare(RowKey o1, RowKey o2) + { + return o1.compareTo(o2); + } + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java index 6e64c56..8a11d60 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java @@ -19,8 +19,7 @@ package org.apache.cassandra.index.sasi.disk; import 
java.io.IOException; import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.SortedMap; +import java.util.*; import org.apache.cassandra.index.sasi.utils.CombinedTerm; import org.apache.cassandra.index.sasi.utils.RangeIterator; @@ -28,7 +27,6 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.Pair; -import com.carrotsearch.hppc.LongSet; import com.google.common.collect.Iterators; /** @@ -63,17 +61,17 @@ public class StaticTokenTreeBuilder extends AbstractTokenTreeBuilder combinedTerm = term; } - public void add(Long token, long keyPosition) + public void add(Long token, long partitionOffset, long rowOffset) { throw new UnsupportedOperationException(); } - public void add(SortedMap<Long, LongSet> data) + public void add(SortedMap<Long, KeyOffsets> data) { throw new UnsupportedOperationException(); } - public void add(Iterator<Pair<Long, LongSet>> data) + public void add(Iterator<Pair<Long, KeyOffsets>> data) { throw new UnsupportedOperationException(); } @@ -83,12 +81,12 @@ public class StaticTokenTreeBuilder extends AbstractTokenTreeBuilder return tokenCount == 0; } - public Iterator<Pair<Long, LongSet>> iterator() + public Iterator<Pair<Long, KeyOffsets>> iterator() { - Iterator<Token> iterator = combinedTerm.getTokenIterator(); - return new AbstractIterator<Pair<Long, LongSet>>() + @SuppressWarnings("resource") Iterator<Token> iterator = combinedTerm.getTokenIterator(); + return new AbstractIterator<Pair<Long, KeyOffsets>>() { - protected Pair<Long, LongSet> computeNext() + protected Pair<Long, KeyOffsets> computeNext() { if (!iterator.hasNext()) return endOfData(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/Token.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Token.java 
b/src/java/org/apache/cassandra/index/sasi/disk/Token.java index 4cd1ea3..2412477 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/Token.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/Token.java @@ -18,13 +18,11 @@ package org.apache.cassandra.index.sasi.disk; import com.google.common.primitives.Longs; +import org.apache.commons.lang.builder.HashCodeBuilder; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.index.sasi.utils.CombinedValue; +import org.apache.cassandra.index.sasi.utils.*; -import com.carrotsearch.hppc.LongSet; - -public abstract class Token implements CombinedValue<Long>, Iterable<DecoratedKey> +public abstract class Token implements CombinedValue<Long>, Iterable<RowKey> { protected final long token; @@ -38,7 +36,7 @@ public abstract class Token implements CombinedValue<Long>, Iterable<DecoratedKe return token; } - public abstract LongSet getOffsets(); + public abstract KeyOffsets getOffsets(); public int compareTo(CombinedValue<Long> o) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java index c69ce00..1969627 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java @@ -19,22 +19,21 @@ package org.apache.cassandra.index.sasi.disk; import java.io.IOException; import java.util.*; +import java.util.stream.*; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.index.sasi.utils.AbstractIterator; -import org.apache.cassandra.index.sasi.utils.CombinedValue; -import org.apache.cassandra.index.sasi.utils.MappedBuffer; -import org.apache.cassandra.index.sasi.utils.RangeIterator; -import org.apache.cassandra.utils.MergeIterator; - 
-import com.carrotsearch.hppc.LongOpenHashSet; -import com.carrotsearch.hppc.LongSet; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.collect.Iterators; import org.apache.commons.lang3.builder.HashCodeBuilder; -import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.EntryType; +import com.carrotsearch.hppc.cursors.LongObjectCursor; +import org.apache.cassandra.index.sasi.*; +import org.apache.cassandra.index.sasi.disk.Descriptor.*; +import org.apache.cassandra.index.sasi.utils.AbstractIterator; +import org.apache.cassandra.index.sasi.utils.*; +import org.apache.cassandra.utils.*; + +import static org.apache.cassandra.index.sasi.disk.Descriptor.Version.*; +import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.*; // Note: all of the seek-able offsets contained in TokenTree should be sizeof(long) // even if currently only lower int portion of them if used, because that makes @@ -42,9 +41,6 @@ import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.EntryType; // without any on-disk format changes and/or re-indexing if one day we'll have a need to. 
public class TokenTree { - private static final int LONG_BYTES = Long.SIZE / 8; - private static final int SHORT_BYTES = Short.SIZE / 8; - private final Descriptor descriptor; private final MappedBuffer file; private final long startPos; @@ -66,8 +62,7 @@ public class TokenTree file.position(startPos + TokenTreeBuilder.SHARED_HEADER_BYTES); - if (!validateMagic()) - throw new IllegalArgumentException("invalid token tree"); + validateMagic(); tokenCount = file.getLong(); treeMinToken = file.getLong(); @@ -79,12 +74,12 @@ public class TokenTree return tokenCount; } - public RangeIterator<Long, Token> iterator(Function<Long, DecoratedKey> keyFetcher) + public RangeIterator<Long, Token> iterator(KeyFetcher keyFetcher) { return new TokenTreeIterator(file.duplicate(), keyFetcher); } - public OnDiskToken get(final long searchToken, Function<Long, DecoratedKey> keyFetcher) + public OnDiskToken get(final long searchToken, KeyFetcher keyFetcher) { seekToLeaf(searchToken, file); long leafStart = file.position(); @@ -95,21 +90,24 @@ public class TokenTree file.position(leafStart + TokenTreeBuilder.BLOCK_HEADER_BYTES); - OnDiskToken token = OnDiskToken.getTokenAt(file, tokenIndex, leafSize, keyFetcher); + OnDiskToken token = getTokenAt(file, tokenIndex, leafSize, keyFetcher); + return token.get().equals(searchToken) ? token : null; } - private boolean validateMagic() + private void validateMagic() { - switch (descriptor.version.toString()) - { - case Descriptor.VERSION_AA: - return true; - case Descriptor.VERSION_AB: - return TokenTreeBuilder.AB_MAGIC == file.getShort(); - default: - return false; - } + if (descriptor.version == aa) + return; + + short magic = file.getShort(); + if (descriptor.version == Version.ab && magic == TokenTreeBuilder.AB_MAGIC) + return; + + if (descriptor.version == Version.ac && magic == TokenTreeBuilder.AC_MAGIC) + return; + + throw new IllegalArgumentException("invalid token tree. 
Written magic: '" + ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(magic)) + "'"); } // finds leaf that *could* contain token @@ -136,18 +134,16 @@ public class TokenTree long minToken = file.getLong(); long maxToken = file.getLong(); - long seekBase = blockStart + TokenTreeBuilder.BLOCK_HEADER_BYTES; + long seekBase = blockStart + BLOCK_HEADER_BYTES; if (minToken > token) { // seek to beginning of child offsets to locate first child - file.position(seekBase + tokenCount * LONG_BYTES); - blockStart = (startPos + (int) file.getLong()); + file.position(seekBase + tokenCount * TOKEN_BYTES); } else if (maxToken < token) { // seek to end of child offsets to locate last child - file.position(seekBase + (2 * tokenCount) * LONG_BYTES); - blockStart = (startPos + (int) file.getLong()); + file.position(seekBase + (2 * tokenCount) * TOKEN_BYTES); } else { @@ -158,12 +154,11 @@ public class TokenTree // file pointer is now at beginning of offsets if (offsetIndex == tokenCount) - file.position(file.position() + (offsetIndex * LONG_BYTES)); + file.position(file.position() + (offsetIndex * TOKEN_BYTES)); else - file.position(file.position() + ((tokenCount - offsetIndex - 1) + offsetIndex) * LONG_BYTES); - - blockStart = (startPos + (int) file.getLong()); + file.position(file.position() + ((tokenCount - offsetIndex - 1) + offsetIndex) * TOKEN_BYTES); } + blockStart = (startPos + (int) file.getLong()); } } @@ -172,8 +167,7 @@ public class TokenTree short offsetIndex = 0; for (int i = 0; i < tokenCount; i++) { - long readToken = file.getLong(); - if (searchToken < readToken) + if (searchToken < file.getLong()) break; offsetIndex++; @@ -193,10 +187,7 @@ public class TokenTree while (start <= end) { middle = start + ((end - start) >> 1); - - // each entry is 16 bytes wide, token is in bytes 4-11 - long token = file.getLong(base + (middle * (2 * LONG_BYTES) + 4)); - + long token = file.getLong(base + middle * LEAF_ENTRY_BYTES + (descriptor.version.compareTo(Version.ac) < 0 ? 
LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES)); if (token == searchToken) break; @@ -209,9 +200,9 @@ public class TokenTree return (short) middle; } - public class TokenTreeIterator extends RangeIterator<Long, Token> + private class TokenTreeIterator extends RangeIterator<Long, Token> { - private final Function<Long, DecoratedKey> keyFetcher; + private final KeyFetcher keyFetcher; private final MappedBuffer file; private long currentLeafStart; @@ -224,7 +215,7 @@ public class TokenTree protected boolean firstIteration = true; private boolean lastLeaf; - TokenTreeIterator(MappedBuffer file, Function<Long, DecoratedKey> keyFetcher) + TokenTreeIterator(MappedBuffer file, KeyFetcher keyFetcher) { super(treeMinToken, treeMaxToken, tokenCount); @@ -314,13 +305,13 @@ public class TokenTree private Token getTokenAt(int idx) { - return OnDiskToken.getTokenAt(file, idx, leafSize, keyFetcher); + return TokenTree.this.getTokenAt(file, idx, leafSize, keyFetcher); } private long getTokenPosition(int idx) { - // skip 4 byte entry header to get position pointing directly at the entry's token - return OnDiskToken.getEntryPosition(idx, file) + (2 * SHORT_BYTES); + // skip entry header to get position pointing directly at the entry's token + return TokenTree.this.getEntryPosition(idx, file, descriptor) + (descriptor.version.compareTo(Version.ac) < 0 ? 
LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES); } private void seekToNextLeaf() @@ -347,15 +338,15 @@ public class TokenTree } } - public static class OnDiskToken extends Token + public class OnDiskToken extends Token { private final Set<TokenInfo> info = new HashSet<>(2); - private final Set<DecoratedKey> loadedKeys = new TreeSet<>(DecoratedKey.comparator); + private final Set<RowKey> loadedKeys = new TreeSet<>(RowKey.COMPARATOR); - public OnDiskToken(MappedBuffer buffer, long position, short leafSize, Function<Long, DecoratedKey> keyFetcher) + private OnDiskToken(MappedBuffer buffer, long position, short leafSize, KeyFetcher keyFetcher) { - super(buffer.getLong(position + (2 * SHORT_BYTES))); - info.add(new TokenInfo(buffer, position, leafSize, keyFetcher)); + super(buffer.getLong(position + (descriptor.version.compareTo(Version.ac) < 0 ? LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES))); + info.add(new TokenInfo(buffer, position, leafSize, keyFetcher, descriptor)); } public void merge(CombinedValue<Long> other) @@ -377,9 +368,9 @@ public class TokenTree } } - public Iterator<DecoratedKey> iterator() + public Iterator<RowKey> iterator() { - List<Iterator<DecoratedKey>> keys = new ArrayList<>(info.size()); + List<Iterator<RowKey>> keys = new ArrayList<>(info.size()); for (TokenInfo i : info) keys.add(i.iterator()); @@ -387,68 +378,72 @@ public class TokenTree if (!loadedKeys.isEmpty()) keys.add(loadedKeys.iterator()); - return MergeIterator.get(keys, DecoratedKey.comparator, new MergeIterator.Reducer<DecoratedKey, DecoratedKey>() + return MergeIterator.get(keys, RowKey.COMPARATOR, new MergeIterator.Reducer<RowKey, RowKey>() { - DecoratedKey reduced = null; + RowKey reduced = null; public boolean trivialReduceIsTrivial() { return true; } - public void reduce(int idx, DecoratedKey current) + public void reduce(int idx, RowKey current) { reduced = current; } - protected DecoratedKey getReduced() + protected RowKey getReduced() { return reduced; } }); } - public 
LongSet getOffsets() + public KeyOffsets getOffsets() { - LongSet offsets = new LongOpenHashSet(4); + KeyOffsets offsets = new KeyOffsets(); for (TokenInfo i : info) { - for (long offset : i.fetchOffsets()) - offsets.add(offset); + for (LongObjectCursor<long[]> offset : i.fetchOffsets()) + offsets.put(offset.key, offset.value); } return offsets; } + } - public static OnDiskToken getTokenAt(MappedBuffer buffer, int idx, short leafSize, Function<Long, DecoratedKey> keyFetcher) - { - return new OnDiskToken(buffer, getEntryPosition(idx, buffer), leafSize, keyFetcher); - } + private OnDiskToken getTokenAt(MappedBuffer buffer, int idx, short leafSize, KeyFetcher keyFetcher) + { + return new OnDiskToken(buffer, getEntryPosition(idx, buffer, descriptor), leafSize, keyFetcher); + } - private static long getEntryPosition(int idx, MappedBuffer file) - { - // info (4 bytes) + token (8 bytes) + offset (4 bytes) = 16 bytes - return file.position() + (idx * (2 * LONG_BYTES)); - } + private long getEntryPosition(int idx, MappedBuffer file, Descriptor descriptor) + { + if (descriptor.version.compareTo(Version.ac) < 0) + return file.position() + (idx * LEGACY_LEAF_ENTRY_BYTES); + + // skip n entries, to the entry with the given index + return file.position() + (idx * LEAF_ENTRY_BYTES); } private static class TokenInfo { private final MappedBuffer buffer; - private final Function<Long, DecoratedKey> keyFetcher; - + private final KeyFetcher keyFetcher; + private final Descriptor descriptor; private final long position; private final short leafSize; - public TokenInfo(MappedBuffer buffer, long position, short leafSize, Function<Long, DecoratedKey> keyFetcher) + public TokenInfo(MappedBuffer buffer, long position, short leafSize, KeyFetcher keyFetcher, Descriptor descriptor) { this.keyFetcher = keyFetcher; this.buffer = buffer; this.position = position; this.leafSize = leafSize; + this.descriptor = descriptor; } - public Iterator<DecoratedKey> iterator() + public Iterator<RowKey> 
iterator() { return new KeyIterator(keyFetcher, fetchOffsets()); } @@ -465,59 +460,154 @@ public class TokenTree TokenInfo o = (TokenInfo) other; return keyFetcher == o.keyFetcher && position == o.position; + } - private long[] fetchOffsets() + /** + * Legacy leaf storage format (used for reading data formats before AC): + * + * [(short) leaf type][(short) offset extra bytes][(long) token][(int) offsetData] + * + * Many pairs can be encoded into long+int. + * + * Simple entry: offset fits into (int) + * + * [(short) leaf type][(short) offset extra bytes][(long) token][(int) offsetData] + * + * FactoredOffset: a single offset, offset fits into (long)+(int) bits: + * + * [(short) leaf type][(short) 16 bytes of remained offset][(long) token][(int) top 32 bits of offset] + * + * PackedCollisionEntry: packs the two offset entries into int and a short (if both of them fit into + * (long) and one of them fits into (int)) + * + * [(short) leaf type][(short) 16 the offset that'd fit into short][(long) token][(int) 32 bits of offset that'd fit into int] + * + * Otherwise, the rest gets packed into limited-size overflow collision entry + * + * [(short) leaf type][(short) count][(long) token][(int) start index] + */ + private KeyOffsets fetchOffsetsLegacy() { short info = buffer.getShort(position); // offset extra is unsigned short (right-most 16 bits of 48 bits allowed for an offset) - int offsetExtra = buffer.getShort(position + SHORT_BYTES) & 0xFFFF; + int offsetExtra = buffer.getShort(position + Short.BYTES) & 0xFFFF; // is the it left-most (32-bit) base of the actual offset in the index file - int offsetData = buffer.getInt(position + (2 * SHORT_BYTES) + LONG_BYTES); + int offsetData = buffer.getInt(position + (2 * Short.BYTES) + Long.BYTES); EntryType type = EntryType.of(info & TokenTreeBuilder.ENTRY_TYPE_MASK); + KeyOffsets rowOffsets = new KeyOffsets(); switch (type) { case SIMPLE: - return new long[] { offsetData }; - + rowOffsets.put(offsetData, 
KeyOffsets.NO_OFFSET); + break; case OVERFLOW: - long[] offsets = new long[offsetExtra]; // offsetShort contains count of tokens - long offsetPos = (buffer.position() + (2 * (leafSize * LONG_BYTES)) + (offsetData * LONG_BYTES)); + long offsetPos = (buffer.position() + (2 * (leafSize * Long.BYTES)) + (offsetData * Long.BYTES)); for (int i = 0; i < offsetExtra; i++) - offsets[i] = buffer.getLong(offsetPos + (i * LONG_BYTES)); + { + long offset = buffer.getLong(offsetPos + (i * Long.BYTES));; + rowOffsets.put(offset, KeyOffsets.NO_OFFSET); + } + break; + case FACTORED: + long offset = (((long) offsetData) << Short.SIZE) + offsetExtra; + rowOffsets.put(offset, KeyOffsets.NO_OFFSET); + break; + case PACKED: + rowOffsets.put(offsetExtra, KeyOffsets.NO_OFFSET); + rowOffsets.put(offsetData, KeyOffsets.NO_OFFSET); + default: + throw new IllegalStateException("Unknown entry type: " + type); + } + return rowOffsets; + } - return offsets; + private KeyOffsets fetchOffsets() + { + if (descriptor.version.compareTo(Version.ac) < 0) + return fetchOffsetsLegacy(); - case FACTORED: - return new long[] { (((long) offsetData) << Short.SIZE) + offsetExtra }; + short info = buffer.getShort(position); + EntryType type = EntryType.of(info & TokenTreeBuilder.ENTRY_TYPE_MASK); + KeyOffsets rowOffsets = new KeyOffsets(); + long baseOffset = position + LEAF_ENTRY_TYPE_BYTES + TOKEN_BYTES; + switch (type) + { + case SIMPLE: + long partitionOffset = buffer.getLong(baseOffset); + long rowOffset = buffer.getLong(baseOffset + LEAF_PARTITON_OFFSET_BYTES); + + rowOffsets.put(partitionOffset, rowOffset); + break; case PACKED: - return new long[] { offsetExtra, offsetData }; + long partitionOffset1 = buffer.getInt(baseOffset); + long rowOffset1 = buffer.getInt(baseOffset + LEAF_PARTITON_OFFSET_PACKED_BYTES); + + long partitionOffset2 = buffer.getInt(baseOffset + LEAF_PARTITON_OFFSET_PACKED_BYTES + LEAF_ROW_OFFSET_PACKED_BYTES); + long rowOffset2 = buffer.getInt(baseOffset + 2 * 
LEAF_PARTITON_OFFSET_PACKED_BYTES + LEAF_ROW_OFFSET_PACKED_BYTES); + rowOffsets.put(partitionOffset1, rowOffset1); + rowOffsets.put(partitionOffset2, rowOffset2); + break; + case OVERFLOW: + long collisionOffset = buffer.getLong(baseOffset); + long count = buffer.getLong(baseOffset + LEAF_PARTITON_OFFSET_BYTES); + + // Skip leaves and collision offsets that do not belong to current token + long offsetPos = buffer.position() + leafSize * LEAF_ENTRY_BYTES + collisionOffset * COLLISION_ENTRY_BYTES; + + for (int i = 0; i < count; i++) + { + long currentPartitionOffset = buffer.getLong(offsetPos + i * COLLISION_ENTRY_BYTES); + long currentRowOffset = buffer.getLong(offsetPos + i * COLLISION_ENTRY_BYTES + LEAF_PARTITON_OFFSET_BYTES); + + rowOffsets.put(currentPartitionOffset, currentRowOffset); + } + break; default: throw new IllegalStateException("Unknown entry type: " + type); } + + + return rowOffsets; } } - private static class KeyIterator extends AbstractIterator<DecoratedKey> + private static class KeyIterator extends AbstractIterator<RowKey> { - private final Function<Long, DecoratedKey> keyFetcher; - private final long[] offsets; - private int index = 0; + private final KeyFetcher keyFetcher; + private final Iterator<LongObjectCursor<long[]>> offsets; + private long currentPatitionKey; + private PrimitiveIterator.OfLong currentCursor = null; - public KeyIterator(Function<Long, DecoratedKey> keyFetcher, long[] offsets) + public KeyIterator(KeyFetcher keyFetcher, KeyOffsets offsets) { this.keyFetcher = keyFetcher; - this.offsets = offsets; + this.offsets = offsets.iterator(); } - public DecoratedKey computeNext() + public RowKey computeNext() { - return index < offsets.length ? 
keyFetcher.apply(offsets[index++]) : endOfData(); + if (currentCursor != null && currentCursor.hasNext()) + { + return keyFetcher.getRowKey(currentPatitionKey, currentCursor.nextLong()); + } + else if (offsets.hasNext()) + { + LongObjectCursor<long[]> cursor = offsets.next(); + currentPatitionKey = cursor.key; + currentCursor = LongStream.of(cursor.value).iterator(); + + return keyFetcher.getRowKey(currentPatitionKey, currentCursor.nextLong()); + } + else + { + return endOfData(); + } } } -} \ No newline at end of file +}
