This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 19c6dada703 IGNITE-27038 Read rows for indexing in batches (#6964)
19c6dada703 is described below
commit 19c6dada70381fb4b5d07263c5a0d2d4eddac296
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Nov 14 17:48:07 2025 +0400
IGNITE-27038 Read rows for indexing in batches (#6964)
---
.../ignite/internal/index/IndexBuildTask.java | 10 +----
.../internal/storage/MvPartitionStorage.java | 10 +++--
.../apache/ignite/internal/storage/RowMeta.java | 26 ++++++++++++
.../storage/ThreadAssertingMvPartitionStorage.java | 5 ++-
.../AbstractMvPartitionStorageConcurrencyTest.java | 6 +--
.../storage/AbstractMvPartitionStorageTest.java | 29 ++++++-------
.../storage/AbstractMvTableStorageTest.java | 4 +-
.../storage/impl/TestMvPartitionStorage.java | 25 ++++++++++-
.../mv/AbstractPageMemoryMvPartitionStorage.java | 11 +++--
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 49 ++++++++++++++--------
10 files changed, 119 insertions(+), 56 deletions(-)
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
index cbbaee2b13a..89be2e29abb 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
@@ -263,13 +263,9 @@ class IndexBuildTask {
List<RowId> rowIds = new ArrayList<>(batchSize);
Map<UUID, CommitPartitionId> transactionsToResolve = new HashMap<>();
- while (rowIds.size() < batchSize && nextRowIdToBuild != null) {
- @Nullable RowMeta row =
partitionStorage.closestRow(nextRowIdToBuild);
-
- if (row == null) {
- break;
- }
+ List<RowMeta> rows = nextRowIdToBuild == null ? List.of() :
partitionStorage.rowsStartingWith(nextRowIdToBuild, batchSize);
+ for (RowMeta row : rows) {
rowIds.add(row.rowId());
if (row.isWriteIntent()) {
@@ -284,8 +280,6 @@ class IndexBuildTask {
);
}
}
-
- nextRowIdToBuild = row.rowId().increment();
}
Map<UUID, CompletableFuture<TxState>> txStateResolveFutures =
transactionsToResolve.entrySet().stream()
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index 7342f178cec..ebca93ce275 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.storage;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.close.ManuallyCloseable;
@@ -264,18 +265,19 @@ public interface MvPartitionStorage extends
ManuallyCloseable {
/**
* Returns a row id, existing in the storage, that's greater or equal than
the lower bound. {@code null} if not found.
*
- * @param lowerBound Lower bound.
+ * @param lowerBound Lower bound (inclusive).
* @throws StorageException If failed to read data from the storage.
*/
@Nullable RowId closestRowId(RowId lowerBound) throws StorageException;
/**
- * Returns a row, existing in the storage, that's greater or equal than
the lower bound. {@code null} if not found.
+ * Returns up to {@code limit} rows whose IDs are greater than or equal to the lower bound, one entry per row ID, in ascending row ID order.
*
- * @param lowerBound Lower bound.
+ * @param lowerBound Lower bound (inclusive).
+ * @param limit Maximum number of rows to return.
* @throws StorageException If failed to read data from the storage.
*/
- @Nullable RowMeta closestRow(RowId lowerBound) throws StorageException;
+ List<RowMeta> rowsStartingWith(RowId lowerBound, int limit) throws
StorageException;
/**
* Returns the head of GC queue.
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowMeta.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowMeta.java
index 51030c8c93f..cfd176c3295 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowMeta.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowMeta.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.storage;
+import java.util.Objects;
import java.util.UUID;
+import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.Nullable;
/**
@@ -67,4 +69,28 @@ public class RowMeta {
public int commitPartitionId() {
return commitPartitionId;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ RowMeta rowMeta = (RowMeta) o;
+
+ return commitPartitionId == rowMeta.commitPartitionId
+ && Objects.equals(rowId, rowMeta.rowId)
+ && Objects.equals(transactionId, rowMeta.transactionId)
+ && Objects.equals(commitTableOrZoneId,
rowMeta.commitTableOrZoneId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(rowId, transactionId, commitTableOrZoneId,
commitPartitionId);
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
index d9d0746e628..30a3419a92b 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.storage;
import static
org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead;
import static
org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToWrite;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -151,10 +152,10 @@ public class ThreadAssertingMvPartitionStorage implements
MvPartitionStorage, Wr
}
@Override
- public @Nullable RowMeta closestRow(RowId lowerBound) throws
StorageException {
+ public List<RowMeta> rowsStartingWith(RowId lowerBound, int limit) throws
StorageException {
assertThreadAllowsToRead();
- return partitionStorage.closestRow(lowerBound);
+ return partitionStorage.rowsStartingWith(lowerBound, limit);
}
@Override
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
index 6a1d2a49abf..aedf9d5c826 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
@@ -154,7 +154,7 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
abortWrite(ROW_ID, txId);
assertNull(storage.closestRowId(ROW_ID));
- assertNull(storage.closestRow(ROW_ID));
+ assertThat(storage.rowsStartingWith(ROW_ID, Integer.MAX_VALUE),
is(empty()));
}
}
@@ -197,7 +197,7 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
);
assertNull(storage.closestRowId(ROW_ID));
- assertNull(storage.closestRow(ROW_ID));
+ assertThat(storage.rowsStartingWith(ROW_ID, Integer.MAX_VALUE),
is(empty()));
}
}
@@ -224,7 +224,7 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
assertNull(pollForVacuum(HybridTimestamp.MAX_VALUE));
assertNull(storage.closestRowId(ROW_ID));
- assertNull(storage.closestRow(ROW_ID));
+ assertThat(storage.rowsStartingWith(ROW_ID, Integer.MAX_VALUE),
is(empty()));
assertThat(rows, empty());
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 90d0d358db9..ac6a35d6267 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -26,6 +26,7 @@ import static
org.apache.ignite.internal.storage.CommitResultMatcher.equalsToCom
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
@@ -1520,7 +1521,7 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvPartitionStor
}
@Test
- void testClosestRow() {
+ void testRowsStartingWith() {
RowId rowId0 = new RowId(PARTITION_ID, 1, -1);
RowId rowId1 = new RowId(PARTITION_ID, 1, 0);
RowId rowId2 = new RowId(PARTITION_ID, 1, 1);
@@ -1531,23 +1532,23 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvPartitionStor
addWrite(rowId1, binaryRow, txId);
addWrite(rowId2, binaryRow2, txId);
- assertRowMetaEquals(expectedRowMeta1, storage.closestRow(rowId0));
- assertRowMetaEquals(expectedRowMeta1,
storage.closestRow(rowId0.increment()));
+ assertThat(storage.rowsStartingWith(rowId0, 0), is(empty()));
- assertRowMetaEquals(expectedRowMeta1, storage.closestRow(rowId1));
+ assertRowMetasEqual(List.of(expectedRowMeta1),
storage.rowsStartingWith(rowId0, 1));
+ assertRowMetasEqual(List.of(expectedRowMeta1, expectedRowMeta2),
storage.rowsStartingWith(rowId0, Integer.MAX_VALUE));
+ assertRowMetasEqual(List.of(expectedRowMeta1),
storage.rowsStartingWith(rowId0.increment(), 1));
+ assertRowMetasEqual(List.of(expectedRowMeta1, expectedRowMeta2),
storage.rowsStartingWith(rowId0.increment(), Integer.MAX_VALUE));
- assertRowMetaEquals(expectedRowMeta2, storage.closestRow(rowId2));
+ assertRowMetasEqual(List.of(expectedRowMeta1),
storage.rowsStartingWith(rowId1, 1));
+ assertRowMetasEqual(List.of(expectedRowMeta1, expectedRowMeta2),
storage.rowsStartingWith(rowId1, Integer.MAX_VALUE));
- assertNull(storage.closestRow(rowId2.increment()));
- }
+ assertRowMetasEqual(List.of(expectedRowMeta2),
storage.rowsStartingWith(rowId2, Integer.MAX_VALUE));
- private static void assertRowMetaEquals(RowMeta expected, RowMeta actual) {
- assertNotNull(actual);
+ assertThat(storage.rowsStartingWith(rowId2.increment(), 1),
is(empty()));
+ }
- assertEquals(expected.rowId(), actual.rowId());
- assertEquals(expected.transactionId(), actual.transactionId());
- assertEquals(expected.commitTableOrZoneId(),
actual.commitTableOrZoneId());
- assertEquals(expected.commitPartitionId(), actual.commitPartitionId());
+ private static void assertRowMetasEqual(List<RowMeta> expected,
List<RowMeta> actual) {
+ assertThat(actual, is(expected));
}
@Test
@@ -1558,7 +1559,7 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvPartitionStor
addWrite(rowId, binaryRow, txId);
- assertRowMetaEquals(expectedRowMeta,
storage.closestRow(RowId.lowestRowId(PARTITION_ID)));
+ assertRowMetasEqual(List.of(expectedRowMeta),
storage.rowsStartingWith(RowId.lowestRowId(PARTITION_ID), Integer.MAX_VALUE));
}
@Test
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 0254c1fa5ce..0ad6634f230 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -608,7 +608,7 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvTableStorageTest
assertThrows(StorageDestroyedException.class, () ->
storage.scanVersions(rowId));
assertThrows(StorageDestroyedException.class, () ->
storage.closestRowId(rowId));
- assertThrows(StorageDestroyedException.class, () ->
storage.closestRow(rowId));
+ assertThrows(StorageDestroyedException.class, () ->
storage.rowsStartingWith(rowId, Integer.MAX_VALUE));
}
@Test
@@ -1560,7 +1560,7 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvTableStorageTest
assertThrows(StorageRebalanceException.class, () ->
storage.scanVersions(rowId));
assertThrows(StorageRebalanceException.class, () ->
storage.scan(clock.now()));
assertThrows(StorageRebalanceException.class, () ->
storage.closestRowId(rowId));
- assertThrows(StorageRebalanceException.class, () ->
storage.closestRow(rowId));
+ assertThrows(StorageRebalanceException.class, () ->
storage.rowsStartingWith(rowId, Integer.MAX_VALUE));
return null;
});
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 622ed7fe43c..90b3afc3988 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -20,8 +20,10 @@ package org.apache.ignite.internal.storage.impl;
import static java.util.Comparator.comparing;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
+import java.util.List;
import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
@@ -631,9 +633,30 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
}
@Override
- public @Nullable RowMeta closestRow(RowId lowerBound) throws
StorageException {
+ public List<RowMeta> rowsStartingWith(RowId lowerBound, int limit) throws
StorageException {
checkStorageClosedOrInProcessOfRebalance();
+ List<RowMeta> result = new ArrayList<>();
+ RowId currentLowerBound = lowerBound;
+ for (int i = 0; i < limit; i++) {
+ RowMeta row = closestRow(currentLowerBound);
+
+ if (row == null) {
+ break;
+ }
+
+ result.add(row);
+ currentLowerBound = row.rowId().increment();
+
+ if (currentLowerBound == null) {
+ break;
+ }
+ }
+
+ return result;
+ }
+
+ private @Nullable RowMeta closestRow(RowId lowerBound) throws
StorageException {
Entry<RowId, VersionChain> entry = map.ceilingEntry(lowerBound);
if (entry == null) {
return null;
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 6d0fb0ee99a..28dc542db4d 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -619,22 +619,25 @@ public abstract class
AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
@Override
- public @Nullable RowMeta closestRow(RowId lowerBound) throws
StorageException {
+ public List<RowMeta> rowsStartingWith(RowId lowerBound, int limit) throws
StorageException {
return busy(() -> {
throwExceptionIfStorageNotInRunnableState();
try (Cursor<VersionChain> cursor =
renewableState.versionChainTree().find(new VersionChainKey(lowerBound), null)) {
- if (cursor.hasNext()) {
+ List<RowMeta> result = new ArrayList<>();
+
+ for (int i = 0; i < limit && cursor.hasNext(); i++) {
VersionChain versionChain = cursor.next();
- return new RowMeta(
+ RowMeta row = new RowMeta(
versionChain.rowId(),
versionChain.transactionId(),
versionChain.commitTableId(),
versionChain.commitPartitionId()
);
+ result.add(row);
}
- return null;
+ return result;
} catch (Exception e) {
throw new StorageException("Error occurred while trying to
read a row id", e);
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 3abd1e1953d..38f6bfd4c8a 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -53,6 +53,8 @@ import static
org.apache.ignite.internal.util.ByteUtils.longToBytes;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.UUID;
@@ -1115,7 +1117,7 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
}
@Override
- public @Nullable RowMeta closestRow(RowId lowerBound) throws
StorageException {
+ public List<RowMeta> rowsStartingWith(RowId lowerBound, int limit) throws
StorageException {
return busy(() -> {
throwExceptionIfStorageInProgressOfRebalance(state.get(),
this::createStorageInfo);
@@ -1123,36 +1125,59 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
.position(0)
.limit(ROW_PREFIX_SIZE);
+ List<RowMeta> result = new ArrayList<>();
+
try (RocksIterator it = db.newIterator(helper.partCf,
helper.scanReadOpts)) {
it.seek(keyBuf);
- if (!it.isValid()) {
- it.status();
+ RowId previousRowId = null;
+
+ while (result.size() < limit) {
+ if (!it.isValid()) {
+ it.status();
- return null;
- }
+ break;
+ }
- keyBuf.rewind();
- int keyLength = it.key(keyBuf);
- boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
+ keyBuf.rewind();
+ int keyLength = it.key(keyBuf);
+ boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
- RowId rowId = getRowId(keyBuf);
+ RowId rowId = getRowId(keyBuf);
+
+ // Each stored version of a row has its own key sharing the row's ROW_PREFIX_SIZE-byte prefix (a write intent's
+ // key is exactly that prefix), so skip keys repeating the row ID just reported instead of returning duplicates.
+ if (rowId.equals(previousRowId)) {
+ it.next();
+
+ continue;
+ }
+
+ previousRowId = rowId;
- if (isWriteIntent) {
- ByteBuffer transactionState = ByteBuffer.wrap(it.value());
+ RowMeta row;
+ if (isWriteIntent) {
+ ByteBuffer transactionState =
ByteBuffer.wrap(it.value());
- readDataIdFromTxState(transactionState);
- UUID txId = new UUID(transactionState.getLong(),
transactionState.getLong());
- int commitTableId = transactionState.getInt();
- int commitPartitionId =
Short.toUnsignedInt(transactionState.getShort());
+ readDataIdFromTxState(transactionState);
+ UUID txId = new UUID(transactionState.getLong(),
transactionState.getLong());
+ int commitTableId = transactionState.getInt();
+ int commitPartitionId =
Short.toUnsignedInt(transactionState.getShort());
- return new RowMeta(rowId, txId, commitTableId,
commitPartitionId);
- } else {
- return RowMeta.withoutWriteIntent(rowId);
+ row = new RowMeta(rowId, txId, commitTableId,
commitPartitionId);
+ } else {
+ row = RowMeta.withoutWriteIntent(rowId);
+ }
+
+ result.add(row);
+
+ it.next();
}
} catch (RocksDBException e) {
throw new IgniteRocksDbException("Error finding closest Row
ID", e);
}
+
+ return result;
});
}