This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7c93a132e4 IGNITE-20709 Fix safe time reordering within partitions
(#2820)
7c93a132e4 is described below
commit 7c93a132e424daeddf9ede6f209b717d37d29a1e
Author: Alexander Lapin <[email protected]>
AuthorDate: Mon Nov 13 17:01:44 2023 +0200
IGNITE-20709 Fix safe time reordering within partitions (#2820)
---
.../java/org/apache/ignite/lang/ErrorGroups.java | 2 +
.../internal/lang/SafeTimeReorderException.java} | 23 +-
.../ignite/internal/raft/RaftGroupServiceImpl.java | 5 +
.../apache/ignite/raft/jraft/error/RaftError.java | 7 +-
.../jraft/rpc/impl/ActionRequestProcessor.java | 11 +-
.../command/SafeTimePropagatingCommand.java | 9 +
.../ReplicationMaxRetriesExceededException.java} | 28 +-
.../runner/app/ItIgniteNodeRestartTest.java | 1 -
.../schemasync/ItSchemaSyncAndReplicationTest.java | 2 -
.../ignite/internal/table/ItDurableFinishTest.java | 2 +
.../table/distributed/raft/PartitionListener.java | 42 ++-
.../replicator/PartitionReplicaListener.java | 333 +++++++++++++--------
.../raft/PartitionCommandListenerTest.java | 10 +-
.../ignite/internal/table/TxAbstractTest.java | 1 +
14 files changed, 315 insertions(+), 161 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 77ecdb68c9..e980295ca1 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -382,6 +382,8 @@ public class ErrorGroups {
/** Stopping replica exception code. */
public static final int REPLICA_STOPPING_ERR =
REPLICATOR_ERR_GROUP.registerErrorCode((short) 8);
+ /** Replication safe time reordering. */
+ public static final int REPLICATION_SAFE_TIME_REORDERING_ERR =
REPLICATOR_ERR_GROUP.registerErrorCode((short) 9);
}
/** Storage error group. */
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
b/modules/core/src/main/java/org/apache/ignite/internal/lang/SafeTimeReorderException.java
similarity index 60%
copy from
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
copy to
modules/core/src/main/java/org/apache/ignite/internal/lang/SafeTimeReorderException.java
index e47229aa2c..9c5faab605 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/lang/SafeTimeReorderException.java
@@ -15,26 +15,21 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.replicator.command;
+package org.apache.ignite.internal.lang;
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.raft.WriteCommand;
+import static
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICATION_SAFE_TIME_REORDERING_ERR;
/**
- * Common interface for commands carrying safe time.
+ * This exception is used to indicate a detection of a safe time reordering.
*/
-public interface SafeTimePropagatingCommand extends WriteCommand {
- /**
- * Returns safe time.
- */
- long safeTimeLong();
+public class SafeTimeReorderException extends IgniteInternalException {
/**
- * Returns safe time.
+ * Creates the exception with the {@code REPLICATION_SAFE_TIME_REORDERING_ERR} error code
+ * and a fixed "reordering detected" message.
*/
- default HybridTimestamp safeTime() {
- return hybridTimestamp(safeTimeLong());
+ public SafeTimeReorderException() {
+ super(REPLICATION_SAFE_TIME_REORDERING_ERR, "Replication safe time
reordering detected.");
}
+
}
+
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index d5309ecd49..66ca55ccbf 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -54,6 +54,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.SafeTimeReorderException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -637,6 +638,10 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
scheduleRetry(() -> sendWithRetry(leader, requestFactory,
stopTime, fut));
}
+ break;
+ case EREORDER:
+ fut.completeExceptionally(new SafeTimeReorderException());
+
break;
default:
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java
index 74c8b4b2ae..35dd2061c0 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java
@@ -238,7 +238,12 @@ public enum RaftError {
/**
* Permission denied
*/
- EACCES(1016);
+ EACCES(1016),
+
+ /**
+ * Command reordering detected. This is not the original JRAFT error code,
but an Ignite specific one.
+ */
+ EREORDER(2017);
private static final Map<Integer, RaftError> RAFT_ERROR_MAP = new
HashMap<>();
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
index 02273d94e9..aa9bbcf185 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
@@ -22,12 +22,12 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
+import org.apache.ignite.internal.lang.SafeTimeReorderException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.ReadCommand;
-import org.apache.ignite.internal.raft.WriteCommand;
import
org.apache.ignite.internal.raft.server.impl.JraftServerImpl.DelegatingStateMachine;
import org.apache.ignite.internal.raft.service.BeforeApplyHandler;
import org.apache.ignite.internal.raft.service.CommandClosure;
@@ -87,7 +87,14 @@ public class ActionRequestProcessor implements
RpcProcessor<ActionRequest> {
if (request instanceof WriteActionRequest) {
if (fsm.getListener() instanceof BeforeApplyHandler) {
synchronized (groupIdSyncMonitor(request.groupId())) {
- callOnBeforeApply(request, fsm);
+ try {
+ callOnBeforeApply(request, fsm);
+ } catch (SafeTimeReorderException e) {
+
rpcCtx.sendResponse(factory.errorResponse().errorCode(RaftError.EREORDER.getNumber()).build());
+
+ return;
+ }
+
applyWrite(node, (WriteActionRequest) request, rpcCtx);
}
} else {
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
index e47229aa2c..cb2bc49c1b 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
@@ -21,6 +21,7 @@ import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.network.annotations.WithSetter;
/**
* Common interface for commands carrying safe time.
@@ -29,8 +30,16 @@ public interface SafeTimePropagatingCommand extends
WriteCommand {
/**
* Returns safe time.
*/
+ @WithSetter
long safeTimeLong();
+ /**
+ * Sets the safe time carried by this command.
+ *
+ * <p>The replica side calls this to refresh the safe time before re-sending a command that the raft group
+ * rejected because of safe time reordering.
+ *
+ * <p>The default implementation does nothing. NOTE(review): presumably the message implementations generated
+ * for the {@code @WithSetter} getter override this method — confirm; otherwise a retry would re-send the
+ * stale safe time.
+ *
+ * @param safeTime New safe time in its {@code long} representation.
+ */
+ default void safeTimeLong(long safeTime) {
+ // No-op.
+ }
+
/**
* Returns safe time.
*/
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationMaxRetriesExceededException.java
similarity index 50%
copy from
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
copy to
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationMaxRetriesExceededException.java
index e47229aa2c..f7ffeefa34 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationMaxRetriesExceededException.java
@@ -15,26 +15,24 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.replicator.command;
+package org.apache.ignite.internal.replicator.exception;
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.lang.ErrorGroups.Replicator;
/**
- * Common interface for commands carrying safe time.
+ * The exception is thrown when an amount of replication retries exceeds the
limit.
*/
-public interface SafeTimePropagatingCommand extends WriteCommand {
- /**
- * Returns safe time.
- */
- long safeTimeLong();
-
+public class ReplicationMaxRetriesExceededException extends
ReplicationException {
/**
- * Returns safe time.
+ * Creates the exception for a replication group whose retry count reached the limit.
+ *
+ * <p>Uses the {@code REPLICA_COMMON_ERR} code of the replicator error group.
+ *
+ * @param replicaGrpId Replication group id; included in the exception message.
+ * @param limit Maximum possible amount of retries; included in the exception message.
*/
- default HybridTimestamp safeTime() {
- return hybridTimestamp(safeTimeLong());
+ public ReplicationMaxRetriesExceededException(ReplicationGroupId
replicaGrpId, int limit) {
+ super(Replicator.REPLICA_COMMON_ERR, IgniteStringFormatter.format(
+ "Replication retries exceeds the limit [replicaGrpId={},
limit={}]", replicaGrpId, limit));
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 915d12b2da..a140ba43d0 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -767,7 +767,6 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
* Restarts the node which stores some data.
*/
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-20709")
public void nodeWithDataTest() {
IgniteImpl ignite = startNode(0);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndReplicationTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndReplicationTest.java
index 6303cbf0bb..b1fbca2662 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndReplicationTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndReplicationTest.java
@@ -40,14 +40,12 @@ import
org.apache.ignite.internal.test.WatchListenerInhibitor;
import org.apache.ignite.internal.testframework.log4j2.LogInspector;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
* Tests about interaction between Schema Synchronization and Replication.
*/
@SuppressWarnings("resource")
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-20709")
class ItSchemaSyncAndReplicationTest extends ClusterPerTestIntegrationTest {
private static final int NODES_TO_START = 3;
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
index 5b94514a16..7060c8abef 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
@@ -48,6 +48,7 @@ import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -177,6 +178,7 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
}
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-20825")
void testCoordinatorMissedResponse() throws ExecutionException,
InterruptedException {
testFinishRow(this::coordinatorMissedResponse, this::commitRow);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 3713402199..27b425273f 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -40,11 +40,13 @@ import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.SafeTimeReorderException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.BeforeApplyHandler;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.CommittedConfiguration;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
@@ -77,7 +79,7 @@ import org.jetbrains.annotations.TestOnly;
/**
* Partition command handler.
*/
-public class PartitionListener implements RaftGroupListener {
+public class PartitionListener implements RaftGroupListener,
BeforeApplyHandler {
/** Transaction manager. */
private final TxManager txManager;
@@ -99,6 +101,15 @@ public class PartitionListener implements RaftGroupListener
{
/** Storage index tracker. */
private final PendingComparableValuesTracker<Long, Void>
storageIndexTracker;
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-20826 Restore on
restart
+ /** Is used in order to detect and retry safe time reordering within
onBeforeApply. */
+ private long maxObservableSafeTime = -1;
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-20826 Restore on
restart
+ /** Is used in order to assert safe time reordering within onWrite. */
+ private long maxObservableSafeTimeVerifier = -1;
+
+
/**
* The constructor.
*
@@ -148,7 +159,15 @@ public class PartitionListener implements
RaftGroupListener {
iterator.forEachRemaining((CommandClosure<? extends WriteCommand> clo)
-> {
Command command = clo.command();
- // LOG.info("CMD {}", command.getClass().getName());
+ if (command instanceof SafeTimePropagatingCommand) {
+ SafeTimePropagatingCommand cmd = (SafeTimePropagatingCommand)
command;
+ long proposedSafeTime = cmd.safeTime().longValue();
+
+ assert proposedSafeTime > maxObservableSafeTimeVerifier :
"Safe time reordering detected [current="
+ + maxObservableSafeTimeVerifier + ", proposed=" +
proposedSafeTime + "]";
+
+ maxObservableSafeTimeVerifier = proposedSafeTime;
+ }
long commandIndex = clo.index();
long commandTerm = clo.term();
@@ -212,7 +231,9 @@ public class PartitionListener implements RaftGroupListener
{
assert safeTimePropagatingCommand.safeTime() != null;
- updateTrackerIgnoringTrackerClosedException(safeTime,
safeTimePropagatingCommand.safeTime());
+ synchronized (safeTime) {
+ updateTrackerIgnoringTrackerClosedException(safeTime,
safeTimePropagatingCommand.safeTime());
+ }
}
updateTrackerIgnoringTrackerClosedException(storageIndexTracker,
commandIndex);
@@ -455,6 +476,21 @@ public class PartitionListener implements
RaftGroupListener {
storage.close();
}
+    /**
+     * Rejects a safe-time-propagating command whose safe time is not strictly greater than the largest safe time
+     * accepted so far; otherwise remembers the command's safe time as the new maximum.
+     *
+     * <p>This method is synchronized by a replication-group-specific monitor, see
+     * ActionRequestProcessor#handleRequest.
+     *
+     * @param command Command that is about to be applied.
+     * @throws SafeTimeReorderException If the command's safe time does not exceed the maximum observed one.
+     */
+    @Override
+    public void onBeforeApply(Command command) {
+        if (!(command instanceof SafeTimePropagatingCommand)) {
+            // Only commands that carry a safe time participate in reordering detection.
+            return;
+        }
+
+        long candidateSafeTime = ((SafeTimePropagatingCommand) command).safeTime().longValue();
+
+        if (candidateSafeTime <= maxObservableSafeTime) {
+            throw new SafeTimeReorderException();
+        }
+
+        maxObservableSafeTime = candidateSafeTime;
+    }
+
/**
* Returns underlying storage.
*/
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index fc0f7cbee3..53e32760e8 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -74,6 +74,7 @@ import
org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteTriFunction;
import org.apache.ignite.internal.lang.IgniteUuid;
import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.lang.SafeTimeReorderException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
@@ -83,8 +84,10 @@ import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaResult;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
import org.apache.ignite.internal.replicator.exception.ReplicationException;
+import
org.apache.ignite.internal.replicator.exception.ReplicationMaxRetriesExceededException;
import
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
import
org.apache.ignite.internal.replicator.exception.UnsupportedReplicaRequestException;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
@@ -187,6 +190,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
/** Factory for creating replica command messages. */
private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new
ReplicaMessagesFactory();
+ /** Replication retries limit. */
+ private static final int MAX_RETIES_ON_SAFE_TIME_REORDERING = 1000;
+
/** Replication group id. */
private final TablePartitionId replicationGroupId;
@@ -932,9 +938,14 @@ public class PartitionReplicaListener implements
ReplicaListener {
return completedFuture(null);
}
- synchronized (commandProcessingLinearizationMutex) {
- return
raftClient.run(REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().safeTimeLong(hybridClock.nowLong()).build());
- }
+ CompletableFuture<Object> resultFuture = new CompletableFuture<>();
+
+ applyCmdWithRetryOnSafeTimeReorderException(
+
REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().safeTimeLong(hybridClock.nowLong()).build(),
+ resultFuture
+ );
+
+ return resultFuture.thenApply(res -> null);
}
/**
@@ -1587,27 +1598,17 @@ public class PartitionReplicaListener implements
ReplicaListener {
HybridTimestamp tsForCatalogVersion = commit ? commitTimestamp :
hybridClock.now();
return reliableCatalogVersionFor(tsForCatalogVersion)
- .thenCompose(catalogVersion -> {
- synchronized (commandProcessingLinearizationMutex) {
- FinishTxCommandBuilder finishTxCmdBldr =
MSG_FACTORY.finishTxCommand()
- .txId(txId)
- .commit(commit)
- .safeTimeLong(hybridClock.nowLong())
- .txCoordinatorId(txCoordinatorId)
- .requiredCatalogVersion(catalogVersion)
- .tablePartitionIds(
- aggregatedGroupIds.stream()
-
.map(PartitionReplicaListener::tablePartitionId)
- .collect(toList())
- );
-
- if (commit) {
-
finishTxCmdBldr.commitTimestampLong(commitTimestamp.longValue());
- }
-
- return raftClient.run(finishTxCmdBldr.build());
- }
- })
+ .thenCompose(catalogVersion -> applyFinishCommand(
+ txId,
+ commit,
+ commitTimestamp,
+ txCoordinatorId,
+ catalogVersion,
+ aggregatedGroupIds.stream()
+
.map(PartitionReplicaListener::tablePartitionId)
+ .collect(toList())
+ )
+ )
.whenComplete((o, throwable) -> {
TxState txState = commit ? COMMITED : ABORTED;
@@ -1615,6 +1616,34 @@ public class PartitionReplicaListener implements
ReplicaListener {
});
}
+    /**
+     * Builds a finish-transaction command and replicates it, retrying when the raft group reports
+     * safe time reordering.
+     *
+     * <p>The command is built and submitted while holding {@code commandProcessingLinearizationMutex}.
+     *
+     * @param transactionId Transaction id.
+     * @param commit {@code true} to commit the transaction, {@code false} to abort it.
+     * @param commitTimestamp Commit timestamp; only read when {@code commit} is {@code true}.
+     * @param txCoordinatorId Transaction coordinator id.
+     * @param catalogVersion Catalog version required by the command.
+     * @param tablePartitionIds Table partition ids passed to the command.
+     * @return Future completed with the replication result or failure.
+     */
+    private CompletableFuture<Object> applyFinishCommand(
+            UUID transactionId,
+            boolean commit,
+            HybridTimestamp commitTimestamp,
+            String txCoordinatorId,
+            int catalogVersion,
+            List<TablePartitionIdMessage> tablePartitionIds
+    ) {
+        synchronized (commandProcessingLinearizationMutex) {
+            FinishTxCommandBuilder builder = MSG_FACTORY.finishTxCommand();
+
+            builder.txId(transactionId)
+                    .commit(commit)
+                    .safeTimeLong(hybridClock.nowLong())
+                    .txCoordinatorId(txCoordinatorId)
+                    .requiredCatalogVersion(catalogVersion)
+                    .tablePartitionIds(tablePartitionIds);
+
+            if (commit) {
+                // Commit timestamp is meaningless for an abort and may be null in that case.
+                builder.commitTimestampLong(commitTimestamp.longValue());
+            }
+
+            CompletableFuture<Object> replicationResult = new CompletableFuture<>();
+            applyCmdWithRetryOnSafeTimeReorderException(builder.build(), replicationResult);
+
+            return replicationResult;
+        }
+    }
+
/**
* Processes transaction cleanup request:
@@ -1677,25 +1706,13 @@ public class PartitionReplicaListener implements
ReplicaListener {
return reliableCatalogVersionFor(commandTimestamp)
.thenCompose(catalogVersion -> {
- synchronized (commandProcessingLinearizationMutex) {
- TxCleanupCommand txCleanupCmd =
MSG_FACTORY.txCleanupCommand()
- .txId(request.txId())
- .commit(request.commit())
-
.commitTimestampLong(request.commitTimestampLong())
- .safeTimeLong(hybridClock.nowLong())
-
.txCoordinatorId(getTxCoordinatorId(request.txId()))
- .requiredCatalogVersion(catalogVersion)
- .build();
-
-
storageUpdateHandler.handleTransactionCleanup(request.txId(), request.commit(),
request.commitTimestamp());
-
- raftClient.run(txCleanupCmd)
- .exceptionally(e -> {
- LOG.warn("Failed to complete
transaction cleanup command [txId=" + request.txId() + ']', e);
-
- return completedFuture(null);
- });
- }
+ applyCleanupCommand(
+ request.txId(),
+ request.commit(),
+ request.commitTimestamp(),
+ request.commitTimestampLong(),
+ catalogVersion
+ );
return allOffFuturesExceptionIgnored(txReadFutures,
request)
.thenRun(() -> releaseTxLocks(request.txId()));
@@ -1703,6 +1720,37 @@ public class PartitionReplicaListener implements
ReplicaListener {
});
}
+    /**
+     * Applies transaction cleanup to the local storage and replicates a cleanup command, retrying when the
+     * raft group reports safe time reordering.
+     *
+     * <p>Replication is best effort: a failure is logged and not propagated, so the returned future always
+     * completes normally with {@code null}.
+     *
+     * @param transactionId Transaction id.
+     * @param commit {@code true} if the transaction was committed, {@code false} if aborted.
+     * @param commitTimestamp Commit timestamp passed to the local storage cleanup.
+     * @param commitTimestampLong Commit timestamp in its {@code long} form, put into the replicated command.
+     * @param catalogVersion Catalog version required by the command.
+     * @return Future completed with {@code null} once replication finishes, successfully or not.
+     */
+    private CompletableFuture<Void> applyCleanupCommand(
+            UUID transactionId,
+            boolean commit,
+            HybridTimestamp commitTimestamp,
+            long commitTimestampLong,
+            int catalogVersion
+    ) {
+        TxCleanupCommand txCleanupCmd = MSG_FACTORY.txCleanupCommand()
+                .txId(transactionId)
+                .commit(commit)
+                .commitTimestampLong(commitTimestampLong)
+                .safeTimeLong(hybridClock.nowLong())
+                .txCoordinatorId(getTxCoordinatorId(transactionId))
+                .requiredCatalogVersion(catalogVersion)
+                .build();
+
+        storageUpdateHandler.handleTransactionCleanup(transactionId, commit, commitTimestamp);
+
+        CompletableFuture<Object> resultFuture = new CompletableFuture<>();
+
+        applyCmdWithRetryOnSafeTimeReorderException(txCleanupCmd, resultFuture);
+
+        // Recover with a plain null value; returning a future from the recovery function would make the
+        // future itself the (discarded) result value.
+        return resultFuture.handle((res, e) -> {
+            if (e != null) {
+                LOG.warn("Failed to complete transaction cleanup command [txId=" + transactionId + ']', e);
+            }
+
+            return null;
+        });
+    }
+
private String getTxCoordinatorId(UUID txId) {
TxStateMeta meta = txManager.stateMeta(txId);
@@ -2371,8 +2419,10 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param cmd Raft command.
* @return Raft future.
*/
- private CompletableFuture<Object> applyCmdWithExceptionHandling(Command
cmd) {
- return raftClient.run(cmd).exceptionally(throwable -> {
+ private CompletableFuture<Object> applyCmdWithExceptionHandling(Command
cmd, CompletableFuture<Object> resultFuture) {
+ applyCmdWithRetryOnSafeTimeReorderException(cmd, resultFuture);
+
+ return resultFuture.exceptionally(throwable -> {
if (throwable instanceof TimeoutException) {
throw new ReplicationTimeoutException(replicationGroupId);
} else if (throwable instanceof RuntimeException) {
@@ -2383,6 +2433,54 @@ public class PartitionReplicaListener implements
ReplicaListener {
});
}
+    /**
+     * Replicates the command and completes {@code resultFuture} with the outcome, re-sending the command with a
+     * fresh safe time whenever the raft group rejects it with {@link SafeTimeReorderException}.
+     *
+     * @param cmd Command to replicate.
+     * @param resultFuture Future to complete with the replication result or failure.
+     */
+    private void applyCmdWithRetryOnSafeTimeReorderException(Command cmd, CompletableFuture<Object> resultFuture) {
+        applyCmdWithRetryOnSafeTimeReorderException(cmd, resultFuture, 0);
+    }
+
+    /**
+     * Performs one replication attempt of {@code cmd} and schedules the next attempt on safe time reordering.
+     *
+     * <p>Gives up with {@link ReplicationMaxRetriesExceededException} once the attempt number reaches
+     * {@code MAX_RETIES_ON_SAFE_TIME_REORDERING}; in that case the command is not sent again.
+     *
+     * @param cmd Command to replicate; it must be a {@link SafeTimePropagatingCommand} for a retry to happen.
+     * @param resultFuture Future to complete with the replication result or failure.
+     * @param attemptsCounter Number of attempts already made for this command.
+     */
+    private void applyCmdWithRetryOnSafeTimeReorderException(
+            Command cmd,
+            CompletableFuture<Object> resultFuture,
+            int attemptsCounter
+    ) {
+        int attempt = attemptsCounter + 1;
+
+        if (attempt >= MAX_RETIES_ON_SAFE_TIME_REORDERING) {
+            resultFuture.completeExceptionally(
+                    new ReplicationMaxRetriesExceededException(replicationGroupId, MAX_RETIES_ON_SAFE_TIME_REORDERING));
+
+            // The result is already failed: do not send the command again.
+            return;
+        }
+
+        raftClient.run(cmd).whenComplete((res, ex) -> {
+            if (ex == null) {
+                resultFuture.complete(res);
+
+                return;
+            }
+
+            if (!(ex instanceof SafeTimeReorderException || ex.getCause() instanceof SafeTimeReorderException)) {
+                resultFuture.completeExceptionally(ex);
+
+                return;
+            }
+
+            assert cmd instanceof SafeTimePropagatingCommand;
+
+            SafeTimePropagatingCommand safeTimePropagatingCommand = (SafeTimePropagatingCommand) cmd;
+
+            HybridTimestamp safeTimeForRetry = hybridClock.now();
+
+            // Within primary replica it's required to update safe time in order to prevent double storage updates
+            // in case of !1PC. Otherwise, it may be possible that a newer entry will be overwritten by an older one
+            // that came as part of the raft replication flow:
+            // tx1 = transactions.begin();
+            // tx1.put(k1, v1) -> primary.apply(k1,v1) + asynchronous raft replication (k1,v1)
+            // tx1.put(k1, v2) -> primary.apply(k1,v2) + asynchronous raft replication (k1,v2)
+            // (k1,v1) replication overrides newer (k1, v2). Eventually (k1,v2) replication will restore proper value.
+            // However it's possible that tx1.get(k1) will see v1 instead of v2.
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Better solution required. Given one is correct,
+            // but fragile.
+            if ((cmd instanceof UpdateCommand && !((UpdateCommand) cmd).full())
+                    || (cmd instanceof UpdateAllCommand && !((UpdateAllCommand) cmd).full())) {
+                synchronized (safeTime) {
+                    updateTrackerIgnoringTrackerClosedException(safeTime, safeTimeForRetry);
+                }
+            }
+
+            safeTimePropagatingCommand.safeTimeLong(safeTimeForRetry.longValue());
+
+            // Carry the attempt number forward; restarting from the two-argument overload would reset it and
+            // make the retry limit unreachable.
+            applyCmdWithRetryOnSafeTimeReorderException(safeTimePropagatingCommand, resultFuture, attempt);
+        });
+    }
+
/**
* Executes an Update command.
*
@@ -2420,34 +2518,37 @@ public class PartitionReplicaListener implements
ReplicaListener {
);
if (!cmd.full()) {
- CompletableFuture<UUID> fut =
applyCmdWithExceptionHandling(cmd).thenApply(res -> {
- // This check guaranties the result will never be lost.
Currently always null.
- assert res == null : "Replication result is lost";
-
- // Set context for delayed response.
- return cmd.txId();
- });
-
// TODO: https://issues.apache.org/jira/browse/IGNITE-20124
Temporary code below
synchronized (safeTime) {
- if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
- storageUpdateHandler.handleUpdate(
- cmd.txId(),
- cmd.rowUuid(),
- cmd.tablePartitionId().asTablePartitionId(),
- cmd.rowToUpdate(),
- true,
- null,
- null,
- null);
+ storageUpdateHandler.handleUpdate(
+ cmd.txId(),
+ cmd.rowUuid(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ cmd.rowToUpdate(),
+ true,
+ null,
+ null,
+ null);
- updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
- }
+ updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
}
+ CompletableFuture<UUID> fut =
applyCmdWithExceptionHandling(cmd, new CompletableFuture<>())
+ .thenApply(res -> {
+ // This check guaranties the result will never be
lost. Currently always null.
+ assert res == null : "Replication result is lost";
+
+ // Set context for delayed response.
+ return cmd.txId();
+ });
+
return completedFuture(fut);
} else {
- return applyCmdWithExceptionHandling(cmd).thenApply(res -> {
+ CompletableFuture<Object> resultFuture = new
CompletableFuture<>();
+
+ applyCmdWithExceptionHandling(cmd, resultFuture);
+
+ return resultFuture.thenApply(res -> {
// This check guaranties the result will never be lost.
Currently always null.
assert res == null : "Replication result is lost";
@@ -2513,7 +2614,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param txCoordinatorId Transaction coordinator id.
* @param catalogVersion Validated catalog version associated with given
operation.
* @param skipDelayedAck {@code true} to disable the delayed ack
optimization.
- * @return Raft future, see {@link
#applyCmdWithExceptionHandling(Command)}.
+ * @return Raft future, see {@link #applyCmdWithExceptionHandling(Command,
CompletableFuture)}.
*/
private CompletableFuture<CompletableFuture<?>> applyUpdateAllCommand(
Map<UUID, TimedBinaryRowMessage> rowsToUpdate,
@@ -2539,68 +2640,66 @@ public class PartitionReplicaListener implements
ReplicaListener {
if (skipDelayedAck) {
// TODO:
https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
synchronized (safeTime) {
- if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
- storageUpdateHandler.handleUpdateAll(
- cmd.txId(),
- cmd.rowsToUpdate(),
-
cmd.tablePartitionId().asTablePartitionId(),
- true,
- null,
- null
- );
+ storageUpdateHandler.handleUpdateAll(
+ cmd.txId(),
+ cmd.rowsToUpdate(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ true,
+ null,
+ null
+ );
-
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
- }
+ updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
}
- return applyCmdWithExceptionHandling(cmd).thenApply(res ->
null);
+ return applyCmdWithExceptionHandling(cmd, new
CompletableFuture<>()).thenApply(res -> null);
} else {
- CompletableFuture<Object> fut =
applyCmdWithExceptionHandling(cmd).thenApply(res -> {
- // Currently result is always null on a successfull
execution of a replication command.
- // This check guaranties the result will never be lost.
- assert res == null : "Replication result is lost";
-
- // Set context for delayed response.
- return cmd.txId();
- });
-
// TODO:
https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
synchronized (safeTime) {
- if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
- storageUpdateHandler.handleUpdateAll(
- cmd.txId(),
- cmd.rowsToUpdate(),
-
cmd.tablePartitionId().asTablePartitionId(),
- true,
- null,
- null
- );
+ storageUpdateHandler.handleUpdateAll(
+ cmd.txId(),
+ cmd.rowsToUpdate(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ true,
+ null,
+ null
+ );
-
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
- }
+ updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
}
+ CompletableFuture<Object> fut =
applyCmdWithExceptionHandling(cmd, new CompletableFuture<>())
+ .thenApply(res -> {
+ // Currently result is always null on a
successfull execution of a replication command.
+ // This check guaranties the result will never
be lost.
+ assert res == null : "Replication result is
lost";
+
+ // Set context for delayed response.
+ return cmd.txId();
+ });
+
return completedFuture(fut);
}
} else {
- return applyCmdWithExceptionHandling(cmd).thenApply(res -> {
- assert res == null : "Replication result is lost";
-
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
- // In case of full (1PC) commit double update is only a
matter of optimisation and not correctness, because
- // there's no other transaction that can rewrite given key
because of locks and same transaction re-write isn't possible
- // just because there's only one operation in 1PC.
- storageUpdateHandler.handleUpdateAll(
- cmd.txId(),
- cmd.rowsToUpdate(),
- cmd.tablePartitionId().asTablePartitionId(),
- false,
- null,
- cmd.safeTime()
- );
+ return applyCmdWithExceptionHandling(cmd, new
CompletableFuture<>())
+ .thenApply(res -> {
+ assert res == null : "Replication result is lost";
+
+ // TODO:
https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
+ // In case of full (1PC) commit double update is
only a matter of optimisation and not correctness, because
+ // there's no other transaction that can rewrite
given key because of locks and same transaction re-write isn't
+ // possible just because there's only one
operation in 1PC.
+ storageUpdateHandler.handleUpdateAll(
+ cmd.txId(),
+ cmd.rowsToUpdate(),
+
cmd.tablePartitionId().asTablePartitionId(),
+ false,
+ null,
+ cmd.safeTime()
+ );
- return null;
- });
+ return null;
+ });
}
}
}
@@ -2612,7 +2711,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param rowsToUpdate All {@link BinaryRow}s represented as {@link
TimedBinaryRowMessage}s to be updated.
* @param txCoordinatorId Transaction coordinator id.
* @param catalogVersion Validated catalog version associated with given
operation.
- * @return Raft future, see {@link
#applyCmdWithExceptionHandling(Command)}.
+ * @return Raft future, see {@link #applyCmdWithExceptionHandling(Command,
CompletableFuture)}.
*/
private CompletableFuture<CompletableFuture<?>> applyUpdateAllCommand(
ReadWriteMultiRowReplicaRequest request,
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 3773ccea66..52671921cc 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -347,19 +347,17 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
void testSkipWriteCommandByAppliedIndex() {
mvPartitionStorage.lastApplied(10L, 1L);
- HybridTimestamp timestamp = hybridClock.now();
-
UpdateCommand updateCommand = mock(UpdateCommand.class);
- when(updateCommand.safeTime()).thenReturn(timestamp);
+ when(updateCommand.safeTime()).thenAnswer(v -> hybridClock.now());
TxCleanupCommand txCleanupCommand = mock(TxCleanupCommand.class);
- when(txCleanupCommand.safeTime()).thenReturn(timestamp);
+ when(txCleanupCommand.safeTime()).thenAnswer(v -> hybridClock.now());
SafeTimeSyncCommand safeTimeSyncCommand =
mock(SafeTimeSyncCommand.class);
- when(safeTimeSyncCommand.safeTime()).thenReturn(timestamp);
+ when(safeTimeSyncCommand.safeTime()).thenAnswer(v ->
hybridClock.now());
FinishTxCommand finishTxCommand = mock(FinishTxCommand.class);
- when(finishTxCommand.safeTime()).thenReturn(timestamp);
+ when(finishTxCommand.safeTime()).thenAnswer(v -> hybridClock.now());
// Checks for MvPartitionStorage.
commandListener.onWrite(List.of(
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index a05bb3b393..52f8ea4a7a 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -382,6 +382,7 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
/**
* Tests negative transfer scenario.
*/
+ @Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-17861")
public void testTxClosureAbortAsync() {
double balance1 = 10.;