This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4cd0fd51eec IGNITE-17838 Implement runInTransaction automatic retries 
(#6413)
4cd0fd51eec is described below

commit 4cd0fd51eec2d04c889955a83bfc3f600f16a0bc
Author: Denis Chudov <moongll...@gmail.com>
AuthorDate: Wed Aug 27 10:44:29 2025 +0300

    IGNITE-17838 Implement runInTransaction automatic retries (#6413)
---
 .../ignite/tx/IgniteTransactionDefaults.java}      |  18 +-
 .../org/apache/ignite/tx/IgniteTransactions.java   | 112 ++++---
 .../ignite/tx/RetriableTransactionException.java}  |  15 +-
 .../ignite/tx/RunInTransactionInternalImpl.java    | 360 ++++++++++++++++++++
 .../ignite/tx/RunInTransactionRetryTest.java       | 367 +++++++++++++++++++++
 .../internal/lang/ComponentStoppingException.java  |   3 +-
 .../internal/lang/NodeStoppingException.java       |   3 +-
 .../ignite/internal/util/FastTimestamps.java       |   6 +-
 .../network/UnresolvableConsistentIdException.java |   3 +-
 .../PrimaryReplicaAwaitException.java              |   3 +-
 .../PrimaryReplicaAwaitTimeoutException.java       |   3 +-
 .../exception/PrimaryReplicaMissException.java     |   4 +-
 .../replicator/exception/ReplicationException.java |   3 +-
 .../ItTxDistributedCleanupRecoveryTest.java        |   4 +-
 ...tKeyValueBinaryViewApiExplicitRunInTxnTest.java |   3 +-
 .../ignite/internal/table/TxAbstractTest.java      |  87 ++---
 .../ignite/internal/tx/InternalTxOptions.java      |   5 +
 .../apache/ignite/internal/tx/LockException.java   |   4 +-
 .../TransactionConfigurationSchema.java            |   4 +-
 .../tx/impl/PersistentTxStateVacuumizer.java       |   4 +-
 .../tx/impl/PrimaryReplicaExpiredException.java    |   3 +-
 21 files changed, 883 insertions(+), 131 deletions(-)

diff --git 
a/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java
 b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactionDefaults.java
similarity index 59%
copy from 
modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java
copy to 
modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactionDefaults.java
index cfe4e181ed7..f6abdf37349 100644
--- 
a/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactionDefaults.java
@@ -15,18 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.network;
-
-import org.apache.ignite.lang.ErrorGroups.Network;
-import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.network.ClusterNode;
+package org.apache.ignite.tx;
 
 /**
- * Thrown when consistent ID cannot be resolved to a {@link ClusterNode} 
instance (i.e. when
- * there is no node with such consistent ID in the physical topology).
+ * Utility class containing transaction default constants.
  */
-public class UnresolvableConsistentIdException extends IgniteException {
-    public UnresolvableConsistentIdException(String msg) {
-        super(Network.UNRESOLVABLE_CONSISTENT_ID_ERR, msg);
-    }
+public class IgniteTransactionDefaults {
+    /**
+     * Default transaction timeout.
+     */
+    public static final long DEFAULT_RW_TX_TIMEOUT_SECONDS = 30;
 }
diff --git 
a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java 
b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java
index 51cd9641647..0527e93e4d5 100644
--- a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java
+++ b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java
@@ -17,11 +17,13 @@
 
 package org.apache.ignite.tx;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.function.Function.identity;
+import static 
org.apache.ignite.tx.IgniteTransactionDefaults.DEFAULT_RW_TX_TIMEOUT_SECONDS;
+import static 
org.apache.ignite.tx.RunInTransactionInternalImpl.runInTransactionAsyncInternal;
+import static 
org.apache.ignite.tx.RunInTransactionInternalImpl.runInTransactionInternal;
 
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.ignite.table.Table;
@@ -115,7 +117,7 @@ public interface IgniteTransactions {
      * }
      * </pre>
      *
-     * <p>The correct variant will be:
+     * <p>The <b>correct</b> variant will be:
      * <pre>
      * {@code
      * igniteTransactions.runInTransaction(tx -> {
@@ -125,7 +127,12 @@ public interface IgniteTransactions {
      * }
      * </pre>
      *
-     * <p>If the closure is executed normally (no exceptions) the transaction 
is automatically committed.
+     * <p>If the closure is executed normally (no exceptions) the transaction is automatically committed. In case of an exception, the
+     * closure will be retried automatically within the transaction timeout, so it must be a pure function (free of side effects outside
+     * the transaction). If the transaction timeout expires before the closure completes successfully and the transaction is committed,
+     * the transaction is rolled back instead.
+     * <br>
+     * The closure is retried only for "expected" exceptions, such as {@code LockException}, {@code TimeoutException}, or
+     * exceptions related to a primary replica change.
      *
      * @param clo The closure.
      *
@@ -153,7 +160,7 @@ public interface IgniteTransactions {
      * }
      * </pre>
      *
-     * <p>The correct variant will be:
+     * <p>The <b>correct</b> variant will be:
      * <pre>
      * {@code
      * igniteTransactions.runInTransaction(tx -> {
@@ -163,7 +170,12 @@ public interface IgniteTransactions {
      * }
      * </pre>
      *
-     * <p>If the closure is executed normally (no exceptions) the transaction 
is automatically committed.
+     * <p>If the closure is executed normally (no exceptions) the transaction is automatically committed. In case of an exception, the
+     * closure will be retried automatically within the transaction timeout, so it must be a pure function (free of side effects outside
+     * the transaction). If the transaction timeout expires before the closure completes successfully and the transaction is committed,
+     * the transaction is rolled back instead.
+     * <br>
+     * The closure is retried only for "expected" exceptions, such as {@code LockException}, {@code TimeoutException}, or
+     * exceptions related to a primary replica change.
      *
      * @param options Transaction options.
      * @param clo The closure.
@@ -197,7 +209,7 @@ public interface IgniteTransactions {
      * }
      * </pre>
      *
-     * <p>The correct variant will be:
+     * <p>The <b>correct</b> variant will be:
      * <pre>
      * {@code
      * igniteTransactions.runInTransaction(tx -> {
@@ -207,7 +219,12 @@ public interface IgniteTransactions {
      * }
      * </pre>
      *
-     * <p>If the closure is executed normally (no exceptions) the transaction 
is automatically committed.
+     * <p>If the closure is executed normally (no exceptions) the transaction is automatically committed. In case of an exception, the
+     * closure will be retried automatically within the transaction timeout, so it must be a pure function (free of side effects outside
+     * the transaction). If the transaction timeout expires before the closure completes successfully and the transaction is committed,
+     * the transaction is rolled back instead.
+     * <br>
+     * The closure is retried only for "expected" exceptions, such as {@code LockException}, {@code TimeoutException}, or
+     * exceptions related to a primary replica change.
      *
      * @param clo Closure.
      * @param <T> Closure result type.
@@ -237,7 +254,7 @@ public interface IgniteTransactions {
      * }
      * </pre>
      *
-     * <p>The correct variant will be:
+     * <p>The <b>correct</b> variant will be:
      * <pre>
      * {@code
      * igniteTransactions.runInTransaction(tx -> {
@@ -247,7 +264,12 @@ public interface IgniteTransactions {
      * }
      * </pre>
      *
-     * <p>If the closure is executed normally (no exceptions) the transaction 
is automatically committed.
+     * <p>If the closure is executed normally (no exceptions) the transaction is automatically committed. In case of an exception, the
+     * closure will be retried automatically within the transaction timeout, so it must be a pure function (free of side effects outside
+     * the transaction). If the transaction timeout expires before the closure completes successfully and the transaction is committed,
+     * the transaction is rolled back instead.
+     * <br>
+     * The closure is retried only for "expected" exceptions, such as {@code LockException}, {@code TimeoutException}, or
+     * exceptions related to a primary replica change.
      *
      * @param clo The closure.
      * @param options Transaction options.
@@ -257,26 +279,11 @@ public interface IgniteTransactions {
      * @throws TransactionException If a transaction can't be finished 
successfully.
      */
     default <T> T runInTransaction(Function<Transaction, T> clo, @Nullable TransactionOptions options) throws TransactionException {
-        Objects.requireNonNull(clo);
-
-        Transaction tx = begin(options);
-
-        try {
-            T ret = clo.apply(tx);
-
-            tx.commit();
-
-            return ret;
-        } catch (Throwable t) {
-            // TODO FIXME https://issues.apache.org/jira/browse/IGNITE-17838 Implement auto retries
-            try {
-                tx.rollback(); // Try rolling back on user exception.
-            } catch (Exception e) {
-                t.addSuppressed(e);
-            }
-
-            throw t;
-        }
+        // This start timestamp is not related to transaction's begin timestamp and only serves as local time for counting the timeout of
+        // possible retries.
+        // NOTE(review): System.currentTimeMillis() is wall-clock time, so a clock adjustment shifts the retry deadline; a monotonic
+        // source would be more robust.
+        long startTimestamp = System.currentTimeMillis();
+        // Retry budget in milliseconds: the default read-write transaction timeout when no options are given, otherwise the caller's
+        // timeout. NOTE(review): if options.timeoutMillis() can be 0 (e.g. "not set"), no retries will be attempted - confirm.
+        long initialTimeout = options == null ? TimeUnit.SECONDS.toMillis(DEFAULT_RW_TX_TIMEOUT_SECONDS) : options.timeoutMillis();
+        return runInTransactionInternal(this, clo, options, startTimestamp, initialTimeout);
     }
 
     /**
@@ -293,7 +300,12 @@ public interface IgniteTransactions {
      * }
      * </pre>
      *
-     * <p>If the asynchronous chain resulted in no exception, the commitAsync 
will be automatically called.
+     * <p>If the asynchronous chain completes with no exception, {@code commitAsync} is called automatically. In case of an exception,
+     * the closure will be retried automatically within the transaction timeout, so it must be a pure function (free of side effects
+     * outside the transaction). If the transaction timeout expires before the closure completes successfully and the transaction is
+     * committed, the transaction is rolled back instead.
+     * <br>
+     * The closure is retried only for "expected" exceptions, such as {@code LockException}, {@code TimeoutException}, or
+     * exceptions related to a primary replica change.
      *
      * @param clo The closure.
      * @param <T> Closure result type.
@@ -317,7 +329,13 @@ public interface IgniteTransactions {
      * }
      * </pre>
      *
-     * <p>If the asynchronous chain resulted in no exception, the commitAsync 
will be automatically called.
+     * <p>If the asynchronous chain completes with no exception, {@code commitAsync} is called automatically. In case of an exception,
+     * the closure will be retried automatically within the transaction timeout, so it must be a pure function (free of side effects
+     * outside the transaction). If the transaction timeout expires before the closure completes successfully and the transaction is
+     * committed, the transaction is rolled back instead.
+     * <br>
+     * The closure is retried only for "expected" exceptions, such as {@code LockException}, {@code TimeoutException}, or
+     * exceptions related to a primary replica change.
      *
      * @param clo The closure.
      * @param options Transaction options.
@@ -326,28 +344,12 @@ public interface IgniteTransactions {
      */
     default <T> CompletableFuture<T> runInTransactionAsync(
             Function<Transaction, CompletableFuture<T>> clo,
-            @Nullable TransactionOptions options) {
-        Objects.requireNonNull(clo);
-
-        // TODO FIXME https://issues.apache.org/jira/browse/IGNITE-17838 Implement auto retries
-        return beginAsync(options).thenCompose(tx -> {
-            try {
-                return clo.apply(tx).handle((res, e) -> {
-                    if (e != null) {
-                        return tx.rollbackAsync().exceptionally(e0 -> {
-                            e.addSuppressed(e0);
-                            return null;
-                        }).thenCompose(ignored -> CompletableFuture.<T>failedFuture(e));
-                    }
-
-                    return completedFuture(res);
-                }).thenCompose(identity()).thenCompose(val -> tx.commitAsync().thenApply(ignored -> val));
-            } catch (Exception e) {
-                return tx.rollbackAsync().exceptionally(e0 -> {
-                    e.addSuppressed(e0);
-                    return null;
-                }).thenCompose(ignored -> CompletableFuture.failedFuture(e));
-            }
-        });
+            @Nullable TransactionOptions options
+    ) {
+        // This start timestamp is not related to transaction's begin timestamp and only serves as local time for counting the timeout of
+        // possible retries.
+        // NOTE(review): System.currentTimeMillis() is wall-clock time, so a clock adjustment shifts the retry deadline; a monotonic
+        // source would be more robust.
+        long startTimestamp = System.currentTimeMillis();
+        // Retry budget in milliseconds: the default read-write transaction timeout when no options are given, otherwise the caller's
+        // timeout. NOTE(review): if options.timeoutMillis() can be 0 (e.g. "not set"), no retries will be attempted - confirm.
+        long initialTimeout = options == null ? TimeUnit.SECONDS.toMillis(DEFAULT_RW_TX_TIMEOUT_SECONDS) : options.timeoutMillis();
+        // The last argument is the suppressed-exception accumulator; null lets the implementation create a fresh one.
+        return runInTransactionAsyncInternal(this, clo, options, startTimestamp, initialTimeout, null);
     }
 }
diff --git 
a/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java
 
b/modules/api/src/main/java/org/apache/ignite/tx/RetriableTransactionException.java
similarity index 59%
copy from 
modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java
copy to 
modules/api/src/main/java/org/apache/ignite/tx/RetriableTransactionException.java
index cfe4e181ed7..0a50878901e 100644
--- 
a/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/tx/RetriableTransactionException.java
@@ -15,18 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.network;
-
-import org.apache.ignite.lang.ErrorGroups.Network;
-import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.network.ClusterNode;
+package org.apache.ignite.tx;
 
 /**
- * Thrown when consistent ID cannot be resolved to a {@link ClusterNode} instance (i.e. when
- * there is no node with such consistent ID in the physical topology).
+ * Marker interface for exceptions after which the closure passed to {@link IgniteTransactions#runInTransaction} or
+ * {@link IgniteTransactions#runInTransactionAsync} may be retried in a new transaction.
+ *
+ * <p>An exception is treated as retriable if it, or any exception in its cause chain, implements this interface. A retry only
+ * happens while the transaction timeout has not expired.
  */
-public class UnresolvableConsistentIdException extends IgniteException {
-    public UnresolvableConsistentIdException(String msg) {
-        super(Network.UNRESOLVABLE_CONSISTENT_ID_ERR, msg);
-    }
+public interface RetriableTransactionException {
+    // Marker only: declares no methods.
 }
diff --git 
a/modules/api/src/main/java/org/apache/ignite/tx/RunInTransactionInternalImpl.java
 
b/modules/api/src/main/java/org/apache/ignite/tx/RunInTransactionInternalImpl.java
new file mode 100644
index 00000000000..c67797cc4ef
--- /dev/null
+++ 
b/modules/api/src/main/java/org/apache/ignite/tx/RunInTransactionInternalImpl.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tx;
+
+import static java.util.Collections.synchronizedList;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.function.Function.identity;
+import static 
org.apache.ignite.tx.IgniteTransactionDefaults.DEFAULT_RW_TX_TIMEOUT_SECONDS;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This is, in fact, the default implementation of the {@link 
IgniteTransactions#runInTransaction} and
+ * {@link IgniteTransactions#runInTransactionAsync}, moved from the separate 
class to avoid the interface overloading. This
+ * implementation is common for both client and embedded {@link 
IgniteTransactions}.
+ */
+class RunInTransactionInternalImpl {
+    private static final int MAX_SUPPRESSED = 100;
+
+    static <T> T runInTransactionInternal(
+            IgniteTransactions igniteTransactions,
+            Function<Transaction, T> clo,
+            @Nullable TransactionOptions options,
+            long startTimestamp,
+            long initialTimeout
+    ) throws TransactionException {
+        Objects.requireNonNull(clo);
+
+        TransactionOptions txOptions = options == null
+                ? new 
TransactionOptions().timeoutMillis(TimeUnit.SECONDS.toMillis(DEFAULT_RW_TX_TIMEOUT_SECONDS))
+                : options;
+
+        List<Throwable> suppressed = new ArrayList<>();
+
+        Transaction tx;
+        T ret;
+
+        while (true) {
+            tx = igniteTransactions.begin(txOptions);
+
+            try {
+                ret = clo.apply(tx);
+
+                break;
+            } catch (Exception ex) {
+                addSuppressedToList(suppressed, ex);
+
+                long remainingTime = calcRemainingTime(initialTimeout, 
startTimestamp);
+
+                if (remainingTime > 0 && isRetriable(ex)) {
+                    // Rollback on user exception, should be retried until 
success or timeout to ensure the lock release
+                    // before the next attempt.
+                    rollbackWithRetry(tx, ex, startTimestamp, initialTimeout, 
suppressed);
+
+                    long remaining = calcRemainingTime(initialTimeout, 
startTimestamp);
+
+                    if (remaining > 0) {
+                        // Will go on retry iteration.
+                        txOptions = txOptions.timeoutMillis(remainingTime);
+                    } else {
+                        throwExceptionWithSuppressed(ex, suppressed);
+                    }
+                } else {
+                    try {
+                        // No retries here, rely on the durable finish.
+                        tx.rollback();
+                    } catch (Exception e) {
+                        addSuppressedToList(suppressed, e);
+                    }
+
+                    throwExceptionWithSuppressed(ex, suppressed);
+                }
+            }
+        }
+
+        try {
+            tx.commit();
+        } catch (Exception e) {
+            try {
+                // Try to rollback tx in case if it's not finished. Retry is 
not needed here due to the durable finish.
+                tx.rollback();
+            } catch (Exception re) {
+                e.addSuppressed(re);
+            }
+
+            throw e;
+        }
+
+        return ret;
+    }
+
+    private static void rollbackWithRetry(
+            Transaction tx,
+            Exception closureException,
+            long startTimestamp,
+            long initialTimeout,
+            List<Throwable> suppressed
+    ) {
+        while (true) {
+            try {
+                tx.rollback();
+
+                break;
+            } catch (Exception re) {
+                addSuppressedToList(suppressed, re);
+
+                if (calcRemainingTime(initialTimeout, startTimestamp) <= 0) {
+                    throwExceptionWithSuppressed(closureException, suppressed);
+                }
+            }
+        }
+    }
+
+    static <T> CompletableFuture<T> runInTransactionAsyncInternal(
+            IgniteTransactions igniteTransactions,
+            Function<Transaction, CompletableFuture<T>> clo,
+            @Nullable TransactionOptions options,
+            long startTimestamp,
+            long initialTimeout,
+            @Nullable List<Throwable> suppressed
+    ) {
+        Objects.requireNonNull(clo);
+
+        TransactionOptions txOptions = options == null
+                ? new 
TransactionOptions().timeoutMillis(TimeUnit.SECONDS.toMillis(DEFAULT_RW_TX_TIMEOUT_SECONDS))
+                : options;
+
+        List<Throwable> sup = suppressed == null ? synchronizedList(new 
ArrayList<>()) : suppressed;
+
+        return igniteTransactions
+                .beginAsync(txOptions)
+                // User closure with retries.
+                .thenCompose(tx -> {
+                    try {
+                        return clo.apply(tx)
+                                .handle((res, e) -> {
+                                    if (e != null) {
+                                        return handleClosureException(
+                                                igniteTransactions,
+                                                tx,
+                                                clo,
+                                                txOptions,
+                                                startTimestamp,
+                                                initialTimeout,
+                                                sup,
+                                                e
+                                        );
+                                    } else {
+                                        return completedFuture(res);
+                                    }
+                                })
+                                .thenCompose(identity())
+                                .thenApply(res -> new TxWithVal<>(tx, res));
+                    } catch (Exception e) {
+                        return handleClosureException(igniteTransactions, tx, 
clo, txOptions, startTimestamp, initialTimeout, sup, e)
+                                .thenApply(res -> new TxWithVal<>(tx, res));
+                    }
+                })
+                // Transaction commit with rollback on failure, without 
retries.
+                // Transaction rollback on closure failure is implemented in 
closure retry logic.
+                .thenCompose(txWithVal ->
+                        txWithVal.tx.commitAsync()
+                                .handle((ignored, e) -> {
+                                    if (e == null) {
+                                        return completedFuture(null);
+                                    } else {
+                                        return txWithVal.tx.rollbackAsync()
+                                                // Rethrow commit exception.
+                                                .handle((ign, re) -> 
sneakyThrow(e));
+                                    }
+                                })
+                                .thenCompose(fut -> fut)
+                                .thenApply(ignored -> txWithVal.val)
+                );
+    }
+
+    private static <T> CompletableFuture<T> handleClosureException(
+            IgniteTransactions igniteTransactions,
+            Transaction currentTx,
+            Function<Transaction, CompletableFuture<T>> clo,
+            TransactionOptions txOptions,
+            long startTimestamp,
+            long initialTimeout,
+            List<Throwable> suppressed,
+            Throwable e
+    ) {
+        addSuppressedToList(suppressed, e);
+
+        long remainingTime = calcRemainingTime(initialTimeout, startTimestamp);
+
+        if (remainingTime > 0 && isRetriable(e)) {
+            // Rollback on user exception, should be retried until success or 
timeout to ensure the lock release
+            // before the next attempt.
+            return rollbackWithRetryAsync(currentTx, startTimestamp, 
initialTimeout, suppressed, e)
+                    .thenCompose(ignored -> {
+                        long remaining = calcRemainingTime(initialTimeout, 
startTimestamp);
+
+                        if (remaining > 0) {
+                            TransactionOptions opt = 
txOptions.timeoutMillis(remaining);
+
+                            return runInTransactionAsyncInternal(
+                                    igniteTransactions,
+                                    clo,
+                                    opt,
+                                    startTimestamp,
+                                    initialTimeout,
+                                    suppressed
+                            );
+                        } else {
+                            return throwExceptionWithSuppressedAsync(e, 
suppressed)
+                                    .thenApply(ign -> null);
+                        }
+                    });
+        } else {
+            // No retries here, rely on the durable finish.
+            return currentTx.rollbackAsync()
+                    .exceptionally(re -> {
+                        addSuppressedToList(suppressed, re);
+
+                        return null;
+                    })
+                    .thenCompose(ignored -> 
throwExceptionWithSuppressedAsync(e, suppressed))
+                    // Never executed.
+                    .thenApply(ignored -> null);
+        }
+    }
+
+    private static CompletableFuture<Void> rollbackWithRetryAsync(
+            Transaction tx,
+            long startTimestamp,
+            long initialTimeout,
+            List<Throwable> suppressed,
+            Throwable e
+    ) {
+        return tx.rollbackAsync()
+                .handle((ignored, re) -> {
+                    CompletableFuture<Void> fut;
+
+                    if (re == null) {
+                        fut = completedFuture(null);
+                    } else {
+                        addSuppressedToList(suppressed, re);
+
+                        if (calcRemainingTime(initialTimeout, startTimestamp) 
<= 0) {
+                            for (Throwable s : suppressed) {
+                                addSuppressed(e, s);
+                            }
+
+                            fut = failedFuture(e);
+                        } else {
+                            fut = rollbackWithRetryAsync(tx, startTimestamp, 
initialTimeout, suppressed, e);
+                        }
+                    }
+
+                    return fut;
+                })
+                .thenCompose(identity());
+    }
+
+    private static void addSuppressedToList(List<Throwable> to, Throwable a) {
+        if (to.size() < MAX_SUPPRESSED) {
+            to.add(a);
+        }
+    }
+
+    private static void addSuppressed(Throwable to, Throwable a) {
+        if (to != null && a != null && to != a && to.getSuppressed().length < 
MAX_SUPPRESSED) {
+            to.addSuppressed(a);
+        }
+    }
+
+    private static void throwExceptionWithSuppressed(Throwable e, 
List<Throwable> suppressed) {
+        for (Throwable t : suppressed) {
+            addSuppressed(e, t);
+        }
+
+        sneakyThrow(e);
+    }
+
+    private static CompletableFuture<Void> 
throwExceptionWithSuppressedAsync(Throwable e, List<Throwable> suppressed) {
+        for (Throwable t : suppressed) {
+            addSuppressed(e, t);
+        }
+
+        return failedFuture(e);
+    }
+
+    private static boolean isRetriable(Throwable e) {
+        return hasCause(e,
+                TimeoutException.class,
+                RetriableTransactionException.class
+        );
+    }
+
+    private static boolean hasCause(Throwable e, Class<?>... classes) {
+        Set<Throwable> processed = new HashSet<>();
+
+        Throwable cause = e;
+        while (cause != null) {
+            if (!processed.add(cause)) {
+                break;
+            }
+
+            for (Class<?> cls : classes) {
+                if (cls.isAssignableFrom(cause.getClass())) {
+                    return true;
+                }
+            }
+
+            cause = cause.getCause();
+        }
+
+        return false;
+    }
+
+    private static long calcRemainingTime(long initialTimeout, long 
startTimestamp) {
+        long now = System.currentTimeMillis();
+        long remainingTime = initialTimeout - (now - startTimestamp);
+        return remainingTime;
+    }
+
+    private static <E extends Throwable> E sneakyThrow(Throwable e) throws E {
+        throw (E) e;
+    }
+
+    private static class TxWithVal<T> {
+        private final Transaction tx;
+        private final T val;
+
+        private TxWithVal(Transaction tx, T val) {
+            this.tx = tx;
+            this.val = val;
+        }
+    }
+}
diff --git 
a/modules/api/src/test/java/org/apache/ignite/tx/RunInTransactionRetryTest.java 
b/modules/api/src/test/java/org/apache/ignite/tx/RunInTransactionRetryTest.java
new file mode 100644
index 00000000000..c629eba22fd
--- /dev/null
+++ 
b/modules/api/src/test/java/org/apache/ignite/tx/RunInTransactionRetryTest.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tx;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junitpioneer.jupiter.cartesian.ArgumentSets;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+
+/**
+ * Tests {@link IgniteTransactions#runInTransaction} and {@link 
IgniteTransactions#runInTransactionAsync} retries.
+ *
+ * <p>The tests rely on mocks: {@link MockIgniteTransactions} begins {@link MockTransaction}s whose commit and rollback
+ * fail while shared counters are positive, and {@link #closureCall} fails while its own counter is positive. Each
+ * scenario checks whether the call eventually returns the closure result and, if not, which exception reaches the caller.
+ */
+public class RunInTransactionRetryTest {
+    /** Timeout used by {@link #withShortTimeout()}, in milliseconds. */
+    private static final int SHORT_TIMEOUT_MILLIS = 100;
+
+    /** Value of {@link System#currentTimeMillis()} when the current test started; used by {@link #checkTimeout()}. */
+    private long testStartTime;
+
+    /** Records the test start time. */
+    @BeforeEach
+    public void setUp() {
+        testStartTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Tests the different scenarios of retries.
+     *
+     * @param async Whether {@code runInTransactionAsync} is used instead of {@code runInTransaction}.
+     * @param closureFailureCount Number of closure calls that fail before one succeeds
+     *         ({@link Integer#MAX_VALUE}: effectively never succeeds).
+     * @param commitFailureCount Number of commit attempts that fail before one succeeds
+     *         ({@link Integer#MAX_VALUE}: effectively never succeeds).
+     * @param rollbackFailureCount Number of rollback attempts that fail before one succeeds
+     *         ({@link Integer#MAX_VALUE}: effectively never succeeds).
+     * @param closureFailureType How the closure fails in async mode; in sync mode the closure always throws.
+     */
+    @CartesianTest
+    @CartesianTest.MethodFactory("testRetriesArgFactory")
+    public void testRetries(
+            boolean async,
+            int closureFailureCount,
+            int commitFailureCount,
+            int rollbackFailureCount,
+            ClosureFailureType closureFailureType
+    ) {
+        var closureFailures = new AtomicInteger(closureFailureCount);
+        var igniteTransactions = new 
MockIgniteTransactions(commitFailureCount, rollbackFailureCount);
+
+        Supplier<CompletableFuture<Integer>> testClosure;
+
+        if (async) {
+            testClosure = () -> igniteTransactions.runInTransactionAsync(
+                    tx -> closureCall(tx, closureFailures, closureFailureType),
+                    withShortTimeout()
+            );
+        } else {
+            // closureFailureType is always SYNC in sync mode (closure 
shouldn't return future).
+            testClosure = () -> igniteTransactions.runInTransaction(
+                    (Function<Transaction, CompletableFuture<Integer>>) tx ->
+                            closureCall(tx, closureFailures, 
ClosureFailureType.SYNC_FAIL),
+                    withShortTimeout()
+            );
+        }
+
+        // Whether the call is expected to eventually complete with 42 instead of failing.
+        boolean requiresEventualSuccess = closureFailureCount < 
Integer.MAX_VALUE
+                // Commit failure can't be retried.
+                && commitFailureCount == 0
+                // Rollbacks should be retried until success or timeout, so 
the rollback must succeed before closure retry.
+                && (closureFailureCount == 0 || rollbackFailureCount < 
Integer.MAX_VALUE);
+
+        // Invoke the API, recording whether it failed synchronously (thrown by the call) or through the returned future.
+        boolean syncFail = false;
+        Exception ex = null;
+
+        CompletableFuture<Integer> future = null;
+
+        try {
+            future = testClosure.get();
+        } catch (Exception e) {
+            syncFail = true;
+            ex = e;
+        }
+
+        if (!syncFail) {
+            try {
+                future.join();
+
+                // Closure succeeded, check that it's expected.
+                assertTrue(requiresEventualSuccess);
+            } catch (Exception e) {
+                ex = e;
+            }
+        }
+
+        if (requiresEventualSuccess) {
+            // NOTE(review): if the call unexpectedly failed synchronously, future is still null here, so this line
+            // fails with an NPE rather than a descriptive assertion message.
+            assertEquals(42, future.join());
+        } else {
+            if (closureFailureCount == Integer.MAX_VALUE) {
+                // Had to retry until timed out.
+                checkTimeout();
+            }
+
+            assertNotNull(ex);
+
+            if (!async) {
+                // Sync API: the failure is thrown directly by runInTransaction.
+                assertTrue(syncFail);
+
+                if (commitFailureCount > 0) {
+                    if (closureFailureCount == Integer.MAX_VALUE || 
closureFailureCount > 0 && rollbackFailureCount == Integer.MAX_VALUE) {
+                        // Closure exception should be rethrown.
+                        assertThat(ex, 
instanceOf(FailedClosureTestException.class));
+                    } else {
+                        assertThat(ex, 
instanceOf(FailedCommitTestException.class));
+                    }
+                } else {
+                    assertThat(ex, 
instanceOf(FailedClosureTestException.class));
+                }
+            } else {
+                // Async API: the failure is delivered through the future, wrapped in CompletionException.
+                assertFalse(syncFail);
+
+                assertThat(ex, instanceOf(CompletionException.class));
+                assertThat(ex.getCause(), instanceOf(Exception.class));
+                Exception cause = (Exception) ex.getCause();
+
+                assertTrue(
+                        cause instanceof FailedClosureTestException || cause 
instanceof FailedCommitTestException
+                );
+            }
+        }
+    }
+
+    /** Arguments for {@link #testRetries}: every combination of mode, failure counts and closure failure type. */
+    @SuppressWarnings("unused")
+    private static ArgumentSets testRetriesArgFactory() {
+        return ArgumentSets.argumentsForFirstParameter(true, false)
+                .argumentsForNextParameter(0, 5, 10, Integer.MAX_VALUE)
+                .argumentsForNextParameter(0, 5, 10, Integer.MAX_VALUE)
+                .argumentsForNextParameter(0, 5, 10, Integer.MAX_VALUE)
+                .argumentsForNextParameter(ClosureFailureType.SYNC_FAIL, 
ClosureFailureType.FUTURE_FAIL);
+    }
+
+    /**
+     * Checks that a failing closure is not retried when the timeout (1 ms) has already elapsed by the time it fails:
+     * the closure runs only once and its exception is thrown to the caller.
+     */
+    @Test
+    public void testNoRetryAfterTimeout() {
+        var igniteTransactions = new MockIgniteTransactions(0, 0);
+
+        AtomicBoolean runned = new AtomicBoolean();
+
+        assertThrows(
+                FailedClosureTestException.class,
+                () -> igniteTransactions.runInTransaction(
+                        (Consumer<Transaction>) tx -> {
+                            assertFalse(runned.get());
+                            runned.set(true);
+                            sleep(100);
+                            throw new FailedClosureTestException();
+                        },
+                        new TransactionOptions().timeoutMillis(1)
+                )
+        );
+    }
+
+    /**
+     * Async counterpart of {@link #testNoRetryAfterTimeout()}: the closure runs only once and its exception is
+     * delivered through the future, wrapped in {@link CompletionException}.
+     */
+    @Test
+    public void testNoRetryAfterTimeoutAsync() {
+        var igniteTransactions = new MockIgniteTransactions(0, 0);
+
+        AtomicBoolean runned = new AtomicBoolean();
+
+        CompletableFuture<Integer> future = 
igniteTransactions.runInTransactionAsync(
+                tx -> {
+                    assertFalse(runned.get());
+                    runned.set(true);
+                    sleep(100);
+                    throw new FailedClosureTestException();
+                },
+                new TransactionOptions().timeoutMillis(1)
+        );
+
+        try {
+            future.join();
+            fail();
+        } catch (Exception e) {
+            assertThat(e, instanceOf(CompletionException.class));
+            assertThat(e.getCause(), 
instanceOf(FailedClosureTestException.class));
+        }
+    }
+
+    /** Creates transaction options with the {@link #SHORT_TIMEOUT_MILLIS} timeout. */
+    private static TransactionOptions withShortTimeout() {
+        return new TransactionOptions().timeoutMillis(SHORT_TIMEOUT_MILLIS);
+    }
+
+    /** Asserts that the current test has run for most of {@link #SHORT_TIMEOUT_MILLIS}, i.e. retries lasted until the timeout. */
+    private void checkTimeout() {
+        long now = System.currentTimeMillis();
+        long duration = now - testStartTime;
+        // Assuming that at least 80% of timeout has passed (assuming 
currentTimeMillis inaccuracy).
+        assertThat("duration was: " + duration, duration, greaterThan((long) 
(SHORT_TIMEOUT_MILLIS * 0.8)));
+    }
+
+    /**
+     * Test closure body. While {@code closureFailures} is positive, decrements it and fails in the way selected by
+     * {@code closureFailureType}; otherwise returns a future completed with {@code 42}. Also asserts that it receives
+     * a non-null, unfinished transaction.
+     */
+    private static CompletableFuture<Integer> closureCall(
+            Transaction tx,
+            AtomicInteger closureFailures,
+            ClosureFailureType closureFailureType
+    ) {
+        assertNotNull(tx);
+        assertFalse(isFinished(tx));
+
+        if (closureFailures.get() > 0) {
+            closureFailures.decrementAndGet();
+
+            if (closureFailureType == ClosureFailureType.SYNC_FAIL) {
+                throw new FailedClosureTestException();
+            } else if (closureFailureType == ClosureFailureType.FUTURE_FAIL) {
+                return failedFuture(new FailedClosureTestException());
+            } else {
+                throw new AssertionError("unknown failure type");
+            }
+        } else {
+            return completedFuture(42);
+        }
+    }
+
+    /** Returns whether a commit or rollback of the given mock transaction has succeeded. */
+    private static boolean isFinished(Transaction tx) {
+        assertInstanceOf(MockTransaction.class, tx);
+        return ((MockTransaction) tx).finished;
+    }
+
+    /** Sleeps for {@code duration} milliseconds, wrapping {@link InterruptedException} into a {@link RuntimeException}. */
+    private static void sleep(long duration) {
+        try {
+            Thread.sleep(duration);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * If closure should fail, this enum defines the type of failure.
+     */
+    private enum ClosureFailureType {
+        /** Closure throws exception. */
+        SYNC_FAIL,
+
+        /** Closure returns failed future. */
+        FUTURE_FAIL
+    }
+
+    /**
+     * {@link IgniteTransactions} mock. Every transaction it begins shares its commit and rollback failure counters, so
+     * failures are counted across transactions rather than per transaction.
+     */
+    private static class MockIgniteTransactions implements IgniteTransactions {
+        /** Remaining number of commit attempts to fail. */
+        final AtomicInteger commitsToFail;
+        /** Remaining number of rollback attempts to fail. */
+        final AtomicInteger rollbacksToFail;
+
+        MockIgniteTransactions(int commitsToFail, int rollbacksToFail) {
+            this.commitsToFail = new AtomicInteger(commitsToFail);
+            this.rollbacksToFail = new AtomicInteger(rollbacksToFail);
+        }
+
+        @Override
+        public Transaction begin(@Nullable TransactionOptions options) {
+            // The counters are passed by reference, so all transactions consume the same failure budget.
+            return new MockTransaction(commitsToFail, rollbacksToFail);
+        }
+
+        @Override
+        public CompletableFuture<Transaction> beginAsync(@Nullable 
TransactionOptions options) {
+            return completedFuture(begin(options));
+        }
+    }
+
+    /** {@link Transaction} mock whose commit and rollback fail while the corresponding shared counter is positive. */
+    private static class MockTransaction implements Transaction {
+        /** Shared counter of commit attempts still to fail. */
+        final AtomicInteger commitsToFail;
+        /** Shared counter of rollback attempts still to fail. */
+        final AtomicInteger rollbacksToFail;
+        /** Set once a commit or rollback of this transaction succeeds. */
+        boolean finished;
+
+        MockTransaction(AtomicInteger commitsToFail, AtomicInteger 
rollbacksToFail) {
+            this.commitsToFail = commitsToFail;
+            this.rollbacksToFail = rollbacksToFail;
+        }
+
+        @Override
+        public void commit() throws TransactionException {
+            try {
+                commitAsync().join();
+            } catch (CompletionException e) {
+                // Unwrap runtime exceptions so that the sync API throws the original failure.
+                if (e.getCause() instanceof RuntimeException) {
+                    throw (RuntimeException) e.getCause();
+                } else {
+                    throw e;
+                }
+            }
+        }
+
+        @Override
+        public CompletableFuture<Void> commitAsync() {
+            // NOTE(review): presumably makes every attempt take measurable time, so retry loops advance towards the timeout.
+            sleep(1);
+
+            if (commitsToFail.get() > 0) {
+                commitsToFail.decrementAndGet();
+                return failedFuture(new FailedCommitTestException());
+            } else {
+                finished = true;
+                return completedFuture(null);
+            }
+        }
+
+        @Override
+        public void rollback() throws TransactionException {
+            try {
+                rollbackAsync().join();
+            } catch (CompletionException e) {
+                // Unwrap runtime exceptions so that the sync API throws the original failure.
+                if (e.getCause() instanceof RuntimeException) {
+                    throw (RuntimeException) e.getCause();
+                } else {
+                    throw e;
+                }
+            }
+        }
+
+        @Override
+        public CompletableFuture<Void> rollbackAsync() {
+            // Same small delay as in commitAsync().
+            sleep(1);
+
+            if (rollbacksToFail.get() > 0) {
+                rollbacksToFail.decrementAndGet();
+                return failedFuture(new FailedRollbackTestException());
+            } else {
+                finished = true;
+                return completedFuture(null);
+            }
+        }
+
+        @Override
+        public boolean isReadOnly() {
+            return false;
+        }
+    }
+
+    /** Injected closure failure; implements {@link RetriableTransactionException}. */
+    private static class FailedClosureTestException extends RuntimeException 
implements RetriableTransactionException {
+        // No-op.
+    }
+
+    /** Injected commit failure; implements {@link RetriableTransactionException}. */
+    private static class FailedCommitTestException extends RuntimeException 
implements RetriableTransactionException {
+        // No-op.
+    }
+
+    /** Injected rollback failure; implements {@link RetriableTransactionException}. */
+    private static class FailedRollbackTestException extends RuntimeException 
implements RetriableTransactionException {
+        // No-op.
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/lang/ComponentStoppingException.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/lang/ComponentStoppingException.java
index 02e7759ae49..06354bfe440 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/lang/ComponentStoppingException.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/lang/ComponentStoppingException.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.lang;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
 import java.util.UUID;
+import org.apache.ignite.tx.RetriableTransactionException;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -27,7 +28,7 @@ import org.jetbrains.annotations.Nullable;
  * This is different from {@link NodeStoppingException} as {@link 
ComponentStoppingException} might mean that just the component is stopped,
  * not the whole node.
  */
-public class ComponentStoppingException extends IgniteInternalCheckedException 
{
+public class ComponentStoppingException extends IgniteInternalCheckedException 
implements RetriableTransactionException {
     /** Serial version UID. */
     private static final long serialVersionUID = 0L;
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/lang/NodeStoppingException.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/lang/NodeStoppingException.java
index 8983ab6e92a..35fd72fc99f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/lang/NodeStoppingException.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/lang/NodeStoppingException.java
@@ -20,12 +20,13 @@ package org.apache.ignite.internal.lang;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
 import java.util.UUID;
+import org.apache.ignite.tx.RetriableTransactionException;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * This exception is used to indicate that Ignite node is stopping (already 
stopped) for some reason.
  */
-public class NodeStoppingException extends IgniteInternalCheckedException {
+public class NodeStoppingException extends IgniteInternalCheckedException 
implements RetriableTransactionException {
     /** Serial version UID. */
     private static final long serialVersionUID = 0L;
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
index 000d8c1e537..2a6019bc405 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
@@ -42,7 +42,11 @@ public class FastTimestamps {
         });
 
         Runnable updaterTask = () -> {
-            coarseCurrentTimeMillis = System.currentTimeMillis();
+            long now = System.currentTimeMillis();
+
+            if (now > coarseCurrentTimeMillis) {
+                coarseCurrentTimeMillis = now;
+            }
 
             // Safe-point-friendly hint.
             Thread.onSpinWait();
diff --git 
a/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java
 
b/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java
index cfe4e181ed7..9057f16c764 100644
--- 
a/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java
+++ 
b/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java
@@ -20,12 +20,13 @@ package org.apache.ignite.internal.network;
 import org.apache.ignite.lang.ErrorGroups.Network;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.tx.RetriableTransactionException;
 
 /**
  * Thrown when consistent ID cannot be resolved to a {@link ClusterNode} 
instance (i.e. when
  * there is no node with such consistent ID in the physical topology).
  */
-public class UnresolvableConsistentIdException extends IgniteException {
+public class UnresolvableConsistentIdException extends IgniteException 
implements RetriableTransactionException {
     public UnresolvableConsistentIdException(String msg) {
+        // Always reports Network.UNRESOLVABLE_CONSISTENT_ID_ERR; only the message varies between instances.
         super(Network.UNRESOLVABLE_CONSISTENT_ID_ERR, msg);
     }
diff --git 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java
 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java
index 5de7483175f..39ae5402dda 100644
--- 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java
+++ 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java
@@ -22,12 +22,13 @@ import static 
org.apache.ignite.lang.ErrorGroups.PlacementDriver.PRIMARY_REPLICA
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.tx.RetriableTransactionException;
 
 /**
  * The exception is thrown when a primary replica await process has failed. 
Please pay attention that there is a specific
  * {@link PrimaryReplicaAwaitTimeoutException} for the primary replica await 
timeout.
  */
-public class PrimaryReplicaAwaitException extends IgniteInternalException {
+public class PrimaryReplicaAwaitException extends IgniteInternalException 
implements RetriableTransactionException {
     private static final long serialVersionUID = 1029917546884926160L;
 
     /**
diff --git 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java
 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java
index 35045f9b069..633a2d93cdc 100644
--- 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java
+++ 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java
@@ -22,12 +22,13 @@ import static 
org.apache.ignite.lang.ErrorGroups.PlacementDriver.PRIMARY_REPLICA
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.tx.RetriableTransactionException;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * The exception is thrown when a primary replica await process has times out.
  */
-public class PrimaryReplicaAwaitTimeoutException extends 
IgniteInternalException {
+public class PrimaryReplicaAwaitTimeoutException extends 
IgniteInternalException implements RetriableTransactionException {
     private static final long serialVersionUID = -1450288033816499192L;
 
     /**
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java
index 19ef18868b3..39dfff38cb9 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java
@@ -22,12 +22,14 @@ import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR;
 
 import java.util.UUID;
 import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.tx.RetriableTransactionException;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Unchecked exception that is thrown when a replica is not the current 
primary replica.
  */
-public class PrimaryReplicaMissException extends IgniteInternalException 
implements ExpectedReplicationException {
+public class PrimaryReplicaMissException extends IgniteInternalException 
implements ExpectedReplicationException,
+        RetriableTransactionException {
     private static final long serialVersionUID = 8755220779942651494L;
 
     /**
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationException.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationException.java
index f6ead96f171..57a18d868d3 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationException.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationException.java
@@ -22,11 +22,12 @@ import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR;
 import java.util.UUID;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.tx.RetriableTransactionException;
 
 /**
  * The exception is thrown when some issue happened during a replication.
  */
-public class ReplicationException extends IgniteInternalException {
+public class ReplicationException extends IgniteInternalException implements 
RetriableTransactionException {
     /**
      * Constructor.
      *
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
index e7f9ab5307a..0deed062641 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
@@ -47,7 +47,6 @@ public class ItTxDistributedCleanupRecoveryTest extends 
TxAbstractTest {
     @BeforeEach
     @Override
     public void before() throws Exception {
-        // The value of 3 is less than the allowed number of cleanup retries.
         setDefaultRetryCount(3);
 
         txTestCluster = new ItTxTestCluster(
@@ -74,10 +73,11 @@ public class ItTxDistributedCleanupRecoveryTest extends 
TxAbstractTest {
             DefaultMessagingService messagingService = 
(DefaultMessagingService) clusterService.messagingService();
             messagingService.dropMessages((s, networkMessage) -> {
                 if (networkMessage instanceof TxCleanupMessage && 
defaultRetryCount.getAndDecrement() > 0) {
-                    logger().info("Dropping cleanup request: {}", 
networkMessage);
+                    logger().info("Dropping cleanup request [message={}].", 
networkMessage);;
 
                     return true;
                 }
+
                 return false;
             });
         });
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitRunInTxnTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitRunInTxnTest.java
index 67c7642d103..48200c1de88 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitRunInTxnTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitRunInTxnTest.java
@@ -22,6 +22,7 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionOptions;
 import org.junit.jupiter.api.function.Executable;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -54,7 +55,7 @@ public class ItKeyValueBinaryViewApiExplicitRunInTxnTest 
extends ItKeyValueBinar
     private static class TxTestCase extends TestCase {
         @Override
         protected Executable wrap(Consumer<Transaction> run) {
-            return () -> ignite.transactions().runInTransaction(run);
+            // NOTE(review): the tiny 1 ms timeout presumably effectively disables runInTransaction's automatic retries,
+            // so failures from the wrapped test action surface directly instead of being retried.
+            return () -> ignite.transactions().runInTransaction(run, new 
TransactionOptions().timeoutMillis(1));
         }
 
         TxTestCase(boolean async, boolean thin, KeyValueView<Tuple, Tuple> 
view, TestTableDefinition tableDefinition, Ignite ignite) {
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 58f3da7f5c4..55c6281a3be 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -533,21 +533,24 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
         RecordView<Tuple> view = accounts.recordView();
         view.upsert(null, makeValue(1, balance));
 
-        CompletableFuture<Double> fut0 = 
igniteTransactions.runInTransactionAsync(tx -> {
-            CompletableFuture<Double> fut = view.getAsync(tx, makeKey(1))
-                    .thenCompose(val2 -> {
-                        double prev = val2.doubleValue("balance");
-                        return view.upsertAsync(tx, makeValue(1, delta + 
20)).thenApply(ignored -> prev);
-                    });
-
-            fut.join();
-
-            if (true) {
-                throw new IllegalArgumentException();
-            }
+        CompletableFuture<Double> fut0 = 
igniteTransactions.runInTransactionAsync(
+                tx -> {
+                        CompletableFuture<Double> fut = view.getAsync(tx, 
makeKey(1))
+                                .thenCompose(val2 -> {
+                                    double prev = val2.doubleValue("balance");
+                                    return view.upsertAsync(tx, makeValue(1, 
delta + 20))
+                                            .thenApply(ignored -> prev);
+                                })
+                                .whenComplete((res, ex) -> log.info("Test: tx 
operations in tx closures completed, ex=" + ex));
+
+                        if (true) {
+                            throw new IllegalArgumentException();
+                        }
 
-            return fut;
-        });
+                        return fut;
+                },
+                new TransactionOptions().timeoutMillis(1000)
+        );
 
         var err = assertThrows(CompletionException.class, fut0::join);
 
@@ -567,13 +570,15 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
     public void testTxClosureUncaughtExceptionInChainAsync() {
         RecordView<Tuple> view = accounts.recordView();
 
-        CompletableFuture<Double> fut0 = 
igniteTransactions.runInTransactionAsync(tx -> {
-            return view.getAsync(tx, makeKey(2))
-                    .thenCompose(val2 -> {
-                        double prev = val2.doubleValue("balance"); // val2 is 
null - NPE is thrown here
-                        return view.upsertAsync(tx, makeValue(1, 
100)).thenApply(ignored -> prev);
-                    });
-        });
+        CompletableFuture<Double> fut0 = igniteTransactions
+                .runInTransactionAsync(
+                        tx -> view.getAsync(tx, makeKey(2))
+                            .thenCompose(val2 -> {
+                                double prev = val2.doubleValue("balance"); // 
val2 is null - NPE is thrown here
+                                return view.upsertAsync(tx, makeValue(1, 
100)).thenApply(ignored -> prev);
+                            }),
+                        new TransactionOptions().timeoutMillis(1000)
+                );
 
         var err = assertThrows(CompletionException.class, fut0::join);
 
@@ -644,7 +649,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
 
         var futUpd2 = table2.upsertAllAsync(tx1, rows2);
 
-        assertTrue(IgniteTestUtils.waitForCondition(() -> {
+        assertTrue(waitForCondition(() -> {
             boolean lockUpgraded = false;
 
             for (Iterator<Lock> it = 
txManager(accounts).lockManager().locks(tx1.id()); it.hasNext(); ) {
@@ -926,12 +931,18 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
 
         accounts.recordView().upsert(null, makeValue(2, 100.));
 
-        assertThrows(RuntimeException.class, () -> 
igniteTransactions.runInTransaction((Consumer<Transaction>) tx -> {
-            assertNotNull(accounts.recordView().get(tx, key2));
-            assertTrue(accounts.recordView().delete(tx, key2));
-            assertNull(accounts.recordView().get(tx, key2));
-            throw new RuntimeException(); // Triggers rollback.
-        }));
+        assertThrows(
+                RuntimeException.class,
+                () -> igniteTransactions.runInTransaction(
+                        (Consumer<Transaction>) tx -> {
+                                assertNotNull(accounts.recordView().get(tx, 
key2));
+                                assertTrue(accounts.recordView().delete(tx, 
key2));
+                                assertNull(accounts.recordView().get(tx, 
key2));
+                                throw new RuntimeException(); // Triggers 
rollback.
+                        },
+                        new TransactionOptions().timeoutMillis(1000)
+                )
+        );
 
         assertNotNull(accounts.recordView().get(null, key2));
         assertTrue(accounts.recordView().delete(null, key2));
@@ -997,7 +1008,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
         validateBalance(txAcc2.getAll(tx1, List.of(makeKey(2), makeKey(1))), 
200., 300.);
         validateBalance(txAcc2.getAll(tx1, List.of(makeKey(1), makeKey(2))), 
300., 200.);
 
-        assertTrue(IgniteTestUtils.waitForCondition(() -> TxState.ABORTED == 
tx2.state(), 5_000), tx2.state().toString());
+        assertTrue(waitForCondition(() -> TxState.ABORTED == tx2.state(), 
5_000), tx2.state().toString());
 
         tx1.commit();
 
@@ -1017,7 +1028,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
             if (true) {
                 throw new IgniteException(INTERNAL_ERR, "Test error");
             }
-        }));
+        }, new TransactionOptions().timeoutMillis(1000)));
 
         assertNull(accounts.recordView().get(null, makeKey(3)));
         assertNull(accounts.recordView().get(null, makeKey(4)));
@@ -1243,8 +1254,8 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
         assertEquals("test2", customers.recordView().get(null, 
makeKey(1)).stringValue("name"));
         assertEquals(200., accounts.recordView().get(null, 
makeKey(1)).doubleValue("balance"));
 
-        assertTrue(IgniteTestUtils.waitForCondition(() -> 
lockManager(accounts).isEmpty(), 10_000));
-        assertTrue(IgniteTestUtils.waitForCondition(() -> 
lockManager(customers).isEmpty(), 10_000));
+        assertTrue(waitForCondition(() -> lockManager(accounts).isEmpty(), 
10_000));
+        assertTrue(waitForCondition(() -> lockManager(customers).isEmpty(), 
10_000));
     }
 
     @Test
@@ -1282,7 +1293,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
         assertEquals("test2", customers.recordView().get(null, 
makeKey(1)).stringValue("name"));
         assertEquals(200., accounts.recordView().get(null, 
makeKey(1)).doubleValue("balance"));
 
-        assertTrue(IgniteTestUtils.waitForCondition(() -> 
lockManager(accounts).isEmpty(), 10_000));
+        assertTrue(waitForCondition(() -> lockManager(accounts).isEmpty(), 
10_000));
     }
 
     @Test
@@ -1320,7 +1331,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
         assertEquals("test2", customers.recordView().get(null, 
makeKey(1)).stringValue("name"));
         assertEquals(200., accounts.recordView().get(null, 
makeKey(1)).doubleValue("balance"));
 
-        assertTrue(IgniteTestUtils.waitForCondition(() -> 
lockManager(accounts).isEmpty(), 10_000));
+        assertTrue(waitForCondition(() -> lockManager(accounts).isEmpty(), 
10_000));
     }
 
     @Test
@@ -1338,7 +1349,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
         assertEquals("test2", customers.recordView().get(null, 
makeKey(1)).stringValue("name"));
         assertEquals(200., accounts.recordView().get(null, 
makeKey(1)).doubleValue("balance"));
 
-        assertTrue(IgniteTestUtils.waitForCondition(() -> 
lockManager(accounts).isEmpty(), 10_000));
+        assertTrue(waitForCondition(() -> lockManager(accounts).isEmpty(), 
10_000));
     }
 
     @Test
@@ -1356,7 +1367,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
         assertEquals("test", customers.recordView().get(null, 
makeKey(1)).stringValue("name"));
         assertEquals(100., accounts.recordView().get(null, 
makeKey(1)).doubleValue("balance"));
 
-        assertTrue(IgniteTestUtils.waitForCondition(() -> 
lockManager(accounts).isEmpty(), 10_000));
+        assertTrue(waitForCondition(() -> lockManager(accounts).isEmpty(), 
10_000));
     }
 
     @Test
@@ -1375,7 +1386,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
         assertEquals("test2", customers.recordView().get(null, 
makeKey(1)).stringValue("name"));
         assertEquals(200., accounts.recordView().get(null, 
makeKey(1)).doubleValue("balance"));
 
-        assertTrue(IgniteTestUtils.waitForCondition(() -> 
lockManager(accounts).isEmpty(), 10_000));
+        assertTrue(waitForCondition(() -> lockManager(accounts).isEmpty(), 
10_000));
     }
 
     @Test
@@ -1394,7 +1405,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
         assertEquals("test", customers.recordView().get(null, 
makeKey(1)).stringValue("name"));
         assertEquals(100., accounts.recordView().get(null, 
makeKey(1)).doubleValue("balance"));
 
-        assertTrue(IgniteTestUtils.waitForCondition(() -> 
lockManager(accounts).isEmpty(), 10_000));
+        assertTrue(waitForCondition(() -> lockManager(accounts).isEmpty(), 
10_000));
     }
 
     @Test
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java
index 9586a268419..35118be5896 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.tx;
 
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import 
org.apache.ignite.internal.tx.configuration.TransactionConfigurationSchema;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -76,6 +77,10 @@ public class InternalTxOptions {
     public static class Builder {
         private TxPriority priority = TxPriority.NORMAL;
 
+        /**
+         * A value of 0 means "not set" and is NOT the effective default timeout. In that case the defaults of
+         * {@link TransactionConfigurationSchema#readOnlyTimeoutMillis} and {@link TransactionConfigurationSchema#readWriteTimeoutMillis} are used.
+         */
         private long timeoutMillis = 0;
 
         @Nullable
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java
index ab1919a5375..0e4d0760f18 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java
@@ -17,10 +17,12 @@
 
 package org.apache.ignite.internal.tx;
 
+import org.apache.ignite.tx.RetriableTransactionException;
+
 /**
  * This exception is thrown when a lock cannot be acquired, released or 
downgraded.
  */
-public class LockException extends TransactionInternalCheckedException {
+public class LockException extends TransactionInternalCheckedException 
implements RetriableTransactionException {
     /**
      * Creates a new instance of LockException with the given message.
      *
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
index 6af2318c298..953a14515fe 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.tx.configuration;
 
+import static 
org.apache.ignite.tx.IgniteTransactionDefaults.DEFAULT_RW_TX_TIMEOUT_SECONDS;
+
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.configuration.annotation.Config;
 import org.apache.ignite.configuration.annotation.ConfigValue;
@@ -39,7 +41,7 @@ public class TransactionConfigurationSchema {
     @Range(min = 1)
     @Value(hasDefault = true)
     @PublicName(legacyNames = "readWriteTimeout")
-    public final long readWriteTimeoutMillis = TimeUnit.SECONDS.toMillis(30);
+    public final long readWriteTimeoutMillis = 
TimeUnit.SECONDS.toMillis(DEFAULT_RW_TX_TIMEOUT_SECONDS);
 
     // Deprecated properties
     /** How often abandoned transactions are searched for (milliseconds). */
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
index 9d4dd6b8b8f..ae4f2d36719 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
@@ -180,6 +180,7 @@ public class PersistentTxStateVacuumizer {
         return hasCause(e,
                 PrimaryReplicaMissException.class,
                 NodeStoppingException.class,
+                ComponentStoppingException.class,
                 GroupOverloadedException.class,
                 // AwaitReplicaTimeoutException can be thrown from 
ReplicaService on receiver node, when there
                 // is no replica. This may happen if it was removed after 
getting the primary replica but before the message was received
@@ -189,8 +190,7 @@ public class PersistentTxStateVacuumizer {
                 // the persistent tx state.
                 // Also, replica calls from PersistentTxStateVacuumizer are 
local, so retry with new primary replica most likely will
                 // happen on another node.
-                AwaitReplicaTimeoutException.class,
-                ComponentStoppingException.class
+                AwaitReplicaTimeoutException.class
         );
     }
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java
index 923a6437467..92130f03f70 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java
@@ -23,10 +23,11 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.tx.RetriableTransactionException;
 import org.jetbrains.annotations.Nullable;
 
 /** Unchecked exception that is thrown when primary replica has expired. */
-public class PrimaryReplicaExpiredException extends IgniteInternalException {
+public class PrimaryReplicaExpiredException extends IgniteInternalException 
implements RetriableTransactionException {
     /**
      * The constructor.
      *

Reply via email to