This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6acdc63bde IGNITE-20034 Implemented writeIntentResolution coordinator
path (#2585)
6acdc63bde is described below
commit 6acdc63bde9ba55d0b20c17cfc7c6d9ab995bf73
Author: Denis Chudov <[email protected]>
AuthorDate: Mon Sep 18 20:24:39 2023 +0400
IGNITE-20034 Implemented writeIntentResolution coordinator path (#2585)
---
.../java/org/apache/ignite/lang/ErrorGroups.java | 3 +
.../org/apache/ignite/network/TopologyService.java | 8 +
.../internal/network/file/TestTopologyService.java | 5 +
.../scalecube/ScaleCubeTopologyService.java | 6 +
.../engine/framework/ClusterServiceFactory.java | 5 +
.../distributed/ItTxDistributedTestSingleNode.java | 6 +
.../ignite/distributed/ItTxStateLocalMapTest.java | 6 +-
.../internal/table/distributed/TableManager.java | 11 +-
.../distributed/replicator/LeaderOrTxState.java | 14 +-
.../replicator/PartitionReplicaListener.java | 118 +++------
.../replicator/TransactionStateResolver.java | 268 ++++++++++++++++++++-
.../apache/ignite/internal/table/TxLocalTest.java | 52 +---
.../replication/PartitionReplicaListenerTest.java | 55 +++--
.../apache/ignite/distributed/ItTxTestCluster.java | 96 ++++++--
.../ignite/internal/table/TxAbstractTest.java | 38 ++-
.../internal/tx/TransactionAbandonedException.java | 54 +++++
...ateReplicaRequest.java => TransactionMeta.java} | 24 +-
.../java/org/apache/ignite/internal/tx/TxMeta.java | 5 +-
.../org/apache/ignite/internal/tx/TxState.java | 18 +-
.../org/apache/ignite/internal/tx/TxStateMeta.java | 12 +-
.../ignite/internal/tx/TxStateMetaFinishing.java | 56 +++++
.../ignite/internal/tx/impl/TxManagerImpl.java | 54 ++++-
.../ignite/internal/tx/message/TxMessageGroup.java | 14 +-
...est.java => TxStateCommitPartitionRequest.java} | 13 +-
...Request.java => TxStateCoordinatorRequest.java} | 6 +-
...ateReplicaRequest.java => TxStateResponse.java} | 25 +-
26 files changed, 713 insertions(+), 259 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 97ce2554e5..bf28673f7e 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -332,6 +332,9 @@ public class ErrorGroups {
/** Failure due to an incompatible schema change. */
public static final int TX_INCOMPATIBLE_SCHEMA_ERR =
TX_ERR_GROUP.registerErrorCode((short) 12);
+
+ /** Failure due to an abandoned transaction. */
+ public static final int TX_ABANDONED_ERR =
TX_ERR_GROUP.registerErrorCode((short) 13);
}
/** Replicator error group. */
diff --git
a/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
b/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
index da1c3f7ca3..13b647e92d 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
@@ -61,4 +61,12 @@ public interface TopologyService {
* @return The node object; {@code null} if the node has not been
discovered or is offline.
*/
@Nullable ClusterNode getByConsistentId(String consistentId);
+
+ /**
+ * Returns a cluster node specified by its ID.
+ *
+ * @param id Node ID.
+ * @return The node object; {@code null} if the node has not been
discovered or is offline.
+ */
+ @Nullable ClusterNode getById(String id);
}
diff --git
a/modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/TestTopologyService.java
b/modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/TestTopologyService.java
index a98a849a7e..07e4b95ff7 100644
---
a/modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/TestTopologyService.java
+++
b/modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/TestTopologyService.java
@@ -48,6 +48,11 @@ public class TestTopologyService extends
AbstractTopologyService {
throw new UnsupportedOperationException();
}
+ /** Not supported by this test implementation: always throws {@link UnsupportedOperationException}. */
+ @Override
+ public @Nullable ClusterNode getById(String id) {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Calls {@link TopologyEventHandler#onAppeared(ClusterNode)} on all
registered event handlers.
*
diff --git
a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
index 8d11ac34d2..9c23f993c5 100644
---
a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
+++
b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
@@ -182,6 +182,12 @@ final class ScaleCubeTopologyService extends
AbstractTopologyService {
return consistentIdToMemberMap.get(consistentId);
}
+ /**
+ * Looks the node up by its ID with a linear scan over the known members.
+ *
+ * @param id Node ID.
+ * @return The first known member whose ID equals {@code id}; {@code null} if there is none.
+ */
+ @Override
+ public @Nullable ClusterNode getById(String id) {
+ for (ClusterNode member : consistentIdToMemberMap.values()) {
+ if (member.id().equals(id)) {
+ return member;
+ }
+ }
+
+ return null;
+ }
+
/**
* Converts the given {@link Member} to a {@link ClusterNode}.
*
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
index 7cd29156c7..9e04a26b0c 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
@@ -160,6 +160,11 @@ public class ClusterServiceFactory {
public @Nullable ClusterNode getByConsistentId(String consistentId) {
return allMembers.get(consistentId);
}
+
+ /**
+ * Looks the node up by its ID with a linear scan over all members of this fake topology.
+ *
+ * @param id Node ID.
+ * @return The first member whose ID equals {@code id}; {@code null} if there is none.
+ */
+ @Override
+ public @Nullable ClusterNode getById(String id) {
+ for (ClusterNode member : allMembers.values()) {
+ if (member.id().equals(id)) {
+ return member;
+ }
+ }
+
+ return null;
+ }
}
private class LocalMessagingService extends AbstractMessagingService {
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 05afbc7918..557867356e 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.Collection;
import java.util.Map;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
@@ -224,6 +225,11 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
return true;
}
+ /** Returns the transaction managers held by {@code txTestCluster}, as a view over its map values. */
+ @Override
+ protected Collection<TxManager> txManagers() {
+ Collection<TxManager> clusterTxManagers = txTestCluster.txManagers.values();
+
+ return clusterTxManagers;
+ }
+
@Test
public void testIgniteTransactionsAndReadTimestamp() {
Transaction readWriteTx = igniteTransactions.begin();
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
index 702401d28e..3d8591000f 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
@@ -151,7 +151,11 @@ public class ItTxStateLocalMapTest extends
IgniteAbstractTest {
checkLocalTxStateOnNodes(
tx.id(),
- new TxStateMeta(commit ? COMMITED : ABORTED, coordinatorId,
commit ? testCluster.clocks.get(coord.name()).now() : null)
+ new TxStateMeta(
+ commit ? COMMITED : ABORTED,
+ coordinatorId,
+ commit ? testCluster.clocks.get(coord.name()).now() :
null
+ )
);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index aee43081ab..8635c3cd73 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -432,7 +432,14 @@ public class TableManager extends
AbstractEventProducer<TableEvent, TableEventPa
clusterNodeResolver = topologyService::getByConsistentId;
- transactionStateResolver = new TransactionStateResolver(replicaSvc,
clusterNodeResolver);
+ transactionStateResolver = new TransactionStateResolver(
+ replicaSvc,
+ txManager,
+ clock,
+ clusterNodeResolver,
+ topologyService::getById,
+ clusterService.messagingService()
+ );
tablesByIdVv = new IncrementalVersionedValue<>(registry, HashMap::new);
@@ -495,6 +502,8 @@ public class TableManager extends
AbstractEventProducer<TableEvent, TableEventPa
lowWatermark.start();
+ transactionStateResolver.start();
+
CompletableFuture<Long> recoveryFinishFuture =
metaStorageMgr.recoveryFinishedFuture();
assert recoveryFinishFuture.isDone();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/LeaderOrTxState.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/LeaderOrTxState.java
index a37a7b512f..b838156230 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/LeaderOrTxState.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/LeaderOrTxState.java
@@ -18,13 +18,13 @@
package org.apache.ignite.internal.table.distributed.replicator;
import java.io.Serializable;
-import org.apache.ignite.internal.tx.TxMeta;
-import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
+import org.apache.ignite.internal.tx.TransactionMeta;
+import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
import org.jetbrains.annotations.Nullable;
/**
- * Response for the {@link TxStateReplicaRequest}. Can contain either the
consistent ID of the Partition Group leader, which should be
- * queried for the TX Meta, or the TX Meta itself.
+ * Response for the {@link TxStateCommitPartitionRequest}. Can contain either
the consistent ID of the Partition Group leader, which should
+ * be queried for the TX Meta, or the TX Meta itself.
*/
public class LeaderOrTxState implements Serializable {
private static final long serialVersionUID = -3555591755828355117L;
@@ -33,7 +33,7 @@ public class LeaderOrTxState implements Serializable {
private final String leaderName;
@Nullable
- private final TxMeta txMeta;
+ private final TransactionMeta txMeta;
/**
* Creates a response.
@@ -41,7 +41,7 @@ public class LeaderOrTxState implements Serializable {
* @param leaderName Leader consistent ID.
* @param txMeta TX meta.
*/
- public LeaderOrTxState(@Nullable String leaderName, @Nullable TxMeta
txMeta) {
+ public LeaderOrTxState(@Nullable String leaderName, @Nullable
TransactionMeta txMeta) {
this.leaderName = leaderName;
this.txMeta = txMeta;
}
@@ -50,7 +50,7 @@ public class LeaderOrTxState implements Serializable {
return leaderName;
}
- public @Nullable TxMeta txMeta() {
+ public @Nullable TransactionMeta txMeta() {
return txMeta;
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 42e3e3d4c7..0197aa61cf 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -25,6 +25,7 @@ import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREATE;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_DROP;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static org.apache.ignite.internal.tx.TxState.ABANDONED;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
import static org.apache.ignite.internal.tx.TxState.COMMITED;
import static org.apache.ignite.internal.tx.TxState.PENDING;
@@ -132,14 +133,14 @@ import org.apache.ignite.internal.tx.Lock;
import org.apache.ignite.internal.tx.LockKey;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.LockMode;
+import org.apache.ignite.internal.tx.TransactionAbandonedException;
+import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest;
import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
-import org.apache.ignite.internal.tx.message.TxMessagesFactory;
-import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
+import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.CursorUtils;
@@ -168,9 +169,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
/** Factory for creating replica command messages. */
private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new
ReplicaMessagesFactory();
- /** Tx messages factory. */
- private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
-
/** Replication group id. */
private final TablePartitionId replicationGroupId;
@@ -216,12 +214,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
/** Runs async scan tasks for effective tail recursion execution (avoid
deep recursive calls). */
private final Executor scanRequestExecutor;
- /**
- * Map to control clock's update in the read only transactions
concurrently with a commit timestamp.
- * TODO: IGNITE-20034 review this after the commit timestamp will be
provided from a commit request (request.commitTimestamp()).
- */
- private final ConcurrentHashMap<UUID, CompletableFuture<TxMeta>>
txTimestampUpdateMap = new ConcurrentHashMap<>();
-
private final Supplier<Map<Integer, IndexLocker>> indexesLockers;
private final ConcurrentMap<UUID, TxCleanupReadyFutureList>
txCleanupReadyFutures = new ConcurrentHashMap<>();
@@ -336,8 +328,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
@Override
public CompletableFuture<?> invoke(ReplicaRequest request, String
senderId) {
- if (request instanceof TxStateReplicaRequest) {
- return processTxStateReplicaRequest((TxStateReplicaRequest)
request);
+ if (request instanceof TxStateCommitPartitionRequest) {
+ return
processTxStateCommitPartitionRequest((TxStateCommitPartitionRequest) request);
}
return ensureReplicaIsPrimary(request).thenCompose(isPrimary ->
processRequest(request, isPrimary, senderId));
@@ -424,49 +416,23 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param request Transaction state request.
* @return Result future.
*/
- private CompletableFuture<LeaderOrTxState>
processTxStateReplicaRequest(TxStateReplicaRequest request) {
+ private CompletableFuture<LeaderOrTxState>
processTxStateCommitPartitionRequest(TxStateCommitPartitionRequest request) {
return placementDriver.getPrimaryReplica(replicationGroupId,
hybridClock.now())
.thenCompose(primaryReplica -> {
if (isLocalPeer(primaryReplica.getLeaseholder())) {
- CompletableFuture<TxMeta> txStateFut =
getTxStateConcurrently(request);
+ TransactionMeta txMeta =
txManager.stateMeta(request.txId());
+
+ if (txMeta == null) {
+ txMeta = txStateStorage.get(request.txId());
+ }
- return txStateFut.thenApply(txMeta -> new
LeaderOrTxState(null, txMeta));
+ return completedFuture(new LeaderOrTxState(null,
txMeta));
} else {
return completedFuture(new
LeaderOrTxState(primaryReplica.getLeaseholder(), null));
}
});
}
- /**
- * Gets a transaction state or {@code null}, if the transaction is not
completed.
- *
- * @param txStateReq Transaction state request.
- * @return Future to transaction state meta or {@code null}.
- */
- private CompletableFuture<TxMeta>
getTxStateConcurrently(TxStateReplicaRequest txStateReq) {
- //TODO: IGNITE-20034 review this after the commit timestamp will be
provided from a commit request (request.commitTimestamp()).
- CompletableFuture<TxMeta> txStateFut = new CompletableFuture<>();
-
- txTimestampUpdateMap.compute(txStateReq.txId(), (uuid, fut) -> {
- if (fut != null) {
- fut.thenAccept(txStateFut::complete);
- } else {
- TxMeta txMeta = txStateStorage.get(txStateReq.txId());
-
- if (txMeta == null) {
- // All future transactions will be committed after the
resolution processed.
- hybridClock.update(txStateReq.readTimestamp());
- }
-
- txStateFut.complete(txMeta);
- }
-
- return null;
- });
-
- return txStateFut;
- }
-
/**
* Processes retrieve batch for read only transaction.
*
@@ -1197,9 +1163,10 @@ public class PartitionReplicaListener implements
ReplicaListener {
UUID txId,
String txCoordinatorId
) {
- CompletableFuture<?> changeStateFuture =
finishTransaction(aggregatedGroupIds, txId, commit, txCoordinatorId);
+ HybridTimestamp commitTimestamp = request.commitTimestamp();
+
+ CompletableFuture<?> changeStateFuture =
finishTransaction(aggregatedGroupIds, txId, commit, commitTimestamp,
txCoordinatorId);
- // TODO: https://issues.apache.org/jira/browse/IGNITE-17578 Cleanup
process should be asynchronous.
CompletableFuture<?>[] cleanupFutures = new
CompletableFuture[request.groups().size()];
AtomicInteger cleanupFuturesCnt = new AtomicInteger(0);
@@ -1225,6 +1192,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param aggregatedGroupIds Partition identifies which are enlisted in
the transaction.
* @param txId Transaction id.
* @param commit True is the transaction is committed, false otherwise.
+ * @param commitTimestamp Commit timestamp, if applicable.
* @param txCoordinatorId Transaction coordinator id.
* @return Future to wait of the finish.
*/
@@ -1232,15 +1200,10 @@ public class PartitionReplicaListener implements
ReplicaListener {
List<TablePartitionId> aggregatedGroupIds,
UUID txId,
boolean commit,
+ @Nullable HybridTimestamp commitTimestamp,
String txCoordinatorId
) {
- // TODO: IGNITE-20034 Timestamp from request is not using until the
issue has not been fixed (request.commitTimestamp())
- var fut = new CompletableFuture<TxMeta>();
-
- txTimestampUpdateMap.put(txId, fut);
-
HybridTimestamp currentTimestamp = hybridClock.now();
- HybridTimestamp commitTimestamp = commit ? currentTimestamp : null;
return reliableCatalogVersionFor(currentTimestamp)
.thenApply(catalogVersion -> {
@@ -1266,11 +1229,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
.whenComplete((o, throwable) -> {
TxState txState = commit ? COMMITED : ABORTED;
- fut.complete(new TxMeta(txState, aggregatedGroupIds,
commitTimestamp));
-
markFinished(txId, txState, commitTimestamp);
-
- txTimestampUpdateMap.remove(txId);
});
}
@@ -1537,8 +1496,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
checkWriteIntentsBelongSameTx(writeIntents);
return resolveTxState(
- new TablePartitionId(writeIntent.commitTableId(),
writeIntent.commitPartitionId()),
writeIntent.transactionId(),
+ new TablePartitionId(writeIntent.commitTableId(),
writeIntent.commitPartitionId()),
ts)
.thenApply(readLastCommitted -> {
if (readLastCommitted) {
@@ -2508,8 +2467,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
Supplier<BinaryRow> lastCommitted
) {
return resolveTxState(
- new TablePartitionId(readResult.commitTableId(),
readResult.commitPartitionId()),
readResult.transactionId(),
+ new TablePartitionId(readResult.commitTableId(),
readResult.commitPartitionId()),
timestamp
).thenApply(readLastCommitted -> {
if (readLastCommitted) {
@@ -2523,34 +2482,33 @@ public class PartitionReplicaListener implements
ReplicaListener {
/**
* Resolve the actual tx state.
*
- * @param commitGrpId Commit partition id.
* @param txId Transaction id.
+ * @param commitGrpId Commit partition id.
* @param timestamp Timestamp.
* @return The future completes with true when the transaction is not
completed yet and false otherwise.
*/
private CompletableFuture<Boolean> resolveTxState(
- TablePartitionId commitGrpId,
UUID txId,
+ TablePartitionId commitGrpId,
HybridTimestamp timestamp
) {
- boolean readLatest = timestamp == null;
-
- return transactionStateResolver.sendMetaRequest(commitGrpId,
FACTORY.txStateReplicaRequest()
- .groupId(commitGrpId)
- .readTimestampLong((readLatest ?
HybridTimestamp.MIN_VALUE : timestamp).longValue())
- .txId(txId)
- .build())
- .thenApply(txMeta -> {
- if (txMeta == null) {
- return true;
- } else if (txMeta.txState() == COMMITED) {
- return !readLatest &&
txMeta.commitTimestamp().compareTo(timestamp) > 0;
- } else {
- assert txMeta.txState() == ABORTED : "Unexpected
transaction state [state=" + txMeta.txState() + ']';
+ return transactionStateResolver.resolveTxState(txId, commitGrpId,
timestamp).thenApply(txMeta -> {
+ if (txMeta == null || txMeta.txState() == ABANDONED) {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-20427
make the null value returned from commit partition
+ // TODO more determined
+ throw new TransactionAbandonedException(txId, txMeta);
+ } else if (txMeta.txState() == COMMITED) {
+ boolean readLatest = timestamp == null;
+
+ return !readLatest &&
txMeta.commitTimestamp().compareTo(timestamp) > 0;
+ } else if (txMeta.txState() == ABORTED) {
+ return true;
+ } else {
+ assert txMeta.txState() == PENDING : "Unexpected transaction
state [state=" + txMeta.txState() + ']';
- return true;
- }
- });
+ return true;
+ }
+ });
}
private CompletableFuture<Void> validateAtTimestamp(UUID txId) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
index 288be7b281..2b95a7e3fa 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
@@ -17,24 +17,49 @@
package org.apache.ignite.internal.table.distributed.replicator;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.tx.TxState.ABANDONED;
+import static org.apache.ignite.internal.tx.TxState.FINISHING;
+import static org.apache.ignite.internal.tx.TxState.PENDING;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
+
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.tx.TxMeta;
-import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
+import org.apache.ignite.internal.tx.TransactionMeta;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.TxStateMetaFinishing;
+import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
+import org.apache.ignite.internal.tx.message.TxStateCoordinatorRequest;
+import org.apache.ignite.internal.tx.message.TxStateResponse;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkMessage;
+import org.jetbrains.annotations.Nullable;
/**
- * Helper class that allows to resolve transaction state.
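+ * Helper class that allows to resolve transaction state: locally, via the transaction coordinator or via the commit partition.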
*/
public class TransactionStateResolver {
+ /** Tx messages factory. */
+ private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
+
+ /** Network timeout. */
+ private static final long RPC_TIMEOUT = 3000;
+
/** Assignment node names per replication group. */
private final Map<ReplicationGroupId, LinkedHashSet<String>>
primaryReplicaMapping = new ConcurrentHashMap<>();
@@ -44,29 +69,200 @@ public class TransactionStateResolver {
/** Function that resolves a node consistent ID to a cluster node. */
private final Function<String, ClusterNode> clusterNodeResolver;
+ // TODO https://issues.apache.org/jira/browse/IGNITE-20408 after this
ticket this resolver will be no longer needed, as
+ // TODO we will store coordinator as ClusterNode in local tx state map.
+ /** Function that resolves a node non-consistent ID to a cluster node. */
+ private final Function<String, ClusterNode> clusterNodeResolverById;
+
+ private final Map<UUID, CompletableFuture<TransactionMeta>> txStateFutures
= new ConcurrentHashMap<>();
+
+ private final TxManager txManager;
+
+ private final HybridClock clock;
+
+ private final MessagingService messagingService;
+
/**
* The constructor.
*
* @param replicaService Replication service.
+ * @param txManager Transaction manager.
+ * @param clock Node clock.
+ * @param clusterNodeResolver Cluster node resolver.
+ * @param clusterNodeResolverById Cluster node resolver using
non-consistent id.
+ * @param messagingService Messaging service.
*/
- public TransactionStateResolver(ReplicaService replicaService,
Function<String, ClusterNode> clusterNodeResolver) {
+ public TransactionStateResolver(
+ ReplicaService replicaService,
+ TxManager txManager,
+ HybridClock clock,
+ Function<String, ClusterNode> clusterNodeResolver,
+ Function<String, ClusterNode> clusterNodeResolverById,
+ MessagingService messagingService
+ ) {
this.replicaService = replicaService;
+ this.txManager = txManager;
+ this.clock = clock;
this.clusterNodeResolver = clusterNodeResolver;
+ this.clusterNodeResolverById = clusterNodeResolverById;
+ this.messagingService = messagingService;
+ }
+
+ /**
+ * This should be called in order to allow the transaction state resolver to listen to {@link TxStateCoordinatorRequest}
+ * messages. Each such request is answered with a {@link TxStateResponse} carrying the resolved tx state meta and the
+ * current value of the local clock.
+ */
+ public void start() {
+ messagingService.addMessageHandler(TxMessageGroup.class, (msg, sender,
correlationId) -> {
+ // Other messages of the tx message group are ignored by this handler.
+ if (msg instanceof TxStateCoordinatorRequest) {
+ TxStateCoordinatorRequest req = (TxStateCoordinatorRequest)
msg;
+
+ // NOTE(review): a response is sent only if processTxStateRequest completes successfully; on failure the
+ // requester only finds out via its RPC timeout.
+ processTxStateRequest(req)
+ .thenAccept(txStateMeta -> {
+ NetworkMessage response = FACTORY.txStateResponse()
+ .txStateMeta(txStateMeta)
+ .timestampLong(clock.nowLong())
+ .build();
+
+ messagingService.respond(sender, response,
correlationId);
+ });
+ }
+ });
 }
/**
- * Sends a transaction state request to the primary replica.
+ * Resolves transaction state locally, if possible, or distributively, if
needed.
*
- * @param replicaGrp Replication group id.
- * @param request Status request.
- * @return Result future.
+ * @param txId Transaction id.
+ * @param commitGrpId Commit partition group id.
+ * @param timestamp Timestamp.
+ * @return Future with the transaction state meta as a result.
*/
- public CompletableFuture<TxMeta> sendMetaRequest(ReplicationGroupId
replicaGrp, TxStateReplicaRequest request) {
- CompletableFuture<TxMeta> resFut = new CompletableFuture<>();
+ public CompletableFuture<TransactionMeta> resolveTxState(
+ UUID txId,
+ ReplicationGroupId commitGrpId,
+ @Nullable HybridTimestamp timestamp
+ ) {
+ TxStateMeta localMeta = txManager.stateMeta(txId);
+
+ // A final state never changes, so the locally known one is enough.
+ if (localMeta != null && isFinalState(localMeta.txState())) {
+ return completedFuture(localMeta);
+ }
- sendAndRetry(resFut, replicaGrp, request);
- return resFut;
+
+ // Concurrent resolutions of the same transaction share a single future.
+ CompletableFuture<TransactionMeta> newFuture = new CompletableFuture<>();
+
+ CompletableFuture<TransactionMeta> existingFuture = txStateFutures.putIfAbsent(txId, newFuture);
+
+ if (existingFuture != null) {
+ return existingFuture;
+ }
+
+ // Conditional removal never evicts a future registered by a later resolution of the same transaction. The listener is
+ // registered before the resolution starts, so a synchronously completed future is removed as well.
+ newFuture.whenComplete((v, e) -> txStateFutures.remove(txId, newFuture));
+
+ // Started outside of any map operation, so that network calls and completion callbacks don't run under a map lock.
+ resolveDistributiveTxState(txId, localMeta, commitGrpId, timestamp, newFuture);
+
+ return newFuture;
+ }
+
+ /**
+ * Resolves the transaction state distributively. This method doesn't process final tx states.
+ *
+ * @param txId Transaction id.
+ * @param localMeta Local tx meta, {@code null} if the transaction is unknown to the local tx state map.
+ * @param commitGrpId Commit partition group id, used when the state has to be requested from the commit partition.
+ * @param timestamp Timestamp to pass to the transaction coordinator; {@code null} is sent as
+ * {@link HybridTimestamp#MIN_VALUE}.
+ * @param txMetaFuture Tx meta future to complete with the result.
+ */
+ private void resolveDistributiveTxState(
+ UUID txId,
+ @Nullable TxStateMeta localMeta,
+ ReplicationGroupId commitGrpId,
+ @Nullable HybridTimestamp timestamp,
+ CompletableFuture<TransactionMeta> txMetaFuture
+ ) {
+ assert localMeta == null || !isFinalState(localMeta.txState())
+ : "Unexpected tx meta [txId=" + txId + ", meta=" + localMeta + ']';
+
+ if (localMeta == null) {
+ // Fall back to the commit partition path, because the coordinator id is unknown locally.
+ resolveTxStateFromCommitPartition(txId, commitGrpId, txMetaFuture);
+ } else if (localMeta.txState() == PENDING) {
+ HybridTimestamp timestamp0 = timestamp == null ? HybridTimestamp.MIN_VALUE : timestamp;
+
+ resolveTxStateFromTxCoordinator(txId, localMeta.txCoordinatorId(), commitGrpId, timestamp0, txMetaFuture);
+ } else if (localMeta.txState() == FINISHING) {
+ assert localMeta instanceof TxStateMetaFinishing;
+
+ // The transaction is being finished locally: wait for its finish future instead of asking remote nodes.
+ ((TxStateMetaFinishing) localMeta).txFinishFuture().whenComplete((v, e) -> {
+ if (e == null) {
+ txMetaFuture.complete(v);
+ } else {
+ txMetaFuture.completeExceptionally(e);
+ }
+ });
+ } else {
+ assert localMeta.txState() == ABANDONED : "Unexpected transaction state [txId=" + txId + ", txStateMeta=" + localMeta + ']';
+
+ txMetaFuture.complete(localMeta);
+ }
+ }
+
+ /**
+ * Resolves the transaction state by asking the transaction coordinator. Falls back to the commit partition if the
+ * coordinator can't be found by its id.
+ *
+ * @param txId Transaction id.
+ * @param coordinatorId Non-consistent id of the transaction coordinator node.
+ * @param commitGrpId Commit partition group id, used for the fallback.
+ * @param timestamp Read timestamp to pass to the coordinator.
+ * @param txMetaFuture Tx meta future to complete with the result.
+ */
+ private void resolveTxStateFromTxCoordinator(
+ UUID txId,
+ String coordinatorId,
+ ReplicationGroupId commitGrpId,
+ HybridTimestamp timestamp,
+ CompletableFuture<TransactionMeta> txMetaFuture
+ ) {
+ ClusterNode coordinator = clusterNodeResolverById.apply(coordinatorId);
+
+ if (coordinator == null) {
+ // The coordinator node has either left the cluster or restarted. The commit partition path registers the local
+ // tx state map update itself, so it is not registered here a second time.
+ resolveTxStateFromCommitPartition(txId, commitGrpId, txMetaFuture);
+
+ return;
+ }
+
+ updateLocalTxMapAfterDistributedStateResolved(txId, txMetaFuture);
+
+ CompletableFuture<TransactionMeta> coordinatorTxMetaFuture = new CompletableFuture<>();
+
+ coordinatorTxMetaFuture.whenComplete((v, e) -> {
+ if (e != null) {
+ txMetaFuture.completeExceptionally(e);
+
+ return;
+ }
+
+ // Only a successful response is checked: on failure v is always null and the error is propagated above.
+ assert v != null : "Unexpected result from transaction coordinator: unknown transaction state [txId=" + txId
+ + ", transactionCoordinator=" + coordinator + ']';
+
+ txMetaFuture.complete(v);
+ });
+
+ TxStateCoordinatorRequest request = FACTORY.txStateCoordinatorRequest()
+ .readTimestampLong(timestamp.longValue())
+ .txId(txId)
+ .build();
+
+ sendAndRetry(coordinatorTxMetaFuture, coordinator, request);
+ }
+
+ /**
+ * Resolves the transaction state by sending a {@link TxStateCommitPartitionRequest} to the commit partition group, and
+ * registers the update of the local tx state map once the state is resolved.
+ *
+ * @param txId Transaction id.
+ * @param commitGrpId Commit partition group id.
+ * @param txMetaFuture Tx meta future; passed to the replica-group {@code sendAndRetry}, which is expected to complete it.
+ */
+ private void resolveTxStateFromCommitPartition(
+ UUID txId,
+ ReplicationGroupId commitGrpId,
+ CompletableFuture<TransactionMeta> txMetaFuture
+ ) {
+ TxStateCommitPartitionRequest request =
FACTORY.txStateCommitPartitionRequest()
+ .groupId(commitGrpId)
+ .txId(txId)
+ .build();
+
+ // Registered before sending, so the listener is in place whenever the future completes.
+ updateLocalTxMapAfterDistributedStateResolved(txId, txMetaFuture);
+
+ sendAndRetry(txMetaFuture, commitGrpId, request);
+ }
+
+ /**
+ * Stores the distributively resolved transaction state in the local tx state map once the future completes successfully.
+ * Only {@link TxStateMeta} results are stored; {@code null} and other {@link TransactionMeta} implementations are ignored.
+ *
+ * @param txId Transaction id.
+ * @param future Future of the resolved transaction meta.
+ */
+ private void updateLocalTxMapAfterDistributedStateResolved(UUID txId, CompletableFuture<TransactionMeta> future) {
+ future.thenAccept(txMeta -> {
+ // instanceof is false for null, so no separate null check is needed.
+ if (txMeta instanceof TxStateMeta) {
+ TxStateMeta resolvedMeta = (TxStateMeta) txMeta;
+
+ txManager.updateTxMeta(txId, old -> resolvedMeta);
+ }
+ });
 }
/**
@@ -87,7 +283,11 @@ public class TransactionStateResolver {
* @param replicaGrp Replication group id.
* @param request Request.
*/
- private void sendAndRetry(CompletableFuture<TxMeta> resFut,
ReplicationGroupId replicaGrp, TxStateReplicaRequest request) {
+ private void sendAndRetry(
+ CompletableFuture<TransactionMeta> resFut,
+ ReplicationGroupId replicaGrp,
+ TxStateCommitPartitionRequest request
+ ) {
ClusterNode nodeToSend = primaryReplicaMapping.get(replicaGrp).stream()
.map(clusterNodeResolver)
.filter(Objects::nonNull)
@@ -115,4 +315,46 @@ public class TransactionStateResolver {
}
});
}
+
+    /**
+     * Sends a transaction state request to the given node (the transaction coordinator). Despite the name, the request is sent
+     * once and is not retried. A failure of the network call completes the result future exceptionally, so that callers
+     * waiting on it do not hang.
+     *
+     * @param resFut Response future.
+     * @param node Node to send to.
+     * @param request Request.
+     */
+    private void sendAndRetry(CompletableFuture<TransactionMeta> resFut, ClusterNode node, TxStateCoordinatorRequest request) {
+        messagingService.invoke(node, request, RPC_TIMEOUT).whenComplete((resp, e) -> {
+            if (e != null) {
+                resFut.completeExceptionally(e);
+
+                return;
+            }
+
+            assert resp instanceof TxStateResponse : "Unsupported response type [type=" + resp.getClass().getSimpleName() + ']';
+
+            TxStateResponse response = (TxStateResponse) resp;
+
+            resFut.complete(response.txStateMeta());
+        });
+    }
+
+    /**
+     * Handles a transaction state request used for coordinator path based write intent resolution. Never answers with
+     * {@link org.apache.ignite.internal.tx.TxState#FINISHING}: for a finishing transaction the returned future is completed
+     * only after the transaction reaches its final state.
+     *
+     * @param request Request.
+     * @return Future that should be completed with transaction state meta.
+     */
+    private CompletableFuture<TransactionMeta> processTxStateRequest(TxStateCoordinatorRequest request) {
+        clock.update(request.readTimestamp());
+
+        TxStateMeta localMeta = txManager.stateMeta(request.txId());
+
+        boolean finishing = localMeta != null && localMeta.txState() == FINISHING;
+
+        if (!finishing) {
+            return completedFuture(localMeta);
+        }
+
+        assert localMeta instanceof TxStateMetaFinishing;
+
+        // Wait for the final state instead of exposing the intermediate FINISHING one.
+        return ((TxStateMetaFinishing) localMeta).txFinishFuture();
+    }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
index 7277551497..951e77a0b6 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -25,9 +26,11 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
+import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
@@ -44,7 +47,6 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
-import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.table.Table;
@@ -110,14 +112,13 @@ public class TxLocalTest extends TxAbstractTest {
TransactionStateResolver transactionStateResolver =
mock(TransactionStateResolver.class, RETURNS_DEEP_STUBS);
doAnswer(invocationOnMock -> {
- TxStateReplicaRequest request = invocationOnMock.getArgument(1);
+ UUID txId = invocationOnMock.getArgument(0);
- return CompletableFuture.completedFuture(
-
tables.get(request.groupId()).txStateStorage().getTxStateStorage(0).get(request.txId()));
- }).when(transactionStateResolver).sendMetaRequest(any(), any());
+ return completedFuture(txManager.stateMeta(txId));
+ }).when(transactionStateResolver).resolveTxState(any(), any(), any());
- txManager = new TxManagerImpl(replicaSvc, lockManager,
DummyInternalTableImpl.CLOCK,
- new TransactionIdGenerator(0xdeadbeef), () -> localNodeName);
+ txManager = new TxManagerImpl(replicaSvc, lockManager,
DummyInternalTableImpl.CLOCK, new TransactionIdGenerator(0xdeadbeef),
+ () -> localNodeName);
igniteTransactions = new IgniteTransactionsImpl(txManager,
timestampTracker);
@@ -173,39 +174,8 @@ public class TxLocalTest extends TxAbstractTest {
return true;
}
- // TODO: https://issues.apache.org/jira/browse/IGNITE-20355
@Override
- public void testReadOnlyGet() {
- // No-op
- }
-
- // TODO: https://issues.apache.org/jira/browse/IGNITE-20355
- @Override
- public void testReadOnlyScan() throws Exception {
- // No-op
- }
-
- // TODO: https://issues.apache.org/jira/browse/IGNITE-20355
- @Override
- public void testReadOnlyGetWriteIntentResolutionUpdate() {
- // No-op
- }
-
- // TODO: https://issues.apache.org/jira/browse/IGNITE-20355
- @Override
- public void testReadOnlyGetWriteIntentResolutionRemove() {
- // No-op
- }
-
- // TODO: https://issues.apache.org/jira/browse/IGNITE-20355
- @Override
- public void testReadOnlyGetAll() {
- // No-op
- }
-
- // TODO: https://issues.apache.org/jira/browse/IGNITE-20355
- @Override
- public void testReadOnlyPendingWriteIntentSkippedCombined() {
- super.testReadOnlyPendingWriteIntentSkippedCombined();
+ protected Collection<TxManager> txManagers() {
+ return List.of(txManager);
}
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index be9c6890fa..3452330b4b 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -92,6 +92,7 @@ import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowConverter;
@@ -153,6 +154,7 @@ import
org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TransactionIds;
+import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
@@ -170,6 +172,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterNodeImpl;
+import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyService;
import org.apache.ignite.sql.ColumnType;
@@ -270,7 +273,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
/** Another (not local) cluster node. */
private final ClusterNode anotherNode = new ClusterNodeImpl("node2",
"node2", NetworkAddress.from("127.0.0.2:127"));
- private final TransactionStateResolver transactionStateResolver =
mock(TransactionStateResolver.class);
+ private TransactionStateResolver transactionStateResolver;
private final PartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(testMvPartitionStorage);
@@ -297,6 +300,9 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
@Mock
private CatalogTables catalogTables;
+ @Mock
+ private MessagingService messagingService;
+
/** Schema descriptor for tests. */
private SchemaDescriptor schemaDescriptor;
@@ -381,23 +387,6 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
when(topologySrv.localMember()).thenReturn(localNode);
- HybridTimestamp txFixedTimestamp = clock.now();
-
- when(transactionStateResolver.sendMetaRequest(any(),
any())).thenAnswer(invocationOnMock -> {
- TxMeta txMeta;
-
- if (txState == null) {
- txMeta = null;
- } else if (txState == TxState.COMMITED) {
- txMeta = new TxMeta(TxState.COMMITED, singletonList(grpId),
txFixedTimestamp);
- } else {
- assert txState == TxState.ABORTED : "Sate is " + txState;
-
- txMeta = new TxMeta(TxState.ABORTED, singletonList(grpId),
txFixedTimestamp);
- }
- return completedFuture(txMeta);
- });
-
when(safeTimeClock.waitFor(any())).thenReturn(completedFuture(null));
when(schemas.waitForSchemasAvailability(any())).thenReturn(completedFuture(null));
@@ -453,6 +442,22 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
doAnswer(invocation -> txStateMeta).when(txManager).stateMeta(any());
+ doAnswer(invocation -> {
+ var resp = new
TxMessagesFactory().txStateResponse().txStateMeta(txStateMeta).build();
+ return completedFuture(resp);
+ }).when(messagingService).invoke(any(ClusterNode.class), any(),
anyLong());
+
+ transactionStateResolver = new TransactionStateResolver(
+ mock(ReplicaService.class),
+ txManager,
+ clock,
+ consistentId -> consistentId.equals(localNode.name()) ?
localNode : anotherNode,
+ id -> id.equals(localNode.id()) ? localNode : anotherNode,
+ messagingService
+ );
+
+ transactionStateResolver.start();
+
partitionReplicaListener = new PartitionReplicaListener(
testMvPartitionStorage,
mockRaftClient,
@@ -525,9 +530,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
@Test
public void testTxStateReplicaRequestEmptyState() throws Exception {
- CompletableFuture<?> fut =
partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
+ CompletableFuture<?> fut =
partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
.groupId(grpId)
- .readTimestampLong(clock.nowLong())
.txId(newTxId())
.build(), "senderId");
@@ -545,15 +549,14 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
HybridTimestamp readTimestamp = clock.now();
- CompletableFuture<?> fut =
partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
+ CompletableFuture<?> fut =
partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
.groupId(grpId)
- .readTimestampLong(readTimestamp.longValue())
.txId(txId)
.build(), localNode.id());
LeaderOrTxState tuple = (LeaderOrTxState) fut.get(1, TimeUnit.SECONDS);
- TxMeta txMeta = tuple.txMeta();
+ TransactionMeta txMeta = tuple.txMeta();
assertNotNull(txMeta);
assertEquals(TxState.COMMITED, txMeta.txState());
assertNotNull(txMeta.commitTimestamp());
@@ -566,9 +569,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
public void testTxStateReplicaRequestMissLeaderMiss() throws Exception {
localLeader = false;
- CompletableFuture<?> fut =
partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
+ CompletableFuture<?> fut =
partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
.groupId(grpId)
- .readTimestampLong(clock.nowLong())
.txId(newTxId())
.build(), localNode.id());
@@ -627,6 +629,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
pkStorage().put(testBinaryRow, rowId);
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID,
PART_ID);
+ txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.COMMITED,
localNode.id(), clock.now()));
CompletableFuture<?> fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
.groupId(grpId)
@@ -649,6 +652,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
pkStorage().put(testBinaryRow, rowId);
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID,
PART_ID);
+ txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING,
localNode.id(), null));
CompletableFuture<?> fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
.groupId(grpId)
@@ -672,6 +676,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
pkStorage().put(testBinaryRow, rowId);
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID,
PART_ID);
+ txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.ABORTED,
localNode.id(), null));
CompletableFuture<?> fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
.groupId(grpId)
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 1b3516092f..67ceb5d659 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -47,6 +47,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.affinity.Assignment;
@@ -137,6 +138,10 @@ import org.junit.jupiter.api.TestInfo;
* Class that allows to mock a cluster for transaction tests' purposes.
*/
public class ItTxTestCluster {
+ private final List<NetworkAddress> localAddresses;
+
+ private final NodeFinder nodeFinder;
+
private final RaftConfiguration raftConfig;
private final GcConfiguration gcConfig;
@@ -173,6 +178,8 @@ public class ItTxTestCluster {
protected TxManager clientTxManager;
+ protected TransactionStateResolver clientTxStateResolver;
+
protected Map<String, List<RaftGroupService>> raftClients = new
HashMap<>();
protected Map<String, TxStateStorage> txStateStorages;
@@ -201,6 +208,22 @@ public class ItTxTestCluster {
return null;
};
+    /** Finds a node by its id among the server nodes and the client node; returns {@code null} if there is no such node. */
+    private final Function<String, ClusterNode> idToNode = id -> {
+        for (ClusterService srv : cluster) {
+            ClusterNode member = srv.topologyService().localMember();
+
+            if (member.id().equals(id)) {
+                return member;
+            }
+        }
+
+        if (client == null) {
+            return null;
+        }
+
+        ClusterNode clientMember = client.topologyService().localMember();
+
+        return clientMember.id().equals(id) ? clientMember : null;
+    };
+
private final TestInfo testInfo;
/** Observable timestamp tracker. */
@@ -227,6 +250,9 @@ public class ItTxTestCluster {
this.startClient = startClient;
this.testInfo = testInfo;
this.timestampTracker = timestampTracker;
+
+ localAddresses = findLocalAddresses(NODE_PORT_BASE, NODE_PORT_BASE +
nodes);
+ nodeFinder = new StaticNodeFinder(localAddresses);
}
/**
@@ -236,10 +262,6 @@ public class ItTxTestCluster {
assertTrue(nodes > 0);
assertTrue(replicas > 0);
- List<NetworkAddress> localAddresses =
findLocalAddresses(NODE_PORT_BASE, NODE_PORT_BASE + nodes);
-
- var nodeFinder = new StaticNodeFinder(localAddresses);
-
clusterServices = new ConcurrentHashMap<>(nodes);
nodeFinder.findNodes().parallelStream()
@@ -258,20 +280,7 @@ public class ItTxTestCluster {
LOG.info("The cluster has been started");
if (startClient) {
- client = startNode(testInfo, "client", NODE_PORT_BASE - 1,
nodeFinder);
-
- assertTrue(waitForTopology(client, nodes + 1, 1000));
-
- clientClock = new HybridClockImpl();
-
- LOG.info("Replica manager has been started, node=[" +
client.topologyService().localMember() + ']');
-
- clientReplicaSvc = new ReplicaService(
- client.messagingService(),
- clientClock
- );
-
- LOG.info("The client has been started");
+ startClient();
}
// Start raft servers. Each raft server can hold multiple groups.
@@ -345,16 +354,15 @@ public class ItTxTestCluster {
localNodeName = cluster.get(0).topologyService().localMember().name();
if (startClient) {
- clientTxManager = new TxManagerImpl(clientReplicaSvc, new
HeapLockManager(), clientClock, new TransactionIdGenerator(-1),
- () -> cluster.get(0).topologyService().localMember().id());
+ initializeClientTxComponents();
} else {
// Collocated mode.
clientTxManager = txManagers.get(localNodeName);
}
- assertNotNull(clientTxManager);
-
igniteTransactions = new IgniteTransactionsImpl(clientTxManager,
timestampTracker);
+
+ assertNotNull(clientTxManager);
}
public IgniteTransactions igniteTransactions() {
@@ -401,7 +409,15 @@ public class ItTxTestCluster {
var mvTableStorage = new TestMvTableStorage(tableId,
DEFAULT_PARTITION_COUNT);
var mvPartStorage = new TestMvPartitionStorage(partId);
var txStateStorage = txStateStorages.get(assignment);
- var transactionStateResolver = new
TransactionStateResolver(replicaServices.get(assignment), consistentIdToNode);
+ var transactionStateResolver = new TransactionStateResolver(
+ replicaServices.get(assignment),
+ txManagers.get(assignment),
+ clocks.get(assignment),
+ consistentIdToNode,
+ idToNode,
+ clusterServices.get(assignment).messagingService()
+ );
+ transactionStateResolver.start();
for (int part = 0; part < assignments.size(); part++) {
transactionStateResolver.updateAssignment(grpIds.get(part),
assignments.get(part));
@@ -680,4 +696,38 @@ public class ItTxTestCluster {
return network;
}
+
+    /**
+     * Starts the client node, waits until the topology contains all server nodes plus the client, and creates the client-side
+     * clock and replica service.
+     *
+     * @throws InterruptedException If the thread is interrupted while starting the client.
+     */
+    private void startClient() throws InterruptedException {
+        client = startNode(testInfo, "client", NODE_PORT_BASE - 1, nodeFinder);
+
+        boolean topologyReady = waitForTopology(client, nodes + 1, 1000);
+        assertTrue(topologyReady);
+
+        clientClock = new HybridClockImpl();
+
+        ClusterNode clientNode = client.topologyService().localMember();
+        LOG.info("Replica manager has been started, node=[" + clientNode + ']');
+
+        clientReplicaSvc = new ReplicaService(client.messagingService(), clientClock);
+
+        LOG.info("The client has been started");
+    }
+
+    /**
+     * Creates the client-side transaction manager and transaction state resolver, then starts the resolver.
+     */
+    private void initializeClientTxComponents() {
+        // The client node itself is the transaction coordinator for transactions started via the client.
+        Supplier<String> clientNodeId = () -> client.topologyService().localMember().id();
+
+        clientTxManager = new TxManagerImpl(
+                clientReplicaSvc,
+                new HeapLockManager(),
+                clientClock,
+                new TransactionIdGenerator(-1),
+                clientNodeId
+        );
+
+        TransactionStateResolver resolver = new TransactionStateResolver(
+                clientReplicaSvc, clientTxManager, clientClock, consistentIdToNode, idToNode, client.messagingService());
+
+        clientTxStateResolver = resolver;
+        resolver.start();
+    }
}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index fdabc84947..69cc7409ef 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -41,6 +41,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CyclicBarrier;
@@ -65,6 +66,7 @@ import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.LockMode;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.Pair;
@@ -119,8 +121,6 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
protected IgniteTransactions igniteTransactions;
- protected TxManager clientTxManager;
-
@Test
public void testCommitRollbackSameTxDoesNotThrow() throws
TransactionException {
InternalTransaction tx = (InternalTransaction)
igniteTransactions.begin();
@@ -1633,7 +1633,7 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
* @param id The id.
* @return The key tuple.
*/
- private Tuple makeKey(long id) {
+ protected Tuple makeKey(long id) {
return Tuple.create().set("accountNumber", id);
}
@@ -1766,7 +1766,6 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
});
}
-
@Test
public void testReadOnlyGet() {
accounts.recordView().upsert(null, makeValue(1, 100.));
@@ -1935,6 +1934,30 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
assertEquals(BALANCE_1, accounts.recordView().get(null,
makeKey(1)).doubleValue("balance"));
}
+    @Test
+    public void testWriteIntentResolutionFallbackToCommitPartitionPath() {
+        accounts.recordView().upsert(null, makeValue(1, 100.));
+
+        // Pending tx
+        Transaction tx = igniteTransactions.begin();
+        accounts.recordView().delete(tx, makeKey(1));
+
+        // Imitate the restart of the client node, which is the tx coordinator, in order to make its volatile tx state
+        // unavailable: the coordinator id in every server's local tx state map is replaced with an id of a non-existent node.
+        // Now the coordinator path of the write intent resolution has no effect, and we should fall back to the commit
+        // partition path.
+        UUID txId = ((ReadWriteTransactionImpl) tx).id();
+
+        for (TxManager txManager : txManagers()) {
+            txManager.updateTxMeta(txId, old -> old == null ? null : new TxStateMeta(old.txState(), "restarted", old.commitTimestamp()));
+        }
+
+        // Read-only.
+        Transaction readOnlyTx = igniteTransactions.begin(new TransactionOptions().readOnly(true));
+        assertEquals(100., accounts.recordView().get(readOnlyTx, makeKey(1)).doubleValue("balance"));
+
+        // Commit pending tx.
+        tx.commit();
+    }
+
/**
* Checks operations that act after a transaction is committed, are
finished with exception.
*
@@ -1992,4 +2015,11 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
assertThat(res, contains(null, null));
}
}
+
+ /**
+ * Returns the transaction managers of all server nodes, so that a test can inspect or modify the local
+ * transaction state on every server node.
+ *
+ * @return Server nodes' tx managers.
+ */
+ protected abstract Collection<TxManager> txManagers();
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAbandonedException.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAbandonedException.java
new file mode 100644
index 0000000000..c35e1ada47
--- /dev/null
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAbandonedException.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ABANDONED_ERR;
+
+import java.util.UUID;
+import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Exception that is thrown when a write intent can't be resolved because it has been created by a transaction that is now
+ * abandoned (its state currently can't be known).
+ */
+public class TransactionAbandonedException extends TransactionException {
+    private static final long serialVersionUID = 2284734012476254352L;
+
+    /** ID of the abandoned transaction. */
+    private final UUID abandonedTxId;
+
+    /** Transaction meta that was retrieved for the abandoned transaction, if any. */
+    @Nullable
+    private final TransactionMeta transactionMeta;
+
+    /**
+     * Constructor.
+     *
+     * @param abandonedTxId ID of the transaction that is abandoned.
+     * @param transactionMeta Transaction meta that was retrieved for the abandoned transaction, or {@code null} if none.
+     */
+    public TransactionAbandonedException(UUID abandonedTxId, @Nullable TransactionMeta transactionMeta) {
+        super(UUID.randomUUID(), TX_ABANDONED_ERR, null);
+
+        this.abandonedTxId = abandonedTxId;
+        this.transactionMeta = transactionMeta;
+    }
+
+    /**
+     * Returns the ID of the abandoned transaction.
+     *
+     * @return ID of the abandoned transaction.
+     */
+    public UUID abandonedTxId() {
+        return abandonedTxId;
+    }
+
+    /**
+     * Returns the transaction meta that was retrieved for the abandoned transaction.
+     *
+     * @return Transaction meta, or {@code null} if it was not available.
+     */
+    public @Nullable TransactionMeta transactionMeta() {
+        return transactionMeta;
+    }
+
+    @Override
+    public String getMessage() {
+        return "Operation failed due to abandoned transaction [abandonedTxId=" + abandonedTxId
+                + ", transactionMeta=" + transactionMeta + ']';
+    }
+}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionMeta.java
similarity index 60%
copy from
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
copy to
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionMeta.java
index 26ae27b38f..975a818479 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionMeta.java
@@ -15,25 +15,19 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.tx.message;
+package org.apache.ignite.internal.tx;
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-
-import java.util.UUID;
+import java.io.Serializable;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.network.annotations.Transferable;
+import org.jetbrains.annotations.Nullable;
/**
- * Transaction state request.
+ * Transaction metadata interface.
*/
-@Transferable(TxMessageGroup.TX_STATE_REQUEST)
-public interface TxStateReplicaRequest extends ReplicaRequest {
- UUID txId();
-
- long readTimestampLong();
+public interface TransactionMeta extends Serializable {
+ /** Tx state. */
+ TxState txState();
- default HybridTimestamp readTimestamp() {
- return hybridTimestamp(readTimestampLong());
- }
+ /** Commit timestamp. */
+ @Nullable HybridTimestamp commitTimestamp();
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
index ea312e6f46..8eab25e38a 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.tx;
import static java.util.Collections.unmodifiableList;
-import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -28,7 +27,7 @@ import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.Nullable;
/** Transaction meta. */
-public class TxMeta implements Serializable {
+public class TxMeta implements TransactionMeta {
/** Serial version UID. */
private static final long serialVersionUID = -172513482743911860L;
@@ -55,6 +54,7 @@ public class TxMeta implements Serializable {
this.commitTimestamp = commitTimestamp;
}
+ @Override
public TxState txState() {
return txState;
}
@@ -63,6 +63,7 @@ public class TxMeta implements Serializable {
return unmodifiableList(enlistedPartitions);
}
+ @Override
public @Nullable HybridTimestamp commitTimestamp() {
return commitTimestamp;
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
index f0a866238a..c5a897b647 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
@@ -26,16 +26,22 @@ public enum TxState {
PENDING,
FINISHING,
ABORTED,
- COMMITED;
+ COMMITED,
+ ABANDONED;
private static final boolean[][] TRANSITION_MATRIX = {
- { false, true, false, true, true },
- { false, true, true, true, true },
- { false, false, false, true, true },
- { false, false, false, true, false},
- { false, false, false, false, true },
+ { false, true, false, true, true, true },
+ { false, true, true, true, true, true },
+ { false, false, false, true, true, true },
+ { false, false, false, true, false, true },
+ { false, false, false, false, true, true },
+ { true, true, true, true, true, true }
};
+    /**
+     * Checks whether the given state is final, i.e. {@link #ABORTED} or {@link #COMMITED}.
+     *
+     * @param state Transaction state, may be {@code null}.
+     * @return {@code true} if the state is final, {@code false} otherwise (including {@code null}).
+     */
+    public static boolean isFinalState(TxState state) {
+        return state == ABORTED || state == COMMITED;
+    }
+
/**
* Checks the correctness of the transition between transaction states.
*
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
index dbb3da6133..df1e5f79ce 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
@@ -23,7 +23,9 @@ import org.jetbrains.annotations.Nullable;
/**
* Transaction state meta.
*/
-public class TxStateMeta {
+public class TxStateMeta implements TransactionMeta {
+ private static final long serialVersionUID = 8521181896862227127L;
+
private final TxState txState;
private final String txCoordinatorId;
@@ -37,12 +39,17 @@ public class TxStateMeta {
* @param txCoordinatorId Transaction coordinator id.
* @param commitTimestamp Commit timestamp.
*/
- public TxStateMeta(TxState txState, String txCoordinatorId, @Nullable
HybridTimestamp commitTimestamp) {
+ public TxStateMeta(
+ TxState txState,
+ String txCoordinatorId,
+ @Nullable HybridTimestamp commitTimestamp
+ ) {
this.txState = txState;
this.txCoordinatorId = txCoordinatorId;
this.commitTimestamp = commitTimestamp;
}
+ @Override
public TxState txState() {
return txState;
}
@@ -51,6 +58,7 @@ public class TxStateMeta {
return txCoordinatorId;
}
+ @Override
public @Nullable HybridTimestamp commitTimestamp() {
return commitTimestamp;
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
new file mode 100644
index 0000000000..9b85925d7c
--- /dev/null
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link TxStateMeta} implementation for {@link TxState#FINISHING} state. Contains a future that is completed after the state of
+ * the corresponding transaction changes to a final state.
+ */
+public class TxStateMetaFinishing extends TxStateMeta {
+    private static final long serialVersionUID = 9122953981654023665L;
+
+    /**
+     * Future that is completed after the state of the corresponding transaction changes to a final state.
+     * NOTE(review): CompletableFuture is not Serializable; presumably instances of this class are never serialized (the
+     * coordinator waits on this future instead of sending FINISHING) — confirm before relying on serialization.
+     */
+    private final CompletableFuture<TransactionMeta> txFinishFuture = new CompletableFuture<>();
+
+    /**
+     * Constructor.
+     *
+     * @param txCoordinatorId Transaction coordinator id.
+     */
+    public TxStateMetaFinishing(String txCoordinatorId) {
+        super(TxState.FINISHING, txCoordinatorId, null);
+    }
+
+    /**
+     * Future that is completed after the state of the corresponding transaction changes to a final state.
+     *
+     * @return Future that is completed after the state of the corresponding transaction changes to a final state.
+     */
+    public CompletableFuture<TransactionMeta> txFinishFuture() {
+        return txFinishFuture;
+    }
+
+    /**
+     * Not supported: a finishing transaction has no commit timestamp yet.
+     *
+     * @throws UnsupportedOperationException Always.
+     */
+    @Override
+    public @Nullable HybridTimestamp commitTimestamp() {
+        throw new UnsupportedOperationException("Can't get commit timestamp from FINISHING transaction state meta.");
+    }
+}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 8216b05059..c0a6979335 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -22,9 +22,9 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
import static org.apache.ignite.internal.tx.TxState.COMMITED;
-import static org.apache.ignite.internal.tx.TxState.FINISHING;
import static org.apache.ignite.internal.tx.TxState.PENDING;
import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR;
import java.util.Comparator;
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.TxStateMetaFinishing;
import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.util.Lazy;
@@ -230,14 +231,28 @@ public class TxManagerImpl implements TxManager {
) {
assert groups != null;
+ // Here we put finishing state meta into the local map, so that all
concurrent operations trying to read tx state
+ // with using read timestamp could see that this transaction is
finishing, see #transactionMetaReadTimestampAware(txId, timestamp).
+ // None of them now are able to update node's clock with read
timestamp and we can create the commit timestamp that is greater
+ // than all the read timestamps processed before.
+ // Every concurrent operation will now use a finish future from the
finishing state meta and get only final transaction
+ // state after the transaction is finished.
+ TxStateMetaFinishing finishingStateMeta = new
TxStateMetaFinishing(localNodeId.get());
+ updateTxMeta(txId, old -> finishingStateMeta);
HybridTimestamp commitTimestamp = commit ? clock.now() : null;
// If there are no enlisted groups, just return - we already marked
the tx as finished.
boolean finishRequestNeeded = !groups.isEmpty();
- updateTxMeta(txId, old -> new TxStateMeta(finishRequestNeeded ?
FINISHING : ABORTED, old.txCoordinatorId(), commitTimestamp));
-
if (!finishRequestNeeded) {
+ updateTxMeta(txId, old -> {
+ TxStateMeta finalStateMeta =
coordinatorFinalTxStateMeta(commit, commitTimestamp);
+
+ finishingStateMeta.txFinishFuture().complete(finalStateMeta);
+
+ return finalStateMeta;
+ });
+
return completedFuture(null);
}
@@ -254,11 +269,23 @@ public class TxManagerImpl implements TxManager {
.build();
return replicaService.invoke(recipientNode, req)
- .thenRun(() -> updateTxMeta(txId, old -> new TxStateMeta(
- commit ? COMMITED : ABORTED,
- old.txCoordinatorId(),
- old.commitTimestamp()
- )));
+ .thenRun(() -> {
+ updateTxMeta(txId, old -> {
+ if (isFinalState(old.txState())) {
+ finishingStateMeta.txFinishFuture().complete(old);
+
+ return old;
+ }
+
+ assert old instanceof TxStateMetaFinishing;
+
+ TxStateMeta finalTxStateMeta =
coordinatorFinalTxStateMeta(commit, commitTimestamp);
+
+
finishingStateMeta.txFinishFuture().complete(finalTxStateMeta);
+
+ return finalTxStateMeta;
+ });
+ });
}
@Override
@@ -353,4 +380,15 @@ public class TxManagerImpl implements TxManager {
lowWatermarkReadWriteLock.writeLock().unlock();
}
}
+
+    /**
+     * Builds the final {@link TxStateMeta} for the coordinator node: {@code COMMITED} when {@code commit} is set,
+     * {@code ABORTED} otherwise, with the local node id recorded as the coordinator id.
+     *
+     * @param commit Commit flag.
+     * @param commitTimestamp Commit timestamp (callers pass {@code null} when not committing).
+     * @return Final transaction state meta.
+     */
+    private TxStateMeta coordinatorFinalTxStateMeta(boolean commit, HybridTimestamp commitTimestamp) {
+        TxState finalState;
+
+        if (commit) {
+            finalState = COMMITED;
+        } else {
+            finalState = ABORTED;
+        }
+
+        return new TxStateMeta(finalState, localNodeId.get(), commitTimestamp);
+    }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
index c9ab6bedec..00abd58a1b 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
@@ -40,7 +40,17 @@ public class TxMessageGroup {
public static final short TX_CLEANUP_REQUEST = 2;
/**
- * Message type for {@link TxStateReplicaRequest}.
+ * Message type for {@link TxStateCommitPartitionRequest}.
*/
- public static final short TX_STATE_REQUEST = 3;
+ public static final short TX_STATE_COMMIT_PARTITION_REQUEST = 3;
+
+ /**
+ * Message type for {@link TxStateCoordinatorRequest}.
+ */
+ public static final short TX_STATE_COORDINATOR_REQUEST = 4;
+
+ /**
+ * Message type for {@link TxStateResponse}.
+ */
+ public static final short TX_STATE_RESPONSE = 5;
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCommitPartitionRequest.java
similarity index 73%
copy from
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
copy to
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCommitPartitionRequest.java
index 26ae27b38f..76ed5803ad 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCommitPartitionRequest.java
@@ -17,23 +17,14 @@
package org.apache.ignite.internal.tx.message;
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-
import java.util.UUID;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.network.annotations.Transferable;
/**
* Transaction state request.
*/
-@Transferable(TxMessageGroup.TX_STATE_REQUEST)
-public interface TxStateReplicaRequest extends ReplicaRequest {
+@Transferable(TxMessageGroup.TX_STATE_COMMIT_PARTITION_REQUEST)
+public interface TxStateCommitPartitionRequest extends ReplicaRequest {
UUID txId();
-
- long readTimestampLong();
-
- default HybridTimestamp readTimestamp() {
- return hybridTimestamp(readTimestampLong());
- }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java
similarity index 87%
copy from
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
copy to
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java
index 26ae27b38f..80b2495f2a 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java
@@ -21,14 +21,14 @@ import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
* Transaction state request.
*/
-@Transferable(TxMessageGroup.TX_STATE_REQUEST)
-public interface TxStateReplicaRequest extends ReplicaRequest {
+@Transferable(TxMessageGroup.TX_STATE_COORDINATOR_REQUEST)
+public interface TxStateCoordinatorRequest extends NetworkMessage {
UUID txId();
long readTimestampLong();
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
similarity index 63%
rename from
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
rename to
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
index 26ae27b38f..9918a171eb 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
@@ -17,23 +17,18 @@
package org.apache.ignite.internal.tx.message;
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-
-import java.util.UUID;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.TimestampAware;
+import org.apache.ignite.internal.tx.TransactionMeta;
+import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
+import org.jetbrains.annotations.Nullable;
/**
- * Transaction state request.
+ * Transaction state response.
*/
-@Transferable(TxMessageGroup.TX_STATE_REQUEST)
-public interface TxStateReplicaRequest extends ReplicaRequest {
- UUID txId();
-
- long readTimestampLong();
-
- default HybridTimestamp readTimestamp() {
- return hybridTimestamp(readTimestampLong());
- }
+@Transferable(TxMessageGroup.TX_STATE_RESPONSE)
+public interface TxStateResponse extends TimestampAware {
+ @Marshallable
+ @Nullable
+ TransactionMeta txStateMeta();
}