This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 1a81a07e0d IGNITE-18322 Define scan contract SortedIndexStorage (#1407)
1a81a07e0d is described below
commit 1a81a07e0da50a3d0ba7e1d8b3d911e106464f99
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon Dec 12 12:28:42 2022 +0300
IGNITE-18322 Define scan contract SortedIndexStorage (#1407)
---
.../tree/AbstractBplusTreePageMemoryTest.java | 24 +
.../ignite/internal/pagememory/tree/BplusTree.java | 75 +++
.../apache/ignite/internal/rocksdb/RocksUtils.java | 3 +
.../index/AbstractSortedIndexStorageTest.java | 713 ++++++++++++++++++++-
.../index/impl/BinaryTupleRowSerializer.java | 2 +-
.../storage/index/impl/TestSortedIndexStorage.java | 198 ++++--
.../index/sorted/PageMemorySortedIndexStorage.java | 100 ++-
.../pagememory/index/sorted/SortedIndexTree.java | 7 +
.../rocksdb/index/RocksDbSortedIndexStorage.java | 79 ++-
9 files changed, 1106 insertions(+), 95 deletions(-)
diff --git
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java
index b9d7c03739..0983eda760 100644
---
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java
+++
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java
@@ -2333,6 +2333,30 @@ public abstract class AbstractBplusTreePageMemoryTest
extends BaseIgniteAbstract
}
}
+ @Test
+ void testFindNext() throws Exception { // Covers findNext on an empty tree, on exact matches and on a bound below every row.
+ TestTree tree = createTestTree(true);
+
+ assertNull(tree.findNext(0L, false)); // Empty tree: no row follows any bound.
+ assertNull(tree.findNext(0L, true));
+
+ tree.put(0L);
+
+ assertNull(tree.findNext(0L, false)); // Exclusive bound: the row equal to the bound is skipped.
+ assertEquals(0L, tree.findNext(0L, true)); // Inclusive bound: the row equal to the bound is returned.
+
+ tree.put(1L);
+
+ assertEquals(1L, tree.findNext(0L, false));
+ assertEquals(0L, tree.findNext(0L, true));
+
+ assertNull(tree.findNext(1L, false)); // The bound is the last row: nothing lies strictly after it.
+ assertEquals(1L, tree.findNext(1L, true));
+
+ assertEquals(0L, tree.findNext(-1L, false)); // Bound below every row: the first row is returned either way.
+ assertEquals(0L, tree.findNext(-1L, true));
+ }
+
private void doTestRandomPutRemoveMultithreaded(boolean canGetRow) throws
Exception {
final TestTree tree = createTestTree(canGetRow);
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
index 15214e04db..c601025f84 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
@@ -1572,6 +1572,34 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
return findOne(row, null, null);
}
+ /**
+ * Searches for the first row that follows {@code lowerBound}: strictly after it, or at/after it when {@code includeRow} is set.
+ *
+ * @param lowerBound Lower bound.
+ * @param includeRow {@code true} if a row equal to {@code lowerBound} may be returned, {@code false} to return only a row after it.
+ * @return Next row, or {@code null} if there is no such row.
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ public final @Nullable T findNext(L lowerBound, boolean includeRow) throws
IgniteInternalCheckedException {
+ checkDestroyed();
+
+ GetNext g = new GetNext(lowerBound, includeRow);
+
+ try {
+ doFind(g);
+
+ return g.nextRow;
+ } catch (CorruptedDataStructureException e) {
+ throw e; // Rethrown as is, without the extra wrapping applied to other checked failures below.
+ } catch (IgniteInternalCheckedException e) {
+ throw new IgniteInternalCheckedException("Runtime failure on
lookup next row: " + lowerBound, e);
+ } catch (RuntimeException | AssertionError e) {
+ throw corruptedTreeException("Runtime failure on lookup next row:
" + lowerBound, e, grpId, g.pageId);
+ } finally {
+ checkDestroyed();
+ }
+ }
+
/**
* Tries to find.
*
@@ -6222,6 +6250,53 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
}
}
+ /**
+ * Search operation used by {@code findNext}: on level 0 it stores in {@link #nextRow} the row located at the index reported for
+ * the bound, or leaves {@link #nextRow} as {@code null} when the page holds no row at that index.
+ */
+ private final class GetNext extends Get {
+ @Nullable
+ private T nextRow;
+
+ private GetNext(L row, boolean includeRow) {
+ super(row, false);
+
+ shift = includeRow ? -1 : 1; // Non-zero in both cases; presumably this is what keeps found() from ever being invoked.
+ }
+
+ @Override
+ boolean found(BplusIo<L> io, long pageAddr, int idx, int lvl) {
+ // Must never be called because the shift set in the constructor is always non-zero.
+ throw new IllegalStateException();
+ }
+
+ @Override
+ boolean notFound(BplusIo<L> io, long pageAddr, int idx, int lvl)
throws IgniteInternalCheckedException {
+ if (lvl != 0) {
+ return false; // Only level 0 is inspected here.
+ }
+
+ int cnt = io.getCount(pageAddr);
+
+ if (cnt == 0) {
+ // Empty tree: a page without rows is expected to have no forward page.
+ assert io.getForward(pageAddr, partId) == 0L;
+ } else {
+ assert io.isLeaf() : io;
+ assert cnt > 0 : cnt;
+ assert idx >= 0 : idx;
+ assert cnt >= idx : "cnt=" + cnt + ", idx=" + idx;
+
+ checkDestroyed();
+
+ if (idx < cnt) { // When idx == cnt there is no row at idx on this page and nextRow stays null.
+ nextRow = getRow(io, pageAddr, idx);
+ }
+ }
+
+ return true;
+ }
+ }
+
/**
* Page handler for basic {@link Get} operation.
*/
diff --git
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
index 28a0a68b47..d6fa91a3f3 100644
---
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
+++
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
@@ -69,6 +69,9 @@ public class RocksUtils {
/**
* Checks the status of the iterator and throws an exception if it is not
correct.
*
+ * <p>The status check is guaranteed to throw if an internal error has occurred during the iteration.
+ * If it does not throw, the iterator has simply exhausted the data range.
+ *
* @param it RocksDB iterator.
* @throws IgniteInternalException if the iterator has an incorrect status.
*/
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
index 3b1d5ebc01..13c2f3123d 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.storage.index;
+import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toUnmodifiableList;
import static
org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter.addIndex;
@@ -37,14 +38,20 @@ import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -227,14 +234,14 @@ public abstract class AbstractSortedIndexStorageTest {
}
@Test
- void testEmpty() throws Exception {
+ void testEmpty() {
SortedIndexStorage index =
createIndexStorage(shuffledRandomDefinitions());
assertThat(scan(index, null, null, 0), is(empty()));
}
@Test
- void testGet() throws Exception {
+ void testGet() {
SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex("TEST_INDEX")
.addIndexColumn(ColumnTypeSpec.STRING.name()).asc().done()
.addIndexColumn(ColumnTypeSpec.INT32.name()).asc().done()
@@ -267,7 +274,7 @@ public abstract class AbstractSortedIndexStorageTest {
*/
@ParameterizedTest
@VariableSource("ALL_TYPES_COLUMN_DEFINITIONS")
- void testSingleColumnIndex(ColumnDefinition columnDefinition) throws
Exception {
+ void testSingleColumnIndex(ColumnDefinition columnDefinition) {
testPutGetRemove(List.of(columnDefinition));
}
@@ -275,7 +282,7 @@ public abstract class AbstractSortedIndexStorageTest {
* Tests that appending an already existing row does no harm.
*/
@Test
- void testPutIdempotence() throws Exception {
+ void testPutIdempotence() {
SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex("TEST_INDEX")
.addIndexColumn(ColumnTypeSpec.STRING.name()).asc().done()
.addIndexColumn(ColumnTypeSpec.INT32.name()).asc().done()
@@ -302,7 +309,7 @@ public abstract class AbstractSortedIndexStorageTest {
* Tests that it is possible to add rows with the same columns but
different Row IDs.
*/
@Test
- void testMultiplePuts() throws Exception {
+ void testMultiplePuts() {
SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex("TEST_INDEX")
.addIndexColumn(ColumnTypeSpec.STRING.name()).asc().done()
.addIndexColumn(ColumnTypeSpec.INT32.name()).asc().done()
@@ -335,7 +342,7 @@ public abstract class AbstractSortedIndexStorageTest {
* Tests the {@link SortedIndexStorage#remove} method.
*/
@Test
- void testRemove() throws Exception {
+ void testRemove() {
SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex("TEST_INDEX")
.addIndexColumn(ColumnTypeSpec.STRING.name()).asc().done()
.addIndexColumn(ColumnTypeSpec.INT32.name()).asc().done()
@@ -389,7 +396,7 @@ public abstract class AbstractSortedIndexStorageTest {
* Tests the Put-Get-Remove case when an index is created using all
possible column in random order.
*/
@RepeatedTest(5)
- void testCreateMultiColumnIndex() throws Exception {
+ void testCreateMultiColumnIndex() {
testPutGetRemove(shuffledDefinitions());
}
@@ -397,7 +404,7 @@ public abstract class AbstractSortedIndexStorageTest {
* Tests the happy case of the {@link SortedIndexStorage#scan} method.
*/
@RepeatedTest(5)
- void testScan() throws Exception {
+ void testScan() {
SortedIndexStorage indexStorage =
createIndexStorage(shuffledDefinitions());
List<TestIndexRow> entries = IntStream.range(0, 10)
@@ -434,7 +441,7 @@ public abstract class AbstractSortedIndexStorageTest {
}
@Test
- public void testBoundsAndOrder() throws Exception {
+ public void testBoundsAndOrder() {
ColumnTypeSpec string = ColumnTypeSpec.STRING;
ColumnTypeSpec int32 = ColumnTypeSpec.INT32;
@@ -525,7 +532,7 @@ public abstract class AbstractSortedIndexStorageTest {
* Tests that an empty range is returned if {@link
SortedIndexStorage#scan} method is called using overlapping keys.
*/
@Test
- void testEmptyRange() throws Exception {
+ void testEmptyRange() {
List<ColumnDefinition> indexSchema = shuffledRandomDefinitions();
SortedIndexStorage indexStorage = createIndexStorage(indexSchema);
@@ -549,7 +556,7 @@ public abstract class AbstractSortedIndexStorageTest {
@ParameterizedTest
@VariableSource("ALL_TYPES_COLUMN_DEFINITIONS")
- void testNullValues(ColumnDefinition columnDefinition) throws Exception {
+ void testNullValues(ColumnDefinition columnDefinition) {
SortedIndexStorage storage =
createIndexStorage(List.of(columnDefinition));
TestIndexRow entry1 = TestIndexRow.randomRow(storage);
@@ -579,6 +586,653 @@ public abstract class AbstractSortedIndexStorageTest {
}
}
+ /**
+ * Checks simple scan scenarios over five rows with keys 0..4: no bounds, both bounds, lower bound only and upper bound only,
+ * each bounded case with every combination of inclusive/exclusive flags.
+ */
+ @Test
+ void testScanSimple() {
+ SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex("TEST_IDX")
+
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+ .build();
+
+ SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+ BinaryTupleRowSerializer serializer = new
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+ for (int i = 0; i < 5; i++) {
+ put(indexStorage, serializer.serializeRow(new Object[]{i}, new
RowId(TEST_PARTITION)));
+ }
+
+ // No bounds: every row is returned in ascending order.
+ assertThat(
+ scan(indexStorage, null, null, 0,
AbstractSortedIndexStorageTest::firstArrayElement),
+ contains(0, 1, 2, 3, 4)
+ );
+
+ // Both bounds: the flags decide whether the bound values 0 and 4 themselves are included.
+ assertThat(
+ scan(
+ indexStorage,
+ serializer.serializeRowPrefix(0),
+ serializer.serializeRowPrefix(4),
+ (GREATER_OR_EQUAL | LESS_OR_EQUAL),
+ AbstractSortedIndexStorageTest::firstArrayElement
+ ),
+ contains(0, 1, 2, 3, 4)
+ );
+
+ assertThat(
+ scan(
+ indexStorage,
+ serializer.serializeRowPrefix(0),
+ serializer.serializeRowPrefix(4),
+ (GREATER_OR_EQUAL | LESS),
+ AbstractSortedIndexStorageTest::firstArrayElement
+ ),
+ contains(0, 1, 2, 3)
+ );
+
+ assertThat(
+ scan(
+ indexStorage,
+ serializer.serializeRowPrefix(0),
+ serializer.serializeRowPrefix(4),
+ (GREATER | LESS_OR_EQUAL),
+ AbstractSortedIndexStorageTest::firstArrayElement
+ ),
+ contains(1, 2, 3, 4)
+ );
+
+ assertThat(
+ scan(
+ indexStorage,
+ serializer.serializeRowPrefix(0),
+ serializer.serializeRowPrefix(4),
+ (GREATER | LESS),
+ AbstractSortedIndexStorageTest::firstArrayElement
+ ),
+ contains(1, 2, 3)
+ );
+
+ // Lower bound only: the upper-bound flag has no effect on the result.
+ assertThat(
+ scan(
+ indexStorage,
+ serializer.serializeRowPrefix(1),
+ null,
+ (GREATER_OR_EQUAL | LESS_OR_EQUAL),
+ AbstractSortedIndexStorageTest::firstArrayElement
+ ),
+ contains(1, 2, 3, 4)
+ );
+
+ assertThat(
+ scan(
+ indexStorage,
+ serializer.serializeRowPrefix(1),
+ null,
+ (GREATER_OR_EQUAL | LESS),
+ AbstractSortedIndexStorageTest::firstArrayElement
+ ),
+ contains(1, 2, 3, 4)
+ );
+
+ assertThat(
+ scan(
+ indexStorage,
+ serializer.serializeRowPrefix(1),
+ null,
+ (GREATER | LESS_OR_EQUAL),
+ AbstractSortedIndexStorageTest::firstArrayElement
+ ),
+ contains(2, 3, 4)
+ );
+
+ assertThat(
+ scan(
+ indexStorage,
+ serializer.serializeRowPrefix(1),
+ null,
+ (GREATER | LESS),
+ AbstractSortedIndexStorageTest::firstArrayElement
+ ),
+ contains(2, 3, 4)
+ );
+
+ // Upper bound only: the lower-bound flag has no effect on the result.
+ assertThat(
+ scan(
+ indexStorage,
+ null,
+ serializer.serializeRowPrefix(3),
+ (GREATER_OR_EQUAL | LESS_OR_EQUAL),
+ AbstractSortedIndexStorageTest::firstArrayElement
+ ),
+ contains(0, 1, 2, 3)
+ );
+
+ assertThat(
+ scan(
+ indexStorage,
+ null,
+ serializer.serializeRowPrefix(3),
+ (GREATER_OR_EQUAL | LESS),
+ AbstractSortedIndexStorageTest::firstArrayElement
+ ),
+ contains(0, 1, 2)
+ );
+
+ assertThat(
+ scan(
+ indexStorage,
+ null,
+ serializer.serializeRowPrefix(3),
+ (GREATER | LESS_OR_EQUAL),
+ AbstractSortedIndexStorageTest::firstArrayElement
+ ),
+ contains(0, 1, 2, 3)
+ );
+
+ assertThat(
+ scan(
+ indexStorage,
+ null,
+ serializer.serializeRowPrefix(3),
+ (GREATER | LESS),
+ AbstractSortedIndexStorageTest::firstArrayElement
+ ),
+ contains(0, 1, 2)
+ );
+ }
+
+ @Test
+ void testScanContractAddRowBeforeInvokeHasNext() { // A row put after the cursor is opened, but before its first hasNext(), is returned.
+ SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex("TEST_IDX")
+
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+ .build();
+
+ SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+ BinaryTupleRowSerializer serializer = new
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+ Cursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+ // index = [0]
+ // cursor = ^ with no cached row
+ put(indexStorage, serializer.serializeRow(new Object[]{0}, new
RowId(TEST_PARTITION)));
+
+ // index = [0]
+ // cursor = ^ with cached [0]
+ assertTrue(scan.hasNext());
+ // index = [0]
+ // cursor = ^ with no cached row
+ assertEquals(0, serializer.deserializeColumns(scan.next())[0]);
+
+ assertFalse(scan.hasNext());
+ assertThrows(NoSuchElementException.class, scan::next);
+ }
+
+ @Test
+ void testScanContractAddRowAfterInvokeHasNext() { // A finished cursor ignores later puts; an unfinished one sees rows put after its last next().
+ SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex("TEST_IDX")
+
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+ .build();
+
+ SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+ BinaryTupleRowSerializer serializer = new
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+ Cursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+ // index =
+ // cursor = ^ already finished
+ assertFalse(scan.hasNext());
+
+ // index = [0]
+ // cursor = ^ already finished
+ put(indexStorage, serializer.serializeRow(new Object[]{0}, new
RowId(TEST_PARTITION)));
+
+ // index = [0]
+ // cursor = ^ already finished
+ assertFalse(scan.hasNext());
+ assertThrows(NoSuchElementException.class, scan::next);
+
+ // index = [0]
+ // cursor = ^ no cached row
+ scan = indexStorage.scan(null, null, 0);
+
+ // index = [0]
+ // cursor = ^ with cached [0]
+ assertTrue(scan.hasNext());
+ // index = [0]
+ // cursor = ^ with no cached row
+ assertEquals(0, serializer.deserializeColumns(scan.next())[0]);
+
+ // index = [0] [1]
+ // cursor = ^ with no cached row
+ put(indexStorage, serializer.serializeRow(new Object[]{1}, new
RowId(TEST_PARTITION)));
+
+ // index = [0] [1]
+ // cursor = ^ with cached [1]
+ assertTrue(scan.hasNext());
+ // index = [0] [1]
+ // cursor = ^ with no cached row
+ assertEquals(1, serializer.deserializeColumns(scan.next())[0]);
+
+ // index = [0] [1]
+ // cursor = ^ already finished
+ assertFalse(scan.hasNext());
+ assertThrows(NoSuchElementException.class, scan::next);
+ }
+
+ @Test
+ void testScanContractInvokeOnlyNext() { // Drives the cursor through next() only: once it throws, the cursor stays finished.
+ SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex("TEST_IDX")
+
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+ .build();
+
+ SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+ BinaryTupleRowSerializer serializer = new
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+ Cursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+ // index =
+ // cursor = ^ with no cached row
+ assertThrows(NoSuchElementException.class, scan::next);
+
+ // index = [0]
+ // cursor = ^ already finished
+ put(indexStorage, serializer.serializeRow(new Object[]{0}, new
RowId(TEST_PARTITION)));
+
+ // index = [0]
+ // cursor = ^ already finished
+ assertThrows(NoSuchElementException.class, scan::next);
+
+ // index = [0]
+ // cursor = ^ no cached row
+ scan = indexStorage.scan(null, null, 0);
+
+ // index = [0]
+ // cursor = ^ no cached row
+ assertEquals(0, serializer.deserializeColumns(scan.next())[0]);
+
+ assertThrows(NoSuchElementException.class, scan::next);
+ assertThrows(NoSuchElementException.class, scan::next);
+ }
+
+ @Test
+ void testScanContractAddRowsOnly() { // Rows put during a scan are returned only when they sort after the last returned row.
+ SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex("TEST_IDX")
+
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+ .build();
+
+ SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+ BinaryTupleRowSerializer serializer = new
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+ RowId rowId0 = new RowId(TEST_PARTITION, 0, 0);
+ RowId rowId1 = new RowId(TEST_PARTITION, 0, 1);
+ RowId rowId2 = new RowId(TEST_PARTITION, 1, 0);
+
+ Cursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+ // index = [0, r1]
+ // cursor = ^ with no cached row
+ put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId1));
+
+ // index = [0, r1]
+ // cursor = ^ with no cached row
+ IndexRow nextRow = scan.next();
+
+ assertEquals(0, serializer.deserializeColumns(nextRow)[0]);
+ assertEquals(rowId1, nextRow.rowId());
+
+ // index = [0, r0] [0, r1] [0, r2]
+ // cursor = ^ with no cached row
+ put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId0));
+ put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId2));
+
+ // index = [0, r0] [0, r1] [0, r2]
+ // cursor = ^ with cached [0, r2]
+ assertTrue(scan.hasNext());
+
+ // index = [0, r0] [0, r1] [0, r2]
+ // cursor = ^ with no cached row
+ nextRow = scan.next();
+
+ assertEquals(0, serializer.deserializeColumns(nextRow)[0]);
+ assertEquals(rowId2, nextRow.rowId());
+
+ // index = [-1, r0] [0, r0] [0, r1] [0, r2] [1, r0]
+ // cursor = ^ with no cached row
+ put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId0));
+ put(indexStorage, serializer.serializeRow(new Object[]{-1}, rowId0));
+
+ // index = [-1, r0] [0, r0] [0, r1] [0, r2] [1, r0]
+ // cursor = ^ with cached [1,
r0]
+ assertTrue(scan.hasNext());
+
+ // index = [-1, r0] [0, r0] [0, r1] [0, r2] [1, r0]
+ // cursor = ^ with no cached
row
+ nextRow = scan.next();
+
+ assertEquals(1, serializer.deserializeColumns(nextRow)[0]);
+ assertEquals(rowId0, nextRow.rowId());
+
+ assertFalse(scan.hasNext());
+ assertThrows(NoSuchElementException.class, scan::next);
+ }
+
+ @Test
+ void testScanContractForFinishCursor() { // A finished cursor never yields rows again, whatever is put afterwards.
+ SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex("TEST_IDX")
+
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+ .build();
+
+ SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+ BinaryTupleRowSerializer serializer = new
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+ Cursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+ // index =
+ // cursor = ^ with no cached row
+ assertFalse(scan.hasNext());
+ assertThrows(NoSuchElementException.class, scan::next);
+
+ // index = [0]
+ // cursor = ^ already finished
+ put(indexStorage, serializer.serializeRow(new Object[]{0}, new
RowId(TEST_PARTITION, 0, 0)));
+
+ // index = [0]
+ // cursor = ^ already finished
+ assertFalse(scan.hasNext());
+ assertThrows(NoSuchElementException.class, scan::next);
+
+ scan = indexStorage.scan(null, null, 0);
+
+ // index = [0]
+ // cursor = ^ with cached [0, r0]
+ assertTrue(scan.hasNext());
+ // index = [0]
+ // cursor = ^ with no cached row
+ assertEquals(0, serializer.deserializeColumns(scan.next())[0]);
+
+ // index = [0]
+ // cursor = ^ already finished
+ assertFalse(scan.hasNext());
+ assertThrows(NoSuchElementException.class, scan::next);
+
+ // index = [-1] [0]
+ // cursor = ^ already finished
+ put(indexStorage, serializer.serializeRow(new Object[]{-1}, new
RowId(TEST_PARTITION, 0, 0)));
+
+ // index = [-1] [0]
+ // cursor = ^ already finished
+ assertFalse(scan.hasNext());
+ assertThrows(NoSuchElementException.class, scan::next);
+ }
+
+ @Test
+ void testScanContractNextMethodOnly() { // next() without hasNext() sees rows put after the last returned row and skips rows put before it.
+ SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex("TEST_IDX")
+
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+ .build();
+
+ SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+ BinaryTupleRowSerializer serializer = new
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+ RowId rowId0 = new RowId(TEST_PARTITION, 0, 0);
+ RowId rowId1 = new RowId(TEST_PARTITION, 0, 1);
+ RowId rowId2 = new RowId(TEST_PARTITION, 1, 0); // Distinct from rowId1, so that r1 and r2 in the comments below are different rows.
+
+ Cursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+ put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId0));
+
+ // index = [0, r0]
+ // cursor = ^ with no cached row
+ IndexRow nextRow = scan.next();
+
+ assertEquals(0, serializer.deserializeColumns(nextRow)[0]);
+ assertEquals(rowId0, nextRow.rowId());
+
+ // index = [0, r0] [0, r1]
+ // cursor = ^ with no cached row
+ put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId1));
+
+ // index = [0, r0] [0, r1]
+ // cursor = ^ with no cached row
+ nextRow = scan.next();
+
+ assertEquals(0, serializer.deserializeColumns(nextRow)[0]);
+ assertEquals(rowId1, nextRow.rowId());
+
+ // index = [-1, r2] [0, r0] [0, r1] [1, r2]
+ // cursor = ^ with no cached row
+ put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId2));
+ put(indexStorage, serializer.serializeRow(new Object[]{-1}, rowId2));
+
+ // index = [-1, r2] [0, r0] [0, r1] [1, r2]
+ // cursor = ^ with no cached row
+ nextRow = scan.next();
+
+ assertEquals(1, serializer.deserializeColumns(nextRow)[0]);
+ assertEquals(rowId2, nextRow.rowId());
+
+ assertThrows(NoSuchElementException.class, scan::next);
+ }
+
+ @Test
+ void testScanContractRemoveRowsOnly() { // A row cached by hasNext() is still returned after its removal; rows removed before being cached are skipped.
+ SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex("TEST_IDX")
+
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+ .build();
+
+ SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+ BinaryTupleRowSerializer serializer = new
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+ RowId rowId0 = new RowId(TEST_PARTITION, 0, 0);
+ RowId rowId1 = new RowId(TEST_PARTITION, 0, 1);
+
+ put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId0));
+ put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId1));
+ put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId0));
+ put(indexStorage, serializer.serializeRow(new Object[]{2}, rowId1));
+
+ Cursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+ // index = [0, r0] [0, r1] [1, r0] [2, r1]
+ // cursor = ^ with cached [0, r0]
+ assertTrue(scan.hasNext());
+
+ // index = [0, r1] [1, r0] [2, r1]
+ // cursor = ^ with cached [0, r0]
+ remove(indexStorage, serializer.serializeRow(new Object[]{0}, rowId0));
+
+ // index = [0, r1] [1, r0] [2, r1]
+ // cursor = ^ with no cached row
+ IndexRow nextRow = scan.next();
+
+ assertEquals(0, serializer.deserializeColumns(nextRow)[0]);
+ assertEquals(rowId0, nextRow.rowId());
+
+ // index = [1, r0] [2, r1]
+ // cursor = ^ with no cached row
+ remove(indexStorage, serializer.serializeRow(new Object[]{0}, rowId1));
+
+ // index = [1, r0] [2, r1]
+ // cursor = ^ with cached [1, r0]
+ assertTrue(scan.hasNext());
+
+ // index = [1, r0] [2, r1]
+ // cursor = ^ with no cached row
+ nextRow = scan.next();
+
+ assertEquals(1, serializer.deserializeColumns(nextRow)[0]);
+ assertEquals(rowId0, nextRow.rowId());
+
+ // index = [1, r0]
+ // cursor = ^ with no cached row
+ remove(indexStorage, serializer.serializeRow(new Object[]{2}, rowId1));
+
+ assertFalse(scan.hasNext());
+ assertThrows(NoSuchElementException.class, scan::next);
+ }
+
+ @Test
+ void testScanContractReplaceRow() { // A returned row that is removed and re-put under a greater key is returned again under the new key.
+ SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex("TEST_IDX")
+
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+ .build();
+
+ SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+ BinaryTupleRowSerializer serializer = new
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+ RowId rowId = new RowId(TEST_PARTITION);
+
+ put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId));
+ put(indexStorage, serializer.serializeRow(new Object[]{2}, rowId));
+
+ Cursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+ // index = [0] [2]
+ // cursor = ^ with no cached row
+ assertEquals(0, serializer.deserializeColumns(scan.next())[0]);
+
+ // Replace 0 -> 1.
+ // index = [1] [2]
+ // cursor = ^ with no cached row
+ remove(indexStorage, serializer.serializeRow(new Object[]{0}, rowId));
+ put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId));
+
+ // index = [1] [2]
+ // cursor = ^ with cached [1]
+ assertTrue(scan.hasNext());
+ // index = [1] [2]
+ // cursor = ^ with no cached row
+ assertEquals(1, serializer.deserializeColumns(scan.next())[0]);
+
+ // index = [1] [2]
+ // cursor = ^ with cached [2]
+ assertTrue(scan.hasNext());
+ // index = [1] [2]
+ // cursor = ^ with no cached row
+ assertEquals(2, serializer.deserializeColumns(scan.next())[0]);
+
+ assertFalse(scan.hasNext());
+ assertThrows(NoSuchElementException.class, scan::next);
+ }
+
+ @Test
+ void testScanContractRemoveCachedRow() { // Rows cached by hasNext() are returned by next() even if removed in between, also when the index becomes empty.
+ SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex("TEST_IDX")
+
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+ .build();
+
+ SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+ BinaryTupleRowSerializer serializer = new
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+ Cursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+ RowId rowId = new RowId(TEST_PARTITION);
+
+ // index = [0] [1]
+ // cursor = ^ with no cached row
+ put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId));
+ put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId));
+
+ // index = [0] [1]
+ // cursor = ^ with cached [0]
+ assertTrue(scan.hasNext());
+
+ // index = [1]
+ // cursor = ^ with cached [0]
+ remove(indexStorage, serializer.serializeRow(new Object[]{0}, rowId));
+
+ // index = [1]
+ // cursor = ^ with no cached row
+ assertEquals(0, serializer.deserializeColumns(scan.next())[0]);
+
+ // index = [1]
+ // cursor = ^ with cached [1]
+ assertTrue(scan.hasNext());
+
+ // index =
+ // cursor = ^ with cached [1]
+ remove(indexStorage, serializer.serializeRow(new Object[]{1}, rowId));
+
+ // index =
+ // cursor = ^ with no cached row
+ assertEquals(1, serializer.deserializeColumns(scan.next())[0]);
+
+ assertFalse(scan.hasNext());
+ assertThrows(NoSuchElementException.class, scan::next);
+
+ scan = indexStorage.scan(null, null, 0);
+
+ // index = [2]
+ // cursor = ^ with no cached row
+ put(indexStorage, serializer.serializeRow(new Object[]{2}, rowId));
+
+ // index = [2]
+ // cursor = ^ with cached [2]
+ assertTrue(scan.hasNext());
+
+ // index =
+ // cursor = ^ with cached [2]
+ remove(indexStorage, serializer.serializeRow(new Object[]{2}, rowId));
+
+ // index =
+ // cursor = ^ with no cached row
+ assertEquals(2, serializer.deserializeColumns(scan.next())[0]);
+
+ assertFalse(scan.hasNext());
+ assertThrows(NoSuchElementException.class, scan::next);
+ }
+
+ @Test
+ void testScanContractRemoveNextAndAddFirstRow() { // A row re-put under a key smaller than the last returned one is not visited again.
+ SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex("TEST_IDX")
+
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+ .build();
+
+ SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+ BinaryTupleRowSerializer serializer = new
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+ RowId rowId = new RowId(TEST_PARTITION);
+
+ put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId));
+ put(indexStorage, serializer.serializeRow(new Object[]{2}, rowId));
+
+ Cursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+ // index = [0] [2]
+ // cursor = ^ with no cached row
+ assertEquals(0, serializer.deserializeColumns(scan.next())[0]);
+
+ // index = [-1] [2]
+ // cursor = ^ with no cached row
+ remove(indexStorage, serializer.serializeRow(new Object[]{0}, rowId));
+ put(indexStorage, serializer.serializeRow(new Object[]{-1}, rowId));
+
+ // index = [-1] [2]
+ // cursor = ^ with cached [2]
+ assertTrue(scan.hasNext());
+ // index = [-1] [2]
+ // cursor = ^ with no cached row
+ assertEquals(2, serializer.deserializeColumns(scan.next())[0]);
+
+ assertFalse(scan.hasNext());
+ assertThrows(NoSuchElementException.class, scan::next);
+ }
+
private List<ColumnDefinition> shuffledRandomDefinitions() {
return shuffledDefinitions(d -> random.nextBoolean());
}
@@ -611,7 +1265,7 @@ public abstract class AbstractSortedIndexStorageTest {
* Tests the Get-Put-Remove scenario: inserts some keys into the storage
and checks that they have been successfully persisted and can
* be removed.
*/
- private void testPutGetRemove(List<ColumnDefinition> indexSchema) throws
Exception {
+ private void testPutGetRemove(List<ColumnDefinition> indexSchema) {
SortedIndexStorage indexStorage = createIndexStorage(indexSchema);
TestIndexRow entry1 = TestIndexRow.randomRow(indexStorage);
@@ -644,7 +1298,7 @@ public abstract class AbstractSortedIndexStorageTest {
* Extracts a single value by a given key or {@code null} if it does not
exist.
*/
@Nullable
- private static IndexRow getSingle(SortedIndexStorage indexStorage,
BinaryTuple fullPrefix) throws Exception {
+ private static IndexRow getSingle(SortedIndexStorage indexStorage,
BinaryTuple fullPrefix) {
List<RowId> rowIds = get(indexStorage, fullPrefix);
assertThat(rowIds, anyOf(empty(), hasSize(1)));
@@ -663,17 +1317,28 @@ public abstract class AbstractSortedIndexStorageTest {
@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound,
@MagicConstant(flagsFromClass = SortedIndexStorage.class) int flags
- ) throws Exception {
+ ) {
+ return scan(index, lowerBound, upperBound, flags, identity());
+ }
+
+ private static <T> List<T> scan(
+ SortedIndexStorage index,
+ @Nullable BinaryTuplePrefix lowerBound,
+ @Nullable BinaryTuplePrefix upperBound,
+ @MagicConstant(flagsFromClass = SortedIndexStorage.class) int
flags,
+ Function<Object[], T> mapper
+ ) {
var serializer = new BinaryTupleRowSerializer(index.indexDescriptor());
try (Cursor<IndexRow> cursor = index.scan(lowerBound, upperBound,
flags)) {
return cursor.stream()
.map(serializer::deserializeColumns)
+ .map(mapper)
.collect(toUnmodifiableList());
}
}
- protected static List<RowId> get(SortedIndexStorage index, BinaryTuple
key) throws Exception {
+ protected static List<RowId> get(SortedIndexStorage index, BinaryTuple
key) {
try (Cursor<RowId> cursor = index.get(key)) {
return cursor.stream().collect(toUnmodifiableList());
}
@@ -694,4 +1359,22 @@ public abstract class AbstractSortedIndexStorageTest {
return null;
});
}
+
+ private static <T> List<T> getRemaining(Cursor<IndexRow> scanCursor,
Function<IndexRow, T> mapper) {
+ List<T> result = new ArrayList<>();
+
+ while (scanCursor.hasNext()) {
+ result.add(mapper.apply(scanCursor.next()));
+ }
+
+ return result;
+ }
+
+ private static <T> Function<IndexRow, T>
firstColumn(BinaryTupleRowSerializer serializer) {
+ return indexRow -> (T) serializer.deserializeColumns(indexRow)[0];
+ }
+
+ private static <T> T firstArrayElement(Object[] objects) {
+ return (T) objects[0];
+ }
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleRowSerializer.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleRowSerializer.java
index 9b029d0018..65c6308656 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleRowSerializer.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleRowSerializer.java
@@ -108,7 +108,7 @@ public class BinaryTupleRowSerializer {
/**
* Creates a prefix of an {@link IndexRow} using the provided columns.
*/
- public BinaryTuplePrefix serializeRowPrefix(Object[] prefixColumnValues) {
+ public BinaryTuplePrefix serializeRowPrefix(Object... prefixColumnValues) {
if (prefixColumnValues.length > schema.size()) {
throw new IllegalArgumentException(String.format(
"Incorrect number of column values passed. Expected not
more than %d, got %d",
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
index 4f7f727cb2..bef8d5846d 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
@@ -17,14 +17,14 @@
package org.apache.ignite.internal.storage.index.impl;
-import static org.apache.ignite.internal.util.IgniteUtils.capacity;
+import static java.util.Collections.emptyNavigableMap;
import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.Iterator;
-import java.util.Set;
-import java.util.SortedMap;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
@@ -45,7 +45,13 @@ import org.jetbrains.annotations.Nullable;
* Test implementation of MV sorted index storage.
*/
public class TestSortedIndexStorage implements SortedIndexStorage {
- private final ConcurrentNavigableMap<ByteBuffer, Set<RowId>> index;
+ private static final Object NULL = new Object();
+
+ /**
+ * {@code NavigableMap<RowId, Object>} is used as a {@link NavigableSet},
but a map was chosen because methods like
+ * {@link NavigableSet#first()} throw a {@link NoSuchElementException} if
the set is empty.
+ */
+ private final ConcurrentNavigableMap<ByteBuffer, NavigableMap<RowId,
Object>> index;
private final SortedIndexDescriptor descriptor;
@@ -68,11 +74,28 @@ public class TestSortedIndexStorage implements
SortedIndexStorage {
public Cursor<RowId> get(BinaryTuple key) throws StorageException {
checkClosed();
- Iterator<RowId> iterator = index.getOrDefault(key.byteBuffer(),
Set.of()).stream()
- .peek(rowId -> checkClosed())
- .iterator();
+ Iterator<RowId> iterator = index.getOrDefault(key.byteBuffer(),
emptyNavigableMap()).keySet().iterator();
+
+ return new Cursor<>() {
+ @Override
+ public void close() {
+ // No-op.
+ }
+
+ @Override
+ public boolean hasNext() {
+ checkClosed();
+
+ return iterator.hasNext();
+ }
- return Cursor.fromBareIterator(iterator);
+ @Override
+ public RowId next() {
+ checkClosed();
+
+ return iterator.next();
+ }
+ };
}
@Override
@@ -80,18 +103,11 @@ public class TestSortedIndexStorage implements
SortedIndexStorage {
checkClosed();
index.compute(row.indexColumns().byteBuffer(), (k, v) -> {
- if (v == null) {
- return Set.of(row.rowId());
- } else if (v.contains(row.rowId())) {
- return v;
- } else {
- var result = new HashSet<RowId>(capacity(v.size() + 1));
+ NavigableMap<RowId, Object> rowIds = v == null ? new
ConcurrentSkipListMap<>() : v;
- result.addAll(v);
- result.add(row.rowId());
+ rowIds.put(row.rowId(), NULL);
- return result;
- }
+ return rowIds;
});
}
@@ -100,19 +116,9 @@ public class TestSortedIndexStorage implements
SortedIndexStorage {
checkClosed();
index.computeIfPresent(row.indexColumns().byteBuffer(), (k, v) -> {
- if (v.contains(row.rowId())) {
- if (v.size() == 1) {
- return null;
- } else {
- var result = new HashSet<>(v);
+ v.remove(row.rowId());
- result.remove(row.rowId());
-
- return result;
- }
- } else {
- return v;
- }
+ return v.isEmpty() ? null : v;
});
}
@@ -135,48 +141,23 @@ public class TestSortedIndexStorage implements
SortedIndexStorage {
setEqualityFlag(upperBound);
}
- SortedMap<ByteBuffer, Set<RowId>> data;
+ NavigableMap<ByteBuffer, NavigableMap<RowId, Object>> navigableMap;
if (lowerBound == null && upperBound == null) {
- data = index;
+ navigableMap = index;
} else if (lowerBound == null) {
- data = index.headMap(upperBound.byteBuffer());
+ navigableMap = index.headMap(upperBound.byteBuffer());
} else if (upperBound == null) {
- data = index.tailMap(lowerBound.byteBuffer());
+ navigableMap = index.tailMap(lowerBound.byteBuffer());
} else {
try {
- data = index.subMap(lowerBound.byteBuffer(),
upperBound.byteBuffer());
+ navigableMap = index.subMap(lowerBound.byteBuffer(),
upperBound.byteBuffer());
} catch (IllegalArgumentException e) {
- data = Collections.emptySortedMap();
+ navigableMap = emptyNavigableMap();
}
}
- Iterator<? extends IndexRow> iterator = data.entrySet().stream()
- .flatMap(e -> {
- var tuple = new
BinaryTuple(descriptor.binaryTupleSchema(), e.getKey());
-
- return e.getValue().stream().map(rowId -> new
IndexRowImpl(tuple, rowId));
- })
- .iterator();
-
- return new Cursor<>() {
- @Override
- public void close() {
- // No-op.
- }
-
- @Override
- public boolean hasNext() {
- checkClosed();
-
- return iterator.hasNext();
- }
-
- @Override
- public IndexRow next() {
- return iterator.next();
- }
- };
+ return new ScanCursor(navigableMap);
}
private static void setEqualityFlag(BinaryTuplePrefix prefix) {
@@ -208,4 +189,93 @@ public class TestSortedIndexStorage implements
SortedIndexStorage {
throw new StorageClosedException("Storage is already closed");
}
}
+
+ private class ScanCursor implements Cursor<IndexRow> {
+ private final NavigableMap<ByteBuffer, NavigableMap<RowId, Object>>
indexMap;
+
+ @Nullable
+ private Boolean hasNext;
+
+ @Nullable
+ private Entry<ByteBuffer, NavigableMap<RowId, Object>> indexMapEntry;
+
+ @Nullable
+ private RowId rowId;
+
+ private ScanCursor(NavigableMap<ByteBuffer, NavigableMap<RowId,
Object>> indexMap) {
+ this.indexMap = indexMap;
+ }
+
+ @Override
+ public void close() {
+ // No-op.
+ }
+
+ @Override
+ public boolean hasNext() {
+ checkClosed();
+
+ advanceIfNeeded();
+
+ return hasNext;
+ }
+
+ @Override
+ public IndexRow next() {
+ checkClosed();
+
+ advanceIfNeeded();
+
+ boolean hasNext = this.hasNext;
+
+ if (!hasNext) {
+ throw new NoSuchElementException();
+ }
+
+ this.hasNext = null;
+
+ return new IndexRowImpl(new
BinaryTuple(descriptor.binaryTupleSchema(), indexMapEntry.getKey()), rowId);
+ }
+
+ private void advanceIfNeeded() {
+ if (hasNext != null) {
+ return;
+ }
+
+ if (indexMapEntry == null) {
+ indexMapEntry = indexMap.firstEntry();
+ }
+
+ if (rowId == null) {
+ if (indexMapEntry != null) {
+ rowId = getRowId(indexMapEntry.getValue().firstEntry());
+ }
+ } else {
+ Entry<RowId, Object> nextRowIdEntry =
indexMapEntry.getValue().higherEntry(rowId);
+
+ if (nextRowIdEntry != null) {
+ rowId = nextRowIdEntry.getKey();
+ } else {
+ Entry<ByteBuffer, NavigableMap<RowId, Object>>
nextIndexMapEntry = indexMap.higherEntry(indexMapEntry.getKey());
+
+ if (nextIndexMapEntry == null) {
+ hasNext = false;
+
+ return;
+ } else {
+ indexMapEntry = nextIndexMapEntry;
+
+ rowId =
getRowId(indexMapEntry.getValue().firstEntry());
+ }
+ }
+ }
+
+ hasNext = rowId != null;
+ }
+
+ @Nullable
+ private RowId getRowId(@Nullable Entry<RowId, ?> rowIdEntry) {
+ return rowIdEntry == null ? null : rowIdEntry.getKey();
+ }
+ }
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
index 4923f005b6..b9d4beffeb 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
@@ -20,6 +20,7 @@ package
org.apache.ignite.internal.storage.pagememory.index.sorted;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
+import java.util.NoSuchElementException;
import java.util.function.Function;
import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
import org.apache.ignite.internal.schema.BinaryTuple;
@@ -175,9 +176,7 @@ public class PageMemorySortedIndexStorage implements
SortedIndexStorage {
SortedIndexRowKey upper = createBound(upperBound, includeUpper);
- return convertCursor(sortedIndexTree.find(lower, upper),
this::toIndexRowImpl);
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Failed to create scan cursor", e);
+ return new ScanCursor(lower, upper);
} finally {
closeBusyLock.leaveBusy();
}
@@ -272,4 +271,99 @@ public class PageMemorySortedIndexStorage implements
SortedIndexStorage {
}
};
}
+
+ private class ScanCursor implements Cursor<IndexRow> {
+ @Nullable
+ private Boolean hasNext;
+
+ @Nullable
+ private final SortedIndexRowKey lower;
+
+ @Nullable
+ private final SortedIndexRowKey upper;
+
+ @Nullable
+ private SortedIndexRow treeRow;
+
+ private ScanCursor(@Nullable SortedIndexRowKey lower, @Nullable
SortedIndexRowKey upper) {
+ this.lower = lower;
+ this.upper = upper;
+ }
+
+ @Override
+ public void close() {
+ // No-op.
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (!closeBusyLock.enterBusy()) {
+ throwStorageClosedException();
+ }
+
+ try {
+ advanceIfNeeded();
+
+ return hasNext;
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error while advancing the cursor",
e);
+ } finally {
+ closeBusyLock.leaveBusy();
+ }
+ }
+
+ @Override
+ public IndexRow next() {
+ if (!closeBusyLock.enterBusy()) {
+ throwStorageClosedException();
+ }
+
+ try {
+ advanceIfNeeded();
+
+ boolean hasNext = this.hasNext;
+
+ if (!hasNext) {
+ throw new NoSuchElementException();
+ }
+
+ this.hasNext = null;
+
+ return toIndexRowImpl(treeRow);
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error while advancing the cursor",
e);
+ } finally {
+ closeBusyLock.leaveBusy();
+ }
+ }
+
+ private void advanceIfNeeded() throws IgniteInternalCheckedException {
+ if (hasNext != null) {
+ return;
+ }
+
+ if (treeRow == null) {
+ treeRow = lower == null ? sortedIndexTree.findFirst() :
sortedIndexTree.findNext(lower, true);
+ } else {
+ SortedIndexRow next = sortedIndexTree.findNext(treeRow, false);
+
+ if (next == null) {
+ hasNext = false;
+
+ return;
+ } else {
+ treeRow = next;
+ }
+ }
+
+ hasNext = treeRow != null && (upper == null ||
compareRows(treeRow, upper) < 0);
+ }
+
+ private int compareRows(SortedIndexRowKey key1, SortedIndexRowKey
key2) {
+ return sortedIndexTree.getBinaryTupleComparator().compare(
+ key1.indexColumns().valueBuffer(),
+ key2.indexColumns().valueBuffer()
+ );
+ }
+ }
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexTree.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexTree.java
index 2217081f09..5c21b14843 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexTree.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexTree.java
@@ -164,4 +164,11 @@ public class SortedIndexTree extends
BplusTree<SortedIndexRowKey, SortedIndexRow
assert result == Boolean.TRUE : result;
}
+
+ /**
+ * Returns the comparator of the index columns' {@link BinaryTuple}s.
+ */
+ BinaryTupleComparator getBinaryTupleComparator() {
+ return binaryTupleComparator;
+ }
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index fb558047e3..b06cc00a74 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -19,12 +19,13 @@ package org.apache.ignite.internal.storage.rocksdb.index;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.CursorUtils.map;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.util.NoSuchElementException;
import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
-import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
@@ -167,23 +168,77 @@ public class RocksDbSortedIndexStorage implements
SortedIndexStorage {
RocksIterator it = indexCf.newIterator(options);
- if (lowerBound == null) {
- it.seek(partitionStorage.partitionStartPrefix());
- } else {
- it.seek(lowerBound);
- }
+ return new Cursor<>() {
+ @Nullable
+ private Boolean hasNext;
+
+ private byte @Nullable [] key;
- return new RocksIteratorAdapter<>(it) {
@Override
- protected ByteBuffer decodeEntry(byte[] key, byte[] value) {
- return ByteBuffer.wrap(key).order(ORDER);
+ public void close() {
+ try {
+ closeAll(it, options, upperBoundSlice);
+ } catch (Exception e) {
+ throw new StorageException("Error closing cursor", e);
+ }
}
@Override
- public void close() {
- super.close();
+ public boolean hasNext() {
+ advanceIfNeeded();
+
+ return hasNext;
+ }
+
+ @Override
+ public ByteBuffer next() {
+ advanceIfNeeded();
+
+ boolean hasNext = this.hasNext;
+
+ if (!hasNext) {
+ throw new NoSuchElementException();
+ }
+
+ this.hasNext = null;
+
+ return ByteBuffer.wrap(key).order(ORDER);
+ }
+
+ private void advanceIfNeeded() throws StorageException {
+ if (hasNext != null) {
+ return;
+ }
+
+ try {
+ it.refresh();
+ } catch (RocksDBException e) {
+ throw new StorageException("Error refreshing an iterator",
e);
+ }
+
+ if (key == null) {
+ it.seek(lowerBound == null ?
partitionStorage.partitionStartPrefix() : lowerBound);
+ } else {
+ it.seekForPrev(key);
+
+ if (it.isValid()) {
+ it.next();
+ } else {
+ RocksUtils.checkIterator(it);
+
+ it.seek(lowerBound == null ?
partitionStorage.partitionStartPrefix() : lowerBound);
+ }
+ }
+
+ if (!it.isValid()) {
+ RocksUtils.checkIterator(it);
+
+ hasNext = false;
+ } else {
+ key = it.key();
- RocksUtils.closeAll(options, upperBoundSlice);
+ hasNext = true;
+ }
}
};
}