This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 4cd0fd51eec IGNITE-17838 Implement runInTransaction automatic retries (#6413) 4cd0fd51eec is described below commit 4cd0fd51eec2d04c889955a83bfc3f600f16a0bc Author: Denis Chudov <moongll...@gmail.com> AuthorDate: Wed Aug 27 10:44:29 2025 +0300 IGNITE-17838 Implement runInTransaction automatic retries (#6413) --- .../ignite/tx/IgniteTransactionDefaults.java} | 18 +- .../org/apache/ignite/tx/IgniteTransactions.java | 112 ++++--- .../ignite/tx/RetriableTransactionException.java} | 15 +- .../ignite/tx/RunInTransactionInternalImpl.java | 360 ++++++++++++++++++++ .../ignite/tx/RunInTransactionRetryTest.java | 367 +++++++++++++++++++++ .../internal/lang/ComponentStoppingException.java | 3 +- .../internal/lang/NodeStoppingException.java | 3 +- .../ignite/internal/util/FastTimestamps.java | 6 +- .../network/UnresolvableConsistentIdException.java | 3 +- .../PrimaryReplicaAwaitException.java | 3 +- .../PrimaryReplicaAwaitTimeoutException.java | 3 +- .../exception/PrimaryReplicaMissException.java | 4 +- .../replicator/exception/ReplicationException.java | 3 +- .../ItTxDistributedCleanupRecoveryTest.java | 4 +- ...tKeyValueBinaryViewApiExplicitRunInTxnTest.java | 3 +- .../ignite/internal/table/TxAbstractTest.java | 87 ++--- .../ignite/internal/tx/InternalTxOptions.java | 5 + .../apache/ignite/internal/tx/LockException.java | 4 +- .../TransactionConfigurationSchema.java | 4 +- .../tx/impl/PersistentTxStateVacuumizer.java | 4 +- .../tx/impl/PrimaryReplicaExpiredException.java | 3 +- 21 files changed, 883 insertions(+), 131 deletions(-) diff --git a/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactionDefaults.java similarity index 59% copy from modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java copy to 
modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactionDefaults.java index cfe4e181ed7..f6abdf37349 100644 --- a/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java +++ b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactionDefaults.java @@ -15,18 +15,14 @@ * limitations under the License. */ -package org.apache.ignite.internal.network; - -import org.apache.ignite.lang.ErrorGroups.Network; -import org.apache.ignite.lang.IgniteException; -import org.apache.ignite.network.ClusterNode; +package org.apache.ignite.tx; /** - * Thrown when consistent ID cannot be resolved to a {@link ClusterNode} instance (i.e. when - * there is no node with such consistent ID in the physical topology). + * Utility class containing transaction default constants. */ -public class UnresolvableConsistentIdException extends IgniteException { - public UnresolvableConsistentIdException(String msg) { - super(Network.UNRESOLVABLE_CONSISTENT_ID_ERR, msg); - } +public class IgniteTransactionDefaults { + /** + * Default transaction timeout. 
+ */ + public static final long DEFAULT_RW_TX_TIMEOUT_SECONDS = 30; } diff --git a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java index 51cd9641647..0527e93e4d5 100644 --- a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java +++ b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java @@ -17,11 +17,13 @@ package org.apache.ignite.tx; -import static java.util.concurrent.CompletableFuture.completedFuture; -import static java.util.function.Function.identity; +import static org.apache.ignite.tx.IgniteTransactionDefaults.DEFAULT_RW_TX_TIMEOUT_SECONDS; +import static org.apache.ignite.tx.RunInTransactionInternalImpl.runInTransactionAsyncInternal; +import static org.apache.ignite.tx.RunInTransactionInternalImpl.runInTransactionInternal; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; import org.apache.ignite.table.Table; @@ -115,7 +117,7 @@ public interface IgniteTransactions { * } * </pre> * - * <p>The correct variant will be: + * <p>The <b>correct</b> variant will be: * <pre> * {@code * igniteTransactions.runInTransaction(tx -> { @@ -125,7 +127,12 @@ public interface IgniteTransactions { * } * </pre> * - * <p>If the closure is executed normally (no exceptions) the transaction is automatically committed. + * <p>If the closure is executed normally (no exceptions) the transaction is automatically committed. In case of an exception, the + * closure will be retried automatically within the transaction timeout, so it must be a pure function. If the transaction timeout + * expires before the closure completes successfully and the transaction has been committed, the transaction is rolled back instead. 
+ * <br> + * The closure is retried only in cases of "expected" exceptions, like {@code LockException}, {@code TimeoutException}, + * exceptions related to the primary replica change, etc. * * @param clo The closure. * @@ -153,7 +160,7 @@ public interface IgniteTransactions { * } * </pre> * - * <p>The correct variant will be: + * <p>The <b>correct</b> variant will be: * <pre> * {@code * igniteTransactions.runInTransaction(tx -> { @@ -163,7 +170,12 @@ public interface IgniteTransactions { * } * </pre> * - * <p>If the closure is executed normally (no exceptions) the transaction is automatically committed. + * <p>If the closure is executed normally (no exceptions) the transaction is automatically committed. In case of an exception, the + * closure will be retried automatically within the transaction timeout, so it must be a pure function. If the transaction timeout + * expires before the closure completes successfully and the transaction has been committed, the transaction is rolled back instead. + * <br> + * The closure is retried only in cases of "expected" exceptions, like {@code LockException}, {@code TimeoutException}, + * exceptions related to the primary replica change, etc. * * @param options Transaction options. * @param clo The closure. @@ -197,7 +209,7 @@ public interface IgniteTransactions { * } * </pre> * - * <p>The correct variant will be: + * <p>The <b>correct</b> variant will be: * <pre> * {@code * igniteTransactions.runInTransaction(tx -> { @@ -207,7 +219,12 @@ public interface IgniteTransactions { * } * </pre> * - * <p>If the closure is executed normally (no exceptions) the transaction is automatically committed. + * <p>If the closure is executed normally (no exceptions) the transaction is automatically committed. In case of an exception, the + * closure will be retried automatically within the transaction timeout, so it must be a pure function. 
If the transaction timeout + * expires before the closure completes successfully and the transaction has been committed, the transaction is rolled back instead. + * <br> + * The closure is retried only in cases of "expected" exceptions, like {@code LockException}, {@code TimeoutException}, + * exceptions related to the primary replica change, etc. * * @param clo Closure. * @param <T> Closure result type. @@ -237,7 +254,7 @@ public interface IgniteTransactions { * } * </pre> * - * <p>The correct variant will be: + * <p>The <b>correct</b> variant will be: * <pre> * {@code * igniteTransactions.runInTransaction(tx -> { @@ -247,7 +264,12 @@ public interface IgniteTransactions { * } * </pre> * - * <p>If the closure is executed normally (no exceptions) the transaction is automatically committed. + * <p>If the closure is executed normally (no exceptions) the transaction is automatically committed. In case of an exception, the + * closure will be retried automatically within the transaction timeout, so it must be a pure function. If the transaction timeout + * expires before the closure completes successfully and the transaction has been committed, the transaction is rolled back instead. + * <br> + * The closure is retried only in cases of "expected" exceptions, like {@code LockException}, {@code TimeoutException}, + * exceptions related to the primary replica change, etc. * * @param clo The closure. * @param options Transaction options. @@ -257,26 +279,11 @@ public interface IgniteTransactions { * @throws TransactionException If a transaction can't be finished successfully. 
*/ default <T> T runInTransaction(Function<Transaction, T> clo, @Nullable TransactionOptions options) throws TransactionException { - Objects.requireNonNull(clo); - - Transaction tx = begin(options); - - try { - T ret = clo.apply(tx); - - tx.commit(); - - return ret; - } catch (Throwable t) { - // TODO FIXME https://issues.apache.org/jira/browse/IGNITE-17838 Implement auto retries - try { - tx.rollback(); // Try rolling back on user exception. - } catch (Exception e) { - t.addSuppressed(e); - } - - throw t; - } + // This start timestamp is not related to transaction's begin timestamp and only serves as local time for counting the timeout of + // possible retries. + long startTimestamp = System.currentTimeMillis(); + long initialTimeout = options == null ? TimeUnit.SECONDS.toMillis(DEFAULT_RW_TX_TIMEOUT_SECONDS) : options.timeoutMillis(); + return runInTransactionInternal(this, clo, options, startTimestamp, initialTimeout); } /** @@ -293,7 +300,12 @@ public interface IgniteTransactions { * } * </pre> * - * <p>If the asynchronous chain resulted in no exception, the commitAsync will be automatically called. + * <p>If the asynchronous chain resulted in no exception, the commitAsync will be automatically called. In case of an exception, the + * closure will be retried automatically within the transaction timeout, so it must be a pure function. If the transaction timeout + * expires before the closure completes successfully and the transaction has been committed, the transaction is rolled back instead. + * <br> + * The closure is retried only in cases of "expected" exceptions, like {@code LockException}, {@code TimeoutException}, + * exceptions related to the primary replica change, etc. * * @param clo The closure. * @param <T> Closure result type. @@ -317,7 +329,13 @@ public interface IgniteTransactions { * } * </pre> * - * <p>If the asynchronous chain resulted in no exception, the commitAsync will be automatically called. 
+ * <p>If the asynchronous chain resulted in no exception, the commitAsync will be automatically called. In case of an exception, the + * closure will be retried automatically within the transaction timeout, so it must be a pure function. If the transaction timeout + * expires before the closure completes successfully and the transaction has been committed, the transaction is rolled back instead. + * <br> + * The closure is retried only in cases of "expected" exceptions, like {@code LockException}, {@code TimeoutException}, + * exceptions related to the primary replica change, etc. + * * * @param clo The closure. * @param options Transaction options. @@ -326,28 +344,12 @@ public interface IgniteTransactions { */ default <T> CompletableFuture<T> runInTransactionAsync( Function<Transaction, CompletableFuture<T>> clo, - @Nullable TransactionOptions options) { - Objects.requireNonNull(clo); - - // TODO FIXME https://issues.apache.org/jira/browse/IGNITE-17838 Implement auto retries - return beginAsync(options).thenCompose(tx -> { - try { - return clo.apply(tx).handle((res, e) -> { - if (e != null) { - return tx.rollbackAsync().exceptionally(e0 -> { - e.addSuppressed(e0); - return null; - }).thenCompose(ignored -> CompletableFuture.<T>failedFuture(e)); - } - - return completedFuture(res); - }).thenCompose(identity()).thenCompose(val -> tx.commitAsync().thenApply(ignored -> val)); - } catch (Exception e) { - return tx.rollbackAsync().exceptionally(e0 -> { - e.addSuppressed(e0); - return null; - }).thenCompose(ignored -> CompletableFuture.failedFuture(e)); - } - }); + @Nullable TransactionOptions options + ) { + // This start timestamp is not related to transaction's begin timestamp and only serves as local time for counting the timeout of + // possible retries. + long startTimestamp = System.currentTimeMillis(); + long initialTimeout = options == null ? 
TimeUnit.SECONDS.toMillis(DEFAULT_RW_TX_TIMEOUT_SECONDS) : options.timeoutMillis(); + return runInTransactionAsyncInternal(this, clo, options, startTimestamp, initialTimeout, null); } } diff --git a/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java b/modules/api/src/main/java/org/apache/ignite/tx/RetriableTransactionException.java similarity index 59% copy from modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java copy to modules/api/src/main/java/org/apache/ignite/tx/RetriableTransactionException.java index cfe4e181ed7..0a50878901e 100644 --- a/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java +++ b/modules/api/src/main/java/org/apache/ignite/tx/RetriableTransactionException.java @@ -15,18 +15,11 @@ * limitations under the License. */ -package org.apache.ignite.internal.network; - -import org.apache.ignite.lang.ErrorGroups.Network; -import org.apache.ignite.lang.IgniteException; -import org.apache.ignite.network.ClusterNode; +package org.apache.ignite.tx; /** - * Thrown when consistent ID cannot be resolved to a {@link ClusterNode} instance (i.e. when - * there is no node with such consistent ID in the physical topology). + * This is the marker interface for exceptions that can be retried if they occur in {@link IgniteTransactions#runInTransaction} and + * {@link IgniteTransactions#runInTransactionAsync}. 
*/ -public class UnresolvableConsistentIdException extends IgniteException { - public UnresolvableConsistentIdException(String msg) { - super(Network.UNRESOLVABLE_CONSISTENT_ID_ERR, msg); - } +public interface RetriableTransactionException { } diff --git a/modules/api/src/main/java/org/apache/ignite/tx/RunInTransactionInternalImpl.java b/modules/api/src/main/java/org/apache/ignite/tx/RunInTransactionInternalImpl.java new file mode 100644 index 00000000000..c67797cc4ef --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/tx/RunInTransactionInternalImpl.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.tx; + +import static java.util.Collections.synchronizedList; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; +import static java.util.function.Function.identity; +import static org.apache.ignite.tx.IgniteTransactionDefaults.DEFAULT_RW_TX_TIMEOUT_SECONDS; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; +import org.jetbrains.annotations.Nullable; + +/** + * This is, in fact, the default implementation of the {@link IgniteTransactions#runInTransaction} and + * {@link IgniteTransactions#runInTransactionAsync}, moved to a separate class to avoid overloading the interface. This + * implementation is common for both client and embedded {@link IgniteTransactions}. + */ +class RunInTransactionInternalImpl { + private static final int MAX_SUPPRESSED = 100; + + static <T> T runInTransactionInternal( + IgniteTransactions igniteTransactions, + Function<Transaction, T> clo, + @Nullable TransactionOptions options, + long startTimestamp, + long initialTimeout + ) throws TransactionException { + Objects.requireNonNull(clo); + + TransactionOptions txOptions = options == null + ? 
new TransactionOptions().timeoutMillis(TimeUnit.SECONDS.toMillis(DEFAULT_RW_TX_TIMEOUT_SECONDS)) + : options; + + List<Throwable> suppressed = new ArrayList<>(); + + Transaction tx; + T ret; + + while (true) { + tx = igniteTransactions.begin(txOptions); + + try { + ret = clo.apply(tx); + + break; + } catch (Exception ex) { + addSuppressedToList(suppressed, ex); + + long remainingTime = calcRemainingTime(initialTimeout, startTimestamp); + + if (remainingTime > 0 && isRetriable(ex)) { + // Rollback on user exception, should be retried until success or timeout to ensure the lock release + // before the next attempt. + rollbackWithRetry(tx, ex, startTimestamp, initialTimeout, suppressed); + + long remaining = calcRemainingTime(initialTimeout, startTimestamp); + + if (remaining > 0) { + // Will go on retry iteration. + txOptions = txOptions.timeoutMillis(remainingTime); + } else { + throwExceptionWithSuppressed(ex, suppressed); + } + } else { + try { + // No retries here, rely on the durable finish. + tx.rollback(); + } catch (Exception e) { + addSuppressedToList(suppressed, e); + } + + throwExceptionWithSuppressed(ex, suppressed); + } + } + } + + try { + tx.commit(); + } catch (Exception e) { + try { + // Try to rollback tx in case if it's not finished. Retry is not needed here due to the durable finish. 
+ tx.rollback(); + } catch (Exception re) { + e.addSuppressed(re); + } + + throw e; + } + + return ret; + } + + private static void rollbackWithRetry( + Transaction tx, + Exception closureException, + long startTimestamp, + long initialTimeout, + List<Throwable> suppressed + ) { + while (true) { + try { + tx.rollback(); + + break; + } catch (Exception re) { + addSuppressedToList(suppressed, re); + + if (calcRemainingTime(initialTimeout, startTimestamp) <= 0) { + throwExceptionWithSuppressed(closureException, suppressed); + } + } + } + } + + static <T> CompletableFuture<T> runInTransactionAsyncInternal( + IgniteTransactions igniteTransactions, + Function<Transaction, CompletableFuture<T>> clo, + @Nullable TransactionOptions options, + long startTimestamp, + long initialTimeout, + @Nullable List<Throwable> suppressed + ) { + Objects.requireNonNull(clo); + + TransactionOptions txOptions = options == null + ? new TransactionOptions().timeoutMillis(TimeUnit.SECONDS.toMillis(DEFAULT_RW_TX_TIMEOUT_SECONDS)) + : options; + + List<Throwable> sup = suppressed == null ? synchronizedList(new ArrayList<>()) : suppressed; + + return igniteTransactions + .beginAsync(txOptions) + // User closure with retries. + .thenCompose(tx -> { + try { + return clo.apply(tx) + .handle((res, e) -> { + if (e != null) { + return handleClosureException( + igniteTransactions, + tx, + clo, + txOptions, + startTimestamp, + initialTimeout, + sup, + e + ); + } else { + return completedFuture(res); + } + }) + .thenCompose(identity()) + .thenApply(res -> new TxWithVal<>(tx, res)); + } catch (Exception e) { + return handleClosureException(igniteTransactions, tx, clo, txOptions, startTimestamp, initialTimeout, sup, e) + .thenApply(res -> new TxWithVal<>(tx, res)); + } + }) + // Transaction commit with rollback on failure, without retries. + // Transaction rollback on closure failure is implemented in closure retry logic. 
+ .thenCompose(txWithVal -> + txWithVal.tx.commitAsync() + .handle((ignored, e) -> { + if (e == null) { + return completedFuture(null); + } else { + return txWithVal.tx.rollbackAsync() + // Rethrow commit exception. + .handle((ign, re) -> sneakyThrow(e)); + } + }) + .thenCompose(fut -> fut) + .thenApply(ignored -> txWithVal.val) + ); + } + + private static <T> CompletableFuture<T> handleClosureException( + IgniteTransactions igniteTransactions, + Transaction currentTx, + Function<Transaction, CompletableFuture<T>> clo, + TransactionOptions txOptions, + long startTimestamp, + long initialTimeout, + List<Throwable> suppressed, + Throwable e + ) { + addSuppressedToList(suppressed, e); + + long remainingTime = calcRemainingTime(initialTimeout, startTimestamp); + + if (remainingTime > 0 && isRetriable(e)) { + // Rollback on user exception, should be retried until success or timeout to ensure the lock release + // before the next attempt. + return rollbackWithRetryAsync(currentTx, startTimestamp, initialTimeout, suppressed, e) + .thenCompose(ignored -> { + long remaining = calcRemainingTime(initialTimeout, startTimestamp); + + if (remaining > 0) { + TransactionOptions opt = txOptions.timeoutMillis(remaining); + + return runInTransactionAsyncInternal( + igniteTransactions, + clo, + opt, + startTimestamp, + initialTimeout, + suppressed + ); + } else { + return throwExceptionWithSuppressedAsync(e, suppressed) + .thenApply(ign -> null); + } + }); + } else { + // No retries here, rely on the durable finish. + return currentTx.rollbackAsync() + .exceptionally(re -> { + addSuppressedToList(suppressed, re); + + return null; + }) + .thenCompose(ignored -> throwExceptionWithSuppressedAsync(e, suppressed)) + // Never executed. 
+ .thenApply(ignored -> null); + } + } + + private static CompletableFuture<Void> rollbackWithRetryAsync( + Transaction tx, + long startTimestamp, + long initialTimeout, + List<Throwable> suppressed, + Throwable e + ) { + return tx.rollbackAsync() + .handle((ignored, re) -> { + CompletableFuture<Void> fut; + + if (re == null) { + fut = completedFuture(null); + } else { + addSuppressedToList(suppressed, re); + + if (calcRemainingTime(initialTimeout, startTimestamp) <= 0) { + for (Throwable s : suppressed) { + addSuppressed(e, s); + } + + fut = failedFuture(e); + } else { + fut = rollbackWithRetryAsync(tx, startTimestamp, initialTimeout, suppressed, e); + } + } + + return fut; + }) + .thenCompose(identity()); + } + + private static void addSuppressedToList(List<Throwable> to, Throwable a) { + if (to.size() < MAX_SUPPRESSED) { + to.add(a); + } + } + + private static void addSuppressed(Throwable to, Throwable a) { + if (to != null && a != null && to != a && to.getSuppressed().length < MAX_SUPPRESSED) { + to.addSuppressed(a); + } + } + + private static void throwExceptionWithSuppressed(Throwable e, List<Throwable> suppressed) { + for (Throwable t : suppressed) { + addSuppressed(e, t); + } + + sneakyThrow(e); + } + + private static CompletableFuture<Void> throwExceptionWithSuppressedAsync(Throwable e, List<Throwable> suppressed) { + for (Throwable t : suppressed) { + addSuppressed(e, t); + } + + return failedFuture(e); + } + + private static boolean isRetriable(Throwable e) { + return hasCause(e, + TimeoutException.class, + RetriableTransactionException.class + ); + } + + private static boolean hasCause(Throwable e, Class<?>... 
classes) { + Set<Throwable> processed = new HashSet<>(); + + Throwable cause = e; + while (cause != null) { + if (!processed.add(cause)) { + break; + } + + for (Class<?> cls : classes) { + if (cls.isAssignableFrom(cause.getClass())) { + return true; + } + } + + cause = cause.getCause(); + } + + return false; + } + + private static long calcRemainingTime(long initialTimeout, long startTimestamp) { + long now = System.currentTimeMillis(); + long remainingTime = initialTimeout - (now - startTimestamp); + return remainingTime; + } + + private static <E extends Throwable> E sneakyThrow(Throwable e) throws E { + throw (E) e; + } + + private static class TxWithVal<T> { + private final Transaction tx; + private final T val; + + private TxWithVal(Transaction tx, T val) { + this.tx = tx; + this.val = val; + } + } +} diff --git a/modules/api/src/test/java/org/apache/ignite/tx/RunInTransactionRetryTest.java b/modules/api/src/test/java/org/apache/ignite/tx/RunInTransactionRetryTest.java new file mode 100644 index 00000000000..c629eba22fd --- /dev/null +++ b/modules/api/src/test/java/org/apache/ignite/tx/RunInTransactionRetryTest.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.tx; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junitpioneer.jupiter.cartesian.ArgumentSets; +import org.junitpioneer.jupiter.cartesian.CartesianTest; + +/** + * Tests {@link IgniteTransactions#runInTransaction} and {@link IgniteTransactions#runInTransactionAsync} retries. + */ +public class RunInTransactionRetryTest { + private static final int SHORT_TIMEOUT_MILLIS = 100; + + private long testStartTime; + + @BeforeEach + public void setUp() { + testStartTime = System.currentTimeMillis(); + } + + /** + * Tests the different scenarios of retries. 
+ */ + @CartesianTest + @CartesianTest.MethodFactory("testRetriesArgFactory") + public void testRetries( + boolean async, + int closureFailureCount, + int commitFailureCount, + int rollbackFailureCount, + ClosureFailureType closureFailureType + ) { + var closureFailures = new AtomicInteger(closureFailureCount); + var igniteTransactions = new MockIgniteTransactions(commitFailureCount, rollbackFailureCount); + + Supplier<CompletableFuture<Integer>> testClosure; + + if (async) { + testClosure = () -> igniteTransactions.runInTransactionAsync( + tx -> closureCall(tx, closureFailures, closureFailureType), + withShortTimeout() + ); + } else { + // closureFailureType is always SYNC in sync mode (closure shouldn't return future). + testClosure = () -> igniteTransactions.runInTransaction( + (Function<Transaction, CompletableFuture<Integer>>) tx -> + closureCall(tx, closureFailures, ClosureFailureType.SYNC_FAIL), + withShortTimeout() + ); + } + + boolean requiresEventualSuccess = closureFailureCount < Integer.MAX_VALUE + // Commit failure can't be retried. + && commitFailureCount == 0 + // Rollbacks should be retried until success or timeout, so the rollback must succeed before closure retry. + && (closureFailureCount == 0 || rollbackFailureCount < Integer.MAX_VALUE); + + boolean syncFail = false; + Exception ex = null; + + CompletableFuture<Integer> future = null; + + try { + future = testClosure.get(); + } catch (Exception e) { + syncFail = true; + ex = e; + } + + if (!syncFail) { + try { + future.join(); + + // Closure succeeded, check that it's expected. + assertTrue(requiresEventualSuccess); + } catch (Exception e) { + ex = e; + } + } + + if (requiresEventualSuccess) { + assertEquals(42, future.join()); + } else { + if (closureFailureCount == Integer.MAX_VALUE) { + // Had to retry until timed out. 
+ checkTimeout(); + } + + assertNotNull(ex); + + if (!async) { + assertTrue(syncFail); + + if (commitFailureCount > 0) { + if (closureFailureCount == Integer.MAX_VALUE || closureFailureCount > 0 && rollbackFailureCount == Integer.MAX_VALUE) { + // Closure exception should be rethrown. + assertThat(ex, instanceOf(FailedClosureTestException.class)); + } else { + assertThat(ex, instanceOf(FailedCommitTestException.class)); + } + } else { + assertThat(ex, instanceOf(FailedClosureTestException.class)); + } + } else { + assertFalse(syncFail); + + assertThat(ex, instanceOf(CompletionException.class)); + assertThat(ex.getCause(), instanceOf(Exception.class)); + Exception cause = (Exception) ex.getCause(); + + assertTrue( + cause instanceof FailedClosureTestException || cause instanceof FailedCommitTestException + ); + } + } + } + + @SuppressWarnings("unused") + private static ArgumentSets testRetriesArgFactory() { + return ArgumentSets.argumentsForFirstParameter(true, false) + .argumentsForNextParameter(0, 5, 10, Integer.MAX_VALUE) + .argumentsForNextParameter(0, 5, 10, Integer.MAX_VALUE) + .argumentsForNextParameter(0, 5, 10, Integer.MAX_VALUE) + .argumentsForNextParameter(ClosureFailureType.SYNC_FAIL, ClosureFailureType.FUTURE_FAIL); + } + + @Test + public void testNoRetryAfterTimeout() { + var igniteTransactions = new MockIgniteTransactions(0, 0); + + AtomicBoolean runned = new AtomicBoolean(); + + assertThrows( + FailedClosureTestException.class, + () -> igniteTransactions.runInTransaction( + (Consumer<Transaction>) tx -> { + assertFalse(runned.get()); + runned.set(true); + sleep(100); + throw new FailedClosureTestException(); + }, + new TransactionOptions().timeoutMillis(1) + ) + ); + } + + @Test + public void testNoRetryAfterTimeoutAsync() { + var igniteTransactions = new MockIgniteTransactions(0, 0); + + AtomicBoolean runned = new AtomicBoolean(); + + CompletableFuture<Integer> future = igniteTransactions.runInTransactionAsync( + tx -> { + assertFalse(runned.get()); 
+ runned.set(true); + sleep(100); + throw new FailedClosureTestException(); + }, + new TransactionOptions().timeoutMillis(1) + ); + + try { + future.join(); + fail(); + } catch (Exception e) { + assertThat(e, instanceOf(CompletionException.class)); + assertThat(e.getCause(), instanceOf(FailedClosureTestException.class)); + } + } + + private static TransactionOptions withShortTimeout() { + return new TransactionOptions().timeoutMillis(SHORT_TIMEOUT_MILLIS); + } + + private void checkTimeout() { + long now = System.currentTimeMillis(); + long duration = now - testStartTime; + // Assuming that at least 80% of timeout has passed (assuming currentTimeMillis inaccuracy). + assertThat("duration was: " + duration, duration, greaterThan((long) (SHORT_TIMEOUT_MILLIS * 0.8))); + } + + private static CompletableFuture<Integer> closureCall( + Transaction tx, + AtomicInteger closureFailures, + ClosureFailureType closureFailureType + ) { + assertNotNull(tx); + assertFalse(isFinished(tx)); + + if (closureFailures.get() > 0) { + closureFailures.decrementAndGet(); + + if (closureFailureType == ClosureFailureType.SYNC_FAIL) { + throw new FailedClosureTestException(); + } else if (closureFailureType == ClosureFailureType.FUTURE_FAIL) { + return failedFuture(new FailedClosureTestException()); + } else { + throw new AssertionError("unknown failure type"); + } + } else { + return completedFuture(42); + } + } + + private static boolean isFinished(Transaction tx) { + assertInstanceOf(MockTransaction.class, tx); + return ((MockTransaction) tx).finished; + } + + private static void sleep(long duration) { + try { + Thread.sleep(duration); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + /** + * If closure should fail, this enum defines the type of failure. + */ + private enum ClosureFailureType { + /** Closure throws exception. */ + SYNC_FAIL, + + /** Closure returns failed future. 
*/ + FUTURE_FAIL + } + + private static class MockIgniteTransactions implements IgniteTransactions { + final AtomicInteger commitsToFail; + final AtomicInteger rollbacksToFail; + + MockIgniteTransactions(int commitsToFail, int rollbacksToFail) { + this.commitsToFail = new AtomicInteger(commitsToFail); + this.rollbacksToFail = new AtomicInteger(rollbacksToFail); + } + + @Override + public Transaction begin(@Nullable TransactionOptions options) { + return new MockTransaction(commitsToFail, rollbacksToFail); + } + + @Override + public CompletableFuture<Transaction> beginAsync(@Nullable TransactionOptions options) { + return completedFuture(begin(options)); + } + } + + private static class MockTransaction implements Transaction { + final AtomicInteger commitsToFail; + final AtomicInteger rollbacksToFail; + boolean finished; + + MockTransaction(AtomicInteger commitsToFail, AtomicInteger rollbacksToFail) { + this.commitsToFail = commitsToFail; + this.rollbacksToFail = rollbacksToFail; + } + + @Override + public void commit() throws TransactionException { + try { + commitAsync().join(); + } catch (CompletionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else { + throw e; + } + } + } + + @Override + public CompletableFuture<Void> commitAsync() { + sleep(1); + + if (commitsToFail.get() > 0) { + commitsToFail.decrementAndGet(); + return failedFuture(new FailedCommitTestException()); + } else { + finished = true; + return completedFuture(null); + } + } + + @Override + public void rollback() throws TransactionException { + try { + rollbackAsync().join(); + } catch (CompletionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else { + throw e; + } + } + } + + @Override + public CompletableFuture<Void> rollbackAsync() { + sleep(1); + + if (rollbacksToFail.get() > 0) { + rollbacksToFail.decrementAndGet(); + return failedFuture(new 
FailedRollbackTestException()); + } else { + finished = true; + return completedFuture(null); + } + } + + @Override + public boolean isReadOnly() { + return false; + } + } + + private static class FailedClosureTestException extends RuntimeException implements RetriableTransactionException { + // No-op. + } + + private static class FailedCommitTestException extends RuntimeException implements RetriableTransactionException { + // No-op. + } + + private static class FailedRollbackTestException extends RuntimeException implements RetriableTransactionException { + // No-op. + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/lang/ComponentStoppingException.java b/modules/core/src/main/java/org/apache/ignite/internal/lang/ComponentStoppingException.java index 02e7759ae49..06354bfe440 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/lang/ComponentStoppingException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/lang/ComponentStoppingException.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.lang; import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; import java.util.UUID; +import org.apache.ignite.tx.RetriableTransactionException; import org.jetbrains.annotations.Nullable; /** @@ -27,7 +28,7 @@ import org.jetbrains.annotations.Nullable; * This is different from {@link NodeStoppingException} as {@link ComponentStoppingException} might mean that just the component is stopped, * not the whole node. */ -public class ComponentStoppingException extends IgniteInternalCheckedException { +public class ComponentStoppingException extends IgniteInternalCheckedException implements RetriableTransactionException { /** Serial version UID. 
*/ private static final long serialVersionUID = 0L; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/lang/NodeStoppingException.java b/modules/core/src/main/java/org/apache/ignite/internal/lang/NodeStoppingException.java index 8983ab6e92a..35fd72fc99f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/lang/NodeStoppingException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/lang/NodeStoppingException.java @@ -20,12 +20,13 @@ package org.apache.ignite.internal.lang; import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR; import java.util.UUID; +import org.apache.ignite.tx.RetriableTransactionException; import org.jetbrains.annotations.Nullable; /** * This exception is used to indicate that Ignite node is stopping (already stopped) for some reason. */ -public class NodeStoppingException extends IgniteInternalCheckedException { +public class NodeStoppingException extends IgniteInternalCheckedException implements RetriableTransactionException { /** Serial version UID. */ private static final long serialVersionUID = 0L; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java b/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java index 000d8c1e537..2a6019bc405 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java @@ -42,7 +42,11 @@ public class FastTimestamps { }); Runnable updaterTask = () -> { - coarseCurrentTimeMillis = System.currentTimeMillis(); + long now = System.currentTimeMillis(); + + if (now > coarseCurrentTimeMillis) { + coarseCurrentTimeMillis = now; + } // Safe-point-friendly hint. 
Thread.onSpinWait(); diff --git a/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java b/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java index cfe4e181ed7..9057f16c764 100644 --- a/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java +++ b/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java @@ -20,12 +20,13 @@ package org.apache.ignite.internal.network; import org.apache.ignite.lang.ErrorGroups.Network; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.tx.RetriableTransactionException; /** * Thrown when consistent ID cannot be resolved to a {@link ClusterNode} instance (i.e. when * there is no node with such consistent ID in the physical topology). */ -public class UnresolvableConsistentIdException extends IgniteException { +public class UnresolvableConsistentIdException extends IgniteException implements RetriableTransactionException { public UnresolvableConsistentIdException(String msg) { super(Network.UNRESOLVABLE_CONSISTENT_ID_ERR, msg); } diff --git a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java index 5de7483175f..39ae5402dda 100644 --- a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java +++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java @@ -22,12 +22,13 @@ import static org.apache.ignite.lang.ErrorGroups.PlacementDriver.PRIMARY_REPLICA import org.apache.ignite.internal.hlc.HybridTimestamp; import 
org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.tx.RetriableTransactionException; /** * The exception is thrown when a primary replica await process has failed. Please pay attention that there is a specific * {@link PrimaryReplicaAwaitTimeoutException} for the primary replica await timeout. */ -public class PrimaryReplicaAwaitException extends IgniteInternalException { +public class PrimaryReplicaAwaitException extends IgniteInternalException implements RetriableTransactionException { private static final long serialVersionUID = 1029917546884926160L; /** diff --git a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java index 35045f9b069..633a2d93cdc 100644 --- a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java +++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java @@ -22,12 +22,13 @@ import static org.apache.ignite.lang.ErrorGroups.PlacementDriver.PRIMARY_REPLICA import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.tx.RetriableTransactionException; import org.jetbrains.annotations.Nullable; /** * The exception is thrown when a primary replica await process has times out. 
*/ -public class PrimaryReplicaAwaitTimeoutException extends IgniteInternalException { +public class PrimaryReplicaAwaitTimeoutException extends IgniteInternalException implements RetriableTransactionException { private static final long serialVersionUID = -1450288033816499192L; /** diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java index 19ef18868b3..39dfff38cb9 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java @@ -22,12 +22,14 @@ import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR; import java.util.UUID; import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.tx.RetriableTransactionException; import org.jetbrains.annotations.Nullable; /** * Unchecked exception that is thrown when a replica is not the current primary replica. 
*/ -public class PrimaryReplicaMissException extends IgniteInternalException implements ExpectedReplicationException { +public class PrimaryReplicaMissException extends IgniteInternalException implements ExpectedReplicationException, + RetriableTransactionException { private static final long serialVersionUID = 8755220779942651494L; /** diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationException.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationException.java index f6ead96f171..57a18d868d3 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationException.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationException.java @@ -22,11 +22,12 @@ import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR; import java.util.UUID; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.tx.RetriableTransactionException; /** * The exception is thrown when some issue happened during a replication. */ -public class ReplicationException extends IgniteInternalException { +public class ReplicationException extends IgniteInternalException implements RetriableTransactionException { /** * Constructor. 
* diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java index e7f9ab5307a..0deed062641 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java @@ -47,7 +47,6 @@ public class ItTxDistributedCleanupRecoveryTest extends TxAbstractTest { @BeforeEach @Override public void before() throws Exception { - // The value of 3 is less than the allowed number of cleanup retries. setDefaultRetryCount(3); txTestCluster = new ItTxTestCluster( @@ -74,10 +73,11 @@ public class ItTxDistributedCleanupRecoveryTest extends TxAbstractTest { DefaultMessagingService messagingService = (DefaultMessagingService) clusterService.messagingService(); messagingService.dropMessages((s, networkMessage) -> { if (networkMessage instanceof TxCleanupMessage && defaultRetryCount.getAndDecrement() > 0) { - logger().info("Dropping cleanup request: {}", networkMessage); + logger().info("Dropping cleanup request [message={}].", networkMessage);; return true; } + return false; }); }); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitRunInTxnTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitRunInTxnTest.java index 67c7642d103..48200c1de88 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitRunInTxnTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitRunInTxnTest.java @@ -22,6 +22,7 @@ import org.apache.ignite.Ignite; import org.apache.ignite.table.KeyValueView; import org.apache.ignite.table.Tuple; 
import org.apache.ignite.tx.Transaction; +import org.apache.ignite.tx.TransactionOptions; import org.junit.jupiter.api.function.Executable; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -54,7 +55,7 @@ public class ItKeyValueBinaryViewApiExplicitRunInTxnTest extends ItKeyValueBinar private static class TxTestCase extends TestCase { @Override protected Executable wrap(Consumer<Transaction> run) { - return () -> ignite.transactions().runInTransaction(run); + return () -> ignite.transactions().runInTransaction(run, new TransactionOptions().timeoutMillis(1)); } TxTestCase(boolean async, boolean thin, KeyValueView<Tuple, Tuple> view, TestTableDefinition tableDefinition, Ignite ignite) { diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java index 58f3da7f5c4..55c6281a3be 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java @@ -533,21 +533,24 @@ public abstract class TxAbstractTest extends TxInfrastructureTest { RecordView<Tuple> view = accounts.recordView(); view.upsert(null, makeValue(1, balance)); - CompletableFuture<Double> fut0 = igniteTransactions.runInTransactionAsync(tx -> { - CompletableFuture<Double> fut = view.getAsync(tx, makeKey(1)) - .thenCompose(val2 -> { - double prev = val2.doubleValue("balance"); - return view.upsertAsync(tx, makeValue(1, delta + 20)).thenApply(ignored -> prev); - }); - - fut.join(); - - if (true) { - throw new IllegalArgumentException(); - } + CompletableFuture<Double> fut0 = igniteTransactions.runInTransactionAsync( + tx -> { + CompletableFuture<Double> fut = view.getAsync(tx, makeKey(1)) + .thenCompose(val2 -> { + double prev = val2.doubleValue("balance"); + return view.upsertAsync(tx, 
makeValue(1, delta + 20)) + .thenApply(ignored -> prev); + }) + .whenComplete((res, ex) -> log.info("Test: tx operations in tx closures completed, ex=" + ex)); + + if (true) { + throw new IllegalArgumentException(); + } - return fut; - }); + return fut; + }, + new TransactionOptions().timeoutMillis(1000) + ); var err = assertThrows(CompletionException.class, fut0::join); @@ -567,13 +570,15 @@ public abstract class TxAbstractTest extends TxInfrastructureTest { public void testTxClosureUncaughtExceptionInChainAsync() { RecordView<Tuple> view = accounts.recordView(); - CompletableFuture<Double> fut0 = igniteTransactions.runInTransactionAsync(tx -> { - return view.getAsync(tx, makeKey(2)) - .thenCompose(val2 -> { - double prev = val2.doubleValue("balance"); // val2 is null - NPE is thrown here - return view.upsertAsync(tx, makeValue(1, 100)).thenApply(ignored -> prev); - }); - }); + CompletableFuture<Double> fut0 = igniteTransactions + .runInTransactionAsync( + tx -> view.getAsync(tx, makeKey(2)) + .thenCompose(val2 -> { + double prev = val2.doubleValue("balance"); // val2 is null - NPE is thrown here + return view.upsertAsync(tx, makeValue(1, 100)).thenApply(ignored -> prev); + }), + new TransactionOptions().timeoutMillis(1000) + ); var err = assertThrows(CompletionException.class, fut0::join); @@ -644,7 +649,7 @@ public abstract class TxAbstractTest extends TxInfrastructureTest { var futUpd2 = table2.upsertAllAsync(tx1, rows2); - assertTrue(IgniteTestUtils.waitForCondition(() -> { + assertTrue(waitForCondition(() -> { boolean lockUpgraded = false; for (Iterator<Lock> it = txManager(accounts).lockManager().locks(tx1.id()); it.hasNext(); ) { @@ -926,12 +931,18 @@ public abstract class TxAbstractTest extends TxInfrastructureTest { accounts.recordView().upsert(null, makeValue(2, 100.)); - assertThrows(RuntimeException.class, () -> igniteTransactions.runInTransaction((Consumer<Transaction>) tx -> { - assertNotNull(accounts.recordView().get(tx, key2)); - 
assertTrue(accounts.recordView().delete(tx, key2)); - assertNull(accounts.recordView().get(tx, key2)); - throw new RuntimeException(); // Triggers rollback. - })); + assertThrows( + RuntimeException.class, + () -> igniteTransactions.runInTransaction( + (Consumer<Transaction>) tx -> { + assertNotNull(accounts.recordView().get(tx, key2)); + assertTrue(accounts.recordView().delete(tx, key2)); + assertNull(accounts.recordView().get(tx, key2)); + throw new RuntimeException(); // Triggers rollback. + }, + new TransactionOptions().timeoutMillis(1000) + ) + ); assertNotNull(accounts.recordView().get(null, key2)); assertTrue(accounts.recordView().delete(null, key2)); @@ -997,7 +1008,7 @@ public abstract class TxAbstractTest extends TxInfrastructureTest { validateBalance(txAcc2.getAll(tx1, List.of(makeKey(2), makeKey(1))), 200., 300.); validateBalance(txAcc2.getAll(tx1, List.of(makeKey(1), makeKey(2))), 300., 200.); - assertTrue(IgniteTestUtils.waitForCondition(() -> TxState.ABORTED == tx2.state(), 5_000), tx2.state().toString()); + assertTrue(waitForCondition(() -> TxState.ABORTED == tx2.state(), 5_000), tx2.state().toString()); tx1.commit(); @@ -1017,7 +1028,7 @@ public abstract class TxAbstractTest extends TxInfrastructureTest { if (true) { throw new IgniteException(INTERNAL_ERR, "Test error"); } - })); + }, new TransactionOptions().timeoutMillis(1000))); assertNull(accounts.recordView().get(null, makeKey(3))); assertNull(accounts.recordView().get(null, makeKey(4))); @@ -1243,8 +1254,8 @@ public abstract class TxAbstractTest extends TxInfrastructureTest { assertEquals("test2", customers.recordView().get(null, makeKey(1)).stringValue("name")); assertEquals(200., accounts.recordView().get(null, makeKey(1)).doubleValue("balance")); - assertTrue(IgniteTestUtils.waitForCondition(() -> lockManager(accounts).isEmpty(), 10_000)); - assertTrue(IgniteTestUtils.waitForCondition(() -> lockManager(customers).isEmpty(), 10_000)); + assertTrue(waitForCondition(() -> 
lockManager(accounts).isEmpty(), 10_000)); + assertTrue(waitForCondition(() -> lockManager(customers).isEmpty(), 10_000)); } @Test @@ -1282,7 +1293,7 @@ public abstract class TxAbstractTest extends TxInfrastructureTest { assertEquals("test2", customers.recordView().get(null, makeKey(1)).stringValue("name")); assertEquals(200., accounts.recordView().get(null, makeKey(1)).doubleValue("balance")); - assertTrue(IgniteTestUtils.waitForCondition(() -> lockManager(accounts).isEmpty(), 10_000)); + assertTrue(waitForCondition(() -> lockManager(accounts).isEmpty(), 10_000)); } @Test @@ -1320,7 +1331,7 @@ public abstract class TxAbstractTest extends TxInfrastructureTest { assertEquals("test2", customers.recordView().get(null, makeKey(1)).stringValue("name")); assertEquals(200., accounts.recordView().get(null, makeKey(1)).doubleValue("balance")); - assertTrue(IgniteTestUtils.waitForCondition(() -> lockManager(accounts).isEmpty(), 10_000)); + assertTrue(waitForCondition(() -> lockManager(accounts).isEmpty(), 10_000)); } @Test @@ -1338,7 +1349,7 @@ public abstract class TxAbstractTest extends TxInfrastructureTest { assertEquals("test2", customers.recordView().get(null, makeKey(1)).stringValue("name")); assertEquals(200., accounts.recordView().get(null, makeKey(1)).doubleValue("balance")); - assertTrue(IgniteTestUtils.waitForCondition(() -> lockManager(accounts).isEmpty(), 10_000)); + assertTrue(waitForCondition(() -> lockManager(accounts).isEmpty(), 10_000)); } @Test @@ -1356,7 +1367,7 @@ public abstract class TxAbstractTest extends TxInfrastructureTest { assertEquals("test", customers.recordView().get(null, makeKey(1)).stringValue("name")); assertEquals(100., accounts.recordView().get(null, makeKey(1)).doubleValue("balance")); - assertTrue(IgniteTestUtils.waitForCondition(() -> lockManager(accounts).isEmpty(), 10_000)); + assertTrue(waitForCondition(() -> lockManager(accounts).isEmpty(), 10_000)); } @Test @@ -1375,7 +1386,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest { assertEquals("test2", customers.recordView().get(null, makeKey(1)).stringValue("name")); assertEquals(200., accounts.recordView().get(null, makeKey(1)).doubleValue("balance")); - assertTrue(IgniteTestUtils.waitForCondition(() -> lockManager(accounts).isEmpty(), 10_000)); + assertTrue(waitForCondition(() -> lockManager(accounts).isEmpty(), 10_000)); } @Test @@ -1394,7 +1405,7 @@ public abstract class TxAbstractTest extends TxInfrastructureTest { assertEquals("test", customers.recordView().get(null, makeKey(1)).stringValue("name")); assertEquals(100., accounts.recordView().get(null, makeKey(1)).doubleValue("balance")); - assertTrue(IgniteTestUtils.waitForCondition(() -> lockManager(accounts).isEmpty(), 10_000)); + assertTrue(waitForCondition(() -> lockManager(accounts).isEmpty(), 10_000)); } @Test diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java index 9586a268419..35118be5896 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.tx; import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.tx.configuration.TransactionConfigurationSchema; import org.jetbrains.annotations.Nullable; /** @@ -76,6 +77,10 @@ public class InternalTxOptions { public static class Builder { private TxPriority priority = TxPriority.NORMAL; + /** + * This is NOT actually used as the default timeout, see defaults for {@link TransactionConfigurationSchema#readOnlyTimeoutMillis} + * and {@link TransactionConfigurationSchema#readWriteTimeoutMillis} which are actually used if tx timeout is 0. 
+ */ private long timeoutMillis = 0; @Nullable diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java index ab1919a5375..0e4d0760f18 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java @@ -17,10 +17,12 @@ package org.apache.ignite.internal.tx; +import org.apache.ignite.tx.RetriableTransactionException; + /** * This exception is thrown when a lock cannot be acquired, released or downgraded. */ -public class LockException extends TransactionInternalCheckedException { +public class LockException extends TransactionInternalCheckedException implements RetriableTransactionException { /** * Creates a new instance of LockException with the given message. * diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java index 6af2318c298..953a14515fe 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.tx.configuration; +import static org.apache.ignite.tx.IgniteTransactionDefaults.DEFAULT_RW_TX_TIMEOUT_SECONDS; + import java.util.concurrent.TimeUnit; import org.apache.ignite.configuration.annotation.Config; import org.apache.ignite.configuration.annotation.ConfigValue; @@ -39,7 +41,7 @@ public class TransactionConfigurationSchema { @Range(min = 1) @Value(hasDefault = true) @PublicName(legacyNames = "readWriteTimeout") - public final long readWriteTimeoutMillis = TimeUnit.SECONDS.toMillis(30); + 
public final long readWriteTimeoutMillis = TimeUnit.SECONDS.toMillis(DEFAULT_RW_TX_TIMEOUT_SECONDS); // Deprecated properties /** How often abandoned transactions are searched for (milliseconds). */ diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java index 9d4dd6b8b8f..ae4f2d36719 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java @@ -180,6 +180,7 @@ public class PersistentTxStateVacuumizer { return hasCause(e, PrimaryReplicaMissException.class, NodeStoppingException.class, + ComponentStoppingException.class, GroupOverloadedException.class, // AwaitReplicaTimeoutException can be thrown from ReplicaService on receiver node, when there // is no replica. This may happen if it was removed after getting the primary replica but before the message was received @@ -189,8 +190,7 @@ public class PersistentTxStateVacuumizer { // the persistent tx state. // Also, replica calls from PersistentTxStateVacuumizer are local, so retry with new primary replica most likely will // happen on another node. 
- AwaitReplicaTimeoutException.class, - ComponentStoppingException.class + AwaitReplicaTimeoutException.class ); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java index 923a6437467..92130f03f70 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java @@ -23,10 +23,11 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.placementdriver.ReplicaMeta; import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.tx.RetriableTransactionException; import org.jetbrains.annotations.Nullable; /** Unchecked exception that is thrown when primary replica has expired. */ -public class PrimaryReplicaExpiredException extends IgniteInternalException { +public class PrimaryReplicaExpiredException extends IgniteInternalException implements RetriableTransactionException { /** * The constructor. *