This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c7908d8f755 IGNITE-27685 Implement exponential backoff for durable finish (#7680)
c7908d8f755 is described below
commit c7908d8f75555a9bbd17f462236f30d028560dce
Author: Konstantin Sirotkin <[email protected]>
AuthorDate: Tue Apr 21 16:15:11 2026 +0300
IGNITE-27685 Implement exponential backoff for durable finish (#7680)
---
.../apache/ignite/internal/util/IgniteUtils.java | 31 --
.../retry/ExponentialBackoffTimeoutStrategy.java | 132 ++++++
.../internal/util/retry/KeyBasedRetryContext.java | 155 +++++++
.../internal/util/retry/NoopTimeoutStrategy.java | 40 ++
.../ignite/internal/util/retry/RetryContext.java | 74 ++++
.../ignite/internal/util/retry/RetryUtil.java | 110 +++++
.../ignite/internal/util/retry/TimeoutState.java | 149 +++++++
.../internal/util/retry/TimeoutStrategy.java | 47 +++
.../ExponentialBackoffTimeoutStrategyTest.java | 129 ++++++
.../util/retry/KeyBasedRetryContextTest.java | 386 +++++++++++++++++
.../internal/util/retry/TimeoutStateTest.java | 132 ++++++
.../rebalance/ItRebalanceDistributedTest.java | 4 +-
.../partition/replicator/fixtures/Node.java | 4 +-
.../runner/app/ItIgniteNodeRestartTest.java | 4 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 4 +-
.../exec/rel/TableScanNodeExecutionTest.java | 4 +-
.../apache/ignite/distributed/ItLockTableTest.java | 4 +-
...xDistributedTestSingleNodeNoCleanupMessage.java | 4 +-
.../ignite/internal/table/ItColocationTest.java | 4 +-
.../apache/ignite/distributed/ItTxTestCluster.java | 7 +-
.../table/impl/DummyInternalTableImpl.java | 4 +-
.../tx/distributed/ItDurableFinishFailureTest.java | 468 +++++++++++++++++++++
.../tx/distributed/ItTxCleanupFailureTest.java | 406 +++++++++++++++++-
.../internal/tx/impl/TxCleanupRequestSender.java | 102 ++---
.../ignite/internal/tx/impl/TxManagerImpl.java | 54 ++-
.../apache/ignite/internal/tx/TxCleanupTest.java | 5 +-
.../apache/ignite/internal/tx/TxManagerTest.java | 4 +-
27 files changed, 2339 insertions(+), 128 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index f67cb78720c..63cf65f443d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -51,7 +51,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -60,7 +59,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
@@ -1341,35 +1339,6 @@ public class IgniteUtils {
return list.toArray();
}
- /**
- * Schedules the provided operation to be retried after the specified
delay.
- *
- * @param operation Operation.
- * @param delay Delay.
- * @param unit Time unit of the delay.
- * @param executor Executor to schedule the retry in.
- * @return Future that is completed when the operation is successful or
failed with an exception.
- */
- public static <T> CompletableFuture<T> scheduleRetry(
- Callable<CompletableFuture<T>> operation,
- long delay,
- TimeUnit unit,
- ScheduledExecutorService executor
- ) {
- CompletableFuture<T> future = new CompletableFuture<>();
-
- executor.schedule(() -> operation.call()
- .whenComplete((res, e) -> {
- if (e == null) {
- future.complete(res);
- } else {
- future.completeExceptionally(e);
- }
- }), delay, unit);
-
- return future;
- }
-
private static CompletableFuture<Void> startAsync(ComponentContext
componentContext, Stream<? extends IgniteComponent> components) {
return allOf(components
.filter(Objects::nonNull)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategy.java b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategy.java
new file mode 100644
index 00000000000..58c38cc95b3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategy.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * A {@link TimeoutStrategy} that increases retry timeouts exponentially on
each attempt.
+ *
+ * <p>Each call to {@link #next(int)} multiplies the current timeout by {@code
backoffCoefficient},
+ * capping the result at {@link #maxTimeout}. Optionally, random jitter can be
applied to spread
+ * retry attempts across time and avoid thundering herd problems under high
concurrency.
+ *
+ * <p>When jitter is enabled, the returned timeout is randomized within the
range
+ * {@code [raw / 2, raw * 1.5]}, then capped at {@link #maxTimeout}.
+ *
+ * <p>This class is stateless and thread-safe. A single instance can be shared
across
+ * multiple retry contexts.
+ */
+// TODO: https://issues.apache.org/jira/browse/IGNITE-28481
+public class ExponentialBackoffTimeoutStrategy implements TimeoutStrategy {
+ /**
+ * Default backoff coefficient applied on each retry step. Doubles the
timeout per attempt.
+ */
+ private static final double DEFAULT_BACKOFF_COEFFICIENT = 2.0;
+
+ /**
+ * Multiplier applied to the current timeout on each call to {@link
#next(int)}.
+ * Must be greater than {@code 1.0} to produce a growing sequence.
+ */
+ private final double backoffCoefficient;
+
+ /**
+ * Whether to apply random jitter to the computed timeout.
+ * When {@code true}, the result is randomized within {@code [raw / 2, raw
* 1.5]}.
+ */
+ private final boolean jitter;
+
+ /**
+ * Upper bound for the timeout produced by this strategy, in milliseconds.
+ * The result of {@link #next(int)} is always capped at this value.
+ */
+ private final int maxTimeout;
+
+ /**
+ * Creates a strategy with default max timeout and backoff coefficient,
and no jitter.
+ *
+ * @see TimeoutStrategy#DEFAULT_RETRY_TIMEOUT_MS_MAX
+ * @see #DEFAULT_BACKOFF_COEFFICIENT
+ */
+ public ExponentialBackoffTimeoutStrategy() {
+ this(DEFAULT_RETRY_TIMEOUT_MS_MAX, DEFAULT_BACKOFF_COEFFICIENT);
+ }
+
+ /**
+ * Creates a strategy with the given max timeout and backoff coefficient,
and no jitter.
+ *
+ * @param maxTimeout maximum timeout this strategy may produce, in
milliseconds.
+ * @param backoffCoefficient multiplier applied to the current timeout on
each step.
+ * Must be greater than {@code 1.0}.
+ */
+ public ExponentialBackoffTimeoutStrategy(
+ int maxTimeout,
+ double backoffCoefficient
+ ) {
+ this(maxTimeout, backoffCoefficient, false);
+ }
+
+ /**
+ * Creates a strategy with the given max timeout, backoff coefficient, and
jitter setting.
+ *
+ * @param maxTimeout maximum timeout this strategy may produce, in
milliseconds.
+ * @param backoffCoefficient multiplier applied to the current timeout on
each step.
+ * Must be greater than {@code 1.0}.
+ * @param jitter if {@code true}, random jitter is applied to each
computed timeout.
+ */
+ public ExponentialBackoffTimeoutStrategy(
+ int maxTimeout,
+ double backoffCoefficient,
+ boolean jitter
+ ) {
+ this.maxTimeout = maxTimeout;
+ this.backoffCoefficient = backoffCoefficient;
+ this.jitter = jitter;
+ }
+
+ /**
+ * Computes the next retry timeout by multiplying {@code currentTimeout} by
+ * {@link #backoffCoefficient}, then capping at {@link #maxTimeout}.
+ * If jitter is enabled, the result is further randomized.
+ *
+ * @param currentTimeout current retry timeout in milliseconds.
+ * @return next retry timeout in milliseconds, capped at {@link
#maxTimeout}.
+ */
+ @Override
+ public int next(int currentTimeout) {
+ int jitteredTimeout = jitter ? applyJitter(currentTimeout) :
currentTimeout;
+
+ return (int) Math.min((jitteredTimeout * backoffCoefficient),
maxTimeout);
+ }
+
+ /**
+ * Applies random jitter to the given timeout value.
+ *
+ * <p>The result is uniformly distributed within {@code [raw / 2, raw *
1.5]},
+ * then capped at {@link #maxTimeout} to ensure the strategy ceiling is
never exceeded.
+ *
+ * @param raw computed timeout before jitter, in milliseconds.
+ * @return jittered timeout in milliseconds, capped at {@link #maxTimeout}.
+ */
+ private int applyJitter(int raw) {
+ int lo = raw / 2;
+ int hi = raw + lo;
+
+ return lo + ThreadLocalRandom.current().nextInt(hi - lo + 1);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContext.java
new file mode 100644
index 00000000000..525523d03c6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContext.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Optional.of;
+import static java.util.Optional.ofNullable;
+import static
org.apache.ignite.internal.util.retry.TimeoutStrategy.DEFAULT_RETRY_TIMEOUT_MS_MAX;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * A retry context that tracks timeout state independently per key.
+ *
+ * <p>Each key maps to its own {@link TimeoutState}, allowing separate backoff
progression
+ * for different retry targets — for example, different replication group IDs
or transaction IDs.
+ * State updates are performed atomically per key using {@link
ConcurrentHashMap#compute}.
+ *
+ * <p>To prevent unbounded memory growth, the registry is capped at {@link
#REGISTRY_SIZE_LIMIT}
+ * entries. Once the limit is reached, untracked keys receive a fixed {@link
#fallbackTimeoutState}
+ * that always returns {@link TimeoutStrategy#DEFAULT_RETRY_TIMEOUT_MS_MAX}.
The limit is a soft cap and may be
+ * slightly exceeded under concurrent insertions.
+ *
+ * <p>This class is thread-safe.
+ */
+public class KeyBasedRetryContext implements RetryContext {
+ /**
+ * Maximum number of keys tracked in {@link #registry}.
+ * Once the limit is reached, untracked keys receive a fixed {@link
#fallbackTimeoutState}.
+ * Can be slightly exceeded under concurrent insertions.
+ */
+ private static final int REGISTRY_SIZE_LIMIT = 1_000;
+
+ /** Strategy used to compute the next timeout from the current one on each
advancement. */
+ private final TimeoutStrategy timeoutStrategy;
+
+ /**
+ * Sentinel state returned for keys that cannot be tracked because the
registry is full.
+ * Initialized with {@link TimeoutStrategy#DEFAULT_RETRY_TIMEOUT_MS_MAX}
and attempt {@code -1}
+ * to distinguish it from legitimately tracked states.
+ */
+ private final TimeoutState fallbackTimeoutState;
+
+ /** Per-key timeout state registry. Keys are typically transaction IDs or
replication group IDs. */
+ private final ConcurrentHashMap<String, TimeoutState> registry = new
ConcurrentHashMap<>();
+
+ /**
+ * Creates a new context with the given initial timeout and strategy.
+ *
+ * @param timeoutStrategy strategy used to compute subsequent timeout
values.
+ */
+ public KeyBasedRetryContext(TimeoutStrategy timeoutStrategy) {
+ this.timeoutStrategy = timeoutStrategy;
+
+ this.fallbackTimeoutState = new
TimeoutState(DEFAULT_RETRY_TIMEOUT_MS_MAX, -1);
+ }
+
+ /**
+ * Returns the current {@link TimeoutState} for the given key, if tracked.
+ *
+ * <p>Returns an empty {@link Optional} if the key has no recorded state
yet.
+ * If the registry is full and the key is not yet tracked, returns an
{@link Optional}
+ * containing a fallback state initialized to {@link
TimeoutStrategy#DEFAULT_RETRY_TIMEOUT_MS_MAX}.
+ *
+ * <p>This method does not insert the key into the registry.
+ *
+ * @param key the key to look up, typically a transaction ID or
replication group ID.
+ * @return current state for the key, fallback state if registry is full,
or empty if not tracked.
+ */
+ @Override
+ public Optional<TimeoutState> getState(String key) {
+ if (!registry.containsKey(key) && registry.size() >=
REGISTRY_SIZE_LIMIT) {
+ return of(fallbackTimeoutState);
+ }
+
+ return ofNullable(registry.get(key));
+ }
+
+ /**
+ * Atomically advances the retry state for the given key and returns the
updated state.
+ *
+ * <p>The update is performed inside {@link ConcurrentHashMap#compute},
which holds
+ * an exclusive per-key lock for the duration of the lambda, ensuring that
+ * {@link TimeoutState#update(TimeoutStrategy)} is never called
concurrently on the same instance.
+ *
+ * <p>When the registry is full, untracked keys receive the maximum
timeout.
+ * This acts as implicit backpressure: if enough keys are actively
retrying to fill
+ * the registry, the system is under a heavy load and new operations
should retry conservatively.
+ *
+ * @param key the key to advance state for, typically a transaction ID or
replication group ID.
+ * @return updated {@link TimeoutState} for the key, or {@link
#fallbackTimeoutState}
+ * if the registry is full.
+ */
+ @Override
+ public TimeoutState updateAndGetState(String key) {
+ if (!registry.containsKey(key) && registry.size() >=
REGISTRY_SIZE_LIMIT) {
+ return fallbackTimeoutState;
+ }
+
+ return registry.compute(key, (k, state) -> {
+ if (state == null) {
+ state = new TimeoutState();
+ }
+
+ state.update(timeoutStrategy);
+
+ return state;
+ });
+ }
+
+ /**
+ * Removes the retry state for the given key, resetting it as if no
retries had occurred.
+ *
+ * @param key the key whose state should be removed.
+ */
+ @Override
+ public void resetState(String key) {
+ registry.remove(key);
+ }
+
+ /**
+ * Returns an unmodifiable snapshot of the current registry contents.
+ *
+ * <p>The snapshot is a point-in-time copy of the registry map. The
returned
+ * {@link TimeoutState} values are live references — their internal state
may
+ * continue to change concurrently after the snapshot is taken.
+ *
+ * <p>This method is intended for testing only and should not be used in
production code.
+ *
+ * @return unmodifiable copy of the current key-to-state mappings.
+ */
+ @TestOnly
+ public Map<String, TimeoutState> snapshot() {
+ return unmodifiableMap(new HashMap<>(registry));
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/retry/NoopTimeoutStrategy.java b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/NoopTimeoutStrategy.java
new file mode 100644
index 00000000000..d77a83d9edc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/NoopTimeoutStrategy.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+/**
+ * A {@link TimeoutStrategy} that returns the current timeout unchanged on
every call.
+ *
+ * <p>Useful when retry backoff is not desired — for example, in tests or when
a flat
+ * retry interval is intentional. The timeout passed to {@link #next(int)} is
returned
+ * as-is, so the retry interval remains constant across all attempts.
+ *
+ * <p>This class is stateless and thread-safe.
+ */
+public class NoopTimeoutStrategy implements TimeoutStrategy {
+ /**
+ * Returns {@code currentTimeout} unchanged.
+ *
+ * @param currentTimeout current retry timeout in milliseconds.
+ * @return the same {@code currentTimeout} value, unmodified.
+ */
+ @Override
+ public int next(int currentTimeout) {
+ return currentTimeout;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/retry/RetryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/RetryContext.java
new file mode 100644
index 00000000000..ce09996fd7d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/RetryContext.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import java.util.Optional;
+
+/**
+ * Manages per-key retry state for tracking timeout progression across retry
attempts.
+ *
+ * <p>Each key (typically a transaction ID or replication group ID) maps to an
independent
+ * {@link TimeoutState} that records the current retry timeout and attempt
count. The context
+ * is responsible for creating, advancing, and removing that state.
+ *
+ * <p>Implementations must be thread-safe.
+ *
+ * @see KeyBasedRetryContext
+ * @see TimeoutState
+ */
+public interface RetryContext {
+
+ /**
+ * Returns the current {@link TimeoutState} for the given key, if one
exists.
+ *
+ * <p>Returns an empty {@link Optional} if the key has not yet been
tracked.
+ * Implementations may also return a fallback state when internal capacity
limits are
+ * reached, in which case the returned state reflects the maximum
permissible timeout.
+ *
+ * <p>This method must not modify the registry — it is a read-only lookup.
+ *
+ * @param key the key to look up, typically a transaction ID or
replication group ID.
+ * @return current {@link TimeoutState} for the key, or empty if not yet
tracked.
+ */
+ Optional<TimeoutState> getState(String key);
+
+ /**
+ * Atomically advances the retry state for the given key and returns the
updated state.
+ *
+ * <p>If no state exists for the key yet, a fresh {@link TimeoutState} is
created.
+ * Otherwise, the existing state is advanced using the configured
+ * {@link TimeoutStrategy} and the attempt count is incremented.
+ *
+ * <p>When internal capacity limits prevent tracking new keys, a fallback
state with the
+ * maximum timeout is returned instead.
+ *
+ * @param key the key to advance state for, typically a transaction ID or
replication group ID.
+ * @return updated {@link TimeoutState} for the key, or a fallback state
if capacity is exhausted.
+ */
+ TimeoutState updateAndGetState(String key);
+
+ /**
+ * Removes the retry state for the given key, resetting it as if no
retries had occurred.
+ *
+ * <p>This allows future calls to {@link #updateAndGetState(String)} for
the same key to
+ * start fresh with the initial timeout.
+ *
+ * @param key the key whose state should be removed.
+ */
+ void resetState(String key);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/retry/RetryUtil.java b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/RetryUtil.java
new file mode 100644
index 00000000000..80ce941f172
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/RetryUtil.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Utility class for scheduling asynchronous retry operations.
+ *
+ * <p>Provides overloaded {@code scheduleRetry} methods that schedule an async
operation
+ * to run after a delay, optionally invoking callbacks on success or failure.
The returned
+ * {@link CompletableFuture} is completed when the scheduled operation
completes.
+ *
+ * <p>This class is not intended to manage the retry loop itself — it
schedules a single
+ * delayed execution. The caller is responsible for chaining successive calls
to build a
+ * full retry loop, typically driven by a retry context and a timeout strategy.
+ */
+public class RetryUtil {
+ /**
+ * Schedules the provided operation to run once after the specified delay.
+ *
+ * @param <T> result type of the operation.
+ * @param operation the async operation to schedule. Must return a
non-null {@link CompletableFuture}.
+ * @param delay delay before execution.
+ * @param unit time unit of {@code delay}.
+ * @param executor executor used to schedule the operation.
+ * @return a {@link CompletableFuture} completed with the operation's
result on success,
+ * or completed exceptionally if the operation fails.
+ */
+ public static <T> CompletableFuture<T> scheduleRetry(
+ Callable<CompletableFuture<T>> operation,
+ long delay,
+ TimeUnit unit,
+ ScheduledExecutorService executor
+ ) {
+ return scheduleRetry(operation, delay, unit, executor, null);
+ }
+
+ /**
+ * Schedules the provided operation to run once after the specified delay,
+ * invoking separate callbacks on success and failure.
+ *
+ * <p>The provided completion callback is invoked regardless of whether the
+ * operation succeeds or fails.
+ *
+ * @param <T> result type of the operation.
+ * @param operation the async operation to schedule. Must return a
non-null
+ * {@link CompletableFuture}.
+ * @param delay delay before execution.
+ * @param unit time unit of {@code delay}.
+ * @param executor executor used to schedule the operation.
+ * @param onComplete optional callback invoked after the operation
completes, whether
+ * successfully or exceptionally — for example, to reset
a retry context.
+ * @return a {@link CompletableFuture} completed with the operation's
result on success,
+ * or completed exceptionally if the operation fails.
+ */
+ public static <T> CompletableFuture<T> scheduleRetry(
+ Callable<CompletableFuture<T>> operation,
+ long delay,
+ TimeUnit unit,
+ ScheduledExecutorService executor,
+ @Nullable Runnable onComplete
+ ) {
+ CompletableFuture<T> future = new CompletableFuture<>();
+
+ executor.schedule(() -> {
+ try {
+ operation.call()
+ .whenComplete((res, e) -> {
+ if (e == null) {
+ future.complete(res);
+ } else {
+ future.completeExceptionally(e);
+ }
+
+ if (onComplete != null) {
+ onComplete.run();
+ }
+ });
+ } catch (Exception e) {
+ future.completeExceptionally(e);
+
+ if (onComplete != null) {
+ onComplete.run();
+ }
+ }
+ }, delay, unit);
+
+ return future;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/retry/TimeoutState.java b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/TimeoutState.java
new file mode 100644
index 00000000000..519e8a31f96
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/TimeoutState.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import static
org.apache.ignite.internal.util.retry.TimeoutStrategy.DEFAULT_RETRY_INITIAL_TIMEOUT_MS;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Mutable holder for retry timeout and attempt count.
+ *
+ * <p>Both fields are packed into a single {@link AtomicLong} to allow a
consistent
+ * atomic read of the combined state via a single {@code get()}. The high 32
bits store
+ * the timeout in milliseconds; the low 32 bits store the attempt count.
+ *
+ * <p>This class intentionally does not override {@link Object#equals(Object)}
or
+ * {@link Object#hashCode()}. Because the state is mutable, value-based
equality
+ * would break the contracts required by {@link java.util.HashMap} and similar
+ * collections. Reference equality (the {@link Object} default) is correct
here.
+ *
+ * <p>The static helper methods {@link #timeout(long)} and {@link
#attempt(long)}
+ * are package-private to allow callers that hold a raw snapshot to extract
fields
+ * without additional reads.
+ */
+public class TimeoutState {
+ /**
+ * Packed representation of timeout and attempt count.
+ * High 32 bits: timeout (ms). Low 32 bits: attempt count.
+ */
+ private final AtomicLong state = new AtomicLong();
+
+ /**
+ * Creates a new {@code TimeoutState} with the default initial timeout and
attempt count of {@code 0}.
+ *
+ * <p>Attempt count {@code 0} acts as a sentinel indicating the state has
been initialized
+ * but not yet advanced. The first call to {@link
#update(TimeoutStrategy)} will set the
+ * timeout to {@link TimeoutStrategy#DEFAULT_RETRY_INITIAL_TIMEOUT_MS} and
increment the count to {@code 1}.
+ */
+ public TimeoutState() {
+ this(DEFAULT_RETRY_INITIAL_TIMEOUT_MS, 0);
+ }
+
+ /**
+ * Creates a new {@code TimeoutState} with the given initial timeout and
attempt count.
+ *
+ * @param timeout initial timeout in milliseconds.
+ * @param attempt attempt count. Use {@code 0} as a sentinel to indicate
+ * "initialized but not yet advanced" when lazy
initialization is needed.
+ */
+ public TimeoutState(int timeout, int attempt) {
+ state.set(pack(timeout, attempt));
+ }
+
+ /**
+ * Returns the current retry timeout in milliseconds.
+ *
+ * <p>This is a single atomic read.
+ *
+ * @return current timeout in milliseconds.
+ */
+ public int getTimeout() {
+ return timeout(state.get());
+ }
+
+ /**
+ * Returns the current attempt count.
+ *
+ * <p>This is a single atomic read.
+ *
+ * @return current attempt count.
+ */
+ public int getAttempt() {
+ return attempt(state.get());
+ }
+
+ /**
+ * Advances the retry state using the given strategy.
+ *
+ * <p>If the current attempt count is {@code 0} (the initial sentinel),
the timeout is reset
+ * to {@link TimeoutStrategy#DEFAULT_RETRY_INITIAL_TIMEOUT_MS} and the
attempt count is set to {@code 1}.
+ * On subsequent calls, the timeout is computed by {@link
TimeoutStrategy#next(int)} and the
+ * attempt count is incremented.
+ *
+ * <p>This method is package-private because callers are responsible for
external synchronization.
+ * The only intended call site is inside {@link
java.util.concurrent.ConcurrentHashMap#compute} in
+ * {@link KeyBasedRetryContext#updateAndGetState}, which holds an
exclusive per-key lock for the
+ * duration of the lambda, so no concurrent access to the same instance is
possible.
+ *
+ * @param timeoutStrategy strategy used to compute the next timeout value.
+ */
+ void update(TimeoutStrategy timeoutStrategy) {
+ long raw = state.get();
+
+ int nextTimeout = attempt(raw) == 0
+ ? DEFAULT_RETRY_INITIAL_TIMEOUT_MS
+ : timeoutStrategy.next(timeout(raw));
+
+ state.set(pack(nextTimeout, attempt(raw) + 1));
+ }
+
+ /**
+ * Packs timeout and attempt count into a single {@code long}.
+ * Timeout occupies the high 32 bits; attempt occupies the low 32 bits.
+ *
+ * @param timeout timeout in milliseconds.
+ * @param attempt attempt count.
+ * @return packed {@code long} value.
+ */
+ static long pack(int timeout, int attempt) {
+ return ((long) timeout << 32) | (attempt & 0xFFFFFFFFL);
+ }
+
+ /**
+ * Extracts the timeout from a packed raw state value.
+ *
+ * @param packed raw state value produced by {@link #pack(int, int)} or
read directly
+ * from the underlying {@link AtomicLong}.
+ * @return timeout in milliseconds.
+ */
+ static int timeout(long packed) {
+ return (int) (packed >>> 32);
+ }
+
+ /**
+ * Extracts the attempt count from a packed raw state value.
+ *
+ * @param packed raw state value produced by {@link #pack(int, int)} or
read directly
+ * from the underlying {@link AtomicLong}.
+ * @return attempt count.
+ */
+ static int attempt(long packed) {
+ return (int) packed;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/retry/TimeoutStrategy.java b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/TimeoutStrategy.java
new file mode 100644
index 00000000000..f213c2493f8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/TimeoutStrategy.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
/**
 * Stateless strategy for computing the next retry timeout based on the current one.
 *
 * <p>Implementations must be stateless and thread-safe: the same instance is expected to be shared across
 * multiple callers and retry contexts. All mutable state (current timeout, attempt count) is maintained
 * externally by the caller or a retry context.
 *
 * <p>This is a functional interface, so simple strategies can be supplied as lambdas, for example
 * {@code current -> Math.min(current * 2, DEFAULT_RETRY_TIMEOUT_MS_MAX)}.
 *
 * @see ExponentialBackoffTimeoutStrategy
 * @see NoopTimeoutStrategy
 */
@FunctionalInterface
public interface TimeoutStrategy {
    /** Default maximum timeout that a strategy may produce, in milliseconds. */
    int DEFAULT_RETRY_TIMEOUT_MS_MAX = 11_000;

    /** Initial timeout used at the start of a retry sequence, before any backoff is applied, in milliseconds. */
    int DEFAULT_RETRY_INITIAL_TIMEOUT_MS = 20;

    /**
     * Computes the next retry timeout based on the current one.
     *
     * <p>The returned value is used directly as the delay before the next retry attempt. Implementations must
     * never return a value above their maximum timeout. Unless an implementation is explicitly configured with
     * a different cap, that maximum is {@link #DEFAULT_RETRY_TIMEOUT_MS_MAX}.
     *
     * @param currentTimeout current retry timeout in milliseconds.
     * @return next retry timeout in milliseconds, never exceeding the strategy's maximum timeout.
     */
    int next(int currentTimeout);
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategyTest.java
new file mode 100644
index 00000000000..c5498b39029
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategyTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link ExponentialBackoffTimeoutStrategy}.
+ *
+ * <p>Verifies the correctness of exponential timeout progression, the maximum
timeout
+ * ceiling, and optional jitter behavior. Tests use predictable integer
arithmetic to
+ * make expected values easy to verify by hand.
+ */
+public class ExponentialBackoffTimeoutStrategyTest {
+ /** Initial timeout passed to {@link TimeoutStrategy#next(int)} as the
starting value. */
+ private static final int INITIAL_TIMEOUT = 20;
+
+ /** Maximum timeout the strategy is allowed to produce. */
+ private static final int MAX_TIMEOUT = 100;
+
+ /** Backoff coefficient used by the default strategy instance. Doubles
each timeout step. */
+ private static final double BACKOFF_COEFFICIENT = 2.0;
+
+ /** Strategy instance under test, recreated before each test. */
+ private TimeoutStrategy timeoutStrategy;
+
+ /**
+ * Creates a fresh {@link ExponentialBackoffTimeoutStrategy} with {@link
#MAX_TIMEOUT}
+ * and {@link #BACKOFF_COEFFICIENT}, without jitter, before each test.
+ */
+ @BeforeEach
+ void setUp() {
+ timeoutStrategy = new ExponentialBackoffTimeoutStrategy(MAX_TIMEOUT,
BACKOFF_COEFFICIENT);
+ }
+
+ /**
+ * Verifies that a single call to {@link TimeoutStrategy#next(int)} returns
+ * {@code currentTimeout * backoffCoefficient}.
+ *
+ * <p>This is the core contract of the exponential strategy — each step
multiplies
+ * the current timeout by the configured coefficient.
+ */
+ @Test
+ void testGettingNextTimeout() {
+ assertEquals(BACKOFF_COEFFICIENT * INITIAL_TIMEOUT,
timeoutStrategy.next(INITIAL_TIMEOUT));
+ }
+
+ /**
+ * Verifies that the timeout progression reaches {@link #MAX_TIMEOUT}
within the
+ * expected number of steps and does not exceed it on subsequent calls.
+ *
+ * <p>The upper bound on steps is computed from the coefficient and the
ratio of
+ * {@code MAX_TIMEOUT} to {@code INITIAL_TIMEOUT}. If the strategy fails
to reach
+ * the cap within this bound, the test fails with a descriptive message.
Once the
+ * cap is reached, a further call to {@link TimeoutStrategy#next(int)}
must return
+ * exactly {@link #MAX_TIMEOUT}.
+ */
+ @Test
+ void testMaxTimeoutNotExceeded() {
+ int maxSteps = 3;
+ int steps = 0;
+
+ int timeout = INITIAL_TIMEOUT;
+ do {
+ timeout = timeoutStrategy.next(timeout);
+
+ assertTrue(++steps <= maxSteps,
+ "Strategy did not reach MAX_TIMEOUT within expected steps,
last timeout: " + timeout);
+ } while (timeout < MAX_TIMEOUT);
+
+ assertEquals(MAX_TIMEOUT, timeout);
+ assertEquals(MAX_TIMEOUT, timeoutStrategy.next(timeout));
+ }
+
+ /**
+ * Verifies that when jitter is enabled, the returned timeout falls within
the
+ * expected randomized range {@code [raw / 2, raw * 1.5]}, capped at
{@link #MAX_TIMEOUT}.
+ *
+ * <p>A separate strategy instance with jitter enabled is created for this
test.
+ */
+ @Test
+ void testJitterApplying() {
+ timeoutStrategy = new ExponentialBackoffTimeoutStrategy(MAX_TIMEOUT,
BACKOFF_COEFFICIENT, true);
+
+ for (int i = 0; i < 100; i++) {
+ int timeout = timeoutStrategy.next(INITIAL_TIMEOUT);
+ int raw = (int) (INITIAL_TIMEOUT * BACKOFF_COEFFICIENT);
+ int lo = raw / 2;
+ int hi = raw + lo;
+
+ assertTrue(
+ lo <= timeout && timeout <= Math.min(hi, MAX_TIMEOUT),
+ "Timeout is out of range: " + timeout + " (expected [" +
lo + ", " + Math.min(hi, MAX_TIMEOUT) + "])"
+ );
+ }
+ }
+
+ /**
+ * Verifies that when jitter is enabled, the returned timeout does not
exceed
+ * the maximum timeout set for the strategy.
+ */
+ @Test
+ void testJitterDoesntCauseMaxTimeoutExceeded() {
+ timeoutStrategy = new ExponentialBackoffTimeoutStrategy(MAX_TIMEOUT,
BACKOFF_COEFFICIENT, true);
+
+ int timeout = MAX_TIMEOUT;
+ timeout = timeoutStrategy.next(timeout);
+ assertEquals(MAX_TIMEOUT, timeout);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContextTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContextTest.java
new file mode 100644
index 00000000000..6cd8dc1fda1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContextTest.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.util.retry.TimeoutStrategy.DEFAULT_RETRY_INITIAL_TIMEOUT_MS;
+import static
org.apache.ignite.internal.util.retry.TimeoutStrategy.DEFAULT_RETRY_TIMEOUT_MS_MAX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.IntStream;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Unit tests for {@link KeyBasedRetryContext}.
+ *
+ * <p>Verifies per-key state isolation, sequential timeout progression, reset
behavior,
+ * registry size limit enforcement, fallback state for overflow keys, and
thread safety
+ * under concurrent updates to both the same and different keys.
+ *
+ * <p>A deterministic {@link TestProgressiveTimeoutStrategy} with a fixed
multiplier is
+ * used to make expected timeout values easy to compute by hand.
+ */
+public class KeyBasedRetryContextTest {
+ /** Message included in exceptions thrown when an expected state is
absent. */
+ private static final String MISSING_STATE_MESSAGE = "TimeoutState is
missing!";
+
+ /**
+ * Multiplier applied by {@link TestProgressiveTimeoutStrategy} on each
step.
+ * Used to compute expected timeout values in assertions.
+ */
+ private static final int MULTIPLYING_COEFFICIENT = 4;
+
+ /** Initial timeout passed to the {@link KeyBasedRetryContext} under test.
*/
+ private static final int INITIAL_TIMEOUT = 20;
+
+ /**
+ * Maximum timeout configured in {@link TestProgressiveTimeoutStrategy}.
+ * The progression is capped at this value.
+ */
+ private static final int MAX_TIMEOUT = 1_000;
+
+ /**
+ * Registry size limit mirrored from {@link KeyBasedRetryContext}.
+ * Used in tests that fill the registry to verify fallback behavior.
+ */
+ private static final int REGISTRY_SIZE_LIMIT = 1_000;
+
+ /** Primary key used in single-key tests. */
+ private static final String KEY = "key";
+
+ /** Secondary key used in isolation and reset tests. */
+ private static final String OTHER_KEY = "other-key";
+
+ /** Retry context under test, recreated before each test. */
+ private KeyBasedRetryContext retryContext;
+
+ /**
+ * Creates a fresh {@link KeyBasedRetryContext} with
+ * a {@link TestProgressiveTimeoutStrategy} before each test.
+ */
+ @BeforeEach
+ void setUp() {
+ retryContext = new KeyBasedRetryContext(new
TestProgressiveTimeoutStrategy());
+ }
+
+ /**
+ * Verifies that {@link KeyBasedRetryContext#getState(String)} returns an
empty
+ * {@link java.util.Optional} for an untracked key, and that after the
first call
+ * to {@link KeyBasedRetryContext#updateAndGetState(String)}, the state is
present
+ * with {@link TimeoutStrategy#DEFAULT_RETRY_INITIAL_TIMEOUT_MS} and
attempt count {@code 1}.
+ */
+ @Test
+ void testGettingState() {
+ assertFalse(retryContext.getState(KEY).isPresent());
+
+ retryContext.updateAndGetState(KEY);
+
+ assertTrue(retryContext.getState(KEY).isPresent());
+
+ TimeoutState state = retryContext.getState(KEY).get();
+
+ assertEquals(DEFAULT_RETRY_INITIAL_TIMEOUT_MS, state.getTimeout());
+ assertEquals(1, state.getAttempt());
+ }
+
+ /**
+ * Verifies that {@link KeyBasedRetryContext#updateAndGetState(String)}
returns the
+ * same object reference as {@link KeyBasedRetryContext#getState(String)},
and that
+ * the timeout advances correctly after multiple calls for the same key.
+ *
+ * <p>After three updates, the expected timeout is
+ * {@code DEFAULT_RETRY_INITIAL_TIMEOUT_MS * MULTIPLYING_COEFFICIENT^2}
with attempt count {@code 3}.
+ */
+ @Test
+ void testUpdatingAndGettingState() {
+ retryContext.updateAndGetState(KEY);
+ retryContext.updateAndGetState(KEY);
+ TimeoutState returnedState = retryContext.updateAndGetState(KEY);
+ TimeoutState observedState = retryContext.getState(KEY)
+ .orElseThrow(() -> new
IllegalStateException(MISSING_STATE_MESSAGE));
+
+ assertSame(returnedState, observedState);
+
+ checkRetryContextState(KEY, DEFAULT_RETRY_INITIAL_TIMEOUT_MS *
MULTIPLYING_COEFFICIENT * MULTIPLYING_COEFFICIENT, 3);
+ }
+
+ /**
+ * Verifies that each key maintains its own independent backoff
progression.
+ *
+ * <p>Advances {@link #KEY} three times and {@link #OTHER_KEY} once, then
asserts
+ * that each key holds the timeout and attempt count corresponding only to
its own
+ * update history, with no cross-key interference.
+ */
+ @Test
+ void testStatesAreIsolatedPerKey() {
+ retryContext.updateAndGetState(KEY);
+ retryContext.updateAndGetState(KEY);
+ retryContext.updateAndGetState(KEY);
+
+ retryContext.updateAndGetState(OTHER_KEY);
+
+ checkRetryContextState(KEY, DEFAULT_RETRY_INITIAL_TIMEOUT_MS *
MULTIPLYING_COEFFICIENT * MULTIPLYING_COEFFICIENT, 3);
+ checkRetryContextState(OTHER_KEY, DEFAULT_RETRY_INITIAL_TIMEOUT_MS, 1);
+ }
+
+ /**
+ * Verifies that {@link KeyBasedRetryContext#resetState(String)} removes
the state
+ * for the given key, causing {@link
KeyBasedRetryContext#getState(String)} to return
+ * an empty {@link java.util.Optional} after the reset.
+ */
+ @Test
+ void testResettingState() {
+ retryContext.updateAndGetState(KEY);
+ retryContext.updateAndGetState(KEY);
+ retryContext.updateAndGetState(KEY);
+
+ checkRetryContextState(KEY, INITIAL_TIMEOUT * MULTIPLYING_COEFFICIENT
* MULTIPLYING_COEFFICIENT, 3);
+
+ retryContext.resetState(KEY);
+
+ assertFalse(retryContext.getState(KEY).isPresent());
+ }
+
+ /**
+ * Verifies that resetting one key does not affect the state of other keys.
+ *
+ * <p>Advances {@link #KEY} twice and {@link #OTHER_KEY} once, resets
{@link #KEY},
+ * then asserts that {@link #KEY} is absent while {@link #OTHER_KEY}
retains its state.
+ */
+ @Test
+ void testResettingStateDoesNotAffectOtherKeys() {
+ retryContext.updateAndGetState(KEY);
+ retryContext.updateAndGetState(KEY);
+ retryContext.updateAndGetState(OTHER_KEY);
+
+ retryContext.resetState(KEY);
+
+ assertFalse(retryContext.getState(KEY).isPresent());
+ checkRetryContextState(OTHER_KEY, DEFAULT_RETRY_INITIAL_TIMEOUT_MS, 1);
+ }
+
+ /**
+ * Verifies that when the registry is at capacity, a new key passed to
+ * {@link KeyBasedRetryContext#updateAndGetState(String)} receives the
fallback
+ * {@link TimeoutState} with {@link
TimeoutStrategy#DEFAULT_RETRY_TIMEOUT_MS_MAX} and attempt {@code -1}.
+ */
+ @Test
+ void testFallbackWhenRegistryIsFull() {
+ // Fill registry to the limit
+ IntStream.range(0, REGISTRY_SIZE_LIMIT)
+ .mapToObj(i -> "key-" + i)
+ .forEach(retryContext::updateAndGetState);
+
+ // New key should get fallback
+ TimeoutState state =
retryContext.updateAndGetState("new-key-beyond-limit");
+
+ assertEquals(DEFAULT_RETRY_TIMEOUT_MS_MAX, state.getTimeout());
+ assertEquals(-1, state.getAttempt());
+ }
+
+ /**
+ * Verifies that a key already tracked in the registry continues to
progress normally
+ * even when the registry has reached its size limit.
+ *
+ * <p>This guards against the regression where the overflow check
incorrectly
+ * blocked updates for existing keys rather than only for new ones.
+ */
+ @Test
+ void testExistingKeyStillProgressesWhenRegistryIsFull() {
+ retryContext.updateAndGetState(KEY);
+
+ // Fill registry to the limit with other keys
+ IntStream.range(0, REGISTRY_SIZE_LIMIT - 1)
+ .mapToObj(i -> "key-" + i)
+ .forEach(retryContext::updateAndGetState);
+
+ // Existing key should still progress normally
+ retryContext.updateAndGetState(KEY);
+ retryContext.updateAndGetState(KEY);
+
+ checkRetryContextState(KEY, DEFAULT_RETRY_INITIAL_TIMEOUT_MS *
MULTIPLYING_COEFFICIENT * MULTIPLYING_COEFFICIENT, 3);
+ }
+
+ /**
+ * Verifies that {@link KeyBasedRetryContext#getState(String)} returns the
fallback
+ * {@link TimeoutState} (with {@link
TimeoutStrategy#DEFAULT_RETRY_TIMEOUT_MS_MAX} and attempt {@code -1}) for a key
+ * that is not tracked when the registry is full.
+ */
+ @Test
+ void testGetStateReturnsFallbackWhenRegistryIsFull() {
+ IntStream.range(0, REGISTRY_SIZE_LIMIT)
+ .mapToObj(i -> "key-" + i)
+ .forEach(retryContext::updateAndGetState);
+
+ Optional<TimeoutState> state =
retryContext.getState("new-key-beyond-limit");
+
+ assertTrue(state.isPresent());
+ assertEquals(DEFAULT_RETRY_TIMEOUT_MS_MAX, state.get().getTimeout());
+ assertEquals(-1, state.get().getAttempt());
+ }
+
+ /**
+ * Verifies that concurrent calls to {@link
KeyBasedRetryContext#updateAndGetState(String)}
+ * for the same key from multiple threads all succeed, and that the final
state reflects
+ * exactly {@code attemptsNumber} advancements.
+ *
+ * <p>This exercises the per-key locking in {@link
java.util.concurrent.ConcurrentHashMap#compute}
+ * and the CAS loop inside {@link TimeoutState}.
+ *
+ * @throws Exception if the thread pool is interrupted during shutdown.
+ */
+ @Test
+ @Timeout(value = 5, unit = SECONDS)
+ void testConcurrentStateUpdatingSameKey() throws Exception {
+ ExecutorService threadPool = Executors.newFixedThreadPool(5);
+
+ int attemptsNumber = 20;
+
+ List<Future<TimeoutState>> futures = IntStream.range(0, attemptsNumber)
+ .mapToObj(i -> threadPool.submit(() ->
retryContext.updateAndGetState(KEY)))
+ .collect(toList());
+
+ try {
+ futures.forEach(fut -> {
+ try {
+ fut.get();
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ });
+
+ checkRetryContextState(KEY, MAX_TIMEOUT, attemptsNumber);
+ } finally {
+ threadPool.shutdown();
+ threadPool.awaitTermination(5, SECONDS);
+ }
+ }
+
+ /**
+ * Verifies that concurrent calls to {@link
KeyBasedRetryContext#updateAndGetState(String)}
+ * for different keys do not interfere with each other.
+ *
+ * <p>Each key is updated exactly once from a dedicated task. After all
tasks complete,
+ * each key must hold {@link
TimeoutStrategy#DEFAULT_RETRY_INITIAL_TIMEOUT_MS} and attempt count {@code 1},
confirming
+ * no cross-key state contamination under concurrency.
+ *
+ * @throws Exception if the thread pool is interrupted during shutdown.
+ */
+ @Test
+ @Timeout(value = 5, unit = SECONDS)
+ void testConcurrentStateUpdatingDifferentKeys() throws Exception {
+ ExecutorService threadPool = Executors.newFixedThreadPool(5);
+
+ List<String> keys = List.of("key-1", "key-2", "key-3", "key-4",
"key-5");
+
+ List<Future<TimeoutState>> futures = keys.stream()
+ .map(key -> threadPool.submit(() ->
retryContext.updateAndGetState(key)))
+ .collect(toList());
+
+ try {
+ futures.forEach(KeyBasedRetryContextTest::getQuietly);
+
+ keys.forEach(key -> checkRetryContextState(key,
DEFAULT_RETRY_INITIAL_TIMEOUT_MS, 1));
+ } finally {
+ threadPool.shutdown();
+ threadPool.awaitTermination(5, SECONDS);
+ }
+ }
+
+ /**
+ * Asserts that the retry context holds the expected timeout and attempt
count for
+ * the given key.
+ *
+ * <p>Fails with {@link #MISSING_STATE_MESSAGE} if the state is absent.
+ *
+ * @param key the key whose state to check.
+ * @param expectedTimeout expected current timeout in milliseconds.
+ * @param expectedAttempts expected current attempt count.
+ */
+ private void checkRetryContextState(String key, int expectedTimeout, int
expectedAttempts) {
+ retryContext.getState(key).ifPresentOrElse(state -> {
+ assertEquals(expectedTimeout, state.getTimeout(), "[DEBUG_LOG]
Timeout mismatch for key: " + key);
+ assertEquals(expectedAttempts, state.getAttempt(), "[DEBUG_LOG]
Attempt mismatch for key: " + key);
+ }, () -> {
+ throw new IllegalStateException(MISSING_STATE_MESSAGE);
+ });
+ }
+
+ /**
+ * A deterministic {@link TimeoutStrategy} that multiplies the current
timeout by
+ * {@link #MULTIPLYING_COEFFICIENT} on each step, capped at {@link
#MAX_TIMEOUT}.
+ *
+ * <p>Using integer multiplication rather than a floating-point
coefficient avoids
+ * rounding ambiguity, making expected values in test assertions exact and
easy to
+ * compute by hand.
+ */
+ private static class TestProgressiveTimeoutStrategy implements
TimeoutStrategy {
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Multiplies {@code currentTimeout} by {@link
#MULTIPLYING_COEFFICIENT},
+ * capped at {@link #MAX_TIMEOUT}.
+ */
+ @Override
+ public int next(int currentTimeout) {
+ return Math.min(currentTimeout * MULTIPLYING_COEFFICIENT,
MAX_TIMEOUT);
+ }
+ }
+
+ /**
+ * Waits for the given {@link Future} to complete and returns its result.
+ *
+ * <p>Wraps checked exceptions as {@link AssertionError} so they propagate
cleanly
+ * through {@link java.util.function.Consumer} lambdas in test code
without requiring
+ * explicit try-catch blocks.
+ *
+ * <ul>
+ * <li>{@link ExecutionException} — wraps the cause as an {@link
AssertionError},
+ * preserving the original exception for diagnosis.</li>
+ * <li>{@link InterruptedException} — restores the interrupt flag and
wraps
+ * as an {@link AssertionError}.</li>
+ * </ul>
+ *
+ * @param <T> the future's result type.
+ * @param future the future to wait for.
+ * @return the future's result.
+ * @throws AssertionError if the future completed exceptionally or the
thread was interrupted.
+ */
+ private static <T> T getQuietly(Future<T> future) {
+ try {
+ return future.get();
+ } catch (ExecutionException e) {
+ throw new AssertionError("Future completed exceptionally",
e.getCause());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new AssertionError("Interrupted while waiting for future",
e);
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/retry/TimeoutStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/retry/TimeoutStateTest.java
new file mode 100644
index 00000000000..5672d0c88ac
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/retry/TimeoutStateTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import static org.apache.ignite.internal.util.retry.TimeoutState.attempt;
+import static org.apache.ignite.internal.util.retry.TimeoutState.timeout;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link TimeoutState}.
+ *
+ * <p>Verifies the correctness of initial state construction, atomic CAS
updates,
+ * stale-snapshot rejection, and the consistency between the raw packed {@code
long}
+ * and the individual accessor methods.
+ */
+public class TimeoutStateTest {
+ /** Timeout value used to construct the shared {@link TimeoutState}
instance. */
+ private static final int TIMEOUT = 20;
+
+ /** Attempt count used to construct the shared {@link TimeoutState}
instance. */
+ private static final int ATTEMPT = 10;
+
+ /** Shared state instance recreated before each test. */
+ private TimeoutState state;
+
+ /**
+ * Creates a fresh {@link TimeoutState} with {@link #TIMEOUT} and {@link
#ATTEMPT}
+ * before each test to ensure full isolation.
+ */
+ @BeforeEach
+ void setUp() {
+ state = new TimeoutState(TIMEOUT, ATTEMPT);
+ }
+
+ /**
+ * Verifies that newly constructed {@link TimeoutState} returns the
default initial timeout
+ * and attempt count of {@code 0}.
+ */
+ @Test
+ void testDefaultInitialState() {
+ TimeoutState defaultState = new TimeoutState();
+ assertEquals(TimeoutStrategy.DEFAULT_RETRY_INITIAL_TIMEOUT_MS,
defaultState.getTimeout());
+ assertEquals(0, defaultState.getAttempt());
+ }
+
+ /**
+ * Verifies that a newly constructed {@link TimeoutState} returns the
timeout and
+ * attempt values it was initialized with.
+ */
+ @Test
+ void testInitialState() {
+ assertEquals(TIMEOUT, state.getTimeout());
+ assertEquals(ATTEMPT, state.getAttempt());
+ }
+
+ /**
+ * Verifies that {@link TimeoutState#update(TimeoutStrategy)} correctly
advances
+ * both timeout and attempt count.
+ */
+ @Test
+ void testUpdate() {
+ int nextTimeout = 100;
+ TimeoutStrategy strategy = current -> nextTimeout;
+
+ state.update(strategy);
+
+ assertEquals(nextTimeout, state.getTimeout());
+ assertEquals(ATTEMPT + 1, state.getAttempt());
+ }
+
+ /**
+ * Verifies that {@link TimeoutState#update(TimeoutStrategy)} starting
from attempt {@code 0}
+ * resets the timeout to {@link
TimeoutStrategy#DEFAULT_RETRY_INITIAL_TIMEOUT_MS}
+ * and sets attempt count to {@code 1}.
+ */
+ @Test
+ void testUpdateFromZeroAttempt() {
+ TimeoutState zeroState = new TimeoutState(1000, 0);
+ TimeoutStrategy strategy = current -> 2000; // Should be ignored
+
+ zeroState.update(strategy);
+
+ assertEquals(TimeoutStrategy.DEFAULT_RETRY_INITIAL_TIMEOUT_MS,
zeroState.getTimeout());
+ assertEquals(1, zeroState.getAttempt());
+ }
+
+ /**
+ * Verifies that {@link TimeoutState#getTimeout()} and {@link
TimeoutState#getAttempt()}
+ * are consistent with the raw packed value.
+ */
+ @Test
+ void testGetTimeoutAndGetAttemptAreConsistentWithPacked() {
+ long packed = TimeoutState.pack(TIMEOUT, ATTEMPT);
+
+ assertEquals(TIMEOUT, timeout(packed));
+ assertEquals(ATTEMPT, attempt(packed));
+ }
+
+ /**
+ * Verifies that {@link TimeoutState#pack(int, int)} followed by
+ * {@link TimeoutState#timeout(long)} and {@link
TimeoutState#attempt(long)}
+ * recovers the original values exactly.
+ *
+ * <p>Tests the bit-level correctness of the packing scheme independently
of
+ * the {@link TimeoutState} object lifecycle.
+ */
+ @Test
+ void testPackUnpackRoundtrip() {
+ long packed = TimeoutState.pack(TIMEOUT, ATTEMPT);
+
+ assertEquals(TIMEOUT, timeout(packed));
+ assertEquals(ATTEMPT, attempt(packed));
+ }
+}
diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 8696887c30e..cf8c5778268 100644
--- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -260,6 +260,7 @@ import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
import org.apache.ignite.network.NetworkAddress;
@@ -1476,7 +1477,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
transactionInflights,
lowWatermark,
commonScheduledExecutorService,
- metricManager
+ metricManager,
+ new NoopTimeoutStrategy()
);
replicaManager = spy(new ReplicaManager(
diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 51a7a9ffeef..389bd47b822 100644
--- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -197,6 +197,7 @@ import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
@@ -644,7 +645,8 @@ public class Node {
transactionInflights,
lowWatermark,
threadPoolsManager.commonScheduler(),
- metricManager
+ metricManager,
+ new NoopTimeoutStrategy()
);
volatileLogStorageManagerCreator = new
VolatileLogStorageManagerCreator(name, workDir.resolve("volatile-log-spillout-"
+ name));
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index b35f59aaae1..a5233c7c47a 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -239,6 +239,7 @@ import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedS
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.version.DefaultIgniteProductVersionSource;
import org.apache.ignite.internal.worker.CriticalWorkerWatchdog;
@@ -697,7 +698,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
lowWatermark,
threadPoolsManager.commonScheduler(),
failureProcessor,
- metricManager
+ metricManager,
+ new NoopTimeoutStrategy()
);
ResourceVacuumManager resourceVacuumManager = new
ResourceVacuumManager(
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index afe9e7d4cd0..8d29f79d49a 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -302,6 +302,7 @@ import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
+import org.apache.ignite.internal.util.retry.ExponentialBackoffTimeoutStrategy;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
import org.apache.ignite.internal.version.DefaultIgniteProductVersionSource;
@@ -1127,7 +1128,8 @@ public class IgniteImpl implements Ignite {
lowWatermark,
threadPoolsManager.commonScheduler(),
failureManager,
- metricManager
+ metricManager,
+ new ExponentialBackoffTimeoutStrategy()
);
sharedTxStateStorage = new TxStateRocksDbSharedStorage(
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 5d3941cc864..2eb12260ea8 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -101,6 +101,7 @@ import
org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.type.StructNativeType;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.QualifiedNameHelper;
@@ -197,7 +198,8 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
transactionInflights,
new TestLowWatermark(),
commonExecutor,
- new NoOpMetricManager()
+ new NoOpMetricManager(),
+ new NoopTimeoutStrategy()
);
assertThat(txManager.startAsync(new ComponentContext()),
willCompleteSuccessfully());
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
index 024a2d6aee9..ad0224c6794 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Tuple;
@@ -172,7 +173,8 @@ public class ItLockTableTest extends IgniteAbstractTest {
transactionInflights,
lowWatermark,
commonExecutor,
- new NoOpMetricManager()
+ new NoOpMetricManager(),
+ new NoopTimeoutStrategy()
);
}
};
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index b37ea5bfb5e..a68006cc8e0 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -81,6 +81,7 @@ import
org.apache.ignite.internal.tx.message.TableWriteIntentSwitchReplicaReques
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.TransactionException;
@@ -162,7 +163,8 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends TxAbstractTes
transactionInflights,
lowWatermark,
commonExecutor,
- new NoOpMetricManager()
+ new NoOpMetricManager(),
+ new NoopTimeoutStrategy()
) {
@Override
public Executor writeIntentSwitchExecutor() {
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index c02dd8f7d6a..d100c358722 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -120,6 +120,7 @@ import
org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.tx.test.TestTransactionIds;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.CollectionUtils;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
import org.apache.ignite.sql.ColumnType;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.table.QualifiedName;
@@ -224,7 +225,8 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
transactionInflights,
new TestLowWatermark(),
commonExecutor,
- new NoOpMetricManager()
+ new NoOpMetricManager(),
+ new NoopTimeoutStrategy()
) {
@Override
public CompletableFuture<Void> finish(
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index a19ca2b08eb..007b0f5cea4 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -197,6 +197,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.SafeTimeValuesTracker;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.apache.ignite.sql.IgniteSql;
@@ -654,7 +655,8 @@ public class ItTxTestCluster {
lowWatermark,
executor,
new NoOpFailureManager(),
- new TestMetricManager()
+ new TestMetricManager(),
+ new NoopTimeoutStrategy()
);
}
@@ -1341,7 +1343,8 @@ public class ItTxTestCluster {
lowWatermark,
executor,
new NoOpFailureManager(),
- new TestMetricManager()
+ new TestMetricManager(),
+ new NoopTimeoutStrategy()
);
clientResourceVacuumManager = new ResourceVacuumManager(
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 56852c04886..de75440356a 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -150,6 +150,7 @@ import
org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.SafeTimeValuesTracker;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.QualifiedNameHelper;
@@ -719,7 +720,8 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
transactionInflights,
new TestLowWatermark(),
COMMON_SCHEDULER,
- new NoOpMetricManager()
+ new NoOpMetricManager(),
+ new NoopTimeoutStrategy()
);
assertThat(txManager.startAsync(new ComponentContext()),
willCompleteSuccessfully());
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItDurableFinishFailureTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItDurableFinishFailureTest.java
new file mode 100644
index 00000000000..bec5f443802
--- /dev/null
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItDurableFinishFailureTest.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tx.distributed;
+
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.metrics.LongMetric;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
+import org.apache.ignite.internal.util.retry.KeyBasedRetryContext;
+import org.apache.ignite.internal.util.retry.TimeoutState;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests verifying the retry behavior of durable transaction finish
+ * ({@link TxFinishReplicaRequest}) in a 3-node Apache Ignite cluster.
+ *
+ * <p>Each test drops or intercepts {@link TxFinishReplicaRequest} messages on all nodes
+ * to simulate network failures, then verifies that:
+ * <ul>
+ *     <li>the finish operation is retried as expected;</li>
+ *     <li>retry timeouts grow between consecutive attempts;</li>
+ *     <li>the retry context is cleaned up after success;</li>
+ *     <li>no unnecessary retries occur when the first attempt succeeds.</li>
+ * </ul>
+ *
+ * <p>Each test creates a single-partition, 3-replica zone and table in {@code @BeforeEach}.
+ * Tests that require additional zones/tables create them locally.
+ */
+public class ItDurableFinishFailureTest extends ClusterPerTestIntegrationTest {
+    /**
+     * Thread name fragment identifying the scheduler thread that sends retry attempts.
+     * Used to distinguish retried messages from the original commit attempt in message interceptors.
+     */
+    private static final String RETRY_THREAD_NAME = "common-scheduler";
+
+    /** Metric name for the total number of committed transactions, used to verify commit completion. */
+    private static final String TOTAL_COMMITTED_TRANSACTIONS_METRIC_NAME = "TotalCommits";
+
+    /** Name of the default test table created in {@code @BeforeEach}. */
+    private static final String TABLE_NAME = "test_table";
+
+    /** Number of replicas for all test zones. */
+    private static final int REPLICAS = 3;
+
+    /**
+     * Creates a single-partition, 3-replica zone and a test table before each test.
+     * Tests that require additional tables or zones create them locally.
+     */
+    @BeforeEach
+    public void setup() {
+        String zoneSql = "create zone test_zone (partitions 1, replicas " + REPLICAS
+                + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']";
+        String tableSql = "create table " + TABLE_NAME + " (key bigint primary key, val varchar(20)) zone TEST_ZONE";
+
+        sql(zoneSql);
+        sql(tableSql);
+    }
+
+    /**
+     * Verifies that no retry occurs when the durable finish succeeds on the first attempt.
+     *
+     * <p>Installs a message interceptor that counts {@link TxFinishReplicaRequest} messages
+     * without dropping any of them. After the commit completes, asserts that exactly one
+     * finish message was sent across all nodes, and that the count does not grow during a
+     * short observation window (which would indicate a retry).
+     */
+    @Test
+    public void testNoRetryOnSuccessfulFinish() {
+        IgniteImpl node = anyNode();
+        Transaction tx = node.transactions().begin();
+        node.sql().execute(tx, "insert into " + TABLE_NAME + " (key, val) values (1, 'val-1')");
+
+        AtomicInteger finishAttempts = new AtomicInteger();
+
+        for (IgniteImpl n : runningNodesIter()) {
+            n.dropMessages((dest, msg) -> {
+                if (msg instanceof TxFinishReplicaRequest) {
+                    finishAttempts.incrementAndGet();
+                }
+
+                return false;
+            });
+        }
+
+        tx.commitAsync();
+
+        await().timeout(5, SECONDS).until(() -> committedTransactions(node) == 1);
+
+        // The counter must stay at 1 for a while: checking it right after it first reaches 1
+        // would not detect a retry sent shortly after the first attempt.
+        await().during(1, SECONDS).atMost(5, SECONDS).until(() -> finishAttempts.get() == 1);
+
+        assertEquals(1, finishAttempts.get());
+    }
+
+    /**
+     * Verifies that the durable finish is retried after a single failure.
+     *
+     * <p>Drops the first {@link TxFinishReplicaRequest} on all nodes to simulate a transient
+     * network failure. On the subsequent retry, captures a snapshot of the retry context
+     * across all nodes and asserts that exactly one node has an active retry entry for the
+     * transaction. After the commit completes, asserts that the retry context is fully cleaned up.
+     */
+    @Test
+    public void testDurableFinishRetry() {
+        IgniteImpl node = anyNode();
+        Transaction tx = node.transactions().begin();
+        node.sql().execute(tx, "insert into " + TABLE_NAME + " (key, val) values (1, 'val-1')");
+
+        AtomicInteger failedFinishAttempts = new AtomicInteger();
+
+        List<KeyBasedRetryContext> retryContexts = new ArrayList<>();
+
+        AtomicLong expectedSizeOfRetryContext = new AtomicLong(0);
+
+        for (IgniteImpl n : runningNodesIter()) {
+            retryContexts.add((KeyBasedRetryContext) ((TxManagerImpl) n.txManager()).retryContext());
+
+            n.dropMessages((dest, msg) -> {
+                if (msg instanceof TxFinishReplicaRequest) {
+                    if (failedFinishAttempts.get() == 0) {
+                        // Makes durable finish fail with replication timeout, on the first attempt.
+                        return failedFinishAttempts.incrementAndGet() == 1;
+                    }
+
+                    // Only the first non-dropped attempt records the snapshot (CAS from the initial 0).
+                    expectedSizeOfRetryContext.compareAndSet(0, retryContexts.stream()
+                            .map(KeyBasedRetryContext::snapshot)
+                            .filter(retryContextSnapshot -> retryContextSnapshot.size() == 1)
+                            .count());
+                }
+
+                return false;
+            });
+        }
+
+        tx.commitAsync();
+
+        await().timeout(5, SECONDS).until(() -> failedFinishAttempts.get() == 1);
+
+        await().timeout(5, SECONDS).until(() -> committedTransactions(node) == 1);
+
+        assertEquals(1, expectedSizeOfRetryContext.get());
+
+        await().timeout(5, SECONDS).until(() -> retryContexts.stream()
+                .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty));
+    }
+
+    /**
+     * Verifies that retry timeouts grow monotonically between consecutive retry attempts.
+     *
+     * <p>Drops the first 3 {@link TxFinishReplicaRequest} messages. For each retry attempt
+     * arriving on the scheduler thread, captures the current timeout from the retry context
+     * snapshot. After all drops and the final successful commit, asserts that the captured
+     * timeout sequence is strictly increasing, confirming backoff behavior.
+     *
+     * <p>Timeout samples are captured only on the retry scheduler thread
+     * (identified by {@link #RETRY_THREAD_NAME}) to ensure we observe post-advancement values
+     * rather than the stale pre-drop state.
+     */
+    @Test
+    public void testRetryWithGrowingTimeout() {
+        IgniteImpl node = anyNode();
+        Transaction tx = node.transactions().begin();
+        node.sql().execute(tx, "insert into " + TABLE_NAME + " (key, val) values (1, 'val-1')");
+
+        int droppedAttempts = 3;
+
+        // Counts every finish attempt, including the final successful one.
+        AtomicInteger finishAttempts = new AtomicInteger();
+
+        List<KeyBasedRetryContext> retryContexts = new ArrayList<>();
+
+        List<Integer> timeoutSamples = new CopyOnWriteArrayList<>();
+
+        for (IgniteImpl n : runningNodesIter()) {
+            TxManagerImpl txManager = (TxManagerImpl) n.txManager();
+            retryContexts.add((KeyBasedRetryContext) txManager.retryContext());
+
+            n.dropMessages((dest, msg) -> {
+                if (msg instanceof TxFinishReplicaRequest) {
+                    if (Thread.currentThread().getName().contains(RETRY_THREAD_NAME)) {
+                        retryContexts.stream()
+                                .map(KeyBasedRetryContext::snapshot)
+                                .filter(retryContextSnapshot -> retryContextSnapshot.size() == 1)
+                                .flatMap(retryContextSnapshot -> retryContextSnapshot.values().stream())
+                                .forEach(timeoutState -> timeoutSamples.add(timeoutState.getTimeout()));
+                    }
+
+                    // Drops only the first attempts; later ones pass through.
+                    return finishAttempts.getAndIncrement() < droppedAttempts;
+                }
+
+                return false;
+            });
+        }
+
+        tx.commitAsync();
+
+        // The counter keeps growing past the last dropped attempt (the successful attempt increments it too),
+        // so waiting for an exact value could miss it between polls.
+        await().timeout(5, SECONDS).until(() -> finishAttempts.get() >= droppedAttempts);
+
+        await().timeout(5, SECONDS).until(() -> committedTransactions(node) == 1);
+
+        assertTrue(timeoutSamples.size() > 1, "Expected at least 2 timeout samples, got: " + timeoutSamples.size());
+
+        for (int i = 1; i < timeoutSamples.size(); i++) {
+            assertTrue(timeoutSamples.get(i - 1) < timeoutSamples.get(i),
+                    "timeout increasing is expected! Samples: " + timeoutSamples);
+        }
+
+        await().timeout(5, SECONDS).until(() -> retryContexts.stream()
+                .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty));
+    }
+
+    /**
+     * Verifies that 100 concurrent transactions are all eventually committed after transient failures.
+     *
+     * <p>Creates a zone with 25 partitions to distribute load. For each transaction, drops
+     * the first {@link TxFinishReplicaRequest} (keyed by transaction ID) to force one retry per
+     * transaction. After all tasks are submitted and the thread pool shuts down, waits for all
+     * 100 transactions to complete and for all retry context entries to be cleaned up.
+     *
+     * @throws Exception if the thread pool is interrupted during shutdown.
+     */
+    @Test
+    public void testRetryManyConcurrentDurableFinishes() throws Exception {
+        IgniteImpl node = anyNode();
+
+        String zone1Sql = "create zone test_zone_1 (partitions 25, replicas " + REPLICAS
+                + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']";
+        String table1Sql = "create table test_table_1 (key bigint primary key, val varchar(20)) zone TEST_ZONE_1";
+
+        sql(zone1Sql);
+        sql(table1Sql);
+
+        Map<String, AtomicInteger> finishAttemptsByTx = new ConcurrentHashMap<>();
+
+        List<KeyBasedRetryContext> retryContexts = new ArrayList<>();
+
+        for (IgniteImpl n : runningNodesIter()) {
+            TxManagerImpl txManager = (TxManagerImpl) n.txManager();
+            retryContexts.add((KeyBasedRetryContext) txManager.retryContext());
+
+            n.dropMessages((dest, msg) -> {
+                if (msg instanceof TxFinishReplicaRequest) {
+                    String txId = ((TxFinishReplicaRequest) msg).txId().toString();
+
+                    // Drops only the very first finish attempt of each transaction.
+                    if (finishAttemptsByTx.computeIfAbsent(txId, key -> new AtomicInteger(0)).getAndIncrement() == 0) {
+                        return true;
+                    }
+                }
+
+                return false;
+            });
+        }
+
+        ExecutorService threadPool = newFixedThreadPool(10);
+
+        List<? extends Future<?>> futures = IntStream.range(0, 100).mapToObj(i -> threadPool.submit(() -> {
+            Transaction tx = node.transactions().begin();
+            node.sql().execute(tx, "insert into test_table_1 (key, val) values (?, ?)", i, "val-" + i);
+
+            tx.commitAsync();
+        })).collect(toList());
+
+        try {
+            futures.forEach(ItDurableFinishFailureTest::getQuietly);
+        } finally {
+            threadPool.shutdown();
+            threadPool.awaitTermination(5, SECONDS);
+        }
+
+        await().timeout(15, SECONDS).until(() -> committedTransactions(node) == 100);
+
+        await().timeout(5, SECONDS).until(() -> retryContexts.stream()
+                .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty));
+    }
+
+    /**
+     * Verifies that transactions targeting different replication groups are tracked
+     * independently in the retry context and converge to the same retry timeout value.
+     *
+     * <p>Creates two separate single-partition zones with dedicated tables. Drops the first
+     * two {@link TxFinishReplicaRequest} attempts for each transaction (keyed by transaction ID)
+     * to advance both retry contexts by the same number of steps. On the third attempt, captures
+     * a snapshot of all retry context entries across all nodes.
+     *
+     * <p>After both transactions commit, asserts that:
+     * <ul>
+     *     <li>exactly two distinct entries are present in the captured snapshot — one per transaction,
+     *         confirming that each transaction is tracked under its own key;</li>
+     *     <li>both entries carry the same timeout value, confirming that independently progressed
+     *         retry contexts reach identical timeouts when starting from the same initial value
+     *         and advancing the same number of steps.</li>
+     * </ul>
+     *
+     * <p>Finally, asserts that both retry context entries are cleaned up after successful commit.
+     */
+    @Test
+    public void testRetryDurableFinishForDifferentZones() {
+        IgniteImpl node = anyNode();
+
+        String zone1Sql = "create zone test_zone_1 (partitions 1, replicas " + REPLICAS
+                + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']";
+        String table1Sql = "create table test_table_1 (key bigint primary key, val varchar(20)) zone TEST_ZONE_1";
+
+        sql(zone1Sql);
+        sql(table1Sql);
+
+        String zone2Sql = "create zone test_zone_2 (partitions 1, replicas " + REPLICAS
+                + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']";
+        String table2Sql = "create table test_table_2 (key bigint primary key, val varchar(20)) zone TEST_ZONE_2";
+
+        sql(zone2Sql);
+        sql(table2Sql);
+
+        Map<String, AtomicInteger> failedFinishAttempts = new ConcurrentHashMap<>();
+        Map<String, TimeoutState> timeoutSamples = new ConcurrentHashMap<>();
+
+        List<KeyBasedRetryContext> retryContexts = new ArrayList<>();
+
+        for (IgniteImpl n : runningNodesIter()) {
+            TxManagerImpl txManager = (TxManagerImpl) n.txManager();
+            retryContexts.add((KeyBasedRetryContext) txManager.retryContext());
+
+            n.dropMessages((dest, msg) -> {
+                if (msg instanceof TxFinishReplicaRequest) {
+                    String txId = ((TxFinishReplicaRequest) msg).txId().toString();
+
+                    // Drops the first two finish attempts of each transaction.
+                    if (failedFinishAttempts.computeIfAbsent(txId, key -> new AtomicInteger(0)).getAndIncrement() < 2) {
+                        return true;
+                    }
+
+                    timeoutSamples.putAll(
+                            retryContexts.stream()
+                                    .map(KeyBasedRetryContext::snapshot)
+                                    .filter(snapshot -> !snapshot.isEmpty())
+                                    .flatMap(snapshot -> snapshot.entrySet().stream())
+                                    .collect(toMap(Entry::getKey, Entry::getValue, (existing, replacement) -> replacement))
+                    );
+                }
+
+                return false;
+            });
+        }
+
+        Transaction tx1 = node.transactions().begin();
+        node.sql().execute(tx1, "insert into test_table_1 (key, val) values (1, 'val-1')");
+
+        tx1.commitAsync();
+
+        Transaction tx2 = node.transactions().begin();
+        node.sql().execute(tx2, "insert into TEST_TABLE_2 (key, val) values (1, 'val-1')");
+
+        tx2.commitAsync();
+
+        await().timeout(5, SECONDS).until(() -> committedTransactions(node) == 2);
+
+        assertEquals(2, timeoutSamples.size(),
+                "Expected timeout state for both transactions, but got: " + timeoutSamples.keySet());
+
+        List<Integer> collectedTimeouts = timeoutSamples.values().stream()
+                .map(TimeoutState::getTimeout)
+                .distinct()
+                .collect(toList());
+
+        assertEquals(1, collectedTimeouts.size(),
+                "Expected both transactions to have the same timeout value, but got: " + collectedTimeouts);
+
+        await().timeout(5, SECONDS).until(() -> retryContexts.stream()
+                .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty));
+    }
+
+    /**
+     * Returns the total number of committed transactions on the given node by reading
+     * the {@value #TOTAL_COMMITTED_TRANSACTIONS_METRIC_NAME} metric from
+     * {@link TransactionMetricsSource}.
+     *
+     * <p>Fails the test immediately if the metric source or the metric is not found, as this indicates
+     * a misconfiguration or an unexpected change in metric naming.
+     *
+     * @param node the node to read the metric from.
+     * @return total number of committed transactions.
+     */
+    private static long committedTransactions(IgniteImpl node) {
+        Iterable<Metric> metrics = node.metricManager()
+                .metricSnapshot()
+                .metrics()
+                .get(TransactionMetricsSource.SOURCE_NAME);
+
+        // The source may be absent from the snapshot: report it as a test failure rather than an NPE.
+        if (metrics != null) {
+            for (Metric m : metrics) {
+                if (TOTAL_COMMITTED_TRANSACTIONS_METRIC_NAME.equals(m.name())) {
+                    return ((LongMetric) m).value();
+                }
+            }
+        }
+
+        return fail("Metric '" + TOTAL_COMMITTED_TRANSACTIONS_METRIC_NAME + "' not found in metric source '"
+                + TransactionMetricsSource.SOURCE_NAME + "'");
+    }
+
+    /**
+     * Waits for the given {@link Future} to complete and returns its result.
+     *
+     * <p>Wraps checked exceptions as {@link AssertionError} so they propagate cleanly
+     * through {@link java.util.function.Consumer} lambdas in test code without requiring
+     * explicit try-catch blocks.
+     *
+     * <ul>
+     *     <li>{@link ExecutionException} — wraps the cause as an {@link AssertionError},
+     *         preserving the original exception for diagnosis.</li>
+     *     <li>{@link InterruptedException} — restores the interrupt flag and wraps
+     *         as an {@link AssertionError}.</li>
+     * </ul>
+     *
+     * @param <T> the future's result type.
+     * @param future the future to wait for.
+     * @return the future's result.
+     * @throws AssertionError if the future completed exceptionally or the thread was interrupted.
+     */
+    private static <T> T getQuietly(Future<T> future) {
+        try {
+            return future.get();
+        } catch (ExecutionException e) {
+            throw new AssertionError("Future completed exceptionally", e.getCause());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new AssertionError("Interrupted while waiting for future", e);
+        }
+    }
+}
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxCleanupFailureTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxCleanupFailureTest.java
index 723e57df6d5..a153c05d332 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxCleanupFailureTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxCleanupFailureTest.java
@@ -17,31 +17,76 @@
package org.apache.ignite.tx.distributed;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static
org.apache.ignite.internal.tx.metrics.TransactionMetricsSource.METRIC_PENDING_WRITE_INTENTS;
import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
-import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.metrics.LongMetric;
import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
+import org.apache.ignite.internal.util.retry.KeyBasedRetryContext;
+import org.apache.ignite.internal.util.retry.TimeoutState;
import org.apache.ignite.tx.Transaction;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
- * Tests for transaction cleanup failure.
+ * Integration tests verifying the retry behavior of transaction write intent
cleanup
+ * ({@link WriteIntentSwitchReplicaRequest}) in a 3-node Apache Ignite cluster.
+ *
+ * <p>Each test drops or intercepts {@link WriteIntentSwitchReplicaRequest}
messages on all nodes
+ * to simulate transient network failures during the cleanup phase that
follows a committed
+ * transaction. Tests verify that:
+ * <ul>
+ * <li>cleanup is retried after a transient failure and eventually
succeeds;</li>
+ * <li>retry timeouts grow monotonically between consecutive attempts;</li>
+ * <li>the retry context is cleaned up after successful cleanup;</li>
+ * <li>no unnecessary retries occur when the first cleanup attempt
succeeds.</li>
+ * </ul>
+ *
+ * <p>Each test creates a single-partition, 3-replica zone and table in {@link
#setup()}.
+ * Tests that require additional zones or tables create them locally.
*/
public class ItTxCleanupFailureTest extends ClusterPerTestIntegrationTest {
- /** Table name. */
+ /**
+ * Thread name fragment identifying the thread that sends retry attempts.
+ * Used to distinguish retried messages from the original cleanup attempt
in message interceptors.
+ */
+ private static final String CLEANUP_THREAD_NAME = "tx-async-write-intent";
+
+ /** Name of the default test table created in {@link #setup()}. */
private static final String TABLE_NAME = "test_table";
+
+ /** Number of replicas for all test zones. */
private static final int REPLICAS = 3;
+ /**
+ * Creates a single-partition, 3-replica zone and a test table before each
test.
+ * Tests that require additional tables or zones create them locally.
+ */
@BeforeEach
public void setup() {
String zoneSql = "create zone test_zone (partitions 1, replicas " +
REPLICAS
@@ -52,6 +97,50 @@ public class ItTxCleanupFailureTest extends
ClusterPerTestIntegrationTest {
sql(tableSql);
}
+    /**
+     * Verifies that no retry occurs when the write intent cleanup succeeds on the first attempt.
+     *
+     * <p>Installs a message interceptor that counts {@link WriteIntentSwitchReplicaRequest}
+     * messages without dropping any of them. After all write intents are resolved, asserts
+     * that exactly one cleanup message was sent across all nodes, and that the count does not
+     * grow during a short observation window (which would indicate a retry).
+     */
+    @Test
+    public void testNoRetryOnSuccessfulCleanup() {
+        IgniteImpl node = anyNode();
+        Transaction tx = node.transactions().begin();
+        node.sql().execute(tx, "insert into " + TABLE_NAME + " (key, val) values (1, 'val-1')");
+
+        AtomicInteger cleanupAttempts = new AtomicInteger();
+
+        for (IgniteImpl n : runningNodesIter()) {
+            n.dropMessages((dest, msg) -> {
+                if (msg instanceof WriteIntentSwitchReplicaRequest) {
+                    cleanupAttempts.incrementAndGet();
+                }
+
+                return false;
+            });
+        }
+
+        tx.commitAsync();
+
+        await().timeout(5, SECONDS)
+                .until(() -> pendingWriteIntents(node) == 0);
+
+        // The counter must stay at 1 for a while: checking it right after it first reaches 1
+        // would not detect a retry sent shortly after the first attempt.
+        await().during(1, SECONDS).atMost(5, SECONDS).until(() -> cleanupAttempts.get() == 1);
+
+        assertEquals(1, cleanupAttempts.get());
+    }
+
+ /**
+ * Verifies that the write intent cleanup is retried after a single
failure.
+ *
+ * <p>Drops the first {@link WriteIntentSwitchReplicaRequest} on all nodes
to simulate
+ * a transient failure. On the subsequent retry — arriving on the write
intent switch
+ * executor thread — captures a snapshot of the retry context across all
nodes and records
+ * how many nodes have an active entry for this transaction. After cleanup
completes,
+ * asserts that exactly one node had an active retry entry during the
retry, and that
+ * the retry context is fully cleaned up.
+ */
@Test
public void testRetry() {
IgniteImpl node = anyNode();
@@ -60,11 +149,26 @@ public class ItTxCleanupFailureTest extends
ClusterPerTestIntegrationTest {
AtomicInteger failedCleanupAttempts = new AtomicInteger();
+ List<KeyBasedRetryContext> retryContexts = new ArrayList<>();
+
+ AtomicLong expectedSizeOfRetryContext = new AtomicLong(0);
+
for (IgniteImpl n : runningNodesIter()) {
+ retryContexts.add((KeyBasedRetryContext) ((TxManagerImpl)
n.txManager()).retryContext());
+
n.dropMessages((dest, msg) -> {
- if (msg instanceof WriteIntentSwitchReplicaRequest &&
failedCleanupAttempts.get() == 0) {
- // Makes cleanup fail on write intent switch attempt with
replication timeout, on the first attempt.
- return failedCleanupAttempts.incrementAndGet() == 1;
+ if (msg instanceof WriteIntentSwitchReplicaRequest) {
+ if (failedCleanupAttempts.get() == 0) {
+ // Makes cleanup fail on write intent switch attempt
with replication timeout, on the first attempt.
+ return failedCleanupAttempts.incrementAndGet() == 1;
+ }
+
+ if
(Thread.currentThread().getName().contains(CLEANUP_THREAD_NAME)) {
+ expectedSizeOfRetryContext.compareAndSet(0,
retryContexts.stream()
+ .map(KeyBasedRetryContext::snapshot)
+ .filter(retryContextSnapshot ->
retryContextSnapshot.size() == 1)
+ .count());
+ }
}
return false;
@@ -73,12 +177,266 @@ public class ItTxCleanupFailureTest extends
ClusterPerTestIntegrationTest {
tx.commitAsync();
- await().timeout(5, TimeUnit.SECONDS).until(() ->
failedCleanupAttempts.get() == 1);
+ await().timeout(5, SECONDS).until(() -> failedCleanupAttempts.get() ==
1);
+
+ await().timeout(5, SECONDS).until(() -> pendingWriteIntents(node) ==
0);
- // Checks that cleanup finally succeeded.
- await().timeout(5, TimeUnit.SECONDS).until(() ->
pendingWriteIntents(node) == 0);
+ assertEquals(1, expectedSizeOfRetryContext.get());
+
+ await().timeout(5, SECONDS).until(() -> retryContexts.stream()
+ .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty));
}
+ /**
+ * Verifies that retry timeouts grow monotonically between consecutive
retry attempts.
+ *
+ * <p>Drops the first 3 {@link WriteIntentSwitchReplicaRequest} messages
arriving on the
+ * write intent switch executor thread (identified by the executor's
thread name prefix).
+ * For each dropped attempt, captures the current timeout from the retry
context snapshot.
+ * After all drops and the final successful cleanup, asserts that the
captured timeout
+ * sequence is strictly increasing, confirming exponential backoff
behavior.
+ *
+ * <p>Sampling is restricted to the write intent switch executor thread to
ensure timeouts
+ * are captured after the retry context has been advanced for the current
attempt, rather
+ * than observing a stale pre-drop state.
+ */
+ @Test
+ public void testRetryWithGrowingTimeout() {
+ IgniteImpl node = anyNode();
+ Transaction tx = node.transactions().begin();
+ node.sql().execute(tx, "insert into " + TABLE_NAME + " (key, val)
values (1, 'val-1')");
+
+ AtomicInteger failedCleanupAttempts = new AtomicInteger();
+
+ List<KeyBasedRetryContext> retryContexts = new ArrayList<>();
+
+ List<Integer> timeoutSamples = new CopyOnWriteArrayList<>();
+
+ for (IgniteImpl n : runningNodesIter()) {
+ TxManagerImpl txManager = (TxManagerImpl) n.txManager();
+ retryContexts.add((KeyBasedRetryContext) txManager.retryContext());
+
+ n.dropMessages((dest, msg) -> {
+ if (msg instanceof WriteIntentSwitchReplicaRequest) {
+ if
(Thread.currentThread().getName().contains(CLEANUP_THREAD_NAME)) {
+ if (failedCleanupAttempts.getAndIncrement() < 3) {
+ retryContexts.stream()
+ .map(KeyBasedRetryContext::snapshot)
+ .filter(retryContextSnapshot ->
retryContextSnapshot.size() == 1)
+ .flatMap(retryContextSnapshot ->
retryContextSnapshot.values().stream())
+ .forEach(timeoutState ->
timeoutSamples.add(timeoutState.getTimeout()));
+
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ return false;
+ });
+ }
+
+ tx.commitAsync();
+
+ await().timeout(5, SECONDS).until(() -> failedCleanupAttempts.get() ==
3);
+
+ await().timeout(5, SECONDS).until(() -> pendingWriteIntents(node) ==
0);
+
+ assertTrue(timeoutSamples.size() > 1, "Expected at least 2 timeout
samples, got: " + timeoutSamples.size());
+
+ for (int i = 1; i < timeoutSamples.size(); i++) {
+ assertTrue(timeoutSamples.get(i - 1) < timeoutSamples.get(i),
"timeout increasing is expected!");
+ }
+
+ await().timeout(5, SECONDS).until(() -> retryContexts.stream()
+ .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty));
+ }
+
+    /**
+     * Verifies that 100 concurrent transactions all have their write intents cleaned up after transient failures.
+     *
+     * <p>Creates a zone with 25 partitions to distribute transaction load. For each transaction, drops the first
+     * {@link WriteIntentSwitchReplicaRequest} on the write intent switch executor thread, keyed by transaction ID. This
+     * forces a retry for every transaction. After all 100 tasks have completed and the submitting thread pool has
+     * terminated, waits for all pending write intents to reach zero and for all per-node retry context entries to be
+     * cleaned up.
+     *
+     * @throws Exception if the thread pool is interrupted during shutdown.
+     */
+    @Test
+    public void testRetryManyConcurrentCleanUps() throws Exception {
+        IgniteImpl node = anyNode();
+
+        String zone1Sql = "create zone test_zone_1 (partitions 25, replicas " + REPLICAS
+                + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']";
+        String table1Sql = "create table test_table_1 (key bigint primary key, val varchar(20)) zone TEST_ZONE_1";
+
+        sql(zone1Sql);
+        sql(table1Sql);
+
+        // Transaction ID -> number of cleanup attempts seen on the write intent switch executor thread.
+        Map<String, AtomicInteger> failedCleanupAttempts = new ConcurrentHashMap<>();
+
+        List<KeyBasedRetryContext> retryContexts = new ArrayList<>();
+
+        for (IgniteImpl n : runningNodesIter()) {
+            TxManagerImpl txManager = (TxManagerImpl) n.txManager();
+            retryContexts.add((KeyBasedRetryContext) txManager.retryContext());
+
+            n.dropMessages((dest, msg) -> {
+                if (!(msg instanceof WriteIntentSwitchReplicaRequest)
+                        || !Thread.currentThread().getName().contains(CLEANUP_THREAD_NAME)) {
+                    return false;
+                }
+
+                String txId = ((WriteIntentSwitchReplicaRequest) msg).txId().toString();
+
+                // Drop only the very first attempt of every transaction.
+                return failedCleanupAttempts.computeIfAbsent(txId, key -> new AtomicInteger(0)).getAndIncrement() == 0;
+            });
+        }
+
+        ExecutorService threadPool = Executors.newFixedThreadPool(10);
+
+        try {
+            List<? extends Future<?>> futures = IntStream.range(0, 100)
+                    .mapToObj(i -> threadPool.submit(() -> {
+                        Transaction tx = node.transactions().begin();
+                        node.sql().execute(tx, "insert into test_table_1 (key, val) values (?, ?)", i, "val-" + i);
+
+                        tx.commitAsync();
+                    }))
+                    .collect(toList());
+
+            futures.forEach(ItTxCleanupFailureTest::getQuietly);
+        } finally {
+            threadPool.shutdown();
+        }
+
+        // Fail explicitly instead of silently ignoring a pool that did not terminate in time.
+        assertTrue(threadPool.awaitTermination(5, SECONDS), "Transaction pool did not terminate in time");
+
+        await().timeout(5, SECONDS).until(() -> pendingWriteIntents(node) == 0);
+
+        await().timeout(5, SECONDS).until(() -> retryContexts.stream()
+                .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty));
+    }
+
+    /**
+     * Verifies that cleanup retries for transactions targeting different replication groups are tracked independently in
+     * the retry context, and that independently progressed retry states converge to the same timeout value.
+     *
+     * <p>Creates two separate single-partition zones with dedicated tables. For each transaction, drops the first two
+     * {@link WriteIntentSwitchReplicaRequest} messages arriving on the cleanup thread (counted per transaction ID). This
+     * forces both retry states to advance by the same number of steps.
+     *
+     * <p>On the third and any later attempt, captures a snapshot of all retry context entries across all nodes and merges
+     * them by retry key. The cleanup sender keys its retry state by the commit partition, falling back to the
+     * transaction ID when there is no commit partition. Each transaction commits in its own zone, so each gets its own
+     * key. If several nodes hold an entry for the same key, the last one seen wins.
+     *
+     * <p>After both transactions' write intents are resolved, asserts that:
+     * <ul>
+     *     <li>exactly two distinct retry keys are present in the merged snapshot, one per transaction, confirming that
+     *     each transaction is tracked under its own key;</li>
+     *     <li>both entries carry the same timeout value, confirming that independently progressed retry states reach
+     *     identical timeouts when starting from the same initial value and advancing the same number of steps.</li>
+     * </ul>
+     *
+     * <p>Finally, asserts that all per-node retry context entries are cleaned up after successful cleanup.
+     */
+    @Test
+    public void testRetryCleanUpsForDifferentZones() {
+        IgniteImpl node = anyNode();
+
+        String zone1Sql = "create zone test_zone_1 (partitions 1, replicas " + REPLICAS
+                + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']";
+        String table1Sql = "create table test_table_1 (key bigint primary key, val varchar(20)) zone TEST_ZONE_1";
+
+        sql(zone1Sql);
+        sql(table1Sql);
+
+        String zone2Sql = "create zone test_zone_2 (partitions 1, replicas " + REPLICAS
+                + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']";
+        String table2Sql = "create table test_table_2 (key bigint primary key, val varchar(20)) zone TEST_ZONE_2";
+
+        sql(zone2Sql);
+        sql(table2Sql);
+
+        // Transaction ID -> number of cleanup attempts seen on the cleanup thread.
+        Map<String, AtomicInteger> failedCleanupAttempts = new ConcurrentHashMap<>();
+        // Retry key -> retry state captured from the retry contexts of all nodes.
+        Map<String, TimeoutState> timeoutSamples = new ConcurrentHashMap<>();
+
+        List<KeyBasedRetryContext> retryContexts = new ArrayList<>();
+
+        for (IgniteImpl n : runningNodesIter()) {
+            TxManagerImpl txManager = (TxManagerImpl) n.txManager();
+            retryContexts.add((KeyBasedRetryContext) txManager.retryContext());
+
+            n.dropMessages((dest, msg) -> {
+                if (!(msg instanceof WriteIntentSwitchReplicaRequest)
+                        || !Thread.currentThread().getName().contains(CLEANUP_THREAD_NAME)) {
+                    return false;
+                }
+
+                String txId = ((WriteIntentSwitchReplicaRequest) msg).txId().toString();
+
+                if (failedCleanupAttempts.computeIfAbsent(txId, key -> new AtomicInteger(0)).getAndIncrement() < 2) {
+                    return true;
+                }
+
+                timeoutSamples.putAll(
+                        retryContexts.stream()
+                                .map(KeyBasedRetryContext::snapshot)
+                                .flatMap(snapshot -> snapshot.entrySet().stream())
+                                .collect(toMap(Entry::getKey, Entry::getValue, (existing, replacement) -> replacement))
+                );
+
+                return false;
+            });
+        }
+
+        Transaction tx1 = node.transactions().begin();
+        node.sql().execute(tx1, "insert into test_table_1 (key, val) values (1, 'val-1')");
+
+        tx1.commitAsync();
+
+        Transaction tx2 = node.transactions().begin();
+        node.sql().execute(tx2, "insert into TEST_TABLE_2 (key, val) values (1, 'val-1')");
+
+        tx2.commitAsync();
+
+        await().timeout(5, SECONDS).until(() -> pendingWriteIntents(node) == 0);
+
+        assertEquals(2, timeoutSamples.size(),
+                "Expected timeout state for both transactions, but got: " + timeoutSamples.keySet());
+
+        List<Integer> collectedTimeouts = timeoutSamples.values().stream()
+                .map(TimeoutState::getTimeout)
+                .distinct()
+                .collect(toList());
+
+        assertEquals(1, collectedTimeouts.size(),
+                "Expected both transactions to have the same timeout value, but got: " + collectedTimeouts);
+
+        await().timeout(5, SECONDS).until(() -> retryContexts.stream()
+                .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty));
+    }
+
+ /**
+ * Returns the number of pending write intents on the given node by
reading the
+ * {@link TransactionMetricsSource#METRIC_PENDING_WRITE_INTENTS} metric.
+ *
+ * <p>Fails the test immediately if the metric is not found, as this
indicates
+ * a misconfiguration or an unexpected change in metric naming.
+ *
+ * @param node the node to read the metric from.
+ * @return number of pending write intents.
+ */
private static long pendingWriteIntents(IgniteImpl node) {
Iterable<Metric> metrics = node.metricManager()
.metricSnapshot()
@@ -95,4 +453,34 @@ public class ItTxCleanupFailureTest extends
ClusterPerTestIntegrationTest {
return -1;
}
+
+    /**
+     * Blocks until the given {@link Future} completes and returns its result.
+     *
+     * <p>Checked exceptions are converted into {@link AssertionError}s. This lets the method be used as a method
+     * reference inside {@link java.util.function.Consumer}-style lambdas without explicit try-catch blocks:
+     * <ul>
+     *     <li>{@link InterruptedException}: the interrupt flag is restored before the exception is wrapped;</li>
+     *     <li>{@link ExecutionException}: the original cause is wrapped, so it stays available for diagnosis.</li>
+     * </ul>
+     *
+     * @param <T> Result type of the future.
+     * @param future Future to wait for.
+     * @return Result of the future.
+     * @throws AssertionError If the future completed exceptionally or the waiting thread was interrupted.
+     */
+    private static <T> T getQuietly(Future<T> future) {
+        T result;
+
+        try {
+            result = future.get();
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+
+            throw new AssertionError("Interrupted while waiting for future", interrupted);
+        } catch (ExecutionException failure) {
+            throw new AssertionError("Future completed exceptionally", failure.getCause());
+        }
+
+        return result;
+    }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index 953d0baf249..1c46039a245 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.tx.impl;
-import static java.lang.Math.min;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
@@ -25,7 +24,7 @@ import static java.util.stream.Collectors.toMap;
import static org.apache.ignite.internal.logger.Loggers.toThrottledLogger;
import static org.apache.ignite.internal.tx.TxStateMeta.builder;
import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
-import static org.apache.ignite.internal.util.IgniteUtils.scheduleRetry;
+import static org.apache.ignite.internal.util.retry.RetryUtil.scheduleRetry;
import java.util.ArrayList;
import java.util.Collection;
@@ -57,6 +56,7 @@ import
org.apache.ignite.internal.tx.message.TxCleanupMessageErrorResponse;
import org.apache.ignite.internal.tx.message.TxCleanupMessageResponse;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.util.CompletableFutures;
+import org.apache.ignite.internal.util.retry.RetryContext;
import org.jetbrains.annotations.Nullable;
/**
@@ -65,9 +65,6 @@ import org.jetbrains.annotations.Nullable;
public class TxCleanupRequestSender {
private static final int ATTEMPTS_LOG_THRESHOLD = 100;
- private static final int RETRY_INITIAL_TIMEOUT_MS = 20;
- private static final int RETRY_MAX_TIMEOUT_MS = 30_000;
-
private static final String UNSUCCESSFUL_TXN_CLEANUP_LOG_KEY =
"Unsuccessful transaction cleanup after N attempts";
private final IgniteThrottledLogger throttledLog;
@@ -91,6 +88,9 @@ public class TxCleanupRequestSender {
/** Executor that is used to schedule retries of cleanup messages in case
of retryable errors. */
private final ScheduledExecutorService retryExecutor;
+ /** The retry context for handling retries of cleanup messages. */
+ private final RetryContext retryContext;
+
/**
* The constructor.
*
@@ -100,6 +100,7 @@ public class TxCleanupRequestSender {
* @param cleanupExecutor Cleanup executor.
* @param commonScheduler Common scheduler.
* @param clusterNodeResolver Cluster node resolver.
+ * @param retryContext retry context.
*/
public TxCleanupRequestSender(
TxMessageSender txMessageSender,
@@ -107,7 +108,8 @@ public class TxCleanupRequestSender {
VolatileTxStateMetaStorage txStateVolatileStorage,
ExecutorService cleanupExecutor,
ScheduledExecutorService commonScheduler,
- ClusterNodeResolver clusterNodeResolver
+ ClusterNodeResolver clusterNodeResolver,
+ RetryContext retryContext
) {
this.txMessageSender = txMessageSender;
this.placementDriverHelper = placementDriverHelper;
@@ -116,6 +118,7 @@ public class TxCleanupRequestSender {
this.retryExecutor = commonScheduler;
this.clusterNodeResolver = clusterNodeResolver;
this.throttledLog =
toThrottledLogger(Loggers.forClass(TxCleanupRequestSender.class),
commonScheduler);
+ this.retryContext = retryContext;
}
/**
@@ -185,7 +188,7 @@ public class TxCleanupRequestSender {
boolean commit,
@Nullable HybridTimestamp commitTimestamp
) {
- return sendCleanupMessageWithRetries(commitPartitionId, commit,
commitTimestamp, txId, node, null, RETRY_INITIAL_TIMEOUT_MS, 0);
+ return sendCleanupMessageWithRetries(commitPartitionId, commit,
commitTimestamp, txId, node, null);
}
/**
@@ -227,7 +230,7 @@ public class TxCleanupRequestSender {
enlistedPartitionGroups.add(new
EnlistedPartitionGroup(partitionId, partition.tableIds()));
});
- return cleanupPartitions(commitPartitionId, partitionsByPrimaryName,
commit, commitTimestamp, txId, RETRY_INITIAL_TIMEOUT_MS, 0);
+ return cleanupPartitions(commitPartitionId, partitionsByPrimaryName,
commit, commitTimestamp, txId);
}
/**
@@ -246,18 +249,6 @@ public class TxCleanupRequestSender {
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId
- ) {
- return cleanup(commitPartitionId, partitions, commit, commitTimestamp,
txId, RETRY_INITIAL_TIMEOUT_MS, 0);
- }
-
- private CompletableFuture<Void> cleanup(
- @Nullable ZonePartitionId commitPartitionId,
- Collection<EnlistedPartitionGroup> partitions,
- boolean commit,
- @Nullable HybridTimestamp commitTimestamp,
- UUID txId,
- long timeout,
- int attemptsMade
) {
Map<ZonePartitionId, EnlistedPartitionGroup> partitionIds =
partitions.stream()
.collect(toMap(EnlistedPartitionGroup::groupId, identity()));
@@ -282,9 +273,7 @@ public class TxCleanupRequestSender {
commit,
commitTimestamp,
txId,
-
toPartitionInfos(partitionData.partitionsWithoutPrimary, partitionIds),
- timeout,
- attemptsMade
+
toPartitionInfos(partitionData.partitionsWithoutPrimary, partitionIds)
);
Map<String, List<EnlistedPartitionGroup>>
partitionsByPrimaryName = toPartitionInfosByPrimaryName(
@@ -296,9 +285,7 @@ public class TxCleanupRequestSender {
partitionsByPrimaryName,
commit,
commitTimestamp,
- txId,
- timeout,
- attemptsMade
+ txId
);
});
}
@@ -325,9 +312,7 @@ public class TxCleanupRequestSender {
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId,
- List<EnlistedPartitionGroup> partitionsWithoutPrimary,
- long timeout,
- int attemptsMade
+ List<EnlistedPartitionGroup> partitionsWithoutPrimary
) {
Map<ZonePartitionId, EnlistedPartitionGroup> partitionIds =
partitionsWithoutPrimary.stream()
.collect(toMap(EnlistedPartitionGroup::groupId, identity()));
@@ -345,9 +330,7 @@ public class TxCleanupRequestSender {
partitionsByPrimaryName,
commit,
commitTimestamp,
- txId,
- timeout,
- attemptsMade
+ txId
);
});
}
@@ -357,9 +340,7 @@ public class TxCleanupRequestSender {
Map<String, List<EnlistedPartitionGroup>> partitionsByNode,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
- UUID txId,
- long timeout,
- int attemptsMade
+ UUID txId
) {
List<CompletableFuture<Void>> cleanupFutures = new ArrayList<>();
@@ -368,7 +349,7 @@ public class TxCleanupRequestSender {
List<EnlistedPartitionGroup> nodePartitions = entry.getValue();
cleanupFutures.add(sendCleanupMessageWithRetries(commitPartitionId, commit,
commitTimestamp, txId, node,
- commitPartitionId == null ? null : nodePartitions,
timeout, attemptsMade));
+ commitPartitionId == null ? null : nodePartitions));
}
return allOf(cleanupFutures.toArray(new CompletableFuture<?>[0]));
@@ -380,9 +361,7 @@ public class TxCleanupRequestSender {
@Nullable HybridTimestamp commitTimestamp,
UUID txId,
String node,
- @Nullable Collection<EnlistedPartitionGroup> partitions,
- long timeout,
- int attemptsMade
+ @Nullable Collection<EnlistedPartitionGroup> partitions
) {
return txMessageSender.cleanup(node, partitions, txId, commit,
commitTimestamp)
.thenApply(response -> {
@@ -396,16 +375,21 @@ public class TxCleanupRequestSender {
})
.handleAsync((networkMessage, throwable) -> {
if (throwable != null) {
+ String timeoutKey = commitPartitionId == null ?
txId.toString() : commitPartitionId.toString();
+
if
(ReplicatorRecoverableExceptions.isRecoverable(throwable)) {
- if (attemptsMade > ATTEMPTS_LOG_THRESHOLD) {
- throttledLog.warn(
- UNSUCCESSFUL_TXN_CLEANUP_LOG_KEY,
- "Unsuccessful transaction cleanup
after {} attempts, keep retrying [txId={}]",
- throwable,
- ATTEMPTS_LOG_THRESHOLD,
- txId
- );
- }
+
retryContext.getState(timeoutKey).ifPresent(timeoutState -> {
+ if (timeoutState.getAttempt() >
ATTEMPTS_LOG_THRESHOLD || timeoutState.getAttempt() < 0) {
+ throttledLog.warn(
+ UNSUCCESSFUL_TXN_CLEANUP_LOG_KEY,
+ "Unsuccessful transaction cleanup
after {} attempts for key {}, keep retrying [txId={}]",
+ throwable,
+ ATTEMPTS_LOG_THRESHOLD,
+ timeoutKey,
+ txId
+ );
+ }
+ });
// In the case of a failure we repeat the process,
but start with finding correct primary replicas
// for this subset of partitions. If nothing
changed in terms of the nodes and primaries
@@ -429,13 +413,12 @@ public class TxCleanupRequestSender {
commitTimestamp,
txId,
node,
- partitions,
- incrementTimeout(timeout),
- attemptsMade + 1
+ partitions
),
- timeout,
+
retryContext.updateAndGetState(timeoutKey).getTimeout(),
TimeUnit.MILLISECONDS,
- retryExecutor
+ retryExecutor,
+ () ->
retryContext.resetState(timeoutKey)
);
}
@@ -447,13 +430,12 @@ public class TxCleanupRequestSender {
partitions,
commit,
commitTimestamp,
- txId,
- incrementTimeout(timeout),
- attemptsMade + 1
+ txId
),
- timeout,
+
retryContext.updateAndGetState(timeoutKey).getTimeout(),
TimeUnit.MILLISECONDS,
- retryExecutor
+ retryExecutor,
+ () -> retryContext.resetState(timeoutKey)
);
}
@@ -465,10 +447,6 @@ public class TxCleanupRequestSender {
.thenCompose(v -> v);
}
- private static long incrementTimeout(long currentTimeout) {
- return min(currentTimeout * 2, RETRY_MAX_TIMEOUT_MS);
- }
-
private static class CleanupContext {
private final ZonePartitionId commitPartitionId;
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 2c8cc25e878..22e76838c6a 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -44,6 +44,7 @@ import static
org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapRootCause;
import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.apache.ignite.internal.util.retry.RetryUtil.scheduleRetry;
import java.util.ArrayList;
import java.util.Collection;
@@ -121,6 +122,9 @@ import
org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
import org.apache.ignite.internal.tx.views.LocksViewProvider;
import org.apache.ignite.internal.tx.views.TransactionsViewProvider;
import org.apache.ignite.internal.util.CompletableFutures;
+import org.apache.ignite.internal.util.retry.KeyBasedRetryContext;
+import org.apache.ignite.internal.util.retry.RetryContext;
+import org.apache.ignite.internal.util.retry.TimeoutStrategy;
import org.apache.ignite.lang.ErrorGroups.Common;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
@@ -242,6 +246,8 @@ public class TxManagerImpl implements TxManager,
SystemViewProvider {
private final RemotelyTriggeredResourceRegistry resourcesRegistry;
+ private final RetryContext retryContext;
+
/**
* Test-only constructor.
*
@@ -260,6 +266,7 @@ public class TxManagerImpl implements TxManager,
SystemViewProvider {
* @param transactionInflights Transaction inflights.
* @param lowWatermark Low watermark.
* @param metricManager Metric manager.
+ * @param timeoutStrategy Timeout strategy.
*/
@TestOnly
public TxManagerImpl(
@@ -278,7 +285,8 @@ public class TxManagerImpl implements TxManager,
SystemViewProvider {
TransactionInflights transactionInflights,
LowWatermark lowWatermark,
ScheduledExecutorService commonScheduler,
- MetricManager metricManager
+ MetricManager metricManager,
+ TimeoutStrategy timeoutStrategy
) {
this(
clusterService.staticLocalNode(),
@@ -300,7 +308,8 @@ public class TxManagerImpl implements TxManager,
SystemViewProvider {
lowWatermark,
commonScheduler,
new FailureManager(new NoOpFailureHandler()),
- metricManager
+ metricManager,
+ timeoutStrategy
);
}
@@ -324,6 +333,7 @@ public class TxManagerImpl implements TxManager,
SystemViewProvider {
* @param transactionInflights Transaction inflights.
* @param lowWatermark Low watermark.
* @param metricManager Metric manager.
+ * @param timeoutStrategy Timeout strategy.
*/
public TxManagerImpl(
InternalClusterNode localNode,
@@ -345,7 +355,8 @@ public class TxManagerImpl implements TxManager,
SystemViewProvider {
LowWatermark lowWatermark,
ScheduledExecutorService commonScheduler,
FailureProcessor failureProcessor,
- MetricManager metricManager
+ MetricManager metricManager,
+ TimeoutStrategy timeoutStrategy
) {
this.txConfig = txConfig;
this.systemCfg = systemCfg;
@@ -404,13 +415,16 @@ public class TxManagerImpl implements TxManager,
SystemViewProvider {
transactionExpirationRegistry = new
TransactionExpirationRegistry(txStateVolatileStorage);
+ retryContext = new KeyBasedRetryContext(timeoutStrategy);
+
txCleanupRequestSender = new TxCleanupRequestSender(
txMessageSender,
placementDriverHelper,
txStateVolatileStorage,
writeIntentSwitchPool,
commonScheduler,
- topologyService
+ topologyService,
+ retryContext
);
txMetrics = new TransactionMetricsSource(clockService);
@@ -827,7 +841,7 @@ public class TxManagerImpl implements TxManager,
SystemViewProvider {
*/
private CompletableFuture<Void> durableFinish(
HybridTimestampTracker observableTimestampTracker,
- ZonePartitionId commitPartition,
+ @Nullable ZonePartitionId commitPartition,
boolean commit,
Map<ZonePartitionId, PartitionEnlistment> enlistedPartitions,
UUID txId,
@@ -871,14 +885,23 @@ public class TxManagerImpl implements TxManager,
SystemViewProvider {
if
(ReplicatorRecoverableExceptions.isRecoverable(cause)) {
LOG.debug("Failed to finish Tx. The operation will
be retried {}.", ex,
formatTxInfo(txId,
txStateVolatileStorage));
- return supplyAsync(() -> durableFinish(
- observableTimestampTracker,
- commitPartition,
- commit,
- enlistedPartitions,
- txId,
- commitTimestamp,
- txFinishFuture
+
+ String timeoutKey = commitPartition == null ?
txId.toString() : commitPartition.toString();
+
+ return supplyAsync(() -> scheduleRetry(
+ () -> durableFinish(
+ observableTimestampTracker,
+ commitPartition,
+ commit,
+ enlistedPartitions,
+ txId,
+ commitTimestamp,
+ txFinishFuture
+ ),
+
retryContext.updateAndGetState(timeoutKey).getTimeout(),
+ MILLISECONDS,
+ commonScheduler,
+ () -> retryContext.resetState(timeoutKey)
),
partitionOperationsExecutor).thenCompose(identity());
} else {
LOG.warn("Failed to finish Tx {}.", ex,
@@ -1386,4 +1409,9 @@ public class TxManagerImpl implements TxManager,
SystemViewProvider {
public void clearLocalRwTxCounter() {
localRwTxCounter.clear();
}
+
+    /**
+     * Returns the retry context created by this manager.
+     *
+     * <p>The same instance is passed to {@link TxCleanupRequestSender} for cleanup-message retries. It is also used to
+     * compute the backoff delay of durable-finish retries.
+     *
+     * @return Retry context.
+     */
+    @TestOnly
+    public RetryContext retryContext() {
+        return retryContext;
+    }
}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
index ba491c9160e..a2a794e0f7a 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
@@ -66,6 +66,8 @@ import
org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TxCleanupRequestSender;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
+import org.apache.ignite.internal.util.retry.KeyBasedRetryContext;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -147,7 +149,8 @@ public class TxCleanupTest extends IgniteAbstractTest {
mock(VolatileTxStateMetaStorage.class),
testSyncExecutorService(),
testSyncScheduledExecutorService(),
- topologyService
+ topologyService,
+ new KeyBasedRetryContext(new NoopTimeoutStrategy())
);
}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 71c8bb701e0..0d44bfb93b9 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -107,6 +107,7 @@ import
org.apache.ignite.internal.tx.impl.WaitDieDeadlockPreventionPolicy;
import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.tx.test.TestTransactionIds;
+import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy;
import org.apache.ignite.lang.ErrorGroups.Transactions;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.tx.MismatchingTransactionOutcomeException;
@@ -203,7 +204,8 @@ public class TxManagerTest extends IgniteAbstractTest {
transactionInflights,
lowWatermark,
commonScheduler,
- new TestMetricManager()
+ new TestMetricManager(),
+ new NoopTimeoutStrategy()
);
assertThat(txManager.startAsync(new ComponentContext()),
willCompleteSuccessfully());