This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 613f646ea75 IGNITE-28315 Improve index row resolution for RO scan (#7895)
613f646ea75 is described below

commit 613f646ea750df7975cec447d48201369d944ed0
Author: korlov42 <[email protected]>
AuthorDate: Tue Mar 31 14:55:01 2026 +0300

    IGNITE-28315 Improve index row resolution for RO scan (#7895)
---
 .../replicator/PartitionReplicaListener.java       | 71 +++++++++++++++++++---
 1 file changed, 63 insertions(+), 8 deletions(-)

diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index b2e29dfff44..17420561b21 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -49,6 +49,7 @@ import static org.apache.ignite.internal.tx.TxStateMetaFinishing.castToFinishing
 import static 
org.apache.ignite.internal.tx.TxStateMetaUnknown.txStateMetaUnknown;
 import static 
org.apache.ignite.internal.tx.impl.TxStateResolutionParameters.txStateResolutionParameters;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static org.apache.ignite.internal.util.CollectionUtils.view;
 import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptyCollectionCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
@@ -208,11 +209,13 @@ import org.apache.ignite.internal.tx.message.TableWriteIntentSwitchReplicaReques
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.tx.message.TxStatePrimaryReplicaRequest;
 import 
org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequestBase;
+import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.CursorUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.Pair;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.ErrorGroups.Replicator;
 import org.apache.ignite.lang.ErrorGroups.Transactions;
@@ -1309,19 +1312,71 @@ public class PartitionReplicaListener implements ReplicaTableProcessor {
             return nullCompletedFuture();
         }
 
-        IndexRow indexRow = cursor.next();
+        List<Pair<IndexRow, CompletableFuture<TimedBinaryRow>>> 
indexRowWithWriteIntentFutures = null;
+        int resultStartIndex = result.size();
+
+        while (result.size() < batchSize && cursor.hasNext()) {
+            IndexRow indexRow = cursor.next();
+            RowId rowId = indexRow.rowId();
 
-        RowId rowId = indexRow.rowId();
+            CompletableFuture<@Nullable TimedBinaryRow> resolutionResult = 
resolvePlainReadResult(rowId, null, readTimestamp);
 
-        return resolvePlainReadResult(rowId, null, 
readTimestamp).thenComposeAsync(resolvedReadResult -> {
-            BinaryRow binaryRow = upgrade(binaryRow(resolvedReadResult), 
tableVersion);
+            if (resolutionResult.isDone() && 
!resolutionResult.isCompletedExceptionally()) {
+                BinaryRow binaryRow = 
upgrade(binaryRow(resolutionResult.join()), tableVersion);
 
-            if (binaryRow != null && indexRowMatches(indexRow, binaryRow, 
schemaAwareIndexStorage)) {
-                result.add(binaryRow);
+                if (binaryRow != null && indexRowMatches(indexRow, binaryRow, 
schemaAwareIndexStorage)) {
+                    result.add(binaryRow);
+                }
+            } else {
+                if (indexRowWithWriteIntentFutures == null) {
+                    indexRowWithWriteIntentFutures = new ArrayList<>();
+                }
+
+                indexRowWithWriteIntentFutures.add(new Pair<>(indexRow, 
resolutionResult));
+                result.add(null); // Placeholder; will be filled or removed 
after write intent resolution.
             }
+        }
+
+        if (nullOrEmpty(indexRowWithWriteIntentFutures)) {
+            return nullCompletedFuture();
+        }
+
+        List<Pair<IndexRow, CompletableFuture<TimedBinaryRow>>> 
finalIndexRowWithWriteIntentFutures = indexRowWithWriteIntentFutures;
+        return CompletableFutures.allOf(view(indexRowWithWriteIntentFutures, 
Pair::getSecond))
+                .thenComposeAsync(unused -> {
+                    int futureIdx = 0;
+
+                    ListIterator<BinaryRow> it = 
result.listIterator(resultStartIndex);
 
-            return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, 
readTimestamp, batchSize, result, tableVersion);
-        }, scanRequestExecutor);
+                    while (it.hasNext()) {
+                        BinaryRow row = it.next();
+
+                        if (row == null) {
+                            Pair<IndexRow, CompletableFuture<TimedBinaryRow>> 
indexRowWithWriteIntent
+                                    = 
finalIndexRowWithWriteIntentFutures.get(futureIdx);
+                            IndexRow indexRow = 
indexRowWithWriteIntent.getFirst();
+                            TimedBinaryRow resolved = 
indexRowWithWriteIntent.getSecond().join();
+                            futureIdx++;
+
+                            BinaryRow binaryRow = upgrade(binaryRow(resolved), 
tableVersion);
+
+                            if (binaryRow != null && indexRowMatches(indexRow, 
binaryRow, schemaAwareIndexStorage)) {
+                                it.set(binaryRow);
+                            } else {
+                                it.remove();
+                            }
+                        }
+                    }
+
+                    assert futureIdx == 
finalIndexRowWithWriteIntentFutures.size()
+                            : "Expected " + 
finalIndexRowWithWriteIntentFutures.size() + " iterations but was " + futureIdx;
+
+                    if (result.size() < batchSize && cursor.hasNext()) {
+                        return 
continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, readTimestamp, 
batchSize, result, tableVersion);
+                    }
+
+                    return nullCompletedFuture();
+                }, scanRequestExecutor);
     }
 
     /**

Reply via email to