This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c7908d8f755 IGNITE-27685 Implement exponential backoff for durable 
finish (#7680)
c7908d8f755 is described below

commit c7908d8f75555a9bbd17f462236f30d028560dce
Author: Konstantin Sirotkin <[email protected]>
AuthorDate: Tue Apr 21 16:15:11 2026 +0300

    IGNITE-27685 Implement exponential backoff for durable finish (#7680)
---
 .../apache/ignite/internal/util/IgniteUtils.java   |  31 --
 .../retry/ExponentialBackoffTimeoutStrategy.java   | 132 ++++++
 .../internal/util/retry/KeyBasedRetryContext.java  | 155 +++++++
 .../internal/util/retry/NoopTimeoutStrategy.java   |  40 ++
 .../ignite/internal/util/retry/RetryContext.java   |  74 ++++
 .../ignite/internal/util/retry/RetryUtil.java      | 110 +++++
 .../ignite/internal/util/retry/TimeoutState.java   | 149 +++++++
 .../internal/util/retry/TimeoutStrategy.java       |  47 +++
 .../ExponentialBackoffTimeoutStrategyTest.java     | 129 ++++++
 .../util/retry/KeyBasedRetryContextTest.java       | 386 +++++++++++++++++
 .../internal/util/retry/TimeoutStateTest.java      | 132 ++++++
 .../rebalance/ItRebalanceDistributedTest.java      |   4 +-
 .../partition/replicator/fixtures/Node.java        |   4 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   4 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   4 +-
 .../exec/rel/TableScanNodeExecutionTest.java       |   4 +-
 .../apache/ignite/distributed/ItLockTableTest.java |   4 +-
 ...xDistributedTestSingleNodeNoCleanupMessage.java |   4 +-
 .../ignite/internal/table/ItColocationTest.java    |   4 +-
 .../apache/ignite/distributed/ItTxTestCluster.java |   7 +-
 .../table/impl/DummyInternalTableImpl.java         |   4 +-
 .../tx/distributed/ItDurableFinishFailureTest.java | 468 +++++++++++++++++++++
 .../tx/distributed/ItTxCleanupFailureTest.java     | 406 +++++++++++++++++-
 .../internal/tx/impl/TxCleanupRequestSender.java   | 102 ++---
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  54 ++-
 .../apache/ignite/internal/tx/TxCleanupTest.java   |   5 +-
 .../apache/ignite/internal/tx/TxManagerTest.java   |   4 +-
 27 files changed, 2339 insertions(+), 128 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index f67cb78720c..63cf65f443d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -51,7 +51,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -60,7 +59,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -1341,35 +1339,6 @@ public class IgniteUtils {
         return list.toArray();
     }
 
-    /**
-     * Schedules the provided operation to be retried after the specified 
delay.
-     *
-     * @param operation Operation.
-     * @param delay Delay.
-     * @param unit Time unit of the delay.
-     * @param executor Executor to schedule the retry in.
-     * @return Future that is completed when the operation is successful or 
failed with an exception.
-     */
-    public static <T> CompletableFuture<T> scheduleRetry(
-            Callable<CompletableFuture<T>> operation,
-            long delay,
-            TimeUnit unit,
-            ScheduledExecutorService executor
-    ) {
-        CompletableFuture<T> future = new CompletableFuture<>();
-
-        executor.schedule(() -> operation.call()
-                .whenComplete((res, e) -> {
-                    if (e == null) {
-                        future.complete(res);
-                    } else {
-                        future.completeExceptionally(e);
-                    }
-                }), delay, unit);
-
-        return future;
-    }
-
     private static CompletableFuture<Void> startAsync(ComponentContext 
componentContext, Stream<? extends IgniteComponent> components) {
         return allOf(components
                 .filter(Objects::nonNull)
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategy.java
new file mode 100644
index 00000000000..58c38cc95b3
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategy.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * A {@link TimeoutStrategy} that increases retry timeouts exponentially on 
each attempt.
+ *
+ * <p>Each call to {@link #next(int)} multiplies the current timeout by {@code 
backoffCoefficient},
+ * capping the result at {@link #maxTimeout}. Optionally, random jitter can be 
applied to spread
+ * retry attempts across time and avoid thundering herd problems under high 
concurrency.
+ *
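+ * <p>When jitter is enabled, the current timeout is first randomized within
+ * {@code [current / 2, current + current / 2]} (integer division); the randomized
+ * value is then multiplied by {@code backoffCoefficient} and capped at {@link #maxTimeout}.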
+ *
+ * <p>This class is stateless and thread-safe. A single instance can be shared 
across
+ * multiple retry contexts.
+ */
+// TODO: https://issues.apache.org/jira/browse/IGNITE-28481
+public class ExponentialBackoffTimeoutStrategy implements TimeoutStrategy {
+    /**
+     * Default backoff coefficient applied on each retry step. Doubles the 
timeout per attempt.
+     */
+    private static final double DEFAULT_BACKOFF_COEFFICIENT = 2.0;
+
+    /**
+     * Multiplier applied to the current timeout on each call to {@link 
#next(int)}.
+     * Must be greater than {@code 1.0} to produce a growing sequence.
+     */
+    private final double backoffCoefficient;
+
+    /**
+     * Whether to apply random jitter before the backoff multiplication.
+     * When {@code true}, the current timeout is randomized within
+     * {@code [current / 2, current + current / 2]} before being multiplied.
+     */
+    private final boolean jitter;
+
+    /**
+     * Upper bound for the timeout produced by this strategy, in milliseconds.
+     * The result of {@link #next(int)} is always capped at this value.
+     */
+    private final int maxTimeout;
+
+    /**
+     * Creates a strategy with default max timeout and backoff coefficient, 
and no jitter.
+     *
+     * @see TimeoutStrategy#DEFAULT_RETRY_TIMEOUT_MS_MAX
+     * @see #DEFAULT_BACKOFF_COEFFICIENT
+     */
+    public ExponentialBackoffTimeoutStrategy() {
+        this(DEFAULT_RETRY_TIMEOUT_MS_MAX, DEFAULT_BACKOFF_COEFFICIENT);
+    }
+
+    /**
+     * Creates a strategy with the given max timeout and backoff coefficient, 
and no jitter.
+     *
+     * @param maxTimeout maximum timeout this strategy may produce, in 
milliseconds.
+     * @param backoffCoefficient multiplier applied to the current timeout on 
each step.
+     *                           Must be greater than {@code 1.0}.
+     */
+    public ExponentialBackoffTimeoutStrategy(
+            int maxTimeout,
+            double backoffCoefficient
+    ) {
+        this(maxTimeout, backoffCoefficient, false);
+    }
+
+    /**
+     * Creates a strategy with the given max timeout, backoff coefficient, and 
jitter setting.
+     *
+     * @param maxTimeout maximum timeout this strategy may produce, in 
milliseconds.
+     * @param backoffCoefficient multiplier applied to the current timeout on 
each step.
+     *                           Must be greater than {@code 1.0}.
+     * @param jitter if {@code true}, random jitter is applied to each 
computed timeout.
+     */
+    public ExponentialBackoffTimeoutStrategy(
+            int maxTimeout,
+            double backoffCoefficient,
+            boolean jitter
+    ) {
+        this.maxTimeout = maxTimeout;
+        this.backoffCoefficient = backoffCoefficient;
+        this.jitter = jitter;
+    }
+
+    /**
+     * Computes the next retry timeout by multiplying {@code currentTimeout} by
+     * {@link #backoffCoefficient}, then capping at {@link #maxTimeout}.
+     * If jitter is enabled, {@code currentTimeout} is randomized before the multiplication.
+     *
+     * @param currentTimeout current retry timeout in milliseconds.
+     * @return next retry timeout in milliseconds, capped at {@link 
#maxTimeout}.
+     */
+    @Override
+    public int next(int currentTimeout) {
+        int jitteredTimeout = jitter ? applyJitter(currentTimeout) : 
currentTimeout;
+
+        return (int) Math.min((jitteredTimeout * backoffCoefficient), 
maxTimeout);
+    }
+
+    /**
+     * Applies random jitter to the given timeout value.
+     *
+     * <p>The result is uniformly distributed within {@code [raw / 2, raw + raw / 2]}
+     * (integer division). No cap is applied here; {@link #next(int)} caps the final
+     * value at {@link #maxTimeout} after the backoff multiplication.
+     *
+     * @param raw timeout before jitter, in milliseconds.
+     * @return jittered timeout in milliseconds.
+     */
+    private int applyJitter(int raw) {
+        int lo = raw / 2;
+        int hi = raw + lo;
+
+        return lo + ThreadLocalRandom.current().nextInt(hi - lo + 1);
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContext.java
new file mode 100644
index 00000000000..525523d03c6
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContext.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Optional.of;
+import static java.util.Optional.ofNullable;
+import static 
org.apache.ignite.internal.util.retry.TimeoutStrategy.DEFAULT_RETRY_TIMEOUT_MS_MAX;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * A retry context that tracks timeout state independently per key.
+ *
+ * <p>Each key maps to its own {@link TimeoutState}, allowing separate backoff 
progression
+ * for different retry targets — for example, different replication group IDs 
or transaction IDs.
+ * State updates are performed atomically per key using {@link 
ConcurrentHashMap#compute}.
+ *
+ * <p>To prevent unbounded memory growth, the registry is capped at {@link 
#REGISTRY_SIZE_LIMIT}
+ * entries. Once the limit is reached, untracked keys receive a fixed {@link 
#fallbackTimeoutState}
+ * that always returns {@link TimeoutStrategy#DEFAULT_RETRY_TIMEOUT_MS_MAX}. 
The limit is a soft cap and may be
+ * slightly exceeded under concurrent insertions.
+ *
+ * <p>This class is thread-safe.
+ */
+public class KeyBasedRetryContext implements RetryContext {
+    /**
+     * Maximum number of keys tracked in {@link #registry}.
+     * Once the limit is reached, untracked keys receive a fixed {@link 
#fallbackTimeoutState}.
+     * Can be slightly exceeded under concurrent insertions.
+     */
+    private static final int REGISTRY_SIZE_LIMIT = 1_000;
+
+    /** Strategy used to compute the next timeout from the current one on each 
advancement. */
+    private final TimeoutStrategy timeoutStrategy;
+
+    /**
+     * Sentinel state returned for keys that cannot be tracked because the 
registry is full.
+     * Initialized with {@link TimeoutStrategy#DEFAULT_RETRY_TIMEOUT_MS_MAX} 
and attempt {@code -1}
+     * to distinguish it from legitimately tracked states.
+     */
+    private final TimeoutState fallbackTimeoutState;
+
+    /** Per-key timeout state registry. Keys are typically transaction IDs or 
replication group IDs. */
+    private final ConcurrentHashMap<String, TimeoutState> registry = new 
ConcurrentHashMap<>();
+
+    /**
+     * Creates a new context with the given timeout strategy.
+     *
+     * @param timeoutStrategy strategy used to compute subsequent timeout 
values.
+     */
+    public KeyBasedRetryContext(TimeoutStrategy timeoutStrategy) {
+        this.timeoutStrategy = timeoutStrategy;
+
+        this.fallbackTimeoutState = new 
TimeoutState(DEFAULT_RETRY_TIMEOUT_MS_MAX, -1);
+    }
+
+    /**
+     * Returns the current {@link TimeoutState} for the given key, if tracked.
+     *
+     * <p>Returns an empty {@link Optional} if the key has no recorded state 
yet.
+     * If the registry is full and the key is not yet tracked, returns an 
{@link Optional}
+     * containing a fallback state initialized to {@link 
TimeoutStrategy#DEFAULT_RETRY_TIMEOUT_MS_MAX}.
+     *
+     * <p>This method does not insert the key into the registry.
+     *
+     * @param key the key to look up, typically a transaction ID or 
replication group ID.
+     * @return current state for the key, fallback state if registry is full, 
or empty if not tracked.
+     */
+    @Override
+    public Optional<TimeoutState> getState(String key) {
+        if (!registry.containsKey(key) && registry.size() >= 
REGISTRY_SIZE_LIMIT) {
+            return of(fallbackTimeoutState);
+        }
+
+        return ofNullable(registry.get(key));
+    }
+
+    /**
+     * Atomically advances the retry state for the given key and returns the 
updated state.
+     *
+     * <p>The update is performed inside {@link ConcurrentHashMap#compute}, 
which holds
+     * an exclusive per-key lock for the duration of the lambda, ensuring that
+     * {@link TimeoutState#update(TimeoutStrategy)} is never called 
concurrently on the same instance.
+     *
+     * <p>When the registry is full, untracked keys receive the maximum 
timeout.
+     * This acts as implicit backpressure: if enough keys are actively 
retrying to fill
+     * the registry, the system is under a heavy load and new operations 
should retry conservatively.
+     *
+     * @param key the key to advance state for, typically a transaction ID or 
replication group ID.
+     * @return updated {@link TimeoutState} for the key, or {@link 
#fallbackTimeoutState}
+     *         if the registry is full.
+     */
+    @Override
+    public TimeoutState updateAndGetState(String key) {
+        if (!registry.containsKey(key) && registry.size() >= 
REGISTRY_SIZE_LIMIT) {
+            return fallbackTimeoutState;
+        }
+
+        return registry.compute(key, (k, state) -> {
+            if (state == null) {
+                state = new TimeoutState();
+            }
+
+            state.update(timeoutStrategy);
+
+            return state;
+        });
+    }
+
+    /**
+     * Removes the retry state for the given key, resetting it as if no 
retries had occurred.
+     *
+     * @param key the key whose state should be removed.
+     */
+    @Override
+    public void resetState(String key) {
+        registry.remove(key);
+    }
+
+    /**
+     * Returns an unmodifiable snapshot of the current registry contents.
+     *
+     * <p>The snapshot is a point-in-time copy of the registry map. The 
returned
+     * {@link TimeoutState} values are live references — their internal state 
may
+     * continue to change concurrently after the snapshot is taken.
+     *
+     * <p>This method is intended for testing only and should not be used in 
production code.
+     *
+     * @return unmodifiable copy of the current key-to-state mappings.
+     */
+    @TestOnly
+    public Map<String, TimeoutState> snapshot() {
+        return unmodifiableMap(new HashMap<>(registry));
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/retry/NoopTimeoutStrategy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/NoopTimeoutStrategy.java
new file mode 100644
index 00000000000..d77a83d9edc
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/NoopTimeoutStrategy.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+/**
+ * A {@link TimeoutStrategy} that returns the current timeout unchanged on 
every call.
+ *
+ * <p>Useful when retry backoff is not desired — for example, in tests or when 
a flat
+ * retry interval is intentional. The timeout passed to {@link #next(int)} is 
returned
+ * as-is, so the retry interval remains constant across all attempts.
+ *
+ * <p>This class is stateless and thread-safe.
+ */
+public class NoopTimeoutStrategy implements TimeoutStrategy {
+    /**
+     * Returns {@code currentTimeout} unchanged.
+     *
+     * @param currentTimeout current retry timeout in milliseconds.
+     * @return the same {@code currentTimeout} value, unmodified.
+     */
+    @Override
+    public int next(int currentTimeout) {
+        return currentTimeout;
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/retry/RetryContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/RetryContext.java
new file mode 100644
index 00000000000..ce09996fd7d
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/RetryContext.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import java.util.Optional;
+
+/**
+ * Manages per-key retry state for tracking timeout progression across retry 
attempts.
+ *
+ * <p>Each key (typically a transaction ID or replication group ID) maps to an 
independent
+ * {@link TimeoutState} that records the current retry timeout and attempt 
count. The context
+ * is responsible for creating, advancing, and removing that state.
+ *
+ * <p>Implementations must be thread-safe.
+ *
+ * @see KeyBasedRetryContext
+ * @see TimeoutState
+ */
+public interface RetryContext {
+
+    /**
+     * Returns the current {@link TimeoutState} for the given key, if one 
exists.
+     *
+     * <p>Returns an empty {@link Optional} if the key has not yet been 
tracked.
+     * Implementations may also return a fallback state when internal capacity 
limits are
+     * reached, in which case the returned state reflects the maximum 
permissible timeout.
+     *
+     * <p>This method must not modify the registry — it is a read-only lookup.
+     *
+     * @param key the key to look up, typically a transaction ID or 
replication group ID.
+     * @return current {@link TimeoutState} for the key, or empty if not yet 
tracked.
+     */
+    Optional<TimeoutState> getState(String key);
+
+    /**
+     * Atomically advances the retry state for the given key and returns the 
updated state.
+     *
+     * <p>If no state exists for the key yet, a fresh {@link TimeoutState} is 
created.
+     * Otherwise, the existing state is advanced using the configured
+     * {@link TimeoutStrategy} and the attempt count is incremented.
+     *
+     * <p>When internal capacity limits prevent tracking new keys, a fallback 
state with the
+     * maximum timeout is returned instead.
+     *
+     * @param key the key to advance state for, typically a transaction ID or 
replication group ID.
+     * @return updated {@link TimeoutState} for the key, or a fallback state 
if capacity is exhausted.
+     */
+    TimeoutState updateAndGetState(String key);
+
+    /**
+     * Removes the retry state for the given key, resetting it as if no 
retries had occurred.
+     *
+     * <p>This allows future calls to {@link #updateAndGetState(String)} for 
the same key to
+     * start fresh with the initial timeout.
+     *
+     * @param key the key whose state should be removed.
+     */
+    void resetState(String key);
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/retry/RetryUtil.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/RetryUtil.java
new file mode 100644
index 00000000000..80ce941f172
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/RetryUtil.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Utility class for scheduling asynchronous retry operations.
+ *
+ * <p>Provides overloaded {@code scheduleRetry} methods that schedule an async operation
+ * to run after a delay, optionally invoking a callback once the operation completes,
+ * whether successfully or exceptionally. The returned {@link CompletableFuture} is
+ * completed when the scheduled operation completes.
+ *
+ * <p>This class is not intended to manage the retry loop itself — it 
schedules a single
+ * delayed execution. The caller is responsible for chaining successive calls 
to build a
+ * full retry loop, typically driven by a retry context and a timeout strategy.
+ */
+public class RetryUtil {
+    /**
+     * Schedules the provided operation to run once after the specified delay.
+     *
+     * @param <T>       result type of the operation.
+     * @param operation the async operation to schedule. Must return a 
non-null {@link CompletableFuture}.
+     * @param delay     delay before execution.
+     * @param unit      time unit of {@code delay}.
+     * @param executor  executor used to schedule the operation.
+     * @return a {@link CompletableFuture} completed with the operation's 
result on success,
+     *     or completed exceptionally if the operation fails.
+     */
+    public static <T> CompletableFuture<T> scheduleRetry(
+            Callable<CompletableFuture<T>> operation,
+            long delay,
+            TimeUnit unit,
+            ScheduledExecutorService executor
+    ) {
+        return scheduleRetry(operation, delay, unit, executor, null);
+    }
+
+    /**
+     * Schedules the provided operation to run once after the specified delay,
+     * invoking an optional callback once the operation completes.
+     *
+     * <p>The provided completion callback is invoked regardless of whether the
+     * operation succeeds or fails.
+     *
+     * @param <T>        result type of the operation.
+     * @param operation  the async operation to schedule. Must return a 
non-null
+     *                   {@link CompletableFuture}.
+     * @param delay      delay before execution.
+     * @param unit       time unit of {@code delay}.
+     * @param executor   executor used to schedule the operation.
+     * @param onComplete optional callback invoked after the operation 
completes, whether
+     *                   successfully or exceptionally — for example, to reset 
a retry context.
+     * @return a {@link CompletableFuture} completed with the operation's 
result on success,
+     *     or completed exceptionally if the operation fails.
+     */
+    public static <T> CompletableFuture<T> scheduleRetry(
+            Callable<CompletableFuture<T>> operation,
+            long delay,
+            TimeUnit unit,
+            ScheduledExecutorService executor,
+            @Nullable Runnable onComplete
+    ) {
+        CompletableFuture<T> future = new CompletableFuture<>();
+
+        executor.schedule(() -> {
+            try {
+                operation.call()
+                        .whenComplete((res, e) -> {
+                            if (e == null) {
+                                future.complete(res);
+                            } else {
+                                future.completeExceptionally(e);
+                            }
+
+                            if (onComplete != null) {
+                                onComplete.run();
+                            }
+                        });
+            } catch (Exception e) {
+                future.completeExceptionally(e);
+
+                if (onComplete != null) {
+                    onComplete.run();
+                }
+            }
+        }, delay, unit);
+
+        return future;
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/retry/TimeoutState.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/TimeoutState.java
new file mode 100644
index 00000000000..519e8a31f96
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/TimeoutState.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import static 
org.apache.ignite.internal.util.retry.TimeoutStrategy.DEFAULT_RETRY_INITIAL_TIMEOUT_MS;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Mutable holder of the current retry timeout and the number of attempts made so far.
+ *
+ * <p>The two values share a single {@link AtomicLong}, so one {@code get()} yields a consistent
+ * snapshot of both: the upper 32 bits carry the timeout in milliseconds and the lower 32 bits
+ * carry the attempt count.
+ *
+ * <p>{@link Object#equals(Object)} and {@link Object#hashCode()} are deliberately left untouched.
+ * The state mutates over time, so value-based equality would violate the contracts that
+ * {@link java.util.HashMap} and similar collections rely on; the identity semantics inherited
+ * from {@link Object} are the right choice here.
+ *
+ * <p>The static helpers {@link #timeout(long)} and {@link #attempt(long)} are package-private so that
+ * code holding a raw packed snapshot can decode its fields without further reads.
+ */
+public class TimeoutState {
+    /**
+     * Timeout and attempt count folded into one value.
+     * Upper 32 bits: timeout (ms). Lower 32 bits: attempt count.
+     */
+    private final AtomicLong state;
+
+    /**
+     * Creates a state holding the default initial timeout and an attempt count of {@code 0}.
+     *
+     * <p>An attempt count of {@code 0} is a sentinel meaning "initialized, never advanced". The first
+     * {@link #update(TimeoutStrategy)} call stores {@link TimeoutStrategy#DEFAULT_RETRY_INITIAL_TIMEOUT_MS}
+     * as the timeout and moves the count to {@code 1}.
+     */
+    public TimeoutState() {
+        this(DEFAULT_RETRY_INITIAL_TIMEOUT_MS, 0);
+    }
+
+    /**
+     * Creates a state holding the given timeout and attempt count.
+     *
+     * @param timeout initial timeout in milliseconds.
+     * @param attempt attempt count. Pass {@code 0} as the "initialized, never advanced" sentinel
+     *                when lazy initialization is needed.
+     */
+    public TimeoutState(int timeout, int attempt) {
+        state = new AtomicLong(pack(timeout, attempt));
+    }
+
+    /**
+     * Returns the current retry timeout in milliseconds.
+     *
+     * <p>Performs exactly one atomic read of the packed value.
+     *
+     * @return current timeout in milliseconds.
+     */
+    public int getTimeout() {
+        long snapshot = state.get();
+
+        return timeout(snapshot);
+    }
+
+    /**
+     * Returns the current attempt count.
+     *
+     * <p>Performs exactly one atomic read of the packed value.
+     *
+     * @return current attempt count.
+     */
+    public int getAttempt() {
+        long snapshot = state.get();
+
+        return attempt(snapshot);
+    }
+
+    /**
+     * Moves the retry state one step forward using the given strategy.
+     *
+     * <p>When the attempt count is {@code 0} (the initial sentinel), the timeout becomes
+     * {@link TimeoutStrategy#DEFAULT_RETRY_INITIAL_TIMEOUT_MS} and the strategy is not consulted.
+     * Otherwise the timeout is whatever {@link TimeoutStrategy#next(int)} returns for the current
+     * timeout. In both cases the attempt count grows by one.
+     *
+     * <p>The read and the write below are two separate operations, so this method is not atomic on
+     * its own. It is package-private because callers must provide external synchronization. The only
+     * intended call site is inside {@link java.util.concurrent.ConcurrentHashMap#compute} in
+     * {@link KeyBasedRetryContext#updateAndGetState}, which holds an exclusive per-key lock for the
+     * duration of the lambda, so the same instance is never updated concurrently.
+     *
+     * @param timeoutStrategy strategy used to compute the next timeout value.
+     */
+    void update(TimeoutStrategy timeoutStrategy) {
+        long snapshot = state.get();
+        int currentAttempt = attempt(snapshot);
+
+        int nextTimeout;
+
+        if (currentAttempt == 0) {
+            // Sentinel: first advance always starts from the default initial timeout.
+            nextTimeout = DEFAULT_RETRY_INITIAL_TIMEOUT_MS;
+        } else {
+            nextTimeout = timeoutStrategy.next(timeout(snapshot));
+        }
+
+        state.set(pack(nextTimeout, currentAttempt + 1));
+    }
+
+    /**
+     * Folds a timeout and an attempt count into a single {@code long}.
+     * The timeout lands in the upper 32 bits, the attempt count in the lower 32 bits.
+     *
+     * @param timeout timeout in milliseconds.
+     * @param attempt attempt count.
+     * @return packed {@code long} value.
+     */
+    static long pack(int timeout, int attempt) {
+        long upper = ((long) timeout) << 32;
+        // Zero-extend so that a negative attempt cannot smear sign bits over the timeout half.
+        long lower = Integer.toUnsignedLong(attempt);
+
+        return upper | lower;
+    }
+
+    /**
+     * Decodes the timeout from a packed raw state value.
+     *
+     * @param packed raw state value produced by {@link #pack(int, int)} or read directly
+     *               from the underlying {@link AtomicLong}.
+     * @return timeout in milliseconds.
+     */
+    static int timeout(long packed) {
+        long upper = packed >>> 32;
+
+        return (int) upper;
+    }
+
+    /**
+     * Decodes the attempt count from a packed raw state value.
+     *
+     * @param packed raw state value produced by {@link #pack(int, int)} or read directly
+     *               from the underlying {@link AtomicLong}.
+     * @return attempt count.
+     */
+    static int attempt(long packed) {
+        // Narrowing keeps exactly the lower 32 bits.
+        return (int) packed;
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/retry/TimeoutStrategy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/TimeoutStrategy.java
new file mode 100644
index 00000000000..f213c2493f8
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/TimeoutStrategy.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+/**
+ * Stateless strategy that derives the next retry timeout from the current one.
+ *
+ * <p>Implementations must be stateless and thread-safe — the same instance is expected
+ * to be shared across multiple callers and retry contexts. All mutable state (current
+ * timeout, attempt count) lives outside the strategy, in the caller or in a retry context
+ * such as {@link TimeoutState}.
+ *
+ * @see ExponentialBackoffTimeoutStrategy
+ * @see NoopTimeoutStrategy
+ */
+public interface TimeoutStrategy {
+    /** Default upper bound, in milliseconds, on the timeout a strategy may produce. */
+    int DEFAULT_RETRY_TIMEOUT_MS_MAX = 11_000;
+
+    /**
+     * Timeout, in milliseconds, used at the start of a retry sequence before any backoff is applied.
+     * {@link TimeoutState#update(TimeoutStrategy)} stores this value on the first advance of a fresh state
+     * without consulting the strategy.
+     */
+    int DEFAULT_RETRY_INITIAL_TIMEOUT_MS = 20;
+
+    /**
+     * Computes the next retry timeout from the current one.
+     *
+     * <p>Implementations must not produce a value exceeding {@link #DEFAULT_RETRY_TIMEOUT_MS_MAX}.
+     * The returned value is used directly as the delay before the next retry attempt.
+     *
+     * @param currentTimeout current retry timeout in milliseconds.
+     * @return next retry timeout in milliseconds, capped at {@link #DEFAULT_RETRY_TIMEOUT_MS_MAX}.
+     */
+    int next(int currentTimeout);
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategyTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategyTest.java
new file mode 100644
index 00000000000..c5498b39029
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategyTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link ExponentialBackoffTimeoutStrategy}.
+ *
+ * <p>Verifies the correctness of exponential timeout progression, the maximum timeout
+ * ceiling, and optional jitter behavior. Tests use predictable integer arithmetic to
+ * make expected values easy to verify by hand.
+ */
+public class ExponentialBackoffTimeoutStrategyTest {
+    /** Initial timeout passed to {@link TimeoutStrategy#next(int)} as the starting value. */
+    private static final int INITIAL_TIMEOUT = 20;
+
+    /** Maximum timeout the strategy is allowed to produce. */
+    private static final int MAX_TIMEOUT = 100;
+
+    /** Backoff coefficient used by the default strategy instance. Doubles each timeout step. */
+    private static final double BACKOFF_COEFFICIENT = 2.0;
+
+    /** Strategy instance under test, recreated before each test. */
+    private TimeoutStrategy timeoutStrategy;
+
+    /**
+     * Creates a fresh {@link ExponentialBackoffTimeoutStrategy} with {@link #MAX_TIMEOUT}
+     * and {@link #BACKOFF_COEFFICIENT}, without jitter, before each test.
+     */
+    @BeforeEach
+    void setUp() {
+        timeoutStrategy = new ExponentialBackoffTimeoutStrategy(MAX_TIMEOUT, BACKOFF_COEFFICIENT);
+    }
+
+    /**
+     * Verifies that a single call to {@link TimeoutStrategy#next(int)} returns
+     * {@code currentTimeout * backoffCoefficient}.
+     *
+     * <p>This is the core contract of the exponential strategy — each step multiplies
+     * the current timeout by the configured coefficient. The expected value is narrowed to
+     * {@code int} so that the assertion compares values of the type the strategy actually returns.
+     */
+    @Test
+    void testGettingNextTimeout() {
+        int expected = (int) (BACKOFF_COEFFICIENT * INITIAL_TIMEOUT);
+
+        assertEquals(expected, timeoutStrategy.next(INITIAL_TIMEOUT));
+    }
+
+    /**
+     * Verifies that the timeout progression reaches {@link #MAX_TIMEOUT} within the
+     * expected number of steps and does not exceed it on subsequent calls.
+     *
+     * <p>The upper bound on steps is {@code ceil(log(MAX_TIMEOUT / INITIAL_TIMEOUT) / log(BACKOFF_COEFFICIENT))},
+     * i.e. the number of multiplications needed to reach or pass the cap. If the strategy fails to reach
+     * the cap within this bound, the test fails with a descriptive message. Once the
+     * cap is reached, a further call to {@link TimeoutStrategy#next(int)} must return
+     * exactly {@link #MAX_TIMEOUT}.
+     */
+    @Test
+    void testMaxTimeoutNotExceeded() {
+        // Derived from the constants so the bound stays correct if they are tuned later.
+        int maxSteps = (int) Math.ceil(
+                Math.log((double) MAX_TIMEOUT / INITIAL_TIMEOUT) / Math.log(BACKOFF_COEFFICIENT)
+        );
+        int steps = 0;
+
+        int timeout = INITIAL_TIMEOUT;
+        do {
+            timeout = timeoutStrategy.next(timeout);
+
+            assertTrue(++steps <= maxSteps,
+                    "Strategy did not reach MAX_TIMEOUT within expected steps, last timeout: " + timeout);
+        } while (timeout < MAX_TIMEOUT);
+
+        assertEquals(MAX_TIMEOUT, timeout);
+        assertEquals(MAX_TIMEOUT, timeoutStrategy.next(timeout));
+    }
+
+    /**
+     * Verifies that when jitter is enabled, the returned timeout falls within the
+     * expected randomized range {@code [raw / 2, raw * 1.5]}, capped at {@link #MAX_TIMEOUT}.
+     *
+     * <p>A separate strategy instance with jitter enabled is created for this test. The call is
+     * repeated many times because the jittered result is random.
+     */
+    @Test
+    void testJitterApplying() {
+        timeoutStrategy = new ExponentialBackoffTimeoutStrategy(MAX_TIMEOUT, BACKOFF_COEFFICIENT, true);
+
+        // The bounds depend only on constants, so compute them once outside the loop.
+        int raw = (int) (INITIAL_TIMEOUT * BACKOFF_COEFFICIENT);
+        int lo = raw / 2;
+        int hi = Math.min(raw + lo, MAX_TIMEOUT);
+
+        for (int i = 0; i < 100; i++) {
+            int timeout = timeoutStrategy.next(INITIAL_TIMEOUT);
+
+            assertTrue(
+                    lo <= timeout && timeout <= hi,
+                    "Timeout is out of range: " + timeout + " (expected [" + lo + ", " + hi + "])"
+            );
+        }
+    }
+
+    /**
+     * Verifies that when jitter is enabled, the returned timeout does not exceed
+     * the maximum timeout set for the strategy.
+     */
+    @Test
+    void testJitterDoesntCauseMaxTimeoutExceeded() {
+        timeoutStrategy = new ExponentialBackoffTimeoutStrategy(MAX_TIMEOUT, BACKOFF_COEFFICIENT, true);
+
+        int timeout = timeoutStrategy.next(MAX_TIMEOUT);
+
+        assertEquals(MAX_TIMEOUT, timeout);
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContextTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContextTest.java
new file mode 100644
index 00000000000..6cd8dc1fda1
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContextTest.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.util.retry.TimeoutStrategy.DEFAULT_RETRY_INITIAL_TIMEOUT_MS;
+import static 
org.apache.ignite.internal.util.retry.TimeoutStrategy.DEFAULT_RETRY_TIMEOUT_MS_MAX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.IntStream;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Unit tests for {@link KeyBasedRetryContext}.
+ *
+ * <p>Verifies per-key state isolation, sequential timeout progression, reset behavior,
+ * registry size limit enforcement, fallback state for overflow keys, and thread safety
+ * under concurrent updates to both the same and different keys.
+ *
+ * <p>A deterministic {@link TestProgressiveTimeoutStrategy} with a fixed multiplier is
+ * used to make expected timeout values easy to compute by hand.
+ */
+public class KeyBasedRetryContextTest {
+    /** Message included in exceptions thrown when an expected state is absent. */
+    private static final String MISSING_STATE_MESSAGE = "TimeoutState is missing!";
+
+    /**
+     * Multiplier applied by {@link TestProgressiveTimeoutStrategy} on each step.
+     * Used to compute expected timeout values in assertions.
+     */
+    private static final int MULTIPLYING_COEFFICIENT = 4;
+
+    /**
+     * Maximum timeout configured in {@link TestProgressiveTimeoutStrategy}.
+     * The progression is capped at this value.
+     */
+    private static final int MAX_TIMEOUT = 1_000;
+
+    /**
+     * Registry size limit mirrored from {@link KeyBasedRetryContext}.
+     * Used in tests that fill the registry to verify fallback behavior.
+     */
+    private static final int REGISTRY_SIZE_LIMIT = 1_000;
+
+    /** Primary key used in single-key tests. */
+    private static final String KEY = "key";
+
+    /** Secondary key used in isolation and reset tests. */
+    private static final String OTHER_KEY = "other-key";
+
+    /** Retry context under test, recreated before each test. */
+    private KeyBasedRetryContext retryContext;
+
+    /**
+     * Creates a fresh {@link KeyBasedRetryContext} with
+     * a {@link TestProgressiveTimeoutStrategy} before each test.
+     */
+    @BeforeEach
+    void setUp() {
+        retryContext = new KeyBasedRetryContext(new TestProgressiveTimeoutStrategy());
+    }
+
+    /**
+     * Verifies that {@link KeyBasedRetryContext#getState(String)} returns an empty
+     * {@link java.util.Optional} for an untracked key, and that after the first call
+     * to {@link KeyBasedRetryContext#updateAndGetState(String)}, the state is present
+     * with {@link TimeoutStrategy#DEFAULT_RETRY_INITIAL_TIMEOUT_MS} and attempt count {@code 1}.
+     */
+    @Test
+    void testGettingState() {
+        assertFalse(retryContext.getState(KEY).isPresent());
+
+        retryContext.updateAndGetState(KEY);
+
+        assertTrue(retryContext.getState(KEY).isPresent());
+
+        checkRetryContextState(KEY, DEFAULT_RETRY_INITIAL_TIMEOUT_MS, 1);
+    }
+
+    /**
+     * Verifies that {@link KeyBasedRetryContext#updateAndGetState(String)} returns the
+     * same object reference as {@link KeyBasedRetryContext#getState(String)}, and that
+     * the timeout advances correctly after multiple calls for the same key.
+     *
+     * <p>After three updates, the expected timeout is
+     * {@code DEFAULT_RETRY_INITIAL_TIMEOUT_MS * MULTIPLYING_COEFFICIENT^2} with attempt count {@code 3}.
+     */
+    @Test
+    void testUpdatingAndGettingState() {
+        retryContext.updateAndGetState(KEY);
+        retryContext.updateAndGetState(KEY);
+        TimeoutState returnedState = retryContext.updateAndGetState(KEY);
+        TimeoutState observedState = retryContext.getState(KEY)
+                .orElseThrow(() -> new IllegalStateException(MISSING_STATE_MESSAGE));
+
+        assertSame(returnedState, observedState);
+
+        checkRetryContextState(KEY, DEFAULT_RETRY_INITIAL_TIMEOUT_MS * MULTIPLYING_COEFFICIENT * MULTIPLYING_COEFFICIENT, 3);
+    }
+
+    /**
+     * Verifies that each key maintains its own independent backoff progression.
+     *
+     * <p>Advances {@link #KEY} three times and {@link #OTHER_KEY} once, then asserts
+     * that each key holds the timeout and attempt count corresponding only to its own
+     * update history, with no cross-key interference.
+     */
+    @Test
+    void testStatesAreIsolatedPerKey() {
+        retryContext.updateAndGetState(KEY);
+        retryContext.updateAndGetState(KEY);
+        retryContext.updateAndGetState(KEY);
+
+        retryContext.updateAndGetState(OTHER_KEY);
+
+        checkRetryContextState(KEY, DEFAULT_RETRY_INITIAL_TIMEOUT_MS * MULTIPLYING_COEFFICIENT * MULTIPLYING_COEFFICIENT, 3);
+        checkRetryContextState(OTHER_KEY, DEFAULT_RETRY_INITIAL_TIMEOUT_MS, 1);
+    }
+
+    /**
+     * Verifies that {@link KeyBasedRetryContext#resetState(String)} removes the state
+     * for the given key, causing {@link KeyBasedRetryContext#getState(String)} to return
+     * an empty {@link java.util.Optional} after the reset.
+     */
+    @Test
+    void testResettingState() {
+        retryContext.updateAndGetState(KEY);
+        retryContext.updateAndGetState(KEY);
+        retryContext.updateAndGetState(KEY);
+
+        // The progression starts from the library default, not from a test-local constant.
+        checkRetryContextState(KEY, DEFAULT_RETRY_INITIAL_TIMEOUT_MS * MULTIPLYING_COEFFICIENT * MULTIPLYING_COEFFICIENT, 3);
+
+        retryContext.resetState(KEY);
+
+        assertFalse(retryContext.getState(KEY).isPresent());
+    }
+
+    /**
+     * Verifies that resetting one key does not affect the state of other keys.
+     *
+     * <p>Advances {@link #KEY} twice and {@link #OTHER_KEY} once, resets {@link #KEY},
+     * then asserts that {@link #KEY} is absent while {@link #OTHER_KEY} retains its state.
+     */
+    @Test
+    void testResettingStateDoesNotAffectOtherKeys() {
+        retryContext.updateAndGetState(KEY);
+        retryContext.updateAndGetState(KEY);
+        retryContext.updateAndGetState(OTHER_KEY);
+
+        retryContext.resetState(KEY);
+
+        assertFalse(retryContext.getState(KEY).isPresent());
+        checkRetryContextState(OTHER_KEY, DEFAULT_RETRY_INITIAL_TIMEOUT_MS, 1);
+    }
+
+    /**
+     * Verifies that when the registry is at capacity, a new key passed to
+     * {@link KeyBasedRetryContext#updateAndGetState(String)} receives the fallback
+     * {@link TimeoutState} with {@link TimeoutStrategy#DEFAULT_RETRY_TIMEOUT_MS_MAX} and attempt {@code -1}.
+     */
+    @Test
+    void testFallbackWhenRegistryIsFull() {
+        // Fill registry to the limit
+        IntStream.range(0, REGISTRY_SIZE_LIMIT)
+                .mapToObj(i -> "key-" + i)
+                .forEach(retryContext::updateAndGetState);
+
+        // New key should get fallback
+        TimeoutState state = retryContext.updateAndGetState("new-key-beyond-limit");
+
+        assertEquals(DEFAULT_RETRY_TIMEOUT_MS_MAX, state.getTimeout());
+        assertEquals(-1, state.getAttempt());
+    }
+
+    /**
+     * Verifies that a key already tracked in the registry continues to progress normally
+     * even when the registry has reached its size limit.
+     *
+     * <p>This guards against the regression where the overflow check incorrectly
+     * blocked updates for existing keys rather than only for new ones.
+     */
+    @Test
+    void testExistingKeyStillProgressesWhenRegistryIsFull() {
+        retryContext.updateAndGetState(KEY);
+
+        // Fill registry to the limit with other keys
+        IntStream.range(0, REGISTRY_SIZE_LIMIT - 1)
+                .mapToObj(i -> "key-" + i)
+                .forEach(retryContext::updateAndGetState);
+
+        // Existing key should still progress normally
+        retryContext.updateAndGetState(KEY);
+        retryContext.updateAndGetState(KEY);
+
+        checkRetryContextState(KEY, DEFAULT_RETRY_INITIAL_TIMEOUT_MS * MULTIPLYING_COEFFICIENT * MULTIPLYING_COEFFICIENT, 3);
+    }
+
+    /**
+     * Verifies that {@link KeyBasedRetryContext#getState(String)} returns the fallback
+     * {@link TimeoutState} (with {@link TimeoutStrategy#DEFAULT_RETRY_TIMEOUT_MS_MAX} and attempt {@code -1}) for a key
+     * that is not tracked when the registry is full.
+     */
+    @Test
+    void testGetStateReturnsFallbackWhenRegistryIsFull() {
+        IntStream.range(0, REGISTRY_SIZE_LIMIT)
+                .mapToObj(i -> "key-" + i)
+                .forEach(retryContext::updateAndGetState);
+
+        Optional<TimeoutState> state = retryContext.getState("new-key-beyond-limit");
+
+        assertTrue(state.isPresent());
+        assertEquals(DEFAULT_RETRY_TIMEOUT_MS_MAX, state.get().getTimeout());
+        assertEquals(-1, state.get().getAttempt());
+    }
+
+    /**
+     * Verifies that concurrent calls to {@link KeyBasedRetryContext#updateAndGetState(String)}
+     * for the same key from multiple threads all succeed, and that the final state reflects
+     * exactly {@code attemptsNumber} advancements.
+     *
+     * <p>{@link TimeoutState#update(TimeoutStrategy)} is a plain read-then-write, so this test relies on
+     * the context serializing updates for a key (documented on {@code TimeoutState.update} as the per-key
+     * locking of {@link java.util.concurrent.ConcurrentHashMap#compute}). A lost update would show up as an
+     * attempt count below {@code attemptsNumber}.
+     *
+     * @throws Exception if the thread pool is interrupted during shutdown.
+     */
+    @Test
+    @Timeout(value = 5, unit = SECONDS)
+    void testConcurrentStateUpdatingSameKey() throws Exception {
+        ExecutorService threadPool = Executors.newFixedThreadPool(5);
+
+        int attemptsNumber = 20;
+
+        List<Future<TimeoutState>> futures = IntStream.range(0, attemptsNumber)
+                .mapToObj(i -> threadPool.submit(() -> retryContext.updateAndGetState(KEY)))
+                .collect(toList());
+
+        try {
+            futures.forEach(KeyBasedRetryContextTest::getQuietly);
+
+            checkRetryContextState(KEY, MAX_TIMEOUT, attemptsNumber);
+        } finally {
+            threadPool.shutdown();
+            threadPool.awaitTermination(5, SECONDS);
+        }
+    }
+
+    /**
+     * Verifies that concurrent calls to {@link KeyBasedRetryContext#updateAndGetState(String)}
+     * for different keys do not interfere with each other.
+     *
+     * <p>Each key is updated exactly once from a dedicated task. After all tasks complete,
+     * each key must hold {@link TimeoutStrategy#DEFAULT_RETRY_INITIAL_TIMEOUT_MS} and attempt count {@code 1},
+     * confirming no cross-key state contamination under concurrency.
+     *
+     * @throws Exception if the thread pool is interrupted during shutdown.
+     */
+    @Test
+    @Timeout(value = 5, unit = SECONDS)
+    void testConcurrentStateUpdatingDifferentKeys() throws Exception {
+        ExecutorService threadPool = Executors.newFixedThreadPool(5);
+
+        List<String> keys = List.of("key-1", "key-2", "key-3", "key-4", "key-5");
+
+        List<Future<TimeoutState>> futures = keys.stream()
+                .map(key -> threadPool.submit(() -> retryContext.updateAndGetState(key)))
+                .collect(toList());
+
+        try {
+            futures.forEach(KeyBasedRetryContextTest::getQuietly);
+
+            keys.forEach(key -> checkRetryContextState(key, DEFAULT_RETRY_INITIAL_TIMEOUT_MS, 1));
+        } finally {
+            threadPool.shutdown();
+            threadPool.awaitTermination(5, SECONDS);
+        }
+    }
+
+    /**
+     * Asserts that the retry context holds the expected timeout and attempt count for
+     * the given key.
+     *
+     * <p>Fails with {@link #MISSING_STATE_MESSAGE} if the state is absent.
+     *
+     * @param key              the key whose state to check.
+     * @param expectedTimeout  expected current timeout in milliseconds.
+     * @param expectedAttempts expected current attempt count.
+     */
+    private void checkRetryContextState(String key, int expectedTimeout, int expectedAttempts) {
+        retryContext.getState(key).ifPresentOrElse(state -> {
+            assertEquals(expectedTimeout, state.getTimeout(), "[DEBUG_LOG] Timeout mismatch for key: " + key);
+            assertEquals(expectedAttempts, state.getAttempt(), "[DEBUG_LOG] Attempt mismatch for key: " + key);
+        }, () -> {
+            throw new IllegalStateException(MISSING_STATE_MESSAGE);
+        });
+    }
+
+    /**
+     * A deterministic {@link TimeoutStrategy} that multiplies the current timeout by
+     * {@link #MULTIPLYING_COEFFICIENT} on each step, capped at {@link #MAX_TIMEOUT}.
+     *
+     * <p>Using integer multiplication rather than a floating-point coefficient avoids
+     * rounding ambiguity, making expected values in test assertions exact and easy to
+     * compute by hand.
+     */
+    private static class TestProgressiveTimeoutStrategy implements TimeoutStrategy {
+        /**
+         * {@inheritDoc}
+         *
+         * <p>Multiplies {@code currentTimeout} by {@link #MULTIPLYING_COEFFICIENT},
+         * capped at {@link #MAX_TIMEOUT}.
+         */
+        @Override
+        public int next(int currentTimeout) {
+            return Math.min(currentTimeout * MULTIPLYING_COEFFICIENT, MAX_TIMEOUT);
+        }
+    }
+
+    /**
+     * Waits for the given {@link Future} to complete and returns its result.
+     *
+     * <p>Wraps checked exceptions as {@link AssertionError} so they propagate cleanly
+     * through {@link java.util.function.Consumer} lambdas in test code without requiring
+     * explicit try-catch blocks.
+     *
+     * <ul>
+     *     <li>{@link ExecutionException} — wraps the cause as an {@link AssertionError},
+     *         preserving the original exception for diagnosis.</li>
+     *     <li>{@link InterruptedException} — restores the interrupt flag and wraps
+     *         as an {@link AssertionError}.</li>
+     * </ul>
+     *
+     * @param <T>    the future's result type.
+     * @param future the future to wait for.
+     * @return the future's result.
+     * @throws AssertionError if the future completed exceptionally or the thread was interrupted.
+     */
+    private static <T> T getQuietly(Future<T> future) {
+        try {
+            return future.get();
+        } catch (ExecutionException e) {
+            throw new AssertionError("Future completed exceptionally", e.getCause());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new AssertionError("Interrupted while waiting for future", e);
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/retry/TimeoutStateTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/retry/TimeoutStateTest.java
new file mode 100644
index 00000000000..5672d0c88ac
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/retry/TimeoutStateTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import static org.apache.ignite.internal.util.retry.TimeoutState.attempt;
+import static org.apache.ignite.internal.util.retry.TimeoutState.timeout;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link TimeoutState}.
+ *
+ * <p>Verifies initial state construction, advancing the state through
+ * {@link TimeoutState#update(TimeoutStrategy)} (including the attempt-{@code 0} sentinel),
+ * and the consistency between the raw packed {@code long} and the individual accessor methods.
+ */
+public class TimeoutStateTest {
+    /** Timeout value used to construct the shared {@link TimeoutState} instance. */
+    private static final int TIMEOUT = 20;
+
+    /** Attempt count used to construct the shared {@link TimeoutState} instance. */
+    private static final int ATTEMPT = 10;
+
+    /** Shared state instance recreated before each test. */
+    private TimeoutState state;
+
+    /**
+     * Creates a fresh {@link TimeoutState} with {@link #TIMEOUT} and {@link #ATTEMPT}
+     * before each test to ensure full isolation.
+     */
+    @BeforeEach
+    void setUp() {
+        state = new TimeoutState(TIMEOUT, ATTEMPT);
+    }
+
+    /**
+     * Verifies that newly constructed {@link TimeoutState} returns the default initial timeout
+     * and attempt count of {@code 0}.
+     */
+    @Test
+    void testDefaultInitialState() {
+        TimeoutState defaultState = new TimeoutState();
+        assertEquals(TimeoutStrategy.DEFAULT_RETRY_INITIAL_TIMEOUT_MS, defaultState.getTimeout());
+        assertEquals(0, defaultState.getAttempt());
+    }
+
+    /**
+     * Verifies that a newly constructed {@link TimeoutState} returns the timeout and
+     * attempt values it was initialized with.
+     */
+    @Test
+    void testInitialState() {
+        assertEquals(TIMEOUT, state.getTimeout());
+        assertEquals(ATTEMPT, state.getAttempt());
+    }
+
+    /**
+     * Verifies that {@link TimeoutState#update(TimeoutStrategy)} correctly advances
+     * both timeout and attempt count.
+     */
+    @Test
+    void testUpdate() {
+        int nextTimeout = 100;
+        TimeoutStrategy strategy = current -> nextTimeout;
+
+        state.update(strategy);
+
+        assertEquals(nextTimeout, state.getTimeout());
+        assertEquals(ATTEMPT + 1, state.getAttempt());
+    }
+
+    /**
+     * Verifies that {@link TimeoutState#update(TimeoutStrategy)} starting from attempt {@code 0}
+     * resets the timeout to {@link TimeoutStrategy#DEFAULT_RETRY_INITIAL_TIMEOUT_MS}
+     * and sets attempt count to {@code 1}.
+     */
+    @Test
+    void testUpdateFromZeroAttempt() {
+        TimeoutState zeroState = new TimeoutState(1000, 0);
+        TimeoutStrategy strategy = current -> 2000; // Should be ignored
+
+        zeroState.update(strategy);
+
+        assertEquals(TimeoutStrategy.DEFAULT_RETRY_INITIAL_TIMEOUT_MS, zeroState.getTimeout());
+        assertEquals(1, zeroState.getAttempt());
+    }
+
+    /**
+     * Verifies that {@link TimeoutState#getTimeout()} and {@link TimeoutState#getAttempt()}
+     * are consistent with the raw packed value.
+     *
+     * <p>The instance accessors must report the same fields that the static decoders extract from
+     * a value packed with the same inputs.
+     */
+    @Test
+    void testGetTimeoutAndGetAttemptAreConsistentWithPacked() {
+        long packed = TimeoutState.pack(TIMEOUT, ATTEMPT);
+
+        assertEquals(timeout(packed), state.getTimeout());
+        assertEquals(attempt(packed), state.getAttempt());
+    }
+
+    /**
+     * Verifies that {@link TimeoutState#pack(int, int)} followed by
+     * {@link TimeoutState#timeout(long)} and {@link TimeoutState#attempt(long)}
+     * recovers the original values exactly.
+     *
+     * <p>Tests the bit-level correctness of the packing scheme independently of
+     * the {@link TimeoutState} object lifecycle, including values with the sign bit set, where a
+     * missing mask or a signed shift would corrupt the neighbouring field.
+     */
+    @Test
+    void testPackUnpackRoundtrip() {
+        long packed = TimeoutState.pack(TIMEOUT, ATTEMPT);
+
+        assertEquals(TIMEOUT, timeout(packed));
+        assertEquals(ATTEMPT, attempt(packed));
+
+        // Negative attempt: all lower bits are set and must not leak into the timeout half.
+        long negativeAttempt = TimeoutState.pack(TimeoutStrategy.DEFAULT_RETRY_TIMEOUT_MS_MAX, -1);
+
+        assertEquals(TimeoutStrategy.DEFAULT_RETRY_TIMEOUT_MS_MAX, timeout(negativeAttempt));
+        assertEquals(-1, attempt(negativeAttempt));
+
+        // Extremes of both fields.
+        long extremes = TimeoutState.pack(Integer.MAX_VALUE, Integer.MIN_VALUE);
+
+        assertEquals(Integer.MAX_VALUE, timeout(extremes));
+        assertEquals(Integer.MIN_VALUE, attempt(extremes));
+    }
+}
diff --git 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 8696887c30e..cf8c5778268 100644
--- 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -260,6 +260,7 @@ import 
org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
 import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
 import org.apache.ignite.network.NetworkAddress;
@@ -1476,7 +1477,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     transactionInflights,
                     lowWatermark,
                     commonScheduledExecutorService,
-                    metricManager
+                    metricManager,
+                    new NoopTimeoutStrategy()
             );
 
             replicaManager = spy(new ReplicaManager(
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 51a7a9ffeef..389bd47b822 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -197,6 +197,7 @@ import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
 import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
 import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
@@ -644,7 +645,8 @@ public class Node {
                 transactionInflights,
                 lowWatermark,
                 threadPoolsManager.commonScheduler(),
-                metricManager
+                metricManager,
+                new NoopTimeoutStrategy()
         );
 
         volatileLogStorageManagerCreator = new 
VolatileLogStorageManagerCreator(name, workDir.resolve("volatile-log-spillout-" 
+ name));
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index b35f59aaae1..a5233c7c47a 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -239,6 +239,7 @@ import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedS
 import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.version.DefaultIgniteProductVersionSource;
 import org.apache.ignite.internal.worker.CriticalWorkerWatchdog;
@@ -697,7 +698,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 lowWatermark,
                 threadPoolsManager.commonScheduler(),
                 failureProcessor,
-                metricManager
+                metricManager,
+                new NoopTimeoutStrategy()
         );
 
         ResourceVacuumManager resourceVacuumManager = new 
ResourceVacuumManager(
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index afe9e7d4cd0..8d29f79d49a 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -302,6 +302,7 @@ import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
+import org.apache.ignite.internal.util.retry.ExponentialBackoffTimeoutStrategy;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
 import org.apache.ignite.internal.version.DefaultIgniteProductVersionSource;
@@ -1127,7 +1128,8 @@ public class IgniteImpl implements Ignite {
                 lowWatermark,
                 threadPoolsManager.commonScheduler(),
                 failureManager,
-                metricManager
+                metricManager,
+                new ExponentialBackoffTimeoutStrategy()
         );
 
         sharedTxStateStorage = new TxStateRocksDbSharedStorage(
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 5d3941cc864..2eb12260ea8 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -101,6 +101,7 @@ import 
org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
 import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.type.StructNativeType;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.table.QualifiedName;
 import org.apache.ignite.table.QualifiedNameHelper;
@@ -197,7 +198,8 @@ public class TableScanNodeExecutionTest extends 
AbstractExecutionTest<Object[]>
                     transactionInflights,
                     new TestLowWatermark(),
                     commonExecutor,
-                    new NoOpMetricManager()
+                    new NoOpMetricManager(),
+                    new NoopTimeoutStrategy()
             );
 
             assertThat(txManager.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
index 024a2d6aee9..ad0224c6794 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
 import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
 import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
 import org.apache.ignite.raft.jraft.test.TestUtils;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Tuple;
@@ -172,7 +173,8 @@ public class ItLockTableTest extends IgniteAbstractTest {
                         transactionInflights,
                         lowWatermark,
                         commonExecutor,
-                        new NoOpMetricManager()
+                        new NoOpMetricManager(),
+                        new NoopTimeoutStrategy()
                 );
             }
         };
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index b37ea5bfb5e..a68006cc8e0 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -81,6 +81,7 @@ import 
org.apache.ignite.internal.tx.message.TableWriteIntentSwitchReplicaReques
 import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
 import org.apache.ignite.table.QualifiedName;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.TransactionException;
@@ -162,7 +163,8 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage 
extends TxAbstractTes
                         transactionInflights,
                         lowWatermark,
                         commonExecutor,
-                        new NoOpMetricManager()
+                        new NoOpMetricManager(),
+                        new NoopTimeoutStrategy()
                 ) {
                     @Override
                     public Executor writeIntentSwitchExecutor() {
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index c02dd8f7d6a..d100c358722 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -120,6 +120,7 @@ import 
org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
 import org.apache.ignite.internal.tx.test.TestTransactionIds;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.CollectionUtils;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
 import org.apache.ignite.sql.ColumnType;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.table.QualifiedName;
@@ -224,7 +225,8 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
                 transactionInflights,
                 new TestLowWatermark(),
                 commonExecutor,
-                new NoOpMetricManager()
+                new NoOpMetricManager(),
+                new NoopTimeoutStrategy()
         ) {
             @Override
             public CompletableFuture<Void> finish(
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index a19ca2b08eb..007b0f5cea4 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -197,6 +197,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.util.SafeTimeValuesTracker;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.apache.ignite.sql.IgniteSql;
@@ -654,7 +655,8 @@ public class ItTxTestCluster {
                 lowWatermark,
                 executor,
                 new NoOpFailureManager(),
-                new TestMetricManager()
+                new TestMetricManager(),
+                new NoopTimeoutStrategy()
         );
     }
 
@@ -1341,7 +1343,8 @@ public class ItTxTestCluster {
                 lowWatermark,
                 executor,
                 new NoOpFailureManager(),
-                new TestMetricManager()
+                new TestMetricManager(),
+                new NoopTimeoutStrategy()
         );
 
         clientResourceVacuumManager = new ResourceVacuumManager(
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 56852c04886..de75440356a 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -150,6 +150,7 @@ import 
org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.util.SafeTimeValuesTracker;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.table.QualifiedName;
 import org.apache.ignite.table.QualifiedNameHelper;
@@ -719,7 +720,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 transactionInflights,
                 new TestLowWatermark(),
                 COMMON_SCHEDULER,
-                new NoOpMetricManager()
+                new NoOpMetricManager(),
+                new NoopTimeoutStrategy()
         );
 
         assertThat(txManager.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItDurableFinishFailureTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItDurableFinishFailureTest.java
new file mode 100644
index 00000000000..bec5f443802
--- /dev/null
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItDurableFinishFailureTest.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tx.distributed;
+
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.metrics.LongMetric;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
+import org.apache.ignite.internal.util.retry.KeyBasedRetryContext;
+import org.apache.ignite.internal.util.retry.TimeoutState;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests verifying the retry behavior of durable transaction finish
+ * ({@link TxFinishReplicaRequest}) in a 3-node Apache Ignite cluster.
+ *
+ * <p>Each test drops or intercepts {@link TxFinishReplicaRequest} messages on 
all nodes
+ * to simulate network failures, then verifies that:
+ * <ul>
+ *     <li>the finish operation is retried as expected;</li>
+ *     <li>retry timeouts grow exponentially between attempts;</li>
+ *     <li>the retry context is cleaned up after success;</li>
+ *     <li>no unnecessary retries occur when the first attempt succeeds.</li>
+ * </ul>
+ *
+ * <p>Each test creates a single-partition, 3-replica zone and table in {@code 
@BeforeEach}.
+ * Tests that require additional zones/tables create them locally.
+ */
+public class ItDurableFinishFailureTest extends ClusterPerTestIntegrationTest {
+    /**
+     * Thread name fragment identifying the scheduler thread that sends retry 
attempts.
+     * Used to distinguish retried messages from the original commit attempt 
in message interceptors.
+     */
+    private static final String RETRY_THREAD_NAME = "common-scheduler";
+
+    /** Metric name for the total number of committed transactions, used to 
verify commit completion. */
+    private static final String TOTAL_COMMITED_TRANSACTIONS_METRIC_NAME = 
"TotalCommits";
+
+    /** Name of the default test table created in {@code @BeforeEach}. */
+    private static final String TABLE_NAME = "test_table";
+
+    /** Number of replicas for all test zones. */
+    private static final int REPLICAS = 3;
+
+    /**
+     * Prepares the schema shared by the tests: zone {@code test_zone} with one partition and
+     * {@link #REPLICAS} replicas, plus table {@link #TABLE_NAME} placed in that zone.
+     */
+    @BeforeEach
+    public void setup() {
+        // The zone is created first because the table statement refers to it.
+        sql("create zone test_zone (partitions 1, replicas " + REPLICAS
+                + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']");
+
+        String createTable = "create table " + TABLE_NAME + " (key bigint primary key, val varchar(20)) zone TEST_ZONE";
+        sql(createTable);
+    }
+
+    /**
+     * Checks the happy path: a durable finish that succeeds on the first attempt is never re-sent.
+     *
+     * <p>Every running node gets an interceptor that only counts {@link TxFinishReplicaRequest} messages
+     * into one shared counter and drops nothing. Once {@code commitedTransactions(node)} reports one
+     * commit, the shared counter must be exactly one.
+     */
+    @Test
+    public void testNoRetryOnSuccessfulFinish() {
+        AtomicInteger finishRequests = new AtomicInteger();
+
+        IgniteImpl node = anyNode();
+        Transaction tx = node.transactions().begin();
+        node.sql().execute(tx, "insert into " + TABLE_NAME + " (key, val) values (1, 'val-1')");
+
+        for (IgniteImpl clusterNode : runningNodesIter()) {
+            clusterNode.dropMessages((recipient, message) -> {
+                if (message instanceof TxFinishReplicaRequest) {
+                    finishRequests.incrementAndGet();
+                }
+
+                // Observe only; this test never drops a message.
+                return false;
+            });
+        }
+
+        tx.commitAsync();
+
+        await().timeout(5, SECONDS).until(() -> commitedTransactions(node) == 1);
+        await().timeout(1, SECONDS).until(() -> finishRequests.get() >= 1);
+
+        assertEquals(1, finishRequests.get());
+    }
+
+    /**
+     * Verifies that the durable finish is retried after a single failure.
+     *
+     * <p>Drops the first {@link TxFinishReplicaRequest} seen on any node to simulate a transient
+     * network failure. On every later finish request, counts the nodes whose retry context snapshot
+     * holds exactly one entry and keeps the first non-zero count; the test expects that count to be 1.
+     * After the commit completes, waits until the retry context snapshot is empty on every node.
+     */
+    @Test
+    public void testDurableFinishRetry() {
+        IgniteImpl node = anyNode();
+        Transaction tx = node.transactions().begin();
+        node.sql().execute(tx, "insert into " + TABLE_NAME + " (key, val) values (1, 'val-1')");
+
+        AtomicInteger failedFinishAttempts = new AtomicInteger();
+
+        List<KeyBasedRetryContext> retryContexts = new ArrayList<>();
+
+        AtomicLong nodesWithRetryEntry = new AtomicLong(0);
+
+        for (IgniteImpl n : runningNodesIter()) {
+            retryContexts.add((KeyBasedRetryContext) ((TxManagerImpl) n.txManager()).retryContext());
+
+            n.dropMessages((dest, msg) -> {
+                if (msg instanceof TxFinishReplicaRequest) {
+                    // Drop only the first finish request; the CAS makes check-and-mark atomic across sender threads.
+                    if (failedFinishAttempts.compareAndSet(0, 1)) {
+                        return true;
+                    }
+
+                    nodesWithRetryEntry.compareAndSet(0, retryContexts.stream()
+                            .map(KeyBasedRetryContext::snapshot)
+                            .filter(retryContextSnapshot -> retryContextSnapshot.size() == 1)
+                            .count());
+                }
+
+                return false;
+            });
+        }
+
+        tx.commitAsync();
+
+        await().timeout(5, SECONDS).until(() -> failedFinishAttempts.get() == 1);
+
+        await().timeout(5, SECONDS).until(() -> commitedTransactions(node) == 1);
+
+        assertEquals(1, nodesWithRetryEntry.get());
+
+        await().timeout(5, SECONDS).until(() -> retryContexts.stream()
+                .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty));
+    }
+
+    /**
+     * Verifies that retry timeouts strictly increase between consecutive retry attempts.
+     *
+     * <p>Drops the first 3 {@link TxFinishReplicaRequest} messages. For each retry attempt
+     * arriving on the scheduler thread, captures the current timeout from the retry context
+     * snapshot. After all drops and the final successful commit, asserts that the captured
+     * timeout sequence is strictly increasing, confirming exponential backoff behavior.
+     *
+     * <p>Timeout samples are captured only on the retry scheduler thread
+     * (identified by {@link #RETRY_THREAD_NAME}) to ensure we observe post-advancement values
+     * rather than the stale pre-drop state.
+     */
+    @Test
+    public void testRetryWithGrowingTimeout() {
+        IgniteImpl node = anyNode();
+        Transaction tx = node.transactions().begin();
+        node.sql().execute(tx, "insert into " + TABLE_NAME + " (key, val) values (1, 'val-1')");
+
+        // Counts every finish request seen by the interceptors, dropped or not.
+        AtomicInteger failedFinishAttempts = new AtomicInteger();
+
+        List<KeyBasedRetryContext> retryContexts = new ArrayList<>();
+
+        List<Integer> timeoutSamples = new CopyOnWriteArrayList<>();
+
+        for (IgniteImpl n : runningNodesIter()) {
+            TxManagerImpl txManager = (TxManagerImpl) n.txManager();
+            retryContexts.add((KeyBasedRetryContext) txManager.retryContext());
+
+            n.dropMessages((dest, msg) -> {
+                if (msg instanceof TxFinishReplicaRequest) {
+                    if (Thread.currentThread().getName().contains(RETRY_THREAD_NAME)) {
+                        retryContexts.stream()
+                                .map(KeyBasedRetryContext::snapshot)
+                                .filter(retryContextSnapshot -> retryContextSnapshot.size() == 1)
+                                .flatMap(retryContextSnapshot -> retryContextSnapshot.values().stream())
+                                .forEach(timeoutState -> timeoutSamples.add(timeoutState.getTimeout()));
+                    }
+
+                    // The first three requests are dropped; later ones are let through.
+                    return failedFinishAttempts.getAndIncrement() < 3;
+                }
+
+                return false;
+            });
+        }
+
+        tx.commitAsync();
+
+        // ">=" and not "==": the counter also counts the request that is let through, so it keeps
+        // growing past 3 and a poll may never observe the transient value 3.
+        await().timeout(5, SECONDS).until(() -> failedFinishAttempts.get() >= 3);
+
+        await().timeout(5, SECONDS).until(() -> commitedTransactions(node) == 1);
+
+        assertTrue(timeoutSamples.size() > 1, "Expected at least 2 timeout samples, got: " + timeoutSamples.size());
+
+        for (int i = 1; i < timeoutSamples.size(); i++) {
+            assertTrue(timeoutSamples.get(i - 1) < timeoutSamples.get(i), "timeout increasing is expected!");
+        }
+
+        await().timeout(5, SECONDS).until(() -> retryContexts.stream()
+                .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty));
+    }
+
+    /**
+     * Verifies that 100 transactions committed from a 10-thread pool all complete after each loses its first finish request.
+     *
+     * <p>Rows go to a dedicated table in a 25-partition zone. The interceptor records every transaction id
+     * the first time a {@link TxFinishReplicaRequest} for it is seen and drops exactly that request.
+     * The test then waits until {@code commitedTransactions(node)} reports 100 and until the retry
+     * context snapshot of every node is empty.
+     *
+     * @throws Exception if waiting for the thread pool to terminate is interrupted.
+     */
+    @Test
+    public void testRetryManyConcurrentDurableFinishes() throws Exception {
+        IgniteImpl node = anyNode();
+
+        sql("create zone test_zone_1 (partitions 25, replicas " + REPLICAS
+                + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']");
+        sql("create table test_table_1 (key bigint primary key, val varchar(20)) zone TEST_ZONE_1");
+
+        // Transaction ids whose first finish request has already been dropped.
+        Map<String, Boolean> droppedOnce = new ConcurrentHashMap<>();
+
+        List<KeyBasedRetryContext> retryContexts = new ArrayList<>();
+
+        for (IgniteImpl clusterNode : runningNodesIter()) {
+            TxManagerImpl nodeTxManager = (TxManagerImpl) clusterNode.txManager();
+            retryContexts.add((KeyBasedRetryContext) nodeTxManager.retryContext());
+
+            clusterNode.dropMessages((recipient, message) -> {
+                if (!(message instanceof TxFinishReplicaRequest)) {
+                    return false;
+                }
+
+                String txId = ((TxFinishReplicaRequest) message).txId().toString();
+
+                // putIfAbsent returns null only for the first request of a transaction, which is the one to drop.
+                return droppedOnce.putIfAbsent(txId, Boolean.TRUE) == null;
+            });
+        }
+
+        ExecutorService threadPool = newFixedThreadPool(10);
+
+        List<? extends Future<?>> futures = IntStream.range(0, 100)
+                .mapToObj(key -> threadPool.submit(() -> {
+                    Transaction tx = node.transactions().begin();
+                    node.sql().execute(tx, "insert into test_table_1 (key, val) values (?, ?)", key, "val-" + key);
+
+                    // The commit future is not awaited here; completion is observed through the awaits below.
+                    tx.commitAsync();
+                }))
+                .collect(toList());
+
+        try {
+            futures.forEach(ItDurableFinishFailureTest::getQuietly);
+        } finally {
+            threadPool.shutdown();
+            threadPool.awaitTermination(5, SECONDS);
+        }
+
+        await().timeout(15, SECONDS).until(() -> commitedTransactions(node) == 100);
+
+        await().timeout(5, SECONDS)
+                .until(() -> retryContexts.stream().map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty));
+    }
+
+    /**
+     * Verifies that transactions targeting different replication groups are 
tracked
+     * independently in the retry context and converge to the same retry 
timeout value.
+     *
+     * <p>Creates two separate single-partition zones with dedicated tables. 
Drops the first
+     * two {@link TxFinishReplicaRequest} attempts for each transaction (keyed 
by transaction ID)
+     * to advance both retry contexts by the same number of steps. On the 
third attempt, captures
+     * a snapshot of all retry context entries across all nodes.
+     *
+     * <p>After both transactions commit, asserts that:
+     * <ul>
+     *     <li>exactly two distinct entries are present in the captured 
snapshot — one per transaction,
+     *         confirming that each transaction is tracked under its own 
key;</li>
+     *     <li>both entries carry the same timeout value, confirming that 
independently progressed
+     *         retry contexts reach identical timeouts when starting from the 
same initial value
+     *         and advancing the same number of steps.</li>
+     * </ul>
+     *
+     * <p>Finally, asserts that both retry context entries are cleaned up 
after successful commit.
+     */
+    @Test
+    public void testRetryDurableFinishForDifferentZones() {
+        IgniteImpl node = anyNode();
+
+        String zone1Sql = "create zone test_zone_1 (partitions 1, replicas " + 
REPLICAS
+                + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']";
+        String table1Sql = "create table test_table_1 (key bigint primary key, 
val varchar(20)) zone TEST_ZONE_1";
+
+        sql(zone1Sql);
+        sql(table1Sql);
+
+        String zone2Sql = "create zone test_zone_2 (partitions 1, replicas " + 
REPLICAS
+                + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']";
+        String table2Sql = "create table test_table_2 (key bigint primary key, 
val varchar(20)) zone TEST_ZONE_2";
+
+        sql(zone2Sql);
+        sql(table2Sql);
+
+        Map<String, AtomicInteger> failedFinishAttempts = new 
ConcurrentHashMap<>();
+        Map<String, TimeoutState> timeoutSamples = new ConcurrentHashMap<>();
+
+        List<KeyBasedRetryContext> retryContexts = new ArrayList<>();
+
+        for (IgniteImpl n : runningNodesIter()) {
+            TxManagerImpl txManager = (TxManagerImpl) n.txManager();
+            retryContexts.add((KeyBasedRetryContext) txManager.retryContext());
+
+            n.dropMessages((dest, msg) -> {
+                if (msg instanceof TxFinishReplicaRequest) {
+                    String txId = ((TxFinishReplicaRequest) 
msg).txId().toString();
+
+                    if (failedFinishAttempts.computeIfAbsent(txId, key -> new 
AtomicInteger(0)).getAndIncrement() < 2) {
+                        return true;
+                    }
+
+                    timeoutSamples.putAll(
+                            retryContexts.stream()
+                                    .map(KeyBasedRetryContext::snapshot)
+                                    .filter(snapshot -> !snapshot.isEmpty())
+                                    .flatMap(snapshot -> 
snapshot.entrySet().stream())
+                                    .collect(toMap(Entry::getKey, 
Entry::getValue, (existing, replacement) -> replacement))
+                    );
+                }
+
+                return false;
+            });
+        }
+
+        Transaction tx1 = node.transactions().begin();
+        node.sql().execute(tx1, "insert into test_table_1 (key, val) values 
(1, 'val-1')");
+
+        tx1.commitAsync();
+
+        Transaction tx2 = node.transactions().begin();
+        node.sql().execute(tx2, "insert into TEST_TABLE_2 (key, val) values 
(1, 'val-1')");
+
+        tx2.commitAsync();
+
+        await().timeout(5, SECONDS).until(() -> commitedTransactions(node) == 
2);
+
+        assertEquals(2, timeoutSamples.size(),
+                "Expected timeout state for both transactions, but got: " + 
timeoutSamples.keySet());
+
+        List<Integer> collectedTimeouts = timeoutSamples.values().stream()
+                .map(TimeoutState::getTimeout)
+                .distinct()
+                .collect(toList());
+
+        assertEquals(1, collectedTimeouts.size(),
+                "Expected both transactions to have the same timeout value, 
but got: " + collectedTimeouts);
+
+        await().timeout(5, SECONDS).until(() -> retryContexts.stream()
+                .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty));
+    }
+
+    /**
+     * Returns the total number of committed transactions on the given node by 
reading
+     * the {@value #TOTAL_COMMITED_TRANSACTIONS_METRIC_NAME} metric from
+     * {@link TransactionMetricsSource}.
+     *
+     * <p>Fails the test immediately if the metric is not found, as this 
indicates
+     * a misconfiguration or an unexpected change in metric naming.
+     *
+     * @param node the node to read the metric from.
+     * @return total number of committed transactions.
+     */
+    private static long commitedTransactions(IgniteImpl node) {
+        Iterable<Metric> metrics = node.metricManager()
+                .metricSnapshot()
+                .metrics()
+                .get(TransactionMetricsSource.SOURCE_NAME);
+
+        for (Metric m : metrics) {
+            if (TOTAL_COMMITED_TRANSACTIONS_METRIC_NAME.equals(m.name())) {
+                return ((LongMetric) m).value();
+            }
+        }
+
+        fail();
+
+        return -1;
+    }
+
+    /**
+     * Waits for the given {@link Future} to complete and returns its result.
+     *
+     * <p>Wraps checked exceptions as {@link AssertionError} so they propagate 
cleanly
+     * through {@link java.util.function.Consumer} lambdas in test code 
without requiring
+     * explicit try-catch blocks.
+     *
+     * <ul>
+     *     <li>{@link ExecutionException} — wraps the cause as an {@link 
AssertionError},
+     *         preserving the original exception for diagnosis.</li>
+     *     <li>{@link InterruptedException} — restores the interrupt flag and 
wraps
+     *         as an {@link AssertionError}.</li>
+     * </ul>
+     *
+     * @param <T>    the future's result type.
+     * @param future the future to wait for.
+     * @return the future's result.
+     * @throws AssertionError if the future completed exceptionally or the 
thread was interrupted.
+     */
+    private static <T> T getQuietly(Future<T> future) {
+        try {
+            return future.get();
+        } catch (ExecutionException e) {
+            throw new AssertionError("Future completed exceptionally", 
e.getCause());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new AssertionError("Interrupted while waiting for future", 
e);
+        }
+    }
+}
diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxCleanupFailureTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxCleanupFailureTest.java
index 723e57df6d5..a153c05d332 100644
--- a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxCleanupFailureTest.java
+++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxCleanupFailureTest.java
@@ -17,31 +17,76 @@
 
 package org.apache.ignite.tx.distributed;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static 
org.apache.ignite.internal.tx.metrics.TransactionMetricsSource.METRIC_PENDING_WRITE_INTENTS;
 import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
-import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.metrics.LongMetric;
 import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
 import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
+import org.apache.ignite.internal.util.retry.KeyBasedRetryContext;
+import org.apache.ignite.internal.util.retry.TimeoutState;
 import org.apache.ignite.tx.Transaction;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 /**
- * Tests for transaction cleanup failure.
+ * Integration tests verifying the retry behavior of transaction write intent 
cleanup
+ * ({@link WriteIntentSwitchReplicaRequest}) in a 3-node Apache Ignite cluster.
+ *
+ * <p>Each test drops or intercepts {@link WriteIntentSwitchReplicaRequest} 
messages on all nodes
+ * to simulate transient network failures during the cleanup phase that 
follows a committed
+ * transaction. Tests verify that:
+ * <ul>
+ *     <li>cleanup is retried after a transient failure and eventually 
succeeds;</li>
+ *     <li>retry timeouts grow monotonically between consecutive attempts;</li>
+ *     <li>the retry context is cleaned up after successful cleanup;</li>
+ *     <li>no unnecessary retries occur when the first cleanup attempt 
succeeds.</li>
+ * </ul>
+ *
+ * <p>Each test creates a single-partition, 3-replica zone and table in {@link 
#setup()}.
+ * Tests that require additional zones or tables create them locally.
  */
 public class ItTxCleanupFailureTest extends ClusterPerTestIntegrationTest {
-    /** Table name. */
+    /**
+     * Thread name fragment identifying the thread that sends retry attempts.
+     * Used to distinguish retried messages from the original cleanup attempt 
in message interceptors.
+     */
+    private static final String CLEANUP_THREAD_NAME = "tx-async-write-intent";
+
+    /** Name of the default test table created in {@link #setup()}. */
     private static final String TABLE_NAME = "test_table";
+
+    /** Number of replicas for all test zones. */
     private static final int REPLICAS = 3;
 
+    /**
+     * Creates a single-partition, 3-replica zone and a test table before each 
test.
+     * Tests that require additional tables or zones create them locally.
+     */
     @BeforeEach
     public void setup() {
         String zoneSql = "create zone test_zone (partitions 1, replicas " + 
REPLICAS
@@ -52,6 +97,50 @@ public class ItTxCleanupFailureTest extends ClusterPerTestIntegrationTest {
         sql(tableSql);
     }
 
+    /**
+     * Verifies that no retry occurs when the write intent cleanup succeeds on 
the first attempt.
+     *
+     * <p>Installs a message interceptor that counts {@link 
WriteIntentSwitchReplicaRequest}
+     * messages without dropping any of them. After all write intents are 
resolved, asserts
+     * that exactly one cleanup message was sent across all nodes.
+     */
+    @Test
+    public void testNoRetryOnSuccessfulCleanup() {
+        IgniteImpl node = anyNode();
+        Transaction tx = node.transactions().begin();
+        node.sql().execute(tx, "insert into " + TABLE_NAME + " (key, val) 
values (1, 'val-1')");
+
+        AtomicInteger cleanupAttempts = new AtomicInteger();
+
+        for (IgniteImpl n : runningNodesIter()) {
+            n.dropMessages((dest, msg) -> {
+                if (msg instanceof WriteIntentSwitchReplicaRequest) {
+                    cleanupAttempts.incrementAndGet();
+                }
+                return false;
+            });
+        }
+
+        tx.commitAsync();
+
+        await().timeout(5, SECONDS)
+                .until(() -> pendingWriteIntents(node) == 0);
+
+        await().timeout(1, SECONDS).until(() -> cleanupAttempts.get() >= 1);
+
+        assertEquals(1, cleanupAttempts.get());
+    }
+
+    /**
+     * Verifies that the write intent cleanup is retried after a single 
failure.
+     *
+     * <p>Drops the first {@link WriteIntentSwitchReplicaRequest} on all nodes 
to simulate
+     * a transient failure. On the subsequent retry — arriving on the write 
intent switch
+     * executor thread — captures a snapshot of the retry context across all 
nodes and records
+     * how many nodes have an active entry for this transaction. After cleanup 
completes,
+     * asserts that exactly one node had an active retry entry during the 
retry, and that
+     * the retry context is fully cleaned up.
+     */
     @Test
     public void testRetry() {
         IgniteImpl node = anyNode();
@@ -60,11 +149,26 @@ public class ItTxCleanupFailureTest extends ClusterPerTestIntegrationTest {
 
         AtomicInteger failedCleanupAttempts = new AtomicInteger();
 
+        List<KeyBasedRetryContext> retryContexts = new ArrayList<>();
+
+        AtomicLong expectedSizeOfRetryContext = new AtomicLong(0);
+
         for (IgniteImpl n : runningNodesIter()) {
+            retryContexts.add((KeyBasedRetryContext) ((TxManagerImpl) 
n.txManager()).retryContext());
+
             n.dropMessages((dest, msg) -> {
-                if (msg instanceof WriteIntentSwitchReplicaRequest && 
failedCleanupAttempts.get() == 0) {
-                    // Makes cleanup fail on write intent switch attempt with 
replication timeout, on the first attempt.
-                    return failedCleanupAttempts.incrementAndGet() == 1;
+                if (msg instanceof WriteIntentSwitchReplicaRequest) {
+                    if (failedCleanupAttempts.get() == 0) {
+                        // Makes cleanup fail on write intent switch attempt 
with replication timeout, on the first attempt.
+                        return failedCleanupAttempts.incrementAndGet() == 1;
+                    }
+
+                    if 
(Thread.currentThread().getName().contains(CLEANUP_THREAD_NAME)) {
+                        expectedSizeOfRetryContext.compareAndSet(0, 
retryContexts.stream()
+                                .map(KeyBasedRetryContext::snapshot)
+                                .filter(retryContextSnapshot -> 
retryContextSnapshot.size() == 1)
+                                .count());
+                    }
                 }
 
                 return false;
@@ -73,12 +177,266 @@ public class ItTxCleanupFailureTest extends ClusterPerTestIntegrationTest {
 
         tx.commitAsync();
 
-        await().timeout(5, TimeUnit.SECONDS).until(() -> 
failedCleanupAttempts.get() == 1);
+        await().timeout(5, SECONDS).until(() -> failedCleanupAttempts.get() == 
1);
+
+        await().timeout(5, SECONDS).until(() -> pendingWriteIntents(node) == 
0);
 
-        // Checks that cleanup finally succeeded.
-        await().timeout(5, TimeUnit.SECONDS).until(() -> 
pendingWriteIntents(node) == 0);
+        assertEquals(1, expectedSizeOfRetryContext.get());
+
+        await().timeout(5, SECONDS).until(() -> retryContexts.stream()
+                .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty));
     }
 
+    /**
+     * Verifies that retry timeouts grow monotonically between consecutive 
retry attempts.
+     *
+     * <p>Drops the first 3 {@link WriteIntentSwitchReplicaRequest} messages 
arriving on the
+     * write intent switch executor thread (identified by the executor's 
thread name prefix).
+     * For each dropped attempt, captures the current timeout from the retry 
context snapshot.
+     * After all drops and the final successful cleanup, asserts that the 
captured timeout
+     * sequence is strictly increasing, as expected under exponential backoff.
+     *
+     * <p>Sampling is restricted to the write intent switch executor thread to 
ensure timeouts
+     * are captured after the retry context has been advanced for the current 
attempt, rather
+     * than observing a stale pre-drop state.
+     */
+    @Test
+    public void testRetryWithGrowingTimeout() {
+        IgniteImpl node = anyNode();
+        Transaction tx = node.transactions().begin();
+        node.sql().execute(tx, "insert into " + TABLE_NAME + " (key, val) 
values (1, 'val-1')");
+
+        AtomicInteger failedCleanupAttempts = new AtomicInteger();
+
+        List<KeyBasedRetryContext> retryContexts = new ArrayList<>();
+
+        List<Integer> timeoutSamples = new CopyOnWriteArrayList<>();
+
+        for (IgniteImpl n : runningNodesIter()) {
+            TxManagerImpl txManager = (TxManagerImpl) n.txManager();
+            retryContexts.add((KeyBasedRetryContext) txManager.retryContext());
+
+            n.dropMessages((dest, msg) -> {
+                if (msg instanceof WriteIntentSwitchReplicaRequest) {
+                    if 
(Thread.currentThread().getName().contains(CLEANUP_THREAD_NAME)) {
+                        if (failedCleanupAttempts.getAndIncrement() < 3) {
+                            retryContexts.stream()
+                                    .map(KeyBasedRetryContext::snapshot)
+                                    .filter(retryContextSnapshot -> 
retryContextSnapshot.size() == 1)
+                                    .flatMap(retryContextSnapshot -> 
retryContextSnapshot.values().stream())
+                                    .forEach(timeoutState -> 
timeoutSamples.add(timeoutState.getTimeout()));
+
+                            return true;
+                        }
+                    }
+
+                    return false;
+                }
+
+                return false;
+            });
+        }
+
+        tx.commitAsync();
+
+        await().timeout(5, SECONDS).until(() -> failedCleanupAttempts.get() == 
3);
+
+        await().timeout(5, SECONDS).until(() -> pendingWriteIntents(node) == 
0);
+
+        assertTrue(timeoutSamples.size() > 1, "Expected at least 2 timeout 
samples, got: " + timeoutSamples.size());
+
+        for (int i = 1; i < timeoutSamples.size(); i++) {
+            assertTrue(timeoutSamples.get(i - 1) < timeoutSamples.get(i), 
"timeout increasing is expected!");
+        }
+
+        await().timeout(5, SECONDS).until(() -> retryContexts.stream()
+                .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty));
+    }
+
+    /**
+     * Verifies that 100 concurrent transactions all have their write intents 
cleaned up
+     * after transient failures.
+     *
+     * <p>Creates a zone with 25 partitions to distribute transaction load. 
For each
+     * transaction, drops the first {@link WriteIntentSwitchReplicaRequest} on 
the write
+     * intent switch executor thread (keyed by transaction ID) to force 
exactly one retry
+     * per transaction. After all 100 tasks are submitted and the thread pool 
shuts down,
+     * waits for all pending write intents to reach zero and for all per-node 
retry context
+     * entries to be cleaned up.
+     *
+     * @throws Exception if the thread pool is interrupted during shutdown.
+     */
+    @Test
+    public void testRetryManyConcurrentCleanUps() throws Exception {
+        IgniteImpl node = anyNode();
+
+        String zone1Sql = "create zone test_zone_1 (partitions 25, replicas " 
+ REPLICAS
+                + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']";
+        String table1Sql = "create table test_table_1 (key bigint primary key, 
val varchar(20)) zone TEST_ZONE_1";
+
+        sql(zone1Sql);
+        sql(table1Sql);
+
+        Map<String, AtomicInteger> failedCleanupAttempts = new 
ConcurrentHashMap<>();
+
+        List<KeyBasedRetryContext> retryContexts = new ArrayList<>();
+
+        for (IgniteImpl n : runningNodesIter()) {
+            TxManagerImpl txManager = (TxManagerImpl) n.txManager();
+            retryContexts.add((KeyBasedRetryContext) txManager.retryContext());
+
+            n.dropMessages((dest, msg) -> {
+                if (msg instanceof WriteIntentSwitchReplicaRequest) {
+                    if 
(Thread.currentThread().getName().contains(CLEANUP_THREAD_NAME)) {
+                        String txId = ((WriteIntentSwitchReplicaRequest) 
msg).txId().toString();
+
+                        return failedCleanupAttempts.computeIfAbsent(txId, key 
-> new AtomicInteger(0)).getAndIncrement() == 0;
+                    }
+
+                    return false;
+                }
+
+                return false;
+            });
+        }
+
+        ExecutorService threadPool = Executors.newFixedThreadPool(10);
+
+        List<? extends Future<?>> futures = IntStream.range(0, 100).mapToObj(i 
-> threadPool.submit(() -> {
+            Transaction tx = node.transactions().begin();
+            node.sql().execute(tx, "insert into test_table_1 (key, val) values 
(?, ?)", i, "val-" + i);
+
+            tx.commitAsync();
+        })).collect(toList());
+
+        try {
+            futures.forEach(ItTxCleanupFailureTest::getQuietly);
+        } finally {
+            threadPool.shutdown();
+            threadPool.awaitTermination(5, SECONDS);
+        }
+
+        await().timeout(5, SECONDS).until(() -> pendingWriteIntents(node) == 
0);
+
+        await().timeout(5, SECONDS).until(() -> retryContexts.stream()
+                .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty));
+    }
+
+    /**
+     * Verifies that cleanup retries for transactions targeting different 
replication groups
+     * are tracked independently in each node's retry context, and that 
independently
+     * progressed contexts converge to the same timeout value.
+     *
+     * <p>Creates two separate single-partition zones with dedicated tables. 
For each
+     * transaction, drops the first two {@link 
WriteIntentSwitchReplicaRequest} messages
+     * arriving on the cleanup thread (keyed by transaction ID), forcing both 
retry contexts
+     * to advance by the same number of steps. On the third attempt for each 
transaction,
+     * captures a snapshot of all retry context entries across all nodes, 
merging per-node
+     * entries by transaction ID — values are identical across nodes for the 
same transaction
+     * since each node applies the same backoff logic independently.
+     *
+     * <p>After both transactions' write intents are resolved, asserts that:
+     * <ul>
+     *     <li>exactly two distinct transaction IDs are present in the merged 
snapshot —
+     *         one per transaction, confirming each transaction is tracked 
under its own key
+     *         in every node's retry context;</li>
+     *     <li>both entries carry the same timeout value, confirming that 
independently
+     *         progressed per-node retry contexts reach identical timeouts 
when starting
+     *         from the same initial value and advancing the same number of 
steps.</li>
+     * </ul>
+     *
+     * <p>Finally, asserts that all per-node retry context entries are cleaned 
up after
+     * successful cleanup.
+     */
+    @Test
+    public void testRetryCleanUpsForDifferentZones() {
+        IgniteImpl node = anyNode();
+
+        String zone1Sql = "create zone test_zone_1 (partitions 1, replicas " + 
REPLICAS
+                + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']";
+        String table1Sql = "create table test_table_1 (key bigint primary key, 
val varchar(20)) zone TEST_ZONE_1";
+
+        sql(zone1Sql);
+        sql(table1Sql);
+
+        String zone2Sql = "create zone test_zone_2 (partitions 1, replicas " + 
REPLICAS
+                + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']";
+        String table2Sql = "create table test_table_2 (key bigint primary key, 
val varchar(20)) zone TEST_ZONE_2";
+
+        sql(zone2Sql);
+        sql(table2Sql);
+
+        Map<String, AtomicInteger> failedCleanupAttempts = new 
ConcurrentHashMap<>();
+        Map<String, TimeoutState> timeoutSamples = new ConcurrentHashMap<>();
+
+        List<KeyBasedRetryContext> retryContexts = new ArrayList<>();
+
+        for (IgniteImpl n : runningNodesIter()) {
+            TxManagerImpl txManager = (TxManagerImpl) n.txManager();
+            retryContexts.add((KeyBasedRetryContext) txManager.retryContext());
+
+            n.dropMessages((dest, msg) -> {
+                if (msg instanceof WriteIntentSwitchReplicaRequest) {
+                    if 
(Thread.currentThread().getName().contains(CLEANUP_THREAD_NAME)) {
+                        String txId = ((WriteIntentSwitchReplicaRequest) 
msg).txId().toString();
+
+                        if (failedCleanupAttempts.computeIfAbsent(txId, key -> 
new AtomicInteger(0)).getAndIncrement() < 2) {
+                            return true;
+                        }
+
+                        timeoutSamples.putAll(
+                                retryContexts.stream()
+                                        .map(KeyBasedRetryContext::snapshot)
+                                        .filter(snapshot -> 
!snapshot.isEmpty())
+                                        .flatMap(snapshot -> 
snapshot.entrySet().stream())
+                                        .collect(toMap(Entry::getKey, 
Entry::getValue, (existing, replacement) -> replacement))
+                        );
+                    }
+
+                    return false;
+                }
+
+                return false;
+            });
+        }
+
+        Transaction tx1 = node.transactions().begin();
+        node.sql().execute(tx1, "insert into test_table_1 (key, val) values 
(1, 'val-1')");
+
+        tx1.commitAsync();
+
+        Transaction tx2 = node.transactions().begin();
+        node.sql().execute(tx2, "insert into TEST_TABLE_2 (key, val) values 
(1, 'val-1')");
+
+        tx2.commitAsync();
+
+        await().timeout(5, SECONDS).until(() -> pendingWriteIntents(node) == 
0);
+
+        assertEquals(2, timeoutSamples.size(),
+                "Expected timeout state for both transactions, but got: " + 
timeoutSamples.keySet());
+
+        List<Integer> collectedTimeouts = timeoutSamples.values().stream()
+                .map(TimeoutState::getTimeout)
+                .distinct()
+                .collect(toList());
+
+        assertEquals(1, collectedTimeouts.size(),
+                "Expected both transactions to have the same timeout value, 
but got: " + collectedTimeouts);
+
+        await().timeout(5, SECONDS).until(() -> retryContexts.stream()
+                .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty));
+    }
+
+    /**
+     * Returns the number of pending write intents on the given node by 
reading the
+     * {@link TransactionMetricsSource#METRIC_PENDING_WRITE_INTENTS} metric.
+     *
+     * <p>Fails the test immediately if the metric is not found, as this 
indicates
+     * a misconfiguration or an unexpected change in metric naming.
+     *
+     * @param node the node to read the metric from.
+     * @return number of pending write intents.
+     */
     private static long pendingWriteIntents(IgniteImpl node) {
         Iterable<Metric> metrics = node.metricManager()
                 .metricSnapshot()
@@ -95,4 +453,34 @@ public class ItTxCleanupFailureTest extends ClusterPerTestIntegrationTest {
 
         return -1;
     }
+
+    /**
+     * Waits for the given {@link Future} to complete and returns its result.
+     *
+     * <p>Wraps checked exceptions as {@link AssertionError} so they propagate 
cleanly
+     * through {@link java.util.function.Consumer} lambdas in test code 
without requiring
+     * explicit try-catch blocks.
+     *
+     * <ul>
+     *     <li>{@link ExecutionException} — wraps the cause as an {@link 
AssertionError},
+     *         preserving the original exception for diagnosis.</li>
+     *     <li>{@link InterruptedException} — restores the interrupt flag and 
wraps
+     *         as an {@link AssertionError}.</li>
+     * </ul>
+     *
+     * @param <T>    the future's result type.
+     * @param future the future to wait for.
+     * @return the future's result.
+     * @throws AssertionError if the future completed exceptionally or the 
thread was interrupted.
+     */
+    private static <T> T getQuietly(Future<T> future) {
+        try {
+            return future.get();
+        } catch (ExecutionException e) {
+            throw new AssertionError("Future completed exceptionally", 
e.getCause());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new AssertionError("Interrupted while waiting for future", 
e);
+        }
+    }
 }
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index 953d0baf249..1c46039a245 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.tx.impl;
 
-import static java.lang.Math.min;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
@@ -25,7 +24,7 @@ import static java.util.stream.Collectors.toMap;
 import static org.apache.ignite.internal.logger.Loggers.toThrottledLogger;
 import static org.apache.ignite.internal.tx.TxStateMeta.builder;
 import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
-import static org.apache.ignite.internal.util.IgniteUtils.scheduleRetry;
+import static org.apache.ignite.internal.util.retry.RetryUtil.scheduleRetry;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -57,6 +56,7 @@ import org.apache.ignite.internal.tx.message.TxCleanupMessageErrorResponse;
 import org.apache.ignite.internal.tx.message.TxCleanupMessageResponse;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.util.CompletableFutures;
+import org.apache.ignite.internal.util.retry.RetryContext;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -65,9 +65,6 @@ import org.jetbrains.annotations.Nullable;
 public class TxCleanupRequestSender {
     private static final int ATTEMPTS_LOG_THRESHOLD = 100;
 
-    private static final int RETRY_INITIAL_TIMEOUT_MS = 20;
-    private static final int RETRY_MAX_TIMEOUT_MS = 30_000;
-
     private static final String UNSUCCESSFUL_TXN_CLEANUP_LOG_KEY = 
"Unsuccessful transaction cleanup after N attempts";
 
     private final IgniteThrottledLogger throttledLog;
@@ -91,6 +88,9 @@ public class TxCleanupRequestSender {
     /** Executor that is used to schedule retries of cleanup messages in case 
of retryable errors. */
     private final ScheduledExecutorService retryExecutor;
 
+    /** The retry context for handling retries of cleanup messages. */
+    private final RetryContext retryContext;
+
     /**
      * The constructor.
      *
@@ -100,6 +100,7 @@ public class TxCleanupRequestSender {
      * @param cleanupExecutor Cleanup executor.
      * @param commonScheduler Common scheduler.
      * @param clusterNodeResolver Cluster node resolver.
+     * @param retryContext Retry context.
      */
     public TxCleanupRequestSender(
             TxMessageSender txMessageSender,
@@ -107,7 +108,8 @@ public class TxCleanupRequestSender {
             VolatileTxStateMetaStorage txStateVolatileStorage,
             ExecutorService cleanupExecutor,
             ScheduledExecutorService commonScheduler,
-            ClusterNodeResolver clusterNodeResolver
+            ClusterNodeResolver clusterNodeResolver,
+            RetryContext retryContext
     ) {
         this.txMessageSender = txMessageSender;
         this.placementDriverHelper = placementDriverHelper;
@@ -116,6 +118,7 @@ public class TxCleanupRequestSender {
         this.retryExecutor = commonScheduler;
         this.clusterNodeResolver = clusterNodeResolver;
         this.throttledLog = 
toThrottledLogger(Loggers.forClass(TxCleanupRequestSender.class), 
commonScheduler);
+        this.retryContext = retryContext;
     }
 
     /**
@@ -185,7 +188,7 @@ public class TxCleanupRequestSender {
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp
     ) {
-        return sendCleanupMessageWithRetries(commitPartitionId, commit, 
commitTimestamp, txId, node, null, RETRY_INITIAL_TIMEOUT_MS, 0);
+        return sendCleanupMessageWithRetries(commitPartitionId, commit, 
commitTimestamp, txId, node, null);
     }
 
     /**
@@ -227,7 +230,7 @@ public class TxCleanupRequestSender {
             enlistedPartitionGroups.add(new 
EnlistedPartitionGroup(partitionId, partition.tableIds()));
         });
 
-        return cleanupPartitions(commitPartitionId, partitionsByPrimaryName, 
commit, commitTimestamp, txId, RETRY_INITIAL_TIMEOUT_MS, 0);
+        return cleanupPartitions(commitPartitionId, partitionsByPrimaryName, 
commit, commitTimestamp, txId);
     }
 
     /**
@@ -246,18 +249,6 @@ public class TxCleanupRequestSender {
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
-    ) {
-        return cleanup(commitPartitionId, partitions, commit, commitTimestamp, 
txId, RETRY_INITIAL_TIMEOUT_MS, 0);
-    }
-
-    private CompletableFuture<Void> cleanup(
-            @Nullable ZonePartitionId commitPartitionId,
-            Collection<EnlistedPartitionGroup> partitions,
-            boolean commit,
-            @Nullable HybridTimestamp commitTimestamp,
-            UUID txId,
-            long timeout,
-            int attemptsMade
     ) {
         Map<ZonePartitionId, EnlistedPartitionGroup> partitionIds = 
partitions.stream()
                 .collect(toMap(EnlistedPartitionGroup::groupId, identity()));
@@ -282,9 +273,7 @@ public class TxCleanupRequestSender {
                             commit,
                             commitTimestamp,
                             txId,
-                            
toPartitionInfos(partitionData.partitionsWithoutPrimary, partitionIds),
-                            timeout,
-                            attemptsMade
+                            
toPartitionInfos(partitionData.partitionsWithoutPrimary, partitionIds)
                     );
 
                     Map<String, List<EnlistedPartitionGroup>> 
partitionsByPrimaryName = toPartitionInfosByPrimaryName(
@@ -296,9 +285,7 @@ public class TxCleanupRequestSender {
                             partitionsByPrimaryName,
                             commit,
                             commitTimestamp,
-                            txId,
-                            timeout,
-                            attemptsMade
+                            txId
                     );
                 });
     }
@@ -325,9 +312,7 @@ public class TxCleanupRequestSender {
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId,
-            List<EnlistedPartitionGroup> partitionsWithoutPrimary,
-            long timeout,
-            int attemptsMade
+            List<EnlistedPartitionGroup> partitionsWithoutPrimary
     ) {
         Map<ZonePartitionId, EnlistedPartitionGroup> partitionIds = 
partitionsWithoutPrimary.stream()
                 .collect(toMap(EnlistedPartitionGroup::groupId, identity()));
@@ -345,9 +330,7 @@ public class TxCleanupRequestSender {
                             partitionsByPrimaryName,
                             commit,
                             commitTimestamp,
-                            txId,
-                            timeout,
-                            attemptsMade
+                            txId
                     );
                 });
     }
@@ -357,9 +340,7 @@ public class TxCleanupRequestSender {
             Map<String, List<EnlistedPartitionGroup>> partitionsByNode,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
-            UUID txId,
-            long timeout,
-            int attemptsMade
+            UUID txId
     ) {
         List<CompletableFuture<Void>> cleanupFutures = new ArrayList<>();
 
@@ -368,7 +349,7 @@ public class TxCleanupRequestSender {
             List<EnlistedPartitionGroup> nodePartitions = entry.getValue();
 
             
cleanupFutures.add(sendCleanupMessageWithRetries(commitPartitionId, commit, 
commitTimestamp, txId, node,
-                    commitPartitionId == null ? null : nodePartitions, 
timeout, attemptsMade));
+                    commitPartitionId == null ? null : nodePartitions));
         }
 
         return allOf(cleanupFutures.toArray(new CompletableFuture<?>[0]));
@@ -380,9 +361,7 @@ public class TxCleanupRequestSender {
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId,
             String node,
-            @Nullable Collection<EnlistedPartitionGroup> partitions,
-            long timeout,
-            int attemptsMade
+            @Nullable Collection<EnlistedPartitionGroup> partitions
     ) {
         return txMessageSender.cleanup(node, partitions, txId, commit, 
commitTimestamp)
                 .thenApply(response -> {
@@ -396,16 +375,21 @@ public class TxCleanupRequestSender {
                 })
                 .handleAsync((networkMessage, throwable) -> {
                     if (throwable != null) {
+                        String timeoutKey = commitPartitionId == null ? 
txId.toString() : commitPartitionId.toString();
+
                         if 
(ReplicatorRecoverableExceptions.isRecoverable(throwable)) {
-                            if (attemptsMade > ATTEMPTS_LOG_THRESHOLD) {
-                                throttledLog.warn(
-                                        UNSUCCESSFUL_TXN_CLEANUP_LOG_KEY,
-                                        "Unsuccessful transaction cleanup 
after {} attempts, keep retrying [txId={}]",
-                                        throwable,
-                                        ATTEMPTS_LOG_THRESHOLD,
-                                        txId
-                                );
-                            }
+                            
retryContext.getState(timeoutKey).ifPresent(timeoutState -> {
+                                if (timeoutState.getAttempt() > 
ATTEMPTS_LOG_THRESHOLD || timeoutState.getAttempt() < 0) {
+                                    throttledLog.warn(
+                                            UNSUCCESSFUL_TXN_CLEANUP_LOG_KEY,
+                                            "Unsuccessful transaction cleanup 
after {} attempts for key {}, keep retrying [txId={}]",
+                                            throwable,
+                                            ATTEMPTS_LOG_THRESHOLD,
+                                            timeoutKey,
+                                            txId
+                                    );
+                                }
+                            });
 
                             // In the case of a failure we repeat the process, 
but start with finding correct primary replicas
                             // for this subset of partitions. If nothing 
changed in terms of the nodes and primaries
@@ -429,13 +413,12 @@ public class TxCleanupRequestSender {
                                                 commitTimestamp,
                                                 txId,
                                                 node,
-                                                partitions,
-                                                incrementTimeout(timeout),
-                                                attemptsMade + 1
+                                                partitions
                                         ),
-                                        timeout,
+                                        
retryContext.updateAndGetState(timeoutKey).getTimeout(),
                                         TimeUnit.MILLISECONDS,
-                                        retryExecutor
+                                        retryExecutor,
+                                        () -> 
retryContext.resetState(timeoutKey)
                                 );
                             }
 
@@ -447,13 +430,12 @@ public class TxCleanupRequestSender {
                                         partitions,
                                         commit,
                                         commitTimestamp,
-                                        txId,
-                                        incrementTimeout(timeout),
-                                        attemptsMade + 1
+                                        txId
                                     ),
-                                    timeout,
+                                    
retryContext.updateAndGetState(timeoutKey).getTimeout(),
                                     TimeUnit.MILLISECONDS,
-                                    retryExecutor
+                                    retryExecutor,
+                                    () -> retryContext.resetState(timeoutKey)
                             );
                         }
 
@@ -465,10 +447,6 @@ public class TxCleanupRequestSender {
                 .thenCompose(v -> v);
     }
 
-    private static long incrementTimeout(long currentTimeout) {
-        return min(currentTimeout * 2, RETRY_MAX_TIMEOUT_MS);
-    }
-
     private static class CleanupContext {
         private final ZonePartitionId commitPartitionId;
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 2c8cc25e878..22e76838c6a 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -44,6 +44,7 @@ import static 
org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapRootCause;
 import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.apache.ignite.internal.util.retry.RetryUtil.scheduleRetry;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -121,6 +122,9 @@ import 
org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
 import org.apache.ignite.internal.tx.views.LocksViewProvider;
 import org.apache.ignite.internal.tx.views.TransactionsViewProvider;
 import org.apache.ignite.internal.util.CompletableFutures;
+import org.apache.ignite.internal.util.retry.KeyBasedRetryContext;
+import org.apache.ignite.internal.util.retry.RetryContext;
+import org.apache.ignite.internal.util.retry.TimeoutStrategy;
 import org.apache.ignite.lang.ErrorGroups.Common;
 import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
@@ -242,6 +246,8 @@ public class TxManagerImpl implements TxManager, 
SystemViewProvider {
 
     private final RemotelyTriggeredResourceRegistry resourcesRegistry;
 
+    private final RetryContext retryContext;
+
     /**
      * Test-only constructor.
      *
@@ -260,6 +266,7 @@ public class TxManagerImpl implements TxManager, 
SystemViewProvider {
      * @param transactionInflights Transaction inflights.
      * @param lowWatermark Low watermark.
      * @param metricManager Metric manager.
+     * @param timeoutStrategy Timeout strategy.
      */
     @TestOnly
     public TxManagerImpl(
@@ -278,7 +285,8 @@ public class TxManagerImpl implements TxManager, 
SystemViewProvider {
             TransactionInflights transactionInflights,
             LowWatermark lowWatermark,
             ScheduledExecutorService commonScheduler,
-            MetricManager metricManager
+            MetricManager metricManager,
+            TimeoutStrategy timeoutStrategy
     ) {
         this(
                 clusterService.staticLocalNode(),
@@ -300,7 +308,8 @@ public class TxManagerImpl implements TxManager, 
SystemViewProvider {
                 lowWatermark,
                 commonScheduler,
                 new FailureManager(new NoOpFailureHandler()),
-                metricManager
+                metricManager,
+                timeoutStrategy
         );
     }
 
@@ -324,6 +333,7 @@ public class TxManagerImpl implements TxManager, 
SystemViewProvider {
      * @param transactionInflights Transaction inflights.
      * @param lowWatermark Low watermark.
      * @param metricManager Metric manager.
+     * @param timeoutStrategy Timeout strategy.
      */
     public TxManagerImpl(
             InternalClusterNode localNode,
@@ -345,7 +355,8 @@ public class TxManagerImpl implements TxManager, 
SystemViewProvider {
             LowWatermark lowWatermark,
             ScheduledExecutorService commonScheduler,
             FailureProcessor failureProcessor,
-            MetricManager metricManager
+            MetricManager metricManager,
+            TimeoutStrategy timeoutStrategy
     ) {
         this.txConfig = txConfig;
         this.systemCfg = systemCfg;
@@ -404,13 +415,16 @@ public class TxManagerImpl implements TxManager, 
SystemViewProvider {
 
         transactionExpirationRegistry = new 
TransactionExpirationRegistry(txStateVolatileStorage);
 
+        retryContext = new KeyBasedRetryContext(timeoutStrategy);
+
         txCleanupRequestSender = new TxCleanupRequestSender(
                 txMessageSender,
                 placementDriverHelper,
                 txStateVolatileStorage,
                 writeIntentSwitchPool,
                 commonScheduler,
-                topologyService
+                topologyService,
+                retryContext
         );
 
         txMetrics = new TransactionMetricsSource(clockService);
@@ -827,7 +841,7 @@ public class TxManagerImpl implements TxManager, 
SystemViewProvider {
      */
     private CompletableFuture<Void> durableFinish(
             HybridTimestampTracker observableTimestampTracker,
-            ZonePartitionId commitPartition,
+            @Nullable ZonePartitionId commitPartition,
             boolean commit,
             Map<ZonePartitionId, PartitionEnlistment> enlistedPartitions,
             UUID txId,
@@ -871,14 +885,23 @@ public class TxManagerImpl implements TxManager, 
SystemViewProvider {
                         if 
(ReplicatorRecoverableExceptions.isRecoverable(cause)) {
                             LOG.debug("Failed to finish Tx. The operation will 
be retried {}.", ex,
                                     formatTxInfo(txId, 
txStateVolatileStorage));
-                            return supplyAsync(() -> durableFinish(
-                                    observableTimestampTracker,
-                                    commitPartition,
-                                    commit,
-                                    enlistedPartitions,
-                                    txId,
-                                    commitTimestamp,
-                                    txFinishFuture
+
+                            String timeoutKey = commitPartition == null ? 
txId.toString() : commitPartition.toString();
+
+                            return supplyAsync(() -> scheduleRetry(
+                                    () -> durableFinish(
+                                            observableTimestampTracker,
+                                            commitPartition,
+                                            commit,
+                                            enlistedPartitions,
+                                            txId,
+                                            commitTimestamp,
+                                            txFinishFuture
+                                    ),
+                                    
retryContext.updateAndGetState(timeoutKey).getTimeout(),
+                                    MILLISECONDS,
+                                    commonScheduler,
+                                    () -> retryContext.resetState(timeoutKey)
                             ), 
partitionOperationsExecutor).thenCompose(identity());
                         } else {
                             LOG.warn("Failed to finish Tx {}.", ex,
@@ -1386,4 +1409,9 @@ public class TxManagerImpl implements TxManager, 
SystemViewProvider {
     public void clearLocalRwTxCounter() {
         localRwTxCounter.clear();
     }
+
+    @TestOnly
+    public RetryContext retryContext() {
+        return retryContext;
+    }
 }
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
index ba491c9160e..a2a794e0f7a 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
@@ -66,6 +66,8 @@ import 
org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TxCleanupRequestSender;
 import org.apache.ignite.internal.tx.impl.TxMessageSender;
 import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
+import org.apache.ignite.internal.util.retry.KeyBasedRetryContext;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -147,7 +149,8 @@ public class TxCleanupTest extends IgniteAbstractTest {
                 mock(VolatileTxStateMetaStorage.class),
                 testSyncExecutorService(),
                 testSyncScheduledExecutorService(),
-                topologyService
+                topologyService,
+                new KeyBasedRetryContext(new NoopTimeoutStrategy())
         );
     }
 
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 71c8bb701e0..0d44bfb93b9 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -107,6 +107,7 @@ import 
org.apache.ignite.internal.tx.impl.WaitDieDeadlockPreventionPolicy;
 import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
 import org.apache.ignite.internal.tx.test.TestTransactionIds;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
 import org.apache.ignite.lang.ErrorGroups.Transactions;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.tx.MismatchingTransactionOutcomeException;
@@ -203,7 +204,8 @@ public class TxManagerTest extends IgniteAbstractTest {
                 transactionInflights,
                 lowWatermark,
                 commonScheduler,
-                new TestMetricManager()
+                new TestMetricManager(),
+                new NoopTimeoutStrategy()
         );
 
         assertThat(txManager.startAsync(new ComponentContext()), 
willCompleteSuccessfully());


Reply via email to