This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 688158ebab5 IGNITE-27894 Fixed txn cleanup retries on 
TxCleanupMessageErrorResponse (#7616)
688158ebab5 is described below

commit 688158ebab50a9d9e93b00018e51fb8a438b4b74
Author: Denis Chudov <[email protected]>
AuthorDate: Wed Feb 18 18:38:38 2026 +0200

    IGNITE-27894 Fixed txn cleanup retries on TxCleanupMessageErrorResponse 
(#7616)
---
 .../apache/ignite/internal/util/IgniteUtils.java   |  31 +++++
 .../internal/testframework/IgniteTestUtils.java    | 138 +++++++++++++++++++++
 ...tDistributionZoneMetaStorageCompactionTest.java |   6 -
 .../distributionzones/ItEmptyDataNodesTest.java    |   4 -
 .../ItBuildIndexWriteIntentsHandlingTest.java      |  14 ++-
 .../internal/ClusterPerClassIntegrationTest.java   |  19 +--
 .../internal/ClusterPerTestIntegrationTest.java    |  20 +++
 .../tx/distributed/ItTxCleanupFailureTest.java     |  98 +++++++++++++++
 .../internal/tx/impl/TxCleanupRequestSender.java   |  91 +++++++++-----
 .../apache/ignite/internal/tx/TxCleanupTest.java   |   3 +-
 10 files changed, 366 insertions(+), 58 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 63cf65f443d..f67cb78720c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -51,6 +51,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -59,6 +60,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -1339,6 +1341,35 @@ public class IgniteUtils {
         return list.toArray();
     }
 
+    /**
+     * Schedules the provided operation to be executed once after the specified delay and exposes its outcome as a future.
+     *
+     * <p>The returned future is completed with the outcome of the future produced by {@code operation}. If {@code operation}
+     * itself throws, or returns {@code null}, the returned future is completed exceptionally with that failure instead of being
+     * left incomplete forever. A rejection by {@code executor} is still thrown synchronously from this method.
+     *
+     * @param operation Operation.
+     * @param delay Delay.
+     * @param unit Time unit of the delay.
+     * @param executor Executor to schedule the retry in.
+     * @return Future that is completed when the operation is successful or failed with an exception.
+     */
+    public static <T> CompletableFuture<T> scheduleRetry(
+            Callable<CompletableFuture<T>> operation,
+            long delay,
+            TimeUnit unit,
+            ScheduledExecutorService executor
+    ) {
+        CompletableFuture<T> future = new CompletableFuture<>();
+
+        executor.schedule(() -> {
+            try {
+                operation.call().whenComplete((res, e) -> {
+                    if (e == null) {
+                        future.complete(res);
+                    } else {
+                        future.completeExceptionally(e);
+                    }
+                });
+            } catch (Throwable t) {
+                // The ScheduledFuture of this task is discarded, so a failure to even start the operation
+                // must be reported through the returned future; otherwise the caller would wait forever.
+                future.completeExceptionally(t);
+            }
+        }, delay, unit);
+
+        return future;
+    }
+
     private static CompletableFuture<Void> startAsync(ComponentContext 
componentContext, Stream<? extends IgniteComponent> components) {
         return allOf(components
                 .filter(Objects::nonNull)
diff --git 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index bc104d20fc3..f632b3fdc5c 100644
--- 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -61,8 +61,11 @@ import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.BooleanSupplier;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
@@ -77,6 +80,8 @@ import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.thread.ThreadOperation;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
 import org.awaitility.Awaitility;
 import org.hamcrest.CustomMatcher;
 import org.jetbrains.annotations.NotNull;
@@ -1094,6 +1099,29 @@ public final class IgniteTestUtils {
         return new UUID(str.hashCode(), new 
StringBuilder(str).reverse().toString().hashCode());
     }
 
+    /**
+     * Converts a result set to a list of rows.
+     *
+     * <p>Reads rows until {@code resultSet.hasNext()} returns {@code false}; each row becomes a list holding the values of
+     * columns {@code 0..columnCount() - 1} in index order. The result set is not closed by this method.
+     *
+     * @param resultSet Result set to convert.
+     * @return List of rows.
+     */
+    public static List<List<Object>> getAllResultSet(ResultSet<SqlRow> 
resultSet) {
+        List<List<Object>> res = new ArrayList<>();
+
+        while (resultSet.hasNext()) {
+            SqlRow sqlRow = resultSet.next();
+
+            ArrayList<Object> row = new ArrayList<>(sqlRow.columnCount());
+            for (int i = 0; i < sqlRow.columnCount(); i++) {
+                row.add(sqlRow.value(i));
+            }
+
+            res.add(row);
+        }
+
+        return res;
+    }
+
     /**
      * Non-concurrent executor service for test purposes.
      *
@@ -1132,4 +1160,114 @@ public final class IgniteTestUtils {
             }
         };
     }
+
+    /**
+     * Scheduled executor service for test purposes.
+     *
+     * <p>All plain {@code ExecutorService} methods are forwarded to the executor returned by {@code testSyncExecutorService()}.
+     * The two one-shot {@code schedule} methods hand the task to {@code CompletableFuture#delayedExecutor(delay, unit)};
+     * periodic scheduling is not supported and throws {@link UnsupportedOperationException}.
+     *
+     * <p>NOTE(review): per the JDK documentation, {@code delayedExecutor(long, TimeUnit)} submits to CompletableFuture's default
+     * asynchronous executor, so delayed tasks presumably do not run on the calling thread - confirm this matches the intended
+     * "non-concurrent" semantics.
+     *
+     * @return Executor service.
+     */
+    public static ScheduledExecutorService testSyncScheduledExecutorService() {
+        return new ScheduledExecutorService() {
+            // Handles everything except delayed/periodic scheduling.
+            private final ExecutorService delegate = testSyncExecutorService();
+
+            @Override
+            public void execute(@NotNull Runnable command) {
+                delegate.execute(command);
+            }
+
+            @Override
+            public void shutdown() {
+                delegate.shutdown();
+            }
+
+            @Override
+            public @NotNull List<Runnable> shutdownNow() {
+                return delegate.shutdownNow();
+            }
+
+            @Override
+            public boolean isShutdown() {
+                return delegate.isShutdown();
+            }
+
+            @Override
+            public boolean isTerminated() {
+                return delegate.isTerminated();
+            }
+
+            @Override
+            public boolean awaitTermination(long timeout, @NotNull TimeUnit 
unit) throws InterruptedException {
+                return delegate.awaitTermination(timeout, unit);
+            }
+
+            @Override
+            public @NotNull <T> Future<T> submit(@NotNull Callable<T> task) {
+                return delegate.submit(task);
+            }
+
+            @Override
+            public @NotNull <T> Future<T> submit(@NotNull Runnable task, T 
result) {
+                return delegate.submit(task, result);
+            }
+
+            @Override
+            public @NotNull Future<?> submit(@NotNull Runnable task) {
+                return delegate.submit(task);
+            }
+
+            @Override
+            public @NotNull <T> List<Future<T>> invokeAll(@NotNull 
Collection<? extends Callable<T>> tasks) throws InterruptedException {
+                return delegate.invokeAll(tasks);
+            }
+
+            @Override
+            public @NotNull <T> List<Future<T>> invokeAll(@NotNull 
Collection<? extends Callable<T>> tasks, long timeout,
+                    @NotNull TimeUnit unit) throws InterruptedException {
+                return delegate.invokeAll(tasks, timeout, unit);
+            }
+
+            @Override
+            public @NotNull <T> T invokeAny(@NotNull Collection<? extends 
Callable<T>> tasks)
+                    throws InterruptedException, ExecutionException {
+                return delegate.invokeAny(tasks);
+            }
+
+            @Override
+            public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> 
tasks, long timeout, @NotNull TimeUnit unit)
+                    throws ExecutionException, InterruptedException, 
TimeoutException {
+                return delegate.invokeAny(tasks, timeout, unit);
+            }
+
+            // Runs the command once after the delay. The delayed executor is used directly, not the delegate,
+            // so delegate.shutdown() is not consulted here.
+            // NOTE(review): returns null although the method is annotated @NotNull, so the caller gets no handle
+            // to cancel or await the task; any caller dereferencing the result will fail with an NPE.
+            @Override
+            public @NotNull ScheduledFuture<?> schedule(@NotNull Runnable 
command, long delay, @NotNull TimeUnit unit) {
+                CompletableFuture.delayedExecutor(delay, 
unit).execute(command);
+                return null;
+            }
+
+            // Runs the callable once after the delay. Its result is discarded, and an exception it throws is
+            // rethrown as a RuntimeException inside the delayed task, where this method's caller cannot observe it.
+            // NOTE(review): returns null although the method is annotated @NotNull (same caveat as above).
+            @Override
+            public @NotNull <V> ScheduledFuture<V> schedule(@NotNull 
Callable<V> callable, long delay, @NotNull TimeUnit unit) {
+                CompletableFuture.delayedExecutor(delay, unit).execute(() -> {
+                    try {
+                        callable.call();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+
+                return null;
+            }
+
+            // Periodic scheduling is intentionally unsupported by this test executor.
+            @Override
+            public @NotNull ScheduledFuture<?> scheduleAtFixedRate(@NotNull 
Runnable command, long initialDelay, long period,
+                    @NotNull TimeUnit unit) {
+                throw new UnsupportedOperationException("Not implemented.");
+            }
+
+            // Periodic scheduling is intentionally unsupported by this test executor.
+            @Override
+            public @NotNull ScheduledFuture<?> scheduleWithFixedDelay(@NotNull 
Runnable command, long initialDelay, long delay,
+                    @NotNull TimeUnit unit) {
+                throw new UnsupportedOperationException("Not implemented.");
+            }
+        };
+    }
 }
diff --git 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZoneMetaStorageCompactionTest.java
 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZoneMetaStorageCompactionTest.java
index 3ca9de5aad2..afe5a262b41 100644
--- 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZoneMetaStorageCompactionTest.java
+++ 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZoneMetaStorageCompactionTest.java
@@ -186,12 +186,6 @@ public class ItDistributionZoneMetaStorageCompactionTest 
extends ClusterPerTestI
         );
     }
 
-    private void sql(String sql) {
-        cluster.doInSession(0, session -> {
-            executeUpdate(sql, session);
-        });
-    }
-
     private static Set<String> dataNodes(IgniteImpl ignite, int zoneId, 
HybridTimestamp ts) {
         CompletableFuture<Set<String>> dataNodesBeforeStopFut = ignite
                 .distributionZoneManager()
diff --git 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItEmptyDataNodesTest.java
 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItEmptyDataNodesTest.java
index 3513f1a5013..cb57e095e68 100644
--- 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItEmptyDataNodesTest.java
+++ 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItEmptyDataNodesTest.java
@@ -160,10 +160,6 @@ class ItEmptyDataNodesTest extends 
ClusterPerTestIntegrationTest {
         return nodeFut.join();
     }
 
-    private void sql(String sql) {
-        cluster.aliveNode().sql().execute(sql);
-    }
-
     private CompletableFuture<?> sqlAsync(String sql) {
         return cluster.aliveNode().sql().executeAsync(sql);
     }
diff --git 
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexWriteIntentsHandlingTest.java
 
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexWriteIntentsHandlingTest.java
index 1ce17a8dcbd..64df11ca209 100644
--- 
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexWriteIntentsHandlingTest.java
+++ 
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexWriteIntentsHandlingTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.index;
 
+import static java.lang.Thread.sleep;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.ignite.internal.ClusterPerClassIntegrationTest.isIndexAvailable;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
@@ -24,6 +25,7 @@ import static 
org.apache.ignite.internal.index.IndexBuildTestUtils.INDEX_NAME;
 import static org.apache.ignite.internal.index.IndexBuildTestUtils.TABLE_NAME;
 import static 
org.apache.ignite.internal.index.IndexBuildTestUtils.createTestTable;
 import static 
org.apache.ignite.internal.index.WriteIntentSwitchControl.disableWriteIntentSwitchExecution;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -31,6 +33,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
 import 
org.apache.ignite.internal.index.message.IsNodeFinishedRwTransactionsStartedBeforeRequest;
 import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
 import org.apache.ignite.tx.Transaction;
@@ -51,8 +54,17 @@ class ItBuildIndexWriteIntentsHandlingTest extends 
ClusterPerTestIntegrationTest
         cluster.restartNode(txCoordinatorOrdinal);
 
         createIndex(INDEX_NAME);
+
+        // Allow cleanup to be completed after some time. This is required 
because transaction abortion (that is triggered by
+        // write intent resolution that is done in index build task) is 
completed only after successful txn cleanup, and the index
+        // can't become available before building is completed.
+        runAsync(() -> {
+            sleep(5_000);
+            runningNodesIter().forEach(IgniteImpl::stopDroppingMessages);
+        });
+
         await("Index did not become available in time")
-                .atMost(10, SECONDS)
+                .atMost(30, SECONDS)
                 .until(() -> 
isIndexAvailable(unwrapIgniteImpl(cluster.aliveNode()), INDEX_NAME));
 
         verifyNoNodesHaveAnythingInIndex();
diff --git 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java
 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java
index 291823da796..b489a8eb517 100644
--- 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java
+++ 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static 
org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getAllResultSet;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.lang.util.IgniteNameUtils.quoteIfNeeded;
@@ -30,7 +31,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.file.Path;
 import java.time.ZoneId;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -571,23 +571,6 @@ public abstract class ClusterPerClassIntegrationTest 
extends BaseIgniteAbstractT
         sql.executeScript(query, args);
     }
 
-    private static List<List<Object>> getAllResultSet(ResultSet<SqlRow> 
resultSet) {
-        List<List<Object>> res = new ArrayList<>();
-
-        while (resultSet.hasNext()) {
-            SqlRow sqlRow = resultSet.next();
-
-            ArrayList<Object> row = new ArrayList<>(sqlRow.columnCount());
-            for (int i = 0; i < sqlRow.columnCount(); i++) {
-                row.add(sqlRow.value(i));
-            }
-
-            res.add(row);
-        }
-
-        return res;
-    }
-
     /**
      * Looks up a node by a consistent ID, {@code null} if absent.
      *
diff --git 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
index b3fd3ebb334..23ef2a861b3 100644
--- 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
+++ 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal;
 
 import static 
org.apache.ignite.internal.ConfigTemplates.NODE_BOOTSTRAP_CFG_TEMPLATE;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getAllResultSet;
 
 import java.nio.file.Path;
 import java.util.List;
@@ -38,6 +39,8 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.testframework.junit.DumpThreadsOnTimeout;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
@@ -183,6 +186,13 @@ public abstract class ClusterPerTestIntegrationTest 
extends BaseIgniteAbstractTe
         return cluster.runningNodes();
     }
 
+    /**
+     * Returns nodes that are started and not stopped, unwrapped to {@link IgniteImpl}. This can include knocked out nodes.
+     *
+     * <p>The returned {@link Iterable} is backed by the stream obtained from {@code cluster.runningNodes()}: obtain a fresh
+     * instance for every traversal, because a stream can be iterated only once.
+     */
+    protected final Iterable<IgniteImpl> runningNodesIter() {
+        return cluster.runningNodes().map(node -> 
unwrapIgniteImpl(node))::iterator;
+    }
+
     /**
      * Restarts a node by index.
      *
@@ -257,6 +267,16 @@ public abstract class ClusterPerTestIntegrationTest 
extends BaseIgniteAbstractTe
         return 
runningNodes().map(TestWrappers::unwrapIgniteImpl).findFirst().orElseThrow();
     }
 
+    /**
+     * Executes the SQL statement without an explicit transaction ({@code null} is passed as the transaction).
+     *
+     * @param sql SQL statement.
+     * @return All rows of the result, each row being a list of column values.
+     */
+    protected final List<List<Object>> sql(String sql) {
+        return sql(null, sql);
+    }
+
+    /**
+     * Executes the SQL statement on the node returned by {@code anyNode()} and collects the whole result.
+     *
+     * @param tx Transaction passed to the SQL API; the single-argument overload passes {@code null} here.
+     * @param sql SQL statement.
+     * @return All rows of the result, each row being a list of column values.
+     */
+    protected final List<List<Object>> sql(Transaction tx, String sql) {
+        // The result set is closed as soon as its rows have been copied out.
+        try (ResultSet<SqlRow> rs = anyNode().sql().execute(tx, sql)) {
+            return getAllResultSet(rs);
+        }
+    }
+
     /** Cluster configuration that aggressively increases low watermark to 
speed up data cleanup in tests. */
     public static String aggressiveLowWatermarkIncreaseClusterConfig() {
         return "{\n"
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxCleanupFailureTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxCleanupFailureTest.java
new file mode 100644
index 00000000000..723e57df6d5
--- /dev/null
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxCleanupFailureTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tx.distributed;
+
+import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static 
org.apache.ignite.internal.tx.metrics.TransactionMetricsSource.METRIC_PENDING_WRITE_INTENTS;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.metrics.LongMetric;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
+import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for transaction cleanup failure: verifies that cleanup is retried after a failed write intent switch.
+ */
+public class ItTxCleanupFailureTest extends ClusterPerTestIntegrationTest {
+    /** Table name. */
+    private static final String TABLE_NAME = "test_table";
+
+    /** Number of replicas of the test zone. */
+    private static final int REPLICAS = 3;
+
+    /** Creates a single-partition zone and a table in it. */
+    @BeforeEach
+    public void setup() {
+        String zoneSql = "create zone test_zone (partitions 1, replicas " + REPLICAS
+                + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']";
+        String tableSql = "create table " + TABLE_NAME + " (key bigint primary key, val varchar(20)) zone TEST_ZONE";
+
+        sql(zoneSql);
+        sql(tableSql);
+    }
+
+    /**
+     * Drops exactly one {@link WriteIntentSwitchReplicaRequest} across the cluster and checks that the number of pending
+     * write intents on the coordinator still drops to zero afterwards.
+     */
+    @Test
+    public void testRetry() {
+        IgniteImpl node = anyNode();
+        Transaction tx = node.transactions().begin();
+        node.sql().execute(tx, "insert into " + TABLE_NAME + " (key, val) values (1, 'val-1')");
+
+        AtomicInteger failedCleanupAttempts = new AtomicInteger();
+
+        for (IgniteImpl n : runningNodesIter()) {
+            // Makes cleanup fail on write intent switch attempt with replication timeout, on the first attempt.
+            // The predicate is shared by all nodes and may be invoked concurrently, so the "first attempt" decision
+            // is a single atomic step: only the caller that moves the counter from 0 to 1 drops its message, and the
+            // counter can never exceed 1 (which would make the await below unsatisfiable).
+            n.dropMessages((dest, msg) ->
+                    msg instanceof WriteIntentSwitchReplicaRequest && failedCleanupAttempts.compareAndSet(0, 1));
+        }
+
+        tx.commitAsync();
+
+        await().timeout(5, TimeUnit.SECONDS).until(() -> failedCleanupAttempts.get() == 1);
+
+        // Checks that cleanup finally succeeded.
+        await().timeout(5, TimeUnit.SECONDS).until(() -> pendingWriteIntents(node) == 0);
+    }
+
+    /**
+     * Reads the current value of the pending write intents metric from the node's transaction metric source.
+     *
+     * @param node Node to read the metric from.
+     * @return Metric value; the test fails if the metric is not present in the snapshot.
+     */
+    private static long pendingWriteIntents(IgniteImpl node) {
+        Iterable<Metric> metrics = node.metricManager()
+                .metricSnapshot()
+                .metrics()
+                .get(TransactionMetricsSource.SOURCE_NAME);
+
+        for (Metric m : metrics) {
+            if (m.name().equals(METRIC_PENDING_WRITE_INTENTS)) {
+                return ((LongMetric) m).value();
+            }
+        }
+
+        fail();
+
+        // Unreachable: fail() always throws, the return only satisfies the compiler.
+        return -1;
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index c9516d04382..c48d4e655fe 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -17,14 +17,15 @@
 
 package org.apache.ignite.internal.tx.impl;
 
+import static java.lang.Math.min;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 import static org.apache.ignite.internal.logger.Loggers.toThrottledLogger;
-import static org.apache.ignite.internal.tx.TransactionLogUtils.formatTxInfo;
 import static org.apache.ignite.internal.tx.TxStateMeta.builder;
-import static 
org.apache.ignite.internal.tx.impl.TxCleanupExceptionUtils.writeIntentSwitchFailureShouldBeLogged;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
+import static org.apache.ignite.internal.util.IgniteUtils.scheduleRetry;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -38,10 +39,10 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.IgniteThrottledLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.replicator.ReplicatorRecoverableExceptions;
@@ -62,10 +63,11 @@ import org.jetbrains.annotations.Nullable;
  * Sends TX Cleanup request.
  */
 public class TxCleanupRequestSender {
-    private static final IgniteLogger LOG = 
Loggers.forClass(TxCleanupRequestSender.class);
-
     private static final int ATTEMPTS_LOG_THRESHOLD = 100;
 
+    private static final int RETRY_INITIAL_TIMEOUT_MS = 20;
+    private static final int RETRY_MAX_TIMEOUT_MS = 30_000;
+
     private final IgniteThrottledLogger throttledLog;
 
     /** Placement driver helper. */
@@ -82,6 +84,9 @@ public class TxCleanupRequestSender {
     /** Executor that executes async cleanup actions. */
     private final ExecutorService cleanupExecutor;
 
+    /** Executor that is used to schedule retries of cleanup messages in case 
of retryable errors. */
+    private final ScheduledExecutorService retryExecutor;
+
     /**
      * The constructor.
      *
@@ -89,20 +94,21 @@ public class TxCleanupRequestSender {
      * @param placementDriverHelper Placement driver helper.
      * @param txStateVolatileStorage Volatile transaction state storage.
      * @param cleanupExecutor Cleanup executor.
-     * @param throttledLogExecutor Executor to clean up the throttled logger 
cache.
+     * @param commonScheduler Common scheduler.
      */
     public TxCleanupRequestSender(
             TxMessageSender txMessageSender,
             PlacementDriverHelper placementDriverHelper,
             VolatileTxStateMetaStorage txStateVolatileStorage,
             ExecutorService cleanupExecutor,
-            Executor throttledLogExecutor
+            ScheduledExecutorService commonScheduler
     ) {
         this.txMessageSender = txMessageSender;
         this.placementDriverHelper = placementDriverHelper;
         this.txStateVolatileStorage = txStateVolatileStorage;
         this.cleanupExecutor = cleanupExecutor;
-        this.throttledLog = 
toThrottledLogger(Loggers.forClass(TxCleanupRequestSender.class), 
throttledLogExecutor);
+        this.retryExecutor = commonScheduler;
+        this.throttledLog = 
toThrottledLogger(Loggers.forClass(TxCleanupRequestSender.class), 
commonScheduler);
     }
 
     /**
@@ -182,7 +188,7 @@ public class TxCleanupRequestSender {
      * @return Completable future of Void.
      */
     public CompletableFuture<Void> cleanup(ZonePartitionId commitPartitionId, 
String node, UUID txId) {
-        return sendCleanupMessageWithRetries(commitPartitionId, false, null, 
txId, node, null, 0);
+        return sendCleanupMessageWithRetries(commitPartitionId, false, null, 
txId, node, null, RETRY_INITIAL_TIMEOUT_MS, 0);
     }
 
     /**
@@ -219,7 +225,7 @@ public class TxCleanupRequestSender {
             enlistedPartitionGroups.add(new 
EnlistedPartitionGroup(partitionId, partition.tableIds()));
         });
 
-        return cleanupPartitions(commitPartitionId, partitionsByPrimaryName, 
commit, commitTimestamp, txId, 0);
+        return cleanupPartitions(commitPartitionId, partitionsByPrimaryName, 
commit, commitTimestamp, txId, RETRY_INITIAL_TIMEOUT_MS, 0);
     }
 
     /**
@@ -239,7 +245,7 @@ public class TxCleanupRequestSender {
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
     ) {
-        return cleanup(commitPartitionId, partitions, commit, commitTimestamp, 
txId, 0);
+        return cleanup(commitPartitionId, partitions, commit, commitTimestamp, 
txId, RETRY_INITIAL_TIMEOUT_MS, 0);
     }
 
     private CompletableFuture<Void> cleanup(
@@ -248,6 +254,7 @@ public class TxCleanupRequestSender {
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId,
+            long timeout,
             int attemptsMade
     ) {
         Map<ZonePartitionId, EnlistedPartitionGroup> partitionIds = 
partitions.stream()
@@ -270,6 +277,7 @@ public class TxCleanupRequestSender {
                             commitTimestamp,
                             txId,
                             
toPartitionInfos(partitionData.partitionsWithoutPrimary, partitionIds),
+                            timeout,
                             attemptsMade
                     );
 
@@ -283,6 +291,7 @@ public class TxCleanupRequestSender {
                             commit,
                             commitTimestamp,
                             txId,
+                            timeout,
                             attemptsMade
                     );
                 });
@@ -311,6 +320,7 @@ public class TxCleanupRequestSender {
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId,
             List<EnlistedPartitionGroup> partitionsWithoutPrimary,
+            long timeout,
             int attemptsMade
     ) {
         Map<ZonePartitionId, EnlistedPartitionGroup> partitionIds = 
partitionsWithoutPrimary.stream()
@@ -324,7 +334,15 @@ public class TxCleanupRequestSender {
                             partitionIdsByPrimaryName,
                             partitionIds
                     );
-                    return cleanupPartitions(commitPartitionId, 
partitionsByPrimaryName, commit, commitTimestamp, txId, attemptsMade);
+                    return cleanupPartitions(
+                            commitPartitionId,
+                            partitionsByPrimaryName,
+                            commit,
+                            commitTimestamp,
+                            txId,
+                            timeout,
+                            attemptsMade
+                    );
                 });
     }
 
@@ -334,6 +352,7 @@ public class TxCleanupRequestSender {
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId,
+            long timeout,
             int attemptsMade
     ) {
         List<CompletableFuture<Void>> cleanupFutures = new ArrayList<>();
@@ -343,7 +362,7 @@ public class TxCleanupRequestSender {
             List<EnlistedPartitionGroup> nodePartitions = entry.getValue();
 
             
cleanupFutures.add(sendCleanupMessageWithRetries(commitPartitionId, commit, 
commitTimestamp, txId, node,
-                    commitPartitionId == null ? null : nodePartitions, 
attemptsMade));
+                    commitPartitionId == null ? null : nodePartitions, 
timeout, attemptsMade));
         }
 
         return allOf(cleanupFutures.toArray(new CompletableFuture<?>[0]));
@@ -356,9 +375,19 @@ public class TxCleanupRequestSender {
             UUID txId,
             String node,
             @Nullable Collection<EnlistedPartitionGroup> partitions,
+            long timeout,
             int attemptsMade
     ) {
         return txMessageSender.cleanup(node, partitions, txId, commit, 
commitTimestamp)
+                .thenApply(response -> {
+                    if (response instanceof TxCleanupMessageErrorResponse) {
+                        TxCleanupMessageErrorResponse errorResponse = 
(TxCleanupMessageErrorResponse) response;
+
+                        sneakyThrow(errorResponse.throwable());
+                    }
+
+                    return response;
+                })
                 .handleAsync((networkMessage, throwable) -> {
                     if (throwable != null) {
                         if 
(ReplicatorRecoverableExceptions.isRecoverable(throwable)) {
@@ -379,7 +408,7 @@ public class TxCleanupRequestSender {
                             // or will run `switchWriteIntentsOnPartitions` 
for partitions with no primary.
                             // At the end of the day all write intents will be 
properly converted.
                             if (partitions == null) {
-                                // If we don't have any partition, which is 
the recovery or unlock only case,
+                                // If we don't have any partition, which is 
the recovery or "unlock only" case,
                                 // just try again with the same node.
                                 return sendCleanupMessageWithRetries(
                                         commitPartitionId,
@@ -388,28 +417,30 @@ public class TxCleanupRequestSender {
                                         txId,
                                         node,
                                         partitions,
+                                        incrementTimeout(timeout),
                                         attemptsMade + 1
                                 );
                             }
 
                             // Run a cleanup that finds new primaries for the 
given partitions.
                             // This covers the case when a partition primary 
died and we still want to switch write intents.
-                            return cleanup(commitPartitionId, partitions, 
commit, commitTimestamp, txId, attemptsMade + 1);
-                        }
-
-                        return CompletableFuture.<Void>failedFuture(throwable);
-                    }
-
-                    if (networkMessage instanceof 
TxCleanupMessageErrorResponse) {
-                        TxCleanupMessageErrorResponse errorResponse = 
(TxCleanupMessageErrorResponse) networkMessage;
-                        if 
(writeIntentSwitchFailureShouldBeLogged(errorResponse.throwable())) {
-                            LOG.warn(
-                                    "First cleanup attempt failed (the 
transaction outcome is not affected) {}.",
-                                    errorResponse.throwable(), 
formatTxInfo(txId, txStateVolatileStorage)
+                            return scheduleRetry(
+                                    () -> cleanup(
+                                        commitPartitionId,
+                                        partitions,
+                                        commit,
+                                        commitTimestamp,
+                                        txId,
+                                        incrementTimeout(timeout),
+                                        attemptsMade + 1
+                                    ),
+                                    timeout,
+                                    TimeUnit.MILLISECONDS,
+                                    retryExecutor
                             );
                         }
 
-                        // We don't fail the resulting future as a failing 
cleanup is not a problem.
+                        return CompletableFuture.<Void>failedFuture(throwable);
                     }
 
                     return CompletableFutures.<Void>nullCompletedFuture();
@@ -417,6 +448,10 @@ public class TxCleanupRequestSender {
                 .thenCompose(v -> v);
     }
 
+    private static long incrementTimeout(long currentTimeout) {
+        return min(currentTimeout * 2, RETRY_MAX_TIMEOUT_MS);
+    }
+
     private static class CleanupContext {
         private final ZonePartitionId commitPartitionId;
 
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
index 060af184b42..1732625a81e 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
@@ -22,6 +22,7 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testSyncExecutorService;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testSyncScheduledExecutorService;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -147,7 +148,7 @@ public class TxCleanupTest extends IgniteAbstractTest {
                 placementDriverHelper,
                 mock(VolatileTxStateMetaStorage.class),
                 testSyncExecutorService(),
-                Runnable::run
+                testSyncScheduledExecutorService()
         );
     }
 


Reply via email to