This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5186f94971 IGNITE-18632 Barrier for locks after cleanup started (#1663)
5186f94971 is described below

commit 5186f9497130b1a8bd8191a9b66b0c318c40b15a
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Tue Feb 14 18:54:03 2023 +0400

    IGNITE-18632 Barrier for locks after cleanup started (#1663)
---
 .../ignite/internal/util/CollectionUtils.java      |  10 ++
 .../replicator/PartitionReplicaListener.java       | 170 ++++++++++++++-------
 .../ignite/internal/table/TxAbstractTest.java      |  70 +++++++++
 3 files changed, 198 insertions(+), 52 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
index 7af89a52f3..facc79eb6a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
@@ -82,6 +82,16 @@ public final class CollectionUtils {
         return col == null || col.isEmpty();
     }
 
+    /**
+     * Tests if the given iterator is either {@code null} or empty.
+     *
+     * @param iter Iterator.
+     * @return Whether or not the given iterator is {@code null} or empty.
+     */
+    public static boolean nullOrEmpty(@Nullable Iterator<?> iter) {
+        return iter == null || !iter.hasNext();
+    }
+
     /**
      * Gets first element from given list or returns {@code null} if list is 
empty.
      *
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 26db7e70e1..eb15ddb1a1 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -277,14 +277,26 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         return ensureReplicaIsPrimary(request)
                 .thenCompose((isPrimary) -> {
                     if (request instanceof ReadWriteSingleRowReplicaRequest) {
-                        return 
processSingleEntryAction((ReadWriteSingleRowReplicaRequest) request);
+                        var req = (ReadWriteSingleRowReplicaRequest) request;
+
+                        return appendTxCommand(req.transactionId(), 
req.requestType(), () ->
+                                processSingleEntryAction(req));
                     } else if (request instanceof 
ReadWriteMultiRowReplicaRequest) {
-                        return 
processMultiEntryAction((ReadWriteMultiRowReplicaRequest) request);
+                        var req = (ReadWriteMultiRowReplicaRequest) request;
+
+                        return appendTxCommand(req.transactionId(), 
req.requestType(), () ->
+                                processMultiEntryAction(req));
                     } else if (request instanceof 
ReadWriteSwapRowReplicaRequest) {
-                        return 
processTwoEntriesAction((ReadWriteSwapRowReplicaRequest) request)
+                        var req = (ReadWriteSwapRowReplicaRequest) request;
+
+                        return appendTxCommand(req.transactionId(), 
req.requestType(), () ->
+                                processTwoEntriesAction(req))
                                 .thenApply(Function.identity());
                     } else if (request instanceof 
ReadWriteScanRetrieveBatchReplicaRequest) {
-                        return 
processScanRetrieveBatchAction((ReadWriteScanRetrieveBatchReplicaRequest) 
request)
+                        var req = (ReadWriteScanRetrieveBatchReplicaRequest) 
request;
+
+                        return appendTxCommand(req.transactionId(), 
RequestType.RW_SCAN, () ->
+                                processScanRetrieveBatchAction(req))
                                 .thenApply(Function.identity());
                     } else if (request instanceof 
ReadWriteScanCloseReplicaRequest) {
                         
processScanCloseAction((ReadWriteScanCloseReplicaRequest) request);
@@ -1055,26 +1067,39 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         }
 
         List<CompletableFuture<?>> txUpdateFutures = new ArrayList<>();
+        List<CompletableFuture<?>> txReadFutures = new ArrayList<>();
 
         // TODO https://issues.apache.org/jira/browse/IGNITE-18617
-        // TODO https://issues.apache.org/jira/browse/IGNITE-18632
-        TxCleanupReadyFutureList futs = 
txCleanupReadyFutures.computeIfAbsent(request.txId(), k -> new 
TxCleanupReadyFutureList());
+        txCleanupReadyFutures.compute(request.txId(), (id, txOps) -> {
+            if (txOps == null) {
+                txOps = new TxCleanupReadyFutureList();
+            }
 
-        synchronized (futs) {
-            txUpdateFutures.addAll(futs.futures);
+            for (RequestType opType : txOps.futures.keySet()) {
+                if (opType == RequestType.RW_GET || opType == 
RequestType.RW_GET_ALL || opType == RequestType.RW_SCAN) {
+                    txReadFutures.addAll(txOps.futures.get(opType));
+                } else {
+                    txUpdateFutures.addAll(txOps.futures.get(opType));
+                }
+            }
 
-            futs.futures.clear();
+            txOps.futures.clear();
 
-            futs.finished = true;
-        }
+            txOps.state = request.commit() ? TxState.COMMITED : 
TxState.ABORTED;
+
+            return txOps;
+        });
 
         if (txUpdateFutures.isEmpty()) {
-            releaseTxLocks(request.txId());
+            if (!txReadFutures.isEmpty()) {
+                allOffFuturesExceptionIgnored(txReadFutures, request)
+                        .thenRun(() -> releaseTxLocks(request.txId()));
+            }
 
             return completedFuture(null);
         }
 
-        return allOf(txUpdateFutures.toArray(new 
CompletableFuture<?>[txUpdateFutures.size()])).thenCompose(v -> {
+        return allOffFuturesExceptionIgnored(txUpdateFutures, 
request).thenCompose(v -> {
             HybridTimestampMessage timestampMsg = 
hybridTimestamp(request.commitTimestamp());
 
             TxCleanupCommand txCleanupCmd = MSG_FACTORY.txCleanupCommand()
@@ -1086,10 +1111,30 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
             return raftClient
                     .run(txCleanupCmd)
-                    .thenRun(() -> releaseTxLocks(request.txId()));
+                    .thenCompose(ignored -> 
allOffFuturesExceptionIgnored(txReadFutures, request)
+                            .thenRun(() -> releaseTxLocks(request.txId())));
         });
     }
 
+    /**
+     * Creates a future that waits all transaction operations are completed.
+     *
+     * @param txFutures Transaction operation futures.
+     * @param request Cleanup request.
+     * @return The future completes when all futures in passed list are 
completed.
+     */
+    private static CompletableFuture<Void> 
allOffFuturesExceptionIgnored(List<CompletableFuture<?>> txFutures,
+            TxCleanupReplicaRequest request) {
+        return allOf(txFutures.toArray(new CompletableFuture<?>[0]))
+                .exceptionally(e -> {
+                    assert !request.commit() :
+                            "Transaction is committing, but an operation has 
completed with exception [txId=" + request.txId()
+                                    + ", err=" + e.getMessage() + ']';
+
+                    return null;
+                });
+    }
+
     private void releaseTxLocks(UUID txId) {
         lockManager.locks(txId).forEachRemaining(lockManager::release);
     }
@@ -1131,6 +1176,45 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 });
     }
 
+    /**
+     * Appends an operation to prevent the race between commit/rollback and 
the operation execution.
+     *
+     * @param txId Transaction id.
+     * @param cmdType Command type.
+     * @param op Operation closure.
+     * @param <T> Type of execution result.
+     * @return A future object representing the result of the given operation.
+     */
+    private <T> CompletableFuture<T> appendTxCommand(UUID txId, RequestType 
cmdType, Supplier<CompletableFuture<T>> op) {
+        var fut = new CompletableFuture<T>();
+
+        txCleanupReadyFutures.compute(txId, (id, txOps) -> {
+            if (txOps == null) {
+                txOps = new TxCleanupReadyFutureList();
+            }
+
+            if (txOps.state == TxState.ABORTED || txOps.state == 
TxState.COMMITED) {
+                fut.completeExceptionally(new 
TransactionException(TX_FAILED_READ_WRITE_OPERATION_ERR, "Transaction is 
already finished."));
+            } else {
+                txOps.futures.computeIfAbsent(cmdType, type -> new 
ArrayList<>()).add(fut);
+            }
+
+            return txOps;
+        });
+
+        if (!fut.isDone()) {
+            op.get().whenComplete((v, th) -> {
+                if (th != null) {
+                    fut.completeExceptionally(th);
+                } else {
+                    fut.complete(v);
+                }
+            });
+        }
+
+        return fut;
+    }
+
     /**
      * Finds the row and its identifier by given pk search row.
      *
@@ -1504,17 +1588,15 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Raft future, see {@link 
#applyCmdWithExceptionHandling(Command)}.
      */
     private CompletableFuture<Object> applyUpdateCommand(UpdateCommand cmd) {
-        return applyUpdatingCommand(cmd.txId(), () -> {
-            storageUpdateHandler.handleUpdate(
-                    cmd.txId(),
-                    cmd.rowUuid(),
-                    cmd.tablePartitionId().asTablePartitionId(),
-                    cmd.rowBuffer(),
-                    null
-            );
-
-            return applyCmdWithExceptionHandling(cmd);
-        });
+        storageUpdateHandler.handleUpdate(
+                cmd.txId(),
+                cmd.rowUuid(),
+                cmd.tablePartitionId().asTablePartitionId(),
+                cmd.rowBuffer(),
+                null
+        );
+
+        return applyCmdWithExceptionHandling(cmd);
     }
 
     /**
@@ -1524,30 +1606,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Raft future, see {@link 
#applyCmdWithExceptionHandling(Command)}.
      */
     private CompletableFuture<Object> applyUpdateAllCommand(UpdateAllCommand 
cmd) {
-        return applyUpdatingCommand(cmd.txId(), () -> {
-            storageUpdateHandler.handleUpdateAll(cmd.txId(), 
cmd.rowsToUpdate(), cmd.tablePartitionId().asTablePartitionId(), null);
+        storageUpdateHandler.handleUpdateAll(cmd.txId(), cmd.rowsToUpdate(), 
cmd.tablePartitionId().asTablePartitionId(), null);
 
-            return applyCmdWithExceptionHandling(cmd);
-        });
-    }
-
-    private CompletableFuture<Object> applyUpdatingCommand(UUID txId, 
Supplier<CompletableFuture<Object>> closure) {
-        CompletableFuture<Object> applyCmdFuture;
-
-        TxCleanupReadyFutureList futs = 
txCleanupReadyFutures.computeIfAbsent(txId, k -> new 
TxCleanupReadyFutureList());
-
-        // TODO https://issues.apache.org/jira/browse/IGNITE-18632
-        synchronized (futs) {
-            if (futs.finished) {
-                throw new 
TransactionException(TX_FAILED_READ_WRITE_OPERATION_ERR, "Transaction is 
already finished.");
-            }
-
-            applyCmdFuture = closure.get();
-
-            futs.futures.add(applyCmdFuture);
-        }
-
-        return applyCmdFuture;
+        return applyCmdWithExceptionHandling(cmd);
     }
 
     /**
@@ -2177,14 +2238,19 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     }
 
     /**
-     * Class that stores list of futures for updates that can block tx 
cleanup, and finished flag.
+     * Class that stores a list of futures for operations that has happened in 
a specific transaction.
+     * Also, the class has a property {@code state} that represents a 
transaction state.
      */
     private static class TxCleanupReadyFutureList {
-        final List<CompletableFuture<?>> futures = new ArrayList<>();
+        /**
+         * Operation type is mapped operation futures.
+         */
+        final Map<RequestType, List<CompletableFuture<?>>> futures = new 
HashMap<>();
 
         /**
-         * Whether the transaction is finished and therefore locked for 
further updates by started cleanup process.
+         * Transaction state. {@code TxState#ABORTED} and {@code 
TxState#COMMITED} match the final transaction states.
+         * If the property is {@code null} the transaction is in pending state.
          */
-        boolean finished;
+        TxState state;
     }
 }
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 03a3a87c8d..83f98718c9 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -61,6 +61,8 @@ import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.LockMode;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
+import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.internal.util.Pair;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.table.KeyValueView;
@@ -1898,4 +1900,72 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
         Collection<Tuple> retrievedKeys3 = 
accounts.recordView().getAll(readOnlyTx2, List.of(makeKey(1), makeKey(2)));
         validateBalance(retrievedKeys3, 300.);
     }
+
+    @Test
+    public void testTransactionAlreadyCommitted() {
+        testTransactionAlreadyFinished(true);
+    }
+
+    @Test
+    public void testTransactionAlreadyRolledback() {
+        testTransactionAlreadyFinished(false);
+    }
+
+    /**
+     * Checks operations that act after a transaction is committed, are 
finished with exception.
+     *
+     * @param commit True when transaction is committed, false the transaction 
is rolled back.
+     */
+    private void testTransactionAlreadyFinished(boolean commit) {
+        Transaction tx = igniteTransactions.begin();
+
+        var txId = ((ReadWriteTransactionImpl) tx).id();
+
+        Transaction sameTxWithoutFinishGuard = new 
ReadWriteTransactionImpl(txManager(accounts), txId);
+
+        log.info("Started transaction {}", txId);
+
+        var accountsRv = accounts.recordView();
+
+        accountsRv.upsert(tx, makeValue(1, 100.));
+        accountsRv.upsert(tx, makeValue(2, 200.));
+
+        Collection<Tuple> res = accountsRv.getAll(sameTxWithoutFinishGuard, 
List.of(makeKey(1), makeKey(2)));
+
+        validateBalance(res, 100., 200.);
+
+        if (commit) {
+            tx.commit();
+
+            log.info("Committed transaction {}", txId);
+        } else {
+            tx.rollback();
+
+            log.info("Rolled back transaction {}", txId);
+        }
+
+        TransactionException ex = assertThrows(TransactionException.class, () 
-> accountsRv.get(sameTxWithoutFinishGuard, makeKey(1)));
+        assertTrue(ex.getMessage().contains("Transaction is already 
finished."));
+
+        ex = assertThrows(TransactionException.class, () -> 
accountsRv.delete(sameTxWithoutFinishGuard, makeKey(1)));
+        assertTrue(ex.getMessage().contains("Transaction is already 
finished."));
+
+        ex = assertThrows(TransactionException.class, () -> 
accountsRv.get(sameTxWithoutFinishGuard, makeKey(2)));
+        assertTrue(ex.getMessage().contains("Transaction is already 
finished."));
+
+        ex = assertThrows(TransactionException.class, () -> 
accountsRv.upsert(sameTxWithoutFinishGuard, makeValue(2, 300.)));
+        assertTrue(ex.getMessage().contains("Transaction is already 
finished."));
+
+        
assertTrue(CollectionUtils.nullOrEmpty(txManager(accounts).lockManager().locks(txId)));
+
+        if (commit) {
+            res = accountsRv.getAll(null, List.of(makeKey(1), makeKey(2)));
+
+            validateBalance(res, 100., 200.);
+        } else {
+            res = accountsRv.getAll(null, List.of(makeKey(1), makeKey(2)));
+
+            assertTrue(CollectionUtils.nullOrEmpty(res));
+        }
+    }
 }

Reply via email to