This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a86c71d114 IGNITE-23503 Add maxObservableSafeTime awaiting logic on SafeTimeReorderException (#4612)
a86c71d114 is described below
commit a86c71d1145524f2ed7e4905e743613731441237
Author: Alexander Lapin <[email protected]>
AuthorDate: Wed Oct 23 13:12:45 2024 +0300
IGNITE-23503 Add maxObservableSafeTime awaiting logic on SafeTimeReorderException (#4612)
---
.../internal/lang/SafeTimeReorderException.java | 15 +++++++++++-
.../ignite/internal/raft/RaftGroupServiceImpl.java | 5 +++-
.../apache/ignite/raft/jraft/rpc/RpcRequests.java | 8 +++++++
.../jraft/rpc/impl/ActionRequestProcessor.java | 7 +++++-
.../table/distributed/raft/PartitionListener.java | 2 +-
.../replicator/PartitionReplicaListener.java | 28 +++++++++++++++-------
.../apache/ignite/distributed/ItTxTestCluster.java | 25 ++++++++++++++++---
7 files changed, 75 insertions(+), 15 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/lang/SafeTimeReorderException.java b/modules/core/src/main/java/org/apache/ignite/internal/lang/SafeTimeReorderException.java
index 9c5faab605..647ba3f4be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/lang/SafeTimeReorderException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/lang/SafeTimeReorderException.java
@@ -23,13 +23,26 @@ import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICATION_SAFE_TIM
* This exception is used to indicate a detection of a safe time reordering.
*/
public class SafeTimeReorderException extends IgniteInternalException {
+ /** maxObservableSafeTime at the moment of violation. */
+ private final long maxObservableSafeTimeViolatedValue;
/**
* The constructor.
+ *
+ * @param maxObservableSafeTimeViolatedValue maxObservableSafeTime at the
moment of violation
*/
- public SafeTimeReorderException() {
+ public SafeTimeReorderException(long maxObservableSafeTimeViolatedValue) {
super(REPLICATION_SAFE_TIME_REORDERING_ERR, "Replication safe time
reordering detected.");
+ this.maxObservableSafeTimeViolatedValue =
maxObservableSafeTimeViolatedValue;
}
+ /**
+ * Returns {@code maxObservableSafeTime} at the moment of violation.
+ *
+ * @return maxObservableSafeTime at the moment of violation.
+ */
+ public long maxObservableSafeTimeViolatedValue() {
+ return maxObservableSafeTimeViolatedValue;
+ }
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index be6578b27c..1f4e1e6ebd 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -701,7 +701,10 @@ public class RaftGroupServiceImpl implements RaftGroupService {
}
case EREORDER:
- fut.completeExceptionally(new SafeTimeReorderException());
+ assert resp.maxObservableSafeTimeViolatedValue() != null :
+ "Unexpected combination of EREORDER error type and
null in maxObservableSafeTimeViolatedValue.";
+
+ fut.completeExceptionally(new
SafeTimeReorderException(resp.maxObservableSafeTimeViolatedValue()));
break;
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
index 0028324c5b..92e361136e 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
@@ -71,6 +71,14 @@ public final class RpcRequests {
*/
@Nullable
String leaderId();
+
+ /**
+ * Violated maxObservableSafeTime if safe time reordering was
detected, null otherwise.
+ *
+ * @return maxObservableSafeTime if safe time reordering was
detected, null otherwise.
+ */
+ @Nullable
+ Long maxObservableSafeTimeViolatedValue();
}
@Transferable(value =
RaftMessageGroup.RpcRequestsMessageGroup.SM_ERROR_RESPONSE)
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
index 407746cca9..cc08fcb7e4 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
@@ -114,7 +114,12 @@ public class ActionRequestProcessor implements RpcProcessor<ActionRequest> {
try {
writeRequest = patchCommandBeforeApply(writeRequest,
(BeforeApplyHandler) listener, command, commandsMarshaller);
} catch (SafeTimeReorderException e) {
-
rpcCtx.sendResponse(factory.errorResponse().errorCode(RaftError.EREORDER.getNumber()).build());
+ rpcCtx.sendResponse(
+ factory.errorResponse()
+
.maxObservableSafeTimeViolatedValue(e.maxObservableSafeTimeViolatedValue())
+ .errorCode(RaftError.EREORDER.getNumber())
+ .build()
+ );
return;
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 569cc6308c..91cfe46c9e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -596,7 +596,7 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler
if (proposedSafeTime >= maxObservableSafeTime) {
maxObservableSafeTime = proposedSafeTime;
} else {
- throw new SafeTimeReorderException();
+ throw new SafeTimeReorderException(maxObservableSafeTime);
}
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 8ac3bdeea7..56c78a78b8 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -259,7 +259,7 @@ public class PartitionReplicaListener implements ReplicaListener {
private static final TxMessagesFactory TX_MESSAGES_FACTORY = new
TxMessagesFactory();
/** Replication retries limit. */
- private static final int MAX_RETIES_ON_SAFE_TIME_REORDERING = 1000;
+ private static final int MAX_RETRIES_ON_SAFE_TIME_REORDERING = 1000;
/** Replication group id. */
private final TablePartitionId replicationGroupId;
@@ -2657,25 +2657,37 @@ public class PartitionReplicaListener implements ReplicaListener {
private <T> void applyCmdWithRetryOnSafeTimeReorderException(Command cmd,
CompletableFuture<T> resultFuture, int attemptsCounter) {
attemptsCounter++;
- if (attemptsCounter >= MAX_RETIES_ON_SAFE_TIME_REORDERING) {
+ if (attemptsCounter >= MAX_RETRIES_ON_SAFE_TIME_REORDERING) {
resultFuture.completeExceptionally(
- new
ReplicationMaxRetriesExceededException(replicationGroupId,
MAX_RETIES_ON_SAFE_TIME_REORDERING));
+ new
ReplicationMaxRetriesExceededException(replicationGroupId,
MAX_RETRIES_ON_SAFE_TIME_REORDERING));
}
+ int attemptsCounter0 = attemptsCounter;
raftClient.run(cmd).whenComplete((res, ex) -> {
if (ex != null) {
if (ex instanceof SafeTimeReorderException || ex.getCause()
instanceof SafeTimeReorderException) {
assert cmd instanceof SafeTimePropagatingCommand;
+ SafeTimeReorderException safeTimeReorderException =
(SafeTimeReorderException) (ex instanceof SafeTimeReorderException
+ ? ex : ex.getCause());
+
SafeTimePropagatingCommand safeTimePropagatingCommand =
(SafeTimePropagatingCommand) cmd;
- HybridTimestamp safeTimeForRetry = clockService.now();
+
clockService.waitFor(hybridTimestamp(safeTimeReorderException.maxObservableSafeTimeViolatedValue())).thenRun(
+ () -> {
+ HybridTimestamp safeTimeForRetry =
clockService.now();
- SafeTimePropagatingCommand
clonedSafeTimePropagatingCommand =
- (SafeTimePropagatingCommand)
safeTimePropagatingCommand.clone();
-
clonedSafeTimePropagatingCommand.safeTime(safeTimeForRetry);
+ SafeTimePropagatingCommand
clonedSafeTimePropagatingCommand =
+ (SafeTimePropagatingCommand)
safeTimePropagatingCommand.clone();
+
clonedSafeTimePropagatingCommand.safeTime(safeTimeForRetry);
-
applyCmdWithRetryOnSafeTimeReorderException(clonedSafeTimePropagatingCommand,
resultFuture);
+ applyCmdWithRetryOnSafeTimeReorderException(
+ clonedSafeTimePropagatingCommand,
+ resultFuture,
+ attemptsCounter0
+ );
+ }
+ );
} else {
resultFuture.completeExceptionally(ex);
}
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 0b98c34dc5..5cd302fcac 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -43,6 +43,7 @@ import static org.mockito.Mockito.when;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -74,6 +75,7 @@ import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
import org.apache.ignite.internal.configuration.RaftGroupOptionsConfigHelper;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.ClockWaiter;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -213,9 +215,11 @@ public class ItTxTestCluster {
private ClusterService client;
private HybridClock clientClock;
+ private ClockWaiter clientClockWaiter;
private ClockService clientClockService;
private Map<String, HybridClock> clocks;
+ private Collection<ClockWaiter> clockWaiters;
protected Map<String, ClockService> clockServices;
private ReplicaService clientReplicaSvc;
@@ -373,6 +377,7 @@ public class ItTxTestCluster {
// Start raft servers. Each raft server can hold multiple groups.
clocks = new HashMap<>(nodes);
+ clockWaiters = new ArrayList<>(nodes);
clockServices = new HashMap<>(nodes);
raftServers = new HashMap<>(nodes);
replicaManagers = new HashMap<>(nodes);
@@ -399,11 +404,14 @@ public class ItTxTestCluster {
ClusterNode node = clusterService.topologyService().localMember();
HybridClock clock = new HybridClockImpl();
- TestClockService clockService = new TestClockService(clock);
+ ClockWaiter clockWaiter = new ClockWaiter("test-node" + i, clock);
+ assertThat(clockWaiter.startAsync(new ComponentContext()),
willCompleteSuccessfully());
+ TestClockService clockService = new TestClockService(clock,
clockWaiter);
String nodeName = node.name();
clocks.put(nodeName, clock);
+ clockWaiters.add(clockWaiter);
clockServices.put(nodeName, clockService);
Path partitionsWorkDir = workDir.resolve("node" + i);
@@ -985,6 +993,16 @@ public class ItTxTestCluster {
if (clientTxManager != null) {
assertThat(clientTxManager.stopAsync(new ComponentContext()),
willCompleteSuccessfully());
}
+
+ if (clientClockWaiter != null) {
+ assertThat(clientClockWaiter.stopAsync(new ComponentContext()),
willCompleteSuccessfully());
+ }
+
+ if (clockWaiters != null) {
+ for (ClockWaiter clockWaiter : clockWaiters) {
+ assertThat(clockWaiter.stopAsync(new ComponentContext()),
willCompleteSuccessfully());
+ }
+ }
}
/**
@@ -1010,8 +1028,9 @@ public class ItTxTestCluster {
assertTrue(waitForTopology(client, nodes + 1, 1000));
clientClock = new HybridClockImpl();
-
- clientClockService = new TestClockService(clientClock);
+ clientClockWaiter = new ClockWaiter("client-node", clientClock);
+ assertThat(clientClockWaiter.startAsync(new ComponentContext()),
willCompleteSuccessfully());
+ clientClockService = new TestClockService(clientClock,
clientClockWaiter);
LOG.info("Replica manager has been started, node=[" +
client.topologyService().localMember() + ']');