This is an automated email from the ASF dual-hosted git repository. vpyatkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 9cd0e5017f IGNITE-21070 Ensure that data node's primary replica expiration properly handled (#3027) 9cd0e5017f is described below commit 9cd0e5017fac6d54da5f6c9c802e8ce686267668 Author: Cyrill <cyrill.si...@gmail.com> AuthorDate: Fri Jan 12 18:18:42 2024 +0300 IGNITE-21070 Ensure that data node's primary replica expiration properly handled (#3027) --- .../ItSchemaForwardCompatibilityTest.java | 2 +- .../internal/table/ItTransactionRecoveryTest.java | 24 +- .../IncompatibleSchemaAbortException.java | 31 --- .../replicator/PartitionReplicaListener.java | 22 +- .../replication/PartitionReplicaListenerTest.java | 12 +- .../ignite/internal/tx/impl/OrphanDetector.java | 85 +++--- .../internal/tx/impl/TxCleanupRequestHandler.java | 16 +- .../ignite/internal/tx/impl/TxManagerImpl.java | 42 ++- .../tx/impl/WriteIntentSwitchProcessor.java | 14 +- .../apache/ignite/internal/tx/TxCleanupTest.java | 198 ++++++++++++++ .../apache/ignite/internal/tx/TxManagerTest.java | 122 ++++++++- .../internal/tx/impl/OrphanDetectorTest.java | 290 +++++++++++++++++++++ 12 files changed, 704 insertions(+), 154 deletions(-) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaForwardCompatibilityTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaForwardCompatibilityTest.java index 9ed055436d..1d7782a67d 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaForwardCompatibilityTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaForwardCompatibilityTest.java @@ -112,7 +112,7 @@ class ItSchemaForwardCompatibilityTest extends ClusterPerTestIntegrationTest { )) ); - assertThat(ex.code(), is(Transactions.TX_COMMIT_ERR)); + assertThat(ex.code(), is(Transactions.TX_UNEXPECTED_STATE_ERR)); assertThat(tx.state(), is(TxState.ABORTED)); } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java index b2bc84c043..a9c1f993cb 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java @@ -350,7 +350,8 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { UUID orphanTxId = startTransactionAndStopNode(txCrdNode); - AtomicInteger msgCount = new AtomicInteger(); + AtomicInteger stateMsgCount = new AtomicInteger(); + AtomicInteger recoveryMsgCount = new AtomicInteger(); IgniteImpl roCoordNode = node(0); @@ -360,13 +361,17 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { commitPartNode.dropMessages((nodeName, msg) -> { if (msg instanceof TxStateCommitPartitionRequest) { - msgCount.incrementAndGet(); + stateMsgCount.incrementAndGet(); assertEquals(TxState.ABANDONED, txVolatileState(commitPartNode, orphanTxId)); txMsgCaptureFut.complete(((TxStateCommitPartitionRequest) msg).txId()); } + if (msg instanceof TxRecoveryMessage) { + recoveryMsgCount.incrementAndGet(); + } + return false; }); @@ -374,21 +379,19 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { RecordView view = roCoordNode.tables().table(TABLE_NAME).recordView(); - try { - view.getAsync(recoveryTxReadOnly, Tuple.create().set("key", 42)); - } catch (Exception e) { - assertEquals(Transactions.ACQUIRE_LOCK_ERR, extractCodeFrom(e)); - - log.info("Expected lock conflict.", e); - } + view.getAsync(recoveryTxReadOnly, Tuple.create().set("key", 42)); assertThat(txMsgCaptureFut, willCompleteSuccessfully()); runConflictingTransaction(commitPartNode, commitPartNode.transactions().begin()); - assertEquals(1, msgCount.get()); + assertEquals(1, stateMsgCount.get()); + + assertEquals(0, recoveryMsgCount.get()); assertTrue(waitForCondition(() -> txStoredState(commitPartNode, orphanTxId) == TxState.ABORTED, 10_000)); + + assertTrue(waitForCondition(() -> txStoredMeta(commitPartNode, orphanTxId).locksReleased(), 10_000)); } /** @@ -557,7 +560,6 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { assertInstanceOf(TransactionAlreadyFinishedException.class, ExceptionUtils.unwrapCause(errorResponse.throwable())); assertEquals(TxState.ABORTED, txStoredState(commitPartNode, orphanTx.id())); - } @Test diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IncompatibleSchemaAbortException.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IncompatibleSchemaAbortException.java deleted file mode 100644 index 4855ee00bf..0000000000 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IncompatibleSchemaAbortException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.table.distributed.replicator; - -import org.apache.ignite.lang.ErrorGroups.Transactions; -import org.apache.ignite.tx.TransactionException; - -/** - * Thrown when, during an attempt to commit a transaction, it turns out that the transaction cannot be committed - * because an incompatible schema change has happened. - */ -public class IncompatibleSchemaAbortException extends TransactionException { - public IncompatibleSchemaAbortException(String message) { - super(Transactions.TX_COMMIT_ERR, message); - } -} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index a003dbc413..979726da73 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -533,7 +533,7 @@ public class PartitionReplicaListener implements ReplicaListener { .whenComplete((v, ex) -> runCleanupOnNode(txId, senderId)); } - LOG.info("Orphan transaction has to be aborted [tx={}].", txId); + LOG.info("Orphan transaction has to be aborted [tx={}, meta={}].", txId, txMeta); return triggerTxRecovery(txId, senderId); } @@ -1569,7 +1569,7 @@ public class PartitionReplicaListener implements ReplicaListener { txId, txCoordinatorId ).thenApply(txResult -> { - throwIfSchemaValidationOnCommitFailed(validationResult); + throwIfSchemaValidationOnCommitFailed(validationResult, txResult); return txResult; })); } else { @@ -1578,18 +1578,22 @@ public class PartitionReplicaListener implements ReplicaListener { } } - private static void throwIfSchemaValidationOnCommitFailed(CompatValidationResult validationResult) { + private static void throwIfSchemaValidationOnCommitFailed(CompatValidationResult validationResult, TransactionResult txResult) { if (!validationResult.isSuccessful()) { if (validationResult.isTableDropped()) { // TODO: IGNITE-20966 - improve error message. - throw new IncompatibleSchemaAbortException( - format("Commit failed because a table was already dropped [tableId={}]", validationResult.failedTableId()) + throw new TransactionAlreadyFinishedException( + format("Commit failed because a table was already dropped [tableId={}]", validationResult.failedTableId()), + txResult ); } else { // TODO: IGNITE-20966 - improve error message. - throw new IncompatibleSchemaAbortException("Commit failed because schema " - + validationResult.fromSchemaVersion() + " is not forward-compatible with " - + validationResult.toSchemaVersion() + " for table " + validationResult.failedTableId()); + throw new TransactionAlreadyFinishedException( + "Commit failed because schema " + + validationResult.fromSchemaVersion() + " is not forward-compatible with " + + validationResult.toSchemaVersion() + " for table " + validationResult.failedTableId(), + txResult + ); } } } @@ -1676,7 +1680,7 @@ public class PartitionReplicaListener implements ReplicaListener { @Nullable HybridTimestamp commitTimestamp, String txCoordinatorId ) { - assert !commit || (commitTimestamp != null); + assert !(commit && commitTimestamp == null) : "Cannot commit without the timestamp."; HybridTimestamp tsForCatalogVersion = commit ? commitTimestamp : hybridClock.now(); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index 58c24ba8a9..5806bc9d80 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -144,7 +144,6 @@ import org.apache.ignite.internal.table.distributed.replication.request.ReadOnly import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowPkReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest; -import org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaAbortException; import org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaException; import org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException; import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener; @@ -159,6 +158,7 @@ import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.tostring.IgniteToStringInclude; import org.apache.ignite.internal.tostring.S; import org.apache.ignite.internal.tx.LockManager; +import org.apache.ignite.internal.tx.TransactionAlreadyFinishedException; import org.apache.ignite.internal.tx.TransactionIds; import org.apache.ignite.internal.tx.TransactionMeta; import org.apache.ignite.internal.tx.TransactionResult; @@ -1624,9 +1624,9 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { CompletableFuture<?> future = beginAndCommitTx(); - IncompatibleSchemaAbortException ex = assertWillThrowFast(future, - IncompatibleSchemaAbortException.class); - assertThat(ex.code(), is(Transactions.TX_COMMIT_ERR)); + TransactionAlreadyFinishedException ex = assertWillThrowFast(future, + TransactionAlreadyFinishedException.class); + assertThat(ex.getMessage(), containsString("Commit failed because schema 1 is not forward-compatible with 2")); assertThat(committed.get(), is(false)); @@ -2323,8 +2323,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { localNode.id() ); - IncompatibleSchemaAbortException ex = assertWillThrowFast(future, IncompatibleSchemaAbortException.class); - assertThat(ex.code(), is(Transactions.TX_COMMIT_ERR)); + TransactionAlreadyFinishedException ex = assertWillThrowFast(future, TransactionAlreadyFinishedException.class); + assertThat(ex.getMessage(), is("Commit failed because a table was already dropped [tableId=" + tableToBeDroppedId + "]")); assertThat("The transaction must have been aborted", committed.get(), is(false)); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java index 3fe9a03c89..5032e70747 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java @@ -18,8 +18,8 @@ package org.apache.ignite.internal.tx.impl; import static java.util.concurrent.CompletableFuture.failedFuture; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.internal.tx.TxState.ABANDONED; +import static org.apache.ignite.internal.tx.TxState.FINISHING; import static org.apache.ignite.internal.tx.TxState.isFinalState; import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; @@ -30,12 +30,10 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.ignite.configuration.ConfigurationValue; import org.apache.ignite.internal.event.EventListener; -import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; -import org.apache.ignite.internal.placementdriver.PlacementDriver; import org.apache.ignite.internal.replicator.ReplicaService; -import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.internal.tx.TxStateMeta; import org.apache.ignite.internal.tx.TxStateMetaAbandoned; @@ -59,8 +57,6 @@ public class OrphanDetector { /** Tx messages factory. */ private static final TxMessagesFactory FACTORY = new TxMessagesFactory(); - private static final long AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC = 10; - /** Busy lock to stop synchronously. */ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); @@ -71,7 +67,7 @@ public class OrphanDetector { private final ReplicaService replicaService; /** Placement driver. */ - private final PlacementDriver placementDriver; + private final PlacementDriverHelper placementDriverHelper; /** Lock manager. */ private final LockManager lockManager; @@ -79,9 +75,6 @@ public class OrphanDetector { /** Lock conflict events listener. */ private final EventListener<LockEventParameters> lockConflictListener = this::lockConflictListener; - /** Hybrid clock. */ - private final HybridClock clock; - /** * The time interval in milliseconds in which the orphan resolution sends the recovery message again, in case the transaction is still * not finalized. @@ -96,22 +89,19 @@ public class OrphanDetector { * * @param topologyService Topology service. * @param replicaService Replica service. - * @param placementDriver Placement driver. + * @param placementDriverHelper Placement driver helper. * @param lockManager Lock manager. - * @param clock Clock. */ public OrphanDetector( TopologyService topologyService, ReplicaService replicaService, - PlacementDriver placementDriver, - LockManager lockManager, - HybridClock clock + PlacementDriverHelper placementDriverHelper, + LockManager lockManager ) { this.topologyService = topologyService; this.replicaService = replicaService; - this.placementDriver = placementDriver; + this.placementDriverHelper = placementDriverHelper; this.lockManager = lockManager; - this.clock = clock; } /** @@ -179,7 +169,7 @@ public class OrphanDetector { txState.txCoordinatorId() ); - sentTxRecoveryMessage(txState.commitPartitionId(), txId); + sendTxRecoveryMessage(txState.commitPartitionId(), txId); } // TODO: https://issues.apache.org/jira/browse/IGNITE-21153 @@ -193,37 +183,33 @@ public class OrphanDetector { * @param cmpPartGrp Replication group of commit partition. * @param txId Transaction id. */ - private void sentTxRecoveryMessage(ReplicationGroupId cmpPartGrp, UUID txId) { - placementDriver.awaitPrimaryReplica( - cmpPartGrp, - clock.now(), - AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC, - SECONDS - ).thenCompose(replicaMeta -> { - ClusterNode commitPartPrimaryNode = topologyService.getByConsistentId(replicaMeta.getLeaseholder()); - - if (commitPartPrimaryNode == null) { - LOG.warn( - "The primary replica of the commit partition is not available [commitPartGrp={}, tx={}]", - cmpPartGrp, - txId - ); - - return nullCompletedFuture(); - } - - return replicaService.invoke(commitPartPrimaryNode, FACTORY.txRecoveryMessage() - .groupId(cmpPartGrp) - .enlistmentConsistencyToken(replicaMeta.getStartTime().longValue()) - .txId(txId) - .build()); - }).exceptionally(throwable -> { - if (throwable != null) { - LOG.warn("A recovery message for the transaction was handled with the error [tx={}].", throwable, txId); - } - - return null; - }); + private void sendTxRecoveryMessage(TablePartitionId cmpPartGrp, UUID txId) { + placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(cmpPartGrp) + .thenCompose(replicaMeta -> { + ClusterNode commitPartPrimaryNode = topologyService.getByConsistentId(replicaMeta.getLeaseholder()); + + if (commitPartPrimaryNode == null) { + LOG.warn( + "The primary replica of the commit partition is not available [commitPartGrp={}, tx={}]", + cmpPartGrp, + txId + ); + + return nullCompletedFuture(); + } + + return replicaService.invoke(commitPartPrimaryNode, FACTORY.txRecoveryMessage() + .groupId(cmpPartGrp) + .enlistmentConsistencyToken(replicaMeta.getStartTime().longValue()) + .txId(txId) + .build()); + }).exceptionally(throwable -> { + if (throwable != null) { + LOG.warn("A recovery message for the transaction was handled with the error [tx={}].", throwable, txId); + } + + return null; + }); } /** @@ -270,6 +256,7 @@ public class OrphanDetector { private boolean isRecoveryNeeded(TxStateMeta txState) { return txState != null && !isFinalState(txState.txState()) + && txState.txState() != FINISHING && !isTxAbandonedRecently(txState); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java index e79ed8ab4f..eaab109445 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java @@ -31,7 +31,7 @@ import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.internal.tx.message.TxCleanupMessage; import org.apache.ignite.internal.tx.message.TxMessageGroup; import org.apache.ignite.internal.tx.message.TxMessagesFactory; -import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.MessagingService; import org.apache.ignite.network.NetworkMessage; import org.jetbrains.annotations.Nullable; @@ -42,8 +42,8 @@ public class TxCleanupRequestHandler { /** Tx messages factory. */ private static final TxMessagesFactory FACTORY = new TxMessagesFactory(); - /** Cluster service. */ - private final ClusterService clusterService; + /** Messaging service. */ + private final MessagingService messagingService; /** Lock manager. */ private final LockManager lockManager; @@ -57,18 +57,18 @@ public class TxCleanupRequestHandler { /** * The constructor. * - * @param clusterService Cluster service. + * @param messagingService Messaging service. * @param lockManager Lock manager. * @param clock A hybrid logical clock. * @param writeIntentSwitchProcessor A cleanup processor. */ public TxCleanupRequestHandler( - ClusterService clusterService, + MessagingService messagingService, LockManager lockManager, HybridClock clock, WriteIntentSwitchProcessor writeIntentSwitchProcessor ) { - this.clusterService = clusterService; + this.messagingService = messagingService; this.lockManager = lockManager; this.hybridClock = clock; this.writeIntentSwitchProcessor = writeIntentSwitchProcessor; @@ -78,7 +78,7 @@ public class TxCleanupRequestHandler { * Starts the processor. */ public void start() { - clusterService.messagingService().addMessageHandler(TxMessageGroup.class, (msg, sender, correlationId) -> { + messagingService.addMessageHandler(TxMessageGroup.class, (msg, sender, correlationId) -> { if (msg instanceof TxCleanupMessage) { processTxCleanup((TxCleanupMessage) msg, sender, correlationId); } @@ -133,7 +133,7 @@ public class TxCleanupRequestHandler { }); } - clusterService.messagingService().respond(senderId, msg, correlationId); + messagingService.respond(senderId, msg, correlationId); }); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index f02d0dce80..cd8c65eb67 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -24,7 +24,6 @@ import static org.apache.ignite.internal.tx.TxState.ABORTED; import static org.apache.ignite.internal.tx.TxState.COMMITTED; import static org.apache.ignite.internal.tx.TxState.FINISHING; import static org.apache.ignite.internal.tx.TxState.PENDING; -import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness; import static org.apache.ignite.internal.tx.TxState.isFinalState; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; @@ -86,8 +85,10 @@ import org.apache.ignite.internal.util.CompletableFutures; import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.MessagingService; import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.NetworkMessageHandler; +import org.apache.ignite.network.TopologyService; import org.jetbrains.annotations.Nullable; /** @@ -146,12 +147,15 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { /** Busy lock to stop synchronously. */ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); - /** Cluster service. */ - private final ClusterService clusterService; - /** Detector of transactions that lost the coordinator. */ private final OrphanDetector orphanDetector; + /** Topology service. */ + private final TopologyService topologyService; + + /** Cluster service. */ + private final MessagingService messagingService; + /** Local node network identity. This id is available only after the network has started. */ private String localNodeId; @@ -196,9 +200,10 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { this.lockManager = lockManager; this.clock = clock; this.transactionIdGenerator = transactionIdGenerator; - this.clusterService = clusterService; this.placementDriver = placementDriver; this.idleSafeTimePropagationPeriodMsSupplier = idleSafeTimePropagationPeriodMsSupplier; + this.topologyService = clusterService.topologyService(); + this.messagingService = clusterService.messagingService(); placementDriverHelper = new PlacementDriverHelper(placementDriver, clock); @@ -212,14 +217,14 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { new LinkedBlockingQueue<>(), new NamedThreadFactory("tx-async-cleanup", LOG)); - orphanDetector = new OrphanDetector(clusterService.topologyService(), replicaService, placementDriver, lockManager, clock); + orphanDetector = new OrphanDetector(topologyService, replicaService, placementDriverHelper, lockManager); - txMessageSender = new TxMessageSender(clusterService.messagingService(), replicaService, clock); + txMessageSender = new TxMessageSender(messagingService, replicaService, clock); WriteIntentSwitchProcessor writeIntentSwitchProcessor = - new WriteIntentSwitchProcessor(placementDriverHelper, txMessageSender, clusterService); + new WriteIntentSwitchProcessor(placementDriverHelper, txMessageSender, topologyService); - txCleanupRequestHandler = new TxCleanupRequestHandler(clusterService, lockManager, clock, writeIntentSwitchProcessor); + txCleanupRequestHandler = new TxCleanupRequestHandler(messagingService, lockManager, clock, writeIntentSwitchProcessor); txCleanupRequestSender = new TxCleanupRequestSender(txMessageSender, placementDriverHelper, writeIntentSwitchProcessor); } @@ -293,23 +298,13 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { } @Override - public TxStateMeta stateMeta(UUID txId) { + public @Nullable TxStateMeta stateMeta(UUID txId) { return inBusyLock(busyLock, () -> txStateVolatileStorage.state(txId)); } @Override public @Nullable <T extends TxStateMeta> T updateTxMeta(UUID txId, Function<TxStateMeta, TxStateMeta> updater) { - return txStateVolatileStorage.updateMeta(txId, oldMeta -> { - TxStateMeta newMeta = updater.apply(oldMeta); - - if (newMeta == null) { - return null; - } - - TxState oldState = oldMeta == null ? null : oldMeta.txState(); - - return checkTransitionCorrectness(oldState, newMeta.txState()) ? newMeta : oldMeta; - }); + return txStateVolatileStorage.updateMeta(txId, updater); } @Override @@ -590,8 +585,9 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { @Override public CompletableFuture<Void> start() { - localNodeId = clusterService.topologyService().localMember().id(); - clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class, this); + localNodeId = topologyService.localMember().id(); + + messagingService.addMessageHandler(ReplicaMessageGroup.class, this); txStateVolatileStorage.start(); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java index eccd5d058d..82b7905c1c 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java @@ -25,7 +25,7 @@ import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.util.CompletableFutures; -import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.TopologyService; import org.jetbrains.annotations.Nullable; /** @@ -42,24 +42,24 @@ public class WriteIntentSwitchProcessor { private final TxMessageSender txMessageSender; - /** Cluster service. */ - private final ClusterService clusterService; + /** Topology service. */ + private final TopologyService topologyService; /** * The constructor. * * @param placementDriverHelper Placement driver helper. * @param txMessageSender Transaction message creator. - * @param clusterService Cluster service. + * @param topologyService Topology service. */ public WriteIntentSwitchProcessor( PlacementDriverHelper placementDriverHelper, TxMessageSender txMessageSender, - ClusterService clusterService + TopologyService topologyService ) { this.placementDriverHelper = placementDriverHelper; this.txMessageSender = txMessageSender; - this.clusterService = clusterService; + this.topologyService = topologyService; } /** @@ -71,7 +71,7 @@ public class WriteIntentSwitchProcessor { boolean commit, @Nullable HybridTimestamp commitTimestamp ) { - String localNodeName = clusterService.topologyService().localMember().name(); + String localNodeName = topologyService.localMember().name(); return txMessageSender.switchWriteIntents(localNodeName, tablePartitionId, txId, commit, commitTimestamp); } diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java new file mode 100644 index 0000000000..498d7c2d02 --- /dev/null +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Answers.RETURNS_DEEP_STUBS; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.placementdriver.PlacementDriver; +import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl; +import org.apache.ignite.internal.replicator.ReplicaService; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.internal.tx.impl.PlacementDriverHelper; +import org.apache.ignite.internal.tx.impl.TransactionIdGenerator; +import org.apache.ignite.internal.tx.impl.TxCleanupRequestSender; +import org.apache.ignite.internal.tx.impl.TxMessageSender; +import org.apache.ignite.internal.tx.impl.WriteIntentSwitchProcessor; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.ClusterNodeImpl; +import org.apache.ignite.network.MessagingService; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.network.TopologyService; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +/** + * Tests for a transaction cleanup. + */ +@ExtendWith(MockitoExtension.class) +public class TxCleanupTest extends IgniteAbstractTest { + + private static final ClusterNode LOCAL_NODE = + new ClusterNodeImpl("local_id", "local", new NetworkAddress("127.0.0.1", 2024), null); + + private static final ClusterNode REMOTE_NODE = + new ClusterNodeImpl("remote_id", "remote", new NetworkAddress("127.1.1.1", 2024), null); + + @Mock(answer = RETURNS_DEEP_STUBS) + private MessagingService messagingService; + + @Mock(answer = RETURNS_DEEP_STUBS) + private TopologyService topologyService; + + @Mock(answer = RETURNS_DEEP_STUBS) + private ReplicaService replicaService; + + @Mock + private PlacementDriver placementDriver; + + private final HybridClock clock = new HybridClockImpl(); + + private TxCleanupRequestSender cleanupRequestSender; + + private TransactionIdGenerator idGenerator; + + private WriteIntentSwitchProcessor writeIntentSwitchProcessor; + + private TxMessageSender txMessageSender; + + /** Init test callback. */ + @BeforeEach + public void setup() { + when(topologyService.localMember().address()).thenReturn(LOCAL_NODE.address()); + + when(messagingService.invoke(anyString(), any(), anyLong())).thenReturn(nullCompletedFuture()); + + idGenerator = new TransactionIdGenerator(LOCAL_NODE.name().hashCode()); + + txMessageSender = spy(new TxMessageSender(messagingService, replicaService, clock)); + + PlacementDriverHelper placementDriverHelper = new PlacementDriverHelper(placementDriver, clock); + + writeIntentSwitchProcessor = spy(new WriteIntentSwitchProcessor(placementDriverHelper, txMessageSender, topologyService)); + + cleanupRequestSender = new TxCleanupRequestSender(txMessageSender, placementDriverHelper, writeIntentSwitchProcessor); + } + + @Test + void testPrimaryFoundForAllPartitions() { + TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); + TablePartitionId tablePartitionId2 = new TablePartitionId(2, 0); + TablePartitionId tablePartitionId3 = new TablePartitionId(3, 0); + + Set<TablePartitionId> partitions = Set.of(tablePartitionId1, tablePartitionId2, tablePartitionId3); + + when(placementDriver.getPrimaryReplica(any(), any())) + .thenReturn(completedFuture(new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), HybridTimestamp.MAX_VALUE))); + + HybridTimestamp beginTimestamp = clock.now(); + UUID txId = idGenerator.transactionIdFor(beginTimestamp); + + HybridTimestamp commitTimestamp = clock.now(); + + CompletableFuture<Void> cleanup = cleanupRequestSender.cleanup(partitions, true, commitTimestamp, txId); + + assertThat(cleanup, willCompleteSuccessfully()); + + verifyNoInteractions(writeIntentSwitchProcessor); + verify(txMessageSender, times(1)).cleanup(any(), any(), any(), anyBoolean(), any()); + verifyNoMoreInteractions(txMessageSender); + } + + @Test + void testPrimaryNotFoundForSome() { + TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); + TablePartitionId tablePartitionId2 = new TablePartitionId(2, 0); + TablePartitionId tablePartitionId3 = new TablePartitionId(3, 0); + + Set<TablePartitionId> partitions = Set.of(tablePartitionId1, tablePartitionId2, tablePartitionId3); + + when(placementDriver.getPrimaryReplica(any(), any())) + .thenReturn(completedFuture(new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), HybridTimestamp.MAX_VALUE))); + when(placementDriver.getPrimaryReplica(eq(tablePartitionId1), any())) + .thenReturn(nullCompletedFuture()); + + when(placementDriver.awaitPrimaryReplica(eq(tablePartitionId1), any(), anyLong(), any())) + .thenReturn(completedFuture(new TestReplicaMetaImpl(REMOTE_NODE, hybridTimestamp(1), HybridTimestamp.MAX_VALUE))); + + HybridTimestamp beginTimestamp = clock.now(); + UUID txId = idGenerator.transactionIdFor(beginTimestamp); + + HybridTimestamp commitTimestamp = clock.now(); + + CompletableFuture<Void> cleanup = cleanupRequestSender.cleanup(partitions, true, commitTimestamp, txId); + + assertThat(cleanup, willCompleteSuccessfully()); + + verify(txMessageSender, times(1)).switchWriteIntents(any(), any(), any(), anyBoolean(), any()); + verify(txMessageSender, times(1)).cleanup(any(), any(), any(), anyBoolean(), any()); + verifyNoMoreInteractions(txMessageSender); + } + + @Test + void testPrimaryNotFoundForAll() { + TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); + TablePartitionId tablePartitionId2 = new TablePartitionId(2, 0); + TablePartitionId tablePartitionId3 = new TablePartitionId(3, 0); + + Set<TablePartitionId> partitions = Set.of(tablePartitionId1, tablePartitionId2, tablePartitionId3); + + when(placementDriver.getPrimaryReplica(any(), any())) + .thenReturn(nullCompletedFuture()); + + when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), any())) + .thenReturn(completedFuture(new TestReplicaMetaImpl(REMOTE_NODE, hybridTimestamp(1), HybridTimestamp.MAX_VALUE))); + + HybridTimestamp beginTimestamp = clock.now(); + UUID txId = idGenerator.transactionIdFor(beginTimestamp); + + HybridTimestamp commitTimestamp = clock.now(); + + CompletableFuture<Void> cleanup = cleanupRequestSender.cleanup(partitions, true, commitTimestamp, txId); + + assertThat(cleanup, willCompleteSuccessfully()); + + verify(txMessageSender, times(3)).switchWriteIntents(any(), any(), any(), anyBoolean(), any()); + verifyNoMoreInteractions(txMessageSender); + } +} diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java index 22cad83558..1614346449 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.tx; import static java.lang.Math.abs; import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; import static org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS; @@ -39,6 +40,8 @@ import static org.mockito.Answers.RETURNS_DEEP_STUBS; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -57,6 +60,7 @@ import org.apache.ignite.internal.placementdriver.PlacementDriver; import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; import org.apache.ignite.internal.tx.impl.HeapLockManager; @@ -95,10 +99,10 @@ public class TxManagerTest extends IgniteAbstractTest { private TxManager txManager; - @Mock + @Mock(answer = RETURNS_DEEP_STUBS) private ClusterService clusterService; - @Mock + @Mock(answer = RETURNS_DEEP_STUBS) private ReplicaService replicaService; private final HybridClock clock = spy(new HybridClockImpl()); @@ -112,12 +116,8 @@ public class TxManagerTest extends IgniteAbstractTest { /** Init test callback. */ @BeforeEach public void setup() { - clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS); - when(clusterService.topologyService().localMember().address()).thenReturn(LOCAL_NODE.address()); - replicaService = mock(ReplicaService.class, RETURNS_DEEP_STUBS); - when(replicaService.invoke(any(ClusterNode.class), any())).thenReturn(nullCompletedFuture()); when(replicaService.invoke(anyString(), any())).thenReturn(nullCompletedFuture()); @@ -375,18 +375,123 @@ public class TxManagerTest extends IgniteAbstractTest { assertRollbackSucceeds(); } + @Test + public void testPrimaryMissOnFirstCall() { + // First call to the commit partition primary fails with PrimaryReplicaMissException, + // then we retry and the second call succeeds. + when(placementDriver.getPrimaryReplica(any(), any())) + .thenReturn( + completedFuture(new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), hybridTimestamp(10))) + ); + when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), any())) + .thenReturn( + completedFuture(new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), hybridTimestamp(10))), + completedFuture(new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(12), HybridTimestamp.MAX_VALUE)) + ); + + HybridTimestamp commitTimestamp = hybridTimestamp(9); + + when(replicaService.invoke(anyString(), any(TxFinishReplicaRequest.class))) + .thenReturn( + failedFuture(new PrimaryReplicaMissException( + LOCAL_NODE.name(), + null, + 10L, + null, + null + )), + completedFuture(new TransactionResult(TxState.COMMITTED, commitTimestamp)) + ); + + when(clock.now()).thenReturn(hybridTimestamp(5)); + // Ensure that commit doesn't throw exceptions. + InternalTransaction committedTransaction = prepareTransaction(); + + when(clock.now()).thenReturn(commitTimestamp, hybridTimestamp(13)); + + committedTransaction.commit(); + assertEquals(TxState.COMMITTED, txManager.stateMeta(committedTransaction.id()).txState()); + assertEquals(hybridTimestamp(9), txManager.stateMeta(committedTransaction.id()).commitTimestamp()); + + // Ensure that rollback doesn't throw exceptions. + assertRollbackSucceeds(); + } + @Test public void testFinishExpiredWithNullPrimary() { // Null is returned as primaryReplica during finish phase. when(placementDriver.getPrimaryReplica(any(), any())).thenReturn(nullCompletedFuture()); when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), any())).thenReturn(completedFuture( new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), hybridTimestamp(10)))); + when(replicaService.invoke(anyString(), any(TxFinishReplicaRequest.class))) + .thenReturn(completedFuture(new TransactionResult(TxState.ABORTED, null))); assertCommitThrowsTransactionExceptionWithPrimaryReplicaExpiredExceptionAsCause(); assertRollbackSucceeds(); } + @Test + public void testExpiredExceptionDoesNotShadeResponseExceptions() { + // Null is returned as primaryReplica during finish phase. + when(placementDriver.getPrimaryReplica(any(), any())).thenReturn(nullCompletedFuture()); + when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), any())).thenReturn(completedFuture( + new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), hybridTimestamp(10)))); + when(replicaService.invoke(anyString(), any(TxFinishReplicaRequest.class))) + .thenReturn(failedFuture(new TransactionAlreadyFinishedException( + "TX already finished.", + new TransactionResult(TxState.ABORTED, null) + ))); + + InternalTransaction committedTransaction = prepareTransaction(); + + assertThrowsWithCause(committedTransaction::commit, TransactionAlreadyFinishedException.class); + + assertEquals(TxState.ABORTED, txManager.stateMeta(committedTransaction.id()).txState()); + + assertRollbackSucceeds(); + } + + @Test + public void testOnlyPrimaryExpirationAffectsTransaction() { + // Prepare transaction. + InternalTransaction tx = txManager.begin(hybridTimestampTracker); + + ClusterNode node = mock(ClusterNode.class); + + TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); + tx.enlist(tablePartitionId1, new IgniteBiTuple<>(node, 1L)); + tx.assignCommitPartition(tablePartitionId1); + + TablePartitionId tablePartitionId2 = new TablePartitionId(2, 0); + tx.enlist(tablePartitionId2, new IgniteBiTuple<>(node, 1L)); + + when(placementDriver.getPrimaryReplica(eq(tablePartitionId1), any())) + .thenReturn(completedFuture( + new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), HybridTimestamp.MAX_VALUE))); + when(placementDriver.awaitPrimaryReplica(eq(tablePartitionId1), any(), anyLong(), any())) + .thenReturn(completedFuture( + new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), HybridTimestamp.MAX_VALUE))); + + lenient().when(placementDriver.getPrimaryReplica(eq(tablePartitionId2), any())) + .thenReturn(nullCompletedFuture()); + lenient().when(placementDriver.awaitPrimaryReplica(eq(tablePartitionId2), any(), anyLong(), any())) + .thenReturn(completedFuture( + new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), hybridTimestamp(10)))); + + HybridTimestamp commitTimestamp = clock.now(); + when(replicaService.invoke(anyString(), any(TxFinishReplicaRequest.class))) + .thenReturn(completedFuture(new TransactionResult(TxState.COMMITTED, commitTimestamp))); + + // Ensure that commit doesn't throw exceptions. + InternalTransaction committedTransaction = prepareTransaction(); + committedTransaction.commit(); + assertEquals(TxState.COMMITTED, txManager.stateMeta(committedTransaction.id()).txState()); + + // Ensure that rollback doesn't throw exceptions. + assertRollbackSucceeds(); + } + @Test public void testFinishExpiredWithExpiredPrimary() { // Primary with expirationTimestamp less than commitTimestamp is returned. @@ -419,6 +524,8 @@ public class TxManagerTest extends IgniteAbstractTest { new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(2), HybridTimestamp.MAX_VALUE))); when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), any())).thenReturn(completedFuture( new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(2), HybridTimestamp.MAX_VALUE))); + when(replicaService.invoke(anyString(), any(TxFinishReplicaRequest.class))) + .thenReturn(completedFuture(new TransactionResult(TxState.ABORTED, null))); assertCommitThrowsTransactionExceptionWithPrimaryReplicaExpiredExceptionAsCause(); @@ -438,9 +545,6 @@ public class TxManagerTest extends IgniteAbstractTest { } private void assertCommitThrowsTransactionExceptionWithPrimaryReplicaExpiredExceptionAsCause() { - when(replicaService.invoke(anyString(), any(TxFinishReplicaRequest.class))) - .thenReturn(completedFuture(new TransactionResult(TxState.ABORTED, null))); - InternalTransaction committedTransaction = prepareTransaction(); Throwable throwable = assertThrowsWithCause(committedTransaction::commit, PrimaryReplicaExpiredException.class); diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java new file mode 100644 index 0000000000..13b799a1ba --- /dev/null +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx.impl; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Answers.RETURNS_DEEP_STUBS; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.placementdriver.PlacementDriver; +import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl; +import org.apache.ignite.internal.replicator.ReplicaService; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.internal.tx.Lock; +import org.apache.ignite.internal.tx.LockException; +import org.apache.ignite.internal.tx.LockKey; +import org.apache.ignite.internal.tx.LockMode; +import org.apache.ignite.internal.tx.TxState; +import org.apache.ignite.internal.tx.TxStateMeta; +import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.ClusterNodeImpl; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.network.TopologyService; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +/** + * Test how OrphanDetector reacts on tx lock conflicts. + */ +@ExtendWith({MockitoExtension.class, ConfigurationExtension.class}) +public class OrphanDetectorTest extends BaseIgniteAbstractTest { + + private static final ClusterNode LOCAL_NODE = + new ClusterNodeImpl("local_id", "local", new NetworkAddress("127.0.0.1", 2024), null); + + private static final ClusterNode REMOTE_NODE = + new ClusterNodeImpl("remote_id", "remote", new NetworkAddress("127.1.1.1", 2024), null); + + @Mock(answer = RETURNS_DEEP_STUBS) + private TopologyService topologyService; + + @Mock(answer = RETURNS_DEEP_STUBS) + private ReplicaService replicaService; + + @Mock + private PlacementDriver placementDriver; + + private final HeapLockManager lockManager = new HeapLockManager(); + + private final HybridClock clock = new HybridClockImpl(); + + @InjectConfiguration + private TransactionConfiguration txConfiguration; + + private VolatileTxStateMetaStorage txStateMetaStorage; + + private TransactionIdGenerator idGenerator; + + @BeforeEach + public void setup() { + idGenerator = new TransactionIdGenerator(LOCAL_NODE.name().hashCode()); + + PlacementDriverHelper placementDriverHelper = new PlacementDriverHelper(placementDriver, clock); + + OrphanDetector orphanDetector = new OrphanDetector(topologyService, replicaService, placementDriverHelper, lockManager); + + txStateMetaStorage = new VolatileTxStateMetaStorage(); + + txStateMetaStorage.start(); + + orphanDetector.start(txStateMetaStorage, txConfiguration.abandonedCheckTs()); + } + + @Test + void testNoTriggerNoState() { + UUID orphanTxId = idGenerator.transactionIdFor(clock.now()); + + RowId rowId = new RowId(0); + + // Coordinator is dead. + when(topologyService.getById(eq(LOCAL_NODE.id()))).thenReturn(null); + + lockManager.acquire(orphanTxId, new LockKey(1, rowId), LockMode.X); + + UUID concurrentTxId = idGenerator.transactionIdFor(clock.now()); + + // Should trigger lock conflict listener in OrphanDetector. + lockManager.acquire(concurrentTxId, new LockKey(1, rowId), LockMode.X); + + TxStateMeta orphanState = txStateMetaStorage.state(orphanTxId); + + // OrphanDetector didn't change the state. + assertNull(orphanState); + + verifyNoInteractions(replicaService); + } + + @Test + void testNoTriggerCommittedState() { + UUID orphanTxId = idGenerator.transactionIdFor(clock.now()); + + TablePartitionId tpId = new TablePartitionId(1, 0); + + RowId rowId = new RowId(tpId.partitionId()); + + // Coordinator is dead. + when(topologyService.getById(eq(LOCAL_NODE.id()))).thenReturn(null); + + lockManager.acquire(orphanTxId, new LockKey(tpId.tableId(), rowId), LockMode.X); + + UUID concurrentTxId = idGenerator.transactionIdFor(clock.now()); + + TxStateMeta committedState = new TxStateMeta(TxState.COMMITTED, LOCAL_NODE.id(), tpId, clock.now()); + + txStateMetaStorage.updateMeta(orphanTxId, stateMeta -> committedState); + + // Should trigger lock conflict listener in OrphanDetector. + lockManager.acquire(concurrentTxId, new LockKey(1, rowId), LockMode.X); + + TxStateMeta orphanState = txStateMetaStorage.state(orphanTxId); + + // OrphanDetector didn't change the state. + assertEquals(committedState, orphanState); + + verifyNoInteractions(replicaService); + } + + @Test + void testNoTriggerAbortedState() { + UUID orphanTxId = idGenerator.transactionIdFor(clock.now()); + + TablePartitionId tpId = new TablePartitionId(1, 0); + + RowId rowId = new RowId(tpId.partitionId()); + + // Coordinator is dead. + when(topologyService.getById(eq(LOCAL_NODE.id()))).thenReturn(null); + + lockManager.acquire(orphanTxId, new LockKey(tpId.tableId(), rowId), LockMode.X); + + UUID concurrentTxId = idGenerator.transactionIdFor(clock.now()); + + TxStateMeta abortedState = new TxStateMeta(TxState.ABORTED, LOCAL_NODE.id(), tpId, null); + + txStateMetaStorage.updateMeta(orphanTxId, stateMeta -> abortedState); + + // Should trigger lock conflict listener in OrphanDetector. + lockManager.acquire(concurrentTxId, new LockKey(1, rowId), LockMode.X); + + TxStateMeta orphanState = txStateMetaStorage.state(orphanTxId); + + // OrphanDetector didn't change the state. + assertEquals(abortedState, orphanState); + + verifyNoInteractions(replicaService); + } + + @Test + void testNoTriggerFinishingState() { + UUID orphanTxId = idGenerator.transactionIdFor(clock.now()); + + TablePartitionId tpId = new TablePartitionId(1, 0); + + RowId rowId = new RowId(tpId.partitionId()); + + lockManager.acquire(orphanTxId, new LockKey(tpId.tableId(), rowId), LockMode.X); + + UUID concurrentTxId = idGenerator.transactionIdFor(clock.now()); + + TxStateMeta finishingState = new TxStateMeta(TxState.FINISHING, LOCAL_NODE.id(), tpId, null); + + txStateMetaStorage.updateMeta(orphanTxId, stateMeta -> finishingState); + + // Coordinator is dead. + when(topologyService.getById(eq(LOCAL_NODE.id()))).thenReturn(null); + + // Should trigger lock conflict listener in OrphanDetector. + lockManager.acquire(concurrentTxId, new LockKey(1, rowId), LockMode.X); + + TxStateMeta orphanState = txStateMetaStorage.state(orphanTxId); + + // OrphanDetector didn't change the state. + assertEquals(finishingState, orphanState); + + verifyNoInteractions(replicaService); + } + + @Test + void testNoTriggerCoordinatorAlive() { + UUID orphanTxId = idGenerator.transactionIdFor(clock.now()); + + TablePartitionId tpId = new TablePartitionId(1, 0); + + RowId rowId = new RowId(tpId.partitionId()); + + lockManager.acquire(orphanTxId, new LockKey(tpId.tableId(), rowId), LockMode.X); + + UUID concurrentTxId = idGenerator.transactionIdFor(clock.now()); + + TxStateMeta pendingState = new TxStateMeta(TxState.PENDING, LOCAL_NODE.id(), tpId, null); + + txStateMetaStorage.updateMeta(orphanTxId, stateMeta -> pendingState); + + when(topologyService.getById(eq(LOCAL_NODE.id()))).thenReturn(mock(ClusterNode.class)); + + // Should trigger lock conflict listener in OrphanDetector. + lockManager.acquire(concurrentTxId, new LockKey(1, rowId), LockMode.X); + + TxStateMeta orphanState = txStateMetaStorage.state(orphanTxId); + + // OrphanDetector didn't change the state. + assertEquals(pendingState, orphanState); + + verifyNoInteractions(replicaService); + } + + @Test + void testTriggerOnLockConflictCoordinatorDead() { + UUID orphanTxId = idGenerator.transactionIdFor(clock.now()); + + TablePartitionId tpId = new TablePartitionId(1, 0); + + RowId rowId = new RowId(tpId.partitionId()); + + when(placementDriver.awaitPrimaryReplica(eq(tpId), any(), anyLong(), any())) + .thenReturn(completedFuture(new TestReplicaMetaImpl(REMOTE_NODE, hybridTimestamp(1), HybridTimestamp.MAX_VALUE))); + + lockManager.acquire(orphanTxId, new LockKey(tpId.tableId(), rowId), LockMode.X); + + UUID concurrentTxId = idGenerator.transactionIdFor(clock.now()); + + TxStateMeta pendingState = new TxStateMeta(TxState.PENDING, LOCAL_NODE.id(), tpId, null); + + txStateMetaStorage.updateMeta(orphanTxId, stateMeta -> pendingState); + + // Coordinator is dead. + when(topologyService.getById(eq(LOCAL_NODE.id()))).thenReturn(null); + + // Should trigger lock conflict listener in OrphanDetector. + CompletableFuture<Lock> acquire = lockManager.acquire(concurrentTxId, new LockKey(1, rowId), LockMode.X); + + TxStateMeta orphanState = txStateMetaStorage.state(orphanTxId); + + // OrphanDetector didn't change the state. + assertEquals(TxState.ABANDONED, orphanState.txState()); + + // Send tx recovery message. + verify(replicaService).invoke(any(ClusterNode.class), any()); + + assertThat(acquire, willThrow(LockException.class, "Failed to acquire an abandoned lock due to a possible deadlock")); + } +}