This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c25c060763 IGNITE-22130 Fix retries logic.
c25c060763 is described below
commit c25c060763a17dcf2a746d6c5d7bd4c150c391eb
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Mon May 13 16:50:28 2024 +0300
IGNITE-22130 Fix retries logic.
---
.../ignite/internal/util/ExceptionUtils.java | 24 ++
.../ignite/internal/replicator/ReplicaService.java | 36 ++-
.../runner/app/ItIgniteNodeRestartTest.java | 3 +-
.../ignite/internal/table/ItTableScanTest.java | 6 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
.../rebalance/ItRebalanceDistributedTest.java | 3 +-
.../distributed/storage/InternalTableImpl.java | 243 +++++++++++----------
.../apache/ignite/distributed/ItTxTestCluster.java | 7 +-
.../tx/TransactionExceptionMapperProvider.java | 46 ++++
9 files changed, 240 insertions(+), 131 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
index dbad400afa..63a365689c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
@@ -567,6 +567,30 @@ public final class ExceptionUtils {
return INTERNAL_ERR;
}
+ /**
+ * Determines whether the error code extracted from a throwable equals any of the given error codes.
+ *
+ * @param t Throwable to inspect; callers pass an already unwrapped cause (e.g. via {@code unwrapCause}).
+ * @param code First error code to match (at least one code is required by the signature).
+ * @param codes Additional error codes to match.
+ * @return {@code true} if the extracted error code equals {@code code} or any element of {@code codes}.
+ */
+ public static boolean matchAny(Throwable t, int code, int... codes) {
+ int errCode = extractCodeFrom(t);
+
+ if (code == errCode) {
+ return true;
+ }
+
+ for (int c0 : codes) {
+ if (c0 == errCode) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
// TODO: https://issues.apache.org/jira/browse/IGNITE-19870
// This method should be removed or re-worked and usages should be changed
to IgniteExceptionMapperUtil.mapToPublicException.
/**
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index 245b3b4bdb..a174eae3b1 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -17,17 +17,22 @@
package org.apache.ignite.internal.replicator;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.util.ExceptionUtils.matchAny;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -45,10 +50,14 @@ import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaResponse;
import org.apache.ignite.internal.replicator.message.TimestampAware;
import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/** The service is intended to execute requests on replicas. */
public class ReplicaService {
+ /** Delay (ms) before an {@code ACQUIRE_LOCK_ERR}/{@code REPLICA_MISS_ERR} response is delivered, if a retry executor is set. */
+ private static final int RETRY_TIMEOUT_MILLIS = 10;
+
/** Message service. */
private final MessagingService messagingService;
@@ -59,6 +68,8 @@ public class ReplicaService {
private final ReplicationConfiguration replicationConfiguration;
+ private @Nullable final ScheduledExecutorService retryExecutor;
+
/** Requests to retry. */
private final Map<String, CompletableFuture<NetworkMessage>>
pendingInvokes = new ConcurrentHashMap<>();
@@ -82,7 +93,8 @@ public class ReplicaService {
messagingService,
clock,
ForkJoinPool.commonPool(),
- replicationConfiguration
+ replicationConfiguration,
+ null
);
}
@@ -93,24 +105,27 @@ public class ReplicaService {
* @param clock A hybrid logical clock.
* @param partitionOperationsExecutor Partition operation executor.
* @param replicationConfiguration Replication configuration.
+ * @param retryExecutor Retry executor.
*/
public ReplicaService(
MessagingService messagingService,
HybridClock clock,
Executor partitionOperationsExecutor,
- ReplicationConfiguration replicationConfiguration
+ ReplicationConfiguration replicationConfiguration,
+ @Nullable ScheduledExecutorService retryExecutor
) {
this.messagingService = messagingService;
this.clock = clock;
this.partitionOperationsExecutor = partitionOperationsExecutor;
this.replicationConfiguration = replicationConfiguration;
+ this.retryExecutor = retryExecutor;
}
/**
* Sends request to the replica node.
*
* @param targetNodeConsistentId A consistent id of the replica node..
- * @param req Replica request.
+ * @param req Replica request.
* @return Response future with either evaluation result or completed
exceptionally.
* @see NodeStoppingException If either supplier or demander node is
stopping.
* @see ReplicaUnavailableException If replica with given replication
group id doesn't exist or not started yet.
@@ -210,7 +225,14 @@ public class ReplicaService {
return null;
});
} else {
- res.completeExceptionally(errResp.throwable());
+ if (retryExecutor != null &&
matchAny(unwrapCause(errResp.throwable()), ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR))
{
+ retryExecutor.schedule(
+ // Need to resubmit again to pool which is
valid for synchronous IO execution.
+ () ->
partitionOperationsExecutor.execute(() ->
res.completeExceptionally(errResp.throwable())),
+ RETRY_TIMEOUT_MILLIS, MILLISECONDS);
+ } else {
+ res.completeExceptionally(errResp.throwable());
+ }
}
} else {
res.complete((R) ((ReplicaResponse) response).result());
@@ -224,7 +246,7 @@ public class ReplicaService {
/**
* Sends a request to the given replica {@code node} and returns a future
that will be completed with a result of request processing.
*
- * @param node Replica node.
+ * @param node Replica node.
* @param request Request.
* @return Response future with either evaluation result or completed
exceptionally.
* @see NodeStoppingException If either supplier or demander node is
stopping.
@@ -252,8 +274,8 @@ public class ReplicaService {
/**
* Sends a request to the given replica {@code node} and returns a future
that will be completed with a result of request processing.
*
- * @param node Replica node.
- * @param request Request.
+ * @param node Replica node.
+ * @param request Request.
* @param storageId Storage id.
* @return Response future with either evaluation result or completed
exceptionally.
* @see NodeStoppingException If either supplier or demander node is
stopping.
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index cdb511f090..0c0ead5bf1 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -386,7 +386,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
messagingServiceReturningToStorageOperationsPool,
hybridClock,
threadPoolsManager.partitionOperationsExecutor(),
- replicationConfiguration
+ replicationConfiguration,
+ threadPoolsManager.commonScheduler()
);
var lockManager = new HeapLockManager();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index d1b31ab596..9fd16e4dc5 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -135,7 +135,11 @@ public class ItTableScanTest extends BaseSqlIntegrationTest {
private void checkResourcesAreReleased(IgniteImpl ignite) {
checkCursorsAreClosed(ignite);
- assertTrue(ignite.txManager().lockManager().isEmpty());
+ try { // Poll the lock manager until it is empty (timeout 1000, presumably ms) instead of checking once.
+ assertTrue(waitForCondition(() ->
ignite.txManager().lockManager().isEmpty(), 1000));
+ } catch (InterruptedException e) {
+ fail("Unexpected interruption"); // NOTE(review): the thread's interrupt flag is not restored before failing.
+ }
}
/**
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 8503e0f529..cf3d4ca1b0 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -610,7 +610,8 @@ public class IgniteImpl implements Ignite {
messagingServiceReturningToStorageOperationsPool,
clock,
threadPoolsManager.partitionOperationsExecutor(),
- replicationConfig
+ replicationConfig,
+ threadPoolsManager.commonScheduler()
);
LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier =
partitionIdleSafeTimePropagationPeriodMsSupplier(replicationConfig);
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 0c742a9801..e08d866cd0 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1098,7 +1098,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
clusterService.messagingService(),
hybridClock,
threadPoolsManager.partitionOperationsExecutor(),
- replicationConfiguration
+ replicationConfiguration,
+ threadPoolsManager.commonScheduler()
);
var resourcesRegistry = new RemotelyTriggeredResourceRegistry();
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 8ea498f868..9692a2f908 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -21,6 +21,7 @@ import static it.unimi.dsi.fastutil.ints.Int2ObjectMaps.emptyMap;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.function.Function.identity;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_DELETE_ALL;
import static
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET;
@@ -29,19 +30,20 @@ import static org.apache.ignite.internal.table.distributed.storage.RowBatch.allR
import static
org.apache.ignite.internal.util.CompletableFutures.completedOrFailedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.matchAny;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
-import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_REPLICA_UNAVAILABLE_ERR;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import java.net.ConnectException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
@@ -52,12 +54,10 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
@@ -75,8 +75,6 @@ import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
-import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
-import org.apache.ignite.internal.replicator.exception.ReplicationException;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -101,16 +99,13 @@ import org.apache.ignite.internal.table.distributed.replication.request.SwapRowR
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.tx.LockException;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.util.CollectionUtils;
-import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.utils.PrimaryReplica;
-import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterNodeResolver;
import org.apache.ignite.tx.TransactionException;
@@ -365,16 +360,16 @@ public class InternalTableImpl implements InternalTable {
if (implicit) {
long ts = (txStartTs == null) ?
actualTx.startTimestamp().getPhysical() : txStartTs;
- if (exceptionAllowsTxRetry(e) && coarseCurrentTimeMillis()
- ts < implicitTransactionTimeout) {
+ if (exceptionAllowsImplicitTxRetry(e) &&
coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) {
return enlistInTx(row, null, fac, noWriteChecker, ts);
}
}
- throw wrapReplicationException(e);
+ sneakyThrow(e);
}
return completedFuture(r);
- }).thenCompose(x -> x);
+ }).thenCompose(identity());
}
/**
@@ -484,16 +479,16 @@ public class InternalTableImpl implements InternalTable {
if (implicit) {
long ts = (txStartTs == null) ?
actualTx.startTimestamp().getPhysical() : txStartTs;
- if (exceptionAllowsTxRetry(e) && coarseCurrentTimeMillis()
- ts < implicitTransactionTimeout) {
+ if (exceptionAllowsImplicitTxRetry(e) &&
coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) {
return enlistInTx(keyRows, null, fac, reducer,
noOpChecker, ts);
}
}
- throw wrapReplicationException(e);
+ sneakyThrow(e);
}
return completedFuture(r);
- }).thenCompose(x -> x);
+ }).thenCompose(identity());
}
private InternalTransaction startImplicitRwTxIfNeeded(@Nullable
InternalTransaction tx) {
@@ -626,50 +621,71 @@ public class InternalTableImpl implements InternalTable {
|| request instanceof MultipleRowPkReplicaRequest &&
((MultipleRowPkReplicaRequest) request).requestType() != RW_GET_ALL
|| request instanceof SwapRowReplicaRequest;
- if (write && !full) {
- // Track only write requests from explicit transactions.
- if (!transactionInflights.addInflight(tx.id(), false)) {
- return failedFuture(
- new TransactionException(TX_ALREADY_FINISHED_ERR,
format(
- "Transaction is already finished
[tableName={}, partId={}, txState={}].",
- tableName,
- partId,
- tx.state()
- )));
- }
+ if (full) { // Full transaction retries are handled in postEnlist.
+ return replicaSvc.invoke(primaryReplicaAndConsistencyToken.get1(),
request);
+ } else {
+ if (write) { // Track only write requests from explicit
transactions.
+ if (!transactionInflights.addInflight(tx.id(), false)) {
+ return failedFuture(
+ new TransactionException(TX_ALREADY_FINISHED_ERR,
format(
+ "Transaction is already finished
[tableName={}, partId={}, txState={}].",
+ tableName,
+ partId,
+ tx.state()
+ )));
+ }
- return
replicaSvc.<R>invoke(primaryReplicaAndConsistencyToken.get1(),
request).thenApply(res -> {
- assert noWriteChecker != null;
+ return
replicaSvc.<R>invoke(primaryReplicaAndConsistencyToken.get1(),
request).thenApply(res -> {
+ assert noWriteChecker != null;
- // Remove inflight if no replication was scheduled, otherwise
inflight will be removed by delayed response.
- if (noWriteChecker.test(res, request)) {
- transactionInflights.removeInflight(tx.id());
- }
+ // Remove inflight if no replication was scheduled,
otherwise inflight will be removed by delayed response.
+ if (noWriteChecker.test(res, request)) {
+ transactionInflights.removeInflight(tx.id());
+ }
+
+ return res;
+ }).handle((r, e) -> {
+ if (e != null) {
+ if (retryOnLockConflict > 0 &&
matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR)) {
+ transactionInflights.removeInflight(tx.id()); //
Will be retried.
+
+ return trackingInvoke(
+ tx,
+ partId,
+ ignored -> request,
+ false,
+ primaryReplicaAndConsistencyToken,
+ noWriteChecker,
+ retryOnLockConflict - 1
+ );
+ }
- return res;
- }).handle((r, e) -> {
- if (e != null) {
- if (retryOnLockConflict > 0 && e.getCause() instanceof
LockException) {
- transactionInflights.removeInflight(tx.id()); // Will
be retried.
-
- return trackingInvoke(
- tx,
- partId,
- ignored -> request,
- full,
- primaryReplicaAndConsistencyToken,
- noWriteChecker,
- retryOnLockConflict - 1
- );
+ sneakyThrow(e);
}
- ExceptionUtils.sneakyThrow(e);
- }
+ return completedFuture(r);
+ }).thenCompose(identity());
+ } else { // Explicit reads should be retried too.
+ return
replicaSvc.<R>invoke(primaryReplicaAndConsistencyToken.get1(),
request).handle((r, e) -> {
+ if (e != null) {
+ if (retryOnLockConflict > 0 &&
matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR)) {
+ return trackingInvoke(
+ tx,
+ partId,
+ ignored -> request,
+ false,
+ primaryReplicaAndConsistencyToken,
+ noWriteChecker,
+ retryOnLockConflict - 1
+ );
+ }
- return completedFuture(r);
- }).thenCompose(x -> x);
- } else {
- return replicaSvc.invoke(primaryReplicaAndConsistencyToken.get1(),
request);
+ sneakyThrow(e);
+ }
+
+ return completedFuture(r);
+ }).thenCompose(identity());
+ }
}
}
@@ -690,30 +706,25 @@ public class InternalTableImpl implements InternalTable {
if (full) { // Full txn is already finished remotely. Just update
local state.
txManager.finishFull(observableTimestampTracker, tx0.id(), e
== null);
- return e != null ? failedFuture(wrapReplicationException(e)) :
completedFuture(r);
+ return e != null ? failedFuture(e) : completedFuture(r);
}
if (e != null) {
- RuntimeException e0 = wrapReplicationException(e);
-
return tx0.rollbackAsync().handle((ignored, err) -> {
if (err != null) {
- e0.addSuppressed(err);
+ e.addSuppressed(err);
}
- throw e0;
+ sneakyThrow(e);
+ return null;
}); // Preserve failed state.
} else {
if (autoCommit) {
- return tx0.commitAsync()
- .exceptionally(ex -> {
- throw wrapReplicationException(ex);
- })
- .thenApply(ignored -> r);
+ return tx0.commitAsync().thenApply(ignored -> r);
} else {
return completedFuture(r);
}
}
- }).thenCompose(x -> x);
+ }).thenCompose(identity());
}
/**
@@ -819,29 +830,24 @@ public class InternalTableImpl implements InternalTable {
private <R> CompletableFuture<R> postEvaluate(CompletableFuture<R> fut,
InternalTransaction tx) {
return fut.handle((BiFunction<R, Throwable, CompletableFuture<R>>) (r,
e) -> {
if (e != null) {
- RuntimeException e0 = wrapReplicationException(e);
-
return tx.finish(false, clock.now())
.handle((ignored, err) -> {
-
if (err != null) {
- e0.addSuppressed(err);
+ e.addSuppressed(err);
}
- throw e0;
+
+ sneakyThrow(e);
+ return null;
}); // Preserve failed state.
}
- return tx.finish(true, clock.now())
- .exceptionally(ex -> {
- throw wrapReplicationException(ex);
- })
- .thenApply(ignored -> r);
- }).thenCompose(x -> x);
+ return tx.finish(true, clock.now()).thenApply(ignored -> r);
+ }).thenCompose(identity());
}
/** {@inheritDoc} */
@Override
- public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow,
InternalTransaction tx) {
+ public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, @Nullable
InternalTransaction tx) {
if (tx == null) {
return evaluateReadOnlyPrimaryNode(
keyRow,
@@ -872,7 +878,7 @@ public class InternalTableImpl implements InternalTable {
.enlistmentConsistencyToken(enlistmentConsistencyToken)
.requestType(RW_GET)
.timestampLong(clock.nowLong())
- .full(tx == null)
+ .full(false)
.coordinatorId(txo.coordinatorId())
.build(),
(res, req) -> false
@@ -1012,6 +1018,10 @@ public class InternalTableImpl implements InternalTable {
boolean first = true;
for (BinaryRow row : rows) {
+ if (row == null) {
+ continue;
+ }
+
if (first) {
schemaVersion = row.schemaVersion();
first = false;
@@ -1056,7 +1066,7 @@ public class InternalTableImpl implements InternalTable {
/** {@inheritDoc} */
@Override
- public CompletableFuture<Void> upsert(BinaryRowEx row, InternalTransaction
tx) {
+ public CompletableFuture<Void> upsert(BinaryRowEx row, @Nullable
InternalTransaction tx) {
return enlistInTx(
row,
tx,
@@ -1078,7 +1088,7 @@ public class InternalTableImpl implements InternalTable {
/** {@inheritDoc} */
@Override
- public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows,
InternalTransaction tx) {
+ public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows,
@Nullable InternalTransaction tx) {
return enlistInTx(
rows,
tx,
@@ -1091,9 +1101,29 @@ public class InternalTableImpl implements InternalTable {
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> updateAll(Collection<BinaryRowEx> rows,
@Nullable BitSet deleted, int partition) {
+ return updateAllWithRetry(rows, deleted, partition, null);
+ }
+
+ /**
+ * Update all with retry.
+ *
+ * @param rows Rows.
+ * @param deleted Deleted.
+ * @param partition The partition.
+ * @param txStartTs Start timestamp.
+ * @return The future.
+ */
+ private CompletableFuture<Void> updateAllWithRetry(
+ Collection<BinaryRowEx> rows,
+ @Nullable BitSet deleted,
+ int partition,
+ @Nullable Long txStartTs
+ ) {
InternalTransaction tx = txManager.begin(observableTimestampTracker);
TablePartitionId partGroupId = new TablePartitionId(tableId,
partition);
+ assert rows.stream().allMatch(row -> partitionId(row) == partition) :
"Invalid batch for partition " + partition;
+
CompletableFuture<Void> fut = enlistAndInvoke(
tx,
partition,
@@ -1102,7 +1132,20 @@ public class InternalTableImpl implements InternalTable {
null
);
- return postEnlist(fut, false, tx, true); // Will be committed in one
RTT.
+ // Will be finished in one RTT.
+ return postEnlist(fut, false, tx, true).handle((r, e) -> {
+ if (e != null) {
+ long ts = (txStartTs == null) ?
tx.startTimestamp().getPhysical() : txStartTs;
+
+ if (exceptionAllowsImplicitTxRetry(e) &&
coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) {
+ return updateAllWithRetry(rows, deleted, partition, ts);
+ }
+
+ sneakyThrow(e);
+ }
+
+ return completedFuture(r);
+ }).thenCompose(identity());
}
/** {@inheritDoc} */
@@ -1702,7 +1745,7 @@ public class InternalTableImpl implements InternalTable {
}
return fut;
- }).thenCompose(Function.identity());
+ }).thenCompose(identity());
}
/**
@@ -2159,34 +2202,6 @@ public class InternalTableImpl implements InternalTable {
});
}
- /**
- * Casts any exception type to a client exception, wherein {@link
ReplicationException} and {@link LockException} are wrapped to
- * {@link TransactionException}, but another exceptions are wrapped to a
common exception. The method does not wrap an exception if the
- * exception already inherits type of {@link RuntimeException}.
- *
- * @param e An instance exception to cast to client side one.
- * @return {@link IgniteException} An instance of client side exception.
- */
- private RuntimeException wrapReplicationException(Throwable e) {
- if (e instanceof CompletionException) {
- e = e.getCause();
- }
-
- RuntimeException e0;
-
- if (e instanceof ReplicationException || e instanceof ConnectException
|| e instanceof TimeoutException) {
- e0 = withCause(TransactionException::new,
TX_REPLICA_UNAVAILABLE_ERR, e);
- } else if (e instanceof LockException) {
- e0 = withCause(TransactionException::new, ACQUIRE_LOCK_ERR, e);
- } else if (!(e instanceof RuntimeException)) {
- e0 = withCause(IgniteException::new, INTERNAL_ERR, e);
- } else {
- e0 = (RuntimeException) e;
- }
-
- return e0;
- }
-
@Override
public @Nullable PendingComparableValuesTracker<HybridTimestamp, Void>
getPartitionSafeTimeTracker(int partitionId) {
return safeTimeTrackerByPartitionId.get(partitionId);
@@ -2273,13 +2288,7 @@ public class InternalTableImpl implements InternalTable {
* @param e Exception to check.
* @return True if retrying is possible, false otherwise.
*/
- private static boolean exceptionAllowsTxRetry(Throwable e) {
- Throwable ex = unwrapCause(e);
-
- while (ex instanceof TransactionException && ex.getCause() != null) {
- ex = ex.getCause();
- }
-
- return ex instanceof LockException || ex instanceof
PrimaryReplicaMissException;
+ private static boolean exceptionAllowsImplicitTxRetry(Throwable e) {
+ return matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR);
}
}
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index a7b3fe44ab..530dcbca3f 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -247,7 +247,6 @@ public class ItTxTestCluster {
protected String localNodeName;
private final ClusterNodeResolver nodeResolver = new ClusterNodeResolver()
{
-
@Override
public @Nullable ClusterNode getById(String id) {
for (ClusterService service : cluster) {
@@ -425,7 +424,8 @@ public class ItTxTestCluster {
clusterService.messagingService(),
clock,
partitionOperationsExecutor,
- replicationConfiguration
+ replicationConfiguration,
+ executor
));
replicaServices.put(node.name(), replicaSvc);
@@ -981,7 +981,8 @@ public class ItTxTestCluster {
client.messagingService(),
clientClock,
partitionOperationsExecutor,
- replicationConfiguration
+ replicationConfiguration,
+ executor
));
LOG.info("The client has been started");
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
new file mode 100644
index 0000000000..7c0dc5c40c
--- /dev/null
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static org.apache.ignite.internal.lang.IgniteExceptionMapper.unchecked;
+
+import com.google.auto.service.AutoService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.internal.lang.IgniteExceptionMapper;
+import org.apache.ignite.internal.lang.IgniteExceptionMappersProvider;
+import org.apache.ignite.internal.replicator.exception.ReplicationException;
+import org.apache.ignite.tx.TransactionException;
+
+/**
+ * Provides exception mappers that convert {@link LockException} and {@link ReplicationException} into {@link TransactionException}.
+ */
+@AutoService(IgniteExceptionMappersProvider.class)
+public class TransactionExceptionMapperProvider implements
IgniteExceptionMappersProvider {
+ @Override
+ public Collection<IgniteExceptionMapper<?, ?>> mappers() {
+ List<IgniteExceptionMapper<?, ?>> mappers = new ArrayList<>();
+ // Each mapper keeps the source exception's trace id, error code and message, and uses the source as the cause.
+ mappers.add(unchecked(LockException.class, err -> new
TransactionException(err.traceId(), err.code(), err.getMessage(), err)));
+ mappers.add(unchecked(ReplicationException.class,
+ err -> new TransactionException(err.traceId(), err.code(),
err.getMessage(), err)));
+
+ return mappers;
+ }
+}