This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 761c899d3e IGNITE-18243 Implement a peek method for the sorted index
cursor (#1424)
761c899d3e is described below
commit 761c899d3eb34df8b0d56e306c96293b72402d44
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Dec 13 15:30:03 2022 +0300
IGNITE-18243 Implement a peek method for the sorted index cursor (#1424)
---
.../ignite/internal/storage/index/PeekCursor.java | 41 +++
.../internal/storage/index/SortedIndexStorage.java | 3 +-
.../index/AbstractSortedIndexStorageTest.java | 303 +++++++++++++++++++++
.../storage/index/impl/TestSortedIndexStorage.java | 61 ++++-
.../index/sorted/PageMemorySortedIndexStorage.java | 34 ++-
.../rocksdb/index/RocksDbSortedIndexStorage.java | 73 +++--
6 files changed, 478 insertions(+), 37 deletions(-)
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/PeekCursor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/PeekCursor.java
new file mode 100644
index 0000000000..2ef036f690
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/PeekCursor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link Cursor} extension with the ability to {@link #peek() peek} at the next element.
+ *
+ * @param <T> Type of the elements returned by the cursor.
+ */
+public interface PeekCursor<T> extends Cursor<T> {
+ /**
+ * Returns the next element without advancing the cursor, {@code null} if there is no next element.
+ *
+ * <p>Usage notes:
+ * <ul>
+ * <li>After the cursor is created, {@code peek()} will return the actual (up-to-date) next element;</li>
+ * <li>After calling {@link #hasNext()}, if it returned {@code true}, then {@code peek()} will return the element (cached) that
+ * {@link #next()} would return, but without advancing the cursor;</li>
+ * <li>After calling {@link #hasNext()}, if it returned {@code false}, then {@code peek()} will always return {@code null};</li>
+ * <li>After {@link #next()} is called, but before {@link #hasNext()} is called, {@code peek()} will always return the actual
+ * (up-to-date) next element without advancing the cursor.</li>
+ * </ul>
+ *
+ * @return Next element, or {@code null} if there is no next element.
+ */
+ @Nullable T peek();
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
index 2aa1542537..636d8f1ec1 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.storage.index;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.util.Cursor;
import org.intellij.lang.annotations.MagicConstant;
import org.jetbrains.annotations.Nullable;
@@ -59,7 +58,7 @@ public interface SortedIndexStorage extends IndexStorage {
* @return Cursor with fetched index rows.
* @throws IllegalArgumentException If backwards flag is passed and
backwards iteration is not supported by the storage.
*/
- Cursor<IndexRow> scan(
+ PeekCursor<IndexRow> scan(
@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound,
@MagicConstant(flagsFromClass = SortedIndexStorage.class) int flags
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
index 13c2f3123d..cfc5760288 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
@@ -40,6 +40,7 @@ import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -48,6 +49,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -1233,6 +1235,276 @@ public abstract class AbstractSortedIndexStorageTest {
assertThrows(NoSuchElementException.class, scan::next);
}
+
+ @Test
+ void testScanPeekForFinishedCursor() { // A cursor that has reported the end of the scan must keep returning null from peek().
+ SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex("TEST_IDX")
+
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+ .build();
+
+ SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+ BinaryTupleRowSerializer serializer = new
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+ PeekCursor<IndexRow> scan0 = indexStorage.scan(null, null, 0);
+ PeekCursor<IndexRow> scan1 = indexStorage.scan(null, null, 0);
+
+ // index = (empty)
+ // cursor0 = ^ already finished
+ assertFalse(scan0.hasNext());
+ assertNull(scan0.peek());
+
+ // index = (empty)
+ // cursor1 = ^ already finished
+ assertThrows(NoSuchElementException.class, scan1::next);
+ assertNull(scan1.peek());
+
+ // index = [0] (added after both cursors finished, so neither of them may see it)
+ // cursor0 = ^ already finished
+ // cursor1 = ^ already finished
+ put(indexStorage, serializer.serializeRow(new Object[]{0}, new
RowId(TEST_PARTITION)));
+
+ assertNull(scan0.peek());
+ assertNull(scan1.peek());
+
+ // index = [0] (freshly created cursors do see the row)
+ // cursor0 = ^ no cached row
+ // cursor1 = ^ no cached row
+ scan0 = indexStorage.scan(null, null, 0);
+ scan1 = indexStorage.scan(null, null, 0);
+
+ assertEquals(0, serializer.deserializeColumns(scan0.peek())[0]);
+ assertEquals(0, serializer.deserializeColumns(scan1.peek())[0]);
+
+ // index = [0]
+ // cursor0 = ^ cached [0]
+ assertTrue(scan0.hasNext());
+ assertEquals(0, serializer.deserializeColumns(scan0.peek())[0]);
+
+ // index = [0]
+ // cursor0 = ^ no cached row
+ assertEquals(0, serializer.deserializeColumns(scan0.next())[0]);
+ assertNull(scan0.peek());
+
+ // index = [0]
+ // cursor0 = ^ already finished
+ assertFalse(scan0.hasNext());
+ assertThrows(NoSuchElementException.class, scan0::next);
+ assertNull(scan0.peek());
+
+ // index = [0]
+ // cursor1 = ^ no cached row
+ assertEquals(0, serializer.deserializeColumns(scan1.next())[0]);
+ assertNull(scan1.peek());
+
+ // index = [0]
+ // cursor1 = ^ already finished
+ assertThrows(NoSuchElementException.class, scan1::next);
+ assertNull(scan1.peek());
+ }
+
+ @Test
+ void testScanPeekAddRowsOnly() { // peek() follows newly added rows until hasNext() caches a row; the cached row then stays fixed.
+ SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex("TEST_IDX")
+
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+ .build();
+
+ SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+ BinaryTupleRowSerializer serializer = new
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+ RowId rowId0 = new RowId(TEST_PARTITION, 0, 0);
+ RowId rowId1 = new RowId(TEST_PARTITION, 0, 1);
+
+ PeekCursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+ // index = [0, r1]
+ // cursor = ^ no cached row
+ put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId1));
+
+ assertEquals(SimpleRow.of(0, rowId1), SimpleRow.of(scan.peek(),
firstColumn(serializer)));
+
+ // index = [0, r0] [0, r1]
+ // cursor = ^ no cached row
+ put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId0));
+
+ assertEquals(SimpleRow.of(0, rowId0), SimpleRow.of(scan.peek(),
firstColumn(serializer)));
+
+ // index = [0, r0] [0, r1]
+ // cursor = ^ cached [0, r0]
+ assertTrue(scan.hasNext());
+
+ assertEquals(SimpleRow.of(0, rowId0), SimpleRow.of(scan.peek(),
firstColumn(serializer)));
+
+ // index = [-1, r0] [0, r0] [0, r1]
+ // cursor = ^ cached [0, r0]
+ put(indexStorage, serializer.serializeRow(new Object[]{-1}, rowId0));
+
+ assertEquals(SimpleRow.of(0, rowId0), SimpleRow.of(scan.peek(),
firstColumn(serializer)));
+
+ // index = [-1, r0] [0, r0] [0, r1] [1, r1]
+ // cursor = ^ cached [0, r0]
+ put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId1));
+
+ assertEquals(SimpleRow.of(0, rowId0), SimpleRow.of(scan.peek(),
firstColumn(serializer)));
+
+ // index = [-1, r0] [0, r0] [0, r1] [1, r1]
+ // cursor = ^ no cached row
+ assertEquals(SimpleRow.of(0, rowId0), SimpleRow.of(scan.next(),
firstColumn(serializer)));
+ assertEquals(SimpleRow.of(0, rowId1), SimpleRow.of(scan.peek(),
firstColumn(serializer)));
+
+ // index = [-1, r0] [0, r0] [0, r1] [1, r1]
+ // cursor = ^ cached [0, r1]
+ assertTrue(scan.hasNext());
+ assertEquals(SimpleRow.of(0, rowId1), SimpleRow.of(scan.peek(),
firstColumn(serializer)));
+
+ // index = [-1, r0] [0, r0] [0, r1] [1, r1]
+ // cursor = ^ no cached row
+ assertEquals(SimpleRow.of(0, rowId1), SimpleRow.of(scan.next(),
firstColumn(serializer)));
+ assertEquals(SimpleRow.of(1, rowId1), SimpleRow.of(scan.peek(),
firstColumn(serializer)));
+
+ // index = [-1, r0] [0, r0] [0, r1] [1, r1]
+ // cursor = ^ cached [1, r1]
+ assertTrue(scan.hasNext());
+ assertEquals(SimpleRow.of(1, rowId1), SimpleRow.of(scan.peek(),
firstColumn(serializer)));
+
+ // index = [-1, r0] [0, r0] [0, r1] [1, r1]
+ // cursor = ^ no cached row
+ assertEquals(SimpleRow.of(1, rowId1), SimpleRow.of(scan.next(),
firstColumn(serializer)));
+ assertNull(scan.peek());
+
+ // index = [-1, r0] [0, r0] [0, r1] [1, r1]
+ // cursor = ^ already finished
+ assertFalse(scan.hasNext());
+ assertNull(scan.peek());
+
+ // index = [-1, r0] [0, r0] [0, r1] [1, r1]
+ // cursor = ^ already finished
+ assertThrows(NoSuchElementException.class, scan::next);
+ assertNull(scan.peek());
+ }
+
+ @Test
+ void testScanPeekRemoveRows() { // peek() skips removed rows until hasNext() caches a row; the cached row then stays fixed.
+ SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex("TEST_IDX")
+
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+ .build();
+
+ SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+ BinaryTupleRowSerializer serializer = new
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+ RowId rowId0 = new RowId(TEST_PARTITION, 0, 0);
+ RowId rowId1 = new RowId(TEST_PARTITION, 0, 1);
+
+ PeekCursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+ put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId0));
+ put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId1));
+ put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId0));
+ put(indexStorage, serializer.serializeRow(new Object[]{2}, rowId1));
+
+ // index = [0, r0] [0, r1] [1, r0] [2, r1]
+ // cursor = ^ no cached row
+ assertEquals(SimpleRow.of(0, rowId0), SimpleRow.of(scan.peek(),
firstColumn(serializer)));
+
+ // index = [0, r1] [1, r0] [2, r1]
+ // cursor = ^ no cached row
+ remove(indexStorage, serializer.serializeRow(new Object[]{0}, rowId0));
+
+ // index = [0, r1] [1, r0] [2, r1]
+ // cursor = ^ no cached row
+ assertEquals(SimpleRow.of(0, rowId1), SimpleRow.of(scan.peek(),
firstColumn(serializer)));
+
+ // index = [1, r0] [2, r1]
+ // cursor = ^ no cached row
+ remove(indexStorage, serializer.serializeRow(new Object[]{0}, rowId1));
+
+ assertEquals(SimpleRow.of(1, rowId0), SimpleRow.of(scan.peek(),
firstColumn(serializer)));
+
+ // index = [1, r0] [2, r1]
+ // cursor = ^ cached [1, r0]
+ assertTrue(scan.hasNext());
+ assertEquals(SimpleRow.of(1, rowId0), SimpleRow.of(scan.peek(),
firstColumn(serializer)));
+
+ // index = [1, r0]
+ // cursor = ^ cached [1, r0]
+ remove(indexStorage, serializer.serializeRow(new Object[]{2}, rowId1));
+
+ assertEquals(SimpleRow.of(1, rowId0), SimpleRow.of(scan.peek(),
firstColumn(serializer)));
+
+ assertEquals(SimpleRow.of(1, rowId0), SimpleRow.of(scan.next(),
firstColumn(serializer)));
+ assertNull(scan.peek()); // [2, r1] was removed above, so nothing follows [1, r0].
+
+ // index = [1, r0]
+ // cursor = ^ already finished
+ assertFalse(scan.hasNext());
+ assertNull(scan.peek());
+
+ // index = [1, r0]
+ // cursor = ^ already finished
+ assertThrows(NoSuchElementException.class, scan::next);
+ assertNull(scan.peek());
+ }
+
+ @Test
+ void testScanPeekReplaceRow() { // Replacing the row that follows the cached one must not change the cached peek() result.
+ SortedIndexDefinition indexDefinition =
SchemaBuilders.sortedIndex("TEST_IDX")
+
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+ .build();
+
+ SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+ BinaryTupleRowSerializer serializer = new
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+ PeekCursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+ RowId rowId = new RowId(TEST_PARTITION);
+
+ // index = [0] [1]
+ // cursor = ^ with no cached row
+ put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId));
+ put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId));
+
+ // index = [0] [1]
+ // cursor = ^ with cached [0]
+ assertTrue(scan.hasNext());
+
+ assertEquals(SimpleRow.of(0, rowId), SimpleRow.of(scan.peek(),
firstColumn(serializer)));
+
+ // index = [0] [2] (row [1] is replaced by row [2])
+ // cursor = ^ with cached [0]
+ remove(indexStorage, serializer.serializeRow(new Object[]{1}, rowId));
+ put(indexStorage, serializer.serializeRow(new Object[]{2}, rowId));
+
+ assertEquals(SimpleRow.of(0, rowId), SimpleRow.of(scan.peek(),
firstColumn(serializer)));
+
+ // index = [0] [2]
+ // cursor = ^ with no cached row
+ assertEquals(SimpleRow.of(0, rowId), SimpleRow.of(scan.next(),
firstColumn(serializer)));
+ assertEquals(SimpleRow.of(2, rowId), SimpleRow.of(scan.peek(),
firstColumn(serializer)));
+
+ // index = [0] [2]
+ // cursor = ^ with cached [2]
+ assertTrue(scan.hasNext());
+
+ assertEquals(SimpleRow.of(2, rowId), SimpleRow.of(scan.peek(),
firstColumn(serializer)));
+
+ // index = [0] [2]
+ // cursor = ^ with no cached row
+ assertEquals(SimpleRow.of(2, rowId), SimpleRow.of(scan.next(),
firstColumn(serializer)));
+
+ // index = [0] [2]
+ // cursor = ^ already finished
+ assertFalse(scan.hasNext());
+ assertNull(scan.peek());
+
+ // index = [0] [2]
+ // cursor = ^ already finished
+ assertThrows(NoSuchElementException.class, scan::next);
+ assertNull(scan.peek());
+ }
+
private List<ColumnDefinition> shuffledRandomDefinitions() {
return shuffledDefinitions(d -> random.nextBoolean());
}
@@ -1377,4 +1649,35 @@ public abstract class AbstractSortedIndexStorageTest {
private static <T> T firstArrayElement(Object[] objects) {
return (T) objects[0];
}
+
+    /**
+     * Pair of mapped index columns and the {@link RowId} of an index row, used to compare expected and actual rows in
+     * assertions.
+     *
+     * @param <T> Type of the mapped index columns.
+     */
+    private static final class SimpleRow<T> {
+        private final T indexColumns;
+
+        private final RowId rowId;
+
+        private SimpleRow(T indexColumns, RowId rowId) {
+            this.indexColumns = indexColumns;
+            this.rowId = rowId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            SimpleRow<?> simpleRow = (SimpleRow<?>) o;
+
+            return Objects.equals(indexColumns, simpleRow.indexColumns) && Objects.equals(rowId, simpleRow.rowId);
+        }
+
+        // Kept consistent with equals(): rows that are equal must produce equal hash codes.
+        @Override
+        public int hashCode() {
+            return Objects.hash(indexColumns, rowId);
+        }
+
+        // Lets a failed assertEquals print the row contents instead of an identity hash.
+        @Override
+        public String toString() {
+            return "SimpleRow [indexColumns=" + indexColumns + ", rowId=" + rowId + ']';
+        }
+
+        /** Creates a row from already extracted index columns and a row ID. */
+        private static <T> SimpleRow<T> of(T indexColumns, RowId rowId) {
+            return new SimpleRow<>(indexColumns, rowId);
+        }
+
+        /** Creates a row from an {@link IndexRow}, extracting the index columns with {@code mapper}. */
+        private static <T> SimpleRow<T> of(IndexRow indexRow, Function<IndexRow, T> mapper) {
+            return new SimpleRow<>(mapper.apply(indexRow), indexRow.rowId());
+        }
+    }
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
index bef8d5846d..8286ecdc59 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
+import org.apache.ignite.internal.storage.index.PeekCursor;
import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.util.Cursor;
@@ -123,7 +124,7 @@ public class TestSortedIndexStorage implements
SortedIndexStorage {
}
@Override
- public Cursor<IndexRow> scan(
+ public PeekCursor<IndexRow> scan(
@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound,
int flags
@@ -190,14 +191,14 @@ public class TestSortedIndexStorage implements
SortedIndexStorage {
}
}
- private class ScanCursor implements Cursor<IndexRow> {
+ private class ScanCursor implements PeekCursor<IndexRow> {
private final NavigableMap<ByteBuffer, NavigableMap<RowId, Object>>
indexMap;
@Nullable
private Boolean hasNext;
@Nullable
- private Entry<ByteBuffer, NavigableMap<RowId, Object>> indexMapEntry;
+ private Entry<ByteBuffer, NavigableMap<RowId, Object>> currentEntry;
@Nullable
private RowId rowId;
@@ -234,7 +235,43 @@ public class TestSortedIndexStorage implements
SortedIndexStorage {
this.hasNext = null;
- return new IndexRowImpl(new
BinaryTuple(descriptor.binaryTupleSchema(), indexMapEntry.getKey()), rowId);
+ return new IndexRowImpl(new
BinaryTuple(descriptor.binaryTupleSchema(), currentEntry.getKey()), rowId);
+ }
+
+ @Override
+ public @Nullable IndexRow peek() { // Reports the next row without assigning the cursor fields (hasNext, currentEntry, rowId).
+ if (hasNext != null) { // hasNext was already computed: answer from the cached currentEntry/rowId.
+ if (hasNext) {
+ return new IndexRowImpl(new
BinaryTuple(descriptor.binaryTupleSchema(), currentEntry.getKey()), rowId);
+ }
+
+ return null; // hasNext is false: there is no next row to report.
+ }
+
+ Entry<ByteBuffer, NavigableMap<RowId, Object>> indexMapEntry0 = // Local copy, so the lookup below leaves currentEntry as is.
currentEntry == null ? indexMap.firstEntry() : currentEntry;
+
+ RowId nextRowId = null;
+
+ if (rowId == null) { // No row ID remembered yet: candidate is the first row ID of the starting entry, if any.
+ if (indexMapEntry0 != null) {
+ nextRowId =
getRowId(indexMapEntry0.getValue().firstEntry());
+ }
+ } else {
+ Entry<RowId, Object> nextRowIdEntry = // Next row ID under the same index key, if there is one.
indexMapEntry0.getValue().higherEntry(rowId);
+
+ if (nextRowIdEntry != null) {
+ nextRowId = nextRowIdEntry.getKey();
+ } else { // Row IDs of this key are exhausted: move on to the next index key.
+ indexMapEntry0 =
indexMap.higherEntry(indexMapEntry0.getKey());
+
+ if (indexMapEntry0 != null) {
+ nextRowId =
getRowId(indexMapEntry0.getValue().firstEntry());
+ }
+ }
+ }
+
+ return nextRowId == null
+ ? null : new IndexRowImpl(new
BinaryTuple(descriptor.binaryTupleSchema(), indexMapEntry0.getKey()),
nextRowId);
 }
private void advanceIfNeeded() {
@@ -242,30 +279,30 @@ public class TestSortedIndexStorage implements
SortedIndexStorage {
return;
}
- if (indexMapEntry == null) {
- indexMapEntry = indexMap.firstEntry();
+ if (currentEntry == null) {
+ currentEntry = indexMap.firstEntry();
}
if (rowId == null) {
- if (indexMapEntry != null) {
- rowId = getRowId(indexMapEntry.getValue().firstEntry());
+ if (currentEntry != null) {
+ rowId = getRowId(currentEntry.getValue().firstEntry());
}
} else {
- Entry<RowId, Object> nextRowIdEntry =
indexMapEntry.getValue().higherEntry(rowId);
+ Entry<RowId, Object> nextRowIdEntry =
currentEntry.getValue().higherEntry(rowId);
if (nextRowIdEntry != null) {
rowId = nextRowIdEntry.getKey();
} else {
- Entry<ByteBuffer, NavigableMap<RowId, Object>>
nextIndexMapEntry = indexMap.higherEntry(indexMapEntry.getKey());
+ Entry<ByteBuffer, NavigableMap<RowId, Object>>
nextIndexMapEntry = indexMap.higherEntry(currentEntry.getKey());
if (nextIndexMapEntry == null) {
hasNext = false;
return;
} else {
- indexMapEntry = nextIndexMapEntry;
+ currentEntry = nextIndexMapEntry;
- rowId =
getRowId(indexMapEntry.getValue().firstEntry());
+ rowId = getRowId(currentEntry.getValue().firstEntry());
}
}
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
index b9d4beffeb..c29dab554c 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
@@ -30,6 +30,7 @@ import
org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
+import org.apache.ignite.internal.storage.index.PeekCursor;
import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import
org.apache.ignite.internal.storage.pagememory.index.freelist.IndexColumns;
@@ -163,7 +164,7 @@ public class PageMemorySortedIndexStorage implements
SortedIndexStorage {
}
@Override
- public Cursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound, int flags) {
+ public PeekCursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound, int flags) {
if (!closeBusyLock.enterBusy()) {
throwStorageClosedException();
}
@@ -272,7 +273,7 @@ public class PageMemorySortedIndexStorage implements
SortedIndexStorage {
};
}
- private class ScanCursor implements Cursor<IndexRow> {
+ private class ScanCursor implements PeekCursor<IndexRow> {
@Nullable
private Boolean hasNext;
@@ -337,6 +338,35 @@ public class PageMemorySortedIndexStorage implements
SortedIndexStorage {
}
}
+ @Override
+ public @Nullable IndexRow peek() { // Looks up the next row without assigning the cursor fields (hasNext, treeRow).
+ if (hasNext != null) { // hasNext was already computed: answer from the cached treeRow.
+ if (hasNext) {
+ return toIndexRowImpl(treeRow);
+ }
+
+ return null;
+ }
+
+ try {
+ SortedIndexRow nextTreeRow;
+
+ if (treeRow == null) { // No current tree row: start from the lower bound, or from the first row when there is none.
+ nextTreeRow = lower == null ? sortedIndexTree.findFirst()
: sortedIndexTree.findNext(lower, true);
+ } else { // NOTE(review): the boolean presumably means "include the given row itself" - confirm against findNext.
+ nextTreeRow = sortedIndexTree.findNext(treeRow, false);
+ }
+
+ if (nextTreeRow == null || (upper != null &&
compareRows(nextTreeRow, upper) >= 0)) { // Nothing found, or the candidate does not compare below the upper bound.
+ return null;
+ }
+
+ return toIndexRowImpl(nextTreeRow);
+ } catch (IgniteInternalCheckedException e) { // Checked tree error is rethrown as the unchecked storage exception.
+ throw new StorageException("Error when peeking next element",
e);
+ }
+ }
+
private void advanceIfNeeded() throws IgniteInternalCheckedException {
if (hasNext != null) {
return;
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index b06cc00a74..d28f2f41f9 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -18,12 +18,12 @@
package org.apache.ignite.internal.storage.rocksdb.index;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
-import static org.apache.ignite.internal.util.CursorUtils.map;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.NoSuchElementException;
+import java.util.function.Function;
import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.rocksdb.RocksUtils;
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
+import org.apache.ignite.internal.storage.index.PeekCursor;
import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage;
@@ -93,7 +94,7 @@ public class RocksDbSortedIndexStorage implements
SortedIndexStorage {
public Cursor<RowId> get(BinaryTuple key) throws StorageException {
BinaryTuplePrefix keyPrefix = BinaryTuplePrefix.fromBinaryTuple(key);
- return map(scan(keyPrefix, keyPrefix, true, true), this::decodeRowId);
+ return scan(keyPrefix, keyPrefix, true, true, this::decodeRowId);
}
@Override
@@ -119,18 +120,19 @@ public class RocksDbSortedIndexStorage implements
SortedIndexStorage {
}
@Override
- public Cursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound, int flags) {
+ public PeekCursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound, int flags) {
boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
- return map(scan(lowerBound, upperBound, includeLower, includeUpper),
this::decodeRow);
+ return scan(lowerBound, upperBound, includeLower, includeUpper,
this::decodeRow);
}
- private Cursor<ByteBuffer> scan(
+ private <T> PeekCursor<T> scan(
@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound,
boolean includeLower,
- boolean includeUpper
+ boolean includeUpper,
+ Function<ByteBuffer, T> mapper
) {
byte[] lowerBoundBytes;
@@ -158,17 +160,21 @@ public class RocksDbSortedIndexStorage implements
SortedIndexStorage {
}
}
- return createScanCursor(lowerBoundBytes, upperBoundBytes);
+ return createScanCursor(lowerBoundBytes, upperBoundBytes, mapper);
}
- private Cursor<ByteBuffer> createScanCursor(byte @Nullable [] lowerBound,
byte @Nullable [] upperBound) {
+ private <T> PeekCursor<T> createScanCursor(
+ byte @Nullable [] lowerBound,
+ byte @Nullable [] upperBound,
+ Function<ByteBuffer, T> mapper
+ ) {
Slice upperBoundSlice = upperBound == null ? new
Slice(partitionStorage.partitionEndPrefix()) : new Slice(upperBound);
ReadOptions options = new
ReadOptions().setIterateUpperBound(upperBoundSlice);
RocksIterator it = indexCf.newIterator(options);
- return new Cursor<>() {
+ return new PeekCursor<>() {
@Nullable
private Boolean hasNext;
@@ -191,7 +197,7 @@ public class RocksDbSortedIndexStorage implements
SortedIndexStorage {
}
@Override
- public ByteBuffer next() {
+ public T next() {
advanceIfNeeded();
boolean hasNext = this.hasNext;
@@ -202,7 +208,28 @@ public class RocksDbSortedIndexStorage implements
SortedIndexStorage {
this.hasNext = null;
- return ByteBuffer.wrap(key).order(ORDER);
+ return mapper.apply(ByteBuffer.wrap(key).order(ORDER));
+ }
+
+ @Override
+ public @Nullable T peek() { // Maps and returns the next key without assigning the cursor fields (hasNext, key).
+ if (hasNext != null) { // hasNext was already computed: answer from the cached key.
+ if (hasNext) {
+ return mapper.apply(ByteBuffer.wrap(key).order(ORDER));
+ }
+
+ return null;
+ }
+
+ refreshAndPrepareRocksIterator(); // Same iterator preparation that advanceIfNeeded() performs before reading.
+
+ if (!it.isValid()) {
+ RocksUtils.checkIterator(it); // NOTE(review): presumably throws if the iterator is in an error state - confirm.
+
+ return null;
+ } else {
+ return
mapper.apply(ByteBuffer.wrap(it.key()).order(ORDER));
+ }
 }
private void advanceIfNeeded() throws StorageException {
@@ -210,6 +237,20 @@ public class RocksDbSortedIndexStorage implements
SortedIndexStorage {
return;
}
+ refreshAndPrepareRocksIterator();
+
+ if (!it.isValid()) {
+ RocksUtils.checkIterator(it);
+
+ hasNext = false;
+ } else {
+ key = it.key();
+
+ hasNext = true;
+ }
+ }
+
+ private void refreshAndPrepareRocksIterator() {
try {
it.refresh();
} catch (RocksDBException e) {
@@ -229,16 +270,6 @@ public class RocksDbSortedIndexStorage implements
SortedIndexStorage {
it.seek(lowerBound == null ?
partitionStorage.partitionStartPrefix() : lowerBound);
}
}
-
- if (!it.isValid()) {
- RocksUtils.checkIterator(it);
-
- hasNext = false;
- } else {
- key = it.key();
-
- hasNext = true;
- }
}
};
}