This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f9bfb52907 IGNITE-19570 Write intent resolution for RW transactions 
(#2475)
f9bfb52907 is described below

commit f9bfb52907ca7074c67661e6de7183645add1341
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Thu Aug 24 11:34:54 2023 +0300

    IGNITE-19570 Write intent resolution for RW transactions (#2475)
---
 .../ItRaftCommandLeftInLogUntilRestartTest.java    |  49 +++
 .../table/distributed/raft/PartitionListener.java  |   2 +-
 .../replicator/PartitionReplicaListener.java       | 343 ++++++++++-----------
 3 files changed, 208 insertions(+), 186 deletions(-)

diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
index 8d1d9594d1..e222628537 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.runner.app;
 
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -32,6 +34,8 @@ import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
@@ -183,6 +187,10 @@ public class ItRaftCommandLeftInLogUntilRestartTest 
extends ClusterPerClassInteg
 
             assertTrue(IgniteTestUtils.waitForCondition(() -> 
appliedIndexNode0.get() == appliedIndexNode1.get(), 10_000));
 
+            RaftGroupService raftGroupService = 
table.internalTable().partitionRaftGroupService(0);
+
+            raftGroupService.peers().forEach(peer -> 
assertThat(raftGroupService.snapshot(peer), willCompleteSuccessfully()));
+
             leaderAndGroupRef.set(new IgniteBiTuple<>(leader, table.tableId() 
+ "_part_0"));
 
             afterBlock.accept(tx);
@@ -193,6 +201,9 @@ public class ItRaftCommandLeftInLogUntilRestartTest extends 
ClusterPerClassInteg
         }
 
         stopNodes();
+
+        log.info("Restart the cluster");
+
         startCluster();
 
         var node0Started = (IgniteImpl) CLUSTER_NODES.get(0);
@@ -294,6 +305,44 @@ public class ItRaftCommandLeftInLogUntilRestartTest 
extends ClusterPerClassInteg
                 new RuntimeException(IgniteStringFormatter.format("Cannot 
check a row {}", row), e);
             }
         }
+
+        transferLeadershipToLocalNode(ignite);
+
+        for (Object[] row : dataSet) {
+            try {
+                Tuple txTuple = table.keyValueView().get(null, 
Tuple.create().set("ID", row[0]));
+
+                assertNotNull(txTuple);
+
+                assertEquals(row[1], txTuple.value("NAME"));
+                assertEquals(row[2], txTuple.value("SALARY"));
+            } catch (Exception e) {
+                new RuntimeException(IgniteStringFormatter.format("Cannot 
check a row {} when the local node leader", row), e);
+            }
+        }
+    }
+
+    /**
+     * Transfers the leader to the local node related to the Ignite instance.
+     *
+     * @param ignite Ignite instance.
+     */
+    private static void transferLeadershipToLocalNode(IgniteImpl ignite) {
+        TableImpl table = (TableImpl) 
ignite.tables().table(DEFAULT_TABLE_NAME);
+
+        RaftGroupService raftGroupService = 
table.internalTable().partitionRaftGroupService(0);
+
+        List<Peer> peers = raftGroupService.peers();
+        assertNotNull(peers);
+
+        Peer leader = raftGroupService.leader();
+        assertNotNull(leader);
+
+        Peer localPeer = peers.stream().filter(peer -> 
peer.consistentId().equals(ignite.name())).findFirst().get();
+
+        log.info("Leader is transferring [from={}, to={}]", leader, localPeer);
+
+        assertThat(raftGroupService.transferLeadership(localPeer), 
willCompleteSuccessfully());
     }
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index d2cdede161..9d98c69831 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -123,7 +123,7 @@ public class PartitionListener implements RaftGroupListener 
{
         this.safeTime = safeTime;
         this.storageIndexTracker = storageIndexTracker;
 
-        // TODO: IGNITE-18502 Implement a pending update storage
+        // TODO: IGNITE-18502 Excessive full partition scan on node start
         try (PartitionTimestampCursor cursor = 
partitionDataStorage.scan(HybridTimestamp.MAX_VALUE)) {
             while (cursor.hasNext()) {
                 ReadResult readResult = cursor.next();
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 3d805731ed..69ab2d9cfb 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -135,6 +135,7 @@ import 
org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.CursorUtils;
 import org.apache.ignite.internal.util.ExceptionUtils;
@@ -463,24 +464,26 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             return safeReadFuture.thenCompose(unused -> 
scanSortedIndex(request, indexStorage));
         }
 
-        return safeReadFuture.thenCompose(unused -> 
retrieveExactEntriesUntilCursorEmpty(readTimestamp, cursorId, batchCount));
+        return safeReadFuture.thenCompose(unused -> 
retrieveExactEntriesUntilCursorEmpty(txId, readTimestamp, cursorId, 
batchCount));
     }
 
     /**
      * Extracts the exact number of entries, or fewer if the cursor becomes 
empty, from a cursor at the specific time.
      *
+     * @param txId Transaction id is used for RW only.
      * @param readTimestamp Timestamp of the moment as of which the data will 
be extracted.
      * @param cursorId Cursor id.
      * @param count Number of entries that will be extracted.
      * @return Result future.
      */
     private CompletableFuture<List<BinaryRow>> 
retrieveExactEntriesUntilCursorEmpty(
-            HybridTimestamp readTimestamp,
+            @Nullable UUID txId,
+            @Nullable HybridTimestamp readTimestamp,
             IgniteUuid cursorId,
             int count
     ) {
         @SuppressWarnings("resource") PartitionTimestampCursor cursor = 
(PartitionTimestampCursor) cursors.computeIfAbsent(cursorId,
-                id -> mvDataStorage.scan(readTimestamp));
+                id -> mvDataStorage.scan(readTimestamp == null ? 
HybridTimestamp.MAX_VALUE : readTimestamp));
 
         var resolutionFuts = new 
ArrayList<CompletableFuture<BinaryRow>>(count);
 
@@ -491,7 +494,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             BinaryRow candidate =
                     newestCommitTimestamp == null || 
!readResult.isWriteIntent() ? null : cursor.committed(newestCommitTimestamp);
 
-            resolutionFuts.add(resolveRoReadResult(readResult, readTimestamp, 
() -> candidate));
+            resolutionFuts.add(resolveReadResult(readResult, txId, 
readTimestamp, () -> candidate));
         }
 
         return allOf(resolutionFuts.toArray(new 
CompletableFuture[0])).thenCompose(unused -> {
@@ -506,7 +509,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             }
 
             if (rows.size() < count && cursor.hasNext()) {
-                return retrieveExactEntriesUntilCursorEmpty(readTimestamp, 
cursorId, count - rows.size()).thenApply(binaryRows -> {
+                return retrieveExactEntriesUntilCursorEmpty(txId, 
readTimestamp, cursorId, count - rows.size()).thenApply(binaryRows -> {
                     rows.addAll(binaryRows);
 
                     return rows;
@@ -517,6 +520,38 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         });
     }
 
+    /**
+     * Extracts the exact number of entries, or fewer if the cursor becomes 
empty, from a cursor at the specific time. Use it for RW.
+     *
+     * @param txId Transaction id.
+     * @param cursorId Cursor id.
+     * @return Future that completes with the list of resolved binary rows.
+     */
+    private CompletableFuture<List<BinaryRow>> 
retrieveExactEntriesUntilCursorEmpty(UUID txId, IgniteUuid cursorId, int count) 
{
+        return retrieveExactEntriesUntilCursorEmpty(txId, null, cursorId, 
count).thenCompose(rows -> {
+            if (CollectionUtils.nullOrEmpty(rows)) {
+                return completedFuture(Collections.emptyList());
+            }
+
+            CompletableFuture[] futs = new CompletableFuture[rows.size()];
+
+            for (BinaryRow row : rows) {
+                futs[rows.indexOf(row)] = 
schemaCompatValidator.validateBackwards(row.schemaVersion(), tableId(), txId)
+                        .thenCompose(validationResult -> {
+                            if (validationResult.isSuccessful()) {
+                                return completedFuture(row);
+                            } else {
+                                throw new 
IncompatibleSchemaException("Operation failed because schema "
+                                        + validationResult.fromSchemaVersion() 
+ " is not backward-compatible with "
+                                        + validationResult.toSchemaVersion() + 
" for table " + validationResult.failedTableId());
+                            }
+                        });
+            }
+
+            return CompletableFuture.allOf(futs).thenApply((unused) -> rows);
+        });
+    }
+
     /**
      * Processes single entry request for read only transaction.
      *
@@ -700,34 +735,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
 
-        return lockManager.acquire(txId, new LockKey(tableId()), 
LockMode.S).thenCompose(tblLock -> {
-            var batchRows = new ArrayList<BinaryRow>(batchCount);
-
-            @SuppressWarnings("resource") PartitionTimestampCursor cursor = 
(PartitionTimestampCursor) cursors.computeIfAbsent(cursorId,
-                    id -> mvDataStorage.scan(HybridTimestamp.MAX_VALUE));
-
-            return continueScanBatchRetrieval(cursor, batchCount, txId, 
batchRows);
-        });
-    }
-
-    private CompletableFuture<List<BinaryRow>> continueScanBatchRetrieval(
-            PartitionTimestampCursor cursor,
-            int batchCount,
-            UUID txId,
-            List<BinaryRow> batchRows
-    ) {
-        if (batchRows.size() < batchCount && cursor.hasNext()) {
-            return resolveAndCheckReadCompatibility(cursor.next(), txId)
-                    .thenCompose(resolvedReadResult -> {
-                        if (resolvedReadResult != null) {
-                            batchRows.add(resolvedReadResult);
-                        }
-
-                        return continueScanBatchRetrieval(cursor, batchCount, 
txId, batchRows);
-                    });
-        }
-
-        return completedFuture(batchRows);
+        return lockManager.acquire(txId, new LockKey(tableId()), 
LockMode.S).thenCompose(tblLock ->
+                retrieveExactEntriesUntilCursorEmpty(txId, cursorId, 
batchCount));
     }
 
     /**
@@ -915,28 +924,13 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         RowId rowId = indexRow.rowId();
 
-        ReadResult readResult = mvDataStorage.read(rowId, timestamp);
-
-        return resolveRoReadResult(readResult, timestamp, () -> {
-            if (readResult.newestCommitTimestamp() == null) {
-                return null;
+        return resolvePlainReadResult(rowId, null, 
timestamp).thenComposeAsync(resolvedReadResult -> {
+            if (resolvedReadResult != null && indexRowMatches(indexRow, 
resolvedReadResult, schemaAwareIndexStorage)) {
+                result.add(resolvedReadResult);
             }
 
-            ReadResult committedReadResult = mvDataStorage.read(rowId, 
readResult.newestCommitTimestamp());
-
-            assert !committedReadResult.isWriteIntent() :
-                    "The result is not committed [rowId=" + rowId + ", 
timestamp="
-                            + readResult.newestCommitTimestamp() + ']';
-
-            return committedReadResult.binaryRow();
-        })
-                .thenComposeAsync(resolvedReadResult -> {
-                    if (resolvedReadResult != null && 
indexRowMatches(indexRow, resolvedReadResult, schemaAwareIndexStorage)) {
-                        result.add(resolvedReadResult);
-                    }
-
-                    return continueReadOnlyIndexScan(schemaAwareIndexStorage, 
cursor, timestamp, batchSize, result);
-                }, scanRequestExecutor);
+            return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, 
timestamp, batchSize, result);
+        }, scanRequestExecutor);
     }
 
     /**
@@ -970,28 +964,28 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         return completedFuture(null); // End of range reached. 
Exit loop.
                     }
 
-                    return lockManager.acquire(txId, new LockKey(tableId(), 
currentRow.rowId()), LockMode.S)
+                    RowId rowId = currentRow.rowId();
+
+                    return lockManager.acquire(txId, new LockKey(tableId(), 
rowId), LockMode.S)
                             .thenComposeAsync(rowLock -> { // Table row S lock
-                                ReadResult readResult = 
mvDataStorage.read(currentRow.rowId(), HybridTimestamp.MAX_VALUE);
-                                return 
resolveAndCheckReadCompatibility(readResult, txId)
-                                        .thenCompose(resolvedReadResult -> {
-                                            if (resolvedReadResult != null) {
-                                                if 
(indexRowMatches(currentRow, resolvedReadResult, schemaAwareIndexStorage)) {
-                                                    
result.add(resolvedReadResult);
-                                                }
-                                            }
-
-                                            // Proceed scan.
-                                            return continueIndexScan(
-                                                    txId,
-                                                    schemaAwareIndexStorage,
-                                                    indexLocker,
-                                                    indexCursor,
-                                                    batchSize,
-                                                    result,
-                                                    isUpperBoundAchieved
-                                            );
-                                        });
+                                return resolvePlainReadResult(rowId, 
txId).thenCompose(resolvedReadResult -> {
+                                    if (resolvedReadResult != null) {
+                                        if (indexRowMatches(currentRow, 
resolvedReadResult, schemaAwareIndexStorage)) {
+                                            result.add(resolvedReadResult);
+                                        }
+                                    }
+
+                                    // Proceed scan.
+                                    return continueIndexScan(
+                                            txId,
+                                            schemaAwareIndexStorage,
+                                            indexLocker,
+                                            indexCursor,
+                                            batchSize,
+                                            result,
+                                            isUpperBoundAchieved
+                                    );
+                                });
                             }, scanRequestExecutor);
                 });
     }
@@ -1024,34 +1018,29 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         return lockManager.acquire(txId, new LockKey(tableId(), rowId), 
LockMode.S)
                 .thenComposeAsync(rowLock -> { // Table row S lock
-                    ReadResult readResult = mvDataStorage.read(rowId, 
HybridTimestamp.MAX_VALUE);
-                    return resolveAndCheckReadCompatibility(readResult, txId)
-                            .thenCompose(resolvedReadResult -> {
-                                if (resolvedReadResult != null) {
-                                    result.add(resolvedReadResult);
-                                }
+                    return resolvePlainReadResult(rowId, 
txId).thenCompose(resolvedReadResult -> {
+                        if (resolvedReadResult != null) {
+                            result.add(resolvedReadResult);
+                        }
 
-                                // Proceed lookup.
-                                return continueIndexLookup(txId, indexCursor, 
batchSize, result);
-                            });
+                        // Proceed lookup.
+                        return continueIndexLookup(txId, indexCursor, 
batchSize, result);
+                    });
                 }, scanRequestExecutor);
     }
 
-    private CompletableFuture<Void> continueReadOnlyIndexLookup(
-            Cursor<RowId> indexCursor,
-            HybridTimestamp timestamp,
-            int batchSize,
-            List<BinaryRow> result
-    ) {
-        if (result.size() >= batchSize || !indexCursor.hasNext()) {
-            return completedFuture(null);
-        }
-
-        RowId rowId = indexCursor.next();
-
-        ReadResult readResult = mvDataStorage.read(rowId, timestamp);
+    /**
+     * Resolves a result received from a direct storage read.
+     *
+     * @param rowId Row id to resolve.
+     * @param txId Transaction id is used for RW only.
+     * @param timestamp Read timestamp.
+     * @return Future finishes with the resolved binary row.
+     */
+    private CompletableFuture<BinaryRow> resolvePlainReadResult(RowId rowId, 
@Nullable UUID txId, @Nullable HybridTimestamp timestamp) {
+        ReadResult readResult = mvDataStorage.read(rowId, timestamp == null ? 
HybridTimestamp.MAX_VALUE : timestamp);
 
-        return resolveRoReadResult(readResult, timestamp, () -> {
+        return resolveReadResult(readResult, txId, timestamp, () -> {
             if (readResult.newestCommitTimestamp() == null) {
                 return null;
             }
@@ -1063,7 +1052,48 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             + readResult.newestCommitTimestamp() + ']';
 
             return committedReadResult.binaryRow();
-        }).thenComposeAsync(resolvedReadResult -> {
+        });
+    }
+
+    /**
+     * Resolves a result received from a direct storage read. Use it for RW.
+     *
+     * @param rowId Row id.
+     * @param txId Transaction id.
+     * @return Future finishes with the resolved binary row.
+     */
+    private CompletableFuture<BinaryRow> resolvePlainReadResult(RowId rowId, 
UUID txId) {
+        return resolvePlainReadResult(rowId, txId, null).thenCompose(row -> {
+            if (row == null) {
+                return completedFuture(null);
+            }
+
+            return 
schemaCompatValidator.validateBackwards(row.schemaVersion(), tableId(), txId)
+                    .thenApply(validationResult -> {
+                        if (validationResult.isSuccessful()) {
+                            return row;
+                        } else {
+                            throw new IncompatibleSchemaException("Operation 
failed because schema "
+                                    + validationResult.fromSchemaVersion() + " 
is not backward-compatible with "
+                                    + validationResult.toSchemaVersion() + " 
for table " + validationResult.failedTableId());
+                        }
+                    });
+        });
+    }
+
+    private CompletableFuture<Void> continueReadOnlyIndexLookup(
+            Cursor<RowId> indexCursor,
+            HybridTimestamp timestamp,
+            int batchSize,
+            List<BinaryRow> result
+    ) {
+        if (result.size() >= batchSize || !indexCursor.hasNext()) {
+            return completedFuture(null);
+        }
+
+        RowId rowId = indexCursor.next();
+
+        return resolvePlainReadResult(rowId, null, 
timestamp).thenComposeAsync(resolvedReadResult -> {
             if (resolvedReadResult != null) {
                 result.add(resolvedReadResult);
             }
@@ -1326,14 +1356,13 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         RowId rowId = cursor.next();
 
-        return resolveAndCheckReadCompatibility(mvDataStorage.read(rowId, 
HybridTimestamp.MAX_VALUE), txId)
-                .thenCompose(row -> {
-                    if (row != null) {
-                        return action.apply(rowId, row);
-                    } else {
-                        return continueResolvingByPk(cursor, txId, action);
-                    }
-                });
+        return resolvePlainReadResult(rowId, txId).thenCompose(row -> {
+            if (row != null) {
+                return action.apply(rowId, row);
+            } else {
+                return continueResolvingByPk(cursor, txId, action);
+            }
+        });
 
     }
 
@@ -2246,100 +2275,44 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         }
     }
 
-    private CompletableFuture<BinaryRow> 
resolveAndCheckReadCompatibility(ReadResult readResult, UUID txId) {
-        BinaryRow row = resolveRwReadResult(readResult, txId);
-
-        if (row == null) {
-            return completedFuture(row);
-        }
-
-        return schemaCompatValidator.validateBackwards(row.schemaVersion(), 
tableId(), txId)
-                .thenCompose(validationResult -> {
-                    if (validationResult.isSuccessful()) {
-                        return completedFuture(row);
-                    } else {
-                        throw new IncompatibleSchemaException("Operation 
failed because schema "
-                                + validationResult.fromSchemaVersion() + " is 
not backward-compatible with "
-                                + validationResult.toSchemaVersion() + " for 
table " + validationResult.failedTableId());
-                    }
-                });
-
-    }
-
-    /**
-     * Resolves a read result for RW transaction.
-     *
-     * @param readResult Read result to resolve.
-     * @param txId Transaction id.
-     * @return Resolved binary row.
-     */
-    @Nullable
-    private BinaryRow resolveRwReadResult(ReadResult readResult, UUID txId) {
-        // This is a safe join (waiting of the future result), because the 
resolution for RW transaction cannot lead to a network request.
-        return resolveReadResult(readResult, txId, null, null).join();
-    }
-
-    /**
-     * Resolves a read result for RO transaction.
-     *
-     * @param readResult Read result to resolve.
-     * @param timestamp Timestamp.
-     * @param lastCommitted Action to get the latest committed row.
-     * @return Future to resolved binary row.
-     */
-    private CompletableFuture<BinaryRow> resolveRoReadResult(
-            ReadResult readResult,
-            HybridTimestamp timestamp,
-            Supplier<BinaryRow> lastCommitted
-    ) {
-        return resolveReadResult(readResult, null, timestamp, lastCommitted);
-    }
-
     /**
      * Resolves read result to the corresponding binary row. Following rules 
are used for read result resolution:
      * <ol>
-     *     <li>If txId is not null (RW request), assert that retrieved tx id 
matches proposed one or that retrieved tx id is null
+     *     <li>If timestamp is null (RW request), assert that retrieved tx id 
matches proposed one or that retrieved tx id is null
      *     and return binary row. Currently it's only possible to retrieve 
write intents if they belong to the same transaction,
      *     locks prevent reading write intents created by others.</li>
-     *     <li>If txId is null (RO request), perform write intent resolution 
if given readResult is a write intent itself
+     *     <li>If timestamp is not null (RO request), perform write intent 
resolution if given readResult is a write intent itself
      *     or return binary row otherwise.</li>
      * </ol>
      *
      * @param readResult Read result to resolve.
      * @param txId Nullable transaction id, should be provided if resolution 
is performed within the context of RW transaction.
      * @param timestamp Timestamp is used in RO transaction only.
-     * @param lastCommitted Action to get the latest committed row, it is used 
in RO transaction only.
+     * @param lastCommitted Action to get the latest committed row.
      * @return Future to resolved binary row.
      */
     private CompletableFuture<BinaryRow> resolveReadResult(
             ReadResult readResult,
             @Nullable UUID txId,
             @Nullable HybridTimestamp timestamp,
-            @Nullable Supplier<BinaryRow> lastCommitted
+            Supplier<BinaryRow> lastCommitted
     ) {
         if (readResult == null) {
             return completedFuture(null);
+        } else if (!readResult.isWriteIntent()) {
+            return completedFuture(readResult.binaryRow());
         } else {
-            if (txId != null) {
-                // RW request.
+            // RW resolution.
+            if (timestamp == null) {
                 UUID retrievedResultTxId = readResult.transactionId();
 
-                if (retrievedResultTxId == null || 
txId.equals(retrievedResultTxId)) {
+                if (txId.equals(retrievedResultTxId)) {
                     // Same transaction - return retrieved value. It may be 
both writeIntent or regular value.
                     return completedFuture(readResult.binaryRow());
-                } else {
-                    // Should never happen, currently, locks prevent reading 
another transaction intents during RW requests.
-                    throw new AssertionError("Mismatched transaction id, 
expectedTxId={" + txId + "},"
-                            + " actualTxId={" + retrievedResultTxId + '}');
-                }
-            } else {
-                if (!readResult.isWriteIntent()) {
-                    return completedFuture(readResult.binaryRow());
                 }
-
-                // RO request.
-                return resolveWriteIntentAsync(readResult, timestamp, 
lastCommitted);
             }
+
+            return resolveWriteIntentAsync(readResult, timestamp, 
lastCommitted);
         }
     }
 
@@ -2359,14 +2332,14 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         return resolveTxState(
                 new TablePartitionId(readResult.commitTableId(), 
readResult.commitPartitionId()),
                 readResult.transactionId(),
-                timestamp)
-                .thenApply(readLastCommitted -> {
-                    if (readLastCommitted) {
-                        return lastCommitted.get();
-                    } else {
-                        return readResult.binaryRow();
-                    }
-                });
+                timestamp
+        ).thenApply(readLastCommitted -> {
+            if (readLastCommitted) {
+                return lastCommitted.get();
+            } else {
+                return readResult.binaryRow();
+            }
+        });
     }
 
     /**
@@ -2375,25 +2348,25 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param commitGrpId Commit partition id.
      * @param txId Transaction id.
      * @param timestamp Timestamp.
-     * @return Future with boolean value, indicating whether the transaction 
was committed before timestamp.
+     * @return Future that completes with true when the last committed value 
must be read instead of the write intent, and false otherwise.
      */
     private CompletableFuture<Boolean> resolveTxState(
             TablePartitionId commitGrpId,
             UUID txId,
             HybridTimestamp timestamp
     ) {
-        requireNonNull(timestamp, "timestamp");
+        boolean readLatest = timestamp == null;
 
         return placementDriver.sendMetaRequest(commitGrpId, 
FACTORY.txStateReplicaRequest()
                         .groupId(commitGrpId)
-                        .readTimestampLong(timestamp.longValue())
+                        .readTimestampLong((readLatest ? 
HybridTimestamp.MIN_VALUE : timestamp).longValue())
                         .txId(txId)
                         .build())
                 .thenApply(txMeta -> {
                     if (txMeta == null) {
                         return true;
                     } else if (txMeta.txState() == TxState.COMMITED) {
-                        return txMeta.commitTimestamp().compareTo(timestamp) > 
0;
+                        return !readLatest && 
txMeta.commitTimestamp().compareTo(timestamp) > 0;
                     } else {
                         assert txMeta.txState() == TxState.ABORTED : 
"Unexpected transaction state [state=" + txMeta.txState() + ']';
 

Reply via email to