This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 0cbf690183b75e0947ded926a1d20857d1f3549a Author: jianghuazhu <740087...@qq.com> AuthorDate: Thu May 15 02:30:01 2025 +0800 RATIS-2290. Simply the EventQueue in leader (#1258) --- .../apache/ratis/server/impl/LeaderStateImpl.java | 37 ++++++++++------------ 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index cd1d9ebea..8358f063d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -111,12 +111,10 @@ class LeaderStateImpl implements LeaderState { } private final Type type; - private final long newTerm; private final Runnable handler; - StateUpdateEvent(Type type, long newTerm, Runnable handler) { + StateUpdateEvent(Type type, Runnable handler) { this.type = type; - this.newTerm = newTerm; this.handler = handler; } @@ -132,25 +130,30 @@ class LeaderStateImpl implements LeaderState { return false; } final StateUpdateEvent that = (StateUpdateEvent)obj; - return this.type == that.type && this.newTerm == that.newTerm; + return this.type == that.type; } @Override public int hashCode() { - return Objects.hash(type, newTerm); + return type.hashCode(); } @Override public String toString() { - return type + (newTerm >= 0? ":" + newTerm: ""); + return type.name(); } } private class EventQueue { private final String name = ServerStringUtils.generateUnifiedName(server.getMemberId(), getClass()); - private final BlockingQueue<StateUpdateEvent> queue = new ArrayBlockingQueue<>(4096); + private final BlockingQueue<StateUpdateEvent> queue = new ArrayBlockingQueue<>( + StateUpdateEvent.Type.values().length);; - void submit(StateUpdateEvent event) { + // submit can be invoked by different threads -- need to be synchronized + synchronized void submit(StateUpdateEvent event) { + if (queue.contains(event)) { // avoid duplicated events + return; + } try { queue.put(event); } catch (InterruptedException e) { @@ -159,10 +162,10 @@ class LeaderStateImpl implements LeaderState { } } + // poll is invoked only by the EventProcessor thread -- synchronized is not needed StateUpdateEvent poll() { - final StateUpdateEvent e; try { - e = queue.poll(server.getMaxTimeoutMs(), TimeUnit.MILLISECONDS); + return queue.poll(server.getMaxTimeoutMs(), TimeUnit.MILLISECONDS); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); String s = this + ": poll() is interrupted"; @@ -173,14 +176,6 @@ class LeaderStateImpl implements LeaderState { throw new IllegalStateException(s + " UNEXPECTEDLY", ie); } } - - if (e != null) { - // remove duplicated events from the head. - while(e.equals(queue.peek())) { - queue.poll(); - } - } - return e; } @Override @@ -322,9 +317,9 @@ class LeaderStateImpl implements LeaderState { } private final StateUpdateEvent updateCommitEvent = - new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, -1, this::updateCommit); + new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, this::updateCommit); private final StateUpdateEvent checkStagingEvent = - new StateUpdateEvent(StateUpdateEvent.Type.CHECK_STAGING, -1, this::checkStaging); + new StateUpdateEvent(StateUpdateEvent.Type.CHECK_STAGING, this::checkStaging); private final String name; private final RaftServerImpl server; @@ -702,7 +697,7 @@ class LeaderStateImpl implements LeaderState { } void submitStepDownEvent(long term, StepDownReason reason) { - eventQueue.submit(new StateUpdateEvent(StateUpdateEvent.Type.STEP_DOWN, term, () -> stepDown(term, reason))); + eventQueue.submit(new StateUpdateEvent(StateUpdateEvent.Type.STEP_DOWN, () -> stepDown(term, reason))); } private void stepDown(long term, StepDownReason reason) {