This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 761c899d3e IGNITE-18243 Implement a peek method for the sorted index 
cursor (#1424)
761c899d3e is described below

commit 761c899d3eb34df8b0d56e306c96293b72402d44
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Dec 13 15:30:03 2022 +0300

    IGNITE-18243 Implement a peek method for the sorted index cursor (#1424)
---
 .../ignite/internal/storage/index/PeekCursor.java  |  41 +++
 .../internal/storage/index/SortedIndexStorage.java |   3 +-
 .../index/AbstractSortedIndexStorageTest.java      | 303 +++++++++++++++++++++
 .../storage/index/impl/TestSortedIndexStorage.java |  61 ++++-
 .../index/sorted/PageMemorySortedIndexStorage.java |  34 ++-
 .../rocksdb/index/RocksDbSortedIndexStorage.java   |  73 +++--
 6 files changed, 478 insertions(+), 37 deletions(-)

diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/PeekCursor.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/PeekCursor.java
new file mode 100644
index 0000000000..2ef036f690
--- /dev/null
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/PeekCursor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link Cursor} extension with the ability to {@link #peek() peek} at the 
next element.
+ */
+public interface PeekCursor<T> extends Cursor<T> {
+    /**
+     * Returns the next element without advancing the cursor, {@code null} if 
there is no next element.
+     *
+     * <p>Usage notes:
+     * <ul>
+     *     <li>After the cursor is created, {@code #peek()} will return the 
actual (up-to-date) next element;</li>
+     *     <li>After calling {@link #hasNext()}, if it returned {@code true}, 
then {@code peek()} will return the element (cached) that
+     *     {@link #next()} would return, but without advancing the cursor;</li>
+     *     <li>After calling {@link #hasNext()}, if it returned {@code false}, 
then {@code peek()} will always return {@code null};</li>
+     *     <li>After {@link #next()} is called, but before {@link #hasNext()} 
is called, {@code peek()} will always return the actual
+     *     (up-to-date) next element without advancing the cursor.</li>
+     * </ul>
+     */
+    @Nullable T peek();
+}
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
index 2aa1542537..636d8f1ec1 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.storage.index;
 
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.util.Cursor;
 import org.intellij.lang.annotations.MagicConstant;
 import org.jetbrains.annotations.Nullable;
 
@@ -59,7 +58,7 @@ public interface SortedIndexStorage extends IndexStorage {
      * @return Cursor with fetched index rows.
      * @throws IllegalArgumentException If backwards flag is passed and 
backwards iteration is not supported by the storage.
      */
-    Cursor<IndexRow> scan(
+    PeekCursor<IndexRow> scan(
             @Nullable BinaryTuplePrefix lowerBound,
             @Nullable BinaryTuplePrefix upperBound,
             @MagicConstant(flagsFromClass = SortedIndexStorage.class) int flags
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
index 13c2f3123d..cfc5760288 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
@@ -40,6 +40,7 @@ import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -48,6 +49,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -1233,6 +1235,276 @@ public abstract class AbstractSortedIndexStorageTest {
         assertThrows(NoSuchElementException.class, scan::next);
     }
 
+
+    @Test
+    void testScanPeekForFinishedCursor() {
+        SortedIndexDefinition indexDefinition = 
SchemaBuilders.sortedIndex("TEST_IDX")
+                
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+                .build();
+
+        SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+        BinaryTupleRowSerializer serializer = new 
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+        PeekCursor<IndexRow> scan0 = indexStorage.scan(null, null, 0);
+        PeekCursor<IndexRow> scan1 = indexStorage.scan(null, null, 0);
+
+        // index   =
+        // cursor0 = ^ already finished
+        assertFalse(scan0.hasNext());
+        assertNull(scan0.peek());
+
+        // index   =
+        // cursor1 = ^ already finished
+        assertThrows(NoSuchElementException.class, scan1::next);
+        assertNull(scan1.peek());
+
+        // index   =  [0]
+        // cursor0 = ^ already finished
+        // cursor1 = ^ already finished
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, new 
RowId(TEST_PARTITION)));
+
+        assertNull(scan0.peek());
+        assertNull(scan1.peek());
+
+        // index   =  [0]
+        // cursor0 = ^ no cached row
+        // cursor1 = ^ no cached row
+        scan0 = indexStorage.scan(null, null, 0);
+        scan1 = indexStorage.scan(null, null, 0);
+
+        assertEquals(0, serializer.deserializeColumns(scan0.peek())[0]);
+        assertEquals(0, serializer.deserializeColumns(scan1.peek())[0]);
+
+        // index   = [0]
+        // cursor0 =    ^ cached [0]
+        assertTrue(scan0.hasNext());
+        assertEquals(0, serializer.deserializeColumns(scan0.peek())[0]);
+
+        // index   = [0]
+        // cursor0 =    ^ no cached row
+        assertEquals(0, serializer.deserializeColumns(scan0.next())[0]);
+        assertNull(scan0.peek());
+
+        // index   = [0]
+        // cursor0 =    ^ already finished
+        assertFalse(scan0.hasNext());
+        assertThrows(NoSuchElementException.class, scan0::next);
+        assertNull(scan0.peek());
+
+        // index   = [0]
+        // cursor1 =    ^ no cached row
+        assertEquals(0, serializer.deserializeColumns(scan1.next())[0]);
+        assertNull(scan1.peek());
+
+        // index   = [0]
+        // cursor1 =    ^ already finished
+        assertThrows(NoSuchElementException.class, scan1::next);
+        assertNull(scan1.peek());
+    }
+
+    @Test
+    void testScanPeekAddRowsOnly() {
+        SortedIndexDefinition indexDefinition = 
SchemaBuilders.sortedIndex("TEST_IDX")
+                
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+                .build();
+
+        SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+        BinaryTupleRowSerializer serializer = new 
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+        RowId rowId0 = new RowId(TEST_PARTITION, 0, 0);
+        RowId rowId1 = new RowId(TEST_PARTITION, 0, 1);
+
+        PeekCursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+        // index  =  [0, r1]
+        // cursor = ^ no cached row
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId1));
+
+        assertEquals(SimpleRow.of(0, rowId1), SimpleRow.of(scan.peek(), 
firstColumn(serializer)));
+
+        // index  =  [0, r0] [0, r1]
+        // cursor = ^ no cached row
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId0));
+
+        assertEquals(SimpleRow.of(0, rowId0), SimpleRow.of(scan.peek(), 
firstColumn(serializer)));
+
+        // index  = [0, r0] [0, r1]
+        // cursor =        ^ cached [0, r0]
+        assertTrue(scan.hasNext());
+
+        assertEquals(SimpleRow.of(0, rowId0), SimpleRow.of(scan.peek(), 
firstColumn(serializer)));
+
+        // index  = [-1, r0] [0, r0] [0, r1]
+        // cursor =                 ^ cached [0, r0]
+        put(indexStorage, serializer.serializeRow(new Object[]{-1}, rowId0));
+
+        assertEquals(SimpleRow.of(0, rowId0), SimpleRow.of(scan.peek(), 
firstColumn(serializer)));
+
+        // index  = [-1, r0] [0, r0] [0, r1] [1, r1]
+        // cursor =                 ^ cached [0, r0]
+        put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId1));
+
+        assertEquals(SimpleRow.of(0, rowId0), SimpleRow.of(scan.peek(), 
firstColumn(serializer)));
+
+        // index  = [-1, r0] [0, r0] [0, r1] [1, r1]
+        // cursor =                 ^ no cached row
+        assertEquals(SimpleRow.of(0, rowId0), SimpleRow.of(scan.next(), 
firstColumn(serializer)));
+        assertEquals(SimpleRow.of(0, rowId1), SimpleRow.of(scan.peek(), 
firstColumn(serializer)));
+
+        // index  = [-1, r0] [0, r0] [0, r1] [1, r1]
+        // cursor =                         ^ cached [0, r1]
+        assertTrue(scan.hasNext());
+        assertEquals(SimpleRow.of(0, rowId1), SimpleRow.of(scan.peek(), 
firstColumn(serializer)));
+
+        // index  = [-1, r0] [0, r0] [0, r1] [1, r1]
+        // cursor =                         ^ no cached row
+        assertEquals(SimpleRow.of(0, rowId1), SimpleRow.of(scan.next(), 
firstColumn(serializer)));
+        assertEquals(SimpleRow.of(1, rowId1), SimpleRow.of(scan.peek(), 
firstColumn(serializer)));
+
+        // index  = [-1, r0] [0, r0] [0, r1] [1, r1]
+        // cursor =                                 ^ cached [1, r1]
+        assertTrue(scan.hasNext());
+        assertEquals(SimpleRow.of(1, rowId1), SimpleRow.of(scan.peek(), 
firstColumn(serializer)));
+
+        // index  = [-1, r0] [0, r0] [0, r1] [1, r1]
+        // cursor =                                 ^ no cached row
+        assertEquals(SimpleRow.of(1, rowId1), SimpleRow.of(scan.next(), 
firstColumn(serializer)));
+        assertNull(scan.peek());
+
+        // index  = [-1, r0] [0, r0] [0, r1] [1, r1]
+        // cursor =                                 ^ already finished
+        assertFalse(scan.hasNext());
+        assertNull(scan.peek());
+
+        // index  = [-1, r0] [0, r0] [0, r1] [1, r1]
+        // cursor =                                 ^ already finished
+        assertThrows(NoSuchElementException.class, scan::next);
+        assertNull(scan.peek());
+    }
+
+    @Test
+    void testScanPeekRemoveRows() {
+        SortedIndexDefinition indexDefinition = 
SchemaBuilders.sortedIndex("TEST_IDX")
+                
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+                .build();
+
+        SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+        BinaryTupleRowSerializer serializer = new 
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+        RowId rowId0 = new RowId(TEST_PARTITION, 0, 0);
+        RowId rowId1 = new RowId(TEST_PARTITION, 0, 1);
+
+        PeekCursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId0));
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId1));
+        put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId0));
+        put(indexStorage, serializer.serializeRow(new Object[]{2}, rowId1));
+
+        // index  =  [0, r0] [0, r1] [1, r0] [2, r1]
+        // cursor = ^ no cached row
+        assertEquals(SimpleRow.of(0, rowId0), SimpleRow.of(scan.peek(), 
firstColumn(serializer)));
+
+        // index  =  [0, r1] [1, r0] [2, r1]
+        // cursor = ^ no cached row
+        remove(indexStorage, serializer.serializeRow(new Object[]{0}, rowId0));
+
+        // index  =  [0, r1] [1, r0] [2, r1]
+        // cursor = ^ no cached row
+        assertEquals(SimpleRow.of(0, rowId1), SimpleRow.of(scan.peek(), 
firstColumn(serializer)));
+
+        // index  =  [1, r0] [2, r1]
+        // cursor = ^ no cached row
+        remove(indexStorage, serializer.serializeRow(new Object[]{0}, rowId1));
+
+        assertEquals(SimpleRow.of(1, rowId0), SimpleRow.of(scan.peek(), 
firstColumn(serializer)));
+
+        // index  = [1, r0] [2, r1]
+        // cursor =        ^ cached [1, r0]
+        assertTrue(scan.hasNext());
+        assertEquals(SimpleRow.of(1, rowId0), SimpleRow.of(scan.peek(), 
firstColumn(serializer)));
+
+        // index  = [1, r0]
+        // cursor =        ^ cached [1, r0]
+        remove(indexStorage, serializer.serializeRow(new Object[]{2}, rowId1));
+
+        assertEquals(SimpleRow.of(1, rowId0), SimpleRow.of(scan.peek(), 
firstColumn(serializer)));
+
+        assertEquals(SimpleRow.of(1, rowId0), SimpleRow.of(scan.next(), 
firstColumn(serializer)));
+        assertNull(scan.peek());
+
+        // index  = [1, r0]
+        // cursor =        ^ already finished
+        assertFalse(scan.hasNext());
+        assertNull(scan.peek());
+
+        // index  = [1, r0]
+        // cursor =        ^ already finished
+        assertThrows(NoSuchElementException.class, scan::next);
+        assertNull(scan.peek());
+    }
+
+    @Test
+    void testScanPeekReplaceRow() {
+        SortedIndexDefinition indexDefinition = 
SchemaBuilders.sortedIndex("TEST_IDX")
+                
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+                .build();
+
+        SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+        BinaryTupleRowSerializer serializer = new 
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+        PeekCursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+        RowId rowId = new RowId(TEST_PARTITION);
+
+        // index  =  [0] [1]
+        // cursor = ^ with no cached row
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId));
+        put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId));
+
+        // index  = [0] [1]
+        // cursor =    ^ with cached [0]
+        assertTrue(scan.hasNext());
+
+        assertEquals(SimpleRow.of(0, rowId), SimpleRow.of(scan.peek(), 
firstColumn(serializer)));
+
+        // index  = [0] [2]
+        // cursor =    ^ with cached [0]
+        remove(indexStorage, serializer.serializeRow(new Object[]{1}, rowId));
+        put(indexStorage, serializer.serializeRow(new Object[]{2}, rowId));
+
+        assertEquals(SimpleRow.of(0, rowId), SimpleRow.of(scan.peek(), 
firstColumn(serializer)));
+
+        // index  = [0] [2]
+        // cursor =    ^ with no cached row
+        assertEquals(SimpleRow.of(0, rowId), SimpleRow.of(scan.next(), 
firstColumn(serializer)));
+        assertEquals(SimpleRow.of(2, rowId), SimpleRow.of(scan.peek(), 
firstColumn(serializer)));
+
+        // index  = [0] [2]
+        // cursor =    ^ with cached [2]
+        assertTrue(scan.hasNext());
+
+        assertEquals(SimpleRow.of(2, rowId), SimpleRow.of(scan.peek(), 
firstColumn(serializer)));
+
+        // index  = [0] [2]
+        // cursor =        ^ with no cached row
+        assertEquals(SimpleRow.of(2, rowId), SimpleRow.of(scan.next(), 
firstColumn(serializer)));
+
+        // index  = [0] [2]
+        // cursor =        ^ already finished
+        assertFalse(scan.hasNext());
+        assertNull(scan.peek());
+
+        // index  = [0] [2]
+        // cursor =        ^ already finished
+        assertThrows(NoSuchElementException.class, scan::next);
+        assertNull(scan.peek());
+    }
+
     private List<ColumnDefinition> shuffledRandomDefinitions() {
         return shuffledDefinitions(d -> random.nextBoolean());
     }
@@ -1377,4 +1649,35 @@ public abstract class AbstractSortedIndexStorageTest {
     private static <T> T firstArrayElement(Object[] objects) {
         return (T) objects[0];
     }
+
+    private static final class SimpleRow<T> {
+        private final T indexColumns;
+
+        private final RowId rowId;
+
+        private SimpleRow(T indexColumns, RowId rowId) {
+            this.indexColumns = indexColumns;
+            this.rowId = rowId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            SimpleRow<?> simpleRow = (SimpleRow<?>) o;
+            return Objects.equals(indexColumns, simpleRow.indexColumns) && 
Objects.equals(rowId, simpleRow.rowId);
+        }
+
+        private static <T> SimpleRow<T> of(T indexColumns, RowId rowId) {
+            return new SimpleRow<>(indexColumns, rowId);
+        }
+
+        private static <T> SimpleRow<T> of(IndexRow indexRow, 
Function<IndexRow, T> mapper) {
+            return new SimpleRow<>(mapper.apply(indexRow), indexRow.rowId());
+        }
+    }
 }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
index bef8d5846d..8286ecdc59 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
 import org.apache.ignite.internal.storage.index.IndexRow;
 import org.apache.ignite.internal.storage.index.IndexRowImpl;
+import org.apache.ignite.internal.storage.index.PeekCursor;
 import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.util.Cursor;
@@ -123,7 +124,7 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
     }
 
     @Override
-    public Cursor<IndexRow> scan(
+    public PeekCursor<IndexRow> scan(
             @Nullable BinaryTuplePrefix lowerBound,
             @Nullable BinaryTuplePrefix upperBound,
             int flags
@@ -190,14 +191,14 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
         }
     }
 
-    private class ScanCursor implements Cursor<IndexRow> {
+    private class ScanCursor implements PeekCursor<IndexRow> {
         private final NavigableMap<ByteBuffer, NavigableMap<RowId, Object>> 
indexMap;
 
         @Nullable
         private Boolean hasNext;
 
         @Nullable
-        private Entry<ByteBuffer, NavigableMap<RowId, Object>> indexMapEntry;
+        private Entry<ByteBuffer, NavigableMap<RowId, Object>> currentEntry;
 
         @Nullable
         private RowId rowId;
@@ -234,7 +235,43 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
 
             this.hasNext = null;
 
-            return new IndexRowImpl(new 
BinaryTuple(descriptor.binaryTupleSchema(), indexMapEntry.getKey()), rowId);
+            return new IndexRowImpl(new 
BinaryTuple(descriptor.binaryTupleSchema(), currentEntry.getKey()), rowId);
+        }
+
+        @Override
+        public @Nullable IndexRow peek() {
+            if (hasNext != null) {
+                if (hasNext) {
+                    return new IndexRowImpl(new 
BinaryTuple(descriptor.binaryTupleSchema(), currentEntry.getKey()), rowId);
+                }
+
+                return null;
+            }
+
+            Entry<ByteBuffer, NavigableMap<RowId, Object>> indexMapEntry0 = 
currentEntry == null ? indexMap.firstEntry() : currentEntry;
+
+            RowId nextRowId = null;
+
+            if (rowId == null) {
+                if (indexMapEntry0 != null) {
+                    nextRowId = 
getRowId(indexMapEntry0.getValue().firstEntry());
+                }
+            } else {
+                Entry<RowId, Object> nextRowIdEntry = 
indexMapEntry0.getValue().higherEntry(rowId);
+
+                if (nextRowIdEntry != null) {
+                    nextRowId = nextRowIdEntry.getKey();
+                } else {
+                    indexMapEntry0 = 
indexMap.higherEntry(indexMapEntry0.getKey());
+
+                    if (indexMapEntry0 != null) {
+                        nextRowId = 
getRowId(indexMapEntry0.getValue().firstEntry());
+                    }
+                }
+            }
+
+            return nextRowId == null
+                    ? null : new IndexRowImpl(new 
BinaryTuple(descriptor.binaryTupleSchema(), indexMapEntry0.getKey()), 
nextRowId);
         }
 
         private void advanceIfNeeded() {
@@ -242,30 +279,30 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
                 return;
             }
 
-            if (indexMapEntry == null) {
-                indexMapEntry = indexMap.firstEntry();
+            if (currentEntry == null) {
+                currentEntry = indexMap.firstEntry();
             }
 
             if (rowId == null) {
-                if (indexMapEntry != null) {
-                    rowId = getRowId(indexMapEntry.getValue().firstEntry());
+                if (currentEntry != null) {
+                    rowId = getRowId(currentEntry.getValue().firstEntry());
                 }
             } else {
-                Entry<RowId, Object> nextRowIdEntry = 
indexMapEntry.getValue().higherEntry(rowId);
+                Entry<RowId, Object> nextRowIdEntry = 
currentEntry.getValue().higherEntry(rowId);
 
                 if (nextRowIdEntry != null) {
                     rowId = nextRowIdEntry.getKey();
                 } else {
-                    Entry<ByteBuffer, NavigableMap<RowId, Object>> 
nextIndexMapEntry = indexMap.higherEntry(indexMapEntry.getKey());
+                    Entry<ByteBuffer, NavigableMap<RowId, Object>> 
nextIndexMapEntry = indexMap.higherEntry(currentEntry.getKey());
 
                     if (nextIndexMapEntry == null) {
                         hasNext = false;
 
                         return;
                     } else {
-                        indexMapEntry = nextIndexMapEntry;
+                        currentEntry = nextIndexMapEntry;
 
-                        rowId = 
getRowId(indexMapEntry.getValue().firstEntry());
+                        rowId = getRowId(currentEntry.getValue().firstEntry());
                     }
                 }
             }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
index b9d4beffeb..c29dab554c 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
@@ -30,6 +30,7 @@ import 
org.apache.ignite.internal.storage.StorageClosedException;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.index.IndexRow;
 import org.apache.ignite.internal.storage.index.IndexRowImpl;
+import org.apache.ignite.internal.storage.index.PeekCursor;
 import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import 
org.apache.ignite.internal.storage.pagememory.index.freelist.IndexColumns;
@@ -163,7 +164,7 @@ public class PageMemorySortedIndexStorage implements 
SortedIndexStorage {
     }
 
     @Override
-    public Cursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound, 
@Nullable BinaryTuplePrefix upperBound, int flags) {
+    public PeekCursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound, 
@Nullable BinaryTuplePrefix upperBound, int flags) {
         if (!closeBusyLock.enterBusy()) {
             throwStorageClosedException();
         }
@@ -272,7 +273,7 @@ public class PageMemorySortedIndexStorage implements 
SortedIndexStorage {
         };
     }
 
-    private class ScanCursor implements Cursor<IndexRow> {
+    private class ScanCursor implements PeekCursor<IndexRow> {
         @Nullable
         private Boolean hasNext;
 
@@ -337,6 +338,35 @@ public class PageMemorySortedIndexStorage implements 
SortedIndexStorage {
             }
         }
 
+        @Override
+        public @Nullable IndexRow peek() {
+            if (hasNext != null) {
+                if (hasNext) {
+                    return toIndexRowImpl(treeRow);
+                }
+
+                return null;
+            }
+
+            try {
+                SortedIndexRow nextTreeRow;
+
+                if (treeRow == null) {
+                    nextTreeRow = lower == null ? sortedIndexTree.findFirst() 
: sortedIndexTree.findNext(lower, true);
+                } else {
+                    nextTreeRow = sortedIndexTree.findNext(treeRow, false);
+                }
+
+                if (nextTreeRow == null || (upper != null && 
compareRows(nextTreeRow, upper) >= 0)) {
+                    return null;
+                }
+
+                return toIndexRowImpl(nextTreeRow);
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Error when peeking next element", 
e);
+            }
+        }
+
         private void advanceIfNeeded() throws IgniteInternalCheckedException {
             if (hasNext != null) {
                 return;
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index b06cc00a74..d28f2f41f9 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -18,12 +18,12 @@
 package org.apache.ignite.internal.storage.rocksdb.index;
 
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
-import static org.apache.ignite.internal.util.CursorUtils.map;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.NoSuchElementException;
+import java.util.function.Function;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.index.IndexRow;
 import org.apache.ignite.internal.storage.index.IndexRowImpl;
+import org.apache.ignite.internal.storage.index.PeekCursor;
 import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage;
@@ -93,7 +94,7 @@ public class RocksDbSortedIndexStorage implements 
SortedIndexStorage {
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
         BinaryTuplePrefix keyPrefix = BinaryTuplePrefix.fromBinaryTuple(key);
 
-        return map(scan(keyPrefix, keyPrefix, true, true), this::decodeRowId);
+        return scan(keyPrefix, keyPrefix, true, true, this::decodeRowId);
     }
 
     @Override
@@ -119,18 +120,19 @@ public class RocksDbSortedIndexStorage implements 
SortedIndexStorage {
     }
 
     @Override
-    public Cursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound, 
@Nullable BinaryTuplePrefix upperBound, int flags) {
+    public PeekCursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound, 
@Nullable BinaryTuplePrefix upperBound, int flags) {
         boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
         boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
 
-        return map(scan(lowerBound, upperBound, includeLower, includeUpper), 
this::decodeRow);
+        return scan(lowerBound, upperBound, includeLower, includeUpper, 
this::decodeRow);
     }
 
-    private Cursor<ByteBuffer> scan(
+    private <T> PeekCursor<T> scan(
             @Nullable BinaryTuplePrefix lowerBound,
             @Nullable BinaryTuplePrefix upperBound,
             boolean includeLower,
-            boolean includeUpper
+            boolean includeUpper,
+            Function<ByteBuffer, T> mapper
     ) {
         byte[] lowerBoundBytes;
 
@@ -158,17 +160,21 @@ public class RocksDbSortedIndexStorage implements 
SortedIndexStorage {
             }
         }
 
-        return createScanCursor(lowerBoundBytes, upperBoundBytes);
+        return createScanCursor(lowerBoundBytes, upperBoundBytes, mapper);
     }
 
-    private Cursor<ByteBuffer> createScanCursor(byte @Nullable [] lowerBound, 
byte @Nullable [] upperBound) {
+    private <T> PeekCursor<T> createScanCursor(
+            byte @Nullable [] lowerBound,
+            byte @Nullable [] upperBound,
+            Function<ByteBuffer, T> mapper
+    ) {
         Slice upperBoundSlice = upperBound == null ? new 
Slice(partitionStorage.partitionEndPrefix()) : new Slice(upperBound);
 
         ReadOptions options = new 
ReadOptions().setIterateUpperBound(upperBoundSlice);
 
         RocksIterator it = indexCf.newIterator(options);
 
-        return new Cursor<>() {
+        return new PeekCursor<>() {
             @Nullable
             private Boolean hasNext;
 
@@ -191,7 +197,7 @@ public class RocksDbSortedIndexStorage implements 
SortedIndexStorage {
             }
 
             @Override
-            public ByteBuffer next() {
+            public T next() {
                 advanceIfNeeded();
 
                 boolean hasNext = this.hasNext;
@@ -202,7 +208,28 @@ public class RocksDbSortedIndexStorage implements 
SortedIndexStorage {
 
                 this.hasNext = null;
 
-                return ByteBuffer.wrap(key).order(ORDER);
+                return mapper.apply(ByteBuffer.wrap(key).order(ORDER));
+            }
+
+            @Override
+            public @Nullable T peek() {
+                if (hasNext != null) {
+                    if (hasNext) {
+                        return mapper.apply(ByteBuffer.wrap(key).order(ORDER));
+                    }
+
+                    return null;
+                }
+
+                refreshAndPrepareRocksIterator();
+
+                if (!it.isValid()) {
+                    RocksUtils.checkIterator(it);
+
+                    return null;
+                } else {
+                    return 
mapper.apply(ByteBuffer.wrap(it.key()).order(ORDER));
+                }
             }
 
             private void advanceIfNeeded() throws StorageException {
@@ -210,6 +237,20 @@ public class RocksDbSortedIndexStorage implements 
SortedIndexStorage {
                     return;
                 }
 
+                refreshAndPrepareRocksIterator();
+
+                if (!it.isValid()) {
+                    RocksUtils.checkIterator(it);
+
+                    hasNext = false;
+                } else {
+                    key = it.key();
+
+                    hasNext = true;
+                }
+            }
+
+            private void refreshAndPrepareRocksIterator() {
                 try {
                     it.refresh();
                 } catch (RocksDBException e) {
@@ -229,16 +270,6 @@ public class RocksDbSortedIndexStorage implements 
SortedIndexStorage {
                         it.seek(lowerBound == null ? 
partitionStorage.partitionStartPrefix() : lowerBound);
                     }
                 }
-
-                if (!it.isValid()) {
-                    RocksUtils.checkIterator(it);
-
-                    hasNext = false;
-                } else {
-                    key = it.key();
-
-                    hasNext = true;
-                }
             }
         };
     }

Reply via email to