This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 505d76485d8 IGNITE-27288 Enforce monotonic safe ts assignment
505d76485d8 is described below
commit 505d76485d84f7e6ade05d5637b40c2786dab009
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Fri Dec 19 10:20:02 2025 +0300
IGNITE-27288 Enforce monotonic safe ts assignment
---
.../internal/util/SafeTimeValuesTracker.java | 33 ++++++++++++---
.../replicator/raft/ZonePartitionRaftListener.java | 37 +++++++++--------
.../internal/raft/service/CommandClosure.java | 2 +-
.../apache/ignite/raft/jraft/core/ItNodeTest.java | 2 +-
.../ignite/raft/jraft/core/MockStateMachine.java | 2 +-
.../internal/raft/server/impl/JraftServerImpl.java | 48 +++++++++-------------
.../apache/ignite/raft/jraft/core/NodeImpl.java | 48 +++++++++++++---------
.../raft/jraft/core/StateMachineAdapter.java | 25 +++++++----
.../raft/StateMachineFailureHandlerTest.java | 3 ++
9 files changed, 118 insertions(+), 82 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/SafeTimeValuesTracker.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/SafeTimeValuesTracker.java
index 828dbb35d55..70fd674feb4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/SafeTimeValuesTracker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/SafeTimeValuesTracker.java
@@ -17,9 +17,12 @@
package org.apache.ignite.internal.util;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+
import java.util.Map;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.lang.IgniteInternalException;
import org.jetbrains.annotations.Nullable;
/**
@@ -30,8 +33,20 @@ public class SafeTimeValuesTracker extends
PendingComparableValuesTracker<Hybrid
super(initialValue);
}
- @Override
- public void update(HybridTimestamp newValue, @Nullable Void futureResult) {
+ // Holds successful application context.
+ private long commandIndex;
+ private long commandTerm;
+ private String commandClassName;
+
+ /**
+ * Update safe timestamp.
+ *
+ * @param safeTs The value.
+ * @param commandIndex Command index.
+ * @param commandTerm Command term.
+ * @param command The command.
+ */
+ public void update(HybridTimestamp safeTs, long commandIndex, long
commandTerm, Object command) {
if (!enterBusy()) {
throw new TrackerClosedException();
}
@@ -39,15 +54,23 @@ public class SafeTimeValuesTracker extends
PendingComparableValuesTracker<Hybrid
try {
Map.Entry<HybridTimestamp, @Nullable Void> current = this.current;
- IgniteBiTuple<HybridTimestamp, @Nullable Void> newEntry = new
IgniteBiTuple<>(newValue, futureResult);
+ IgniteBiTuple<HybridTimestamp, @Nullable Void> newEntry = new
IgniteBiTuple<>(safeTs, null);
// Entries from the same batch receive equal safe timestamps.
if (comparator.compare(newEntry, current) < 0) {
- throw new AssertionError("Reordering detected: [old=" +
current.getKey() + ", new=" + newEntry.get1() + ']');
+ throw new IgniteInternalException(INTERNAL_ERR,
+ "Reordering detected: [old=" + current.getKey() + ",
new=" + newEntry.get1()
+ + ", oldIndex=" + this.commandIndex
+ + ", oldTerm=" + this.commandTerm
+ + ", oldCommandClassName=" +
this.commandClassName
+ + ']');
}
CURRENT.set(this, newEntry);
- completeWaitersOnUpdate(newValue, futureResult);
+
+ this.commandIndex = commandIndex;
+ this.commandTerm = commandTerm;
+ this.commandClassName = command.getClass().getName();
} finally {
leaveBusy();
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
index 27a3609d4d0..cd6b45b38a5 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
@@ -161,14 +161,14 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
try {
processWriteCommand(clo);
} catch (Throwable t) {
+ clo.result(t);
+
LOG.error(
- "Unknown error while processing command
[commandIndex={}, commandTerm={}, command={}]",
+ "Failed to process write command [commandIndex={},
commandTerm={}, command={}]",
t,
- clo.index(), clo.index(), clo.command()
+ clo.index(), clo.term(), clo.command()
);
- clo.result(t);
-
throw t;
}
});
@@ -232,10 +232,18 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
if (result.wasApplied()) {
// Adjust safe time before completing update to reduce
waiting.
if (safeTimestamp != null) {
-
updateTrackerIgnoringTrackerClosedException(safeTimeTracker, safeTimestamp);
+ try {
+ safeTimeTracker.update(safeTimestamp,
commandIndex, commandTerm, command);
+ } catch (TrackerClosedException ignored) {
+ // Ignored.
+ }
}
-
updateTrackerIgnoringTrackerClosedException(storageIndexTracker, commandIndex);
+ try {
+ storageIndexTracker.update(commandIndex, null);
+ } catch (TrackerClosedException ignored) {
+ // Ignored.
+ }
}
lastAppliedIndex = max(lastAppliedIndex, commandIndex);
@@ -347,7 +355,11 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
this.lastAppliedIndex = max(this.lastAppliedIndex,
lastAppliedIndex);
this.lastAppliedTerm = max(this.lastAppliedTerm, lastAppliedTerm);
- updateTrackerIgnoringTrackerClosedException(storageIndexTracker,
lastAppliedIndex);
+ try {
+ storageIndexTracker.update(lastAppliedIndex, null);
+ } catch (TrackerClosedException ignored) {
+ // Ignored.
+ }
}
}
@@ -478,17 +490,6 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
}
}
- private static <T extends Comparable<T>> void
updateTrackerIgnoringTrackerClosedException(
- PendingComparableValuesTracker<T, Void> tracker,
- T newValue
- ) {
- try {
- tracker.update(newValue, null);
- } catch (TrackerClosedException ignored) {
- // No-op.
- }
- }
-
private void cleanupSnapshots() {
partitionsSnapshots.cleanupOutgoingSnapshots(partitionKey);
}
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/CommandClosure.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/CommandClosure.java
index 7074b4b5328..51dc910a7b7 100644
---
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/CommandClosure.java
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/CommandClosure.java
@@ -59,7 +59,7 @@ public interface CommandClosure<R extends Command> {
R command();
/**
- * Must be called after a command has been processed normally.
+ * Must be called after a command has been processed (with result or
exception).
*
* @param res Execution result.
*/
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 206e651427e..6bdecc2d05d 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -459,7 +459,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
CountDownLatch readIndexLatch = new CountDownLatch(1);
AtomicInteger currentValue = new AtomicInteger(-1);
String errorMsg = testInfo.getDisplayName();
- StateMachine fsm = new StateMachineAdapter() {
+ StateMachine fsm = new StateMachineAdapter("test") {
@Override
public void onApply(Iterator iter) {
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java
index 1de8279352e..cbf324067d3 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java
@@ -62,7 +62,7 @@ public class MockStateMachine extends StateMachineAdapter {
}
public MockStateMachine(final PeerId peerId) {
- super();
+ super("test");
this.peerId = peerId;
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 9a98ab4c0d2..1efb37bdbcf 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -509,7 +509,8 @@ public class JraftServerImpl implements RaftServer {
nodeOptions.setCommandsMarshaller(groupOptions.commandsMarshaller());
}
- nodeOptions.setFsm(new DelegatingStateMachine(lsnr,
nodeOptions.getCommandsMarshaller(), failureManager));
+ nodeOptions.setFsm(
+ new DelegatingStateMachine(nodeId.groupId().toString(),
lsnr, nodeOptions.getCommandsMarshaller(), failureManager));
nodeOptions.setRaftGrpEvtsLsnr(new
RaftGroupEventsListenerAdapter(nodeId.groupId(), serviceEventInterceptor,
evLsnr));
@@ -839,11 +840,13 @@ public class JraftServerImpl implements RaftServer {
/**
* Constructor.
*
+ * @param label State machine label.
* @param listener The listener.
* @param marshaller Marshaller.
* @param failureManager Failure processor that is used to handle
critical errors.
*/
- public DelegatingStateMachine(RaftGroupListener listener, Marshaller
marshaller, FailureManager failureManager) {
+ public DelegatingStateMachine(String label, RaftGroupListener
listener, Marshaller marshaller, FailureManager failureManager) {
+ super(label);
this.listener = listener;
this.marshaller = marshaller;
this.failureManager = failureManager;
@@ -855,11 +858,13 @@ public class JraftServerImpl implements RaftServer {
@Override
public void onApply(Iterator iter) {
- var writeCommandIterator = new WriteCommandIterator(iter,
marshaller);
+ var iterWrapper = new WriteCommandIterator(iter, marshaller);
try {
- listener.onWrite(writeCommandIterator);
+ listener.onWrite(iterWrapper);
} catch (Throwable err) {
+ LOG.error("Unexpected error while processing command
[label={}]", err, label);
+
Status st;
if (err.getMessage() != null) {
@@ -868,13 +873,9 @@ public class JraftServerImpl implements RaftServer {
st = new Status(RaftError.ESTATEMACHINE, "Unknown state
machine error.");
}
- // This is necessary so that IndexOutOfBoundsException is not
thrown in a situation where the listener, when processing a
- // command, catch any exception and does clo.result(throwable)
(that actually advances the iterator) and then throws the
- // caught exception.
- Closure done = writeCommandIterator.doneForExceptionHandling();
-
- if (done != null) {
- done.run(st);
+ if (iterWrapper.done != null) {
+ // Trigger internal error for state machine.
+ iterWrapper.done.run(st);
}
iter.setErrorAndRollback(1, st);
@@ -979,7 +980,7 @@ public class JraftServerImpl implements RaftServer {
private final Marshaller marshaller;
- private @Nullable Closure latestDone;
+ private @Nullable Closure done;
private WriteCommandIterator(Iterator iter, Marshaller marshaller) {
this.iter = iter;
@@ -993,15 +994,14 @@ public class JraftServerImpl implements RaftServer {
@Override
public CommandClosure<WriteCommand> next() {
- @Nullable Closure currentDone = iter.done();
- latestDone = currentDone;
+ done = iter.done(); // Save for later error processing.
- @Nullable CommandClosure<WriteCommand> done =
(CommandClosure<WriteCommand>) currentDone;
+ @Nullable CommandClosure<WriteCommand> localDone =
(CommandClosure<WriteCommand>) done;
ByteBuffer data = iter.getData();
- // done != null means we are on the leader, otherwise a command
has been read from the log.
- WriteCommand command = done == null ? marshaller.unmarshall(data)
: done.command();
- HybridTimestamp safeTs = done == null ? command.safeTime() :
done.safeTimestamp();
+ // localDone != null means we are on the leader, otherwise a
command has been read from the log.
+ WriteCommand command = localDone == null ?
marshaller.unmarshall(data) : localDone.command();
+ HybridTimestamp safeTs = localDone == null ? command.safeTime() :
localDone.safeTimestamp();
long commandIndex = iter.getIndex();
long commandTerm = iter.getTerm();
@@ -1029,21 +1029,13 @@ public class JraftServerImpl implements RaftServer {
@Override
public void result(Serializable res) {
- if (done != null) {
- done.result(res);
+ if (localDone != null) {
+ localDone.result(res);
}
iter.next();
}
};
}
-
- private @Nullable Closure doneForExceptionHandling() {
- if (latestDone == null) {
- latestDone = iter.done();
- }
-
- return latestDone;
- }
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 506aa462e51..2df29468667 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -291,8 +291,6 @@ public class NodeImpl implements Node, RaftServerService {
// task list for batch
private final List<LogEntryAndClosure> tasks = new
ArrayList<>(NodeImpl.this.raftOptions.getApplyBatch());
- private @Nullable HybridTimestamp safeTs = null;
-
@Override
public void onEvent(final LogEntryAndClosure event, final long
sequence, final boolean endOfBatch) {
if (event.shutdownLatch != null) {
@@ -304,23 +302,6 @@ public class NodeImpl implements Node, RaftServerService {
return;
}
- // Patch the command.
- if (event.done instanceof SafeTimeAwareCommandClosure) {
- SafeTimeAwareCommandClosure clo =
(SafeTimeAwareCommandClosure) event.done;
- WriteCommand command = clo.command();
- HybridTimestamp timestamp = command.initiatorTime();
-
- if (timestamp != null) {
- if (safeTs == null) {
- safeTs = clock.update(timestamp);
- } else if (timestamp.compareTo(safeTs) > 0) {
- safeTs = clock.update(timestamp);
- }
-
- clo.safeTimestamp(safeTs);
- }
- }
-
this.tasks.add(event);
if (this.tasks.size() >= NodeImpl.this.raftOptions.getApplyBatch()
|| endOfBatch) {
executeApplyingTasks(this.tasks);
@@ -333,7 +314,6 @@ public class NodeImpl implements Node, RaftServerService {
task.reset();
}
this.tasks.clear();
- this.safeTs = null;
}
}
@@ -1715,9 +1695,13 @@ public class NodeImpl implements Node, RaftServerService
{
});
return;
}
+
+ @Nullable HybridTimestamp safeTs = null;
+
final List<LogEntry> entries = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
final LogEntryAndClosure task = tasks.get(i);
+
if (task.expectedTerm != -1 && task.expectedTerm !=
this.currTerm) {
LOG.debug("Node {} can't apply task whose expectedTerm={}
doesn't match currTerm={}.", getNodeId(),
task.expectedTerm, this.currTerm);
@@ -1735,6 +1719,10 @@ public class NodeImpl implements Node, RaftServerService
{
task.reset();
continue;
}
+
+ // To prevent safe timestamp values from becoming stale, we
must assign them under a valid leader lock.
+ safeTs = tryAssignSafeTimestamp(task, safeTs);
+
// set task entry info before adding to list.
task.entry.getId().setTerm(this.currTerm);
task.entry.setType(EnumOutter.EntryType.ENTRY_TYPE_DATA);
@@ -1750,6 +1738,26 @@ public class NodeImpl implements Node, RaftServerService
{
}
}
+ private @Nullable HybridTimestamp
tryAssignSafeTimestamp(LogEntryAndClosure task, @Nullable HybridTimestamp
safeTs) {
+ if (task.done instanceof SafeTimeAwareCommandClosure) {
+ SafeTimeAwareCommandClosure clo = (SafeTimeAwareCommandClosure)
task.done;
+ WriteCommand command = clo.command();
+ HybridTimestamp timestamp = command.initiatorTime();
+
+ if (timestamp != null) {
+ if (safeTs == null) {
+ safeTs = clock.update(timestamp);
+ } else if (timestamp.compareTo(safeTs) > 0) {
+ safeTs = clock.update(timestamp);
+ }
+
+ clo.safeTimestamp(safeTs);
+ }
+ }
+
+ return safeTs;
+ }
+
/**
* Builds a status for 'Cannot apply because this node is not a leader'
situation.
*
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/StateMachineAdapter.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/StateMachineAdapter.java
index 9e67e9905c2..ab6a0d26391 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/StateMachineAdapter.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/StateMachineAdapter.java
@@ -37,9 +37,18 @@ public abstract class StateMachineAdapter implements
StateMachine {
/** The logger */
private static final IgniteLogger LOG =
Loggers.forClass(StateMachineAdapter.class);
+ protected final String label;
+
+ /**
+ * @param label The state machine label.
+ */
+ public StateMachineAdapter(String label) {
+ this.label = label;
+ }
+
@Override
public void onShutdown() {
- LOG.info("onShutdown.");
+ LOG.info("onShutdown: label={}.", label);
}
@Override
@@ -61,34 +70,34 @@ public abstract class StateMachineAdapter implements
StateMachine {
@Override
public void onLeaderStart(final long term) {
this.leaderTerm = term;
- LOG.info("onLeaderStart: term={}.", term);
+ LOG.info("onLeaderStart: term={}, label={}.", term, label);
}
@Override
public void onLeaderStop(final Status status) {
- LOG.info("onLeaderStop: status={}.", status);
+ LOG.info("onLeaderStop: status={}, label={}.", status, label);
}
@Override
public void onError(final RaftException e) {
LOG.error(
- "Encountered an error={} on StateMachine {}, it's highly
recommended to implement this method as raft stops working since some error
occurs, you should figure out the cause and repair or remove this node.",
- e, e.getStatus(), getClassName());
+ "Encountered an error={} on StateMachine {}, label={}",
+ e, e.getStatus(), getClassName(), label); // Throwable first, then one argument per {} placeholder (status, class name, label).
}
@Override
public void onConfigurationCommitted(final Configuration conf) {
- LOG.info("onConfigurationCommitted: {}.", conf);
+ LOG.info("onConfigurationCommitted: {}, label={}.", conf, label);
}
@Override
public void onStopFollowing(final LeaderChangeContext ctx) {
- LOG.info("onStopFollowing: {}.", ctx);
+ LOG.info("onStopFollowing: {}, label={}.", ctx, label);
}
@Override
public void onStartFollowing(final LeaderChangeContext ctx) {
- LOG.info("onStartFollowing: {}.", ctx);
+ LOG.info("onStartFollowing: {}, label={}.", ctx, label);
}
@SuppressWarnings("SameParameterValue")
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/StateMachineFailureHandlerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/StateMachineFailureHandlerTest.java
index e8afdbf0a52..565df7f603d 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/StateMachineFailureHandlerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/StateMachineFailureHandlerTest.java
@@ -84,6 +84,7 @@ public class StateMachineFailureHandlerTest extends
BaseIgniteAbstractTest {
AtomicBoolean reached = new AtomicBoolean();
DelegatingStateMachine sm = new JraftServerImpl.DelegatingStateMachine(
+ "test",
TEST_LISTENER,
mock(Marshaller.class),
testFailureManager(reached)
@@ -99,6 +100,7 @@ public class StateMachineFailureHandlerTest extends
BaseIgniteAbstractTest {
AtomicBoolean reached = new AtomicBoolean();
DelegatingStateMachine sm = new JraftServerImpl.DelegatingStateMachine(
+ "test",
TEST_LISTENER,
mock(Marshaller.class),
testFailureManager(reached)
@@ -118,6 +120,7 @@ public class StateMachineFailureHandlerTest extends
BaseIgniteAbstractTest {
AtomicBoolean reached = new AtomicBoolean();
DelegatingStateMachine sm = new JraftServerImpl.DelegatingStateMachine(
+ "test",
TEST_LISTENER,
mock(Marshaller.class),
testFailureManager(reached)