This is an automated email from the ASF dual-hosted git repository. vpyatkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 6acdc63bde IGNITE-20034 Implemented writeIntentResolution coordinator path (#2585) 6acdc63bde is described below commit 6acdc63bde9ba55d0b20c17cfc7c6d9ab995bf73 Author: Denis Chudov <moongll...@gmail.com> AuthorDate: Mon Sep 18 20:24:39 2023 +0400 IGNITE-20034 Implemented writeIntentResolution coordinator path (#2585) --- .../java/org/apache/ignite/lang/ErrorGroups.java | 3 + .../org/apache/ignite/network/TopologyService.java | 8 + .../internal/network/file/TestTopologyService.java | 5 + .../scalecube/ScaleCubeTopologyService.java | 6 + .../engine/framework/ClusterServiceFactory.java | 5 + .../distributed/ItTxDistributedTestSingleNode.java | 6 + .../ignite/distributed/ItTxStateLocalMapTest.java | 6 +- .../internal/table/distributed/TableManager.java | 11 +- .../distributed/replicator/LeaderOrTxState.java | 14 +- .../replicator/PartitionReplicaListener.java | 118 +++------ .../replicator/TransactionStateResolver.java | 268 ++++++++++++++++++++- .../apache/ignite/internal/table/TxLocalTest.java | 52 +--- .../replication/PartitionReplicaListenerTest.java | 55 +++-- .../apache/ignite/distributed/ItTxTestCluster.java | 96 ++++++-- .../ignite/internal/table/TxAbstractTest.java | 38 ++- .../internal/tx/TransactionAbandonedException.java | 54 +++++ ...ateReplicaRequest.java => TransactionMeta.java} | 24 +- .../java/org/apache/ignite/internal/tx/TxMeta.java | 5 +- .../org/apache/ignite/internal/tx/TxState.java | 18 +- .../org/apache/ignite/internal/tx/TxStateMeta.java | 12 +- .../ignite/internal/tx/TxStateMetaFinishing.java | 56 +++++ .../ignite/internal/tx/impl/TxManagerImpl.java | 54 ++++- .../ignite/internal/tx/message/TxMessageGroup.java | 14 +- ...est.java => TxStateCommitPartitionRequest.java} | 13 +- ...Request.java => TxStateCoordinatorRequest.java} | 6 +- ...ateReplicaRequest.java => TxStateResponse.java} | 25 +- 26 files changed, 713 insertions(+), 259 deletions(-) diff --git 
a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java index 97ce2554e5..bf28673f7e 100755 --- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java +++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java @@ -332,6 +332,9 @@ public class ErrorGroups { /** Failure due to an incompatible schema change. */ public static final int TX_INCOMPATIBLE_SCHEMA_ERR = TX_ERR_GROUP.registerErrorCode((short) 12); + + /** Failure due to an abandoned transaction. */ + public static final int TX_ABANDONED_ERR = TX_ERR_GROUP.registerErrorCode((short) 13); } /** Replicator error group. */ diff --git a/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java b/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java index da1c3f7ca3..13b647e92d 100644 --- a/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java +++ b/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java @@ -61,4 +61,12 @@ public interface TopologyService { * @return The node object; {@code null} if the node has not been discovered or is offline. */ @Nullable ClusterNode getByConsistentId(String consistentId); + + /** + * Returns a cluster node specified by its ID. + * + * @param id Node ID. + * @return The node object; {@code null} if the node has not been discovered or is offline. 
+ */ + @Nullable ClusterNode getById(String id); } diff --git a/modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/TestTopologyService.java b/modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/TestTopologyService.java index a98a849a7e..07e4b95ff7 100644 --- a/modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/TestTopologyService.java +++ b/modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/TestTopologyService.java @@ -48,6 +48,11 @@ public class TestTopologyService extends AbstractTopologyService { throw new UnsupportedOperationException(); } + @Override + public @Nullable ClusterNode getById(String id) { + throw new UnsupportedOperationException(); + } + /** * Calls {@link TopologyEventHandler#onAppeared(ClusterNode)} on all registered event handlers. * diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java index 8d11ac34d2..9c23f993c5 100644 --- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java +++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java @@ -182,6 +182,12 @@ final class ScaleCubeTopologyService extends AbstractTopologyService { return consistentIdToMemberMap.get(consistentId); } + /** {@inheritDoc} */ + @Override + public @Nullable ClusterNode getById(String id) { + return consistentIdToMemberMap.values().stream().filter(member -> member.id().equals(id)).findFirst().orElse(null); + } + /** * Converts the given {@link Member} to a {@link ClusterNode}. 
* diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java index 7cd29156c7..9e04a26b0c 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java @@ -160,6 +160,11 @@ public class ClusterServiceFactory { public @Nullable ClusterNode getByConsistentId(String consistentId) { return allMembers.get(consistentId); } + + @Override + public @Nullable ClusterNode getById(String id) { + return allMembers.values().stream().filter(member -> member.id().equals(id)).findFirst().orElse(null); + } } private class LocalMessagingService extends AbstractMessagingService { diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java index 05afbc7918..557867356e 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java @@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.Collection; import java.util.Map; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; @@ -224,6 +225,11 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest { return true; } + @Override + protected Collection<TxManager> txManagers() { + return 
txTestCluster.txManagers.values(); + } + @Test public void testIgniteTransactionsAndReadTimestamp() { Transaction readWriteTx = igniteTransactions.begin(); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java index 702401d28e..3d8591000f 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java @@ -151,7 +151,11 @@ public class ItTxStateLocalMapTest extends IgniteAbstractTest { checkLocalTxStateOnNodes( tx.id(), - new TxStateMeta(commit ? COMMITED : ABORTED, coordinatorId, commit ? testCluster.clocks.get(coord.name()).now() : null) + new TxStateMeta( + commit ? COMMITED : ABORTED, + coordinatorId, + commit ? testCluster.clocks.get(coord.name()).now() : null + ) ); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index aee43081ab..8635c3cd73 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -432,7 +432,14 @@ public class TableManager extends AbstractEventProducer<TableEvent, TableEventPa clusterNodeResolver = topologyService::getByConsistentId; - transactionStateResolver = new TransactionStateResolver(replicaSvc, clusterNodeResolver); + transactionStateResolver = new TransactionStateResolver( + replicaSvc, + txManager, + clock, + clusterNodeResolver, + topologyService::getById, + clusterService.messagingService() + ); tablesByIdVv = new IncrementalVersionedValue<>(registry, HashMap::new); @@ -495,6 +502,8 @@ public class TableManager extends 
AbstractEventProducer<TableEvent, TableEventPa lowWatermark.start(); + transactionStateResolver.start(); + CompletableFuture<Long> recoveryFinishFuture = metaStorageMgr.recoveryFinishedFuture(); assert recoveryFinishFuture.isDone(); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/LeaderOrTxState.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/LeaderOrTxState.java index a37a7b512f..b838156230 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/LeaderOrTxState.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/LeaderOrTxState.java @@ -18,13 +18,13 @@ package org.apache.ignite.internal.table.distributed.replicator; import java.io.Serializable; -import org.apache.ignite.internal.tx.TxMeta; -import org.apache.ignite.internal.tx.message.TxStateReplicaRequest; +import org.apache.ignite.internal.tx.TransactionMeta; +import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest; import org.jetbrains.annotations.Nullable; /** - * Response for the {@link TxStateReplicaRequest}. Can contain either the consistent ID of the Partition Group leader, which should be - * queried for the TX Meta, or the TX Meta itself. + * Response for the {@link TxStateCommitPartitionRequest}. Can contain either the consistent ID of the Partition Group leader, which should + * be queried for the TX Meta, or the TX Meta itself. */ public class LeaderOrTxState implements Serializable { private static final long serialVersionUID = -3555591755828355117L; @@ -33,7 +33,7 @@ public class LeaderOrTxState implements Serializable { private final String leaderName; @Nullable - private final TxMeta txMeta; + private final TransactionMeta txMeta; /** * Creates a response. @@ -41,7 +41,7 @@ public class LeaderOrTxState implements Serializable { * @param leaderName Leader consistent ID. * @param txMeta TX meta. 
*/ - public LeaderOrTxState(@Nullable String leaderName, @Nullable TxMeta txMeta) { + public LeaderOrTxState(@Nullable String leaderName, @Nullable TransactionMeta txMeta) { this.leaderName = leaderName; this.txMeta = txMeta; } @@ -50,7 +50,7 @@ public class LeaderOrTxState implements Serializable { return leaderName; } - public @Nullable TxMeta txMeta() { + public @Nullable TransactionMeta txMeta() { return txMeta; } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 42e3e3d4c7..0197aa61cf 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -25,6 +25,7 @@ import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREATE; import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_DROP; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; +import static org.apache.ignite.internal.tx.TxState.ABANDONED; import static org.apache.ignite.internal.tx.TxState.ABORTED; import static org.apache.ignite.internal.tx.TxState.COMMITED; import static org.apache.ignite.internal.tx.TxState.PENDING; @@ -132,14 +133,14 @@ import org.apache.ignite.internal.tx.Lock; import org.apache.ignite.internal.tx.LockKey; import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.internal.tx.LockMode; +import org.apache.ignite.internal.tx.TransactionAbandonedException; +import org.apache.ignite.internal.tx.TransactionMeta; import org.apache.ignite.internal.tx.TxManager; -import org.apache.ignite.internal.tx.TxMeta; import org.apache.ignite.internal.tx.TxState; import 
org.apache.ignite.internal.tx.TxStateMeta; import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest; import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest; -import org.apache.ignite.internal.tx.message.TxMessagesFactory; -import org.apache.ignite.internal.tx.message.TxStateReplicaRequest; +import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest; import org.apache.ignite.internal.tx.storage.state.TxStateStorage; import org.apache.ignite.internal.util.Cursor; import org.apache.ignite.internal.util.CursorUtils; @@ -168,9 +169,6 @@ public class PartitionReplicaListener implements ReplicaListener { /** Factory for creating replica command messages. */ private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory(); - /** Tx messages factory. */ - private static final TxMessagesFactory FACTORY = new TxMessagesFactory(); - /** Replication group id. */ private final TablePartitionId replicationGroupId; @@ -216,12 +214,6 @@ public class PartitionReplicaListener implements ReplicaListener { /** Runs async scan tasks for effective tail recursion execution (avoid deep recursive calls). */ private final Executor scanRequestExecutor; - /** - * Map to control clock's update in the read only transactions concurrently with a commit timestamp. - * TODO: IGNITE-20034 review this after the commit timestamp will be provided from a commit request (request.commitTimestamp()). 
- */ - private final ConcurrentHashMap<UUID, CompletableFuture<TxMeta>> txTimestampUpdateMap = new ConcurrentHashMap<>(); - private final Supplier<Map<Integer, IndexLocker>> indexesLockers; private final ConcurrentMap<UUID, TxCleanupReadyFutureList> txCleanupReadyFutures = new ConcurrentHashMap<>(); @@ -336,8 +328,8 @@ public class PartitionReplicaListener implements ReplicaListener { @Override public CompletableFuture<?> invoke(ReplicaRequest request, String senderId) { - if (request instanceof TxStateReplicaRequest) { - return processTxStateReplicaRequest((TxStateReplicaRequest) request); + if (request instanceof TxStateCommitPartitionRequest) { + return processTxStateCommitPartitionRequest((TxStateCommitPartitionRequest) request); } return ensureReplicaIsPrimary(request).thenCompose(isPrimary -> processRequest(request, isPrimary, senderId)); @@ -424,49 +416,23 @@ public class PartitionReplicaListener implements ReplicaListener { * @param request Transaction state request. * @return Result future. */ - private CompletableFuture<LeaderOrTxState> processTxStateReplicaRequest(TxStateReplicaRequest request) { + private CompletableFuture<LeaderOrTxState> processTxStateCommitPartitionRequest(TxStateCommitPartitionRequest request) { return placementDriver.getPrimaryReplica(replicationGroupId, hybridClock.now()) .thenCompose(primaryReplica -> { if (isLocalPeer(primaryReplica.getLeaseholder())) { - CompletableFuture<TxMeta> txStateFut = getTxStateConcurrently(request); + TransactionMeta txMeta = txManager.stateMeta(request.txId()); + + if (txMeta == null) { + txMeta = txStateStorage.get(request.txId()); + } - return txStateFut.thenApply(txMeta -> new LeaderOrTxState(null, txMeta)); + return completedFuture(new LeaderOrTxState(null, txMeta)); } else { return completedFuture(new LeaderOrTxState(primaryReplica.getLeaseholder(), null)); } }); } - /** - * Gets a transaction state or {@code null}, if the transaction is not completed. 
- * - * @param txStateReq Transaction state request. - * @return Future to transaction state meta or {@code null}. - */ - private CompletableFuture<TxMeta> getTxStateConcurrently(TxStateReplicaRequest txStateReq) { - //TODO: IGNITE-20034 review this after the commit timestamp will be provided from a commit request (request.commitTimestamp()). - CompletableFuture<TxMeta> txStateFut = new CompletableFuture<>(); - - txTimestampUpdateMap.compute(txStateReq.txId(), (uuid, fut) -> { - if (fut != null) { - fut.thenAccept(txStateFut::complete); - } else { - TxMeta txMeta = txStateStorage.get(txStateReq.txId()); - - if (txMeta == null) { - // All future transactions will be committed after the resolution processed. - hybridClock.update(txStateReq.readTimestamp()); - } - - txStateFut.complete(txMeta); - } - - return null; - }); - - return txStateFut; - } - /** * Processes retrieve batch for read only transaction. * @@ -1197,9 +1163,10 @@ public class PartitionReplicaListener implements ReplicaListener { UUID txId, String txCoordinatorId ) { - CompletableFuture<?> changeStateFuture = finishTransaction(aggregatedGroupIds, txId, commit, txCoordinatorId); + HybridTimestamp commitTimestamp = request.commitTimestamp(); + + CompletableFuture<?> changeStateFuture = finishTransaction(aggregatedGroupIds, txId, commit, commitTimestamp, txCoordinatorId); - // TODO: https://issues.apache.org/jira/browse/IGNITE-17578 Cleanup process should be asynchronous. CompletableFuture<?>[] cleanupFutures = new CompletableFuture[request.groups().size()]; AtomicInteger cleanupFuturesCnt = new AtomicInteger(0); @@ -1225,6 +1192,7 @@ public class PartitionReplicaListener implements ReplicaListener { * @param aggregatedGroupIds Partition identifies which are enlisted in the transaction. * @param txId Transaction id. * @param commit True is the transaction is committed, false otherwise. + * @param commitTimestamp Commit timestamp, if applicable. * @param txCoordinatorId Transaction coordinator id. 
* @return Future to wait of the finish. */ @@ -1232,15 +1200,10 @@ public class PartitionReplicaListener implements ReplicaListener { List<TablePartitionId> aggregatedGroupIds, UUID txId, boolean commit, + @Nullable HybridTimestamp commitTimestamp, String txCoordinatorId ) { - // TODO: IGNITE-20034 Timestamp from request is not using until the issue has not been fixed (request.commitTimestamp()) - var fut = new CompletableFuture<TxMeta>(); - - txTimestampUpdateMap.put(txId, fut); - HybridTimestamp currentTimestamp = hybridClock.now(); - HybridTimestamp commitTimestamp = commit ? currentTimestamp : null; return reliableCatalogVersionFor(currentTimestamp) .thenApply(catalogVersion -> { @@ -1266,11 +1229,7 @@ public class PartitionReplicaListener implements ReplicaListener { .whenComplete((o, throwable) -> { TxState txState = commit ? COMMITED : ABORTED; - fut.complete(new TxMeta(txState, aggregatedGroupIds, commitTimestamp)); - markFinished(txId, txState, commitTimestamp); - - txTimestampUpdateMap.remove(txId); }); } @@ -1537,8 +1496,8 @@ public class PartitionReplicaListener implements ReplicaListener { checkWriteIntentsBelongSameTx(writeIntents); return resolveTxState( - new TablePartitionId(writeIntent.commitTableId(), writeIntent.commitPartitionId()), writeIntent.transactionId(), + new TablePartitionId(writeIntent.commitTableId(), writeIntent.commitPartitionId()), ts) .thenApply(readLastCommitted -> { if (readLastCommitted) { @@ -2508,8 +2467,8 @@ public class PartitionReplicaListener implements ReplicaListener { Supplier<BinaryRow> lastCommitted ) { return resolveTxState( - new TablePartitionId(readResult.commitTableId(), readResult.commitPartitionId()), readResult.transactionId(), + new TablePartitionId(readResult.commitTableId(), readResult.commitPartitionId()), timestamp ).thenApply(readLastCommitted -> { if (readLastCommitted) { @@ -2523,34 +2482,33 @@ public class PartitionReplicaListener implements ReplicaListener { /** * Resolve the actual tx state. 
* - * @param commitGrpId Commit partition id. * @param txId Transaction id. + * @param commitGrpId Commit partition id. * @param timestamp Timestamp. * @return The future completes with true when the transaction is not completed yet and false otherwise. */ private CompletableFuture<Boolean> resolveTxState( - TablePartitionId commitGrpId, UUID txId, + TablePartitionId commitGrpId, HybridTimestamp timestamp ) { - boolean readLatest = timestamp == null; - - return transactionStateResolver.sendMetaRequest(commitGrpId, FACTORY.txStateReplicaRequest() - .groupId(commitGrpId) - .readTimestampLong((readLatest ? HybridTimestamp.MIN_VALUE : timestamp).longValue()) - .txId(txId) - .build()) - .thenApply(txMeta -> { - if (txMeta == null) { - return true; - } else if (txMeta.txState() == COMMITED) { - return !readLatest && txMeta.commitTimestamp().compareTo(timestamp) > 0; - } else { - assert txMeta.txState() == ABORTED : "Unexpected transaction state [state=" + txMeta.txState() + ']'; + return transactionStateResolver.resolveTxState(txId, commitGrpId, timestamp).thenApply(txMeta -> { + if (txMeta == null || txMeta.txState() == ABANDONED) { + // TODO https://issues.apache.org/jira/browse/IGNITE-20427 make the null value returned from commit partition + // TODO more determined + throw new TransactionAbandonedException(txId, txMeta); + } else if (txMeta.txState() == COMMITED) { + boolean readLatest = timestamp == null; + + return !readLatest && txMeta.commitTimestamp().compareTo(timestamp) > 0; + } else if (txMeta.txState() == ABORTED) { + return true; + } else { + assert txMeta.txState() == PENDING : "Unexpected transaction state [state=" + txMeta.txState() + ']'; - return true; - } - }); + return true; + } + }); } private CompletableFuture<Void> validateAtTimestamp(UUID txId) { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java index 288be7b281..2b95a7e3fa 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java @@ -17,24 +17,49 @@ package org.apache.ignite.internal.table.distributed.replicator; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.tx.TxState.ABANDONED; +import static org.apache.ignite.internal.tx.TxState.FINISHING; +import static org.apache.ignite.internal.tx.TxState.PENDING; +import static org.apache.ignite.internal.tx.TxState.isFinalState; + import java.util.Collection; import java.util.LinkedHashSet; import java.util.Map; import java.util.Objects; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.ReplicationGroupId; -import org.apache.ignite.internal.tx.TxMeta; -import org.apache.ignite.internal.tx.message.TxStateReplicaRequest; +import org.apache.ignite.internal.tx.TransactionMeta; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.TxStateMeta; +import org.apache.ignite.internal.tx.TxStateMetaFinishing; +import org.apache.ignite.internal.tx.message.TxMessageGroup; +import org.apache.ignite.internal.tx.message.TxMessagesFactory; +import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest; +import org.apache.ignite.internal.tx.message.TxStateCoordinatorRequest; +import org.apache.ignite.internal.tx.message.TxStateResponse; import 
org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.MessagingService; +import org.apache.ignite.network.NetworkMessage; +import org.jetbrains.annotations.Nullable; /** - * Helper class that allows to resolve transaction state. + * Placement driver. */ public class TransactionStateResolver { + /** Tx messages factory. */ + private static final TxMessagesFactory FACTORY = new TxMessagesFactory(); + + /** Network timeout. */ + private static final long RPC_TIMEOUT = 3000; + /** Assignment node names per replication group. */ private final Map<ReplicationGroupId, LinkedHashSet<String>> primaryReplicaMapping = new ConcurrentHashMap<>(); @@ -44,29 +69,200 @@ public class TransactionStateResolver { /** Function that resolves a node consistent ID to a cluster node. */ private final Function<String, ClusterNode> clusterNodeResolver; + // TODO https://issues.apache.org/jira/browse/IGNITE-20408 after this ticket this resolver will be no longer needed, as + // TODO we will store coordinator as ClusterNode in local tx state map. + /** Function that resolves a node non-consistent ID to a cluster node. */ + private final Function<String, ClusterNode> clusterNodeResolverById; + + private final Map<UUID, CompletableFuture<TransactionMeta>> txStateFutures = new ConcurrentHashMap<>(); + + private final TxManager txManager; + + private final HybridClock clock; + + private final MessagingService messagingService; + /** * The constructor. * * @param replicaService Replication service. + * @param txManager Transaction manager. + * @param clock Node clock. + * @param clusterNodeResolver Cluster node resolver. + * @param clusterNodeResolverById Cluster node resolver using non-consistent id. + * @param messagingService Messaging service. 
*/ - public TransactionStateResolver(ReplicaService replicaService, Function<String, ClusterNode> clusterNodeResolver) { + public TransactionStateResolver( + ReplicaService replicaService, + TxManager txManager, + HybridClock clock, + Function<String, ClusterNode> clusterNodeResolver, + Function<String, ClusterNode> clusterNodeResolverById, + MessagingService messagingService + ) { this.replicaService = replicaService; + this.txManager = txManager; + this.clock = clock; this.clusterNodeResolver = clusterNodeResolver; + this.clusterNodeResolverById = clusterNodeResolverById; + this.messagingService = messagingService; + } + + /** + * This should be called in order to allow the transaction state resolver to listen to {@link TxStateCoordinatorRequest} messages. + */ + public void start() { + messagingService.addMessageHandler(TxMessageGroup.class, (msg, sender, correlationId) -> { + if (msg instanceof TxStateCoordinatorRequest) { + TxStateCoordinatorRequest req = (TxStateCoordinatorRequest) msg; + + processTxStateRequest(req) + .thenAccept(txStateMeta -> { + NetworkMessage response = FACTORY.txStateResponse() + .txStateMeta(txStateMeta) + .timestampLong(clock.nowLong()) + .build(); + + messagingService.respond(sender, response, correlationId); + }); + } + }); } /** - * Sends a transaction state request to the primary replica. + * Resolves transaction state locally, if possible, or distributively, if needed. * - * @param replicaGrp Replication group id. - * @param request Status request. - * @return Result future. + * @param txId Transaction id. + * @param commitGrpId Commit partition group id. + * @param timestamp Timestamp. + * @return Future with the transaction state meta as a result. 
*/ - public CompletableFuture<TxMeta> sendMetaRequest(ReplicationGroupId replicaGrp, TxStateReplicaRequest request) { - CompletableFuture<TxMeta> resFut = new CompletableFuture<>(); + public CompletableFuture<TransactionMeta> resolveTxState( + UUID txId, + ReplicationGroupId commitGrpId, + HybridTimestamp timestamp + ) { + TxStateMeta localMeta = txManager.stateMeta(txId); + + if (localMeta != null && isFinalState(localMeta.txState())) { + return completedFuture(localMeta); + } - sendAndRetry(resFut, replicaGrp, request); + CompletableFuture<TransactionMeta> future = txStateFutures.compute(txId, (k, v) -> { + if (v == null) { + v = new CompletableFuture<>(); - return resFut; + resolveDistributiveTxState(txId, localMeta, commitGrpId, timestamp, v); + } + + return v; + }); + + future.whenComplete((v, e) -> txStateFutures.remove(txId)); + + return future; + } + + /** + * Resolve the transaction state distributively. This method doesn't process final tx states. + * + * @param txId Transaction id. + * @param localMeta Local tx meta. + * @param commitGrpId Commit partition group id. + * @param timestamp Timestamp to pass to target node. + * @param txMetaFuture Tx meta future to complete with the result. + */ + private void resolveDistributiveTxState( + UUID txId, + @Nullable TxStateMeta localMeta, + ReplicationGroupId commitGrpId, + HybridTimestamp timestamp, + CompletableFuture<TransactionMeta> txMetaFuture + ) { + assert localMeta == null || !isFinalState(localMeta.txState()) : "Unexpected tx meta [txId" + txId + ", meta=" + localMeta + ']'; + + HybridTimestamp timestamp0 = timestamp == null ? HybridTimestamp.MIN_VALUE : timestamp; + + if (localMeta == null) { + // Fallback to commit partition path, because we don't have coordinator id. 
+ resolveTxStateFromCommitPartition(txId, commitGrpId, txMetaFuture); + } else if (localMeta.txState() == PENDING) { + resolveTxStateFromTxCoordinator(txId, localMeta.txCoordinatorId(), commitGrpId, timestamp0, txMetaFuture); + } else if (localMeta.txState() == FINISHING) { + assert localMeta instanceof TxStateMetaFinishing; + + ((TxStateMetaFinishing) localMeta).txFinishFuture().whenComplete((v, e) -> { + if (e == null) { + txMetaFuture.complete(v); + } else { + txMetaFuture.completeExceptionally(e); + } + }); + } else { + assert localMeta.txState() == ABANDONED : "Unexpected transaction state [txId=" + txId + ", txStateMeta=" + localMeta + ']'; + + txMetaFuture.complete(localMeta); + } + } + + private void resolveTxStateFromTxCoordinator( + UUID txId, + String coordinatorId, + ReplicationGroupId commitGrpId, + HybridTimestamp timestamp, + CompletableFuture<TransactionMeta> txMetaFuture + ) { + updateLocalTxMapAfterDistributedStateResolved(txId, txMetaFuture); + + ClusterNode coordinator = clusterNodeResolverById.apply(coordinatorId); + + if (coordinator == null) { + // This means the coordinator node have either left the cluster or restarted. 
+ resolveTxStateFromCommitPartition(txId, commitGrpId, txMetaFuture); + } else { + CompletableFuture<TransactionMeta> coordinatorTxMetaFuture = new CompletableFuture<>(); + + coordinatorTxMetaFuture.whenComplete((v, e) -> { + assert v != null : "Unexpected result from transaction coordinator: unknown transaction state [txId=" + txId + + ", transactionCoordinator=" + coordinator + ']'; + + if (e == null) { + txMetaFuture.complete(v); + } else { + txMetaFuture.completeExceptionally(e); + } + }); + + TxStateCoordinatorRequest request = FACTORY.txStateCoordinatorRequest() + .readTimestampLong(timestamp.longValue()) + .txId(txId) + .build(); + + sendAndRetry(coordinatorTxMetaFuture, coordinator, request); + } + } + + private void resolveTxStateFromCommitPartition( + UUID txId, + ReplicationGroupId commitGrpId, + CompletableFuture<TransactionMeta> txMetaFuture + ) { + TxStateCommitPartitionRequest request = FACTORY.txStateCommitPartitionRequest() + .groupId(commitGrpId) + .txId(txId) + .build(); + + updateLocalTxMapAfterDistributedStateResolved(txId, txMetaFuture); + + sendAndRetry(txMetaFuture, commitGrpId, request); + } + + private void updateLocalTxMapAfterDistributedStateResolved(UUID txId, CompletableFuture<TransactionMeta> future) { + future.thenAccept(txMeta -> { + if (txMeta != null && txMeta instanceof TxStateMeta) { + txManager.updateTxMeta(txId, old -> (TxStateMeta) txMeta); + } + }); } /** @@ -87,7 +283,11 @@ public class TransactionStateResolver { * @param replicaGrp Replication group id. * @param request Request. 
*/ - private void sendAndRetry(CompletableFuture<TxMeta> resFut, ReplicationGroupId replicaGrp, TxStateReplicaRequest request) { + private void sendAndRetry( + CompletableFuture<TransactionMeta> resFut, + ReplicationGroupId replicaGrp, + TxStateCommitPartitionRequest request + ) { ClusterNode nodeToSend = primaryReplicaMapping.get(replicaGrp).stream() .map(clusterNodeResolver) .filter(Objects::nonNull) @@ -115,4 +315,46 @@ public class TransactionStateResolver { } }); } + + /** + * Tries to send a request to the given node. + * + * @param resFut Response future. + * @param node Node to send to. + * @param request Request. + */ + private void sendAndRetry(CompletableFuture<TransactionMeta> resFut, ClusterNode node, TxStateCoordinatorRequest request) { + messagingService.invoke(node, request, RPC_TIMEOUT).thenAccept(resp -> { + assert resp instanceof TxStateResponse : "Unsupported response type [type=" + resp.getClass().getSimpleName() + ']'; + + TxStateResponse response = (TxStateResponse) resp; + + resFut.complete(response.txStateMeta()); + }); + } + + /** + * Processes the transaction state requests that are used for coordinator path based write intent resolution. Can't return + * {@link org.apache.ignite.internal.tx.TxState#FINISHING}, it waits for actual completion instead. + * + * @param request Request. + * @return Future that should be completed with transaction state meta. 
+ */ + private CompletableFuture<TransactionMeta> processTxStateRequest(TxStateCoordinatorRequest request) { + clock.update(request.readTimestamp()); + + UUID txId = request.txId(); + + TxStateMeta txStateMeta = txManager.stateMeta(txId); + + if (txStateMeta != null && txStateMeta.txState() == FINISHING) { + assert txStateMeta instanceof TxStateMetaFinishing; + + TxStateMetaFinishing txStateMetaFinishing = (TxStateMetaFinishing) txStateMeta; + + return txStateMetaFinishing.txFinishFuture(); + } else { + return completedFuture(txStateMeta); + } + } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java index 7277551497..951e77a0b6 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.table; +import static java.util.concurrent.CompletableFuture.completedFuture; import static org.mockito.Answers.RETURNS_DEEP_STUBS; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -25,9 +26,11 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; +import java.util.UUID; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.ReplicationGroupId; @@ -44,7 +47,6 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager; import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl; import org.apache.ignite.internal.tx.impl.TransactionIdGenerator; import org.apache.ignite.internal.tx.impl.TxManagerImpl; -import 
org.apache.ignite.internal.tx.message.TxStateReplicaRequest; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.MessagingService; import org.apache.ignite.table.Table; @@ -110,14 +112,13 @@ public class TxLocalTest extends TxAbstractTest { TransactionStateResolver transactionStateResolver = mock(TransactionStateResolver.class, RETURNS_DEEP_STUBS); doAnswer(invocationOnMock -> { - TxStateReplicaRequest request = invocationOnMock.getArgument(1); + UUID txId = invocationOnMock.getArgument(0); - return CompletableFuture.completedFuture( - tables.get(request.groupId()).txStateStorage().getTxStateStorage(0).get(request.txId())); - }).when(transactionStateResolver).sendMetaRequest(any(), any()); + return completedFuture(txManager.stateMeta(txId)); + }).when(transactionStateResolver).resolveTxState(any(), any(), any()); - txManager = new TxManagerImpl(replicaSvc, lockManager, DummyInternalTableImpl.CLOCK, - new TransactionIdGenerator(0xdeadbeef), () -> localNodeName); + txManager = new TxManagerImpl(replicaSvc, lockManager, DummyInternalTableImpl.CLOCK, new TransactionIdGenerator(0xdeadbeef), + () -> localNodeName); igniteTransactions = new IgniteTransactionsImpl(txManager, timestampTracker); @@ -173,39 +174,8 @@ public class TxLocalTest extends TxAbstractTest { return true; } - // TODO: https://issues.apache.org/jira/browse/IGNITE-20355 @Override - public void testReadOnlyGet() { - // No-op - } - - // TODO: https://issues.apache.org/jira/browse/IGNITE-20355 - @Override - public void testReadOnlyScan() throws Exception { - // No-op - } - - // TODO: https://issues.apache.org/jira/browse/IGNITE-20355 - @Override - public void testReadOnlyGetWriteIntentResolutionUpdate() { - // No-op - } - - // TODO: https://issues.apache.org/jira/browse/IGNITE-20355 - @Override - public void testReadOnlyGetWriteIntentResolutionRemove() { - // No-op - } - - // TODO: https://issues.apache.org/jira/browse/IGNITE-20355 - @Override - public void testReadOnlyGetAll() 
{ - // No-op - } - - // TODO: https://issues.apache.org/jira/browse/IGNITE-20355 - @Override - public void testReadOnlyPendingWriteIntentSkippedCombined() { - super.testReadOnlyPendingWriteIntentSkippedCombined(); + protected Collection<TxManager> txManagers() { + return List.of(txManager); } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index be9c6890fa..3452330b4b 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -92,6 +92,7 @@ import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.service.LeaderWithTerm; import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowConverter; @@ -153,6 +154,7 @@ import org.apache.ignite.internal.tostring.IgniteToStringInclude; import org.apache.ignite.internal.tostring.S; import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.internal.tx.TransactionIds; +import org.apache.ignite.internal.tx.TransactionMeta; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.TxMeta; import org.apache.ignite.internal.tx.TxState; @@ -170,6 +172,7 @@ import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterNodeImpl; +import 
org.apache.ignite.network.MessagingService; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.network.TopologyService; import org.apache.ignite.sql.ColumnType; @@ -270,7 +273,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { /** Another (not local) cluster node. */ private final ClusterNode anotherNode = new ClusterNodeImpl("node2", "node2", NetworkAddress.from("127.0.0.2:127")); - private final TransactionStateResolver transactionStateResolver = mock(TransactionStateResolver.class); + private TransactionStateResolver transactionStateResolver; private final PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(testMvPartitionStorage); @@ -297,6 +300,9 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { @Mock private CatalogTables catalogTables; + @Mock + private MessagingService messagingService; + /** Schema descriptor for tests. */ private SchemaDescriptor schemaDescriptor; @@ -381,23 +387,6 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { when(topologySrv.localMember()).thenReturn(localNode); - HybridTimestamp txFixedTimestamp = clock.now(); - - when(transactionStateResolver.sendMetaRequest(any(), any())).thenAnswer(invocationOnMock -> { - TxMeta txMeta; - - if (txState == null) { - txMeta = null; - } else if (txState == TxState.COMMITED) { - txMeta = new TxMeta(TxState.COMMITED, singletonList(grpId), txFixedTimestamp); - } else { - assert txState == TxState.ABORTED : "Sate is " + txState; - - txMeta = new TxMeta(TxState.ABORTED, singletonList(grpId), txFixedTimestamp); - } - return completedFuture(txMeta); - }); - when(safeTimeClock.waitFor(any())).thenReturn(completedFuture(null)); when(schemas.waitForSchemasAvailability(any())).thenReturn(completedFuture(null)); @@ -453,6 +442,22 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { doAnswer(invocation -> txStateMeta).when(txManager).stateMeta(any()); + doAnswer(invocation -> 
{ + var resp = new TxMessagesFactory().txStateResponse().txStateMeta(txStateMeta).build(); + return completedFuture(resp); + }).when(messagingService).invoke(any(ClusterNode.class), any(), anyLong()); + + transactionStateResolver = new TransactionStateResolver( + mock(ReplicaService.class), + txManager, + clock, + consistentId -> consistentId.equals(localNode.name()) ? localNode : anotherNode, + id -> id.equals(localNode.id()) ? localNode : anotherNode, + messagingService + ); + + transactionStateResolver.start(); + partitionReplicaListener = new PartitionReplicaListener( testMvPartitionStorage, mockRaftClient, @@ -525,9 +530,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { @Test public void testTxStateReplicaRequestEmptyState() throws Exception { - CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest() + CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest() .groupId(grpId) - .readTimestampLong(clock.nowLong()) .txId(newTxId()) .build(), "senderId"); @@ -545,15 +549,14 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { HybridTimestamp readTimestamp = clock.now(); - CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest() + CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest() .groupId(grpId) - .readTimestampLong(readTimestamp.longValue()) .txId(txId) .build(), localNode.id()); LeaderOrTxState tuple = (LeaderOrTxState) fut.get(1, TimeUnit.SECONDS); - TxMeta txMeta = tuple.txMeta(); + TransactionMeta txMeta = tuple.txMeta(); assertNotNull(txMeta); assertEquals(TxState.COMMITED, txMeta.txState()); assertNotNull(txMeta.commitTimestamp()); @@ -566,9 +569,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { public void testTxStateReplicaRequestMissLeaderMiss() throws Exception { localLeader = 
false; - CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest() + CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest() .groupId(grpId) - .readTimestampLong(clock.nowLong()) .txId(newTxId()) .build(), localNode.id()); @@ -627,6 +629,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { pkStorage().put(testBinaryRow, rowId); testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID); + txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.COMMITED, localNode.id(), clock.now())); CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest() .groupId(grpId) @@ -649,6 +652,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { pkStorage().put(testBinaryRow, rowId); testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID); + txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING, localNode.id(), null)); CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest() .groupId(grpId) @@ -672,6 +676,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { pkStorage().put(testBinaryRow, rowId); testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID); + txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.ABORTED, localNode.id(), null)); CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest() .groupId(grpId) diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index 1b3516092f..67ceb5d659 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -47,6 +47,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.IntStream; import org.apache.ignite.internal.affinity.AffinityUtils; import org.apache.ignite.internal.affinity.Assignment; @@ -137,6 +138,10 @@ import org.junit.jupiter.api.TestInfo; * Class that allows to mock a cluster for transaction tests' purposes. */ public class ItTxTestCluster { + private final List<NetworkAddress> localAddresses; + + private final NodeFinder nodeFinder; + private final RaftConfiguration raftConfig; private final GcConfiguration gcConfig; @@ -173,6 +178,8 @@ public class ItTxTestCluster { protected TxManager clientTxManager; + protected TransactionStateResolver clientTxStateResolver; + protected Map<String, List<RaftGroupService>> raftClients = new HashMap<>(); protected Map<String, TxStateStorage> txStateStorages; @@ -201,6 +208,22 @@ public class ItTxTestCluster { return null; }; + private final Function<String, ClusterNode> idToNode = id -> { + for (ClusterService service : cluster) { + ClusterNode clusterNode = service.topologyService().localMember(); + + if (clusterNode.id().equals(id)) { + return clusterNode; + } + } + + if (client != null && client.topologyService().localMember().id().equals(id)) { + return client.topologyService().localMember(); + } + + return null; + }; + private final TestInfo testInfo; /** Observable timestamp tracker. 
*/ @@ -227,6 +250,9 @@ public class ItTxTestCluster { this.startClient = startClient; this.testInfo = testInfo; this.timestampTracker = timestampTracker; + + localAddresses = findLocalAddresses(NODE_PORT_BASE, NODE_PORT_BASE + nodes); + nodeFinder = new StaticNodeFinder(localAddresses); } /** @@ -236,10 +262,6 @@ public class ItTxTestCluster { assertTrue(nodes > 0); assertTrue(replicas > 0); - List<NetworkAddress> localAddresses = findLocalAddresses(NODE_PORT_BASE, NODE_PORT_BASE + nodes); - - var nodeFinder = new StaticNodeFinder(localAddresses); - clusterServices = new ConcurrentHashMap<>(nodes); nodeFinder.findNodes().parallelStream() @@ -258,20 +280,7 @@ public class ItTxTestCluster { LOG.info("The cluster has been started"); if (startClient) { - client = startNode(testInfo, "client", NODE_PORT_BASE - 1, nodeFinder); - - assertTrue(waitForTopology(client, nodes + 1, 1000)); - - clientClock = new HybridClockImpl(); - - LOG.info("Replica manager has been started, node=[" + client.topologyService().localMember() + ']'); - - clientReplicaSvc = new ReplicaService( - client.messagingService(), - clientClock - ); - - LOG.info("The client has been started"); + startClient(); } // Start raft servers. Each raft server can hold multiple groups. @@ -345,16 +354,15 @@ public class ItTxTestCluster { localNodeName = cluster.get(0).topologyService().localMember().name(); if (startClient) { - clientTxManager = new TxManagerImpl(clientReplicaSvc, new HeapLockManager(), clientClock, new TransactionIdGenerator(-1), - () -> cluster.get(0).topologyService().localMember().id()); + initializeClientTxComponents(); } else { // Collocated mode. 
clientTxManager = txManagers.get(localNodeName); } - assertNotNull(clientTxManager); - igniteTransactions = new IgniteTransactionsImpl(clientTxManager, timestampTracker); + + assertNotNull(clientTxManager); } public IgniteTransactions igniteTransactions() { @@ -401,7 +409,15 @@ public class ItTxTestCluster { var mvTableStorage = new TestMvTableStorage(tableId, DEFAULT_PARTITION_COUNT); var mvPartStorage = new TestMvPartitionStorage(partId); var txStateStorage = txStateStorages.get(assignment); - var transactionStateResolver = new TransactionStateResolver(replicaServices.get(assignment), consistentIdToNode); + var transactionStateResolver = new TransactionStateResolver( + replicaServices.get(assignment), + txManagers.get(assignment), + clocks.get(assignment), + consistentIdToNode, + idToNode, + clusterServices.get(assignment).messagingService() + ); + transactionStateResolver.start(); for (int part = 0; part < assignments.size(); part++) { transactionStateResolver.updateAssignment(grpIds.get(part), assignments.get(part)); @@ -680,4 +696,38 @@ public class ItTxTestCluster { return network; } + + private void startClient() throws InterruptedException { + client = startNode(testInfo, "client", NODE_PORT_BASE - 1, nodeFinder); + + assertTrue(waitForTopology(client, nodes + 1, 1000)); + + clientClock = new HybridClockImpl(); + + LOG.info("Replica manager has been started, node=[" + client.topologyService().localMember() + ']'); + + clientReplicaSvc = new ReplicaService( + client.messagingService(), + clientClock + ); + + LOG.info("The client has been started"); + } + + private void initializeClientTxComponents() { + Supplier<String> localNodeIdSupplier = () -> client.topologyService().localMember().id(); + + clientTxManager = new TxManagerImpl(clientReplicaSvc, new HeapLockManager(), clientClock, new TransactionIdGenerator(-1), + localNodeIdSupplier); + + clientTxStateResolver = new TransactionStateResolver( + clientReplicaSvc, + clientTxManager, + clientClock, + 
consistentIdToNode, + idToNode, + client.messagingService() + ); + clientTxStateResolver.start(); + } } diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java index fdabc84947..69cc7409ef 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java @@ -41,6 +41,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CyclicBarrier; @@ -65,6 +66,7 @@ import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.internal.tx.LockMode; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.TxState; +import org.apache.ignite.internal.tx.TxStateMeta; import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl; import org.apache.ignite.internal.util.CollectionUtils; import org.apache.ignite.internal.util.Pair; @@ -119,8 +121,6 @@ public abstract class TxAbstractTest extends IgniteAbstractTest { protected IgniteTransactions igniteTransactions; - protected TxManager clientTxManager; - @Test public void testCommitRollbackSameTxDoesNotThrow() throws TransactionException { InternalTransaction tx = (InternalTransaction) igniteTransactions.begin(); @@ -1633,7 +1633,7 @@ public abstract class TxAbstractTest extends IgniteAbstractTest { * @param id The id. * @return The key tuple. 
*/ - private Tuple makeKey(long id) { + protected Tuple makeKey(long id) { return Tuple.create().set("accountNumber", id); } @@ -1766,7 +1766,6 @@ public abstract class TxAbstractTest extends IgniteAbstractTest { }); } - @Test public void testReadOnlyGet() { accounts.recordView().upsert(null, makeValue(1, 100.)); @@ -1935,6 +1934,30 @@ public abstract class TxAbstractTest extends IgniteAbstractTest { assertEquals(BALANCE_1, accounts.recordView().get(null, makeKey(1)).doubleValue("balance")); } + @Test + public void testWriteIntentResolutionFallbackToCommitPartitionPath() { + accounts.recordView().upsert(null, makeValue(1, 100.)); + + // Pending tx + Transaction tx = igniteTransactions.begin(); + accounts.recordView().delete(tx, makeKey(1)); + + // Imitate the restart of the client node, which is a tx coordinator, in order to make its volatile state of unavailable. + // Now coordinator path of the write intent resolution has no effect, and we should fallback to commit partition path. + UUID txId = ((ReadWriteTransactionImpl) tx).id(); + + for (TxManager txManager : txManagers()) { + txManager.updateTxMeta(txId, old -> old == null ? null : new TxStateMeta(old.txState(), "restarted", old.commitTimestamp())); + } + + // Read-only. + Transaction readOnlyTx = igniteTransactions.begin(new TransactionOptions().readOnly(true)); + assertEquals(100., accounts.recordView().get(readOnlyTx, makeKey(1)).doubleValue("balance")); + + // Commit pending tx. + tx.commit(); + } + /** * Checks operations that act after a transaction is committed, are finished with exception. * @@ -1992,4 +2015,11 @@ public abstract class TxAbstractTest extends IgniteAbstractTest { assertThat(res, contains(null, null)); } } + + /** + * Returns server nodes' tx managers. + * + * @return Server nodes' tx managers. 
+ */ + protected abstract Collection<TxManager> txManagers(); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAbandonedException.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAbandonedException.java new file mode 100644 index 0000000000..c35e1ada47 --- /dev/null +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAbandonedException.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx; + +import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ABANDONED_ERR; + +import java.util.UUID; +import org.apache.ignite.tx.TransactionException; +import org.jetbrains.annotations.Nullable; + +/** + * Exception that is thrown when write intent can't be resolved because it has been created by a transaction that is now abandoned + * (has a state that currently can't be known). + */ +public class TransactionAbandonedException extends TransactionException { + private final UUID abandonedTxId; + + @Nullable + private final TransactionMeta transactionMeta; + + /** + * Constructor. + * + * @param abandonedTxId ID of the transaction that is abandoned. 
+ * @param transactionMeta Transaction meta that was able to be retrieved. + */ + public TransactionAbandonedException(UUID abandonedTxId, @Nullable TransactionMeta transactionMeta) { + super(UUID.randomUUID(), TX_ABANDONED_ERR, null); + + this.abandonedTxId = abandonedTxId; + this.transactionMeta = transactionMeta; + } + + @Override + public String getMessage() { + return "Operation failed due to abandoned transaction [abandonedTxId=" + abandonedTxId + + ", transactionMeta=" + transactionMeta + ']'; + } +} diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionMeta.java similarity index 60% copy from modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java copy to modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionMeta.java index 26ae27b38f..975a818479 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionMeta.java @@ -15,25 +15,19 @@ * limitations under the License. */ -package org.apache.ignite.internal.tx.message; +package org.apache.ignite.internal.tx; -import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; - -import java.util.UUID; +import java.io.Serializable; import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.replicator.message.ReplicaRequest; -import org.apache.ignite.network.annotations.Transferable; +import org.jetbrains.annotations.Nullable; /** - * Transaction state request. + * Transaction metadata interface. */ -@Transferable(TxMessageGroup.TX_STATE_REQUEST) -public interface TxStateReplicaRequest extends ReplicaRequest { - UUID txId(); - - long readTimestampLong(); +public interface TransactionMeta extends Serializable { + /** Tx state. 
*/ + TxState txState(); - default HybridTimestamp readTimestamp() { - return hybridTimestamp(readTimestampLong()); - } + /** Commit timestamp. */ + @Nullable HybridTimestamp commitTimestamp(); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java index ea312e6f46..8eab25e38a 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.tx; import static java.util.Collections.unmodifiableList; -import java.io.Serializable; import java.util.List; import java.util.Objects; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -28,7 +27,7 @@ import org.apache.ignite.internal.tostring.S; import org.jetbrains.annotations.Nullable; /** Transaction meta. */ -public class TxMeta implements Serializable { +public class TxMeta implements TransactionMeta { /** Serial version UID. 
*/ private static final long serialVersionUID = -172513482743911860L; @@ -55,6 +54,7 @@ public class TxMeta implements Serializable { this.commitTimestamp = commitTimestamp; } + @Override public TxState txState() { return txState; } @@ -63,6 +63,7 @@ public class TxMeta implements Serializable { return unmodifiableList(enlistedPartitions); } + @Override public @Nullable HybridTimestamp commitTimestamp() { return commitTimestamp; } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java index f0a866238a..c5a897b647 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java @@ -26,16 +26,22 @@ public enum TxState { PENDING, FINISHING, ABORTED, - COMMITED; + COMMITED, + ABANDONED; private static final boolean[][] TRANSITION_MATRIX = { - { false, true, false, true, true }, - { false, true, true, true, true }, - { false, false, false, true, true }, - { false, false, false, true, false}, - { false, false, false, false, true }, + { false, true, false, true, true, true }, + { false, true, true, true, true, true }, + { false, false, false, true, true, true }, + { false, false, false, true, false, true }, + { false, false, false, false, true, true }, + { true, true, true, true, true, true } }; + public static boolean isFinalState(TxState state) { + return state == COMMITED || state == ABORTED; + } + /** * Checks the correctness of the transition between transaction states. 
* diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java index dbb3da6133..df1e5f79ce 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java @@ -23,7 +23,9 @@ import org.jetbrains.annotations.Nullable; /** * Transaction state meta. */ -public class TxStateMeta { +public class TxStateMeta implements TransactionMeta { + private static final long serialVersionUID = 8521181896862227127L; + private final TxState txState; private final String txCoordinatorId; @@ -37,12 +39,17 @@ public class TxStateMeta { * @param txCoordinatorId Transaction coordinator id. * @param commitTimestamp Commit timestamp. */ - public TxStateMeta(TxState txState, String txCoordinatorId, @Nullable HybridTimestamp commitTimestamp) { + public TxStateMeta( + TxState txState, + String txCoordinatorId, + @Nullable HybridTimestamp commitTimestamp + ) { this.txState = txState; this.txCoordinatorId = txCoordinatorId; this.commitTimestamp = commitTimestamp; } + @Override public TxState txState() { return txState; } @@ -51,6 +58,7 @@ public class TxStateMeta { return txCoordinatorId; } + @Override public @Nullable HybridTimestamp commitTimestamp() { return commitTimestamp; } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java new file mode 100644 index 0000000000..9b85925d7c --- /dev/null +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx; + +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.jetbrains.annotations.Nullable; + +/** + * {@link TxStateMeta} implementation for {@link TxState#FINISHING} state. Contains future that is is completed after the state of + * corresponding transaction changes to final state. + */ +public class TxStateMetaFinishing extends TxStateMeta { + private static final long serialVersionUID = 9122953981654023665L; + + /** Future that is completed after the state of corresponding transaction changes to final state. */ + private final CompletableFuture<TransactionMeta> txFinishFuture = new CompletableFuture<>(); + + /** + * Constructor. + * + * @param txCoordinatorId Transaction coordinator id. + */ + public TxStateMetaFinishing(String txCoordinatorId) { + super(TxState.FINISHING, txCoordinatorId, null); + } + + /** + * Future that is completed after the state of corresponding transaction changes to final state. + * + * @return Future that is completed after the state of corresponding transaction changes to final state. 
+ */ + public CompletableFuture<TransactionMeta> txFinishFuture() { + return txFinishFuture; + } + + @Override + public @Nullable HybridTimestamp commitTimestamp() { + throw new UnsupportedOperationException("Can't get commit timestamp from FINISHING transaction state meta."); + } +} diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index 8216b05059..c0a6979335 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -22,9 +22,9 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong; import static org.apache.ignite.internal.tx.TxState.ABORTED; import static org.apache.ignite.internal.tx.TxState.COMMITED; -import static org.apache.ignite.internal.tx.TxState.FINISHING; import static org.apache.ignite.internal.tx.TxState.PENDING; import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness; +import static org.apache.ignite.internal.tx.TxState.isFinalState; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR; import java.util.Comparator; @@ -50,6 +50,7 @@ import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.TxState; import org.apache.ignite.internal.tx.TxStateMeta; +import org.apache.ignite.internal.tx.TxStateMetaFinishing; import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest; import org.apache.ignite.internal.tx.message.TxMessagesFactory; import org.apache.ignite.internal.util.Lazy; @@ -230,14 +231,28 @@ public class TxManagerImpl implements TxManager { ) { assert groups != null; + // Here we put finishing state meta into the local map, so 
that all concurrent operations trying to read tx state + // using a read timestamp could see that this transaction is finishing, see #transactionMetaReadTimestampAware(txId, timestamp). + // None of them now are able to update node's clock with read timestamp and we can create the commit timestamp that is greater + // than all the read timestamps processed before. + // Every concurrent operation will now use a finish future from the finishing state meta and get only final transaction + // state after the transaction is finished. + TxStateMetaFinishing finishingStateMeta = new TxStateMetaFinishing(localNodeId.get()); + updateTxMeta(txId, old -> finishingStateMeta); HybridTimestamp commitTimestamp = commit ? clock.now() : null; // If there are no enlisted groups, just return - we already marked the tx as finished. boolean finishRequestNeeded = !groups.isEmpty(); - updateTxMeta(txId, old -> new TxStateMeta(finishRequestNeeded ? FINISHING : ABORTED, old.txCoordinatorId(), commitTimestamp)); - if (!finishRequestNeeded) { + updateTxMeta(txId, old -> { + TxStateMeta finalStateMeta = coordinatorFinalTxStateMeta(commit, commitTimestamp); + + finishingStateMeta.txFinishFuture().complete(finalStateMeta); + + return finalStateMeta; + }); + return completedFuture(null); } @@ -254,11 +269,23 @@ public class TxManagerImpl implements TxManager { .build(); return replicaService.invoke(recipientNode, req) - .thenRun(() -> updateTxMeta(txId, old -> new TxStateMeta( - commit ? 
COMMITED : ABORTED, - old.txCoordinatorId(), - old.commitTimestamp() - ))); + .thenRun(() -> { + updateTxMeta(txId, old -> { + if (isFinalState(old.txState())) { + finishingStateMeta.txFinishFuture().complete(old); + + return old; + } + + assert old instanceof TxStateMetaFinishing; + + TxStateMeta finalTxStateMeta = coordinatorFinalTxStateMeta(commit, commitTimestamp); + + finishingStateMeta.txFinishFuture().complete(finalTxStateMeta); + + return finalTxStateMeta; + }); + }); } @Override @@ -353,4 +380,15 @@ public class TxManagerImpl implements TxManager { lowWatermarkReadWriteLock.writeLock().unlock(); } } + + /** + * Creates final {@link TxStateMeta} for coordinator node. + * + * @param commit Commit flag. + * @param commitTimestamp Commit timestamp. + * @return Transaction meta. + */ + private TxStateMeta coordinatorFinalTxStateMeta(boolean commit, HybridTimestamp commitTimestamp) { + return new TxStateMeta(commit ? COMMITED : ABORTED, localNodeId.get(), commitTimestamp); + } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java index c9ab6bedec..00abd58a1b 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java @@ -40,7 +40,17 @@ public class TxMessageGroup { public static final short TX_CLEANUP_REQUEST = 2; /** - * Message type for {@link TxStateReplicaRequest}. + * Message type for {@link TxStateCommitPartitionRequest}. */ - public static final short TX_STATE_REQUEST = 3; + public static final short TX_STATE_COMMIT_PARTITION_REQUEST = 3; + + /** + * Message type for {@link TxStateCoordinatorRequest}. + */ + public static final short TX_STATE_COORDINATOR_REQUEST = 4; + + /** + * Message type for {@link TxStateResponse}. 
+ */ + public static final short TX_STATE_RESPONSE = 5; } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCommitPartitionRequest.java similarity index 73% copy from modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java copy to modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCommitPartitionRequest.java index 26ae27b38f..76ed5803ad 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCommitPartitionRequest.java @@ -17,23 +17,14 @@ package org.apache.ignite.internal.tx.message; -import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; - import java.util.UUID; -import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.network.annotations.Transferable; /** * Transaction state request. 
*/ -@Transferable(TxMessageGroup.TX_STATE_REQUEST) -public interface TxStateReplicaRequest extends ReplicaRequest { +@Transferable(TxMessageGroup.TX_STATE_COMMIT_PARTITION_REQUEST) +public interface TxStateCommitPartitionRequest extends ReplicaRequest { UUID txId(); - - long readTimestampLong(); - - default HybridTimestamp readTimestamp() { - return hybridTimestamp(readTimestampLong()); - } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java similarity index 87% copy from modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java copy to modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java index 26ae27b38f..80b2495f2a 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java @@ -21,14 +21,14 @@ import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; import java.util.UUID; import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.replicator.message.ReplicaRequest; +import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** * Transaction state request. 
*/ -@Transferable(TxMessageGroup.TX_STATE_REQUEST) -public interface TxStateReplicaRequest extends ReplicaRequest { +@Transferable(TxMessageGroup.TX_STATE_COORDINATOR_REQUEST) +public interface TxStateCoordinatorRequest extends NetworkMessage { UUID txId(); long readTimestampLong(); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java similarity index 63% rename from modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java rename to modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java index 26ae27b38f..9918a171eb 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java @@ -17,23 +17,18 @@ package org.apache.ignite.internal.tx.message; -import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; - -import java.util.UUID; -import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.replicator.message.ReplicaRequest; +import org.apache.ignite.internal.replicator.message.TimestampAware; +import org.apache.ignite.internal.tx.TransactionMeta; +import org.apache.ignite.network.annotations.Marshallable; import org.apache.ignite.network.annotations.Transferable; +import org.jetbrains.annotations.Nullable; /** - * Transaction state request. + * Transaction state response. 
*/ -@Transferable(TxMessageGroup.TX_STATE_REQUEST) -public interface TxStateReplicaRequest extends ReplicaRequest { - UUID txId(); - - long readTimestampLong(); - - default HybridTimestamp readTimestamp() { - return hybridTimestamp(readTimestampLong()); - } +@Transferable(TxMessageGroup.TX_STATE_RESPONSE) +public interface TxStateResponse extends TimestampAware { + @Marshallable + @Nullable + TransactionMeta txStateMeta(); }