This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 19c6dada703 IGNITE-27038 Read rows for indexing in batches (#6964)
19c6dada703 is described below

commit 19c6dada70381fb4b5d07263c5a0d2d4eddac296
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Nov 14 17:48:07 2025 +0400

    IGNITE-27038 Read rows for indexing in batches (#6964)
---
 .../ignite/internal/index/IndexBuildTask.java      | 10 +----
 .../internal/storage/MvPartitionStorage.java       | 10 +++--
 .../apache/ignite/internal/storage/RowMeta.java    | 26 ++++++++++++
 .../storage/ThreadAssertingMvPartitionStorage.java |  5 ++-
 .../AbstractMvPartitionStorageConcurrencyTest.java |  6 +--
 .../storage/AbstractMvPartitionStorageTest.java    | 29 ++++++-------
 .../storage/AbstractMvTableStorageTest.java        |  4 +-
 .../storage/impl/TestMvPartitionStorage.java       | 25 ++++++++++-
 .../mv/AbstractPageMemoryMvPartitionStorage.java   | 11 +++--
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 49 ++++++++++++++--------
 10 files changed, 119 insertions(+), 56 deletions(-)

diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
index cbbaee2b13a..89be2e29abb 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
@@ -263,13 +263,9 @@ class IndexBuildTask {
         List<RowId> rowIds = new ArrayList<>(batchSize);
         Map<UUID, CommitPartitionId> transactionsToResolve = new HashMap<>();
 
-        while (rowIds.size() < batchSize && nextRowIdToBuild != null) {
-            @Nullable RowMeta row = 
partitionStorage.closestRow(nextRowIdToBuild);
-
-            if (row == null) {
-                break;
-            }
+        List<RowMeta> rows = nextRowIdToBuild == null ? List.of() : 
partitionStorage.rowsStartingWith(nextRowIdToBuild, batchSize);
 
+        for (RowMeta row : rows) {
             rowIds.add(row.rowId());
 
             if (row.isWriteIntent()) {
@@ -284,8 +280,6 @@ class IndexBuildTask {
                     );
                 }
             }
-
-            nextRowIdToBuild = row.rowId().increment();
         }
 
         Map<UUID, CompletableFuture<TxState>> txStateResolveFutures = 
transactionsToResolve.entrySet().stream()
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index 7342f178cec..ebca93ce275 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.storage;
 
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.close.ManuallyCloseable;
@@ -264,18 +265,19 @@ public interface MvPartitionStorage extends 
ManuallyCloseable {
     /**
      * Returns a row id, existing in the storage, that's greater or equal than 
the lower bound. {@code null} if not found.
      *
-     * @param lowerBound Lower bound.
+     * @param lowerBound Lower bound (inclusive).
      * @throws StorageException If failed to read data from the storage.
      */
     @Nullable RowId closestRowId(RowId lowerBound) throws StorageException;
 
     /**
-     * Returns a row, existing in the storage, that's greater or equal than 
the lower bound. {@code null} if not found.
+     * Returns a batch of rows, in ascending ID order, whose IDs are greater than or 
equal to the lower bound.
      *
-     * @param lowerBound Lower bound.
+     * @param lowerBound Lower bound (inclusive).
+     * @param limit Maximum number of rows to return.
      * @throws StorageException If failed to read data from the storage.
      */
-    @Nullable RowMeta closestRow(RowId lowerBound) throws StorageException;
+    List<RowMeta> rowsStartingWith(RowId lowerBound, int limit) throws 
StorageException;
 
     /**
      * Returns the head of GC queue.
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowMeta.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowMeta.java
index 51030c8c93f..cfd176c3295 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowMeta.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowMeta.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.internal.storage;
 
+import java.util.Objects;
 import java.util.UUID;
+import org.apache.ignite.internal.tostring.S;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -67,4 +69,28 @@ public class RowMeta {
     public int commitPartitionId() {
         return commitPartitionId;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        RowMeta rowMeta = (RowMeta) o;
+
+        return commitPartitionId == rowMeta.commitPartitionId
+                && Objects.equals(rowId, rowMeta.rowId)
+                && Objects.equals(transactionId, rowMeta.transactionId)
+                && Objects.equals(commitTableOrZoneId, 
rowMeta.commitTableOrZoneId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(rowId, transactionId, commitTableOrZoneId, 
commitPartitionId);
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
 }
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
index d9d0746e628..30a3419a92b 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.storage;
 import static 
org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead;
 import static 
org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToWrite;
 
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -151,10 +152,10 @@ public class ThreadAssertingMvPartitionStorage implements 
MvPartitionStorage, Wr
     }
 
     @Override
-    public @Nullable RowMeta closestRow(RowId lowerBound) throws 
StorageException {
+    public List<RowMeta> rowsStartingWith(RowId lowerBound, int limit) throws 
StorageException {
         assertThreadAllowsToRead();
 
-        return partitionStorage.closestRow(lowerBound);
+        return partitionStorage.rowsStartingWith(lowerBound, limit);
     }
 
     @Override
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
index 6a1d2a49abf..aedf9d5c826 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
@@ -154,7 +154,7 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
             abortWrite(ROW_ID, txId);
 
             assertNull(storage.closestRowId(ROW_ID));
-            assertNull(storage.closestRow(ROW_ID));
+            assertThat(storage.rowsStartingWith(ROW_ID, Integer.MAX_VALUE), 
is(empty()));
         }
     }
 
@@ -197,7 +197,7 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
             );
 
             assertNull(storage.closestRowId(ROW_ID));
-            assertNull(storage.closestRow(ROW_ID));
+            assertThat(storage.rowsStartingWith(ROW_ID, Integer.MAX_VALUE), 
is(empty()));
         }
     }
 
@@ -224,7 +224,7 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
             assertNull(pollForVacuum(HybridTimestamp.MAX_VALUE));
 
             assertNull(storage.closestRowId(ROW_ID));
-            assertNull(storage.closestRow(ROW_ID));
+            assertThat(storage.rowsStartingWith(ROW_ID, Integer.MAX_VALUE), 
is(empty()));
 
             assertThat(rows, empty());
         }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 90d0d358db9..ac6a35d6267 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -26,6 +26,7 @@ import static 
org.apache.ignite.internal.storage.CommitResultMatcher.equalsToCom
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
@@ -1520,7 +1521,7 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvPartitionStor
     }
 
     @Test
-    void testClosestRow() {
+    void testRowsStartingWith() {
         RowId rowId0 = new RowId(PARTITION_ID, 1, -1);
         RowId rowId1 = new RowId(PARTITION_ID, 1, 0);
         RowId rowId2 = new RowId(PARTITION_ID, 1, 1);
@@ -1531,23 +1532,23 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvPartitionStor
         addWrite(rowId1, binaryRow, txId);
         addWrite(rowId2, binaryRow2, txId);
 
-        assertRowMetaEquals(expectedRowMeta1, storage.closestRow(rowId0));
-        assertRowMetaEquals(expectedRowMeta1, 
storage.closestRow(rowId0.increment()));
+        assertThat(storage.rowsStartingWith(rowId0, 0), is(empty()));
 
-        assertRowMetaEquals(expectedRowMeta1, storage.closestRow(rowId1));
+        assertRowMetasEqual(List.of(expectedRowMeta1), 
storage.rowsStartingWith(rowId0, 1));
+        assertRowMetasEqual(List.of(expectedRowMeta1, expectedRowMeta2), 
storage.rowsStartingWith(rowId0, Integer.MAX_VALUE));
+        assertRowMetasEqual(List.of(expectedRowMeta1), 
storage.rowsStartingWith(rowId0.increment(), 1));
+        assertRowMetasEqual(List.of(expectedRowMeta1, expectedRowMeta2), 
storage.rowsStartingWith(rowId0.increment(), Integer.MAX_VALUE));
 
-        assertRowMetaEquals(expectedRowMeta2, storage.closestRow(rowId2));
+        assertRowMetasEqual(List.of(expectedRowMeta1), 
storage.rowsStartingWith(rowId1, 1));
+        assertRowMetasEqual(List.of(expectedRowMeta1, expectedRowMeta2), 
storage.rowsStartingWith(rowId1, Integer.MAX_VALUE));
 
-        assertNull(storage.closestRow(rowId2.increment()));
-    }
+        assertRowMetasEqual(List.of(expectedRowMeta2), 
storage.rowsStartingWith(rowId2, Integer.MAX_VALUE));
 
-    private static void assertRowMetaEquals(RowMeta expected, RowMeta actual) {
-        assertNotNull(actual);
+        assertThat(storage.rowsStartingWith(rowId2.increment(), 1), 
is(empty()));
+    }
 
-        assertEquals(expected.rowId(), actual.rowId());
-        assertEquals(expected.transactionId(), actual.transactionId());
-        assertEquals(expected.commitTableOrZoneId(), 
actual.commitTableOrZoneId());
-        assertEquals(expected.commitPartitionId(), actual.commitPartitionId());
+    private static void assertRowMetasEqual(List<RowMeta> expected, 
List<RowMeta> actual) {
+        assertThat(expected, is(actual));
     }
 
     @Test
@@ -1558,7 +1559,7 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvPartitionStor
 
         addWrite(rowId, binaryRow, txId);
 
-        assertRowMetaEquals(expectedRowMeta, 
storage.closestRow(RowId.lowestRowId(PARTITION_ID)));
+        assertRowMetasEqual(List.of(expectedRowMeta), 
storage.rowsStartingWith(RowId.lowestRowId(PARTITION_ID), Integer.MAX_VALUE));
     }
 
     @Test
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 0254c1fa5ce..0ad6634f230 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -608,7 +608,7 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvTableStorageTest
         assertThrows(StorageDestroyedException.class, () -> 
storage.scanVersions(rowId));
 
         assertThrows(StorageDestroyedException.class, () -> 
storage.closestRowId(rowId));
-        assertThrows(StorageDestroyedException.class, () -> 
storage.closestRow(rowId));
+        assertThrows(StorageDestroyedException.class, () -> 
storage.rowsStartingWith(rowId, Integer.MAX_VALUE));
     }
 
     @Test
@@ -1560,7 +1560,7 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvTableStorageTest
             assertThrows(StorageRebalanceException.class, () -> 
storage.scanVersions(rowId));
             assertThrows(StorageRebalanceException.class, () -> 
storage.scan(clock.now()));
             assertThrows(StorageRebalanceException.class, () -> 
storage.closestRowId(rowId));
-            assertThrows(StorageRebalanceException.class, () -> 
storage.closestRow(rowId));
+            assertThrows(StorageRebalanceException.class, () -> 
storage.rowsStartingWith(rowId, Integer.MAX_VALUE));
 
             return null;
         });
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 622ed7fe43c..90b3afc3988 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -20,8 +20,10 @@ package org.apache.ignite.internal.storage.impl;
 import static java.util.Comparator.comparing;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.NavigableSet;
 import java.util.NoSuchElementException;
@@ -631,9 +633,30 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
     }
 
     @Override
-    public @Nullable RowMeta closestRow(RowId lowerBound) throws 
StorageException {
+    public List<RowMeta> rowsStartingWith(RowId lowerBound, int limit) throws 
StorageException {
         checkStorageClosedOrInProcessOfRebalance();
 
+        List<RowMeta> result = new ArrayList<>();
+        RowId currentLowerBound = lowerBound;
+        for (int i = 0; i < limit; i++) {
+            RowMeta row = closestRow(currentLowerBound);
+
+            if (row == null) {
+                break;
+            }
+
+            result.add(row);
+            currentLowerBound = row.rowId().increment();
+
+            if (currentLowerBound == null) {
+                break;
+            }
+        }
+
+        return result;
+    }
+
+    private @Nullable RowMeta closestRow(RowId lowerBound) throws 
StorageException {
         Entry<RowId, VersionChain> entry = map.ceilingEntry(lowerBound);
         if (entry == null) {
             return null;
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 6d0fb0ee99a..28dc542db4d 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -619,22 +619,25 @@ public abstract class 
AbstractPageMemoryMvPartitionStorage implements MvPartitio
     }
 
     @Override
-    public @Nullable RowMeta closestRow(RowId lowerBound) throws 
StorageException {
+    public List<RowMeta> rowsStartingWith(RowId lowerBound, int limit) throws 
StorageException {
         return busy(() -> {
             throwExceptionIfStorageNotInRunnableState();
 
             try (Cursor<VersionChain> cursor = 
renewableState.versionChainTree().find(new VersionChainKey(lowerBound), null)) {
-                if (cursor.hasNext()) {
+                List<RowMeta> result = new ArrayList<>();
+
+                for (int i = 0; i < limit && cursor.hasNext(); i++) {
                     VersionChain versionChain = cursor.next();
-                    return new RowMeta(
+                    RowMeta row = new RowMeta(
                             versionChain.rowId(),
                             versionChain.transactionId(),
                             versionChain.commitTableId(),
                             versionChain.commitPartitionId()
                     );
+                    result.add(row);
                 }
 
-                return null;
+                return result;
             } catch (Exception e) {
                 throw new StorageException("Error occurred while trying to 
read a row id", e);
             }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 3abd1e1953d..38f6bfd4c8a 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -53,6 +53,8 @@ import static 
org.apache.ignite.internal.util.ByteUtils.longToBytes;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.UUID;
@@ -1115,7 +1117,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     }
 
     @Override
-    public @Nullable RowMeta closestRow(RowId lowerBound) throws 
StorageException {
+    public List<RowMeta> rowsStartingWith(RowId lowerBound, int limit) throws 
StorageException {
         return busy(() -> {
             throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
 
@@ -1123,36 +1125,47 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                     .position(0)
                     .limit(ROW_PREFIX_SIZE);
 
+            List<RowMeta> result = new ArrayList<>();
+
             try (RocksIterator it = db.newIterator(helper.partCf, 
helper.scanReadOpts)) {
                 it.seek(keyBuf);
 
-                if (!it.isValid()) {
-                    it.status();
+                for (int i = 0; i < limit; i++) {
+                    if (!it.isValid()) {
+                        it.status();
 
-                    return null;
-                }
+                        break;
+                    }
 
-                keyBuf.rewind();
-                int keyLength = it.key(keyBuf);
-                boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
+                    keyBuf.rewind();
+                    int keyLength = it.key(keyBuf);
+                    boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
 
-                RowId rowId = getRowId(keyBuf);
+                    RowId rowId = getRowId(keyBuf);
 
-                if (isWriteIntent) {
-                    ByteBuffer transactionState = ByteBuffer.wrap(it.value());
+                    RowMeta row;
+                    if (isWriteIntent) {
+                        ByteBuffer transactionState = 
ByteBuffer.wrap(it.value());
 
-                    readDataIdFromTxState(transactionState);
-                    UUID txId = new UUID(transactionState.getLong(), 
transactionState.getLong());
-                    int commitTableId = transactionState.getInt();
-                    int commitPartitionId = 
Short.toUnsignedInt(transactionState.getShort());
+                        readDataIdFromTxState(transactionState);
+                        UUID txId = new UUID(transactionState.getLong(), 
transactionState.getLong());
+                        int commitTableId = transactionState.getInt();
+                        int commitPartitionId = 
Short.toUnsignedInt(transactionState.getShort());
 
-                    return new RowMeta(rowId, txId, commitTableId, 
commitPartitionId);
-                } else {
-                    return RowMeta.withoutWriteIntent(rowId);
+                        row = new RowMeta(rowId, txId, commitTableId, 
commitPartitionId);
+                    } else {
+                        row = RowMeta.withoutWriteIntent(rowId);
+                    }
+
+                    result.add(row);
+
+                    it.next();
                 }
             } catch (RocksDBException e) {
                 throw new IgniteRocksDbException("Error finding closest Row 
ID", e);
             }
+
+            return result;
         });
     }
 

Reply via email to