This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 73cbbddc18d IGNITE-27048 Stop partition scan on index build early
(#6976)
73cbbddc18d is described below
commit 73cbbddc18d79cfb30101e89f70d0e0012896ff5
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Nov 17 17:35:09 2025 +0400
IGNITE-27048 Stop partition scan on index build early (#6976)
---
.../ignite/internal/index/IndexBuildTask.java | 19 +++++---
.../ignite/internal/pagememory/tree/BplusTree.java | 19 ++++----
.../internal/pagememory/tree/IgniteTree.java | 12 ++---
.../internal/storage/MvPartitionStorage.java | 12 ++++-
.../storage/ThreadAssertingMvPartitionStorage.java | 11 ++++-
.../AbstractMvPartitionStorageConcurrencyTest.java | 12 +++--
.../storage/AbstractMvPartitionStorageGcTest.java | 2 +
.../storage/AbstractMvPartitionStorageTest.java | 57 +++++++++++++++++-----
.../storage/AbstractMvTableStorageTest.java | 12 ++++-
.../storage/impl/TestMvPartitionStorage.java | 13 +++--
.../mv/AbstractPageMemoryMvPartitionStorage.java | 20 +++++++-
.../storage/rocksdb/PartitionDataHelper.java | 13 +++--
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 51 +++++++++++++++++--
13 files changed, 200 insertions(+), 53 deletions(-)
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
index 89be2e29abb..22770d8cfee 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.index;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.toList;
@@ -166,7 +167,8 @@ class IndexBuildTask {
LOG.info("Start building the index: [{}]", createCommonIndexInfo());
try {
- supplyAsync(this::handleNextBatch, executor)
+ supplyAsync(partitionStorage::highestRowId, executor)
+ .thenApplyAsync(this::handleNextBatch, executor)
.thenCompose(Function.identity())
.whenComplete((unused, throwable) -> {
if (throwable != null) {
@@ -220,13 +222,13 @@ class IndexBuildTask {
return taskFuture;
}
- private CompletableFuture<Void> handleNextBatch() {
+ private CompletableFuture<Void> handleNextBatch(@Nullable RowId
highestRowId) {
if (!enterBusy()) {
return nullCompletedFuture();
}
try {
- return createBatchToIndex()
+ return createBatchToIndex(highestRowId)
.thenCompose(batch -> {
return replicaService.invoke(node,
createBuildIndexReplicaRequest(batch, initialOperationTimestamp));
})
@@ -247,7 +249,7 @@ class IndexBuildTask {
return
CompletableFutures.<Void>nullCompletedFuture();
}
- return handleNextBatch();
+ return handleNextBatch(highestRowId);
}, executor)
.thenCompose(Function.identity());
} catch (Throwable t) {
@@ -257,13 +259,18 @@ class IndexBuildTask {
}
}
- private CompletableFuture<BatchToIndex> createBatchToIndex() {
+ private CompletableFuture<BatchToIndex> createBatchToIndex(@Nullable RowId
highestRowId) {
+ if (highestRowId == null) {
+ return completedFuture(new BatchToIndex(List.of(), Set.of()));
+ }
+
RowId nextRowIdToBuild = indexStorage.getNextRowIdToBuild();
List<RowId> rowIds = new ArrayList<>(batchSize);
Map<UUID, CommitPartitionId> transactionsToResolve = new HashMap<>();
- List<RowMeta> rows = nextRowIdToBuild == null ? List.of() :
partitionStorage.rowsStartingWith(nextRowIdToBuild, batchSize);
+ List<RowMeta> rows = nextRowIdToBuild == null ? List.of()
+ : partitionStorage.rowsStartingWith(nextRowIdToBuild,
highestRowId, batchSize);
for (RowMeta row : rows) {
rowIds.add(row.rowId());
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
index c9d7c1318a1..4995258d104 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
@@ -1215,32 +1215,33 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
}
@Override
- public final Cursor<T> find(@Nullable L lower, @Nullable L upper) throws
IgniteInternalCheckedException {
- return find(lower, upper, null);
+ public final Cursor<T> find(@Nullable L lowerInclusive, @Nullable L
upperInclusive) throws IgniteInternalCheckedException {
+ return find(lowerInclusive, upperInclusive, null);
}
@Override
- public final Cursor<T> find(@Nullable L lower, @Nullable L upper,
@Nullable Object x) throws IgniteInternalCheckedException {
- return find(lower, upper, null, x);
+ public final Cursor<T> find(@Nullable L lowerInclusive, @Nullable L
upperInclusive, @Nullable Object x)
+ throws IgniteInternalCheckedException {
+ return find(lowerInclusive, upperInclusive, null, x);
}
/**
* Getting the cursor through the rows of the tree.
*
- * @param lower Lower bound inclusive or {@code null} if unbounded.
- * @param upper Upper bound inclusive or {@code null} if unbounded.
+ * @param lowerInclusive Lower bound inclusive or {@code null} if
unbounded.
+ * @param upperInclusive Upper bound inclusive or {@code null} if
unbounded.
* @param c Tree row closure.
* @param x Implementation specific argument, {@code null} always means
that we need to return full detached data row.
* @return Cursor.
* @throws IgniteInternalCheckedException If failed.
*/
public <R> Cursor<R> find(
- @Nullable L lower,
- @Nullable L upper,
+ @Nullable L lowerInclusive,
+ @Nullable L upperInclusive,
@Nullable TreeRowMapClosure<L, T, R> c,
@Nullable Object x
) throws IgniteInternalCheckedException {
- return find(lower, upper, true, true, c, x);
+ return find(lowerInclusive, upperInclusive, true, true, c, x);
}
/**
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/IgniteTree.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/IgniteTree.java
index 064b4eca7e8..e89943f6c10 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/IgniteTree.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/IgniteTree.java
@@ -59,21 +59,21 @@ public interface IgniteTree<L, T> {
/**
* Returns a cursor from lower to upper bounds inclusive.
*
- * @param lower Lower bound or {@code null} if unbounded.
- * @param upper Upper bound or {@code null} if unbounded.
+ * @param lowerInclusive Lower bound (inclusive) or {@code null} if
unbounded.
+ * @param upperInclusive Upper bound (inclusive) or {@code null} if
unbounded.
* @throws IgniteInternalCheckedException If failed.
*/
- Cursor<T> find(@Nullable L lower, @Nullable L upper) throws
IgniteInternalCheckedException;
+ Cursor<T> find(@Nullable L lowerInclusive, @Nullable L upperInclusive)
throws IgniteInternalCheckedException;
/**
* Returns a cursor from lower to upper bounds inclusive.
*
- * @param lower Lower bound or {@code null} if unbounded.
- * @param upper Upper bound or {@code null} if unbounded.
+ * @param lowerInclusive Lower bound (inclusive) or {@code null} if
unbounded.
+ * @param upperInclusive Upper bound (inclusive) or {@code null} if
unbounded.
* @param x Implementation specific argument, {@code null} always means
that we need to return full detached data row.
* @throws IgniteInternalCheckedException If failed.
*/
- Cursor<T> find(@Nullable L lower, @Nullable L upper, @Nullable Object x)
throws IgniteInternalCheckedException;
+ Cursor<T> find(@Nullable L lowerInclusive, @Nullable L upperInclusive,
@Nullable Object x) throws IgniteInternalCheckedException;
/**
* Returns a value mapped to the lowest key, or {@code null} if tree is
empty.
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index 7a9d70d0e87..42d13340b21 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -285,14 +285,22 @@ public interface MvPartitionStorage extends
ManuallyCloseable {
*/
@Nullable RowId closestRowId(RowId lowerBound) throws StorageException;
+ /**
+ * Returns the greatest row ID, existing in the storage. {@code null} if
the storage is empty.
+ *
+ * @throws StorageException If failed to read data from the storage.
+ */
+ @Nullable RowId highestRowId() throws StorageException;
+
/**
* Returns a batch of rows with subsequent IDs which IDs are greater or
equal than the lower bound.
*
- * @param lowerBound Lower bound (inclusive).
+ * @param lowerBoundInclusive Lower bound (inclusive).
+ * @param upperBoundInclusive Upper bound (inclusive).
* @param limit Maximum number of rows to return.
* @throws StorageException If failed to read data from the storage.
*/
- List<RowMeta> rowsStartingWith(RowId lowerBound, int limit) throws
StorageException;
+ List<RowMeta> rowsStartingWith(RowId lowerBoundInclusive, RowId
upperBoundInclusive, int limit) throws StorageException;
/**
* Returns the head of GC queue.
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
index 30a3419a92b..89f8fb0be44 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
@@ -152,10 +152,17 @@ public class ThreadAssertingMvPartitionStorage implements
MvPartitionStorage, Wr
}
@Override
- public List<RowMeta> rowsStartingWith(RowId lowerBound, int limit) throws
StorageException {
+ public @Nullable RowId highestRowId() throws StorageException {
assertThreadAllowsToRead();
- return partitionStorage.rowsStartingWith(lowerBound, limit);
+ return partitionStorage.highestRowId();
+ }
+
+ @Override
+ public List<RowMeta> rowsStartingWith(RowId lowerBoundInclusive, RowId
upperBoundInclusive, int limit) throws StorageException {
+ assertThreadAllowsToRead();
+
+ return partitionStorage.rowsStartingWith(lowerBoundInclusive,
upperBoundInclusive, limit);
}
@Override
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
index aedf9d5c826..84a84571133 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
@@ -154,7 +154,9 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
abortWrite(ROW_ID, txId);
assertNull(storage.closestRowId(ROW_ID));
- assertThat(storage.rowsStartingWith(ROW_ID, Integer.MAX_VALUE),
is(empty()));
+ assertThat(storage.rowsStartingWith(ROW_ID,
RowId.highestRowId(PARTITION_ID), Integer.MAX_VALUE), is(empty()));
+
+ assertNull(storage.highestRowId());
}
}
@@ -197,7 +199,9 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
);
assertNull(storage.closestRowId(ROW_ID));
- assertThat(storage.rowsStartingWith(ROW_ID, Integer.MAX_VALUE),
is(empty()));
+ assertThat(storage.rowsStartingWith(ROW_ID,
RowId.highestRowId(PARTITION_ID), Integer.MAX_VALUE), is(empty()));
+
+ assertNull(storage.highestRowId());
}
}
@@ -224,7 +228,9 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
assertNull(pollForVacuum(HybridTimestamp.MAX_VALUE));
assertNull(storage.closestRowId(ROW_ID));
- assertThat(storage.rowsStartingWith(ROW_ID, Integer.MAX_VALUE),
is(empty()));
+ assertThat(storage.rowsStartingWith(ROW_ID,
RowId.highestRowId(PARTITION_ID), Integer.MAX_VALUE), is(empty()));
+
+ assertNull(storage.highestRowId());
assertThat(rows, empty());
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
index 96a960afe7e..913e60f5c07 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
@@ -86,6 +86,7 @@ public abstract class AbstractMvPartitionStorageGcTest
extends BaseMvPartitionSt
// Let's check that the storage is empty.
assertNull(storage.closestRowId(ROW_ID));
+ assertNull(storage.highestRowId());
}
@Test
@@ -106,6 +107,7 @@ public abstract class AbstractMvPartitionStorageGcTest
extends BaseMvPartitionSt
// Let's check that the storage is empty.
assertNull(storage.closestRowId(ROW_ID));
+ assertNull(storage.highestRowId());
}
@Test
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index ac6a35d6267..812c390282a 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -1520,8 +1520,26 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvPartitionStor
assertNull(storage.closestRowId(rowId2.increment()));
}
+ @Test
+ void testHighestRowId() {
+ RowId rowId1 = new RowId(PARTITION_ID, 1, 0);
+ RowId rowId2 = new RowId(PARTITION_ID, 1, 1);
+
+ assertThat(storage.highestRowId(), is(nullValue()));
+
+ addWrite(rowId1, binaryRow, txId);
+
+ assertThat(storage.highestRowId(), is(rowId1));
+
+ addWrite(rowId2, binaryRow2, txId);
+
+ assertThat(storage.highestRowId(), is(rowId2));
+ }
+
@Test
void testRowsStartingWith() {
+ RowId highestRowId = RowId.highestRowId(PARTITION_ID);
+
RowId rowId0 = new RowId(PARTITION_ID, 1, -1);
RowId rowId1 = new RowId(PARTITION_ID, 1, 0);
RowId rowId2 = new RowId(PARTITION_ID, 1, 1);
@@ -1532,23 +1550,37 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvPartitionStor
addWrite(rowId1, binaryRow, txId);
addWrite(rowId2, binaryRow2, txId);
- assertThat(storage.rowsStartingWith(rowId0, 0), is(empty()));
+ // Limiting with 0 provides no rows.
+ assertThat(storage.rowsStartingWith(rowId0, highestRowId, 0),
is(empty()));
- assertRowMetasEqual(List.of(expectedRowMeta1),
storage.rowsStartingWith(rowId0, 1));
- assertRowMetasEqual(List.of(expectedRowMeta1, expectedRowMeta2),
storage.rowsStartingWith(rowId0, Integer.MAX_VALUE));
- assertRowMetasEqual(List.of(expectedRowMeta1),
storage.rowsStartingWith(rowId0.increment(), 1));
- assertRowMetasEqual(List.of(expectedRowMeta1, expectedRowMeta2),
storage.rowsStartingWith(rowId0.increment(), Integer.MAX_VALUE));
+ // Starting with rowId0 (which does not exist in the storage).
+ assertRowMetasEqual(List.of(expectedRowMeta1),
storage.rowsStartingWith(rowId0, highestRowId, 1));
+ assertRowMetasEqual(List.of(expectedRowMeta1, expectedRowMeta2),
storage.rowsStartingWith(rowId0, highestRowId, Integer.MAX_VALUE));
+ assertRowMetasEqual(List.of(expectedRowMeta1),
storage.rowsStartingWith(rowId0.increment(), highestRowId, 1));
+ assertRowMetasEqual(
+ List.of(expectedRowMeta1, expectedRowMeta2),
+ storage.rowsStartingWith(rowId0.increment(), highestRowId,
Integer.MAX_VALUE)
+ );
+
+ // Starting with rowId1 (which exists in the storage).
+ assertRowMetasEqual(List.of(expectedRowMeta1),
storage.rowsStartingWith(rowId1, highestRowId, 1));
+ assertRowMetasEqual(List.of(expectedRowMeta1, expectedRowMeta2),
storage.rowsStartingWith(rowId1, highestRowId, Integer.MAX_VALUE));
- assertRowMetasEqual(List.of(expectedRowMeta1),
storage.rowsStartingWith(rowId1, 1));
- assertRowMetasEqual(List.of(expectedRowMeta1, expectedRowMeta2),
storage.rowsStartingWith(rowId1, Integer.MAX_VALUE));
+ // Starting with rowId2 (which exists in the storage).
+ assertRowMetasEqual(List.of(expectedRowMeta2),
storage.rowsStartingWith(rowId2, highestRowId, Integer.MAX_VALUE));
- assertRowMetasEqual(List.of(expectedRowMeta2),
storage.rowsStartingWith(rowId2, Integer.MAX_VALUE));
+ // Starting with a row ID that is greater than the greatest row ID in
the storage.
+ assertThat(storage.rowsStartingWith(rowId2.increment(), highestRowId,
Integer.MAX_VALUE), is(empty()));
- assertThat(storage.rowsStartingWith(rowId2.increment(), 1),
is(empty()));
+ // Upper bound limits the search
+ assertRowMetasEqual(List.of(expectedRowMeta1),
storage.rowsStartingWith(rowId0, rowId1, Integer.MAX_VALUE));
+
+ // Upper bound is inclusive.
+ assertRowMetasEqual(List.of(expectedRowMeta1),
storage.rowsStartingWith(rowId1, rowId1, Integer.MAX_VALUE));
}
private static void assertRowMetasEqual(List<RowMeta> expected,
List<RowMeta> actual) {
- assertThat(expected, is(actual));
+ assertThat(actual, is(expected));
}
@Test
@@ -1559,7 +1591,10 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvPartitionStor
addWrite(rowId, binaryRow, txId);
- assertRowMetasEqual(List.of(expectedRowMeta),
storage.rowsStartingWith(RowId.lowestRowId(PARTITION_ID), Integer.MAX_VALUE));
+ assertRowMetasEqual(
+ List.of(expectedRowMeta),
+ storage.rowsStartingWith(RowId.lowestRowId(PARTITION_ID),
RowId.highestRowId(PARTITION_ID), Integer.MAX_VALUE)
+ );
}
@Test
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 0ad6634f230..26937ec879f 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -608,7 +608,11 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvTableStorageTest
assertThrows(StorageDestroyedException.class, () ->
storage.scanVersions(rowId));
assertThrows(StorageDestroyedException.class, () ->
storage.closestRowId(rowId));
- assertThrows(StorageDestroyedException.class, () ->
storage.rowsStartingWith(rowId, Integer.MAX_VALUE));
+ assertThrows(
+ StorageDestroyedException.class,
+ () -> storage.rowsStartingWith(rowId,
RowId.highestRowId(PARTITION_ID), Integer.MAX_VALUE)
+ );
+ assertThrows(StorageDestroyedException.class, () ->
assertNull(storage.highestRowId()));
}
@Test
@@ -1560,7 +1564,11 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvTableStorageTest
assertThrows(StorageRebalanceException.class, () ->
storage.scanVersions(rowId));
assertThrows(StorageRebalanceException.class, () ->
storage.scan(clock.now()));
assertThrows(StorageRebalanceException.class, () ->
storage.closestRowId(rowId));
- assertThrows(StorageRebalanceException.class, () ->
storage.rowsStartingWith(rowId, Integer.MAX_VALUE));
+ assertThrows(
+ StorageRebalanceException.class,
+ () -> storage.rowsStartingWith(rowId,
RowId.highestRowId(PARTITION_ID), Integer.MAX_VALUE)
+ );
+ assertThrows(StorageRebalanceException.class, () ->
storage.highestRowId());
return null;
});
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index b75bc31b249..de05824cbe0 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -657,15 +657,22 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
}
@Override
- public List<RowMeta> rowsStartingWith(RowId lowerBound, int limit) throws
StorageException {
+ public @Nullable RowId highestRowId() throws StorageException {
+ checkStorageClosedOrInProcessOfRebalance();
+
+ return map.floorKey(RowId.highestRowId(partitionId));
+ }
+
+ @Override
+ public List<RowMeta> rowsStartingWith(RowId lowerBoundInclusive, RowId
upperBoundInclusive, int limit) throws StorageException {
checkStorageClosedOrInProcessOfRebalance();
List<RowMeta> result = new ArrayList<>();
- RowId currentLowerBound = lowerBound;
+ RowId currentLowerBound = lowerBoundInclusive;
for (int i = 0; i < limit; i++) {
RowMeta row = closestRow(currentLowerBound);
- if (row == null) {
+ if (row == null || row.rowId().compareTo(upperBoundInclusive) > 0)
{
break;
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 28dc542db4d..495f0b2eb5f 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -619,11 +619,27 @@ public abstract class
AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
@Override
- public List<RowMeta> rowsStartingWith(RowId lowerBound, int limit) throws
StorageException {
+ public @Nullable RowId highestRowId() throws StorageException {
return busy(() -> {
throwExceptionIfStorageNotInRunnableState();
- try (Cursor<VersionChain> cursor =
renewableState.versionChainTree().find(new VersionChainKey(lowerBound), null)) {
+ try {
+ VersionChain lastChain =
renewableState.versionChainTree().findLast();
+ return lastChain == null ? null : lastChain.rowId();
+ } catch (Exception e) {
+ throw new StorageException("Error occurred while trying to
read a row id", e);
+ }
+ });
+ }
+
+ @Override
+ public List<RowMeta> rowsStartingWith(RowId lowerBoundInclusive, RowId
upperBoundInclusive, int limit) throws StorageException {
+ return busy(() -> {
+ throwExceptionIfStorageNotInRunnableState();
+
+ VersionChainKey lowerBoundKey = new
VersionChainKey(lowerBoundInclusive);
+ VersionChainKey upperBoundKey = new
VersionChainKey(upperBoundInclusive);
+ try (Cursor<VersionChain> cursor =
renewableState.versionChainTree().find(lowerBoundKey, upperBoundKey)) {
List<RowMeta> result = new ArrayList<>();
for (int i = 0; i < limit && cursor.hasNext(); i++) {
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
index 194a25d5838..9beeaf560d8 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
@@ -94,7 +94,10 @@ public final class PartitionDataHelper implements
ManuallyCloseable {
private final int partitionId;
/** Upper bound for scans. */
- private final Slice upperBound;
+ final Slice upperBound;
+
+ /** Lower bound for backward scans. */
+ private final Slice lowerBound;
/** Partition data column family. */
final ColumnFamilyHandle partCf;
@@ -124,9 +127,13 @@ public final class PartitionDataHelper implements
ManuallyCloseable {
this.partitionStartPrefix = compositeKey(tableId, partitionId);
this.partitionEndPrefix = incrementPrefix(partitionStartPrefix);
+ this.lowerBound = new Slice(partitionStartPrefix);
this.upperBound = new Slice(partitionEndPrefix);
this.upperBoundReadOpts = new
ReadOptions().setIterateUpperBound(upperBound);
- this.scanReadOpts = new
ReadOptions().setIterateUpperBound(upperBound).setAutoPrefixMode(true);
+ this.scanReadOpts = new ReadOptions()
+ .setIterateLowerBound(lowerBound)
+ .setIterateUpperBound(upperBound)
+ .setAutoPrefixMode(true);
}
public int partitionId() {
@@ -349,6 +356,6 @@ public final class PartitionDataHelper implements
ManuallyCloseable {
@Override
public void close() {
- RocksUtils.closeAll(scanReadOpts, upperBoundReadOpts, upperBound);
+ RocksUtils.closeAll(scanReadOpts, upperBoundReadOpts, upperBound,
lowerBound);
}
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 38f6bfd4c8a..d8abb9d0b30 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -96,6 +96,7 @@ import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteBatchWithIndex;
@@ -1117,17 +1118,59 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
}
@Override
- public List<RowMeta> rowsStartingWith(RowId lowerBound, int limit) throws
StorageException {
+ public @Nullable RowId highestRowId() throws StorageException {
return busy(() -> {
throwExceptionIfStorageInProgressOfRebalance(state.get(),
this::createStorageInfo);
- ByteBuffer keyBuf = prepareDirectDataIdKeyBuf(lowerBound)
+ ByteBuffer keyBuf = DIRECT_DATA_ID_KEY_BUFFER.get().clear()
.position(0)
.limit(ROW_PREFIX_SIZE);
+ try (RocksIterator it = db.newIterator(helper.partCf,
helper.scanReadOpts)) {
+ it.seekToLast();
+
+ if (!it.isValid()) {
+ it.status();
+
+ return null;
+ }
+
+ it.key(keyBuf);
+
+ return getRowId(keyBuf);
+ } catch (RocksDBException e) {
+ throw new IgniteRocksDbException("Error finding highest Row
ID", e);
+ }
+ });
+ }
+
+ @Override
+ public List<RowMeta> rowsStartingWith(RowId lowerBoundInclusive, RowId
upperBoundInclusive, int limit) throws StorageException {
+ return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(),
this::createStorageInfo);
+
+ @Nullable RowId upperBoundExclusive =
upperBoundInclusive.increment();
+
+ ByteBuffer keyBuf = prepareDirectDataIdKeyBuf(lowerBoundInclusive)
+ .position(0)
+ .limit(ROW_PREFIX_SIZE);
+ @Nullable ByteBuffer upperBoundBuf;
+ if (upperBoundExclusive != null) {
+ upperBoundBuf =
allocate(ROW_PREFIX_SIZE).order(KEY_BYTE_ORDER);
+ writeRowPrefix(upperBoundBuf, upperBoundExclusive);
+ } else {
+ upperBoundBuf = null;
+ }
+
List<RowMeta> result = new ArrayList<>();
- try (RocksIterator it = db.newIterator(helper.partCf,
helper.scanReadOpts)) {
+ try (
+ @Nullable Slice maybeUpperBound = upperBoundBuf != null ?
new Slice(upperBoundBuf.array()) : null;
+ ReadOptions readOptions = new ReadOptions()
+ .setIterateUpperBound(maybeUpperBound != null ?
maybeUpperBound : helper.upperBound)
+ .setAutoPrefixMode(true);
+ RocksIterator it = db.newIterator(helper.partCf,
readOptions)
+ ) {
it.seek(keyBuf);
for (int i = 0; i < limit; i++) {
@@ -1162,7 +1205,7 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
it.next();
}
} catch (RocksDBException e) {
- throw new IgniteRocksDbException("Error finding closest Row
ID", e);
+ throw new IgniteRocksDbException("Error finding following Row
IDs", e);
}
return result;