This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5186f94971 IGNITE-18632 Barrier for locks after cleanup started (#1663)
5186f94971 is described below
commit 5186f9497130b1a8bd8191a9b66b0c318c40b15a
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Tue Feb 14 18:54:03 2023 +0400
IGNITE-18632 Barrier for locks after cleanup started (#1663)
---
.../ignite/internal/util/CollectionUtils.java | 10 ++
.../replicator/PartitionReplicaListener.java | 170 ++++++++++++++-------
.../ignite/internal/table/TxAbstractTest.java | 70 +++++++++
3 files changed, 198 insertions(+), 52 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
index 7af89a52f3..facc79eb6a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
@@ -82,6 +82,16 @@ public final class CollectionUtils {
return col == null || col.isEmpty();
}
+ /**
+ * Tests if the given iterator is either {@code null} or empty.
+ *
+ * @param iter Iterator.
+ * @return Whether or not the given iterator is {@code null} or empty.
+ */
+ public static boolean nullOrEmpty(@Nullable Iterator<?> iter) {
+ return iter == null || !iter.hasNext();
+ }
+
/**
* Gets first element from given list or returns {@code null} if list is
empty.
*
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 26db7e70e1..eb15ddb1a1 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -277,14 +277,26 @@ public class PartitionReplicaListener implements
ReplicaListener {
return ensureReplicaIsPrimary(request)
.thenCompose((isPrimary) -> {
if (request instanceof ReadWriteSingleRowReplicaRequest) {
- return
processSingleEntryAction((ReadWriteSingleRowReplicaRequest) request);
+ var req = (ReadWriteSingleRowReplicaRequest) request;
+
+ return appendTxCommand(req.transactionId(),
req.requestType(), () ->
+ processSingleEntryAction(req));
} else if (request instanceof
ReadWriteMultiRowReplicaRequest) {
- return
processMultiEntryAction((ReadWriteMultiRowReplicaRequest) request);
+ var req = (ReadWriteMultiRowReplicaRequest) request;
+
+ return appendTxCommand(req.transactionId(),
req.requestType(), () ->
+ processMultiEntryAction(req));
} else if (request instanceof
ReadWriteSwapRowReplicaRequest) {
- return
processTwoEntriesAction((ReadWriteSwapRowReplicaRequest) request)
+ var req = (ReadWriteSwapRowReplicaRequest) request;
+
+ return appendTxCommand(req.transactionId(),
req.requestType(), () ->
+ processTwoEntriesAction(req))
.thenApply(Function.identity());
} else if (request instanceof
ReadWriteScanRetrieveBatchReplicaRequest) {
- return
processScanRetrieveBatchAction((ReadWriteScanRetrieveBatchReplicaRequest)
request)
+ var req = (ReadWriteScanRetrieveBatchReplicaRequest)
request;
+
+ return appendTxCommand(req.transactionId(),
RequestType.RW_SCAN, () ->
+ processScanRetrieveBatchAction(req))
.thenApply(Function.identity());
} else if (request instanceof
ReadWriteScanCloseReplicaRequest) {
processScanCloseAction((ReadWriteScanCloseReplicaRequest) request);
@@ -1055,26 +1067,39 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
List<CompletableFuture<?>> txUpdateFutures = new ArrayList<>();
+ List<CompletableFuture<?>> txReadFutures = new ArrayList<>();
// TODO https://issues.apache.org/jira/browse/IGNITE-18617
- // TODO https://issues.apache.org/jira/browse/IGNITE-18632
- TxCleanupReadyFutureList futs =
txCleanupReadyFutures.computeIfAbsent(request.txId(), k -> new
TxCleanupReadyFutureList());
+ txCleanupReadyFutures.compute(request.txId(), (id, txOps) -> {
+ if (txOps == null) {
+ txOps = new TxCleanupReadyFutureList();
+ }
- synchronized (futs) {
- txUpdateFutures.addAll(futs.futures);
+ for (RequestType opType : txOps.futures.keySet()) {
+ if (opType == RequestType.RW_GET || opType ==
RequestType.RW_GET_ALL || opType == RequestType.RW_SCAN) {
+ txReadFutures.addAll(txOps.futures.get(opType));
+ } else {
+ txUpdateFutures.addAll(txOps.futures.get(opType));
+ }
+ }
- futs.futures.clear();
+ txOps.futures.clear();
- futs.finished = true;
- }
+ txOps.state = request.commit() ? TxState.COMMITED :
TxState.ABORTED;
+
+ return txOps;
+ });
if (txUpdateFutures.isEmpty()) {
- releaseTxLocks(request.txId());
+ if (!txReadFutures.isEmpty()) {
+ allOffFuturesExceptionIgnored(txReadFutures, request)
+ .thenRun(() -> releaseTxLocks(request.txId()));
+ }
return completedFuture(null);
}
- return allOf(txUpdateFutures.toArray(new
CompletableFuture<?>[txUpdateFutures.size()])).thenCompose(v -> {
+ return allOffFuturesExceptionIgnored(txUpdateFutures,
request).thenCompose(v -> {
HybridTimestampMessage timestampMsg =
hybridTimestamp(request.commitTimestamp());
TxCleanupCommand txCleanupCmd = MSG_FACTORY.txCleanupCommand()
@@ -1086,10 +1111,30 @@ public class PartitionReplicaListener implements
ReplicaListener {
return raftClient
.run(txCleanupCmd)
- .thenRun(() -> releaseTxLocks(request.txId()));
+ .thenCompose(ignored ->
allOffFuturesExceptionIgnored(txReadFutures, request)
+ .thenRun(() -> releaseTxLocks(request.txId())));
});
}
+ /**
+ * Creates a future that waits all transaction operations are completed.
+ *
+ * @param txFutures Transaction operation futures.
+ * @param request Cleanup request.
+ * @return The future completes when all futures in passed list are
completed.
+ */
+ private static CompletableFuture<Void>
allOffFuturesExceptionIgnored(List<CompletableFuture<?>> txFutures,
+ TxCleanupReplicaRequest request) {
+ return allOf(txFutures.toArray(new CompletableFuture<?>[0]))
+ .exceptionally(e -> {
+ assert !request.commit() :
+ "Transaction is committing, but an operation has
completed with exception [txId=" + request.txId()
+ + ", err=" + e.getMessage() + ']';
+
+ return null;
+ });
+ }
+
private void releaseTxLocks(UUID txId) {
lockManager.locks(txId).forEachRemaining(lockManager::release);
}
@@ -1131,6 +1176,45 @@ public class PartitionReplicaListener implements
ReplicaListener {
});
}
+ /**
+ * Appends an operation to prevent the race between commit/rollback and
the operation execution.
+ *
+ * @param txId Transaction id.
+ * @param cmdType Command type.
+ * @param op Operation closure.
+ * @param <T> Type of execution result.
+ * @return A future object representing the result of the given operation.
+ */
+ private <T> CompletableFuture<T> appendTxCommand(UUID txId, RequestType
cmdType, Supplier<CompletableFuture<T>> op) {
+ var fut = new CompletableFuture<T>();
+
+ txCleanupReadyFutures.compute(txId, (id, txOps) -> {
+ if (txOps == null) {
+ txOps = new TxCleanupReadyFutureList();
+ }
+
+ if (txOps.state == TxState.ABORTED || txOps.state ==
TxState.COMMITED) {
+ fut.completeExceptionally(new
TransactionException(TX_FAILED_READ_WRITE_OPERATION_ERR, "Transaction is
already finished."));
+ } else {
+ txOps.futures.computeIfAbsent(cmdType, type -> new
ArrayList<>()).add(fut);
+ }
+
+ return txOps;
+ });
+
+ if (!fut.isDone()) {
+ op.get().whenComplete((v, th) -> {
+ if (th != null) {
+ fut.completeExceptionally(th);
+ } else {
+ fut.complete(v);
+ }
+ });
+ }
+
+ return fut;
+ }
+
/**
* Finds the row and its identifier by given pk search row.
*
@@ -1504,17 +1588,15 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Raft future, see {@link
#applyCmdWithExceptionHandling(Command)}.
*/
private CompletableFuture<Object> applyUpdateCommand(UpdateCommand cmd) {
- return applyUpdatingCommand(cmd.txId(), () -> {
- storageUpdateHandler.handleUpdate(
- cmd.txId(),
- cmd.rowUuid(),
- cmd.tablePartitionId().asTablePartitionId(),
- cmd.rowBuffer(),
- null
- );
-
- return applyCmdWithExceptionHandling(cmd);
- });
+ storageUpdateHandler.handleUpdate(
+ cmd.txId(),
+ cmd.rowUuid(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ cmd.rowBuffer(),
+ null
+ );
+
+ return applyCmdWithExceptionHandling(cmd);
}
/**
@@ -1524,30 +1606,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Raft future, see {@link
#applyCmdWithExceptionHandling(Command)}.
*/
private CompletableFuture<Object> applyUpdateAllCommand(UpdateAllCommand
cmd) {
- return applyUpdatingCommand(cmd.txId(), () -> {
- storageUpdateHandler.handleUpdateAll(cmd.txId(),
cmd.rowsToUpdate(), cmd.tablePartitionId().asTablePartitionId(), null);
+ storageUpdateHandler.handleUpdateAll(cmd.txId(), cmd.rowsToUpdate(),
cmd.tablePartitionId().asTablePartitionId(), null);
- return applyCmdWithExceptionHandling(cmd);
- });
- }
-
- private CompletableFuture<Object> applyUpdatingCommand(UUID txId,
Supplier<CompletableFuture<Object>> closure) {
- CompletableFuture<Object> applyCmdFuture;
-
- TxCleanupReadyFutureList futs =
txCleanupReadyFutures.computeIfAbsent(txId, k -> new
TxCleanupReadyFutureList());
-
- // TODO https://issues.apache.org/jira/browse/IGNITE-18632
- synchronized (futs) {
- if (futs.finished) {
- throw new
TransactionException(TX_FAILED_READ_WRITE_OPERATION_ERR, "Transaction is
already finished.");
- }
-
- applyCmdFuture = closure.get();
-
- futs.futures.add(applyCmdFuture);
- }
-
- return applyCmdFuture;
+ return applyCmdWithExceptionHandling(cmd);
}
/**
@@ -2177,14 +2238,19 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
/**
- * Class that stores list of futures for updates that can block tx
cleanup, and finished flag.
+ * Class that stores a list of futures for operations that has happened in
a specific transaction.
+ * Also, the class has a property {@code state} that represents a
transaction state.
*/
private static class TxCleanupReadyFutureList {
- final List<CompletableFuture<?>> futures = new ArrayList<>();
+ /**
+ * Operation type is mapped operation futures.
+ */
+ final Map<RequestType, List<CompletableFuture<?>>> futures = new
HashMap<>();
/**
- * Whether the transaction is finished and therefore locked for
further updates by started cleanup process.
+ * Transaction state. {@code TxState#ABORTED} and {@code
TxState#COMMITED} match the final transaction states.
+ * If the property is {@code null} the transaction is in pending state.
*/
- boolean finished;
+ TxState state;
}
}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 03a3a87c8d..83f98718c9 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -61,6 +61,8 @@ import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.LockMode;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
+import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.Pair;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.table.KeyValueView;
@@ -1898,4 +1900,72 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
Collection<Tuple> retrievedKeys3 =
accounts.recordView().getAll(readOnlyTx2, List.of(makeKey(1), makeKey(2)));
validateBalance(retrievedKeys3, 300.);
}
+
+ @Test
+ public void testTransactionAlreadyCommitted() {
+ testTransactionAlreadyFinished(true);
+ }
+
+ @Test
+ public void testTransactionAlreadyRolledback() {
+ testTransactionAlreadyFinished(false);
+ }
+
+ /**
+ * Checks operations that act after a transaction is committed, are
finished with exception.
+ *
+ * @param commit True when transaction is committed, false the transaction
is rolled back.
+ */
+ private void testTransactionAlreadyFinished(boolean commit) {
+ Transaction tx = igniteTransactions.begin();
+
+ var txId = ((ReadWriteTransactionImpl) tx).id();
+
+ Transaction sameTxWithoutFinishGuard = new
ReadWriteTransactionImpl(txManager(accounts), txId);
+
+ log.info("Started transaction {}", txId);
+
+ var accountsRv = accounts.recordView();
+
+ accountsRv.upsert(tx, makeValue(1, 100.));
+ accountsRv.upsert(tx, makeValue(2, 200.));
+
+ Collection<Tuple> res = accountsRv.getAll(sameTxWithoutFinishGuard,
List.of(makeKey(1), makeKey(2)));
+
+ validateBalance(res, 100., 200.);
+
+ if (commit) {
+ tx.commit();
+
+ log.info("Committed transaction {}", txId);
+ } else {
+ tx.rollback();
+
+ log.info("Rolled back transaction {}", txId);
+ }
+
+ TransactionException ex = assertThrows(TransactionException.class, ()
-> accountsRv.get(sameTxWithoutFinishGuard, makeKey(1)));
+ assertTrue(ex.getMessage().contains("Transaction is already
finished."));
+
+ ex = assertThrows(TransactionException.class, () ->
accountsRv.delete(sameTxWithoutFinishGuard, makeKey(1)));
+ assertTrue(ex.getMessage().contains("Transaction is already
finished."));
+
+ ex = assertThrows(TransactionException.class, () ->
accountsRv.get(sameTxWithoutFinishGuard, makeKey(2)));
+ assertTrue(ex.getMessage().contains("Transaction is already
finished."));
+
+ ex = assertThrows(TransactionException.class, () ->
accountsRv.upsert(sameTxWithoutFinishGuard, makeValue(2, 300.)));
+ assertTrue(ex.getMessage().contains("Transaction is already
finished."));
+
+
assertTrue(CollectionUtils.nullOrEmpty(txManager(accounts).lockManager().locks(txId)));
+
+ if (commit) {
+ res = accountsRv.getAll(null, List.of(makeKey(1), makeKey(2)));
+
+ validateBalance(res, 100., 200.);
+ } else {
+ res = accountsRv.getAll(null, List.of(makeKey(1), makeKey(2)));
+
+ assertTrue(CollectionUtils.nullOrEmpty(res));
+ }
+ }
}