This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a78c210be40 IGNITE-26777 Port several commits from Jraft regarding ThreadId (#6811)
a78c210be40 is described below
commit a78c210be40fdd43512b4f1a9102bbc5015062e5
Author: Mirza Aliev <[email protected]>
AuthorDate: Tue Oct 21 17:43:26 2025 +0400
IGNITE-26777 Port several commits from Jraft regarding ThreadId (#6811)
---
.../apache/ignite/raft/jraft/core/Replicator.java | 6 +-
.../apache/ignite/raft/jraft/util/ThreadId.java | 87 +++++++---------------
.../ignite/raft/jraft/core/ReplicatorTest.java | 2 +-
.../ignite/raft/jraft/util/ThreadIdTest.java | 16 ++--
4 files changed, 35 insertions(+), 76 deletions(-)
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
index d35f3b542e5..9066cbd8baf 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
@@ -198,6 +198,10 @@ public class Replicator implements ThreadId.OnError {
gauges.put("install-snapshot-times", (Gauge<Long>) () ->
this.r.installSnapshotCounter);
gauges.put("probe-times", (Gauge<Long>) () -> this.r.probeCounter);
gauges.put("append-entries-times", (Gauge<Long>) () ->
this.r.appendEntriesCounter);
+ gauges.put("consecutive-error-times", (Gauge<Long>) () -> (long)
this.r.consecutiveErrorTimes);
+ gauges.put("state", (Gauge<Long>) () -> (long)
this.r.state.ordinal());
+ gauges.put("running-state", (Gauge<Long>) () -> (long)
this.r.statInfo.runningState.ordinal());
+ gauges.put("locked", (Gauge<Long>) () -> (null == this.r.id ? -1L
: this.r.id.isLocked() ? 1L : 0L));
return gauges;
}
}
@@ -1068,11 +1072,9 @@ public class Replicator implements ThreadId.OnError {
}
}
else if (errorCode == RaftError.ETIMEDOUT.getNumber()) {
- id.unlock();
Utils.runInThread(options.getCommonExecutor(), () ->
sendHeartbeat(id));
}
else {
- id.unlock();
//noinspection ConstantConditions
Requires.requireTrue(false, "Unknown error code for replicator: "
+ errorCode);
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ThreadId.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ThreadId.java
index d2790ff8e86..468f6814a0c 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ThreadId.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ThreadId.java
@@ -16,9 +16,7 @@
*/
package org.apache.ignite.raft.jraft.util;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -29,11 +27,8 @@ public class ThreadId {
private static final IgniteLogger LOG = Loggers.forClass(ThreadId.class);
- private static final int TRY_LOCK_TIMEOUT_MS = 10;
-
private final Object data;
- private final NonReentrantLock lock = new NonReentrantLock();
- private final List<Integer> pendingErrors = new ArrayList<>();
+ private final ReentrantLock lock = new ReentrantLock();
private final OnError onError;
private volatile boolean destroyed;
@@ -43,7 +38,7 @@ public class ThreadId {
public interface OnError {
/**
- * Error callback, it will be called in lock, but should take care of
unlocking it.
+ * Error callback, it will be called in lock.
*
* @param id the thread id
* @param data the data
@@ -72,17 +67,7 @@ public class ThreadId {
if (this.destroyed) {
return null;
}
- try {
- while (!this.lock.tryLock(TRY_LOCK_TIMEOUT_MS,
TimeUnit.MILLISECONDS)) {
- if (this.destroyed) {
- return null;
- }
- }
- }
- catch (final InterruptedException e) {
- Thread.currentThread().interrupt(); // reset
- return null;
- }
+ this.lock.lock();
// Got the lock, double checking state.
if (this.destroyed) {
// should release lock
@@ -94,31 +79,10 @@ public class ThreadId {
public void unlock() {
if (!this.lock.isHeldByCurrentThread()) {
- LOG.warn("Fail to unlock with {}, the lock is held by {} and
current thread is {}.", this.data,
- this.lock.getOwner(), Thread.currentThread());
+ LOG.warn("Fail to unlock with {}, the lock is not held by current
thread {}.", this.data, Thread.currentThread());
return;
}
- // calls all pending errors before unlock
- boolean doUnlock = true;
- try {
- final List<Integer> errors;
- synchronized (this.pendingErrors) {
- errors = new ArrayList<>(this.pendingErrors);
- this.pendingErrors.clear();
- }
- for (final Integer code : errors) {
- // The lock will be unlocked in onError.
- doUnlock = false;
- if (this.onError != null) {
- this.onError.onError(this, this.data, code);
- }
- }
- }
- finally {
- if (doUnlock) {
- this.lock.unlock();
- }
- }
+ this.lock.unlock();
}
public void join() {
@@ -137,38 +101,37 @@ public class ThreadId {
return;
}
this.destroyed = true;
- if (!this.lock.isHeldByCurrentThread()) {
- LOG.warn("Fail to unlockAndDestroy with {}, the lock is held by {}
and current thread is {}.", this.data,
- this.lock.getOwner(), Thread.currentThread());
- return;
- }
- this.lock.unlock();
+ unlock();
}
/**
- * Set error code, if it tryLock success, run the onError callback with
code immediately, else add it into pending
- * errors and will be called before unlock.
+ * Set error code, run the onError callback with code immediately in lock.
*
* @param errorCode error code
*/
public void setError(final int errorCode) {
if (this.destroyed) {
+ LOG.warn("ThreadId: {} already destroyed, ignore error code: {}",
this.data, errorCode);
return;
}
- synchronized (pendingErrors) {
- if (this.lock.tryLock()) {
- if (this.destroyed) {
- this.lock.unlock();
- return;
- }
- if (this.onError != null) {
- // The lock will be unlocked in onError.
- this.onError.onError(this, this.data, errorCode);
- }
+ this.lock.lock();
+ try {
+ if (this.destroyed) {
+ LOG.warn("ThreadId: {} already destroyed, ignore error code:
{}", this.data, errorCode);
+ return;
+ }
+ if (this.onError != null) {
+ this.onError.onError(this, this.data, errorCode);
}
- else {
- this.pendingErrors.add(errorCode);
+ } finally {
+ // It may have been released during onError to avoid throwing an
exception.
+ if (this.lock.isHeldByCurrentThread()) {
+ this.lock.unlock();
}
}
}
+
+ public boolean isLocked() {
+ return this.lock.isLocked();
+ }
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
index bb91417391e..a07acb055ec 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
@@ -185,7 +185,7 @@ public class ReplicatorTest extends BaseIgniteAbstractTest {
assertNotNull(r);
assertSame(r.getOpts(), this.opts);
Set<String> metrics =
this.opts.getNode().getNodeMetrics().getMetricRegistry().getNames();
- assertEquals(7, metrics.size());
+ assertEquals(11, metrics.size());
r.destroy();
metrics =
this.opts.getNode().getNodeMetrics().getMetricRegistry().getNames();
assertEquals(0, metrics.size());
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/ThreadIdTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/ThreadIdTest.java
index af312a4a455..978d4adc891 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/ThreadIdTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/ThreadIdTest.java
@@ -16,6 +16,11 @@
*/
package org.apache.ignite.raft.jraft.util;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -25,11 +30,6 @@ import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
public class ThreadIdTest extends BaseIgniteAbstractTest implements
ThreadId.OnError {
private ThreadId id;
private volatile int errorCode = -1;
@@ -38,7 +38,6 @@ public class ThreadIdTest extends BaseIgniteAbstractTest implements ThreadId.OnE
public void onError(final ThreadId id, final Object data, final int
errorCode) {
assertSame(id, this.id);
this.errorCode = errorCode;
- id.unlock();
}
@BeforeEach
@@ -75,7 +74,6 @@ public class ThreadIdTest extends BaseIgniteAbstractTest implements ThreadId.OnE
public void testSetError() throws Exception {
this.id.setError(100);
assertEquals(100, this.errorCode);
- this.id.lock();
CountDownLatch latch = new CountDownLatch(1);
Thread t = new Thread(() -> {
ThreadIdTest.this.id.setError(99);
@@ -84,10 +82,6 @@ public class ThreadIdTest extends BaseIgniteAbstractTest implements ThreadId.OnE
try {
t.start();
latch.await();
- //just go into pending errors.
- assertEquals(100, this.errorCode);
- //invoke onError when unlock
- this.id.unlock();
assertEquals(99, this.errorCode);
} finally {
t.join();