This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 73cbbddc18d IGNITE-27048 Stop partition scan on index build early 
(#6976)
73cbbddc18d is described below

commit 73cbbddc18d79cfb30101e89f70d0e0012896ff5
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Nov 17 17:35:09 2025 +0400

    IGNITE-27048 Stop partition scan on index build early (#6976)
---
 .../ignite/internal/index/IndexBuildTask.java      | 19 +++++---
 .../ignite/internal/pagememory/tree/BplusTree.java | 19 ++++----
 .../internal/pagememory/tree/IgniteTree.java       | 12 ++---
 .../internal/storage/MvPartitionStorage.java       | 12 ++++-
 .../storage/ThreadAssertingMvPartitionStorage.java | 11 ++++-
 .../AbstractMvPartitionStorageConcurrencyTest.java | 12 +++--
 .../storage/AbstractMvPartitionStorageGcTest.java  |  2 +
 .../storage/AbstractMvPartitionStorageTest.java    | 57 +++++++++++++++++-----
 .../storage/AbstractMvTableStorageTest.java        | 12 ++++-
 .../storage/impl/TestMvPartitionStorage.java       | 13 +++--
 .../mv/AbstractPageMemoryMvPartitionStorage.java   | 20 +++++++-
 .../storage/rocksdb/PartitionDataHelper.java       | 13 +++--
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 51 +++++++++++++++++--
 13 files changed, 200 insertions(+), 53 deletions(-)

diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
index 89be2e29abb..22770d8cfee 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.index;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static java.util.stream.Collectors.toList;
@@ -166,7 +167,8 @@ class IndexBuildTask {
         LOG.info("Start building the index: [{}]", createCommonIndexInfo());
 
         try {
-            supplyAsync(this::handleNextBatch, executor)
+            supplyAsync(partitionStorage::highestRowId, executor)
+                    .thenApplyAsync(this::handleNextBatch, executor)
                     .thenCompose(Function.identity())
                     .whenComplete((unused, throwable) -> {
                         if (throwable != null) {
@@ -220,13 +222,13 @@ class IndexBuildTask {
         return taskFuture;
     }
 
-    private CompletableFuture<Void> handleNextBatch() {
+    private CompletableFuture<Void> handleNextBatch(@Nullable RowId 
highestRowId) {
         if (!enterBusy()) {
             return nullCompletedFuture();
         }
 
         try {
-            return createBatchToIndex()
+            return createBatchToIndex(highestRowId)
                     .thenCompose(batch -> {
                         return replicaService.invoke(node, 
createBuildIndexReplicaRequest(batch, initialOperationTimestamp));
                     })
@@ -247,7 +249,7 @@ class IndexBuildTask {
                             return 
CompletableFutures.<Void>nullCompletedFuture();
                         }
 
-                        return handleNextBatch();
+                        return handleNextBatch(highestRowId);
                     }, executor)
                     .thenCompose(Function.identity());
         } catch (Throwable t) {
@@ -257,13 +259,18 @@ class IndexBuildTask {
         }
     }
 
-    private CompletableFuture<BatchToIndex> createBatchToIndex() {
+    private CompletableFuture<BatchToIndex> createBatchToIndex(@Nullable RowId 
highestRowId) {
+        if (highestRowId == null) {
+            return completedFuture(new BatchToIndex(List.of(), Set.of()));
+        }
+
         RowId nextRowIdToBuild = indexStorage.getNextRowIdToBuild();
 
         List<RowId> rowIds = new ArrayList<>(batchSize);
         Map<UUID, CommitPartitionId> transactionsToResolve = new HashMap<>();
 
-        List<RowMeta> rows = nextRowIdToBuild == null ? List.of() : 
partitionStorage.rowsStartingWith(nextRowIdToBuild, batchSize);
+        List<RowMeta> rows = nextRowIdToBuild == null ? List.of()
+                : partitionStorage.rowsStartingWith(nextRowIdToBuild, 
highestRowId, batchSize);
 
         for (RowMeta row : rows) {
             rowIds.add(row.rowId());
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
index c9d7c1318a1..4995258d104 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
@@ -1215,32 +1215,33 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
     }
 
     @Override
-    public final Cursor<T> find(@Nullable L lower, @Nullable L upper) throws 
IgniteInternalCheckedException {
-        return find(lower, upper, null);
+    public final Cursor<T> find(@Nullable L lowerInclusive, @Nullable L 
upperInclusive) throws IgniteInternalCheckedException {
+        return find(lowerInclusive, upperInclusive, null);
     }
 
     @Override
-    public final Cursor<T> find(@Nullable L lower, @Nullable L upper, 
@Nullable Object x) throws IgniteInternalCheckedException {
-        return find(lower, upper, null, x);
+    public final Cursor<T> find(@Nullable L lowerInclusive, @Nullable L 
upperInclusive, @Nullable Object x)
+            throws IgniteInternalCheckedException {
+        return find(lowerInclusive, upperInclusive, null, x);
     }
 
     /**
      * Getting the cursor through the rows of the tree.
      *
-     * @param lower Lower bound inclusive or {@code null} if unbounded.
-     * @param upper Upper bound inclusive or {@code null} if unbounded.
+     * @param lowerInclusive Lower bound inclusive or {@code null} if 
unbounded.
+     * @param upperInclusive Upper bound inclusive or {@code null} if 
unbounded.
      * @param c Tree row closure.
      * @param x Implementation specific argument, {@code null} always means 
that we need to return full detached data row.
      * @return Cursor.
      * @throws IgniteInternalCheckedException If failed.
      */
     public <R> Cursor<R> find(
-            @Nullable L lower,
-            @Nullable L upper,
+            @Nullable L lowerInclusive,
+            @Nullable L upperInclusive,
             @Nullable TreeRowMapClosure<L, T, R> c,
             @Nullable Object x
     ) throws IgniteInternalCheckedException {
-        return find(lower, upper, true, true, c, x);
+        return find(lowerInclusive, upperInclusive, true, true, c, x);
     }
 
     /**
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/IgniteTree.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/IgniteTree.java
index 064b4eca7e8..e89943f6c10 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/IgniteTree.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/IgniteTree.java
@@ -59,21 +59,21 @@ public interface IgniteTree<L, T> {
     /**
      * Returns a cursor from lower to upper bounds inclusive.
      *
-     * @param lower Lower bound or {@code null} if unbounded.
-     * @param upper Upper bound or {@code null} if unbounded.
+     * @param lowerInclusive Lower bound (inclusive) or {@code null} if 
unbounded.
+     * @param upperInclusive Upper bound (inclusive) or {@code null} if 
unbounded.
      * @throws IgniteInternalCheckedException If failed.
      */
-    Cursor<T> find(@Nullable L lower, @Nullable L upper) throws 
IgniteInternalCheckedException;
+    Cursor<T> find(@Nullable L lowerInclusive, @Nullable L upperInclusive) 
throws IgniteInternalCheckedException;
 
     /**
      * Returns a cursor from lower to upper bounds inclusive.
      *
-     * @param lower Lower bound or {@code null} if unbounded.
-     * @param upper Upper bound or {@code null} if unbounded.
+     * @param lowerInclusive Lower bound (inclusive) or {@code null} if 
unbounded.
+     * @param upperInclusive Upper bound (inclusive) or {@code null} if 
unbounded.
      * @param x Implementation specific argument, {@code null} always means 
that we need to return full detached data row.
      * @throws IgniteInternalCheckedException If failed.
      */
-    Cursor<T> find(@Nullable L lower, @Nullable L upper, @Nullable Object x) 
throws IgniteInternalCheckedException;
+    Cursor<T> find(@Nullable L lowerInclusive, @Nullable L upperInclusive, 
@Nullable Object x) throws IgniteInternalCheckedException;
 
     /**
      * Returns a value mapped to the lowest key, or {@code null} if tree is 
empty.
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index 7a9d70d0e87..42d13340b21 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -285,14 +285,22 @@ public interface MvPartitionStorage extends 
ManuallyCloseable {
      */
     @Nullable RowId closestRowId(RowId lowerBound) throws StorageException;
 
+    /**
+     * Returns the greatest row ID, existing in the storage. {@code null} if 
the storage is empty.
+     *
+     * @throws StorageException If failed to read data from the storage.
+     */
+    @Nullable RowId highestRowId() throws StorageException;
+
     /**
      * Returns a batch of rows with subsequent IDs which IDs are greater or 
equal than the lower bound.
      *
-     * @param lowerBound Lower bound (inclusive).
+     * @param lowerBoundInclusive Lower bound (inclusive).
+     * @param upperBoundInclusive Upper bound (inclusive).
      * @param limit Maximum number of rows to return.
      * @throws StorageException If failed to read data from the storage.
      */
-    List<RowMeta> rowsStartingWith(RowId lowerBound, int limit) throws 
StorageException;
+    List<RowMeta> rowsStartingWith(RowId lowerBoundInclusive, RowId 
upperBoundInclusive, int limit) throws StorageException;
 
     /**
      * Returns the head of GC queue.
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
index 30a3419a92b..89f8fb0be44 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
@@ -152,10 +152,17 @@ public class ThreadAssertingMvPartitionStorage implements 
MvPartitionStorage, Wr
     }
 
     @Override
-    public List<RowMeta> rowsStartingWith(RowId lowerBound, int limit) throws 
StorageException {
+    public @Nullable RowId highestRowId() throws StorageException {
         assertThreadAllowsToRead();
 
-        return partitionStorage.rowsStartingWith(lowerBound, limit);
+        return partitionStorage.highestRowId();
+    }
+
+    @Override
+    public List<RowMeta> rowsStartingWith(RowId lowerBoundInclusive, RowId 
upperBoundInclusive, int limit) throws StorageException {
+        assertThreadAllowsToRead();
+
+        return partitionStorage.rowsStartingWith(lowerBoundInclusive, 
upperBoundInclusive, limit);
     }
 
     @Override
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
index aedf9d5c826..84a84571133 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
@@ -154,7 +154,9 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
             abortWrite(ROW_ID, txId);
 
             assertNull(storage.closestRowId(ROW_ID));
-            assertThat(storage.rowsStartingWith(ROW_ID, Integer.MAX_VALUE), 
is(empty()));
+            assertThat(storage.rowsStartingWith(ROW_ID, 
RowId.highestRowId(PARTITION_ID), Integer.MAX_VALUE), is(empty()));
+
+            assertNull(storage.highestRowId());
         }
     }
 
@@ -197,7 +199,9 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
             );
 
             assertNull(storage.closestRowId(ROW_ID));
-            assertThat(storage.rowsStartingWith(ROW_ID, Integer.MAX_VALUE), 
is(empty()));
+            assertThat(storage.rowsStartingWith(ROW_ID, 
RowId.highestRowId(PARTITION_ID), Integer.MAX_VALUE), is(empty()));
+
+            assertNull(storage.highestRowId());
         }
     }
 
@@ -224,7 +228,9 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
             assertNull(pollForVacuum(HybridTimestamp.MAX_VALUE));
 
             assertNull(storage.closestRowId(ROW_ID));
-            assertThat(storage.rowsStartingWith(ROW_ID, Integer.MAX_VALUE), 
is(empty()));
+            assertThat(storage.rowsStartingWith(ROW_ID, 
RowId.highestRowId(PARTITION_ID), Integer.MAX_VALUE), is(empty()));
+
+            assertNull(storage.highestRowId());
 
             assertThat(rows, empty());
         }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
index 96a960afe7e..913e60f5c07 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
@@ -86,6 +86,7 @@ public abstract class AbstractMvPartitionStorageGcTest 
extends BaseMvPartitionSt
 
         // Let's check that the storage is empty.
         assertNull(storage.closestRowId(ROW_ID));
+        assertNull(storage.highestRowId());
     }
 
     @Test
@@ -106,6 +107,7 @@ public abstract class AbstractMvPartitionStorageGcTest 
extends BaseMvPartitionSt
 
         // Let's check that the storage is empty.
         assertNull(storage.closestRowId(ROW_ID));
+        assertNull(storage.highestRowId());
     }
 
     @Test
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index ac6a35d6267..812c390282a 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -1520,8 +1520,26 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvPartitionStor
         assertNull(storage.closestRowId(rowId2.increment()));
     }
 
+    @Test
+    void testHighestRowId() {
+        RowId rowId1 = new RowId(PARTITION_ID, 1, 0);
+        RowId rowId2 = new RowId(PARTITION_ID, 1, 1);
+
+        assertThat(storage.highestRowId(), is(nullValue()));
+
+        addWrite(rowId1, binaryRow, txId);
+
+        assertThat(storage.highestRowId(), is(rowId1));
+
+        addWrite(rowId2, binaryRow2, txId);
+
+        assertThat(storage.highestRowId(), is(rowId2));
+    }
+
     @Test
     void testRowsStartingWith() {
+        RowId highestRowId = RowId.highestRowId(PARTITION_ID);
+
         RowId rowId0 = new RowId(PARTITION_ID, 1, -1);
         RowId rowId1 = new RowId(PARTITION_ID, 1, 0);
         RowId rowId2 = new RowId(PARTITION_ID, 1, 1);
@@ -1532,23 +1550,37 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvPartitionStor
         addWrite(rowId1, binaryRow, txId);
         addWrite(rowId2, binaryRow2, txId);
 
-        assertThat(storage.rowsStartingWith(rowId0, 0), is(empty()));
+        // Limiting with 0 provides no rows.
+        assertThat(storage.rowsStartingWith(rowId0, highestRowId, 0), 
is(empty()));
 
-        assertRowMetasEqual(List.of(expectedRowMeta1), 
storage.rowsStartingWith(rowId0, 1));
-        assertRowMetasEqual(List.of(expectedRowMeta1, expectedRowMeta2), 
storage.rowsStartingWith(rowId0, Integer.MAX_VALUE));
-        assertRowMetasEqual(List.of(expectedRowMeta1), 
storage.rowsStartingWith(rowId0.increment(), 1));
-        assertRowMetasEqual(List.of(expectedRowMeta1, expectedRowMeta2), 
storage.rowsStartingWith(rowId0.increment(), Integer.MAX_VALUE));
+        // Starting with rowId0 (which does not exist in the storage).
+        assertRowMetasEqual(List.of(expectedRowMeta1), 
storage.rowsStartingWith(rowId0, highestRowId, 1));
+        assertRowMetasEqual(List.of(expectedRowMeta1, expectedRowMeta2), 
storage.rowsStartingWith(rowId0, highestRowId, Integer.MAX_VALUE));
+        assertRowMetasEqual(List.of(expectedRowMeta1), 
storage.rowsStartingWith(rowId0.increment(), highestRowId, 1));
+        assertRowMetasEqual(
+                List.of(expectedRowMeta1, expectedRowMeta2),
+                storage.rowsStartingWith(rowId0.increment(), highestRowId, 
Integer.MAX_VALUE)
+        );
+
+        // Starting with rowId1 (which exists in the storage).
+        assertRowMetasEqual(List.of(expectedRowMeta1), 
storage.rowsStartingWith(rowId1, highestRowId, 1));
+        assertRowMetasEqual(List.of(expectedRowMeta1, expectedRowMeta2), 
storage.rowsStartingWith(rowId1, highestRowId, Integer.MAX_VALUE));
 
-        assertRowMetasEqual(List.of(expectedRowMeta1), 
storage.rowsStartingWith(rowId1, 1));
-        assertRowMetasEqual(List.of(expectedRowMeta1, expectedRowMeta2), 
storage.rowsStartingWith(rowId1, Integer.MAX_VALUE));
+        // Starting with rowId2 (which exists in the storage).
+        assertRowMetasEqual(List.of(expectedRowMeta2), 
storage.rowsStartingWith(rowId2, highestRowId, Integer.MAX_VALUE));
 
-        assertRowMetasEqual(List.of(expectedRowMeta2), 
storage.rowsStartingWith(rowId2, Integer.MAX_VALUE));
+        // Starting with a row ID that is greater than the greatest row ID in 
the storage.
+        assertThat(storage.rowsStartingWith(rowId2.increment(), highestRowId, 
Integer.MAX_VALUE), is(empty()));
 
-        assertThat(storage.rowsStartingWith(rowId2.increment(), 1), 
is(empty()));
+        // Upper bound limits the search
+        assertRowMetasEqual(List.of(expectedRowMeta1), 
storage.rowsStartingWith(rowId0, rowId1, Integer.MAX_VALUE));
+
+        // Upper bound is inclusive.
+        assertRowMetasEqual(List.of(expectedRowMeta1), 
storage.rowsStartingWith(rowId1, rowId1, Integer.MAX_VALUE));
     }
 
     private static void assertRowMetasEqual(List<RowMeta> expected, 
List<RowMeta> actual) {
-        assertThat(expected, is(actual));
+        assertThat(actual, is(expected));
     }
 
     @Test
@@ -1559,7 +1591,10 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvPartitionStor
 
         addWrite(rowId, binaryRow, txId);
 
-        assertRowMetasEqual(List.of(expectedRowMeta), 
storage.rowsStartingWith(RowId.lowestRowId(PARTITION_ID), Integer.MAX_VALUE));
+        assertRowMetasEqual(
+                List.of(expectedRowMeta),
+                storage.rowsStartingWith(RowId.lowestRowId(PARTITION_ID), 
RowId.highestRowId(PARTITION_ID), Integer.MAX_VALUE)
+        );
     }
 
     @Test
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 0ad6634f230..26937ec879f 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -608,7 +608,11 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvTableStorageTest
         assertThrows(StorageDestroyedException.class, () -> 
storage.scanVersions(rowId));
 
         assertThrows(StorageDestroyedException.class, () -> 
storage.closestRowId(rowId));
-        assertThrows(StorageDestroyedException.class, () -> 
storage.rowsStartingWith(rowId, Integer.MAX_VALUE));
+        assertThrows(
+                StorageDestroyedException.class,
+                () -> storage.rowsStartingWith(rowId, 
RowId.highestRowId(PARTITION_ID), Integer.MAX_VALUE)
+        );
+        assertThrows(StorageDestroyedException.class, () -> 
assertNull(storage.highestRowId()));
     }
 
     @Test
@@ -1560,7 +1564,11 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvTableStorageTest
             assertThrows(StorageRebalanceException.class, () -> 
storage.scanVersions(rowId));
             assertThrows(StorageRebalanceException.class, () -> 
storage.scan(clock.now()));
             assertThrows(StorageRebalanceException.class, () -> 
storage.closestRowId(rowId));
-            assertThrows(StorageRebalanceException.class, () -> 
storage.rowsStartingWith(rowId, Integer.MAX_VALUE));
+            assertThrows(
+                    StorageRebalanceException.class,
+                    () -> storage.rowsStartingWith(rowId, 
RowId.highestRowId(PARTITION_ID), Integer.MAX_VALUE)
+            );
+            assertThrows(StorageRebalanceException.class, () -> 
storage.highestRowId());
 
             return null;
         });
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index b75bc31b249..de05824cbe0 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -657,15 +657,22 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
     }
 
     @Override
-    public List<RowMeta> rowsStartingWith(RowId lowerBound, int limit) throws 
StorageException {
+    public @Nullable RowId highestRowId() throws StorageException {
+        checkStorageClosedOrInProcessOfRebalance();
+
+        return map.floorKey(RowId.highestRowId(partitionId));
+    }
+
+    @Override
+    public List<RowMeta> rowsStartingWith(RowId lowerBoundInclusive, RowId 
upperBoundInclusive, int limit) throws StorageException {
         checkStorageClosedOrInProcessOfRebalance();
 
         List<RowMeta> result = new ArrayList<>();
-        RowId currentLowerBound = lowerBound;
+        RowId currentLowerBound = lowerBoundInclusive;
         for (int i = 0; i < limit; i++) {
             RowMeta row = closestRow(currentLowerBound);
 
-            if (row == null) {
+            if (row == null || row.rowId().compareTo(upperBoundInclusive) > 0) 
{
                 break;
             }
 
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 28dc542db4d..495f0b2eb5f 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -619,11 +619,27 @@ public abstract class 
AbstractPageMemoryMvPartitionStorage implements MvPartitio
     }
 
     @Override
-    public List<RowMeta> rowsStartingWith(RowId lowerBound, int limit) throws 
StorageException {
+    public @Nullable RowId highestRowId() throws StorageException {
         return busy(() -> {
             throwExceptionIfStorageNotInRunnableState();
 
-            try (Cursor<VersionChain> cursor = 
renewableState.versionChainTree().find(new VersionChainKey(lowerBound), null)) {
+            try {
+                VersionChain lastChain = 
renewableState.versionChainTree().findLast();
+                return lastChain == null ? null : lastChain.rowId();
+            } catch (Exception e) {
+                throw new StorageException("Error occurred while trying to 
read a row id", e);
+            }
+        });
+    }
+
+    @Override
+    public List<RowMeta> rowsStartingWith(RowId lowerBoundInclusive, RowId 
upperBoundInclusive, int limit) throws StorageException {
+        return busy(() -> {
+            throwExceptionIfStorageNotInRunnableState();
+
+            VersionChainKey lowerBoundKey = new 
VersionChainKey(lowerBoundInclusive);
+            VersionChainKey upperBoundKey = new 
VersionChainKey(upperBoundInclusive);
+            try (Cursor<VersionChain> cursor = 
renewableState.versionChainTree().find(lowerBoundKey, upperBoundKey)) {
                 List<RowMeta> result = new ArrayList<>();
 
                 for (int i = 0; i < limit && cursor.hasNext(); i++) {
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
index 194a25d5838..9beeaf560d8 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
@@ -94,7 +94,10 @@ public final class PartitionDataHelper implements 
ManuallyCloseable {
     private final int partitionId;
 
     /** Upper bound for scans. */
-    private final Slice upperBound;
+    final Slice upperBound;
+
+    /** Lower bound for backward scans. */
+    private final Slice lowerBound;
 
     /** Partition data column family. */
     final ColumnFamilyHandle partCf;
@@ -124,9 +127,13 @@ public final class PartitionDataHelper implements 
ManuallyCloseable {
         this.partitionStartPrefix = compositeKey(tableId, partitionId);
         this.partitionEndPrefix = incrementPrefix(partitionStartPrefix);
 
+        this.lowerBound = new Slice(partitionStartPrefix);
         this.upperBound = new Slice(partitionEndPrefix);
         this.upperBoundReadOpts = new 
ReadOptions().setIterateUpperBound(upperBound);
-        this.scanReadOpts = new 
ReadOptions().setIterateUpperBound(upperBound).setAutoPrefixMode(true);
+        this.scanReadOpts = new ReadOptions()
+                .setIterateLowerBound(lowerBound)
+                .setIterateUpperBound(upperBound)
+                .setAutoPrefixMode(true);
     }
 
     public int partitionId() {
@@ -349,6 +356,6 @@ public final class PartitionDataHelper implements 
ManuallyCloseable {
 
     @Override
     public void close() {
-        RocksUtils.closeAll(scanReadOpts, upperBoundReadOpts, upperBound);
+        RocksUtils.closeAll(scanReadOpts, upperBoundReadOpts, upperBound, 
lowerBound);
     }
 }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 38f6bfd4c8a..d8abb9d0b30 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -96,6 +96,7 @@ import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteBatchWithIndex;
 
@@ -1117,17 +1118,59 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     }
 
     @Override
-    public List<RowMeta> rowsStartingWith(RowId lowerBound, int limit) throws 
StorageException {
+    public @Nullable RowId highestRowId() throws StorageException {
         return busy(() -> {
             throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
 
-            ByteBuffer keyBuf = prepareDirectDataIdKeyBuf(lowerBound)
+            ByteBuffer keyBuf = DIRECT_DATA_ID_KEY_BUFFER.get().clear()
                     .position(0)
                     .limit(ROW_PREFIX_SIZE);
 
+            try (RocksIterator it = db.newIterator(helper.partCf, 
helper.scanReadOpts)) {
+                it.seekToLast();
+
+                if (!it.isValid()) {
+                    it.status();
+
+                    return null;
+                }
+
+                it.key(keyBuf);
+
+                return getRowId(keyBuf);
+            } catch (RocksDBException e) {
+                throw new IgniteRocksDbException("Error finding highest Row 
ID", e);
+            }
+        });
+    }
+
+    @Override
+    public List<RowMeta> rowsStartingWith(RowId lowerBoundInclusive, RowId 
upperBoundInclusive, int limit) throws StorageException {
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
+
+            @Nullable RowId upperBoundExclusive = 
upperBoundInclusive.increment();
+
+            ByteBuffer keyBuf = prepareDirectDataIdKeyBuf(lowerBoundInclusive)
+                    .position(0)
+                    .limit(ROW_PREFIX_SIZE);
+            @Nullable ByteBuffer upperBoundBuf;
+            if (upperBoundExclusive != null) {
+                upperBoundBuf = 
allocate(ROW_PREFIX_SIZE).order(KEY_BYTE_ORDER);
+                writeRowPrefix(upperBoundBuf, upperBoundExclusive);
+            } else {
+                upperBoundBuf = null;
+            }
+
             List<RowMeta> result = new ArrayList<>();
 
-            try (RocksIterator it = db.newIterator(helper.partCf, 
helper.scanReadOpts)) {
+            try (
+                    @Nullable Slice maybeUpperBound = upperBoundBuf != null ? 
new Slice(upperBoundBuf.array()) : null;
+                    ReadOptions readOptions = new ReadOptions()
+                            .setIterateUpperBound(maybeUpperBound != null ? 
maybeUpperBound : helper.upperBound)
+                            .setAutoPrefixMode(true);
+                    RocksIterator it = db.newIterator(helper.partCf, 
readOptions)
+            ) {
                 it.seek(keyBuf);
 
                 for (int i = 0; i < limit; i++) {
@@ -1162,7 +1205,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                     it.next();
                 }
             } catch (RocksDBException e) {
-                throw new IgniteRocksDbException("Error finding closest Row 
ID", e);
+                throw new IgniteRocksDbException("Error finding following Row 
IDs", e);
             }
 
             return result;

Reply via email to