This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 33e83aa9f5 IGNITE-17578 Transactions: async cleanup processing on tx
commit (#2529)
33e83aa9f5 is described below
commit 33e83aa9f5b1a0fd50e6679129d526772d74b985
Author: Denis Chudov <[email protected]>
AuthorDate: Thu Sep 7 18:19:22 2023 +0300
IGNITE-17578 Transactions: async cleanup processing on tx commit (#2529)
---
...ItTxDistributedTestThreeNodesThreeReplicas.java | 8 +++++++
...butedTestThreeNodesThreeReplicasCollocated.java | 8 +++++++
.../replicator/PartitionReplicaListener.java | 27 +++++++++++++++-------
.../apache/ignite/distributed/ItTxTestCluster.java | 8 +++----
4 files changed, 38 insertions(+), 13 deletions(-)
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
index 71dbcb273a..966ae8cacb 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
@@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestInfo;
/**
@@ -49,6 +50,13 @@ public class ItTxDistributedTestThreeNodesThreeReplicas
extends ItTxDistributedT
return 3;
}
+ /** {@inheritDoc} */
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-20116")
+ @Override
+ public void testBalance() throws InterruptedException {
+ super.testBalance();
+ }
+
@Override
@AfterEach
public void after() throws Exception {
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicasCollocated.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicasCollocated.java
index 79a6080e6b..e128acac09 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicasCollocated.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicasCollocated.java
@@ -25,6 +25,7 @@ import java.util.UUID;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -46,6 +47,13 @@ public class
ItTxDistributedTestThreeNodesThreeReplicasCollocated extends ItTxDi
return false;
}
+ /** {@inheritDoc} */
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-20116")
+ @Override
+ public void testBalance() throws InterruptedException {
+ super.testBalance();
+ }
+
/** {@inheritDoc} */
@BeforeEach
@Override public void before() throws Exception {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index ee9f8de59a..9d5c3faab0 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -69,6 +69,8 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -158,6 +160,9 @@ import org.jetbrains.annotations.Nullable;
/** Partition replication listener. */
public class PartitionReplicaListener implements ReplicaListener {
+ /** Logger. */
+ private static final IgniteLogger LOG =
Loggers.forClass(PartitionReplicaListener.class);
+
/** Factory to create RAFT command messages. */
private static final TableMessagesFactory MSG_FACTORY = new
TableMessagesFactory();
@@ -1255,9 +1260,10 @@ public class PartitionReplicaListener implements
ReplicaListener {
/**
* Processes transaction cleanup request:
* <ol>
- * <li>Run specific raft {@code TxCleanupCommand} command, that will
convert all pending entries(writeIntents)
- * to either regular values({@link TxState#COMMITED}) or removing them
({@link TxState#ABORTED}).</li>
- * <li>Release all locks that were held on local Replica by given
transaction.</li>
+ * <li>Waits for finishing of local transactional operations;</li>
+ * <li>Runs asynchronously the specific raft {@code TxCleanupCommand}
command, that will convert all pending entries(writeIntents)
+ * to either regular values({@link TxState#COMMITED}) or removing them
({@link TxState#ABORTED});</li>
+ * <li>Releases all locks that were held on local Replica by given
transaction.</li>
* </ol>
* This operation is idempotent, so it's safe to retry it.
*
@@ -1302,7 +1308,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
if (txUpdateFutures.isEmpty()) {
if (!txReadFutures.isEmpty()) {
- allOffFuturesExceptionIgnored(txReadFutures, request)
+ return allOffFuturesExceptionIgnored(txReadFutures, request)
.thenRun(() -> releaseTxLocks(request.txId()));
}
@@ -1325,10 +1331,15 @@ public class PartitionReplicaListener implements
ReplicaListener {
storageUpdateHandler.handleTransactionCleanup(request.txId(), request.commit(),
request.commitTimestamp());
- return raftClient
- .run(txCleanupCmd)
- .thenCompose(ignored ->
allOffFuturesExceptionIgnored(txReadFutures, request)
- .thenRun(() ->
releaseTxLocks(request.txId())));
+ raftClient.run(txCleanupCmd)
+ .exceptionally(e -> {
+ LOG.warn("Failed to complete transaction
cleanup command [txId=" + request.txId() + ']', e);
+
+ return completedFuture(null);
+ });
+
+ return allOffFuturesExceptionIgnored(txReadFutures,
request)
+ .thenRun(() -> releaseTxLocks(request.txId()));
});
});
}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 3d62ed1d4e..3fed2fe27d 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -387,8 +387,10 @@ public class ItTxTestCluster {
TablePartitionId grpId = grpIds.get(p);
for (String assignment : partAssignments) {
+ int partId = p;
+
var mvTableStorage = new TestMvTableStorage(tableId,
DEFAULT_PARTITION_COUNT);
- var mvPartStorage = new TestMvPartitionStorage(0);
+ var mvPartStorage = new TestMvPartitionStorage(partId);
var txStateStorage = txStateStorages.get(assignment);
var placementDriver = new
PlacementDriver(replicaServices.get(assignment), consistentIdToNode);
@@ -396,8 +398,6 @@ public class ItTxTestCluster {
placementDriver.updateAssignment(grpIds.get(part),
assignments.get(part));
}
- int partId = p;
-
int indexId = globalIndexId++;
ColumnsExtractor row2Tuple =
BinaryRowConverter.keyExtractor(schemaDescriptor);
@@ -438,8 +438,6 @@ public class ItTxTestCluster {
new RaftGroupEventsClientListener()
);
- TxManager txManager = txManagers.get(assignment);
-
PartitionListener partitionListener = new PartitionListener(
txManagers.get(assignment),
partitionDataStorage,