This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 688158ebab5 IGNITE-27894 Fixed txn cleanup retries on TxCleanupMessageErrorResponse (#7616)
688158ebab5 is described below
commit 688158ebab50a9d9e93b00018e51fb8a438b4b74
Author: Denis Chudov <[email protected]>
AuthorDate: Wed Feb 18 18:38:38 2026 +0200
IGNITE-27894 Fixed txn cleanup retries on TxCleanupMessageErrorResponse (#7616)
---
.../apache/ignite/internal/util/IgniteUtils.java | 31 +++++
.../internal/testframework/IgniteTestUtils.java | 138 +++++++++++++++++++++
...tDistributionZoneMetaStorageCompactionTest.java | 6 -
.../distributionzones/ItEmptyDataNodesTest.java | 4 -
.../ItBuildIndexWriteIntentsHandlingTest.java | 14 ++-
.../internal/ClusterPerClassIntegrationTest.java | 19 +--
.../internal/ClusterPerTestIntegrationTest.java | 20 +++
.../tx/distributed/ItTxCleanupFailureTest.java | 98 +++++++++++++++
.../internal/tx/impl/TxCleanupRequestSender.java | 91 +++++++++-----
.../apache/ignite/internal/tx/TxCleanupTest.java | 3 +-
10 files changed, 366 insertions(+), 58 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 63cf65f443d..f67cb78720c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -51,6 +51,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -59,6 +60,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
@@ -1339,6 +1341,35 @@ public class IgniteUtils {
return list.toArray();
}
+ /**
+ * Schedules the provided operation to be retried after the specified
delay.
+ *
+ * @param operation Operation.
+ * @param delay Delay.
+ * @param unit Time unit of the delay.
+ * @param executor Executor to schedule the retry in.
+ * @return Future that is completed when the operation is successful or
failed with an exception.
+ */
+ public static <T> CompletableFuture<T> scheduleRetry(
+ Callable<CompletableFuture<T>> operation,
+ long delay,
+ TimeUnit unit,
+ ScheduledExecutorService executor
+ ) {
+ CompletableFuture<T> future = new CompletableFuture<>();
+
+ executor.schedule(() -> operation.call()
+ .whenComplete((res, e) -> {
+ if (e == null) {
+ future.complete(res);
+ } else {
+ future.completeExceptionally(e);
+ }
+ }), delay, unit);
+
+ return future;
+ }
+
private static CompletableFuture<Void> startAsync(ComponentContext
componentContext, Stream<? extends IgniteComponent> components) {
return allOf(components
.filter(Objects::nonNull)
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index bc104d20fc3..f632b3fdc5c 100644
---
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -61,8 +61,11 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
@@ -77,6 +80,8 @@ import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.ThreadOperation;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
import org.awaitility.Awaitility;
import org.hamcrest.CustomMatcher;
import org.jetbrains.annotations.NotNull;
@@ -1094,6 +1099,29 @@ public final class IgniteTestUtils {
return new UUID(str.hashCode(), new
StringBuilder(str).reverse().toString().hashCode());
}
+ /**
+ * Converts a result set to a list of rows.
+ *
+ * @param resultSet Result set to convert.
+ * @return List of rows.
+ */
+ public static List<List<Object>> getAllResultSet(ResultSet<SqlRow>
resultSet) {
+ List<List<Object>> res = new ArrayList<>();
+
+ while (resultSet.hasNext()) {
+ SqlRow sqlRow = resultSet.next();
+
+ ArrayList<Object> row = new ArrayList<>(sqlRow.columnCount());
+ for (int i = 0; i < sqlRow.columnCount(); i++) {
+ row.add(sqlRow.value(i));
+ }
+
+ res.add(row);
+ }
+
+ return res;
+ }
+
/**
* Non-concurrent executor service for test purposes.
*
@@ -1132,4 +1160,114 @@ public final class IgniteTestUtils {
}
};
}
+
+ /**
+ * Non-concurrent scheduled executor service for test purposes. Uses
CompletableFuture#delayedExecutor.
+ *
+ * @return Executor service.
+ */
+ public static ScheduledExecutorService testSyncScheduledExecutorService() {
+ return new ScheduledExecutorService() {
+ private final ExecutorService delegate = testSyncExecutorService();
+
+ @Override
+ public void execute(@NotNull Runnable command) {
+ delegate.execute(command);
+ }
+
+ @Override
+ public void shutdown() {
+ delegate.shutdown();
+ }
+
+ @Override
+ public @NotNull List<Runnable> shutdownNow() {
+ return delegate.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return delegate.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return delegate.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, @NotNull TimeUnit
unit) throws InterruptedException {
+ return delegate.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public @NotNull <T> Future<T> submit(@NotNull Callable<T> task) {
+ return delegate.submit(task);
+ }
+
+ @Override
+ public @NotNull <T> Future<T> submit(@NotNull Runnable task, T
result) {
+ return delegate.submit(task, result);
+ }
+
+ @Override
+ public @NotNull Future<?> submit(@NotNull Runnable task) {
+ return delegate.submit(task);
+ }
+
+ @Override
+ public @NotNull <T> List<Future<T>> invokeAll(@NotNull
Collection<? extends Callable<T>> tasks) throws InterruptedException {
+ return delegate.invokeAll(tasks);
+ }
+
+ @Override
+ public @NotNull <T> List<Future<T>> invokeAll(@NotNull
Collection<? extends Callable<T>> tasks, long timeout,
+ @NotNull TimeUnit unit) throws InterruptedException {
+ return delegate.invokeAll(tasks, timeout, unit);
+ }
+
+ @Override
+ public @NotNull <T> T invokeAny(@NotNull Collection<? extends
Callable<T>> tasks)
+ throws InterruptedException, ExecutionException {
+ return delegate.invokeAny(tasks);
+ }
+
+ @Override
+ public <T> T invokeAny(@NotNull Collection<? extends Callable<T>>
tasks, long timeout, @NotNull TimeUnit unit)
+ throws ExecutionException, InterruptedException,
TimeoutException {
+ return delegate.invokeAny(tasks, timeout, unit);
+ }
+
+ @Override
+ public @NotNull ScheduledFuture<?> schedule(@NotNull Runnable
command, long delay, @NotNull TimeUnit unit) {
+ CompletableFuture.delayedExecutor(delay,
unit).execute(command);
+ return null;
+ }
+
+ @Override
+ public @NotNull <V> ScheduledFuture<V> schedule(@NotNull
Callable<V> callable, long delay, @NotNull TimeUnit unit) {
+ CompletableFuture.delayedExecutor(delay, unit).execute(() -> {
+ try {
+ callable.call();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ return null;
+ }
+
+ @Override
+ public @NotNull ScheduledFuture<?> scheduleAtFixedRate(@NotNull
Runnable command, long initialDelay, long period,
+ @NotNull TimeUnit unit) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public @NotNull ScheduledFuture<?> scheduleWithFixedDelay(@NotNull
Runnable command, long initialDelay, long delay,
+ @NotNull TimeUnit unit) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+ };
+ }
}
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZoneMetaStorageCompactionTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZoneMetaStorageCompactionTest.java
index 3ca9de5aad2..afe5a262b41 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZoneMetaStorageCompactionTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZoneMetaStorageCompactionTest.java
@@ -186,12 +186,6 @@ public class ItDistributionZoneMetaStorageCompactionTest
extends ClusterPerTestI
);
}
- private void sql(String sql) {
- cluster.doInSession(0, session -> {
- executeUpdate(sql, session);
- });
- }
-
private static Set<String> dataNodes(IgniteImpl ignite, int zoneId,
HybridTimestamp ts) {
CompletableFuture<Set<String>> dataNodesBeforeStopFut = ignite
.distributionZoneManager()
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItEmptyDataNodesTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItEmptyDataNodesTest.java
index 3513f1a5013..cb57e095e68 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItEmptyDataNodesTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItEmptyDataNodesTest.java
@@ -160,10 +160,6 @@ class ItEmptyDataNodesTest extends
ClusterPerTestIntegrationTest {
return nodeFut.join();
}
- private void sql(String sql) {
- cluster.aliveNode().sql().execute(sql);
- }
-
private CompletableFuture<?> sqlAsync(String sql) {
return cluster.aliveNode().sql().executeAsync(sql);
}
diff --git
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexWriteIntentsHandlingTest.java
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexWriteIntentsHandlingTest.java
index 1ce17a8dcbd..64df11ca209 100644
---
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexWriteIntentsHandlingTest.java
+++
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexWriteIntentsHandlingTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.index;
+import static java.lang.Thread.sleep;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.ignite.internal.ClusterPerClassIntegrationTest.isIndexAvailable;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
@@ -24,6 +25,7 @@ import static
org.apache.ignite.internal.index.IndexBuildTestUtils.INDEX_NAME;
import static org.apache.ignite.internal.index.IndexBuildTestUtils.TABLE_NAME;
import static
org.apache.ignite.internal.index.IndexBuildTestUtils.createTestTable;
import static
org.apache.ignite.internal.index.WriteIntentSwitchControl.disableWriteIntentSwitchExecution;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -31,6 +33,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
import
org.apache.ignite.internal.index.message.IsNodeFinishedRwTransactionsStartedBeforeRequest;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
import org.apache.ignite.tx.Transaction;
@@ -51,8 +54,17 @@ class ItBuildIndexWriteIntentsHandlingTest extends
ClusterPerTestIntegrationTest
cluster.restartNode(txCoordinatorOrdinal);
createIndex(INDEX_NAME);
+
+ // Allow cleanup to be completed after some time. This is required
because transaction abortion (that is triggered by
+ // write intent resolution that is done in index build task) is
completed only after successful txn cleanup, and the index
+ // can't become available before building is completed.
+ runAsync(() -> {
+ sleep(5_000);
+ runningNodesIter().forEach(IgniteImpl::stopDroppingMessages);
+ });
+
await("Index did not become available in time")
- .atMost(10, SECONDS)
+ .atMost(30, SECONDS)
.until(() ->
isIndexAvailable(unwrapIgniteImpl(cluster.aliveNode()), INDEX_NAME));
verifyNoNodesHaveAnythingInIndex();
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java
index 291823da796..b489a8eb517 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static
org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getAllResultSet;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.lang.util.IgniteNameUtils.quoteIfNeeded;
@@ -30,7 +31,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
import java.time.ZoneId;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -571,23 +571,6 @@ public abstract class ClusterPerClassIntegrationTest
extends BaseIgniteAbstractT
sql.executeScript(query, args);
}
- private static List<List<Object>> getAllResultSet(ResultSet<SqlRow>
resultSet) {
- List<List<Object>> res = new ArrayList<>();
-
- while (resultSet.hasNext()) {
- SqlRow sqlRow = resultSet.next();
-
- ArrayList<Object> row = new ArrayList<>(sqlRow.columnCount());
- for (int i = 0; i < sqlRow.columnCount(); i++) {
- row.add(sqlRow.value(i));
- }
-
- res.add(row);
- }
-
- return res;
- }
-
/**
* Looks up a node by a consistent ID, {@code null} if absent.
*
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
index b3fd3ebb334..23ef2a861b3 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal;
import static
org.apache.ignite.internal.ConfigTemplates.NODE_BOOTSTRAP_CFG_TEMPLATE;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getAllResultSet;
import java.nio.file.Path;
import java.util.List;
@@ -38,6 +39,8 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.testframework.junit.DumpThreadsOnTimeout;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
@@ -183,6 +186,13 @@ public abstract class ClusterPerTestIntegrationTest
extends BaseIgniteAbstractTe
return cluster.runningNodes();
}
+ /**
+ * Returns nodes that are started and not stopped. This can include
knocked out nodes.
+ */
+ protected final Iterable<IgniteImpl> runningNodesIter() {
+ return cluster.runningNodes().map(node ->
unwrapIgniteImpl(node))::iterator;
+ }
+
/**
* Restarts a node by index.
*
@@ -257,6 +267,16 @@ public abstract class ClusterPerTestIntegrationTest
extends BaseIgniteAbstractTe
return
runningNodes().map(TestWrappers::unwrapIgniteImpl).findFirst().orElseThrow();
}
+ protected final List<List<Object>> sql(String sql) {
+ return sql(null, sql);
+ }
+
+ protected final List<List<Object>> sql(Transaction tx, String sql) {
+ try (ResultSet<SqlRow> rs = anyNode().sql().execute(tx, sql)) {
+ return getAllResultSet(rs);
+ }
+ }
+
/** Cluster configuration that aggressively increases low watermark to
speed up data cleanup in tests. */
public static String aggressiveLowWatermarkIncreaseClusterConfig() {
return "{\n"
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxCleanupFailureTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxCleanupFailureTest.java
new file mode 100644
index 00000000000..723e57df6d5
--- /dev/null
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxCleanupFailureTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tx.distributed;
+
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static
org.apache.ignite.internal.tx.metrics.TransactionMetricsSource.METRIC_PENDING_WRITE_INTENTS;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.metrics.LongMetric;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
+import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for transaction cleanup failure.
+ */
+public class ItTxCleanupFailureTest extends ClusterPerTestIntegrationTest {
+ /** Table name. */
+ private static final String TABLE_NAME = "test_table";
+ private static final int REPLICAS = 3;
+
+ @BeforeEach
+ public void setup() {
+ String zoneSql = "create zone test_zone (partitions 1, replicas " +
REPLICAS
+ + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']";
+ String tableSql = "create table " + TABLE_NAME + " (key bigint primary
key, val varchar(20)) zone TEST_ZONE";
+
+ sql(zoneSql);
+ sql(tableSql);
+ }
+
+ @Test
+ public void testRetry() {
+ IgniteImpl node = anyNode();
+ Transaction tx = node.transactions().begin();
+ node.sql().execute(tx, "insert into " + TABLE_NAME + " (key, val)
values (1, 'val-1')");
+
+ AtomicInteger failedCleanupAttempts = new AtomicInteger();
+
+ for (IgniteImpl n : runningNodesIter()) {
+ n.dropMessages((dest, msg) -> {
+ if (msg instanceof WriteIntentSwitchReplicaRequest &&
failedCleanupAttempts.get() == 0) {
+ // Makes cleanup fail on write intent switch attempt with
replication timeout, on the first attempt.
+ return failedCleanupAttempts.incrementAndGet() == 1;
+ }
+
+ return false;
+ });
+ }
+
+ tx.commitAsync();
+
+ await().timeout(5, TimeUnit.SECONDS).until(() ->
failedCleanupAttempts.get() == 1);
+
+ // Checks that cleanup finally succeeded.
+ await().timeout(5, TimeUnit.SECONDS).until(() ->
pendingWriteIntents(node) == 0);
+ }
+
+ private static long pendingWriteIntents(IgniteImpl node) {
+ Iterable<Metric> metrics = node.metricManager()
+ .metricSnapshot()
+ .metrics()
+ .get(TransactionMetricsSource.SOURCE_NAME);
+
+ for (Metric m : metrics) {
+ if (m.name().equals(METRIC_PENDING_WRITE_INTENTS)) {
+ return ((LongMetric) m).value();
+ }
+ }
+
+ fail();
+
+ return -1;
+ }
+}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index c9516d04382..c48d4e655fe 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -17,14 +17,15 @@
package org.apache.ignite.internal.tx.impl;
+import static java.lang.Math.min;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.ignite.internal.logger.Loggers.toThrottledLogger;
-import static org.apache.ignite.internal.tx.TransactionLogUtils.formatTxInfo;
import static org.apache.ignite.internal.tx.TxStateMeta.builder;
-import static
org.apache.ignite.internal.tx.impl.TxCleanupExceptionUtils.writeIntentSwitchFailureShouldBeLogged;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
+import static org.apache.ignite.internal.util.IgniteUtils.scheduleRetry;
import java.util.ArrayList;
import java.util.Collection;
@@ -38,10 +39,10 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.IgniteThrottledLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.replicator.ReplicatorRecoverableExceptions;
@@ -62,10 +63,11 @@ import org.jetbrains.annotations.Nullable;
* Sends TX Cleanup request.
*/
public class TxCleanupRequestSender {
- private static final IgniteLogger LOG =
Loggers.forClass(TxCleanupRequestSender.class);
-
private static final int ATTEMPTS_LOG_THRESHOLD = 100;
+ private static final int RETRY_INITIAL_TIMEOUT_MS = 20;
+ private static final int RETRY_MAX_TIMEOUT_MS = 30_000;
+
private final IgniteThrottledLogger throttledLog;
/** Placement driver helper. */
@@ -82,6 +84,9 @@ public class TxCleanupRequestSender {
/** Executor that executes async cleanup actions. */
private final ExecutorService cleanupExecutor;
+ /** Executor that is used to schedule retries of cleanup messages in case
of retryable errors. */
+ private final ScheduledExecutorService retryExecutor;
+
/**
* The constructor.
*
@@ -89,20 +94,21 @@ public class TxCleanupRequestSender {
* @param placementDriverHelper Placement driver helper.
* @param txStateVolatileStorage Volatile transaction state storage.
* @param cleanupExecutor Cleanup executor.
- * @param throttledLogExecutor Executor to clean up the throttled logger
cache.
+ * @param commonScheduler Common scheduler.
*/
public TxCleanupRequestSender(
TxMessageSender txMessageSender,
PlacementDriverHelper placementDriverHelper,
VolatileTxStateMetaStorage txStateVolatileStorage,
ExecutorService cleanupExecutor,
- Executor throttledLogExecutor
+ ScheduledExecutorService commonScheduler
) {
this.txMessageSender = txMessageSender;
this.placementDriverHelper = placementDriverHelper;
this.txStateVolatileStorage = txStateVolatileStorage;
this.cleanupExecutor = cleanupExecutor;
- this.throttledLog =
toThrottledLogger(Loggers.forClass(TxCleanupRequestSender.class),
throttledLogExecutor);
+ this.retryExecutor = commonScheduler;
+ this.throttledLog =
toThrottledLogger(Loggers.forClass(TxCleanupRequestSender.class),
commonScheduler);
}
/**
@@ -182,7 +188,7 @@ public class TxCleanupRequestSender {
* @return Completable future of Void.
*/
public CompletableFuture<Void> cleanup(ZonePartitionId commitPartitionId,
String node, UUID txId) {
- return sendCleanupMessageWithRetries(commitPartitionId, false, null,
txId, node, null, 0);
+ return sendCleanupMessageWithRetries(commitPartitionId, false, null,
txId, node, null, RETRY_INITIAL_TIMEOUT_MS, 0);
}
/**
@@ -219,7 +225,7 @@ public class TxCleanupRequestSender {
enlistedPartitionGroups.add(new
EnlistedPartitionGroup(partitionId, partition.tableIds()));
});
- return cleanupPartitions(commitPartitionId, partitionsByPrimaryName,
commit, commitTimestamp, txId, 0);
+ return cleanupPartitions(commitPartitionId, partitionsByPrimaryName,
commit, commitTimestamp, txId, RETRY_INITIAL_TIMEOUT_MS, 0);
}
/**
@@ -239,7 +245,7 @@ public class TxCleanupRequestSender {
@Nullable HybridTimestamp commitTimestamp,
UUID txId
) {
- return cleanup(commitPartitionId, partitions, commit, commitTimestamp,
txId, 0);
+ return cleanup(commitPartitionId, partitions, commit, commitTimestamp,
txId, RETRY_INITIAL_TIMEOUT_MS, 0);
}
private CompletableFuture<Void> cleanup(
@@ -248,6 +254,7 @@ public class TxCleanupRequestSender {
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId,
+ long timeout,
int attemptsMade
) {
Map<ZonePartitionId, EnlistedPartitionGroup> partitionIds =
partitions.stream()
@@ -270,6 +277,7 @@ public class TxCleanupRequestSender {
commitTimestamp,
txId,
toPartitionInfos(partitionData.partitionsWithoutPrimary, partitionIds),
+ timeout,
attemptsMade
);
@@ -283,6 +291,7 @@ public class TxCleanupRequestSender {
commit,
commitTimestamp,
txId,
+ timeout,
attemptsMade
);
});
@@ -311,6 +320,7 @@ public class TxCleanupRequestSender {
@Nullable HybridTimestamp commitTimestamp,
UUID txId,
List<EnlistedPartitionGroup> partitionsWithoutPrimary,
+ long timeout,
int attemptsMade
) {
Map<ZonePartitionId, EnlistedPartitionGroup> partitionIds =
partitionsWithoutPrimary.stream()
@@ -324,7 +334,15 @@ public class TxCleanupRequestSender {
partitionIdsByPrimaryName,
partitionIds
);
- return cleanupPartitions(commitPartitionId,
partitionsByPrimaryName, commit, commitTimestamp, txId, attemptsMade);
+ return cleanupPartitions(
+ commitPartitionId,
+ partitionsByPrimaryName,
+ commit,
+ commitTimestamp,
+ txId,
+ timeout,
+ attemptsMade
+ );
});
}
@@ -334,6 +352,7 @@ public class TxCleanupRequestSender {
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId,
+ long timeout,
int attemptsMade
) {
List<CompletableFuture<Void>> cleanupFutures = new ArrayList<>();
@@ -343,7 +362,7 @@ public class TxCleanupRequestSender {
List<EnlistedPartitionGroup> nodePartitions = entry.getValue();
cleanupFutures.add(sendCleanupMessageWithRetries(commitPartitionId, commit,
commitTimestamp, txId, node,
- commitPartitionId == null ? null : nodePartitions,
attemptsMade));
+ commitPartitionId == null ? null : nodePartitions,
timeout, attemptsMade));
}
return allOf(cleanupFutures.toArray(new CompletableFuture<?>[0]));
@@ -356,9 +375,19 @@ public class TxCleanupRequestSender {
UUID txId,
String node,
@Nullable Collection<EnlistedPartitionGroup> partitions,
+ long timeout,
int attemptsMade
) {
return txMessageSender.cleanup(node, partitions, txId, commit,
commitTimestamp)
+ .thenApply(response -> {
+ if (response instanceof TxCleanupMessageErrorResponse) {
+ TxCleanupMessageErrorResponse errorResponse =
(TxCleanupMessageErrorResponse) response;
+
+ sneakyThrow(errorResponse.throwable());
+ }
+
+ return response;
+ })
.handleAsync((networkMessage, throwable) -> {
if (throwable != null) {
if
(ReplicatorRecoverableExceptions.isRecoverable(throwable)) {
@@ -379,7 +408,7 @@ public class TxCleanupRequestSender {
// or will run `switchWriteIntentsOnPartitions`
for partitions with no primary.
// At the end of the day all write intents will be
properly converted.
if (partitions == null) {
- // If we don't have any partition, which is
the recovery or unlock only case,
+ // If we don't have any partition, which is
the recovery or "unlock only" case,
// just try again with the same node.
return sendCleanupMessageWithRetries(
commitPartitionId,
@@ -388,28 +417,30 @@ public class TxCleanupRequestSender {
txId,
node,
partitions,
+ incrementTimeout(timeout),
attemptsMade + 1
);
}
// Run a cleanup that finds new primaries for the
given partitions.
// This covers the case when a partition primary
died and we still want to switch write intents.
- return cleanup(commitPartitionId, partitions,
commit, commitTimestamp, txId, attemptsMade + 1);
- }
-
- return CompletableFuture.<Void>failedFuture(throwable);
- }
-
- if (networkMessage instanceof
TxCleanupMessageErrorResponse) {
- TxCleanupMessageErrorResponse errorResponse =
(TxCleanupMessageErrorResponse) networkMessage;
- if
(writeIntentSwitchFailureShouldBeLogged(errorResponse.throwable())) {
- LOG.warn(
- "First cleanup attempt failed (the
transaction outcome is not affected) {}.",
- errorResponse.throwable(),
formatTxInfo(txId, txStateVolatileStorage)
+ return scheduleRetry(
+ () -> cleanup(
+ commitPartitionId,
+ partitions,
+ commit,
+ commitTimestamp,
+ txId,
+ incrementTimeout(timeout),
+ attemptsMade + 1
+ ),
+ timeout,
+ TimeUnit.MILLISECONDS,
+ retryExecutor
);
}
- // We don't fail the resulting future as a failing
cleanup is not a problem.
+ return CompletableFuture.<Void>failedFuture(throwable);
}
return CompletableFutures.<Void>nullCompletedFuture();
@@ -417,6 +448,10 @@ public class TxCleanupRequestSender {
.thenCompose(v -> v);
}
+ private static long incrementTimeout(long currentTimeout) {
+ return min(currentTimeout * 2, RETRY_MAX_TIMEOUT_MS);
+ }
+
private static class CleanupContext {
private final ZonePartitionId commitPartitionId;
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
index 060af184b42..1732625a81e 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
@@ -22,6 +22,7 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testSyncExecutorService;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testSyncScheduledExecutorService;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -147,7 +148,7 @@ public class TxCleanupTest extends IgniteAbstractTest {
placementDriverHelper,
mock(VolatileTxStateMetaStorage.class),
testSyncExecutorService(),
- Runnable::run
+ testSyncScheduledExecutorService()
);
}