This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch cep-7-sai
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-7-sai by this push:
new 83419e074d Fix concurrency in bbtree reader by cloning state
83419e074d is described below
commit 83419e074d131a68112470fc8761c3edafb5e675
Author: Mike Adamson <[email protected]>
AuthorDate: Thu Jul 13 11:24:55 2023 +0100
Fix concurrency in bbtree reader by cloning state
patch by Mike Adamson; reviewed by Andrés de la Peña and Caleb Rackliffe
for CASSANDRA-18669
---
.../disk/v1/bbtree/BlockBalancedTreeReader.java | 33 ++--
.../disk/v1/bbtree/BlockBalancedTreeWalker.java | 152 +++++++++-------
.../v1/bbtree/BlockBalancedTreeReaderTest.java | 192 +++++++++++++--------
.../sai/disk/v1/bbtree/BlockBalancedTreeTest.java | 54 +++---
4 files changed, 265 insertions(+), 166 deletions(-)
diff --git
a/src/java/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeReader.java
b/src/java/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeReader.java
index 59271e3c14..53cac195e0 100644
---
a/src/java/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeReader.java
+++
b/src/java/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeReader.java
@@ -78,17 +78,17 @@ public class BlockBalancedTreeReader extends
BlockBalancedTreeWalker implements
this.indexContext = indexContext;
this.postingsFile = postingsFile;
this.postingsIndex = new BlockBalancedTreePostingsIndex(postingsFile,
treePostingsRoot);
- leafOrderMapBitsRequired =
DirectWriter.unsignedBitsRequired(state.maxPointsInLeafNode - 1);
+ leafOrderMapBitsRequired =
DirectWriter.unsignedBitsRequired(maxValuesInLeafNode - 1);
}
public int getBytesPerValue()
{
- return state.bytesPerValue;
+ return bytesPerValue;
}
public long getPointCount()
{
- return state.valueCount;
+ return valueCount;
}
@Override
@@ -101,7 +101,7 @@ public class BlockBalancedTreeReader extends
BlockBalancedTreeWalker implements
@SuppressWarnings({"resource", "RedundantSuppression"})
public PostingList intersect(IntersectVisitor visitor,
QueryEventListener.BalancedTreeEventListener listener, QueryContext context)
{
- Relation relation = visitor.compare(state.minPackedValue,
state.maxPackedValue);
+ Relation relation = visitor.compare(minPackedValue, maxPackedValue);
if (relation == Relation.CELL_OUTSIDE_QUERY)
{
@@ -113,7 +113,6 @@ public class BlockBalancedTreeReader extends
BlockBalancedTreeWalker implements
IndexInput treeInput =
IndexFileUtils.instance.openInput(treeIndexFile);
IndexInput postingsInput =
IndexFileUtils.instance.openInput(postingsFile);
IndexInput postingsSummaryInput =
IndexFileUtils.instance.openInput(postingsFile);
- state.reset();
Intersection intersection = relation == Relation.CELL_INSIDE_QUERY
? new Intersection(treeInput,
postingsInput, postingsSummaryInput, listener, context)
@@ -131,6 +130,7 @@ public class BlockBalancedTreeReader extends
BlockBalancedTreeWalker implements
private final Stopwatch queryExecutionTimer =
Stopwatch.createStarted();
final QueryContext context;
+ final TraversalState state;
final IndexInput treeInput;
final IndexInput postingsInput;
final IndexInput postingsSummaryInput;
@@ -140,12 +140,13 @@ public class BlockBalancedTreeReader extends
BlockBalancedTreeWalker implements
Intersection(IndexInput treeInput, IndexInput postingsInput,
IndexInput postingsSummaryInput,
QueryEventListener.BalancedTreeEventListener listener,
QueryContext context)
{
+ this.state = newTraversalState();
this.treeInput = treeInput;
this.postingsInput = postingsInput;
this.postingsSummaryInput = postingsSummaryInput;
this.listener = listener;
this.context = context;
- postingLists = new PriorityQueue<>(state.numLeaves, COMPARATOR);
+ postingLists = new PriorityQueue<>(numLeaves, COMPARATOR);
}
public PostingList execute()
@@ -247,14 +248,14 @@ public class BlockBalancedTreeReader extends
BlockBalancedTreeWalker implements
{
super(treeInput, postingsInput, postingsSummaryInput, listener,
context);
this.visitor = visitor;
- this.packedValue = new byte[state.bytesPerValue];
- this.origIndex = new short[state.maxPointsInLeafNode];
+ this.packedValue = new byte[bytesPerValue];
+ this.origIndex = new short[maxValuesInLeafNode];
}
@Override
public void executeInternal() throws IOException
{
- collectPostingLists(state.minPackedValue, state.maxPackedValue);
+ collectPostingLists(minPackedValue, maxPackedValue);
}
private void collectPostingLists(byte[] minPackedValue, byte[]
maxPackedValue) throws IOException
@@ -320,8 +321,8 @@ public class BlockBalancedTreeReader extends
BlockBalancedTreeWalker implements
if (BlockBalancedTreeWriter.DEBUG)
{
// make sure cellMin <= splitValue <= cellMax:
- assert ByteArrayUtil.compareUnsigned(minPackedValue, 0,
splitValue, 0, state.bytesPerValue) <= 0 :"bytesPerValue=" +
state.bytesPerValue;
- assert ByteArrayUtil.compareUnsigned(maxPackedValue, 0,
splitValue, 0, state.bytesPerValue) >= 0 : "bytesPerValue=" +
state.bytesPerValue;
+ assert ByteArrayUtil.compareUnsigned(minPackedValue, 0,
splitValue, 0, bytesPerValue) <= 0 :"bytesPerValue=" + bytesPerValue;
+ assert ByteArrayUtil.compareUnsigned(maxPackedValue, 0,
splitValue, 0, bytesPerValue) >= 0 : "bytesPerValue=" + bytesPerValue;
}
// Recurse on left subtree:
@@ -346,8 +347,8 @@ public class BlockBalancedTreeReader extends
BlockBalancedTreeWalker implements
private FixedBitSet buildPostingsFilter(IndexInput in, int count,
IntersectVisitor visitor, short[] origIndex) throws IOException
{
int commonPrefixLength = readCommonPrefixLength(in);
- return commonPrefixLength == state.bytesPerValue ?
buildPostingsFilterForSingleValueLeaf(count, visitor, origIndex)
- :
buildPostingsFilterForMultiValueLeaf(commonPrefixLength, in, count, visitor,
origIndex);
+ return commonPrefixLength == bytesPerValue ?
buildPostingsFilterForSingleValueLeaf(count, visitor, origIndex)
+ :
buildPostingsFilterForMultiValueLeaf(commonPrefixLength, in, count, visitor,
origIndex);
}
private FixedBitSet buildPostingsFilterForMultiValueLeaf(int
commonPrefixLength,
@@ -362,7 +363,7 @@ public class BlockBalancedTreeReader extends
BlockBalancedTreeWalker implements
commonPrefixLength++;
int i;
- FixedBitSet fixedBitSet = new
FixedBitSet(state.maxPointsInLeafNode);
+ FixedBitSet fixedBitSet = new FixedBitSet(maxValuesInLeafNode);
for (i = 0; i < count; )
{
@@ -370,7 +371,7 @@ public class BlockBalancedTreeReader extends
BlockBalancedTreeWalker implements
final int runLen = Byte.toUnsignedInt(in.readByte());
for (int j = 0; j < runLen; ++j)
{
- in.readBytes(packedValue, commonPrefixLength,
state.bytesPerValue - commonPrefixLength);
+ in.readBytes(packedValue, commonPrefixLength,
bytesPerValue - commonPrefixLength);
final int rowIDIndex = origIndex[i + j];
if (visitor.contains(packedValue))
fixedBitSet.set(rowIDIndex);
@@ -385,7 +386,7 @@ public class BlockBalancedTreeReader extends
BlockBalancedTreeWalker implements
private FixedBitSet buildPostingsFilterForSingleValueLeaf(int count,
IntersectVisitor visitor, final short[] origIndex)
{
- FixedBitSet fixedBitSet = new
FixedBitSet(state.maxPointsInLeafNode);
+ FixedBitSet fixedBitSet = new FixedBitSet(maxValuesInLeafNode);
// All the values in the leaf are the same, so we only
// need to visit once then set the bits for the relevant indexes
diff --git
a/src/java/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeWalker.java
b/src/java/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeWalker.java
index ebfcd4ce5f..5a01b81f09 100644
---
a/src/java/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeWalker.java
+++
b/src/java/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeWalker.java
@@ -21,6 +21,8 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
+import javax.annotation.concurrent.NotThreadSafe;
+
import com.google.common.annotations.VisibleForTesting;
import org.agrona.collections.IntArrayList;
@@ -46,7 +48,15 @@ import org.apache.lucene.util.BytesRef;
public class BlockBalancedTreeWalker implements Closeable
{
final FileHandle treeIndexFile;
- final TraversalState state;
+ final int bytesPerValue;
+ final int numLeaves;
+ final int treeDepth;
+ final byte[] minPackedValue;
+ final byte[] maxPackedValue;
+ final long valueCount;
+ final int maxValuesInLeafNode;
+ final byte[] packedIndex;
+ final long memoryUsage;
BlockBalancedTreeWalker(FileHandle treeIndexFile, long treeIndexRoot)
{
@@ -58,7 +68,35 @@ public class BlockBalancedTreeWalker implements Closeable
SAICodecUtils.validate(indexInput);
indexInput.seek(treeIndexRoot);
- state = new TraversalState(indexInput);
+ maxValuesInLeafNode = indexInput.readVInt();
+ bytesPerValue = indexInput.readVInt();
+
+ // Read index:
+ numLeaves = indexInput.readVInt();
+ assert numLeaves > 0;
+ treeDepth = indexInput.readVInt();
+ minPackedValue = new byte[bytesPerValue];
+ maxPackedValue = new byte[bytesPerValue];
+
+ indexInput.readBytes(minPackedValue, 0, bytesPerValue);
+ indexInput.readBytes(maxPackedValue, 0, bytesPerValue);
+
+ if (ByteArrayUtil.compareUnsigned(minPackedValue, 0,
maxPackedValue, 0, bytesPerValue) > 0)
+ {
+ String message = String.format("Min packed value %s is > max
packed value %s.",
+ new BytesRef(minPackedValue),
new BytesRef(maxPackedValue));
+ throw new CorruptIndexException(message, indexInput);
+ }
+
+ valueCount = indexInput.readVLong();
+
+ int numBytes = indexInput.readVInt();
+ packedIndex = new byte[numBytes];
+ indexInput.readBytes(packedIndex, 0, numBytes);
+
+ memoryUsage = ObjectSizes.sizeOfArray(packedIndex) +
+ ObjectSizes.sizeOfArray(minPackedValue) +
+ ObjectSizes.sizeOfArray(maxPackedValue);
}
catch (Throwable t)
{
@@ -67,9 +105,52 @@ public class BlockBalancedTreeWalker implements Closeable
}
}
+ @VisibleForTesting
+ public BlockBalancedTreeWalker(DataInput indexInput, long treeIndexRoot)
throws IOException
+ {
+ treeIndexFile = null;
+
+ indexInput.skipBytes(treeIndexRoot);
+
+ maxValuesInLeafNode = indexInput.readVInt();
+ bytesPerValue = indexInput.readVInt();
+
+ // Read index:
+ numLeaves = indexInput.readVInt();
+ assert numLeaves > 0;
+ treeDepth = indexInput.readVInt();
+ minPackedValue = new byte[bytesPerValue];
+ maxPackedValue = new byte[bytesPerValue];
+
+ indexInput.readBytes(minPackedValue, 0, bytesPerValue);
+ indexInput.readBytes(maxPackedValue, 0, bytesPerValue);
+
+ if (ByteArrayUtil.compareUnsigned(minPackedValue, 0, maxPackedValue,
0, bytesPerValue) > 0)
+ {
+ String message = String.format("Min packed value %s is > max
packed value %s.",
+ new BytesRef(minPackedValue), new
BytesRef(maxPackedValue));
+ throw new CorruptIndexException(message, indexInput);
+ }
+
+ valueCount = indexInput.readVLong();
+
+ int numBytes = indexInput.readVInt();
+ packedIndex = new byte[numBytes];
+ indexInput.readBytes(packedIndex, 0, numBytes);
+
+ memoryUsage = ObjectSizes.sizeOfArray(packedIndex) +
+ ObjectSizes.sizeOfArray(minPackedValue) +
+ ObjectSizes.sizeOfArray(maxPackedValue);
+ }
+
public long memoryUsage()
{
- return state.memoryUsage;
+ return memoryUsage;
+ }
+
+ public TraversalState newTraversalState()
+ {
+ return new TraversalState();
}
@Override
@@ -80,11 +161,10 @@ public class BlockBalancedTreeWalker implements Closeable
void traverse(TraversalCallback callback)
{
- state.reset();
- traverse(callback, new IntArrayList());
+ traverse(newTraversalState(), callback, new IntArrayList());
}
- private void traverse(TraversalCallback callback, IntArrayList pathToRoot)
+ private void traverse(TraversalState state, TraversalCallback callback,
IntArrayList pathToRoot)
{
if (state.atLeafNode())
{
@@ -101,11 +181,11 @@ public class BlockBalancedTreeWalker implements Closeable
currentPath.add(state.nodeID);
state.pushLeft();
- traverse(callback, currentPath);
+ traverse(state, callback, currentPath);
state.pop();
state.pushRight();
- traverse(callback, currentPath);
+ traverse(state, callback, currentPath);
state.pop();
}
}
@@ -143,17 +223,9 @@ public class BlockBalancedTreeWalker implements Closeable
* 2[0-16] 3[16-32]
* </pre>
*/
- final static class TraversalState
+ @NotThreadSafe
+ final class TraversalState
{
- final int bytesPerValue;
- final int numLeaves;
- final int treeDepth;
- final byte[] minPackedValue;
- final byte[] maxPackedValue;
- final long valueCount;
- final int maxPointsInLeafNode;
- final long memoryUsage;
-
// used to read the packed index byte[]
final ByteArrayDataInput dataInput;
// holds the minimum (left most) leaf block file pointer for each
level we've recursed to:
@@ -170,60 +242,18 @@ public class BlockBalancedTreeWalker implements Closeable
@VisibleForTesting
int maxLevel;
- TraversalState(DataInput dataInput) throws IOException
+ private TraversalState()
{
- maxPointsInLeafNode = dataInput.readVInt();
- bytesPerValue = dataInput.readVInt();
-
- // Read index:
- numLeaves = dataInput.readVInt();
- assert numLeaves > 0;
- treeDepth = dataInput.readVInt();
- minPackedValue = new byte[bytesPerValue];
- maxPackedValue = new byte[bytesPerValue];
-
- dataInput.readBytes(minPackedValue, 0, bytesPerValue);
- dataInput.readBytes(maxPackedValue, 0, bytesPerValue);
-
- if (ByteArrayUtil.compareUnsigned(minPackedValue, 0,
maxPackedValue, 0, bytesPerValue) > 0)
- {
- String message = String.format("Min packed value %s is > max
packed value %s.",
- new BytesRef(minPackedValue),
new BytesRef(maxPackedValue));
- throw new CorruptIndexException(message, dataInput);
- }
-
- valueCount = dataInput.readVLong();
-
- int numBytes = dataInput.readVInt();
- byte[] packedIndex = new byte[numBytes];
- dataInput.readBytes(packedIndex, 0, numBytes);
-
nodeID = 1;
level = 0;
leafBlockFPStack = new long[treeDepth];
leftNodePositions = new int[treeDepth];
rightNodePositions = new int[treeDepth];
splitValuesStack = new byte[treeDepth][];
-
- memoryUsage = ObjectSizes.sizeOfArray(packedIndex) +
- ObjectSizes.sizeOfArray(minPackedValue) +
- ObjectSizes.sizeOfArray(maxPackedValue) +
- ObjectSizes.sizeOfArray(leafBlockFPStack) +
- ObjectSizes.sizeOfArray(leftNodePositions) +
- ObjectSizes.sizeOfArray(rightNodePositions) +
- ObjectSizes.sizeOfArray(splitValuesStack) *
bytesPerValue;
-
this.dataInput = new ByteArrayDataInput(packedIndex);
readNodeData(false);
}
- public void reset()
- {
- nodeID = 1;
- level = 0;
- dataInput.setPosition(0);
- }
-
public void pushLeft()
{
int nodePosition = leftNodePositions[level];
diff --git
a/test/unit/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeReaderTest.java
b/test/unit/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeReaderTest.java
index 984a064c54..59e54c5708 100644
---
a/test/unit/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeReaderTest.java
+++
b/test/unit/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeReaderTest.java
@@ -17,6 +17,13 @@
*/
package org.apache.cassandra.index.sai.disk.v1.bbtree;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
import org.junit.Before;
import org.junit.Test;
@@ -33,6 +40,8 @@ import org.apache.cassandra.index.sai.plan.Expression;
import org.apache.cassandra.index.sai.postings.PostingList;
import org.apache.cassandra.index.sai.utils.SAIRandomizedTester;
import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Throwables;
import org.apache.lucene.index.PointValues.Relation;
import org.apache.lucene.util.NumericUtils;
@@ -98,22 +107,16 @@ public class BlockBalancedTreeReaderTest extends
SAIRandomizedTester
final BlockBalancedTreeRamBuffer buffer = new
BlockBalancedTreeRamBuffer(Integer.BYTES);
byte[] scratch = new byte[4];
- for (int docID = 0; docID < numRows; docID++)
+ for (int rowID = 0; rowID < numRows; rowID++)
{
- NumericUtils.intToSortableBytes(docID, scratch, 0);
- buffer.add(docID, scratch);
+ NumericUtils.intToSortableBytes(rowID, scratch, 0);
+ buffer.add(rowID, scratch);
}
- final BlockBalancedTreeReader reader = finishAndOpenReader(4, buffer);
-
- Expression expression = new Expression(indexContext);
- expression.add(Operator.GT, Int32Type.instance.decompose(444));
- expression.add(Operator.LT, Int32Type.instance.decompose(555));
- PostingList intersection = performIntersection(reader,
BlockBalancedTreeQueries.balancedTreeQueryFrom(expression, 4));
- assertNotNull(intersection);
- assertEquals(110, intersection.size());
- for (long posting = 445; posting < 555; posting++)
- assertEquals(posting, intersection.nextPosting());
+ try (BlockBalancedTreeReader reader = finishAndOpenReader(4, buffer))
+ {
+ assertRange(reader, 445, 555);
+ }
}
@Test
@@ -123,32 +126,33 @@ public class BlockBalancedTreeReaderTest extends
SAIRandomizedTester
final BlockBalancedTreeRamBuffer buffer = new
BlockBalancedTreeRamBuffer(Integer.BYTES);
byte[] scratch = new byte[4];
- for (int docID = 0; docID < numRows; docID++)
+ for (int rowID = 0; rowID < numRows; rowID++)
{
- NumericUtils.intToSortableBytes(docID, scratch, 0);
- buffer.add(docID, scratch);
+ NumericUtils.intToSortableBytes(rowID, scratch, 0);
+ buffer.add(rowID, scratch);
}
- final BlockBalancedTreeReader reader = finishAndOpenReader(2, buffer);
-
- PostingList intersection = performIntersection(reader, NONE_MATCH);
- assertNull(intersection);
-
- intersection = performIntersection(reader, ALL_MATCH);
- assertEquals(numRows, intersection.size());
- assertEquals(100, intersection.advance(100));
- assertEquals(200, intersection.advance(200));
- assertEquals(300, intersection.advance(300));
- assertEquals(400, intersection.advance(400));
- assertEquals(401, intersection.advance(401));
- long expectedRowID = 402;
- for (long id = intersection.nextPosting(); expectedRowID < 500; id =
intersection.nextPosting())
+ try (BlockBalancedTreeReader reader = finishAndOpenReader(2, buffer))
{
- assertEquals(expectedRowID++, id);
- }
- assertEquals(PostingList.END_OF_STREAM, intersection.advance(numRows +
1));
+ PostingList intersection = performIntersection(reader, NONE_MATCH);
+ assertNull(intersection);
+
+ intersection = performIntersection(reader, ALL_MATCH);
+ assertEquals(numRows, intersection.size());
+ assertEquals(100, intersection.advance(100));
+ assertEquals(200, intersection.advance(200));
+ assertEquals(300, intersection.advance(300));
+ assertEquals(400, intersection.advance(400));
+ assertEquals(401, intersection.advance(401));
+ long expectedRowID = 402;
+ for (long id = intersection.nextPosting(); expectedRowID < 500; id
= intersection.nextPosting())
+ {
+ assertEquals(expectedRowID++, id);
+ }
+ assertEquals(PostingList.END_OF_STREAM,
intersection.advance(numRows + 1));
- intersection.close();
+ intersection.close();
+ }
}
@Test
@@ -162,41 +166,42 @@ public class BlockBalancedTreeReaderTest extends
SAIRandomizedTester
final BlockBalancedTreeRamBuffer buffer = new
BlockBalancedTreeRamBuffer(Integer.BYTES);
byte[] scratch = new byte[4];
- for (int docID = 0; docID < 10; docID++)
+ for (int rowID = 0; rowID < 10; rowID++)
{
- NumericUtils.intToSortableBytes(docID, scratch, 0);
- buffer.add(docID, scratch);
+ NumericUtils.intToSortableBytes(rowID, scratch, 0);
+ buffer.add(rowID, scratch);
}
- for (int docID = 10; docID < 20; docID++)
+ for (int rowID = 10; rowID < 20; rowID++)
{
NumericUtils.intToSortableBytes(10, scratch, 0);
- buffer.add(docID, scratch);
+ buffer.add(rowID, scratch);
}
- for (int docID = 20; docID < 30; docID++)
+ for (int rowID = 20; rowID < 30; rowID++)
{
- NumericUtils.intToSortableBytes(docID, scratch, 0);
- buffer.add(docID, scratch);
+ NumericUtils.intToSortableBytes(rowID, scratch, 0);
+ buffer.add(rowID, scratch);
}
- final BlockBalancedTreeReader reader = finishAndOpenReader(5, buffer);
-
- PostingList postingList = performIntersection(reader, buildQuery(8,
15));
-
- assertEquals(8, postingList.nextPosting());
- assertEquals(9, postingList.nextPosting());
- assertEquals(10, postingList.nextPosting());
- assertEquals(11, postingList.nextPosting());
- assertEquals(12, postingList.nextPosting());
- assertEquals(13, postingList.nextPosting());
- assertEquals(14, postingList.nextPosting());
- assertEquals(15, postingList.nextPosting());
- assertEquals(16, postingList.nextPosting());
- assertEquals(17, postingList.nextPosting());
- assertEquals(18, postingList.nextPosting());
- assertEquals(19, postingList.nextPosting());
- assertEquals(PostingList.END_OF_STREAM, postingList.nextPosting());
+ try (BlockBalancedTreeReader reader = finishAndOpenReader(5, buffer))
+ {
+ PostingList postingList = performIntersection(reader,
buildQuery(8, 15));
+
+ assertEquals(8, postingList.nextPosting());
+ assertEquals(9, postingList.nextPosting());
+ assertEquals(10, postingList.nextPosting());
+ assertEquals(11, postingList.nextPosting());
+ assertEquals(12, postingList.nextPosting());
+ assertEquals(13, postingList.nextPosting());
+ assertEquals(14, postingList.nextPosting());
+ assertEquals(15, postingList.nextPosting());
+ assertEquals(16, postingList.nextPosting());
+ assertEquals(17, postingList.nextPosting());
+ assertEquals(18, postingList.nextPosting());
+ assertEquals(19, postingList.nextPosting());
+ assertEquals(PostingList.END_OF_STREAM, postingList.nextPosting());
+ }
}
@Test
@@ -204,22 +209,73 @@ public class BlockBalancedTreeReaderTest extends
SAIRandomizedTester
{
final BlockBalancedTreeRamBuffer buffer = new
BlockBalancedTreeRamBuffer(Integer.BYTES);
byte[] scratch = new byte[4];
- for (int docID = 0; docID < 1000; docID++)
+ for (int rowID = 0; rowID < 1000; rowID++)
{
- NumericUtils.intToSortableBytes(docID, scratch, 0);
- buffer.add(docID, scratch);
+ NumericUtils.intToSortableBytes(rowID, scratch, 0);
+ buffer.add(rowID, scratch);
}
// add a gap between 1000 and 1100
- for (int docID = 1000; docID < 2000; docID++)
+ for (int rowID = 1000; rowID < 2000; rowID++)
+ {
+ NumericUtils.intToSortableBytes(rowID + 100, scratch, 0);
+ buffer.add(rowID, scratch);
+ }
+
+ try (BlockBalancedTreeReader reader = finishAndOpenReader(50, buffer))
+ {
+ final PostingList intersection = performIntersection(reader,
buildQuery(1017, 1096));
+ assertNull(intersection);
+ }
+ }
+
+ @Test
+ public void testConcurrentIntersectionsOnSameReader() throws Exception
+ {
+ int numRows = 1000;
+
+ final BlockBalancedTreeRamBuffer buffer = new
BlockBalancedTreeRamBuffer(Integer.BYTES);
+
+ byte[] scratch = new byte[4];
+ for (int rowID = 0; rowID < numRows; rowID++)
{
- NumericUtils.intToSortableBytes(docID + 100, scratch, 0);
- buffer.add(docID, scratch);
+ NumericUtils.intToSortableBytes(rowID, scratch, 0);
+ buffer.add(rowID, scratch);
+ }
+
+ try (BlockBalancedTreeReader reader = finishAndOpenReader(4, buffer))
+ {
+ int concurrency = 100;
+
+ ExecutorService executor =
Executors.newFixedThreadPool(concurrency);
+ List<Future<?>> results = new ArrayList<>();
+ for (int thread = 0; thread < concurrency; thread++)
+ {
+ results.add(executor.submit(() -> assertRange(reader, 445,
555)));
+ }
+ FBUtilities.waitOnFutures(results);
+ executor.shutdown();
}
+ }
- final BlockBalancedTreeReader reader = finishAndOpenReader(50, buffer);
+ @SuppressWarnings("SameParameterValue")
+ private void assertRange(BlockBalancedTreeReader reader, long lowerBound,
long upperBound)
+ {
+ Expression expression = new Expression(indexContext);
+ expression.add(Operator.GT, Int32Type.instance.decompose(444));
+ expression.add(Operator.LT, Int32Type.instance.decompose(555));
- final PostingList intersection = performIntersection(reader,
buildQuery(1017, 1096));
- assertNull(intersection);
+ try
+ {
+ PostingList intersection = performIntersection(reader,
BlockBalancedTreeQueries.balancedTreeQueryFrom(expression, 4));
+ assertNotNull(intersection);
+ assertEquals(upperBound - lowerBound, intersection.size());
+ for (long posting = lowerBound; posting < upperBound; posting++)
+ assertEquals(posting, intersection.nextPosting());
+ }
+ catch (IOException e)
+ {
+ throw Throwables.unchecked(e);
+ }
}
private PostingList performIntersection(BlockBalancedTreeReader reader,
BlockBalancedTreeReader.IntersectVisitor visitor)
diff --git
a/test/unit/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeTest.java
b/test/unit/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeTest.java
index 710da74b91..d749b13e77 100644
---
a/test/unit/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeTest.java
+++
b/test/unit/org/apache/cassandra/index/sai/disk/v1/bbtree/BlockBalancedTreeTest.java
@@ -53,26 +53,33 @@ public class BlockBalancedTreeTest extends
SAIRandomizedTester
@Test
public void testSingleLeaf() throws Exception
{
- BlockBalancedTreeWalker.TraversalState state =
generateBalancedTree(100, 100, rowID -> rowID);
+ try (BlockBalancedTreeWalker walker = generateBalancedTree(100, 100,
rowID -> rowID))
+ {
+ assertEquals(1, walker.numLeaves);
+ assertEquals(1, walker.treeDepth);
+ assertEquals(100, walker.valueCount);
+
+ BlockBalancedTreeWalker.TraversalState state =
walker.newTraversalState();
- assertEquals(1, state.numLeaves);
- assertEquals(1, state.treeDepth);
- assertEquals(100, state.valueCount);
- assertTrue(state.atLeafNode());
+ assertTrue(state.atLeafNode());
- recursiveAssertTraversal(state, -1);
+ recursiveAssertTraversal(state, -1);
- assertEquals(state.treeDepth, state.maxLevel + 1);
+ assertEquals(walker.treeDepth, state.maxLevel + 1);
+ }
}
@Test
public void testTreeWithSameValue() throws Exception
{
- BlockBalancedTreeWalker.TraversalState state =
generateBalancedTree(100, 4, rowID -> 1);
+ try (BlockBalancedTreeWalker walker = generateBalancedTree(100, 4,
rowID -> 1))
+ {
+ BlockBalancedTreeWalker.TraversalState state =
walker.newTraversalState();
- recursiveAssertTraversal(state, -1);
+ recursiveAssertTraversal(state, -1);
- assertEquals(state.treeDepth, state.maxLevel + 1);
+ assertEquals(walker.treeDepth, state.maxLevel + 1);
+ }
}
@Test
@@ -83,14 +90,17 @@ public class BlockBalancedTreeTest extends
SAIRandomizedTester
{
int numRows = leafSize * numLeaves;
- BlockBalancedTreeWalker.TraversalState state =
generateBalancedTree(numRows, leafSize, rowID -> rowID);
+ try (BlockBalancedTreeWalker walker =
generateBalancedTree(numRows, leafSize, rowID -> rowID))
+ {
+ assertEquals(numLeaves, walker.numLeaves);
+ assertTrue(walker.treeDepth <= walker.numLeaves);
- assertEquals(numLeaves, state.numLeaves);
- assertTrue(state.treeDepth <= state.numLeaves);
+ BlockBalancedTreeWalker.TraversalState state =
walker.newTraversalState();
- recursiveAssertTraversal(state, -1);
+ recursiveAssertTraversal(state, -1);
- assertEquals(state.treeDepth, state.maxLevel + 1);
+ assertEquals(walker.treeDepth, state.maxLevel + 1);
+ }
}
}
@@ -104,11 +114,14 @@ public class BlockBalancedTreeTest extends
SAIRandomizedTester
int leafSize = nextInt(2, 512);
int numRows = nextInt(1000, 10000);
- BlockBalancedTreeWalker.TraversalState state =
generateBalancedTree(numRows, leafSize, rowID -> nextInt(0, numRows / 2));
+ try (BlockBalancedTreeWalker walker =
generateBalancedTree(numRows, leafSize, rowID -> nextInt(0, numRows / 2)))
+ {
+ BlockBalancedTreeWalker.TraversalState state =
walker.newTraversalState();
- recursiveAssertTraversal(state, -1);
+ recursiveAssertTraversal(state, -1);
- assertEquals(state.treeDepth, state.maxLevel + 1);
+ assertEquals(walker.treeDepth, state.maxLevel + 1);
+ }
}
}
@@ -134,14 +147,13 @@ public class BlockBalancedTreeTest extends
SAIRandomizedTester
}
}
- private BlockBalancedTreeWalker.TraversalState generateBalancedTree(int
numRows, int leafSize, IntFunction<Integer> valueProvider) throws Exception
+ private BlockBalancedTreeWalker generateBalancedTree(int numRows, int
leafSize, IntFunction<Integer> valueProvider) throws Exception
{
long treeOffset = writeBalancedTree(numRows, leafSize, valueProvider);
DataInput input = dataOutput.toDataInput();
- input.skipBytes(treeOffset);
- return new BlockBalancedTreeWalker.TraversalState(input);
+ return new BlockBalancedTreeWalker(input, treeOffset);
}
private long writeBalancedTree(int numRows, int leafSize,
IntFunction<Integer> valueProvider) throws Exception
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]