This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a86c71d114 IGNITE-23503 Add maxObservableSafeTime awaiting logic on SafeTimeReorderException (#4612)
a86c71d114 is described below

commit a86c71d1145524f2ed7e4905e743613731441237
Author: Alexander Lapin <[email protected]>
AuthorDate: Wed Oct 23 13:12:45 2024 +0300

    IGNITE-23503 Add maxObservableSafeTime awaiting logic on SafeTimeReorderException (#4612)
---
 .../internal/lang/SafeTimeReorderException.java    | 15 +++++++++++-
 .../ignite/internal/raft/RaftGroupServiceImpl.java |  5 +++-
 .../apache/ignite/raft/jraft/rpc/RpcRequests.java  |  8 +++++++
 .../jraft/rpc/impl/ActionRequestProcessor.java     |  7 +++++-
 .../table/distributed/raft/PartitionListener.java  |  2 +-
 .../replicator/PartitionReplicaListener.java       | 28 +++++++++++++++-------
 .../apache/ignite/distributed/ItTxTestCluster.java | 25 ++++++++++++++++---
 7 files changed, 75 insertions(+), 15 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/lang/SafeTimeReorderException.java b/modules/core/src/main/java/org/apache/ignite/internal/lang/SafeTimeReorderException.java
index 9c5faab605..647ba3f4be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/lang/SafeTimeReorderException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/lang/SafeTimeReorderException.java
@@ -23,13 +23,26 @@ import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICATION_SAFE_TIM
  * This exception is used to indicate a detection of a safe time reordering.
  */
 public class SafeTimeReorderException extends IgniteInternalException {
+    /** maxObservableSafeTime at the moment of violation. */
+    private final long maxObservableSafeTimeViolatedValue;
 
     /**
      * The constructor.
+     *
+     * @param maxObservableSafeTimeViolatedValue maxObservableSafeTime at the 
moment of violation
      */
-    public SafeTimeReorderException() {
+    public SafeTimeReorderException(long maxObservableSafeTimeViolatedValue) {
         super(REPLICATION_SAFE_TIME_REORDERING_ERR, "Replication safe time 
reordering detected.");
+        this.maxObservableSafeTimeViolatedValue = 
maxObservableSafeTimeViolatedValue;
     }
 
+    /**
+     * Returns {@code maxObservableSafeTime} at the moment of violation.
+     *
+     * @return maxObservableSafeTime at the moment of violation.
+     */
+    public long maxObservableSafeTimeViolatedValue() {
+        return maxObservableSafeTimeViolatedValue;
+    }
 }
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index be6578b27c..1f4e1e6ebd 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -701,7 +701,10 @@ public class RaftGroupServiceImpl implements RaftGroupService {
             }
 
             case EREORDER:
-                fut.completeExceptionally(new SafeTimeReorderException());
+                assert resp.maxObservableSafeTimeViolatedValue() != null :
+                        "Unexpected combination of EREORDER error type and 
null in maxObservableSafeTimeViolatedValue.";
+
+                fut.completeExceptionally(new 
SafeTimeReorderException(resp.maxObservableSafeTimeViolatedValue()));
 
                 break;
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
index 0028324c5b..92e361136e 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
@@ -71,6 +71,14 @@ public final class RpcRequests {
          */
         @Nullable
         String leaderId();
+
+         /**
+          * Violated maxObservableSafeTime if safe time reordering was 
detected, null otherwise.
+          *
+          * @return maxObservableSafeTime if safe time reordering was 
detected, null otherwise.
+          */
+        @Nullable
+        Long maxObservableSafeTimeViolatedValue();
     }
 
     @Transferable(value = 
RaftMessageGroup.RpcRequestsMessageGroup.SM_ERROR_RESPONSE)
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
index 407746cca9..cc08fcb7e4 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
@@ -114,7 +114,12 @@ public class ActionRequestProcessor implements RpcProcessor<ActionRequest> {
                     try {
                         writeRequest = patchCommandBeforeApply(writeRequest, 
(BeforeApplyHandler) listener, command, commandsMarshaller);
                     } catch (SafeTimeReorderException e) {
-                        
rpcCtx.sendResponse(factory.errorResponse().errorCode(RaftError.EREORDER.getNumber()).build());
+                        rpcCtx.sendResponse(
+                                factory.errorResponse()
+                                    
.maxObservableSafeTimeViolatedValue(e.maxObservableSafeTimeViolatedValue())
+                                    .errorCode(RaftError.EREORDER.getNumber())
+                                    .build()
+                        );
 
                         return;
                     }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 569cc6308c..91cfe46c9e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -596,7 +596,7 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler
             if (proposedSafeTime >= maxObservableSafeTime) {
                 maxObservableSafeTime = proposedSafeTime;
             } else {
-                throw new SafeTimeReorderException();
+                throw new SafeTimeReorderException(maxObservableSafeTime);
             }
         }
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 8ac3bdeea7..56c78a78b8 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -259,7 +259,7 @@ public class PartitionReplicaListener implements ReplicaListener {
     private static final TxMessagesFactory TX_MESSAGES_FACTORY = new 
TxMessagesFactory();
 
     /** Replication retries limit. */
-    private static final int MAX_RETIES_ON_SAFE_TIME_REORDERING = 1000;
+    private static final int MAX_RETRIES_ON_SAFE_TIME_REORDERING = 1000;
 
     /** Replication group id. */
     private final TablePartitionId replicationGroupId;
@@ -2657,25 +2657,37 @@ public class PartitionReplicaListener implements ReplicaListener {
 
     private <T> void applyCmdWithRetryOnSafeTimeReorderException(Command cmd, 
CompletableFuture<T> resultFuture, int attemptsCounter) {
         attemptsCounter++;
-        if (attemptsCounter >= MAX_RETIES_ON_SAFE_TIME_REORDERING) {
+        if (attemptsCounter >= MAX_RETRIES_ON_SAFE_TIME_REORDERING) {
             resultFuture.completeExceptionally(
-                    new 
ReplicationMaxRetriesExceededException(replicationGroupId, 
MAX_RETIES_ON_SAFE_TIME_REORDERING));
+                    new 
ReplicationMaxRetriesExceededException(replicationGroupId, 
MAX_RETRIES_ON_SAFE_TIME_REORDERING));
         }
 
+        int attemptsCounter0 = attemptsCounter;
         raftClient.run(cmd).whenComplete((res, ex) -> {
             if (ex != null) {
                 if (ex instanceof SafeTimeReorderException || ex.getCause() 
instanceof SafeTimeReorderException) {
                     assert cmd instanceof SafeTimePropagatingCommand;
 
+                    SafeTimeReorderException safeTimeReorderException = 
(SafeTimeReorderException) (ex instanceof SafeTimeReorderException
+                            ? ex : ex.getCause());
+
                     SafeTimePropagatingCommand safeTimePropagatingCommand = 
(SafeTimePropagatingCommand) cmd;
 
-                    HybridTimestamp safeTimeForRetry = clockService.now();
+                    
clockService.waitFor(hybridTimestamp(safeTimeReorderException.maxObservableSafeTimeViolatedValue())).thenRun(
+                            () -> {
+                                HybridTimestamp safeTimeForRetry = 
clockService.now();
 
-                    SafeTimePropagatingCommand 
clonedSafeTimePropagatingCommand =
-                            (SafeTimePropagatingCommand) 
safeTimePropagatingCommand.clone();
-                    
clonedSafeTimePropagatingCommand.safeTime(safeTimeForRetry);
+                                SafeTimePropagatingCommand 
clonedSafeTimePropagatingCommand =
+                                        (SafeTimePropagatingCommand) 
safeTimePropagatingCommand.clone();
+                                
clonedSafeTimePropagatingCommand.safeTime(safeTimeForRetry);
 
-                    
applyCmdWithRetryOnSafeTimeReorderException(clonedSafeTimePropagatingCommand, 
resultFuture);
+                                applyCmdWithRetryOnSafeTimeReorderException(
+                                        clonedSafeTimePropagatingCommand,
+                                        resultFuture,
+                                        attemptsCounter0
+                                );
+                            }
+                    );
                 } else {
                     resultFuture.completeExceptionally(ex);
                 }
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 0b98c34dc5..5cd302fcac 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -43,6 +43,7 @@ import static org.mockito.Mockito.when;
 
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -74,6 +75,7 @@ import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
 import org.apache.ignite.internal.configuration.RaftGroupOptionsConfigHelper;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.ClockWaiter;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -213,9 +215,11 @@ public class ItTxTestCluster {
     private ClusterService client;
 
     private HybridClock clientClock;
+    private ClockWaiter clientClockWaiter;
     private ClockService clientClockService;
 
     private Map<String, HybridClock> clocks;
+    private Collection<ClockWaiter> clockWaiters;
     protected Map<String, ClockService> clockServices;
 
     private ReplicaService clientReplicaSvc;
@@ -373,6 +377,7 @@ public class ItTxTestCluster {
 
         // Start raft servers. Each raft server can hold multiple groups.
         clocks = new HashMap<>(nodes);
+        clockWaiters = new ArrayList<>(nodes);
         clockServices = new HashMap<>(nodes);
         raftServers = new HashMap<>(nodes);
         replicaManagers = new HashMap<>(nodes);
@@ -399,11 +404,14 @@ public class ItTxTestCluster {
             ClusterNode node = clusterService.topologyService().localMember();
 
             HybridClock clock = new HybridClockImpl();
-            TestClockService clockService = new TestClockService(clock);
+            ClockWaiter clockWaiter = new ClockWaiter("test-node" + i, clock);
+            assertThat(clockWaiter.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
+            TestClockService clockService = new TestClockService(clock, 
clockWaiter);
 
             String nodeName = node.name();
 
             clocks.put(nodeName, clock);
+            clockWaiters.add(clockWaiter);
             clockServices.put(nodeName, clockService);
 
             Path partitionsWorkDir = workDir.resolve("node" + i);
@@ -985,6 +993,16 @@ public class ItTxTestCluster {
         if (clientTxManager != null) {
             assertThat(clientTxManager.stopAsync(new ComponentContext()), 
willCompleteSuccessfully());
         }
+
+        if (clientClockWaiter != null) {
+            assertThat(clientClockWaiter.stopAsync(new ComponentContext()), 
willCompleteSuccessfully());
+        }
+
+        if (clockWaiters != null) {
+            for (ClockWaiter clockWaiter : clockWaiters) {
+                assertThat(clockWaiter.stopAsync(new ComponentContext()), 
willCompleteSuccessfully());
+            }
+        }
     }
 
     /**
@@ -1010,8 +1028,9 @@ public class ItTxTestCluster {
         assertTrue(waitForTopology(client, nodes + 1, 1000));
 
         clientClock = new HybridClockImpl();
-
-        clientClockService = new TestClockService(clientClock);
+        clientClockWaiter = new ClockWaiter("client-node", clientClock);
+        assertThat(clientClockWaiter.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
+        clientClockService = new TestClockService(clientClock, 
clientClockWaiter);
 
         LOG.info("Replica manager has been started, node=[" + 
client.topologyService().localMember() + ']');
 

Reply via email to