This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 823142d4b7 IGNITE-20055 Durable txCleanupReplicaRequest send from the commit patition (#2624) 823142d4b7 is described below commit 823142d4b76bf6a524c672b7b05840ac0e393663 Author: Cyrill <cyrill.si...@gmail.com> AuthorDate: Mon Oct 2 12:31:08 2023 +0300 IGNITE-20055 Durable txCleanupReplicaRequest send from the commit patition (#2624) --- .../apache/ignite/client/fakes/FakeTxManager.java | 17 +++- .../ignite/internal/replicator/ReplicaManager.java | 5 +- .../replicator/PartitionReplicaListener.java | 113 ++++++++++++++------- .../replicator/SchemaCompatValidator.java | 3 +- .../RepeatedFinishReadWriteTransactionTest.java | 6 +- .../internal/table/TxLocalCleanupRecoveryTest.java | 70 +++++++++++++ .../apache/ignite/internal/table/TxLocalTest.java | 34 +++++++ .../replication/PartitionReplicaListenerTest.java | 2 - .../ignite/internal/table/TxAbstractTest.java | 2 +- .../table/impl/DummyInternalTableImpl.java | 9 ++ .../org/apache/ignite/internal/tx/TxManager.java | 8 +- .../internal/tx/impl/ReadWriteTransactionImpl.java | 20 ++-- .../ignite/internal/tx/impl/TxManagerImpl.java | 56 +++++----- .../tx/message/TxCleanupReplicaRequest.java | 11 +- 14 files changed, 246 insertions(+), 110 deletions(-) diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java index b128afb93d..f8b5487c84 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java @@ -17,6 +17,8 @@ package org.apache.ignite.client.fakes; +import static java.util.concurrent.CompletableFuture.completedFuture; + import java.util.List; import java.util.Map; import java.util.UUID; @@ -111,7 +113,7 @@ public class FakeTxManager implements TxManager { @Override public CompletableFuture<Void> commitAsync() { - return 
CompletableFuture.completedFuture(null); + return completedFuture(null); } @Override @@ -121,7 +123,7 @@ public class FakeTxManager implements TxManager { @Override public CompletableFuture<Void> rollbackAsync() { - return CompletableFuture.completedFuture(null); + return completedFuture(null); } @Override @@ -179,9 +181,14 @@ public class FakeTxManager implements TxManager { } @Override - public CompletableFuture<Void> cleanup(ClusterNode recipientNode, List<IgniteBiTuple<TablePartitionId, Long>> tablePartitionIds, - UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp) { - return null; + public CompletableFuture<Void> cleanup( + String primaryConsistentId, + TablePartitionId tablePartitionId, + UUID txId, + boolean commit, + @Nullable HybridTimestamp commitTimestamp + ) { + return completedFuture(null); } @Override diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java index 48a179ad68..c5599f9639 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java @@ -212,13 +212,10 @@ public class ReplicaManager implements IgniteComponent { } } ); - - return replicaFut; } else { sendAwaitReplicaResponse(senderConsistentId, correlationId); - - return replicaFut; } + return replicaFut; }); return; diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 5421a01ed8..397e416a39 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -32,10 +32,12 @@ import static org.apache.ignite.internal.tx.TxState.COMMITED; import static org.apache.ignite.internal.tx.TxState.PENDING; import static org.apache.ignite.internal.tx.TxState.isFinalState; import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; +import static org.apache.ignite.internal.util.ExceptionUtils.withCause; import static org.apache.ignite.internal.util.IgniteUtils.findAny; import static org.apache.ignite.internal.util.IgniteUtils.findFirst; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync; +import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR; import java.nio.ByteBuffer; @@ -56,11 +58,12 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -168,6 +171,10 @@ public class PartitionReplicaListener implements ReplicaListener { /** Logger. */ private static final IgniteLogger LOG = Loggers.forClass(PartitionReplicaListener.class); + private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10; + + private static final int ATTEMPTS_TO_CLEANUP_REPLICA = 5; + /** Factory to create RAFT command messages. 
*/ private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory(); @@ -1173,14 +1180,15 @@ public class PartitionReplicaListener implements ReplicaListener { UUID txId = request.txId(); if (request.commit()) { - return schemaCompatValidator.validateForward(txId, aggregatedGroupIds, request.commitTimestamp()) - .thenCompose(validationResult -> { - return finishAndCleanup(request, validationResult.isSuccessful(), aggregatedGroupIds, txId, txCoordinatorId) - .thenAccept(unused -> throwIfSchemaValidationOnCommitFailed(validationResult)); - }); + HybridTimestamp commitTimestamp = request.commitTimestamp(); + + return schemaCompatValidator.validateForward(txId, aggregatedGroupIds, commitTimestamp) + .thenCompose(validationResult -> + finishAndCleanup(aggregatedGroupIds, validationResult.isSuccessful(), commitTimestamp, txId, txCoordinatorId) + .thenAccept(unused -> throwIfSchemaValidationOnCommitFailed(validationResult))); } else { // Aborting. - return finishAndCleanup(request, false, aggregatedGroupIds, txId, txCoordinatorId); + return finishAndCleanup(aggregatedGroupIds, false, null, txId, txCoordinatorId); } } @@ -1193,33 +1201,74 @@ public class PartitionReplicaListener implements ReplicaListener { } private CompletableFuture<Void> finishAndCleanup( - TxFinishReplicaRequest request, + Collection<TablePartitionId> enlistedPartitions, boolean commit, - List<TablePartitionId> aggregatedGroupIds, + @Nullable HybridTimestamp commitTimestamp, UUID txId, String txCoordinatorId ) { - HybridTimestamp commitTimestamp = request.commitTimestamp(); - - CompletableFuture<?> changeStateFuture = finishTransaction(aggregatedGroupIds, txId, commit, commitTimestamp, txCoordinatorId); - - CompletableFuture<?>[] cleanupFutures = new CompletableFuture[request.groups().size()]; - AtomicInteger cleanupFuturesCnt = new AtomicInteger(0); - - request.groups().forEach( - (recipientNode, tablePartitionIds) -> - cleanupFutures[cleanupFuturesCnt.getAndIncrement()] = 
changeStateFuture.thenCompose(ignored -> - txManager.cleanup( - recipientNode, - tablePartitionIds, - txId, - commit, - request.commitTimestamp() - ) - ) - ); + CompletableFuture<?> changeStateFuture = finishTransaction(enlistedPartitions, txId, commit, commitTimestamp, txCoordinatorId); + + CompletableFuture<?>[] futures = enlistedPartitions.stream() + .map(partitionId -> changeStateFuture.thenCompose(ignored -> + cleanupWithRetry(commit, commitTimestamp, txId, partitionId, ATTEMPTS_TO_CLEANUP_REPLICA))) + .toArray(size -> new CompletableFuture<?>[size]); + + return allOf(futures); + } + + private CompletableFuture<Void> cleanupWithRetry( + boolean commit, + @Nullable HybridTimestamp commitTimestamp, + UUID txId, + TablePartitionId partitionId, + int attempts) { + HybridTimestamp now = hybridClock.now(); + + return findPrimaryReplica(partitionId, now) + .thenCompose(leaseHolder -> + cleanupWithRetryOnReplica(commit, commitTimestamp, txId, partitionId, leaseHolder, attempts)); + } - return allOf(cleanupFutures); + + private CompletableFuture<Void> cleanupWithRetryOnReplica( + boolean commit, + @Nullable HybridTimestamp commitTimestamp, + UUID txId, + TablePartitionId partitionId, + String primaryConsistentId, + int attempts) { + return txManager.cleanup(primaryConsistentId, partitionId, txId, commit, commitTimestamp) + .handle((res, ex) -> { + if (ex != null) { + LOG.warn("Failed to perform cleanup on Tx {}." + (attempts > 0 ? " The operation will be retried." 
: ""), txId, ex); + + if (attempts > 0) { + return cleanupWithRetry(commit, commitTimestamp, txId, partitionId, attempts - 1); + } + + return CompletableFuture.<Void>failedFuture(ex); + } + + return CompletableFuture.<Void>completedFuture(null); + }) + .thenCompose(Function.identity()); + } + + private CompletableFuture<String> findPrimaryReplica(TablePartitionId partitionId, HybridTimestamp now) { + return placementDriver.awaitPrimaryReplica(partitionId, now) + .orTimeout(AWAIT_PRIMARY_REPLICA_TIMEOUT, TimeUnit.SECONDS) + .handle((primaryReplica, e) -> { + if (e != null) { + LOG.error("Failed to retrieve primary replica for partition {}", partitionId, e); + + throw withCause(TransactionException::new, REPLICA_UNAVAILABLE_ERR, + "Failed to get the primary replica" + + " [tablePartitionId=" + partitionId + ", awaitTimestamp=" + now + ']', e); + } + + return primaryReplica.getLeaseholder(); + }); } /** @@ -1233,7 +1282,7 @@ public class PartitionReplicaListener implements ReplicaListener { * @return Future to wait of the finish. 
*/ private CompletableFuture<Object> finishTransaction( - List<TablePartitionId> aggregatedGroupIds, + Collection<TablePartitionId> aggregatedGroupIds, UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp, @@ -2416,10 +2465,6 @@ public class PartitionReplicaListener implements ReplicaListener { } else if (request instanceof TxFinishReplicaRequest) { expectedTerm = ((TxFinishReplicaRequest) request).term(); - assert expectedTerm != null; - } else if (request instanceof TxCleanupReplicaRequest) { - expectedTerm = ((TxCleanupReplicaRequest) request).term(); - assert expectedTerm != null; } else { expectedTerm = null; diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java index 327457633a..9b37043388 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.table.distributed.replicator; import static java.util.stream.Collectors.toSet; +import java.util.Collection; import java.util.List; import java.util.Set; import java.util.UUID; @@ -58,7 +59,7 @@ class SchemaCompatValidator { */ CompletableFuture<CompatValidationResult> validateForward( UUID txId, - List<TablePartitionId> enlistedGroupIds, + Collection<TablePartitionId> enlistedGroupIds, @Nullable HybridTimestamp commitTimestamp ) { HybridTimestamp beginTimestamp = TransactionIds.beginTimestamp(txId); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java index d36b9d9349..ed114e14d4 100644 --- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java @@ -292,13 +292,13 @@ public class RepeatedFinishReadWriteTransactionTest extends BaseIgniteAbstractTe @Override public CompletableFuture<Void> cleanup( - ClusterNode recipientNode, - List<IgniteBiTuple<TablePartitionId, Long>> tablePartitionIds, + String primaryConsistentId, + TablePartitionId tablePartitionId, UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp ) { - return null; + return completedFuture(null); } @Override diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalCleanupRecoveryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalCleanupRecoveryTest.java new file mode 100644 index 0000000000..6e8e36aae2 --- /dev/null +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalCleanupRecoveryTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.table; + +import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.replicator.exception.ReplicationException; +import org.apache.ignite.internal.replicator.message.ReplicaRequest; +import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest; +import org.apache.ignite.tx.TransactionException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.invocation.InvocationOnMock; + +/** + * Durable cleanup test with successful recovery after the failures. + */ +public class TxLocalCleanupRecoveryTest extends TxLocalTest { + + private AtomicInteger failureCounter; + + @BeforeEach + void initTest() { + // The value of 3 is less than the allowed number of cleanup retries. + failureCounter = new AtomicInteger(3); + } + + + @Override + protected MethodAnswer invokeOnMessagingMock(InvocationOnMock invocationOnMock) { + ReplicaRequest request = invocationOnMock.getArgument(1); + + if (request instanceof TxCleanupReplicaRequest && failureCounter.getAndDecrement() > 0) { + return new MethodAnswer(CompletableFuture.failedFuture(new ReplicationException( + REPLICA_COMMON_ERR, + "Test Tx Cleanup exception [replicaGroupId=" + request.groupId() + ']'))); + } + // Otherwise use the parent stub. + return super.invokeOnMessagingMock(invocationOnMock); + } + + + @Test + @Override + public void testDeleteUpsertCommit() throws TransactionException { + // The value of 6 is higher than the default retry count. + // So we should give up retrying and crash.
+ failureCounter = new AtomicInteger(6); + + assertThrows(TransactionException.class, () -> deleteUpsert().commit()); + } +} diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java index 149f965642..bc69c327d2 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java @@ -50,9 +50,11 @@ import org.apache.ignite.internal.tx.impl.TxManagerImpl; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.MessagingService; import org.apache.ignite.table.Table; +import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; +import org.mockito.invocation.InvocationOnMock; /** * Local table tests. @@ -81,6 +83,12 @@ public class TxLocalTest extends TxAbstractTest { Map<ReplicationGroupId, DummyInternalTableImpl> tables = new HashMap<>(); doAnswer(invocationOnMock -> { + MethodAnswer answer = invokeOnMessagingMock(invocationOnMock); + + if (answer.hasAnswer) { + return answer.answer; + } + ReplicaRequest request = invocationOnMock.getArgument(1); ReplicaListener replicaListener = tables.get(request.groupId()).getReplicaListener(); @@ -151,6 +159,10 @@ public class TxLocalTest extends TxAbstractTest { tables.put(table2.groupId(), table2); } + protected MethodAnswer invokeOnMessagingMock(InvocationOnMock invocationOnMock) { + return MethodAnswer.NO_ANSWER; + } + @AfterEach public void tearDown() throws Exception { if (txManager != null) { @@ -188,4 +200,26 @@ public class TxLocalTest extends TxAbstractTest { protected Collection<TxManager> txManagers() { return List.of(txManager); } + + /** + * A class that represents the result of method execution. 
+ */ + protected static class MethodAnswer { + static final MethodAnswer NO_ANSWER = new MethodAnswer(null, false); + + /** The result of method invocation. */ + final @Nullable Object answer; + + /** If {@code true} the method was called. */ + final boolean hasAnswer; + + public MethodAnswer(@Nullable Object answer, boolean hasAnswer) { + this.answer = answer; + this.hasAnswer = hasAnswer; + } + + public MethodAnswer(@Nullable Object answer) { + this(answer, true); + } + } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index a921284989..abc281d7a4 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -1276,7 +1276,6 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { .txId(txId) .commit(true) .commitTimestampLong(now.longValue()) - .term(1L) .build(), localNode.id() ); @@ -2025,7 +2024,6 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { .txId(txId) .commit(true) .commitTimestampLong(commitTs.longValue()) - .term(1L) .build(), localNode.id() ).join(); diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java index 69cc7409ef..d5f9fa98d1 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java @@ -161,7 +161,7 @@ public abstract class TxAbstractTest extends IgniteAbstractTest { assertEquals(100., accounts.recordView().get(null, 
makeKey(1)).doubleValue("balance")); } - private InternalTransaction deleteUpsert() { + protected InternalTransaction deleteUpsert() { accounts.recordView().upsert(null, makeValue(1, 100.)); InternalTransaction tx = (InternalTransaction) igniteTransactions.begin(); diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index f7efbf495e..33facb20be 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -257,6 +258,14 @@ public class DummyInternalTableImpl extends InternalTableImpl { return replicaListener.invoke(invocationOnMock.getArgument(1), node.id()); }) .when(replicaSvc).invoke(any(ClusterNode.class), any()); + + lenient() + .doAnswer(invocationOnMock -> { + String nodeId = invocationOnMock.getArgument(0); + + return replicaListener.invoke(invocationOnMock.getArgument(1), nodeId); + }) + .when(replicaSvc).invoke(anyString(), any()); } AtomicLong raftIndex = new AtomicLong(); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java index c0fbd57a02..280cc91e74 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java +++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java @@ -128,16 +128,16 @@ public interface TxManager extends IgniteComponent { /** * Sends cleanup request to the specified primary replica. * - * @param recipientNode Primary replica to process given cleanup request. - * @param tablePartitionIds Table partition ids with raft terms. + * @param primaryConsistentId A consistent id of the primary replica node. + * @param tablePartitionId Table partition id. * @param txId Transaction id. * @param commit {@code True} if a commit requested. * @param commitTimestamp Commit timestamp ({@code null} if it's an abort). * @return Completable future of Void. */ CompletableFuture<Void> cleanup( - ClusterNode recipientNode, - List<IgniteBiTuple<TablePartitionId, Long>> tablePartitionIds, + String primaryConsistentId, + TablePartitionId tablePartitionId, UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java index e191f8ca2d..f26090ab2f 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java @@ -120,22 +120,14 @@ public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl { Map<ClusterNode, List<IgniteBiTuple<TablePartitionId, Long>>> groups = new LinkedHashMap<>(); if (!enlisted.isEmpty()) { - enlisted.forEach((groupId, groupMeta) -> { - ClusterNode recipientNode = groupMeta.get1(); + enlisted.forEach((groupId, groupMeta) -> + groups.computeIfAbsent(groupMeta.get1(), clusterNode -> new ArrayList<>()) + .add(new IgniteBiTuple<>(groupId, groupMeta.get2()))); - if (groups.containsKey(recipientNode)) { - groups.get(recipientNode).add(new 
IgniteBiTuple<>(groupId, groupMeta.get2())); - } else { - List<IgniteBiTuple<TablePartitionId, Long>> items = new ArrayList<>(); + IgniteBiTuple<ClusterNode, Long> nodeAndTerm = enlisted.get(commitPart); - items.add(new IgniteBiTuple<>(groupId, groupMeta.get2())); - - groups.put(recipientNode, items); - } - }); - - ClusterNode recipientNode = enlisted.get(commitPart).get1(); - Long term = enlisted.get(commitPart).get2(); + ClusterNode recipientNode = nodeAndTerm.get1(); + Long term = nodeAndTerm.get2(); LOG.debug("Finish [recipientNode={}, term={} commit={}, txId={}, groups={}", recipientNode, term, commit, id(), groups); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index ca96d6cb3a..f8adbe49a4 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -294,51 +294,43 @@ public class TxManagerImpl implements TxManager { .build(); return replicaService.invoke(recipientNode, req) - .thenRun(() -> { - updateTxMeta(txId, old -> { - if (isFinalState(old.txState())) { - finishingStateMeta.txFinishFuture().complete(old); + .thenRun(() -> + updateTxMeta(txId, old -> { + if (isFinalState(old.txState())) { + finishingStateMeta.txFinishFuture().complete(old); - return old; - } + return old; + } - assert old instanceof TxStateMetaFinishing; + assert old instanceof TxStateMetaFinishing; - TxStateMeta finalTxStateMeta = coordinatorFinalTxStateMeta(commit, commitTimestamp); + TxStateMeta finalTxStateMeta = coordinatorFinalTxStateMeta(commit, commitTimestamp); - finishingStateMeta.txFinishFuture().complete(finalTxStateMeta); + finishingStateMeta.txFinishFuture().complete(finalTxStateMeta); - return finalTxStateMeta; - }); - }); + return finalTxStateMeta; + }) + ); } @Override public 
CompletableFuture<Void> cleanup( - ClusterNode recipientNode, - List<IgniteBiTuple<TablePartitionId, Long>> tablePartitionIds, + String primaryConsistentId, + TablePartitionId tablePartitionId, UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp ) { - var cleanupFutures = new CompletableFuture[tablePartitionIds.size()]; - - // TODO: https://issues.apache.org/jira/browse/IGNITE-17582 Grouping replica requests. - for (int i = 0; i < tablePartitionIds.size(); i++) { - cleanupFutures[i] = replicaService.invoke( - recipientNode, - FACTORY.txCleanupReplicaRequest() - .groupId(tablePartitionIds.get(i).get1()) - .timestampLong(clock.nowLong()) - .txId(txId) - .commit(commit) - .commitTimestampLong(hybridTimestampToLong(commitTimestamp)) - .term(tablePartitionIds.get(i).get2()) - .build() - ); - } - - return allOf(cleanupFutures); + return replicaService.invoke( + primaryConsistentId, + FACTORY.txCleanupReplicaRequest() + .groupId(tablePartitionId) + .timestampLong(clock.nowLong()) + .txId(txId) + .commit(commit) + .commitTimestampLong(hybridTimestampToLong(commitTimestamp)) + .build() + ); } @Override diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java index d5503e71cf..2032cf1352 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java @@ -34,7 +34,7 @@ import org.jetbrains.annotations.Nullable; * <li>Release all locks that were held on local replica by given transaction.</li> * </ol> */ -@Transferable(value = TxMessageGroup.TX_CLEANUP_REQUEST) +@Transferable(TxMessageGroup.TX_CLEANUP_REQUEST) public interface TxCleanupReplicaRequest extends ReplicaRequest, TimestampAware { /** * Returns transaction Id. 
@@ -65,13 +65,4 @@ public interface TxCleanupReplicaRequest extends ReplicaRequest, TimestampAware default @Nullable HybridTimestamp commitTimestamp() { return nullableHybridTimestamp(commitTimestampLong()); } - - /** - * Gets a raft term. - * TODO: A temp solution until lease-based engine will be implemented (IGNITE-17256, IGNITE-15083) - * - * @return Raft term. - */ - @Deprecated - Long term(); }