This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 613f646ea75 IGNITE-28315 Improve index row resolution for RO scan
(#7895)
613f646ea75 is described below
commit 613f646ea750df7975cec447d48201369d944ed0
Author: korlov42 <[email protected]>
AuthorDate: Tue Mar 31 14:55:01 2026 +0300
IGNITE-28315 Improve index row resolution for RO scan (#7895)
---
.../replicator/PartitionReplicaListener.java | 71 +++++++++++++++++++---
1 file changed, 63 insertions(+), 8 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index b2e29dfff44..17420561b21 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -49,6 +49,7 @@ import static
org.apache.ignite.internal.tx.TxStateMetaFinishing.castToFinishing
import static
org.apache.ignite.internal.tx.TxStateMetaUnknown.txStateMetaUnknown;
import static
org.apache.ignite.internal.tx.impl.TxStateResolutionParameters.txStateResolutionParameters;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static org.apache.ignite.internal.util.CollectionUtils.view;
import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
import static
org.apache.ignite.internal.util.CompletableFutures.emptyCollectionCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
@@ -208,11 +209,13 @@ import
org.apache.ignite.internal.tx.message.TableWriteIntentSwitchReplicaReques
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.message.TxStatePrimaryReplicaRequest;
import
org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequestBase;
+import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.CursorUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.Pair;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.ErrorGroups.Replicator;
import org.apache.ignite.lang.ErrorGroups.Transactions;
@@ -1309,19 +1312,71 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
return nullCompletedFuture();
}
- IndexRow indexRow = cursor.next();
+ List<Pair<IndexRow, CompletableFuture<TimedBinaryRow>>>
indexRowWithWriteIntentFutures = null;
+ int resultStartIndex = result.size();
+
+ while (result.size() < batchSize && cursor.hasNext()) {
+ IndexRow indexRow = cursor.next();
+ RowId rowId = indexRow.rowId();
- RowId rowId = indexRow.rowId();
+ CompletableFuture<@Nullable TimedBinaryRow> resolutionResult =
resolvePlainReadResult(rowId, null, readTimestamp);
- return resolvePlainReadResult(rowId, null,
readTimestamp).thenComposeAsync(resolvedReadResult -> {
- BinaryRow binaryRow = upgrade(binaryRow(resolvedReadResult),
tableVersion);
+ if (resolutionResult.isDone() &&
!resolutionResult.isCompletedExceptionally()) {
+ BinaryRow binaryRow =
upgrade(binaryRow(resolutionResult.join()), tableVersion);
- if (binaryRow != null && indexRowMatches(indexRow, binaryRow,
schemaAwareIndexStorage)) {
- result.add(binaryRow);
+ if (binaryRow != null && indexRowMatches(indexRow, binaryRow,
schemaAwareIndexStorage)) {
+ result.add(binaryRow);
+ }
+ } else {
+ if (indexRowWithWriteIntentFutures == null) {
+ indexRowWithWriteIntentFutures = new ArrayList<>();
+ }
+
+ indexRowWithWriteIntentFutures.add(new Pair<>(indexRow,
resolutionResult));
+ result.add(null); // Placeholder; will be filled or removed
after write intent resolution.
}
+ }
+
+ if (nullOrEmpty(indexRowWithWriteIntentFutures)) {
+ return nullCompletedFuture();
+ }
+
+ List<Pair<IndexRow, CompletableFuture<TimedBinaryRow>>>
finalIndexRowWithWriteIntentFutures = indexRowWithWriteIntentFutures;
+ return CompletableFutures.allOf(view(indexRowWithWriteIntentFutures,
Pair::getSecond))
+ .thenComposeAsync(unused -> {
+ int futureIdx = 0;
+
+ ListIterator<BinaryRow> it =
result.listIterator(resultStartIndex);
- return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor,
readTimestamp, batchSize, result, tableVersion);
- }, scanRequestExecutor);
+ while (it.hasNext()) {
+ BinaryRow row = it.next();
+
+ if (row == null) {
+ Pair<IndexRow, CompletableFuture<TimedBinaryRow>>
indexRowWithWriteIntent
+ =
finalIndexRowWithWriteIntentFutures.get(futureIdx);
+ IndexRow indexRow =
indexRowWithWriteIntent.getFirst();
+ TimedBinaryRow resolved =
indexRowWithWriteIntent.getSecond().join();
+ futureIdx++;
+
+ BinaryRow binaryRow = upgrade(binaryRow(resolved),
tableVersion);
+
+ if (binaryRow != null && indexRowMatches(indexRow,
binaryRow, schemaAwareIndexStorage)) {
+ it.set(binaryRow);
+ } else {
+ it.remove();
+ }
+ }
+ }
+
+ assert futureIdx ==
finalIndexRowWithWriteIntentFutures.size()
+ : "Expected " +
finalIndexRowWithWriteIntentFutures.size() + " iterations but was " + futureIdx;
+
+ if (result.size() < batchSize && cursor.hasNext()) {
+ return
continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, readTimestamp,
batchSize, result, tableVersion);
+ }
+
+ return nullCompletedFuture();
+ }, scanRequestExecutor);
}
/**