This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 505d76485d8 IGNITE-27288 Enforce monotonic safe ts assignment
505d76485d8 is described below

commit 505d76485d84f7e6ade05d5637b40c2786dab009
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Fri Dec 19 10:20:02 2025 +0300

    IGNITE-27288 Enforce monotonic safe ts assignment
---
 .../internal/util/SafeTimeValuesTracker.java       | 33 ++++++++++++---
 .../replicator/raft/ZonePartitionRaftListener.java | 37 +++++++++--------
 .../internal/raft/service/CommandClosure.java      |  2 +-
 .../apache/ignite/raft/jraft/core/ItNodeTest.java  |  2 +-
 .../ignite/raft/jraft/core/MockStateMachine.java   |  2 +-
 .../internal/raft/server/impl/JraftServerImpl.java | 48 +++++++++-------------
 .../apache/ignite/raft/jraft/core/NodeImpl.java    | 48 +++++++++++++---------
 .../raft/jraft/core/StateMachineAdapter.java       | 25 +++++++----
 .../raft/StateMachineFailureHandlerTest.java       |  3 ++
 9 files changed, 118 insertions(+), 82 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/SafeTimeValuesTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/SafeTimeValuesTracker.java
index 828dbb35d55..70fd674feb4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/SafeTimeValuesTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/SafeTimeValuesTracker.java
@@ -17,9 +17,12 @@
 
 package org.apache.ignite.internal.util;
 
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+
 import java.util.Map;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -30,8 +33,20 @@ public class SafeTimeValuesTracker extends 
PendingComparableValuesTracker<Hybrid
         super(initialValue);
     }
 
-    @Override
-    public void update(HybridTimestamp newValue, @Nullable Void futureResult) {
+    // Holds successful application context.
+    private long commandIndex;
+    private long commandTerm;
+    private String commandClassName;
+
+    /**
+     * Update safe timestamp.
+     *
+     * @param safeTs The value.
+     * @param commandIndex Command index.
+     * @param commandTerm Command term.
+     * @param command The command.
+     */
+    public void update(HybridTimestamp safeTs, long commandIndex, long 
commandTerm, Object command) {
         if (!enterBusy()) {
             throw new TrackerClosedException();
         }
@@ -39,15 +54,23 @@ public class SafeTimeValuesTracker extends 
PendingComparableValuesTracker<Hybrid
         try {
             Map.Entry<HybridTimestamp, @Nullable Void> current = this.current;
 
-            IgniteBiTuple<HybridTimestamp, @Nullable Void> newEntry = new 
IgniteBiTuple<>(newValue, futureResult);
+            IgniteBiTuple<HybridTimestamp, @Nullable Void> newEntry = new 
IgniteBiTuple<>(safeTs, null);
 
             // Entries from the same batch receive equal safe timestamps.
             if (comparator.compare(newEntry, current) < 0) {
-                throw new AssertionError("Reordering detected: [old=" + 
current.getKey() + ", new=" + newEntry.get1() + ']');
+                throw new IgniteInternalException(INTERNAL_ERR,
+                        "Reordering detected: [old=" + current.getKey() + ", 
new=" + newEntry.get1()
+                                + ", oldIndex=" + this.commandIndex
+                                + ", oldTerm=" + this.commandTerm
+                                + ", oldCommandClassName=" + 
this.commandClassName
+                                + ']');
             }
 
             CURRENT.set(this, newEntry);
-            completeWaitersOnUpdate(newValue, futureResult);
+
+            this.commandIndex = commandIndex;
+            this.commandTerm = commandTerm;
+            this.commandClassName = command.getClass().getName();
         } finally {
             leaveBusy();
         }
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
index 27a3609d4d0..cd6b45b38a5 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
@@ -161,14 +161,14 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
             try {
                 processWriteCommand(clo);
             } catch (Throwable t) {
+                clo.result(t);
+
                 LOG.error(
-                        "Unknown error while processing command 
[commandIndex={}, commandTerm={}, command={}]",
+                        "Failed to process write command [commandIndex={}, 
commandTerm={}, command={}]",
                         t,
-                        clo.index(), clo.index(), clo.command()
+                        clo.index(), clo.term(), clo.command()
                 );
 
-                clo.result(t);
-
                 throw t;
             }
         });
@@ -232,10 +232,18 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
                 if (result.wasApplied()) {
                     // Adjust safe time before completing update to reduce 
waiting.
                     if (safeTimestamp != null) {
-                        
updateTrackerIgnoringTrackerClosedException(safeTimeTracker, safeTimestamp);
+                        try {
+                            safeTimeTracker.update(safeTimestamp, 
commandIndex, commandTerm, command);
+                        } catch (TrackerClosedException ignored) {
+                            // Ignored.
+                        }
                     }
 
-                    
updateTrackerIgnoringTrackerClosedException(storageIndexTracker, commandIndex);
+                    try {
+                        storageIndexTracker.update(commandIndex, null);
+                    } catch (TrackerClosedException ignored) {
+                        // Ignored.
+                    }
                 }
 
                 lastAppliedIndex = max(lastAppliedIndex, commandIndex);
@@ -347,7 +355,11 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
             this.lastAppliedIndex = max(this.lastAppliedIndex, 
lastAppliedIndex);
             this.lastAppliedTerm = max(this.lastAppliedTerm, lastAppliedTerm);
 
-            updateTrackerIgnoringTrackerClosedException(storageIndexTracker, 
lastAppliedIndex);
+            try {
+                storageIndexTracker.update(lastAppliedIndex, null);
+            } catch (TrackerClosedException ignored) {
+                // Ignored.
+            }
         }
     }
 
@@ -478,17 +490,6 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
         }
     }
 
-    private static <T extends Comparable<T>> void 
updateTrackerIgnoringTrackerClosedException(
-            PendingComparableValuesTracker<T, Void> tracker,
-            T newValue
-    ) {
-        try {
-            tracker.update(newValue, null);
-        } catch (TrackerClosedException ignored) {
-            // No-op.
-        }
-    }
-
     private void cleanupSnapshots() {
         partitionsSnapshots.cleanupOutgoingSnapshots(partitionKey);
     }
diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/CommandClosure.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/CommandClosure.java
index 7074b4b5328..51dc910a7b7 100644
--- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/CommandClosure.java
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/CommandClosure.java
@@ -59,7 +59,7 @@ public interface CommandClosure<R extends Command> {
     R command();
 
     /**
-     * Must be called after a command has been processed normally.
+     * Must be called after a command has been processed (with result or 
exception).
      *
      * @param res Execution result.
      */
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 206e651427e..6bdecc2d05d 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -459,7 +459,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
         CountDownLatch readIndexLatch = new CountDownLatch(1);
         AtomicInteger currentValue = new AtomicInteger(-1);
         String errorMsg = testInfo.getDisplayName();
-        StateMachine fsm = new StateMachineAdapter() {
+        StateMachine fsm = new StateMachineAdapter("test") {
 
             @Override
             public void onApply(Iterator iter) {
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java
index 1de8279352e..cbf324067d3 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java
@@ -62,7 +62,7 @@ public class MockStateMachine extends StateMachineAdapter {
     }
 
     public MockStateMachine(final PeerId peerId) {
-        super();
+        super("test");
         this.peerId = peerId;
     }
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 9a98ab4c0d2..1efb37bdbcf 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -509,7 +509,8 @@ public class JraftServerImpl implements RaftServer {
                 
nodeOptions.setCommandsMarshaller(groupOptions.commandsMarshaller());
             }
 
-            nodeOptions.setFsm(new DelegatingStateMachine(lsnr, 
nodeOptions.getCommandsMarshaller(), failureManager));
+            nodeOptions.setFsm(
+                    new DelegatingStateMachine(nodeId.groupId().toString(), 
lsnr, nodeOptions.getCommandsMarshaller(), failureManager));
 
             nodeOptions.setRaftGrpEvtsLsnr(new 
RaftGroupEventsListenerAdapter(nodeId.groupId(), serviceEventInterceptor, 
evLsnr));
 
@@ -839,11 +840,13 @@ public class JraftServerImpl implements RaftServer {
         /**
          * Constructor.
          *
+         * @param label State machine label.
          * @param listener The listener.
          * @param marshaller Marshaller.
          * @param failureManager Failure processor that is used to handle 
critical errors.
          */
-        public DelegatingStateMachine(RaftGroupListener listener, Marshaller 
marshaller, FailureManager failureManager) {
+        public DelegatingStateMachine(String label, RaftGroupListener 
listener, Marshaller marshaller, FailureManager failureManager) {
+            super(label);
             this.listener = listener;
             this.marshaller = marshaller;
             this.failureManager = failureManager;
@@ -855,11 +858,13 @@ public class JraftServerImpl implements RaftServer {
 
         @Override
         public void onApply(Iterator iter) {
-            var writeCommandIterator = new WriteCommandIterator(iter, 
marshaller);
+            var iterWrapper = new WriteCommandIterator(iter, marshaller);
 
             try {
-                listener.onWrite(writeCommandIterator);
+                listener.onWrite(iterWrapper);
             } catch (Throwable err) {
+                LOG.error("Unexpected error while processing command 
[label={}]", err, label);
+
                 Status st;
 
                 if (err.getMessage() != null) {
@@ -868,13 +873,9 @@ public class JraftServerImpl implements RaftServer {
                     st = new Status(RaftError.ESTATEMACHINE, "Unknown state 
machine error.");
                 }
 
-                // This is necessary so that IndexOutOfBoundsException is not 
thrown in a situation where the listener, when processing a
-                // command, catch any exception and does clo.result(throwable) 
(that actually advances the iterator) and then throws the
-                // caught exception.
-                Closure done = writeCommandIterator.doneForExceptionHandling();
-
-                if (done != null) {
-                    done.run(st);
+                if (iterWrapper.done != null) {
+                    // Trigger internal error for state machine.
+                    iterWrapper.done.run(st);
                 }
 
                 iter.setErrorAndRollback(1, st);
@@ -979,7 +980,7 @@ public class JraftServerImpl implements RaftServer {
 
         private final Marshaller marshaller;
 
-        private @Nullable Closure latestDone;
+        private @Nullable Closure done;
 
         private WriteCommandIterator(Iterator iter, Marshaller marshaller) {
             this.iter = iter;
@@ -993,15 +994,14 @@ public class JraftServerImpl implements RaftServer {
 
         @Override
         public CommandClosure<WriteCommand> next() {
-            @Nullable Closure currentDone = iter.done();
-            latestDone = currentDone;
+            done = iter.done(); // Save for later error processing.
 
-            @Nullable CommandClosure<WriteCommand> done = 
(CommandClosure<WriteCommand>) currentDone;
+            @Nullable CommandClosure<WriteCommand> localDone = 
(CommandClosure<WriteCommand>) done;
             ByteBuffer data = iter.getData();
 
-            // done != null means we are on the leader, otherwise a command 
has been read from the log.
-            WriteCommand command = done == null ? marshaller.unmarshall(data) 
: done.command();
-            HybridTimestamp safeTs = done == null ? command.safeTime() : 
done.safeTimestamp();
+            // localDone != null means we are on the leader, otherwise a 
command has been read from the log.
+            WriteCommand command = localDone == null ? 
marshaller.unmarshall(data) : localDone.command();
+            HybridTimestamp safeTs = localDone == null ? command.safeTime() : 
localDone.safeTimestamp();
 
             long commandIndex = iter.getIndex();
             long commandTerm = iter.getTerm();
@@ -1029,21 +1029,13 @@ public class JraftServerImpl implements RaftServer {
 
                 @Override
                 public void result(Serializable res) {
-                    if (done != null) {
-                        done.result(res);
+                    if (localDone != null) {
+                        localDone.result(res);
                     }
 
                     iter.next();
                 }
             };
         }
-
-        private @Nullable Closure doneForExceptionHandling() {
-            if (latestDone == null) {
-                latestDone = iter.done();
-            }
-
-            return latestDone;
-        }
     }
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 506aa462e51..2df29468667 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -291,8 +291,6 @@ public class NodeImpl implements Node, RaftServerService {
         // task list for batch
         private final List<LogEntryAndClosure> tasks = new 
ArrayList<>(NodeImpl.this.raftOptions.getApplyBatch());
 
-        private @Nullable HybridTimestamp safeTs = null;
-
         @Override
         public void onEvent(final LogEntryAndClosure event, final long 
sequence, final boolean endOfBatch) {
             if (event.shutdownLatch != null) {
@@ -304,23 +302,6 @@ public class NodeImpl implements Node, RaftServerService {
                 return;
             }
 
-            // Patch the command.
-            if (event.done instanceof SafeTimeAwareCommandClosure) {
-                SafeTimeAwareCommandClosure clo = 
(SafeTimeAwareCommandClosure) event.done;
-                WriteCommand command = clo.command();
-                HybridTimestamp timestamp = command.initiatorTime();
-
-                if (timestamp != null) {
-                    if (safeTs == null) {
-                        safeTs = clock.update(timestamp);
-                    } else if (timestamp.compareTo(safeTs) > 0) {
-                        safeTs = clock.update(timestamp);
-                    }
-
-                    clo.safeTimestamp(safeTs);
-                }
-            }
-
             this.tasks.add(event);
             if (this.tasks.size() >= NodeImpl.this.raftOptions.getApplyBatch() 
|| endOfBatch) {
                 executeApplyingTasks(this.tasks);
@@ -333,7 +314,6 @@ public class NodeImpl implements Node, RaftServerService {
                 task.reset();
             }
             this.tasks.clear();
-            this.safeTs = null;
         }
     }
 
@@ -1715,9 +1695,13 @@ public class NodeImpl implements Node, RaftServerService 
{
                 });
                 return;
             }
+
+            @Nullable HybridTimestamp safeTs = null;
+
             final List<LogEntry> entries = new ArrayList<>(size);
             for (int i = 0; i < size; i++) {
                 final LogEntryAndClosure task = tasks.get(i);
+
                 if (task.expectedTerm != -1 && task.expectedTerm != 
this.currTerm) {
                     LOG.debug("Node {} can't apply task whose expectedTerm={} 
doesn't match currTerm={}.", getNodeId(),
                         task.expectedTerm, this.currTerm);
@@ -1735,6 +1719,10 @@ public class NodeImpl implements Node, RaftServerService 
{
                     task.reset();
                     continue;
                 }
+
+                // To prevent safe timestamp values from becoming stale, we 
must assign them under a valid leader lock.
+                safeTs = tryAssignSafeTimestamp(task, safeTs);
+
                 // set task entry info before adding to list.
                 task.entry.getId().setTerm(this.currTerm);
                 task.entry.setType(EnumOutter.EntryType.ENTRY_TYPE_DATA);
@@ -1750,6 +1738,26 @@ public class NodeImpl implements Node, RaftServerService 
{
         }
     }
 
+    private @Nullable HybridTimestamp 
tryAssignSafeTimestamp(LogEntryAndClosure task, @Nullable HybridTimestamp 
safeTs) {
+        if (task.done instanceof SafeTimeAwareCommandClosure) {
+            SafeTimeAwareCommandClosure clo = (SafeTimeAwareCommandClosure) 
task.done;
+            WriteCommand command = clo.command();
+            HybridTimestamp timestamp = command.initiatorTime();
+
+            if (timestamp != null) {
+                if (safeTs == null) {
+                    safeTs = clock.update(timestamp);
+                } else if (timestamp.compareTo(safeTs) > 0) {
+                    safeTs = clock.update(timestamp);
+                }
+
+                clo.safeTimestamp(safeTs);
+            }
+        }
+
+        return safeTs;
+    }
+
     /**
      * Builds a status for 'Cannot apply because this node is not a leader' 
situation.
      *
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/StateMachineAdapter.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/StateMachineAdapter.java
index 9e67e9905c2..ab6a0d26391 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/StateMachineAdapter.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/StateMachineAdapter.java
@@ -37,9 +37,18 @@ public abstract class StateMachineAdapter implements 
StateMachine {
     /** The logger */
     private static final IgniteLogger LOG = 
Loggers.forClass(StateMachineAdapter.class);
 
+    protected final String label;
+
+    /**
+     * @param label The state machine label.
+     */
+    public StateMachineAdapter(String label) {
+        this.label = label;
+    }
+
     @Override
     public void onShutdown() {
-        LOG.info("onShutdown.");
+        LOG.info("onShutdown: label={}.", label);
     }
 
     @Override
@@ -61,34 +70,34 @@ public abstract class StateMachineAdapter implements 
StateMachine {
     @Override
     public void onLeaderStart(final long term) {
         this.leaderTerm = term;
-        LOG.info("onLeaderStart: term={}.", term);
+        LOG.info("onLeaderStart: term={}, label={}.", term, label);
     }
 
     @Override
     public void onLeaderStop(final Status status) {
-        LOG.info("onLeaderStop: status={}.", status);
+        LOG.info("onLeaderStop: status={}, label={}.", status, label);
     }
 
     @Override
     public void onError(final RaftException e) {
         LOG.error(
-            "Encountered an error={} on StateMachine {}, it's highly 
recommended to implement this method as raft stops working since some error 
occurs, you should figure out the cause and repair or remove this node.",
-            e, e.getStatus(), getClassName());
+            "Encountered an error={} on StateMachine {}, label={}",
+            e, e.getStatus(), label);
     }
 
     @Override
     public void onConfigurationCommitted(final Configuration conf) {
-        LOG.info("onConfigurationCommitted: {}.", conf);
+        LOG.info("onConfigurationCommitted: {}, label={}.", conf, label);
     }
 
     @Override
     public void onStopFollowing(final LeaderChangeContext ctx) {
-        LOG.info("onStopFollowing: {}.", ctx);
+        LOG.info("onStopFollowing: {}, label={}.", ctx, label);
     }
 
     @Override
     public void onStartFollowing(final LeaderChangeContext ctx) {
-        LOG.info("onStartFollowing: {}.", ctx);
+        LOG.info("onStartFollowing: {}, label={}.", ctx, label);
     }
 
     @SuppressWarnings("SameParameterValue")
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/StateMachineFailureHandlerTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/StateMachineFailureHandlerTest.java
index e8afdbf0a52..565df7f603d 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/StateMachineFailureHandlerTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/StateMachineFailureHandlerTest.java
@@ -84,6 +84,7 @@ public class StateMachineFailureHandlerTest extends 
BaseIgniteAbstractTest {
         AtomicBoolean reached = new AtomicBoolean();
 
         DelegatingStateMachine sm = new JraftServerImpl.DelegatingStateMachine(
+                "test",
                 TEST_LISTENER,
                 mock(Marshaller.class),
                 testFailureManager(reached)
@@ -99,6 +100,7 @@ public class StateMachineFailureHandlerTest extends 
BaseIgniteAbstractTest {
         AtomicBoolean reached = new AtomicBoolean();
 
         DelegatingStateMachine sm = new JraftServerImpl.DelegatingStateMachine(
+                "test",
                 TEST_LISTENER,
                 mock(Marshaller.class),
                 testFailureManager(reached)
@@ -118,6 +120,7 @@ public class StateMachineFailureHandlerTest extends 
BaseIgniteAbstractTest {
         AtomicBoolean reached = new AtomicBoolean();
 
         DelegatingStateMachine sm = new JraftServerImpl.DelegatingStateMachine(
+                "test",
                 TEST_LISTENER,
                 mock(Marshaller.class),
                 testFailureManager(reached)

Reply via email to