This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4c7cf15e52 IGNITE-21071 Rollback the transaction on primary failure if
replication is not finished (#3030)
4c7cf15e52 is described below
commit 4c7cf15e520eee0097712e78591150981307dac6
Author: Denis Chudov <[email protected]>
AuthorDate: Thu Jan 18 19:50:49 2024 +0300
IGNITE-21071 Rollback the transaction on primary failure if replication is
not finished (#3030)
---
.../ignite/client/handler/FakePlacementDriver.java | 11 +
.../internal/compute/ItWorkerShutdownTest.java | 14 +-
.../internal/index/IndexBuildControllerTest.java | 5 +
.../internal/placementdriver/PlacementDriver.java | 10 +
.../placementdriver/TestPlacementDriver.java | 5 +
.../placementdriver/leases/LeaseTracker.java | 64 +++--
.../ignite/internal/raft/RaftGroupServiceImpl.java | 9 +-
.../ignite/internal/table/ItDurableFinishTest.java | 19 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 7 +
.../java/org/apache/ignite/internal/Cluster.java | 25 ++
.../internal/ClusterPerTestIntegrationTest.java | 17 ++
.../ignite/internal/table/ItColocationTest.java | 2 +-
.../internal/table/ItTransactionRecoveryTest.java | 300 ++++++++++++++-------
.../ignite/internal/table/TxAbstractTest.java | 16 +-
.../tx/TransactionAlreadyFinishedException.java | 8 +-
.../tx/impl/PrimaryReplicaExpiredException.java | 2 +-
.../ignite/internal/tx/impl/TxManagerImpl.java | 158 ++++++++---
.../tx/AbstractDeadlockPreventionTest.java | 17 ++
.../apache/ignite/internal/tx/TxManagerTest.java | 15 +-
19 files changed, 532 insertions(+), 172 deletions(-)
diff --git
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
index 5853de37f1..e110c38fe7 100644
---
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
+++
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
@@ -103,6 +103,17 @@ public class FakePlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
return nullCompletedFuture();
}
+ @Override
+ public ReplicaMeta currentLease(ReplicationGroupId groupId) {
+ TablePartitionId id = (TablePartitionId) groupId;
+
+ if (returnError) {
+ throw new RuntimeException("FakePlacementDriver expected error");
+ } else {
+ return primaryReplicas.get(id.partitionId());
+ }
+ }
+
private static ReplicaMeta getReplicaMeta(String leaseholder, long
leaseStartTime) {
//noinspection serial
return new ReplicaMeta() {
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java
index 2dcac4163c..307ae2c627 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java
@@ -223,7 +223,7 @@ class ItWorkerShutdownTest extends
ClusterPerTestIntegrationTest {
UUID jobIdBeforeFail = idSync(execution);
// When stop worker node.
- stopNode(workerNodeName);
+ stopNodeByName(workerNodeName);
// And remove it from candidates.
remoteWorkerCandidates.remove(workerNodeName);
@@ -273,7 +273,7 @@ class ItWorkerShutdownTest extends
ClusterPerTestIntegrationTest {
checkGlobalInteractiveJobAlive(execution);
// When stop worker node.
- stopNode(workerNodeName);
+ stopNodeByName(workerNodeName);
// Then the job is failed, because there is no any failover worker.
assertThat(execution.resultAsync(), willThrow(IgniteException.class));
@@ -295,7 +295,7 @@ class ItWorkerShutdownTest extends
ClusterPerTestIntegrationTest {
assertThat(getWorkerNodeNameFromGlobalInteractiveJob(),
equalTo(entryNode.name()));
// When stop entry node.
- stopNode(entryNode.name());
+ stopNodeByName(entryNode.name());
// Then the job is failed, because there is no any failover worker.
assertThat(execution.resultAsync().isCompletedExceptionally(),
equalTo(true));
@@ -321,7 +321,7 @@ class ItWorkerShutdownTest extends
ClusterPerTestIntegrationTest {
executions.forEach(ItWorkerShutdownTest::checkInteractiveJobAlive);
// When stop one of workers.
- stopNode(node(1).name());
+ stopNodeByName(node(1).name());
// Then two jobs are alive.
executions.forEach((node, execution) -> {
@@ -355,7 +355,7 @@ class ItWorkerShutdownTest extends
ClusterPerTestIntegrationTest {
checkGlobalInteractiveJobAlive(execution);
// When stop worker node.
- stopNode(workerNodeName);
+ stopNodeByName(workerNodeName);
// And remove it from candidates.
remoteWorkerCandidates.remove(workerNodeName);
@@ -417,10 +417,10 @@ class ItWorkerShutdownTest extends
ClusterPerTestIntegrationTest {
}
private void stopNode(IgniteImpl ignite) {
- stopNode(ignite.name());
+ stopNodeByName(ignite.name());
}
- private void stopNode(String name) {
+ private void stopNodeByName(String name) {
int ind = NODES_NAMES_TO_INDEXES.get(name);
node(ind).stop();
}
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
index 1b5d6f1169..2423e913f7 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
@@ -299,6 +299,11 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
throw new UnsupportedOperationException();
}
+ @Override
+ public ReplicaMeta currentLease(ReplicationGroupId groupId) {
+ return primaryReplicaMetaFutureById.get(groupId).join();
+ }
+
CompletableFuture<Void> setPrimaryReplicaMeta(
long causalityToken,
TablePartitionId replicaId,
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
index 73d887cbb4..fbc3d564c7 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.jetbrains.annotations.Nullable;
/**
* Service that provides an ability to await and retrieve primary replicas for
replication groups.
@@ -77,4 +78,13 @@ public interface PlacementDriver extends
EventProducer<PrimaryReplicaEvent, Prim
* @return Future.
*/
CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId grpId);
+
+ /**
+ * Returns the current knowledge about the lease on the local node.
+ *
+ * @param groupId Group id.
+ * @return Current lease, or {@code null} if the local node knows of no accepted lease for the group.
+ */
+ @Nullable
+ ReplicaMeta currentLease(ReplicationGroupId groupId);
}
diff --git
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
index 9fd5940041..9ea1b14ce0 100644
---
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
+++
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
@@ -87,6 +87,11 @@ public class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
return nullCompletedFuture();
}
+ @Override
+ public ReplicaMeta currentLease(ReplicationGroupId groupId) {
+ return getReplicaMetaFuture().join();
+ }
+
@Override
public CompletableFuture<Void> fireEvent(PrimaryReplicaEvent event,
PrimaryReplicaEventParameters parameters) {
return super.fireEvent(event, parameters);
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index d44d491fd7..fc8e50e40c 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -135,6 +135,15 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
return expirationFutureByGroup.getOrDefault(grpId,
nullCompletedFuture());
}
+ @Override
+ public ReplicaMeta currentLease(ReplicationGroupId groupId) {
+ return inBusyLock(busyLock, () -> {
+ Lease lease = getLease(groupId);
+
+ return lease.isAccepted() ? lease : null;
+ });
+ }
+
/**
* Gets a lease for a particular group.
*
@@ -162,6 +171,7 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
public CompletableFuture<Void> onUpdate(WatchEvent event) {
return inBusyLockAsync(busyLock, () -> {
List<CompletableFuture<?>> fireEventFutures = new
ArrayList<>();
+ List<Lease> expiredLeases = new ArrayList<>();
for (EntryEvent entry : event.entryEvents()) {
Entry msEntry = entry.newEntry();
@@ -188,18 +198,26 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
}
}
- firePrimaryReplicaExpiredEventIfNeeded(grpId,
event.revision(), lease);
+ if (needToFireEventReplicaExpired(grpId, lease)) {
+
expiredLeases.add(leases.leaseByGroupId().get(grpId));
+ }
}
for (ReplicationGroupId grpId :
leases.leaseByGroupId().keySet()) {
if (!leasesMap.containsKey(grpId)) {
tryRemoveTracker(grpId);
- firePrimaryReplicaExpiredEventIfNeeded(grpId,
event.revision(), null);
+ if (needToFireEventReplicaExpired(grpId, null)) {
+
expiredLeases.add(leases.leaseByGroupId().get(grpId));
+ }
}
}
leases = new Leases(unmodifiableMap(leasesMap),
leasesBytes);
+
+ for (Lease expiredLease : expiredLeases) {
+ firePrimaryReplicaExpiredEvent(event.revision(),
expiredLease);
+ }
}
return
allOf(fireEventFutures.toArray(CompletableFuture[]::new));
@@ -308,10 +326,10 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
* Fires the primary replica expire event if it needs.
*
* @param grpId Group id, used for the cases when the {@code lease}
parameter is null. Should be always not null.
- * @param causalityToken Causality token.
* @param lease Lease to check on expiration.
+ * @return Whether the event is needed.
*/
- private void firePrimaryReplicaExpiredEventIfNeeded(ReplicationGroupId
grpId, long causalityToken, @Nullable Lease lease) {
+ private boolean needToFireEventReplicaExpired(ReplicationGroupId grpId,
@Nullable Lease lease) {
assert lease == null || lease.replicationGroupId().equals(grpId)
: IgniteStringFormatter.format("Group id mismatch [groupId={},
lease={}]", grpId, lease);
@@ -321,20 +339,34 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
boolean sameLease = lease != null &&
currentLease.getStartTime().equals(lease.getStartTime());
if (!sameLease) {
- CompletableFuture<Void> prev =
expirationFutureByGroup.put(grpId, fireEvent(
- PRIMARY_REPLICA_EXPIRED,
- new PrimaryReplicaEventParameters(
- causalityToken,
- grpId,
- currentLease.getLeaseholderId(),
- currentLease.getLeaseholder(),
- currentLease.getStartTime()
- )
- ));
-
- assert prev == null || prev.isDone() : "Previous lease
expiration process has not completed yet [grpId=" + grpId + ']';
+ return true;
}
}
+
+ return false;
+ }
+
+ /**
+ * Fires the primary replica expired event for the given lease.
+ *
+ * @param causalityToken Causality token.
+ * @param expiredLease Expired lease.
+ */
+ private void firePrimaryReplicaExpiredEvent(long causalityToken, Lease
expiredLease) {
+ ReplicationGroupId grpId = expiredLease.replicationGroupId();
+
+ CompletableFuture<Void> prev = expirationFutureByGroup.put(grpId,
fireEvent(
+ PRIMARY_REPLICA_EXPIRED,
+ new PrimaryReplicaEventParameters(
+ causalityToken,
+ grpId,
+ expiredLease.getLeaseholderId(),
+ expiredLease.getLeaseholder(),
+ expiredLease.getStartTime()
+ )
+ ));
+
+ assert prev == null || prev.isDone() : "Previous lease expiration
process has not completed yet [grpId=" + grpId + ']';
}
private CompletableFuture<Void> fireEventReplicaBecomePrimary(long
causalityToken, Lease lease) {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index f0f3ec1304..1441ca84d7 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -587,15 +587,18 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
CompletableFuture<? extends NetworkMessage> fut
) {
if (recoverable(err)) {
+ Peer randomPeer = randomNode(peer);
+
LOG.warn(
"Recoverable error during the request occurred (will be
retried on the randomly selected node) "
- + "[request={}, peer={}].",
+ + "[request={}, peer={}, newPeer={}].",
err,
sentRequest,
- peer
+ peer,
+ randomPeer
);
- scheduleRetry(() -> sendWithRetry(randomNode(peer),
requestFactory, stopTime, fut));
+ scheduleRetry(() -> sendWithRetry(randomPeer, requestFactory,
stopTime, fut));
} else {
fut.completeExceptionally(err);
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
index ab22a7f004..90f3e8481b 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
@@ -30,6 +30,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
@@ -76,9 +77,7 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
throws ExecutionException, InterruptedException {
createTestTableWith3Replicas();
- TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
-
- var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
+ var tblReplicationGrp = defaultTablePartitionId(node(0));
CompletableFuture<ReplicaMeta> primaryReplicaFut =
node(0).placementDriver().awaitPrimaryReplica(
tblReplicationGrp,
@@ -101,6 +100,8 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
Tuple keyTpl = Tuple.create().set("key", 42);
Tuple tpl = Tuple.create().set("key", 42).set("val", "val 42");
+ TableImpl tbl = (TableImpl) coordinatorNode.tables().table(TABLE_NAME);
+
tbl.recordView().upsert(rwTx, tpl);
msgConf.accept(primaryNode, coordinatorNode, tbl, rwTx);
@@ -108,6 +109,12 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
finisher.accept(rwTx, tbl, keyTpl);
}
+ private TablePartitionId defaultTablePartitionId(IgniteImpl node) {
+ TableImpl table = (TableImpl) node.tables().table(TABLE_NAME);
+
+ return new TablePartitionId(table.tableId(), 0);
+ }
+
private void commitRow(InternalTransaction rwTx, TableImpl tbl, Tuple
keyTpl) {
rwTx.commit();
@@ -139,12 +146,16 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
AtomicBoolean dropMessage = new AtomicBoolean(true);
+ CountDownLatch commitStartedLatch = new CountDownLatch(1);
+
// Make sure the finish message is prepared, i.e. the outcome, commit
timestamp, primary node, etc. have been set,
// and then temporarily block the messaging to simulate network issues.
coordinatorMessaging.dropMessages((s, networkMessage) -> {
if (networkMessage instanceof TxFinishReplicaRequest &&
dropMessage.get()) {
logger().info("Dropping: {}.", networkMessage);
+ commitStartedLatch.countDown();
+
return true;
}
@@ -156,6 +167,8 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
// will run in the current thread.
CompletableFuture.runAsync(() -> {
try {
+ commitStartedLatch.await();
+
logger().info("Start transferring primary.");
NodeUtils.transferPrimary(tbl, null, this::node);
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 6afc21aa95..d8611e60dc 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -164,6 +164,7 @@ import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.VaultService;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ChannelType;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.DefaultMessagingService;
@@ -1188,6 +1189,12 @@ public class IgniteImpl implements Ignite {
return ((DefaultMessagingService)
clusterSvc.messagingService()).dropMessagesPredicate();
}
+ // TODO: IGNITE-18493 - remove/move this
+ @TestOnly
+ public void sendFakeMessage(String recipientConsistentId, NetworkMessage
msg) {
+ clusterSvc.messagingService().send(recipientConsistentId,
ChannelType.DEFAULT, msg);
+ }
+
// TODO: IGNITE-18493 - remove/move this
@TestOnly
public void stopDroppingMessages() {
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
index f69509742c..c54a802490 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
@@ -378,6 +378,31 @@ public class Cluster {
nodes.set(index, null);
}
+ /**
+ * Stops a node by name.
+ *
+ * @param name Name of the node in the cluster.
+ */
+ public void stopNode(String name) {
+ stopNode(nodeIndex(name));
+ }
+
+ /**
+ * Returns index of the node.
+ *
+ * @param name Node name.
+ * @return Node index.
+ */
+ public int nodeIndex(String name) {
+ for (int i = 0; i < nodes.size(); i++) {
+ if (nodes.get(i) != null && nodes.get(i).name().equals(name)) {
+ return i;
+ }
+ }
+
+ throw new IllegalArgumentException("Node is not found: " + name);
+ }
+
/**
* Restarts a node by index.
*
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
index cc1d6fc133..ed657d7e85 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal;
import java.nio.file.Path;
import java.util.List;
+import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.internal.app.IgniteImpl;
@@ -171,6 +172,22 @@ public abstract class ClusterPerTestIntegrationTest
extends IgniteIntegrationTes
cluster.stopNode(nodeIndex);
}
+ /**
+ * Stops a node by name.
+ *
+ * @param name Name of the node in the cluster.
+ */
+ protected void stopNode(String name) {
+ cluster.stopNode(name);
+ }
+
+ /**
+ * Returns nodes that are started and not stopped. This can include
knocked out nodes.
+ */
+ protected final Stream<IgniteImpl> runningNodes() {
+ return cluster.runningNodes();
+ }
+
/**
* Restarts a node by index.
*
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index c72763b031..c0b023a208 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -173,7 +173,7 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
public CompletableFuture<Void> finish(
HybridTimestampTracker observableTimestampTracker,
TablePartitionId commitPartition,
- boolean commit,
+ boolean commitIntent,
Map<TablePartitionId, Long> enlistedGroups,
UUID txId
) {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
similarity index 77%
rename from
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
rename to
modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
index 694f27620e..d72df1b78e 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
@@ -26,6 +26,9 @@ import static
org.apache.ignite.internal.util.ExceptionUtils.extractCodeFrom;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -34,13 +37,19 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
import java.util.stream.IntStream;
import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
+import
org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import
org.apache.ignite.internal.replicator.message.ErrorTimestampAwareReplicaResponse;
+import
org.apache.ignite.internal.replicator.message.TimestampAwareReplicaResponse;
+import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
@@ -60,6 +69,7 @@ import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionException;
import org.apache.ignite.tx.TransactionOptions;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
@@ -70,6 +80,8 @@ import org.junit.jupiter.api.TestInfo;
* Abandoned transactions integration tests.
*/
public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest {
+ private static final PlacementDriverMessagesFactory
PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
+
/** Table name. */
private static final String TABLE_NAME = "test_table";
@@ -104,16 +116,7 @@ public class ItTransactionRecoveryTest extends
ClusterPerTestIntegrationTest {
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
- CompletableFuture<ReplicaMeta> primaryReplicaFut =
node(0).placementDriver().awaitPrimaryReplica(
- tblReplicationGrp,
- node(0).clock().now(),
- 10,
- SECONDS
- );
-
- assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
- String leaseholder = primaryReplicaFut.join().getLeaseholder();
+ String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
@@ -168,16 +171,7 @@ public class ItTransactionRecoveryTest extends
ClusterPerTestIntegrationTest {
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
- CompletableFuture<ReplicaMeta> primaryReplicaFut =
node(0).placementDriver().awaitPrimaryReplica(
- tblReplicationGrp,
- node(0).clock().now(),
- 10,
- SECONDS
- );
-
- assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
- String leaseholder = primaryReplicaFut.join().getLeaseholder();
+ String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
@@ -220,16 +214,7 @@ public class ItTransactionRecoveryTest extends
ClusterPerTestIntegrationTest {
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
- CompletableFuture<ReplicaMeta> primaryReplicaFut =
node(0).placementDriver().awaitPrimaryReplica(
- tblReplicationGrp,
- node(0).clock().now(),
- 10,
- SECONDS
- );
-
- assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
- String leaseholder = primaryReplicaFut.join().getLeaseholder();
+ String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
@@ -275,16 +260,7 @@ public class ItTransactionRecoveryTest extends
ClusterPerTestIntegrationTest {
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
- CompletableFuture<ReplicaMeta> primaryReplicaFut =
node(0).placementDriver().awaitPrimaryReplica(
- tblReplicationGrp,
- node(0).clock().now(),
- 10,
- SECONDS
- );
-
- assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
- String leaseholder = primaryReplicaFut.join().getLeaseholder();
+ String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
@@ -329,16 +305,7 @@ public class ItTransactionRecoveryTest extends
ClusterPerTestIntegrationTest {
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
- CompletableFuture<ReplicaMeta> primaryReplicaFut =
node(0).placementDriver().awaitPrimaryReplica(
- tblReplicationGrp,
- node(0).clock().now(),
- 10,
- SECONDS
- );
-
- assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
- String leaseholder = primaryReplicaFut.join().getLeaseholder();
+ String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
@@ -402,16 +369,7 @@ public class ItTransactionRecoveryTest extends
ClusterPerTestIntegrationTest {
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
- CompletableFuture<ReplicaMeta> primaryReplicaFut =
node(0).placementDriver().awaitPrimaryReplica(
- tblReplicationGrp,
- node(0).clock().now(),
- 10,
- SECONDS
- );
-
- assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
- String leaseholder = primaryReplicaFut.join().getLeaseholder();
+ String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
@@ -480,16 +438,7 @@ public class ItTransactionRecoveryTest extends
ClusterPerTestIntegrationTest {
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
- CompletableFuture<ReplicaMeta> primaryReplicaFut =
node(0).placementDriver().awaitPrimaryReplica(
- tblReplicationGrp,
- node(0).clock().now(),
- 10,
- SECONDS
- );
-
- assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
- String leaseholder = primaryReplicaFut.join().getLeaseholder();
+ String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
@@ -564,16 +513,7 @@ public class ItTransactionRecoveryTest extends
ClusterPerTestIntegrationTest {
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
- CompletableFuture<ReplicaMeta> primaryReplicaFut =
node(0).placementDriver().awaitPrimaryReplica(
- tblReplicationGrp,
- node(0).clock().now(),
- 10,
- SECONDS
- );
-
- assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
- String leaseholder = primaryReplicaFut.join().getLeaseholder();
+ String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
@@ -650,16 +590,7 @@ public class ItTransactionRecoveryTest extends
ClusterPerTestIntegrationTest {
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
- CompletableFuture<ReplicaMeta> primaryReplicaFut =
node(0).placementDriver().awaitPrimaryReplica(
- tblReplicationGrp,
- node(0).clock().now(),
- 10,
- SECONDS
- );
-
- assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
- String leaseholder = primaryReplicaFut.join().getLeaseholder();
+ String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
@@ -690,6 +621,156 @@ public class ItTransactionRecoveryTest extends
ClusterPerTestIntegrationTest {
assertThat(finish2,
willThrow(TransactionAlreadyFinishedException.class));
}
+ @Test
+ public void testPrimaryFailureRightAfterCommitMsg() throws Exception {
+ TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
+
+ var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
+
+ String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
+
+ IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
+
+ log.info("Transaction commit partition is determined [node={}].",
commitPartNode.name());
+
+ IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
+
+ log.info("Transaction coordinator node is determined [node={}].",
txCrdNode.name());
+
+ Transaction rwTx1 = createRwTransaction(txCrdNode);
+
+ CompletableFuture<?> commitMsgSentFut = new CompletableFuture<>();
+ CompletableFuture<?> cancelLeaseFuture = new CompletableFuture<>();
+
+ txCrdNode.dropMessages((nodeName, msg) -> {
+ if (msg instanceof TxFinishReplicaRequest) {
+ boolean isFirst = !commitMsgSentFut.isDone();
+
+ if (isFirst) {
+ commitMsgSentFut.complete(null);
+
+ return true;
+ } else {
+ cancelLeaseFuture.join();
+
+ return false;
+ }
+ }
+
+ return false;
+ });
+
+ CompletableFuture<Void> commitFut = rwTx1.commitAsync();
+
+ assertThat(commitMsgSentFut, willCompleteSuccessfully());
+
+ cancelLease(commitPartNode, tblReplicationGrp);
+
+ waitAndGetLeaseholder(txCrdNode, tblReplicationGrp);
+
+ cancelLeaseFuture.complete(null);
+
+ assertThat(commitFut, willCompleteSuccessfully());
+
+ RecordView<Tuple> view =
txCrdNode.tables().table(TABLE_NAME).recordView();
+
+ var rec = view.get(null, Tuple.create().set("key", 42));
+
+ assertNotNull(rec);
+ assertEquals((Integer) 42, rec.value("key"));
+ assertEquals("val1", rec.value("val"));
+ }
+
+ @Test
+ public void testPrimaryFailureWhileInflightInProgress() throws Exception {
+ TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
+
+ var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
+
+ String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
+
+ IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
+
+ log.info("Transaction commit partition is determined [node={}].",
commitPartNode.name());
+
+ IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
+
+ log.info("Transaction coordinator node is determined [node={}].",
txCrdNode.name());
+
+ Transaction rwTx1 = createRwTransaction(txCrdNode);
+
+ txCrdNode.dropMessages((nodeName, msg) -> {
+ if (msg instanceof ReadWriteSingleRowReplicaRequest) {
+ return true;
+ }
+
+ return false;
+ });
+
+ assertThrows(TransactionException.class, () -> {
+ RecordView<Tuple> view =
txCrdNode.tables().table(TABLE_NAME).recordView();
+ view.upsert(rwTx1, Tuple.create().set("key", 1).set("val",
"val1"));
+ });
+
+ CompletableFuture<Void> commitFut = rwTx1.commitAsync();
+
+ commitPartNode.stop();
+
+ assertThat(commitFut, willCompleteSuccessfully());
+ }
+
+ @Test
+ public void testPrimaryFailureWhileInflightInProgressAfterFirstResponse()
throws Exception {
+ TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
+
+ var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
+
+ String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
+
+ IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
+
+ log.info("Transaction commit partition is determined [node={}].",
commitPartNode.name());
+
+ IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
+
+ log.info("Transaction coordinator node is determined [node={}].",
txCrdNode.name());
+
+ CompletableFuture<?> firstResponseSent = new CompletableFuture<>();
+
+ commitPartNode.dropMessages((nodeName, msg) -> {
+ if (msg instanceof TimestampAwareReplicaResponse) {
+ TimestampAwareReplicaResponse response =
(TimestampAwareReplicaResponse) msg;
+
+ if (response.result() == null) {
+ firstResponseSent.complete(null);
+ }
+
+ // This means this is the second response that finishes an
in-flight future.
+ if (response.result() instanceof UUID) {
+ return true;
+ }
+ }
+
+ return false;
+ });
+
+ Transaction rwTx1 = createRwTransaction(txCrdNode);
+
+ CompletableFuture<Void> commitFut = rwTx1.commitAsync();
+
+ assertThat(firstResponseSent, willCompleteSuccessfully());
+
+ cancelLease(commitPartNode, tblReplicationGrp);
+
+ assertThat(commitFut,
willThrow(TransactionAlreadyFinishedException.class, 30, SECONDS));
+
+ RecordView<Tuple> view =
txCrdNode.tables().table(TABLE_NAME).recordView();
+
+ var rec = view.get(null, Tuple.create().set("key", 42));
+
+ assertNull(rec);
+ }
+
private DefaultMessagingService messaging(IgniteImpl node) {
ClusterService coordinatorService =
IgniteTestUtils.getFieldValue(node, IgniteImpl.class, "clusterSvc");
@@ -802,19 +883,42 @@ public class ItTransactionRecoveryTest extends
ClusterPerTestIntegrationTest {
return rwTx1;
}
- private IgniteImpl commitPartitionPrimaryNode(String leaseholder) {
- return IntStream.range(0, initialNodes())
+ private IgniteImpl findNode(int startRange, int endRange,
Predicate<IgniteImpl> filter) {
+ return IntStream.range(startRange, endRange)
.mapToObj(this::node)
- .filter(n -> leaseholder.equals(n.name()))
+ .filter(filter::test)
.findFirst()
.get();
}
+ private IgniteImpl commitPartitionPrimaryNode(String leaseholder) {
+ return findNode(0, initialNodes(), n -> leaseholder.equals(n.name()));
+ }
+
private IgniteImpl nonPrimaryNode(String leaseholder) {
- return IntStream.range(1, initialNodes())
- .mapToObj(this::node)
- .filter(n -> !leaseholder.equals(n.name()))
- .findFirst()
- .get();
+ return findNode(1, initialNodes(), n -> !leaseholder.equals(n.name()));
+ }
+
+ private String waitAndGetLeaseholder(IgniteImpl node, ReplicationGroupId
tblReplicationGrp) throws InterruptedException {
+ CompletableFuture<ReplicaMeta> primaryReplicaFut =
node.placementDriver().awaitPrimaryReplica(
+ tblReplicationGrp,
+ node.clock().now(),
+ 10,
+ SECONDS
+ );
+
+ assertThat(primaryReplicaFut, willCompleteSuccessfully());
+
+ return primaryReplicaFut.join().getLeaseholder();
+ }
+
+ private void cancelLease(IgniteImpl leaseholder, ReplicationGroupId
groupId) {
+ StopLeaseProlongationMessage msg = PLACEMENT_DRIVER_MESSAGES_FACTORY
+ .stopLeaseProlongationMessage()
+ .groupId(groupId)
+ .build();
+
+ // Just send it to all nodes to avoid determining the exact placement
driver active actor.
+ runningNodes().forEach(node ->
leaseholder.sendFakeMessage(node.name(), msg));
}
}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 5499863538..b3505d2d21 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -27,6 +27,7 @@ import static org.hamcrest.Matchers.contains;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -429,7 +430,13 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
});
var err = assertThrows(CompletionException.class, fut0::join);
- assertEquals(IllegalArgumentException.class,
err.getCause().getClass());
+
+ try {
+ assertInstanceOf(IllegalArgumentException.class, err.getCause());
+ } catch (AssertionError e) {
+ throw new AssertionError("Unexpected exception type", err);
+ }
+
assertEquals(balance, view.get(null,
makeKey(1)).doubleValue("balance"));
}
@@ -449,7 +456,12 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
});
var err = assertThrows(CompletionException.class, fut0::join);
- assertEquals(NullPointerException.class, err.getCause().getClass());
+
+ try {
+ assertInstanceOf(NullPointerException.class, err.getCause());
+ } catch (AssertionError e) {
+ throw new AssertionError("Unexpected exception type", err);
+ }
}
@Test
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAlreadyFinishedException.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAlreadyFinishedException.java
index 2121aafa3d..6d14d467fa 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAlreadyFinishedException.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAlreadyFinishedException.java
@@ -31,11 +31,15 @@ public class TransactionAlreadyFinishedException extends
TransactionException {
/** Stored transaction result. */
private final TransactionResult transactionResult;
- public TransactionAlreadyFinishedException(String message,
TransactionResult transactionResult) {
- super(TX_UNEXPECTED_STATE_ERR, message);
+ public TransactionAlreadyFinishedException(int errorCode, String message,
TransactionResult transactionResult, Throwable cause) {
+ super(errorCode, message, cause);
this.transactionResult = transactionResult;
}
+ public TransactionAlreadyFinishedException(String message,
TransactionResult transactionResult) {
+ this(TX_UNEXPECTED_STATE_ERR, message, transactionResult, null);
+ }
+
public TransactionResult transactionResult() {
return transactionResult;
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java
index a684f6fc89..fdbbaffdfb 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java
@@ -38,7 +38,7 @@ public class PrimaryReplicaExpiredException extends
IgniteInternalException {
public PrimaryReplicaExpiredException(
ReplicationGroupId groupId,
Long expectedEnlistmentConsistencyToken,
- HybridTimestamp commitTimestamp,
+ @Nullable HybridTimestamp commitTimestamp,
@Nullable ReplicaMeta currentPrimaryReplica
) {
super(
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index cd8c65eb67..e40523e56d 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.tx.impl;
import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
@@ -25,10 +26,12 @@ import static
org.apache.ignite.internal.tx.TxState.COMMITTED;
import static org.apache.ignite.internal.tx.TxState.FINISHING;
import static org.apache.ignite.internal.tx.TxState.PENDING;
import static org.apache.ignite.internal.tx.TxState.isFinalState;
+import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_PRIMARY_REPLICA_EXPIRED_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR;
import java.io.IOException;
@@ -53,6 +56,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
+import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -60,10 +64,14 @@ import
org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
+import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.exception.ReplicationException;
import
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
import org.apache.ignite.internal.replicator.message.ErrorReplicaResponse;
import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
@@ -174,6 +182,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
*/
private final TxMessageSender txMessageSender;
+ private final EventListener<PrimaryReplicaEventParameters>
primaryReplicaEventListener;
+
/**
* The constructor.
*
@@ -204,6 +214,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
this.idleSafeTimePropagationPeriodMsSupplier =
idleSafeTimePropagationPeriodMsSupplier;
this.topologyService = clusterService.topologyService();
this.messagingService = clusterService.messagingService();
+ this.primaryReplicaEventListener = this::primaryReplicaEventListener;
placementDriverHelper = new PlacementDriverHelper(placementDriver,
clock);
@@ -229,6 +240,30 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
txCleanupRequestSender = new TxCleanupRequestSender(txMessageSender,
placementDriverHelper, writeIntentSwitchProcessor);
}
+ private CompletableFuture<Boolean>
primaryReplicaEventListener(PrimaryReplicaEventParameters eventParameters,
Throwable err) {
+ return inBusyLock(busyLock, () -> {
+ if (!(eventParameters.groupId() instanceof TablePartitionId)) {
+ return falseCompletedFuture();
+ }
+
+ TablePartitionId groupId = (TablePartitionId)
eventParameters.groupId();
+
+ for (Map.Entry<UUID, TxContext> ctxEntry : txCtxMap.entrySet()) {
+ TxContext txContext = ctxEntry.getValue();
+
+ if (txContext.isTxFinishing()) {
+ Long enlistmentConsistencyToken =
txContext.enlistedGroups.get(groupId);
+
+ if (enlistmentConsistencyToken != null) {
+ txContext.cancelWaitingInflights(groupId,
enlistmentConsistencyToken);
+ }
+ }
+ }
+
+ return falseCompletedFuture();
+ });
+ }
+
@Override
public InternalTransaction begin(HybridTimestampTracker timestampTracker) {
return begin(timestampTracker, false, TxPriority.NORMAL);
@@ -330,17 +365,19 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
public CompletableFuture<Void> finish(
HybridTimestampTracker observableTimestampTracker,
TablePartitionId commitPartition,
- boolean commit,
+ boolean commitIntent,
Map<TablePartitionId, Long> enlistedGroups,
UUID txId
) {
- LOG.debug("Finish [commit={}, txId={}, groups={}].", commit, txId,
enlistedGroups);
+ LOG.debug("Finish [commit={}, txId={}, groups={}].", commitIntent,
txId, enlistedGroups);
assert enlistedGroups != null;
if (enlistedGroups.isEmpty()) {
// If there are no enlisted groups, just update local state - we
already marked the tx as finished.
- updateTxMeta(txId, old -> new TxStateMeta(commit ? COMMITTED :
ABORTED, localNodeId, commitPartition, commitTimestamp(commit)));
+ updateTxMeta(txId, old -> new TxStateMeta(
+ commitIntent ? COMMITTED : ABORTED, localNodeId,
commitPartition, commitTimestamp(commitIntent)
+ ));
return nullCompletedFuture();
}
@@ -367,27 +404,17 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
// If the state is FINISHING then someone else hase in in the
middle of finishing this tx.
if (stateMeta.txState() == FINISHING) {
return ((TxStateMetaFinishing) stateMeta).txFinishFuture()
- .thenCompose(meta -> checkTxOutcome(commit, txId,
meta));
+ .thenCompose(meta -> checkTxOutcome(commitIntent,
txId, meta));
} else {
// The TX has already been finished. Check whether it finished
with the same outcome.
- return checkTxOutcome(commit, txId, stateMeta);
+ return checkTxOutcome(commitIntent, txId, stateMeta);
}
}
- TxContext tuple = txCtxMap.compute(txId, (uuid, tuple0) -> {
- if (tuple0 == null) {
- tuple0 = new TxContext(); // No writes enlisted.
- }
-
- assert !tuple0.isTxFinishing() : "Transaction is already finished
[id=" + uuid + "].";
-
- tuple0.finishTx();
-
- return tuple0;
- });
+ TxContext txContext = lockTxForNewUpdates(txId, enlistedGroups,
commitIntent);
// Wait for commit acks first, then proceed with the finish request.
- return tuple.performFinish(commit, ignored ->
+ return txContext.performFinish(commitIntent, commit ->
prepareFinish(
observableTimestampTracker,
commitPartition,
@@ -398,12 +425,26 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
));
}
+ private TxContext lockTxForNewUpdates(UUID txId, Map<TablePartitionId,
Long> enlistedGroups, boolean commitIntent) {
+ return txCtxMap.compute(txId, (uuid, tuple0) -> {
+ if (tuple0 == null) {
+ tuple0 = new TxContext(placementDriver); // No writes enlisted.
+ }
+
+ assert !tuple0.isTxFinishing() : "Transaction is already finished
[id=" + uuid + "].";
+
+ tuple0.finishTx(enlistedGroups);
+
+ return tuple0;
+ });
+ }
+
private static CompletableFuture<Void> checkTxOutcome(boolean commit, UUID
txId, TransactionMeta stateMeta) {
if ((stateMeta.txState() == COMMITTED) == commit) {
return nullCompletedFuture();
}
- return CompletableFuture.failedFuture(new
TransactionAlreadyFinishedException(
+ return failedFuture(new TransactionAlreadyFinishedException(
"Failed to change the outcome of a finished transaction
[txId=" + txId + ", txState=" + stateMeta.txState() + "].",
new TransactionResult(stateMeta.txState(),
stateMeta.commitTimestamp()))
);
@@ -440,7 +481,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
txFinishFuture);
})
.thenCompose(Function.identity())
- // verification future is added in order to share proper
exception with the client
+ // Verification future is added in order to share the proper
verification exception with the client.
.thenCompose(r -> verificationFuture);
}
@@ -595,6 +636,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
txCleanupRequestHandler.start();
+ placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
primaryReplicaEventListener);
+
return nullCompletedFuture();
}
@@ -614,6 +657,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
txCleanupRequestHandler.stop();
+
placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
primaryReplicaEventListener);
+
shutdownAndAwaitTermination(cleanupExecutor, 10, TimeUnit.SECONDS);
}
@@ -683,7 +728,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
txCtxMap.compute(txId, (uuid, tuple) -> {
if (tuple == null) {
- tuple = new TxContext();
+ tuple = new TxContext(placementDriver);
}
if (tuple.isTxFinishing()) {
@@ -727,9 +772,9 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
}
// Process directly sent response.
- ReplicaResponse request = (ReplicaResponse) message;
+ ReplicaResponse response = (ReplicaResponse) message;
- Object result = request.result();
+ Object result = response.result();
if (result instanceof UUID) {
removeInflight((UUID) result);
@@ -769,7 +814,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
"Commit timestamp is greater than
primary replica expiration timestamp:"
+ " [groupId = {}, commit
timestamp = {}, primary replica expiration timestamp = {}]",
groupId, commitTimestamp,
currentPrimaryReplica.getExpirationTime());
-
}
});
}
@@ -780,25 +824,63 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
private static class TxContext {
volatile long inflights = 0; // Updated under lock.
private final CompletableFuture<Void> waitRepFut = new
CompletableFuture<>();
+ private final PlacementDriver placementDriver;
volatile CompletableFuture<Void> finishInProgressFuture = null;
+ volatile Map<TablePartitionId, Long> enlistedGroups;
+
+ private TxContext(PlacementDriver placementDriver) {
+ this.placementDriver = placementDriver;
+ }
- CompletableFuture<Void> performFinish(boolean commit, Function<Void,
CompletableFuture<Void>> finishAction) {
+ CompletableFuture<Void> performFinish(boolean commit,
Function<Boolean, CompletableFuture<Void>> finishAction) {
waitReadyToFinish(commit)
- .thenCompose(finishAction)
- .handle((ignored, err) -> {
- if (err == null) {
- finishInProgressFuture.complete(null);
- } else {
- finishInProgressFuture.completeExceptionally(err);
- }
- return null;
- });
+ .whenComplete((ignoredReadyToFinish, readyException) ->
finishAction.apply(commit && readyException == null)
+ .whenComplete((ignoredFinishActionResult,
finishException) ->
+ completeFinishInProgressFuture(commit,
readyException, finishException))
+ );
return finishInProgressFuture;
}
+ private void completeFinishInProgressFuture(
+ boolean commit,
+ @Nullable Throwable readyToFinishException,
+ @Nullable Throwable finishException
+ ) {
+ if (readyToFinishException == null) {
+ if (finishException == null) {
+ finishInProgressFuture.complete(null);
+ } else {
+
finishInProgressFuture.completeExceptionally(finishException);
+ }
+ } else {
+ if (commit && readyToFinishException instanceof
PrimaryReplicaExpiredException) {
+ finishInProgressFuture.completeExceptionally(new
TransactionAlreadyFinishedException(
+ TX_PRIMARY_REPLICA_EXPIRED_ERR,
+ "Failed to commit the transaction.",
+ new TransactionResult(ABORTED, null),
+ readyToFinishException
+ ));
+ } else {
+
finishInProgressFuture.completeExceptionally(readyToFinishException);
+ }
+ }
+ }
+
private CompletableFuture<Void> waitReadyToFinish(boolean commit) {
- return commit ? waitNoInflights() : nullCompletedFuture();
+ if (commit) {
+ for (Map.Entry<TablePartitionId, Long> e :
enlistedGroups.entrySet()) {
+ ReplicaMeta replicaMeta =
placementDriver.currentLease(e.getKey());
+
+ if (replicaMeta == null ||
!e.getValue().equals(replicaMeta.getStartTime().longValue())) {
+ return failedFuture(new
PrimaryReplicaExpiredException(e.getKey(), e.getValue(), null, replicaMeta));
+ }
+ }
+
+ return waitNoInflights();
+ } else {
+ return nullCompletedFuture();
+ }
}
private CompletableFuture<Void> waitNoInflights() {
@@ -808,13 +890,18 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
return waitRepFut;
}
+ private void cancelWaitingInflights(TablePartitionId groupId, Long
enlistmentConsistencyToken) {
+ waitRepFut.completeExceptionally(new
PrimaryReplicaExpiredException(groupId, enlistmentConsistencyToken, null,
null));
+ }
+
void onRemovedInflights() {
if (inflights == 0 && finishInProgressFuture != null) {
waitRepFut.complete(null);
}
}
- void finishTx() {
+ void finishTx(Map<TablePartitionId, Long> enlistedGroups) {
+ this.enlistedGroups = enlistedGroups;
finishInProgressFuture = new CompletableFuture<>();
}
@@ -832,6 +919,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
private static final Set<Class<? extends Throwable>> RECOVERABLE =
Set.of(
TimeoutException.class,
IOException.class,
+ ReplicationException.class,
ReplicationTimeoutException.class,
PrimaryReplicaMissException.class
);
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractDeadlockPreventionTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractDeadlockPreventionTest.java
index f32ef4e0a6..dc66dbec6c 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractDeadlockPreventionTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractDeadlockPreventionTest.java
@@ -288,6 +288,23 @@ public abstract class AbstractDeadlockPreventionTest
extends AbstractLockingTest
assertThat(futTx1, willSucceedFast());
}
+ @Test
+ public void testIncompatibleLockRetry() {
+ var tx1 = beginTx();
+ var tx2 = beginTx();
+
+ var k = key("test");
+
+ assertThat(slock(tx1, k), willSucceedFast());
+ assertThat(slock(tx2, k), willSucceedFast());
+
+ assertFutureFailsOrWaitsForTimeout(() -> xlock(tx2, k));
+
+ commitTx(tx1);
+
+ assertThat(xlock(tx2, k), willSucceedFast());
+ }
+
/**
* This method checks lock future of conflicting transaction provided by
supplier, in a way depending on deadlock prevention policy.
* If the policy does not allow wait on conflict (wait timeout is equal to
{@code 0}) then the future must be failed with
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 1614346449..287dde26cf 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -360,6 +360,8 @@ public class TxManagerTest extends IgniteAbstractTest {
// Same primary that was enlisted is returned during finish phase and
commitTimestamp is less that primary.expirationTimestamp.
when(placementDriver.getPrimaryReplica(any(),
any())).thenReturn(completedFuture(
new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1),
HybridTimestamp.MAX_VALUE)));
+ when(placementDriver.currentLease(any())).thenReturn(
+ new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1),
HybridTimestamp.MAX_VALUE));
when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(),
any())).thenReturn(completedFuture(
new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1),
HybridTimestamp.MAX_VALUE)));
@@ -383,6 +385,8 @@ public class TxManagerTest extends IgniteAbstractTest {
.thenReturn(
completedFuture(new TestReplicaMetaImpl(LOCAL_NODE,
hybridTimestamp(1), hybridTimestamp(10)))
);
+ when(placementDriver.currentLease(any()))
+ .thenReturn(new TestReplicaMetaImpl(LOCAL_NODE,
hybridTimestamp(1), hybridTimestamp(10)));
when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(),
any()))
.thenReturn(
completedFuture(new TestReplicaMetaImpl(LOCAL_NODE,
hybridTimestamp(1), hybridTimestamp(10))),
@@ -420,7 +424,7 @@ public class TxManagerTest extends IgniteAbstractTest {
@Test
public void testFinishExpiredWithNullPrimary() {
// Null is returned as primaryReplica during finish phase.
- when(placementDriver.getPrimaryReplica(any(),
any())).thenReturn(nullCompletedFuture());
+ when(placementDriver.currentLease(any())).thenReturn(null);
when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(),
any())).thenReturn(completedFuture(
new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1),
hybridTimestamp(10))));
when(replicaService.invoke(anyString(),
any(TxFinishReplicaRequest.class)))
@@ -434,7 +438,7 @@ public class TxManagerTest extends IgniteAbstractTest {
@Test
public void testExpiredExceptionDoesNotShadeResponseExceptions() {
// Null is returned as primaryReplica during finish phase.
- when(placementDriver.getPrimaryReplica(any(),
any())).thenReturn(nullCompletedFuture());
+ when(placementDriver.currentLease(any())).thenReturn(null);
when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(),
any())).thenReturn(completedFuture(
new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1),
hybridTimestamp(10))));
when(replicaService.invoke(anyString(),
any(TxFinishReplicaRequest.class)))
@@ -469,6 +473,8 @@ public class TxManagerTest extends IgniteAbstractTest {
when(placementDriver.getPrimaryReplica(eq(tablePartitionId1), any()))
.thenReturn(completedFuture(
new TestReplicaMetaImpl(LOCAL_NODE,
hybridTimestamp(1), HybridTimestamp.MAX_VALUE)));
+ when(placementDriver.currentLease(eq(tablePartitionId1)))
+ .thenReturn(new TestReplicaMetaImpl(LOCAL_NODE,
hybridTimestamp(1), HybridTimestamp.MAX_VALUE));
when(placementDriver.awaitPrimaryReplica(eq(tablePartitionId1), any(),
anyLong(), any()))
.thenReturn(completedFuture(
new TestReplicaMetaImpl(LOCAL_NODE,
hybridTimestamp(1), HybridTimestamp.MAX_VALUE)));
@@ -499,6 +505,7 @@ public class TxManagerTest extends IgniteAbstractTest {
// given test checks that an assertion exception will be thrown and
wrapped with proper transaction public one.
when(placementDriver.getPrimaryReplica(any(),
any())).thenReturn(completedFuture(
new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1),
hybridTimestamp(10))));
+ when(placementDriver.currentLease(any())).thenReturn(new
TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), hybridTimestamp(10)));
when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(),
any())).thenReturn(completedFuture(
new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1),
hybridTimestamp(10))));
when(replicaService.invoke(anyString(),
any(TxFinishReplicaRequest.class)))
@@ -520,8 +527,8 @@ public class TxManagerTest extends IgniteAbstractTest {
@Test
public void testFinishExpiredWithDifferentEnlistmentConsistencyToken() {
// Primary with another enlistment consistency token is returned.
- when(placementDriver.getPrimaryReplica(any(),
any())).thenReturn(completedFuture(
- new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(2),
HybridTimestamp.MAX_VALUE)));
+ when(placementDriver.currentLease(any())).thenReturn(
+ new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(2),
HybridTimestamp.MAX_VALUE));
when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(),
any())).thenReturn(completedFuture(
new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(2),
HybridTimestamp.MAX_VALUE)));
when(replicaService.invoke(anyString(),
any(TxFinishReplicaRequest.class)))