This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c93a132e4 IGNITE-20709 Fix safe time reordering within partitions (#2820)
7c93a132e4 is described below

commit 7c93a132e424daeddf9ede6f209b717d37d29a1e
Author: Alexander Lapin <[email protected]>
AuthorDate: Mon Nov 13 17:01:44 2023 +0200

    IGNITE-20709 Fix safe time reordering within partitions (#2820)
---
 .../java/org/apache/ignite/lang/ErrorGroups.java   |   2 +
 .../internal/lang/SafeTimeReorderException.java}   |  23 +-
 .../ignite/internal/raft/RaftGroupServiceImpl.java |   5 +
 .../apache/ignite/raft/jraft/error/RaftError.java  |   7 +-
 .../jraft/rpc/impl/ActionRequestProcessor.java     |  11 +-
 .../command/SafeTimePropagatingCommand.java        |   9 +
 .../ReplicationMaxRetriesExceededException.java}   |  28 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   1 -
 .../schemasync/ItSchemaSyncAndReplicationTest.java |   2 -
 .../ignite/internal/table/ItDurableFinishTest.java |   2 +
 .../table/distributed/raft/PartitionListener.java  |  42 ++-
 .../replicator/PartitionReplicaListener.java       | 333 +++++++++++++--------
 .../raft/PartitionCommandListenerTest.java         |  10 +-
 .../ignite/internal/table/TxAbstractTest.java      |   1 +
 14 files changed, 315 insertions(+), 161 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 77ecdb68c9..e980295ca1 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -382,6 +382,8 @@ public class ErrorGroups {
         /** Stopping replica exception code. */
         public static final int REPLICA_STOPPING_ERR = 
REPLICATOR_ERR_GROUP.registerErrorCode((short) 8);
 
+        /** Replication safe time reordering. */
+        public static final int REPLICATION_SAFE_TIME_REORDERING_ERR = 
REPLICATOR_ERR_GROUP.registerErrorCode((short) 9);
     }
 
     /** Storage error group. */
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/lang/SafeTimeReorderException.java
similarity index 60%
copy from 
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
copy to 
modules/core/src/main/java/org/apache/ignite/internal/lang/SafeTimeReorderException.java
index e47229aa2c..9c5faab605 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/lang/SafeTimeReorderException.java
@@ -15,26 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.replicator.command;
+package org.apache.ignite.internal.lang;
 
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.raft.WriteCommand;
+import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICATION_SAFE_TIME_REORDERING_ERR;
 
 /**
- * Common interface for commands carrying safe time.
+ * This exception is used to indicate a detection of a safe time reordering.
  */
-public interface SafeTimePropagatingCommand extends WriteCommand {
-    /**
-     * Returns safe time.
-     */
-    long safeTimeLong();
+public class SafeTimeReorderException extends IgniteInternalException {
 
     /**
-     * Returns safe time.
+     * The constructor.
      */
-    default HybridTimestamp safeTime() {
-        return hybridTimestamp(safeTimeLong());
+    public SafeTimeReorderException() {
+        super(REPLICATION_SAFE_TIME_REORDERING_ERR, "Replication safe time 
reordering detected.");
     }
+
 }
+
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index d5309ecd49..66ca55ccbf 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -54,6 +54,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.SafeTimeReorderException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -637,6 +638,10 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
                     scheduleRetry(() -> sendWithRetry(leader, requestFactory, 
stopTime, fut));
                 }
 
+                break;
+            case EREORDER:
+                fut.completeExceptionally(new SafeTimeReorderException());
+
                 break;
 
             default:
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java
index 74c8b4b2ae..35dd2061c0 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java
@@ -238,7 +238,12 @@ public enum RaftError {
     /**
      * Permission denied
      */
-    EACCES(1016);
+    EACCES(1016),
+
+    /**
+     * Command reordering detected. This is not the original JRAFT error code, 
but an Ignite specific one.
+     */
+    EREORDER(2017);
 
     private static final Map<Integer, RaftError> RAFT_ERROR_MAP = new 
HashMap<>();
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
index 02273d94e9..aa9bbcf185 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
@@ -22,12 +22,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import org.apache.ignite.internal.lang.SafeTimeReorderException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.ReadCommand;
-import org.apache.ignite.internal.raft.WriteCommand;
 import 
org.apache.ignite.internal.raft.server.impl.JraftServerImpl.DelegatingStateMachine;
 import org.apache.ignite.internal.raft.service.BeforeApplyHandler;
 import org.apache.ignite.internal.raft.service.CommandClosure;
@@ -87,7 +87,14 @@ public class ActionRequestProcessor implements 
RpcProcessor<ActionRequest> {
         if (request instanceof WriteActionRequest) {
             if (fsm.getListener() instanceof BeforeApplyHandler) {
                 synchronized (groupIdSyncMonitor(request.groupId())) {
-                    callOnBeforeApply(request, fsm);
+                    try {
+                        callOnBeforeApply(request, fsm);
+                    } catch (SafeTimeReorderException e) {
+                        
rpcCtx.sendResponse(factory.errorResponse().errorCode(RaftError.EREORDER.getNumber()).build());
+
+                        return;
+                    }
+
                     applyWrite(node, (WriteActionRequest) request, rpcCtx);
                 }
             } else {
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
index e47229aa2c..cb2bc49c1b 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
@@ -21,6 +21,7 @@ import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.network.annotations.WithSetter;
 
 /**
  * Common interface for commands carrying safe time.
@@ -29,8 +30,16 @@ public interface SafeTimePropagatingCommand extends 
WriteCommand {
     /**
      * Returns safe time.
      */
+    @WithSetter
     long safeTimeLong();
 
+    /**
+     * Setter for the safeTime field.
+     */
+    default void safeTimeLong(long safeTime) {
+        // No-op.
+    }
+
     /**
      * Returns safe time.
      */
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationMaxRetriesExceededException.java
similarity index 50%
copy from 
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
copy to 
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationMaxRetriesExceededException.java
index e47229aa2c..f7ffeefa34 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationMaxRetriesExceededException.java
@@ -15,26 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.replicator.command;
+package org.apache.ignite.internal.replicator.exception;
 
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.lang.ErrorGroups.Replicator;
 
 /**
- * Common interface for commands carrying safe time.
+ * The exception is thrown when an amount of replication retries exceeds the 
limit.
  */
-public interface SafeTimePropagatingCommand extends WriteCommand {
-    /**
-     * Returns safe time.
-     */
-    long safeTimeLong();
-
+public class ReplicationMaxRetriesExceededException extends 
ReplicationException {
     /**
-     * Returns safe time.
+     * The constructor.
+     *
+     * @param replicaGrpId Replication group id.
+     * @param limit Maximum possible amount of retries.
      */
-    default HybridTimestamp safeTime() {
-        return hybridTimestamp(safeTimeLong());
+    public ReplicationMaxRetriesExceededException(ReplicationGroupId 
replicaGrpId, int limit) {
+        super(Replicator.REPLICA_COMMON_ERR, IgniteStringFormatter.format(
+                "Replication retries exceeds the limit [replicaGrpId={}, 
limit={}]", replicaGrpId, limit));
     }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 915d12b2da..a140ba43d0 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -767,7 +767,6 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
      * Restarts the node which stores some data.
      */
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20709";)
     public void nodeWithDataTest() {
         IgniteImpl ignite = startNode(0);
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndReplicationTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndReplicationTest.java
index 6303cbf0bb..b1fbca2662 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndReplicationTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndReplicationTest.java
@@ -40,14 +40,12 @@ import 
org.apache.ignite.internal.test.WatchListenerInhibitor;
 import org.apache.ignite.internal.testframework.log4j2.LogInspector;
 import org.apache.ignite.table.Tuple;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
  * Tests about interaction between Schema Synchronization and Replication.
  */
 @SuppressWarnings("resource")
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-20709";)
 class ItSchemaSyncAndReplicationTest extends ClusterPerTestIntegrationTest {
     private static final int NODES_TO_START = 3;
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
index 5b94514a16..7060c8abef 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
@@ -48,6 +48,7 @@ import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -177,6 +178,7 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
     }
 
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20825";)
     void testCoordinatorMissedResponse() throws ExecutionException, 
InterruptedException {
         testFinishRow(this::coordinatorMissedResponse, this::commitRow);
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 3713402199..27b425273f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -40,11 +40,13 @@ import java.util.function.Consumer;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.SafeTimeReorderException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.ReadCommand;
 import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.BeforeApplyHandler;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.CommittedConfiguration;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
@@ -77,7 +79,7 @@ import org.jetbrains.annotations.TestOnly;
 /**
  * Partition command handler.
  */
-public class PartitionListener implements RaftGroupListener {
+public class PartitionListener implements RaftGroupListener, 
BeforeApplyHandler {
     /** Transaction manager. */
     private final TxManager txManager;
 
@@ -99,6 +101,15 @@ public class PartitionListener implements RaftGroupListener 
{
     /** Storage index tracker. */
     private final PendingComparableValuesTracker<Long, Void> 
storageIndexTracker;
 
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-20826 Restore on 
restart
+    /** Is used in order to detect and retry safe time reordering within 
onBeforeApply. */
+    private long maxObservableSafeTime = -1;
+
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-20826 Restore on 
restart
+    /** Is used in order to assert safe time reordering within onWrite. */
+    private long maxObservableSafeTimeVerifier = -1;
+
+
     /**
      * The constructor.
      *
@@ -148,7 +159,15 @@ public class PartitionListener implements 
RaftGroupListener {
         iterator.forEachRemaining((CommandClosure<? extends WriteCommand> clo) 
-> {
             Command command = clo.command();
 
-            // LOG.info("CMD {}", command.getClass().getName());
+            if (command instanceof SafeTimePropagatingCommand) {
+                SafeTimePropagatingCommand cmd = (SafeTimePropagatingCommand) 
command;
+                long proposedSafeTime = cmd.safeTime().longValue();
+
+                assert proposedSafeTime > maxObservableSafeTimeVerifier : 
"Safe time reordering detected [current="
+                        + maxObservableSafeTimeVerifier + ", proposed=" + 
proposedSafeTime + "]";
+
+                maxObservableSafeTimeVerifier = proposedSafeTime;
+            }
 
             long commandIndex = clo.index();
             long commandTerm = clo.term();
@@ -212,7 +231,9 @@ public class PartitionListener implements RaftGroupListener 
{
 
                 assert safeTimePropagatingCommand.safeTime() != null;
 
-                updateTrackerIgnoringTrackerClosedException(safeTime, 
safeTimePropagatingCommand.safeTime());
+                synchronized (safeTime) {
+                    updateTrackerIgnoringTrackerClosedException(safeTime, 
safeTimePropagatingCommand.safeTime());
+                }
             }
 
             updateTrackerIgnoringTrackerClosedException(storageIndexTracker, 
commandIndex);
@@ -455,6 +476,21 @@ public class PartitionListener implements 
RaftGroupListener {
         storage.close();
     }
 
+    @Override
+    public void onBeforeApply(Command command) {
+        // This method is synchronized by replication group specific monitor, 
see ActionRequestProcessor#handleRequest.
+        if (command instanceof SafeTimePropagatingCommand) {
+            SafeTimePropagatingCommand cmd = (SafeTimePropagatingCommand) 
command;
+            long proposedSafeTime = cmd.safeTime().longValue();
+
+            if (proposedSafeTime > maxObservableSafeTime) {
+                maxObservableSafeTime = proposedSafeTime;
+            } else {
+                throw new SafeTimeReorderException();
+            }
+        }
+    }
+
     /**
      * Returns underlying storage.
      */
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index fc0f7cbee3..53e32760e8 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -74,6 +74,7 @@ import 
org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.IgniteTriFunction;
 import org.apache.ignite.internal.lang.IgniteUuid;
 import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.lang.SafeTimeReorderException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
@@ -83,8 +84,10 @@ import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaResult;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import 
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
 import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
 import org.apache.ignite.internal.replicator.exception.ReplicationException;
+import 
org.apache.ignite.internal.replicator.exception.ReplicationMaxRetriesExceededException;
 import 
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
 import 
org.apache.ignite.internal.replicator.exception.UnsupportedReplicaRequestException;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
@@ -187,6 +190,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     /** Factory for creating replica command messages. */
     private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
 
+    /** Replication retries limit. */
+    private static final int MAX_RETIES_ON_SAFE_TIME_REORDERING = 1000;
+
     /** Replication group id. */
     private final TablePartitionId replicationGroupId;
 
@@ -932,9 +938,14 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             return completedFuture(null);
         }
 
-        synchronized (commandProcessingLinearizationMutex) {
-            return 
raftClient.run(REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().safeTimeLong(hybridClock.nowLong()).build());
-        }
+        CompletableFuture<Object> resultFuture = new CompletableFuture<>();
+
+        applyCmdWithRetryOnSafeTimeReorderException(
+                
REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().safeTimeLong(hybridClock.nowLong()).build(),
+                resultFuture
+        );
+
+        return resultFuture.thenApply(res -> null);
     }
 
     /**
@@ -1587,27 +1598,17 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         HybridTimestamp tsForCatalogVersion = commit ? commitTimestamp : 
hybridClock.now();
 
         return reliableCatalogVersionFor(tsForCatalogVersion)
-                .thenCompose(catalogVersion -> {
-                    synchronized (commandProcessingLinearizationMutex) {
-                        FinishTxCommandBuilder finishTxCmdBldr = 
MSG_FACTORY.finishTxCommand()
-                                .txId(txId)
-                                .commit(commit)
-                                .safeTimeLong(hybridClock.nowLong())
-                                .txCoordinatorId(txCoordinatorId)
-                                .requiredCatalogVersion(catalogVersion)
-                                .tablePartitionIds(
-                                        aggregatedGroupIds.stream()
-                                                
.map(PartitionReplicaListener::tablePartitionId)
-                                                .collect(toList())
-                                );
-
-                        if (commit) {
-                            
finishTxCmdBldr.commitTimestampLong(commitTimestamp.longValue());
-                        }
-
-                        return raftClient.run(finishTxCmdBldr.build());
-                    }
-                })
+                .thenCompose(catalogVersion -> applyFinishCommand(
+                                txId,
+                                commit,
+                                commitTimestamp,
+                                txCoordinatorId,
+                                catalogVersion,
+                                aggregatedGroupIds.stream()
+                                        
.map(PartitionReplicaListener::tablePartitionId)
+                                        .collect(toList())
+                        )
+                )
                 .whenComplete((o, throwable) -> {
                     TxState txState = commit ? COMMITED : ABORTED;
 
@@ -1615,6 +1616,34 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 });
     }
 
+    private CompletableFuture<Object> applyFinishCommand(
+            UUID transactionId,
+            boolean commit,
+            HybridTimestamp commitTimestamp,
+            String txCoordinatorId,
+            int catalogVersion,
+            List<TablePartitionIdMessage> tablePartitionIds
+    ) {
+        synchronized (commandProcessingLinearizationMutex) {
+            FinishTxCommandBuilder finishTxCmdBldr = 
MSG_FACTORY.finishTxCommand()
+                    .txId(transactionId)
+                    .commit(commit)
+                    .safeTimeLong(hybridClock.nowLong())
+                    .txCoordinatorId(txCoordinatorId)
+                    .requiredCatalogVersion(catalogVersion)
+                    .tablePartitionIds(tablePartitionIds);
+
+            if (commit) {
+                
finishTxCmdBldr.commitTimestampLong(commitTimestamp.longValue());
+            }
+            CompletableFuture<Object> resultFuture = new CompletableFuture<>();
+
+            
applyCmdWithRetryOnSafeTimeReorderException(finishTxCmdBldr.build(), 
resultFuture);
+
+            return resultFuture;
+        }
+    }
+
 
     /**
      * Processes transaction cleanup request:
@@ -1677,25 +1706,13 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
             return reliableCatalogVersionFor(commandTimestamp)
                     .thenCompose(catalogVersion -> {
-                        synchronized (commandProcessingLinearizationMutex) {
-                            TxCleanupCommand txCleanupCmd = 
MSG_FACTORY.txCleanupCommand()
-                                    .txId(request.txId())
-                                    .commit(request.commit())
-                                    
.commitTimestampLong(request.commitTimestampLong())
-                                    .safeTimeLong(hybridClock.nowLong())
-                                    
.txCoordinatorId(getTxCoordinatorId(request.txId()))
-                                    .requiredCatalogVersion(catalogVersion)
-                                    .build();
-
-                            
storageUpdateHandler.handleTransactionCleanup(request.txId(), request.commit(), 
request.commitTimestamp());
-
-                            raftClient.run(txCleanupCmd)
-                                    .exceptionally(e -> {
-                                        LOG.warn("Failed to complete 
transaction cleanup command [txId=" + request.txId() + ']', e);
-
-                                        return completedFuture(null);
-                                    });
-                        }
+                        applyCleanupCommand(
+                                request.txId(),
+                                request.commit(),
+                                request.commitTimestamp(),
+                                request.commitTimestampLong(),
+                                catalogVersion
+                        );
 
                         return allOffFuturesExceptionIgnored(txReadFutures, 
request)
                                 .thenRun(() -> releaseTxLocks(request.txId()));
@@ -1703,6 +1720,37 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         });
     }
 
+    private CompletableFuture<Void> applyCleanupCommand(
+            UUID transactionId,
+            boolean commit,
+            HybridTimestamp commitTimestamp,
+            long commitTimestampLong,
+            int catalogVersion
+    ) {
+        TxCleanupCommand txCleanupCmd = MSG_FACTORY.txCleanupCommand()
+                .txId(transactionId)
+                .commit(commit)
+                .commitTimestampLong(commitTimestampLong)
+                .safeTimeLong(hybridClock.nowLong())
+                .txCoordinatorId(getTxCoordinatorId(transactionId))
+                .requiredCatalogVersion(catalogVersion)
+                .build();
+
+        storageUpdateHandler.handleTransactionCleanup(transactionId, commit, 
commitTimestamp);
+
+        CompletableFuture<Object> resultFuture = new CompletableFuture<>();
+
+        applyCmdWithRetryOnSafeTimeReorderException(txCleanupCmd, 
resultFuture);
+
+        return resultFuture
+                .exceptionally(e -> {
+                    LOG.warn("Failed to complete transaction cleanup command 
[txId=" + transactionId + ']', e);
+
+                    return completedFuture(null);
+                })
+                .thenApply(res -> null);
+    }
+
     private String getTxCoordinatorId(UUID txId) {
         TxStateMeta meta = txManager.stateMeta(txId);
 
@@ -2371,8 +2419,10 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param cmd Raft command.
      * @return Raft future.
      */
-    private CompletableFuture<Object> applyCmdWithExceptionHandling(Command 
cmd) {
-        return raftClient.run(cmd).exceptionally(throwable -> {
+    private CompletableFuture<Object> applyCmdWithExceptionHandling(Command 
cmd, CompletableFuture<Object> resultFuture) {
+        applyCmdWithRetryOnSafeTimeReorderException(cmd, resultFuture);
+
+        return resultFuture.exceptionally(throwable -> {
             if (throwable instanceof TimeoutException) {
                 throw new ReplicationTimeoutException(replicationGroupId);
             } else if (throwable instanceof RuntimeException) {
@@ -2383,6 +2433,54 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         });
     }
 
+    private void applyCmdWithRetryOnSafeTimeReorderException(Command cmd, 
CompletableFuture<Object> resultFuture) {
+        applyCmdWithRetryOnSafeTimeReorderException(cmd, resultFuture, 0);
+    }
+
+    private void applyCmdWithRetryOnSafeTimeReorderException(Command cmd, 
CompletableFuture<Object> resultFuture, int attemptsCounter) {
+        attemptsCounter++;
+        if (attemptsCounter >= MAX_RETIES_ON_SAFE_TIME_REORDERING) {
+            resultFuture.completeExceptionally(
+                    new 
ReplicationMaxRetriesExceededException(replicationGroupId, 
MAX_RETIES_ON_SAFE_TIME_REORDERING));
+        }
+
+        raftClient.run(cmd).whenComplete((res, ex) -> {
+            if (ex != null) {
+                if (ex instanceof SafeTimeReorderException || ex.getCause() 
instanceof SafeTimeReorderException) {
+                    assert cmd instanceof SafeTimePropagatingCommand;
+
+                    SafeTimePropagatingCommand safeTimePropagatingCommand = 
(SafeTimePropagatingCommand) cmd;
+
+                    HybridTimestamp safeTimeForRetry = hybridClock.now();
+
+                    // Within primary replica it's required to update safe 
time in order to prevent double storage updates in case of !1PC.
+                    // Otherwise, it may be possible that a newer entry will 
be overwritten by an older one that came as part of the raft
+                    // replication flow:
+                    // tx1 = transactions.begin();
+                    // tx1.put(k1, v1) -> primary.apply(k1,v1) + asynchronous 
raft replication (k1,v1)
+                    // tx1.put(k1, v2) -> primary.apply(k1,v2) + asynchronous 
raft replication (k1,v1)
+                    // (k1,v1) replication overrides newer (k1, v2). 
Eventually (k1,v2) replication will restore proper value.
+                    // However it's possible that tx1.get(k1) will see v1 
instead of v2.
+                    // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20124 Better solution requied. 
Given one is correct, but fragile.
+                    if ((cmd instanceof UpdateCommand && !((UpdateCommand) 
cmd).full())
+                            || (cmd instanceof UpdateAllCommand && 
!((UpdateAllCommand) cmd).full())) {
+                        synchronized (safeTime) {
+                            
updateTrackerIgnoringTrackerClosedException(safeTime, safeTimeForRetry);
+                        }
+                    }
+
+                    
safeTimePropagatingCommand.safeTimeLong(safeTimeForRetry.longValue());
+
+                    
applyCmdWithRetryOnSafeTimeReorderException(safeTimePropagatingCommand, 
resultFuture);
+                } else {
+                    resultFuture.completeExceptionally(ex);
+                }
+            } else {
+                resultFuture.complete(res);
+            }
+        });
+    }
+
     /**
      * Executes an Update command.
      *
@@ -2420,34 +2518,37 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             );
 
             if (!cmd.full()) {
-                CompletableFuture<UUID> fut = 
applyCmdWithExceptionHandling(cmd).thenApply(res -> {
-                    // This check guaranties the result will never be lost. 
Currently always null.
-                    assert res == null : "Replication result is lost";
-
-                    // Set context for delayed response.
-                    return cmd.txId();
-                });
-
                 // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 
Temporary code below
                 synchronized (safeTime) {
-                    if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
-                        storageUpdateHandler.handleUpdate(
-                                cmd.txId(),
-                                cmd.rowUuid(),
-                                cmd.tablePartitionId().asTablePartitionId(),
-                                cmd.rowToUpdate(),
-                                true,
-                                null,
-                                null,
-                                null);
+                    storageUpdateHandler.handleUpdate(
+                            cmd.txId(),
+                            cmd.rowUuid(),
+                            cmd.tablePartitionId().asTablePartitionId(),
+                            cmd.rowToUpdate(),
+                            true,
+                            null,
+                            null,
+                            null);
 
-                        updateTrackerIgnoringTrackerClosedException(safeTime, 
cmd.safeTime());
-                    }
+                    updateTrackerIgnoringTrackerClosedException(safeTime, 
cmd.safeTime());
                 }
 
+                CompletableFuture<UUID> fut = 
applyCmdWithExceptionHandling(cmd, new CompletableFuture<>())
+                        .thenApply(res -> {
+                            // This check guaranties the result will never be 
lost. Currently always null.
+                            assert res == null : "Replication result is lost";
+
+                            // Set context for delayed response.
+                            return cmd.txId();
+                        });
+
                 return completedFuture(fut);
             } else {
-                return applyCmdWithExceptionHandling(cmd).thenApply(res -> {
+                CompletableFuture<Object> resultFuture = new 
CompletableFuture<>();
+
+                applyCmdWithExceptionHandling(cmd, resultFuture);
+
+                return resultFuture.thenApply(res -> {
                     // This check guaranties the result will never be lost. 
Currently always null.
                     assert res == null : "Replication result is lost";
 
@@ -2513,7 +2614,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param txCoordinatorId Transaction coordinator id.
      * @param catalogVersion Validated catalog version associated with given 
operation.
      * @param skipDelayedAck {@code true} to disable the delayed ack 
optimization.
-     * @return Raft future, see {@link 
#applyCmdWithExceptionHandling(Command)}.
+     * @return Raft future, see {@link #applyCmdWithExceptionHandling(Command, 
CompletableFuture)}.
      */
     private CompletableFuture<CompletableFuture<?>> applyUpdateAllCommand(
             Map<UUID, TimedBinaryRowMessage> rowsToUpdate,
@@ -2539,68 +2640,66 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 if (skipDelayedAck) {
                     // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
                     synchronized (safeTime) {
-                        if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
-                            storageUpdateHandler.handleUpdateAll(
-                                    cmd.txId(),
-                                    cmd.rowsToUpdate(),
-                                    
cmd.tablePartitionId().asTablePartitionId(),
-                                    true,
-                                    null,
-                                    null
-                            );
+                        storageUpdateHandler.handleUpdateAll(
+                                cmd.txId(),
+                                cmd.rowsToUpdate(),
+                                cmd.tablePartitionId().asTablePartitionId(),
+                                true,
+                                null,
+                                null
+                        );
 
-                            
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
-                        }
+                        updateTrackerIgnoringTrackerClosedException(safeTime, 
cmd.safeTime());
                     }
 
-                    return applyCmdWithExceptionHandling(cmd).thenApply(res -> 
null);
+                    return applyCmdWithExceptionHandling(cmd, new 
CompletableFuture<>()).thenApply(res -> null);
                 } else {
-                    CompletableFuture<Object> fut = 
applyCmdWithExceptionHandling(cmd).thenApply(res -> {
-                        // Currently result is always null on a successfull 
execution of a replication command.
-                        // This check guaranties the result will never be lost.
-                        assert res == null : "Replication result is lost";
-
-                        // Set context for delayed response.
-                        return cmd.txId();
-                    });
-
                     // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
                     synchronized (safeTime) {
-                        if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
-                            storageUpdateHandler.handleUpdateAll(
-                                    cmd.txId(),
-                                    cmd.rowsToUpdate(),
-                                    
cmd.tablePartitionId().asTablePartitionId(),
-                                    true,
-                                    null,
-                                    null
-                            );
+                        storageUpdateHandler.handleUpdateAll(
+                                cmd.txId(),
+                                cmd.rowsToUpdate(),
+                                cmd.tablePartitionId().asTablePartitionId(),
+                                true,
+                                null,
+                                null
+                        );
 
-                            
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
-                        }
+                        updateTrackerIgnoringTrackerClosedException(safeTime, 
cmd.safeTime());
                     }
 
+                    CompletableFuture<Object> fut = 
applyCmdWithExceptionHandling(cmd, new CompletableFuture<>())
+                            .thenApply(res -> {
+                                // Currently result is always null on a 
successful execution of a replication command.
+                                // This check guaranties the result will never 
be lost.
+                                assert res == null : "Replication result is 
lost";
+
+                                // Set context for delayed response.
+                                return cmd.txId();
+                            });
+
                     return completedFuture(fut);
                 }
             } else {
-                return applyCmdWithExceptionHandling(cmd).thenApply(res -> {
-                    assert res == null : "Replication result is lost";
-
-                    // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
-                    // In case of full (1PC) commit double update is only a 
matter of optimisation and not correctness, because
-                    // there's no other transaction that can rewrite given key 
because of locks and same transaction re-write isn't possible
-                    // just because there's only one operation in 1PC.
-                    storageUpdateHandler.handleUpdateAll(
-                            cmd.txId(),
-                            cmd.rowsToUpdate(),
-                            cmd.tablePartitionId().asTablePartitionId(),
-                            false,
-                            null,
-                            cmd.safeTime()
-                    );
+                return applyCmdWithExceptionHandling(cmd, new 
CompletableFuture<>())
+                        .thenApply(res -> {
+                            assert res == null : "Replication result is lost";
+
+                            // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
+                            // In case of full (1PC) commit double update is 
only a matter of optimisation and not correctness, because
+                            // there's no other transaction that can rewrite 
given key because of locks and same transaction re-write isn't
+                            // possible just because there's only one 
operation in 1PC.
+                            storageUpdateHandler.handleUpdateAll(
+                                    cmd.txId(),
+                                    cmd.rowsToUpdate(),
+                                    
cmd.tablePartitionId().asTablePartitionId(),
+                                    false,
+                                    null,
+                                    cmd.safeTime()
+                            );
 
-                    return null;
-                });
+                            return null;
+                        });
             }
         }
     }
@@ -2612,7 +2711,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param rowsToUpdate All {@link BinaryRow}s represented as {@link 
TimedBinaryRowMessage}s to be updated.
      * @param txCoordinatorId Transaction coordinator id.
      * @param catalogVersion Validated catalog version associated with given 
operation.
-     * @return Raft future, see {@link 
#applyCmdWithExceptionHandling(Command)}.
+     * @return Raft future, see {@link #applyCmdWithExceptionHandling(Command, 
CompletableFuture)}.
      */
     private CompletableFuture<CompletableFuture<?>> applyUpdateAllCommand(
             ReadWriteMultiRowReplicaRequest request,
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 3773ccea66..52671921cc 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -347,19 +347,17 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
     void testSkipWriteCommandByAppliedIndex() {
         mvPartitionStorage.lastApplied(10L, 1L);
 
-        HybridTimestamp timestamp = hybridClock.now();
-
         UpdateCommand updateCommand = mock(UpdateCommand.class);
-        when(updateCommand.safeTime()).thenReturn(timestamp);
+        when(updateCommand.safeTime()).thenAnswer(v -> hybridClock.now());
 
         TxCleanupCommand txCleanupCommand = mock(TxCleanupCommand.class);
-        when(txCleanupCommand.safeTime()).thenReturn(timestamp);
+        when(txCleanupCommand.safeTime()).thenAnswer(v -> hybridClock.now());
 
         SafeTimeSyncCommand safeTimeSyncCommand = 
mock(SafeTimeSyncCommand.class);
-        when(safeTimeSyncCommand.safeTime()).thenReturn(timestamp);
+        when(safeTimeSyncCommand.safeTime()).thenAnswer(v -> 
hybridClock.now());
 
         FinishTxCommand finishTxCommand = mock(FinishTxCommand.class);
-        when(finishTxCommand.safeTime()).thenReturn(timestamp);
+        when(finishTxCommand.safeTime()).thenAnswer(v -> hybridClock.now());
 
         // Checks for MvPartitionStorage.
         commandListener.onWrite(List.of(
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index a05bb3b393..52f8ea4a7a 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -382,6 +382,7 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
     /**
      * Tests negative transfer scenario.
      */
+    @Test
     @Disabled("https://issues.apache.org/jira/browse/IGNITE-17861";)
     public void testTxClosureAbortAsync() {
         double balance1 = 10.;


Reply via email to