This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 41e09209ef IGNITE-20482 Simplify write intent resolution in PartitionReplicaListener (#2625)
41e09209ef is described below

commit 41e09209ef13bea448d183cb39586f4f88237385
Author: Cyrill <cyrill.si...@gmail.com>
AuthorDate: Mon Sep 25 14:45:18 2023 +0300

    IGNITE-20482 Simplify write intent resolution in PartitionReplicaListener (#2625)
---
 .../apache/ignite/internal/util/IgniteUtils.java   |  10 ++
 .../ignite/internal/table/ItColocationTest.java    |   2 +-
 .../table/distributed/StorageUpdateHandler.java    | 105 ++++++++--------
 .../replicator/PartitionReplicaListener.java       | 134 +++++++++++----------
 .../org/apache/ignite/internal/tx/TxState.java     |   6 +
 .../ignite/internal/tx/impl/TxManagerImpl.java     |   6 +-
 6 files changed, 135 insertions(+), 128 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 72e9fba7d7..1ea9fed961 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -913,6 +913,16 @@ public class IgniteUtils {
         return result;
     }
 
+    /**
+     * Find the first element in the given list.
+     *
+     * @param list List.
+     * @return Optional containing element (if present).
+     */
+    public static <T> Optional<T> findFirst(List<T> list) {
+        return list.isEmpty() ? Optional.empty() : Optional.of(list.get(0));
+    }
+
     /**
      * Find any element in given collection.
      *
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 020b1cf550..362ba76c8d 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -144,7 +144,7 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
         ) {
             @Override
             public CompletableFuture<Void> finish(
-                    HybridTimestampTracker timestampTracker,
+                    HybridTimestampTracker observableTimestampTracker,
                     TablePartitionId commitPartition,
                     ClusterNode recipientNode,
                     Long term,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index cd6e8518fb..f029e779b8 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -265,7 +265,7 @@ public class StorageUpdateHandler {
      * @param commitTimestamp Commit timestamp. Not {@code null} if {@code 
commit} is {@code true}.
      */
     public void handleTransactionCleanup(UUID txId, boolean commit, @Nullable 
HybridTimestamp commitTimestamp) {
-        handleTransactionCleanup(txId, commit, commitTimestamp, () -> {});
+        handleTransactionCleanup(txId, commit, commitTimestamp, null);
     }
 
     /**
@@ -276,86 +276,75 @@ public class StorageUpdateHandler {
      * @param commitTimestamp Commit timestamp. Not {@code null} if {@code 
commit} is {@code true}.
      * @param onApplication On application callback.
      */
-    public void handleTransactionCleanup(UUID txId, boolean commit,
-            @Nullable HybridTimestamp commitTimestamp, Runnable onApplication) 
{
+    public void handleTransactionCleanup(
+            UUID txId,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp,
+            @Nullable Runnable onApplication) {
         Set<RowId> pendingRowIds = pendingRows.removePendingRowIds(txId);
 
-        handleTransactionCleanup(pendingRowIds, commit, commitTimestamp, 
onApplication);
-    }
+        // `pendingRowIds` might be empty when we have already cleaned up the 
storage for this transaction,
+        // for example, when primary (PartitionReplicaListener) is collocated 
with the raft node (PartitionListener)
+        // and one of them has already processed the cleanup request, since 
they share the instance of this class.
+        // However, we still need to run `onApplication` if it is not null, 
e.g. called in TxCleanupCommand handler in PartitionListener
+        // to update indexes. In this case it should be executed under 
`runConsistently`.
+        if (!pendingRowIds.isEmpty() || onApplication != null) {
+            storage.runConsistently(locker -> {
+                pendingRowIds.forEach(locker::lock);
+
+                if (commit) {
+                    performCommitWrite(pendingRowIds, commitTimestamp);
+                } else {
+                    performAbortWrite(pendingRowIds);
+                }
 
-    /**
-     * Handles the cleanup of a transaction. The transaction is either 
committed or rolled back.
-     *
-     * @param pendingRowIds Row ids of write-intents to be finalized, either 
committed or rolled back.
-     * @param commit Commit flag. {@code true} if transaction is committed, 
{@code false} otherwise.
-     * @param commitTimestamp Commit timestamp. Not {@code null} if {@code 
commit} is {@code true}.
-     * @param onApplication On application callback.
-     */
-    private void handleTransactionCleanup(Set<RowId> pendingRowIds, boolean 
commit,
-            @Nullable HybridTimestamp commitTimestamp, Runnable onApplication) 
{
-        if (commit) {
-            handleTransactionCommit(pendingRowIds, commitTimestamp, 
onApplication);
-        } else {
-            handleTransactionAbortion(pendingRowIds, onApplication);
+                if (onApplication != null) {
+                    onApplication.run();
+                }
+
+                return null;
+            });
         }
     }
 
     /**
-     * Handles the commit of a transaction.
+     * Commit write intents created by the provided transaction.
      *
      * @param pendingRowIds Row ids of write-intents to be committed.
      * @param commitTimestamp Commit timestamp.
-     * @param onApplication On application callback.
      */
-    private void handleTransactionCommit(Set<RowId> pendingRowIds, 
HybridTimestamp commitTimestamp, Runnable onApplication) {
-        storage.runConsistently(locker -> {
-            pendingRowIds.forEach(locker::lock);
-
-            pendingRowIds.forEach(rowId -> storage.commitWrite(rowId, 
commitTimestamp));
-
-            onApplication.run();
-
-            return null;
-        });
+    private void performCommitWrite(Set<RowId> pendingRowIds, HybridTimestamp 
commitTimestamp) {
+        pendingRowIds.forEach(rowId -> storage.commitWrite(rowId, 
commitTimestamp));
     }
 
     /**
-     * Handles the abortion of a transaction.
+     * Abort write intents created by the provided transaction.
      *
-     * @param pendingRowIds Row ids of write-intents to be rolled back.
-     * @param onApplication On application callback.
+     * @param pendingRowIds Row ids of write-intents to be aborted.
      */
-    private void handleTransactionAbortion(Set<RowId> pendingRowIds, Runnable 
onApplication) {
-        storage.runConsistently(locker -> {
-            for (RowId rowId : pendingRowIds) {
-                locker.lock(rowId);
-
-                try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
-                    if (!cursor.hasNext()) {
-                        continue;
-                    }
-
-                    ReadResult item = cursor.next();
+    private void performAbortWrite(Set<RowId> pendingRowIds) {
+        for (RowId rowId : pendingRowIds) {
+            try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
+                if (!cursor.hasNext()) {
+                    continue;
+                }
 
-                    // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20124 Prevent double storage 
updates within primary
-                    if (item.isWriteIntent()) {
-                        BinaryRow rowToRemove = item.binaryRow();
+                ReadResult item = cursor.next();
 
-                        if (rowToRemove == null) {
-                            continue;
-                        }
+                // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 
Prevent double storage updates within primary
+                if (item.isWriteIntent()) {
+                    BinaryRow rowToRemove = item.binaryRow();
 
-                        indexUpdateHandler.tryRemoveFromIndexes(rowToRemove, 
rowId, cursor);
+                    if (rowToRemove == null) {
+                        continue;
                     }
+
+                    indexUpdateHandler.tryRemoveFromIndexes(rowToRemove, 
rowId, cursor);
                 }
             }
+        }
 
-            pendingRowIds.forEach(storage::abortWrite);
-
-            onApplication.run();
-
-            return null;
-        });
+        pendingRowIds.forEach(storage::abortWrite);
     }
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 0197aa61cf..1f6e8fc3f6 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -29,9 +29,10 @@ import static 
org.apache.ignite.internal.tx.TxState.ABANDONED;
 import static org.apache.ignite.internal.tx.TxState.ABORTED;
 import static org.apache.ignite.internal.tx.TxState.COMMITED;
 import static org.apache.ignite.internal.tx.TxState.PENDING;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
-import static org.apache.ignite.internal.util.IgniteUtils.filter;
 import static org.apache.ignite.internal.util.IgniteUtils.findAny;
+import static org.apache.ignite.internal.util.IgniteUtils.findFirst;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
 import static org.apache.ignite.lang.IgniteStringFormatter.format;
@@ -1434,7 +1435,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     txOps = new TxCleanupReadyFutureList();
                 }
 
-                if (txOps.state == ABORTED || txOps.state == COMMITED) {
+                if (isFinalState(txOps.state)) {
                     fut.completeExceptionally(
                             new 
TransactionException(TX_FAILED_READ_WRITE_OPERATION_ERR, "Transaction is 
already finished."));
                 } else {
@@ -1470,37 +1471,47 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Result of the given action.
      */
     private CompletableFuture<BinaryRow> resolveRowByPkForReadOnly(BinaryTuple 
pk, HybridTimestamp ts) {
+        // Indexes store values associated with different versions of one 
entry.
+        // It's possible to have multiple entries for a particular search key
+        // only if we insert, delete and again insert an entry with the same 
indexed fields.
+        // It means that there exists one and only one non-empty readResult 
for any read timestamp for the given key.
+        // Which in turn means that if we have found non-empty readResult 
during PK index iteration
+        // we can proceed with readResult resolution and stop the iteration.
         try (Cursor<RowId> cursor = getFromPkIndex(pk)) {
-            List<ReadResult> candidates = new ArrayList<>();
+            // TODO https://issues.apache.org/jira/browse/IGNITE-18767 scan of 
multiple write intents should not be needed
+            List<ReadResult> writeIntents = new ArrayList<>();
+            List<ReadResult> regularEntries = new ArrayList<>();
 
             for (RowId rowId : cursor) {
                 ReadResult readResult = mvDataStorage.read(rowId, ts);
 
-                if (!readResult.isEmpty() || readResult.isWriteIntent()) {
-                    candidates.add(readResult);
+                if (readResult.isWriteIntent()) {
+                    writeIntents.add(readResult);
+                } else if (!readResult.isEmpty()) {
+                    regularEntries.add(readResult);
                 }
             }
 
-            if (candidates.isEmpty()) {
+            // Nothing found in the storage, return null.
+            if (writeIntents.isEmpty() && regularEntries.isEmpty()) {
                 return completedFuture(null);
             }
 
-            // TODO https://issues.apache.org/jira/browse/IGNITE-18767 scan of 
multiple write intents should not be needed
-            List<ReadResult> writeIntents = filter(candidates, 
ReadResult::isWriteIntent);
-
-            if (!writeIntents.isEmpty()) {
+            if (writeIntents.isEmpty()) {
+                // No write intents, then return the committed value. We 
already know that regularEntries is not empty.
+                return completedFuture(regularEntries.get(0).binaryRow());
+            } else {
                 ReadResult writeIntent = writeIntents.get(0);
 
                 // Assume that all write intents for the same key belong to 
the same transaction, as the key should be exclusively locked.
                 // This means that we can just resolve the state of this 
transaction.
                 checkWriteIntentsBelongSameTx(writeIntents);
 
-                return resolveTxState(
-                        writeIntent.transactionId(),
-                        new TablePartitionId(writeIntent.commitTableId(), 
writeIntent.commitPartitionId()),
-                        ts)
-                        .thenApply(readLastCommitted -> {
-                            if (readLastCommitted) {
+                return resolveWriteIntentReadability(writeIntent, ts)
+                        .thenApply(writeIntentReadable -> {
+                            if (writeIntentReadable) {
+                                return findAny(writeIntents, wi -> 
!wi.isEmpty()).map(ReadResult::binaryRow).orElse(null);
+                            } else {
                                 for (ReadResult wi : writeIntents) {
                                     HybridTimestamp newestCommitTimestamp = 
wi.newestCommitTimestamp();
 
@@ -1517,18 +1528,10 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                     return committedReadResult.binaryRow();
                                 }
 
-                                return findAny(candidates, c -> 
!c.isWriteIntent() && !c.isEmpty()).map(ReadResult::binaryRow)
-                                        .orElse(null);
-                            } else {
-                                return findAny(writeIntents, wi -> 
!wi.isEmpty()).map(ReadResult::binaryRow)
-                                        .orElse(null);
+                                // No suitable value found in write intents, 
read the committed value (if exists)
+                                return 
findFirst(regularEntries).map(ReadResult::binaryRow).orElse(null);
                             }
                         });
-            } else {
-                BinaryRow result = findAny(candidates, r -> 
!r.isEmpty()).map(ReadResult::binaryRow)
-                        .orElse(null);
-
-                return completedFuture(result);
             }
         } catch (Exception e) {
             throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
@@ -2466,49 +2469,49 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             HybridTimestamp timestamp,
             Supplier<BinaryRow> lastCommitted
     ) {
-        return resolveTxState(
-                readResult.transactionId(),
-                new TablePartitionId(readResult.commitTableId(), 
readResult.commitPartitionId()),
-                timestamp
-        ).thenApply(readLastCommitted -> {
-            if (readLastCommitted) {
-                return lastCommitted.get();
-            } else {
-                return readResult.binaryRow();
-            }
-        });
+        return resolveWriteIntentReadability(readResult, timestamp)
+                .thenApply(writeIntentReadable -> writeIntentReadable ? 
readResult.binaryRow() : lastCommitted.get());
     }
 
     /**
-     * Resolve the actual tx state.
+     * Check whether we can read from the provided write intent.
      *
-     * @param txId Transaction id.
-     * @param commitGrpId Commit partition id.
+     * @param writeIntent Write intent to resolve.
      * @param timestamp Timestamp.
-     * @return The future completes with true when the transaction is not 
completed yet and false otherwise.
+     * @return The future completes with {@code true} when the transaction is 
committed and commit time <= read time,
+     *     {@code false} otherwise (when the transaction is either in progress, 
or aborted, or committed and commit time > read time).
      */
-    private CompletableFuture<Boolean> resolveTxState(
-            UUID txId,
-            TablePartitionId commitGrpId,
-            HybridTimestamp timestamp
-    ) {
-        return transactionStateResolver.resolveTxState(txId, commitGrpId, 
timestamp).thenApply(txMeta -> {
-            if (txMeta == null || txMeta.txState() == ABANDONED) {
-                // TODO https://issues.apache.org/jira/browse/IGNITE-20427 
make the null value returned from commit partition
-                // TODO more determined
-                throw new TransactionAbandonedException(txId, txMeta);
-            } else if (txMeta.txState() == COMMITED) {
-                boolean readLatest = timestamp == null;
-
-                return !readLatest && 
txMeta.commitTimestamp().compareTo(timestamp) > 0;
-            } else if (txMeta.txState() == ABORTED) {
-                return true;
-            } else {
-                assert txMeta.txState() == PENDING : "Unexpected transaction 
state [state=" + txMeta.txState() + ']';
+    private CompletableFuture<Boolean> 
resolveWriteIntentReadability(ReadResult writeIntent, HybridTimestamp 
timestamp) {
+        UUID txId = writeIntent.transactionId();
 
-                return true;
-            }
-        });
+        return transactionStateResolver.resolveTxState(
+                        txId,
+                        new TablePartitionId(writeIntent.commitTableId(), 
writeIntent.commitPartitionId()),
+                        timestamp)
+                .thenApply(txMeta -> canReadFromWriteIntent(txId, txMeta, 
timestamp));
+    }
+
+    /**
+     * Check whether we can read write intents created by this transaction.
+     *
+     * @param txId Transaction id.
+     * @param txMeta Transaction metainfo.
+     * @param timestamp Read timestamp.
+     * @return {@code true} if we can read from entries created in this 
transaction
+     *     (when the transaction was committed and commit time <= read time).
+     */
+    private static Boolean canReadFromWriteIntent(UUID txId, @Nullable 
TransactionMeta txMeta, @Nullable HybridTimestamp timestamp) {
+        if (txMeta == null || txMeta.txState() == ABANDONED) {
+            // TODO https://issues.apache.org/jira/browse/IGNITE-20427 make 
the null value returned from commit partition
+            // TODO more determined
+            throw new TransactionAbandonedException(txId, txMeta);
+        }
+        if (txMeta.txState() == COMMITED) {
+            boolean readLatest = timestamp == null;
+
+            return readLatest || txMeta.commitTimestamp().compareTo(timestamp) 
<= 0;
+        }
+        return false;
     }
 
     private CompletableFuture<Void> validateAtTimestamp(UUID txId) {
@@ -2744,10 +2747,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         final Map<RequestType, List<CompletableFuture<?>>> futures = new 
EnumMap<>(RequestType.class);
 
         /**
-         * Transaction state. {@code TxState#ABORTED} and {@code 
TxState#COMMITED} match the final transaction states. If the property is
-         * {@code null} the transaction is in pending state.
+         * Transaction state. {@code TxState#ABORTED} and {@code 
TxState#COMMITED} match the final transaction states.
          */
-        TxState state;
+        TxState state = PENDING;
     }
 
     @Override
@@ -2892,7 +2894,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param commitTimestamp Commit timestamp.
      */
     private void markFinished(UUID txId, TxState txState, @Nullable 
HybridTimestamp commitTimestamp) {
-        assert txState == COMMITED || txState == ABORTED : "Unexpected state, 
txId=" + txId + ", txState=" + txState;
+        assert isFinalState(txState) : "Unexpected state [txId=" + txId + ", 
txState=" + txState + ']';
 
         txManager.updateTxMeta(txId, old -> old == null
                 ? null
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
index c5a897b647..4a14f16be4 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
@@ -38,6 +38,12 @@ public enum TxState {
             { true,  true,  true,  true,  true,  true }
     };
 
+    /**
+     * Checks whether the state is final, i.e. no transition from this state 
is allowed.
+     *
+     * @param state Transaction state.
+     * @return {@code true} if the state is either {@link #COMMITED} or {@link 
#ABORTED}
+     */
     public static boolean isFinalState(TxState state) {
         return state == COMMITED || state == ABORTED;
     }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index c0a6979335..776daa566f 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -221,7 +221,7 @@ public class TxManagerImpl implements TxManager {
 
     @Override
     public CompletableFuture<Void> finish(
-            HybridTimestampTracker timestampTracker,
+            HybridTimestampTracker observableTimestampTracker,
             TablePartitionId commitPartition,
             ClusterNode recipientNode,
             Long term,
@@ -256,7 +256,7 @@ public class TxManagerImpl implements TxManager {
             return completedFuture(null);
         }
 
-        timestampTracker.update(commitTimestamp);
+        observableTimestampTracker.update(commitTimestamp);
 
         TxFinishReplicaRequest req = FACTORY.txFinishReplicaRequest()
                 .txId(txId)
@@ -319,7 +319,7 @@ public class TxManagerImpl implements TxManager {
     @Override
     public int finished() {
         return (int) txStateMap.entrySet().stream()
-                .filter(e -> e.getValue().txState() == COMMITED || 
e.getValue().txState() == ABORTED)
+                .filter(e -> isFinalState(e.getValue().txState()))
                 .count();
     }
 

Reply via email to