This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 539e80405 RATIS-2234. Remove lock race between heartbeat and append log channels (#1205)
539e80405 is described below
commit 539e80405ec6d9d767e09e3c84da4778cf704a33
Author: William Song <[email protected]>
AuthorDate: Fri Jan 3 03:23:19 2025 +0800
RATIS-2234. Remove lock race between heartbeat and append log channels (#1205)
---
.../src/main/java/org/apache/ratis/util/AutoCloseableLock.java | 8 ++++++++
.../main/java/org/apache/ratis/server/raftlog/RaftLogBase.java | 9 ++++++++-
2 files changed, 16 insertions(+), 1 deletion(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java b/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java
index 8a5409baf..9581e925a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.util;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
@@ -45,6 +46,13 @@ public final class AutoCloseableLock implements
AutoCloseable {
return new AutoCloseableLock(lock, preUnlock);
}
+ public static AutoCloseableLock tryAcquire(final Lock lock, Runnable
preUnlock, TimeDuration timeout)
+ throws InterruptedException {
+ Objects.requireNonNull(timeout, "timeout == null");
+ final boolean locked = lock.tryLock(timeout.getDuration(),
timeout.getUnit());
+ return locked? new AutoCloseableLock(lock, preUnlock): null;
+ }
+
private final Lock underlying;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Runnable preUnlock;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 9353612c8..d6314bc13 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -122,7 +122,7 @@ public abstract class RaftLogBase implements RaftLog {
@Override
public boolean updateCommitIndex(long majorityIndex, long currentTerm,
boolean isLeader) {
- try(AutoCloseableLock writeLock = writeLock()) {
+ try(AutoCloseableLock writeLock = tryWriteLock(TimeDuration.ONE_SECOND)) {
final long oldCommittedIndex = getLastCommittedIndex();
final long newCommitIndex = Math.min(majorityIndex, getFlushIndex());
if (oldCommittedIndex < newCommitIndex) {
@@ -136,6 +136,9 @@ public abstract class RaftLogBase implements RaftLog {
return commitIndex.updateIncreasingly(newCommitIndex,
traceIndexChange);
}
}
+ } catch (InterruptedException e) {
+ LOG.warn("{}: Interrupted to updateCommitIndex: majorityIndex={},
currentTerm={}, isLeader={}",
+ getName(), majorityIndex, currentTerm, isLeader, e);
}
return false;
}
@@ -389,6 +392,10 @@ public abstract class RaftLogBase implements RaftLog {
return AutoCloseableLock.acquire(lock.writeLock());
}
+ public AutoCloseableLock tryWriteLock(TimeDuration timeout) throws
InterruptedException {
+ return AutoCloseableLock.tryAcquire(lock.writeLock(), null, timeout);
+ }
+
public boolean hasWriteLock() {
return this.lock.isWriteLockedByCurrentThread();
}