This is an automated email from the ASF dual-hosted git repository. williamsong pushed a commit to branch release-3.1.3 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 48f6e529d71b76bba699a5f36b722e3255616f26 Author: William Song <[email protected]> AuthorDate: Fri Jan 3 03:23:19 2025 +0800 RATIS-2234. Remove lock race between heartbeat and append log channels (#1205) --- .../src/main/java/org/apache/ratis/util/AutoCloseableLock.java | 8 ++++++++ .../main/java/org/apache/ratis/server/raftlog/RaftLogBase.java | 9 ++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java b/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java index 8a5409baf..9581e925a 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.util; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; @@ -45,6 +46,13 @@ public final class AutoCloseableLock implements AutoCloseable { return new AutoCloseableLock(lock, preUnlock); } + public static AutoCloseableLock tryAcquire(final Lock lock, Runnable preUnlock, TimeDuration timeout) + throws InterruptedException { + Objects.requireNonNull(timeout, "timeout == null"); + final boolean locked = lock.tryLock(timeout.getDuration(), timeout.getUnit()); + return locked? new AutoCloseableLock(lock, preUnlock): null; + } + private final Lock underlying; private final AtomicBoolean closed = new AtomicBoolean(false); private final Runnable preUnlock; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java index 9b0367213..b37d40c28 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java @@ -119,7 +119,7 @@ public abstract class RaftLogBase implements RaftLog { @Override public boolean updateCommitIndex(long majorityIndex, long currentTerm, boolean isLeader) { - try(AutoCloseableLock writeLock = writeLock()) { + try(AutoCloseableLock writeLock = tryWriteLock(TimeDuration.ONE_SECOND)) { final long oldCommittedIndex = getLastCommittedIndex(); final long newCommitIndex = Math.min(majorityIndex, getFlushIndex()); if (oldCommittedIndex < newCommitIndex) { @@ -133,6 +133,9 @@ public abstract class RaftLogBase implements RaftLog { return commitIndex.updateIncreasingly(newCommitIndex, traceIndexChange); } } + } catch (InterruptedException e) { + LOG.warn("{}: Interrupted to updateCommitIndex: majorityIndex={}, currentTerm={}, isLeader={}", + getName(), majorityIndex, currentTerm, isLeader, e); } return false; } @@ -375,6 +378,10 @@ public abstract class RaftLogBase implements RaftLog { return AutoCloseableLock.acquire(lock.writeLock()); } + public AutoCloseableLock tryWriteLock(TimeDuration timeout) throws InterruptedException { + return AutoCloseableLock.tryAcquire(lock.writeLock(), null, timeout); + } + public boolean hasWriteLock() { return this.lock.isWriteLockedByCurrentThread(); }
