This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a78c210be40 IGNITE-26777 Port several commits from Jraft regarding ThreadId (#6811)
a78c210be40 is described below

commit a78c210be40fdd43512b4f1a9102bbc5015062e5
Author: Mirza Aliev <[email protected]>
AuthorDate: Tue Oct 21 17:43:26 2025 +0400

    IGNITE-26777 Port several commits from Jraft regarding ThreadId (#6811)
---
 .../apache/ignite/raft/jraft/core/Replicator.java  |  6 +-
 .../apache/ignite/raft/jraft/util/ThreadId.java    | 87 +++++++---------------
 .../ignite/raft/jraft/core/ReplicatorTest.java     |  2 +-
 .../ignite/raft/jraft/util/ThreadIdTest.java       | 16 ++--
 4 files changed, 35 insertions(+), 76 deletions(-)

diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
index d35f3b542e5..9066cbd8baf 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
@@ -198,6 +198,10 @@ public class Replicator implements ThreadId.OnError {
             gauges.put("install-snapshot-times", (Gauge<Long>) () -> 
this.r.installSnapshotCounter);
             gauges.put("probe-times", (Gauge<Long>) () -> this.r.probeCounter);
             gauges.put("append-entries-times", (Gauge<Long>) () -> 
this.r.appendEntriesCounter);
+            gauges.put("consecutive-error-times", (Gauge<Long>) () -> (long) 
this.r.consecutiveErrorTimes);
+            gauges.put("state", (Gauge<Long>) () -> (long) 
this.r.state.ordinal());
+            gauges.put("running-state", (Gauge<Long>) () -> (long) 
this.r.statInfo.runningState.ordinal());
+            gauges.put("locked", (Gauge<Long>) () ->  (null == this.r.id ? -1L 
: this.r.id.isLocked() ? 1L : 0L));
             return gauges;
         }
     }
@@ -1068,11 +1072,9 @@ public class Replicator implements ThreadId.OnError {
             }
         }
         else if (errorCode == RaftError.ETIMEDOUT.getNumber()) {
-            id.unlock();
             Utils.runInThread(options.getCommonExecutor(), () -> 
sendHeartbeat(id));
         }
         else {
-            id.unlock();
             //noinspection ConstantConditions
             Requires.requireTrue(false, "Unknown error code for replicator: " 
+ errorCode);
         }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ThreadId.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ThreadId.java
index d2790ff8e86..468f6814a0c 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ThreadId.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ThreadId.java
@@ -16,9 +16,7 @@
  */
 package org.apache.ignite.raft.jraft.util;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 
@@ -29,11 +27,8 @@ public class ThreadId {
 
     private static final IgniteLogger LOG = Loggers.forClass(ThreadId.class);
 
-    private static final int TRY_LOCK_TIMEOUT_MS = 10;
-
     private final Object data;
-    private final NonReentrantLock lock = new NonReentrantLock();
-    private final List<Integer> pendingErrors = new ArrayList<>();
+    private final ReentrantLock lock = new ReentrantLock();
     private final OnError onError;
     private volatile boolean destroyed;
 
@@ -43,7 +38,7 @@ public class ThreadId {
     public interface OnError {
 
         /**
-         * Error callback, it will be called in lock, but should take care of 
unlocking it.
+         * Error callback, it will be called in lock.
          *
          * @param id the thread id
          * @param data the data
@@ -72,17 +67,7 @@ public class ThreadId {
         if (this.destroyed) {
             return null;
         }
-        try {
-            while (!this.lock.tryLock(TRY_LOCK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
-                if (this.destroyed) {
-                    return null;
-                }
-            }
-        }
-        catch (final InterruptedException e) {
-            Thread.currentThread().interrupt(); // reset
-            return null;
-        }
+        this.lock.lock();
         // Got the lock, double checking state.
         if (this.destroyed) {
             // should release lock
@@ -94,31 +79,10 @@ public class ThreadId {
 
     public void unlock() {
         if (!this.lock.isHeldByCurrentThread()) {
-            LOG.warn("Fail to unlock with {}, the lock is held by {} and 
current thread is {}.", this.data,
-                this.lock.getOwner(), Thread.currentThread());
+            LOG.warn("Fail to unlock with {}, the lock is not held by current 
thread {}.", this.data, Thread.currentThread());
             return;
         }
-        // calls all pending errors before unlock
-        boolean doUnlock = true;
-        try {
-            final List<Integer> errors;
-            synchronized (this.pendingErrors) {
-                errors = new ArrayList<>(this.pendingErrors);
-                this.pendingErrors.clear();
-            }
-            for (final Integer code : errors) {
-                // The lock will be unlocked in onError.
-                doUnlock = false;
-                if (this.onError != null) {
-                    this.onError.onError(this, this.data, code);
-                }
-            }
-        }
-        finally {
-            if (doUnlock) {
-                this.lock.unlock();
-            }
-        }
+        this.lock.unlock();
     }
 
     public void join() {
@@ -137,38 +101,37 @@ public class ThreadId {
             return;
         }
         this.destroyed = true;
-        if (!this.lock.isHeldByCurrentThread()) {
-            LOG.warn("Fail to unlockAndDestroy with {}, the lock is held by {} 
and current thread is {}.", this.data,
-                this.lock.getOwner(), Thread.currentThread());
-            return;
-        }
-        this.lock.unlock();
+        unlock();
     }
 
     /**
-     * Set error code, if it tryLock success, run the onError callback with 
code immediately, else add it into pending
-     * errors and will be called before unlock.
+     * Set error code, run the onError callback with code immediately in lock.
      *
      * @param errorCode error code
      */
     public void setError(final int errorCode) {
         if (this.destroyed) {
+            LOG.warn("ThreadId: {} already destroyed, ignore error code: {}", 
this.data, errorCode);
             return;
         }
-        synchronized (pendingErrors) {
-            if (this.lock.tryLock()) {
-                if (this.destroyed) {
-                    this.lock.unlock();
-                    return;
-                }
-                if (this.onError != null) {
-                    // The lock will be unlocked in onError.
-                    this.onError.onError(this, this.data, errorCode);
-                }
+        this.lock.lock();
+        try {
+            if (this.destroyed) {
+                LOG.warn("ThreadId: {} already destroyed, ignore error code: 
{}", this.data, errorCode);
+                return;
+            }
+            if (this.onError != null) {
+                this.onError.onError(this, this.data, errorCode);
             }
-            else {
-                this.pendingErrors.add(errorCode);
+        } finally {
+            // It may have been released during onError to avoid throwing an 
exception.
+            if (this.lock.isHeldByCurrentThread()) {
+                this.lock.unlock();
             }
         }
     }
+
+    public boolean isLocked() {
+        return this.lock.isLocked();
+    }
 }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
index bb91417391e..a07acb055ec 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
@@ -185,7 +185,7 @@ public class ReplicatorTest extends BaseIgniteAbstractTest {
         assertNotNull(r);
         assertSame(r.getOpts(), this.opts);
         Set<String> metrics = 
this.opts.getNode().getNodeMetrics().getMetricRegistry().getNames();
-        assertEquals(7, metrics.size());
+        assertEquals(11, metrics.size());
         r.destroy();
         metrics = 
this.opts.getNode().getNodeMetrics().getMetricRegistry().getNames();
         assertEquals(0, metrics.size());
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/ThreadIdTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/ThreadIdTest.java
index af312a4a455..978d4adc891 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/ThreadIdTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/ThreadIdTest.java
@@ -16,6 +16,11 @@
  */
 package org.apache.ignite.raft.jraft.util;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -25,11 +30,6 @@ import 
org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
 public class ThreadIdTest extends BaseIgniteAbstractTest implements 
ThreadId.OnError {
     private ThreadId id;
     private volatile int errorCode = -1;
@@ -38,7 +38,6 @@ public class ThreadIdTest extends BaseIgniteAbstractTest 
implements ThreadId.OnE
     public void onError(final ThreadId id, final Object data, final int 
errorCode) {
         assertSame(id, this.id);
         this.errorCode = errorCode;
-        id.unlock();
     }
 
     @BeforeEach
@@ -75,7 +74,6 @@ public class ThreadIdTest extends BaseIgniteAbstractTest 
implements ThreadId.OnE
     public void testSetError() throws Exception {
         this.id.setError(100);
         assertEquals(100, this.errorCode);
-        this.id.lock();
         CountDownLatch latch = new CountDownLatch(1);
         Thread t = new Thread(() -> {
             ThreadIdTest.this.id.setError(99);
@@ -84,10 +82,6 @@ public class ThreadIdTest extends BaseIgniteAbstractTest 
implements ThreadId.OnE
         try {
             t.start();
             latch.await();
-            //just go into pending errors.
-            assertEquals(100, this.errorCode);
-            //invoke onError when unlock
-            this.id.unlock();
             assertEquals(99, this.errorCode);
         } finally {
             t.join();

Reply via email to