This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 0cbf690183b75e0947ded926a1d20857d1f3549a
Author: jianghuazhu <740087...@qq.com>
AuthorDate: Thu May 15 02:30:01 2025 +0800

    RATIS-2290. Simplify the EventQueue in leader (#1258)
---
 .../apache/ratis/server/impl/LeaderStateImpl.java  | 37 ++++++++++------------
 1 file changed, 16 insertions(+), 21 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index cd1d9ebea..8358f063d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -111,12 +111,10 @@ class LeaderStateImpl implements LeaderState {
     }
 
     private final Type type;
-    private final long newTerm;
     private final Runnable handler;
 
-    StateUpdateEvent(Type type, long newTerm, Runnable handler) {
+    StateUpdateEvent(Type type, Runnable handler) {
       this.type = type;
-      this.newTerm = newTerm;
       this.handler = handler;
     }
 
@@ -132,25 +130,30 @@ class LeaderStateImpl implements LeaderState {
         return false;
       }
       final StateUpdateEvent that = (StateUpdateEvent)obj;
-      return this.type == that.type && this.newTerm == that.newTerm;
+      return this.type == that.type;
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(type, newTerm);
+      return type.hashCode();
     }
 
     @Override
     public String toString() {
-      return type + (newTerm >= 0? ":" + newTerm: "");
+      return type.name();
     }
   }
 
   private class EventQueue {
     private final String name = 
ServerStringUtils.generateUnifiedName(server.getMemberId(), getClass());
-    private final BlockingQueue<StateUpdateEvent> queue = new 
ArrayBlockingQueue<>(4096);
+    private final BlockingQueue<StateUpdateEvent> queue = new 
ArrayBlockingQueue<>(
+        StateUpdateEvent.Type.values().length);;
 
-    void submit(StateUpdateEvent event) {
+    // submit can be invoked by different threads -- need to be synchronized
+    synchronized void submit(StateUpdateEvent event) {
+      if (queue.contains(event)) { // avoid duplicated events
+        return;
+      }
       try {
         queue.put(event);
       } catch (InterruptedException e) {
@@ -159,10 +162,10 @@ class LeaderStateImpl implements LeaderState {
       }
     }
 
+    // poll is invoked only by the EventProcessor thread -- synchronized is not needed
     StateUpdateEvent poll() {
-      final StateUpdateEvent e;
       try {
-        e = queue.poll(server.getMaxTimeoutMs(), TimeUnit.MILLISECONDS);
+        return queue.poll(server.getMaxTimeoutMs(), TimeUnit.MILLISECONDS);
       } catch (InterruptedException ie) {
         Thread.currentThread().interrupt();
         String s = this + ": poll() is interrupted";
@@ -173,14 +176,6 @@ class LeaderStateImpl implements LeaderState {
           throw new IllegalStateException(s + " UNEXPECTEDLY", ie);
         }
       }
-
-      if (e != null) {
-        // remove duplicated events from the head.
-        while(e.equals(queue.peek())) {
-          queue.poll();
-        }
-      }
-      return e;
     }
 
     @Override
@@ -322,9 +317,9 @@ class LeaderStateImpl implements LeaderState {
   }
 
   private final StateUpdateEvent updateCommitEvent =
-      new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, -1, 
this::updateCommit);
+      new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, 
this::updateCommit);
   private final StateUpdateEvent checkStagingEvent =
-      new StateUpdateEvent(StateUpdateEvent.Type.CHECK_STAGING, -1, 
this::checkStaging);
+      new StateUpdateEvent(StateUpdateEvent.Type.CHECK_STAGING, 
this::checkStaging);
 
   private final String name;
   private final RaftServerImpl server;
@@ -702,7 +697,7 @@ class LeaderStateImpl implements LeaderState {
   }
 
   void submitStepDownEvent(long term, StepDownReason reason) {
-    eventQueue.submit(new StateUpdateEvent(StateUpdateEvent.Type.STEP_DOWN, 
term, () -> stepDown(term, reason)));
+    eventQueue.submit(new StateUpdateEvent(StateUpdateEvent.Type.STEP_DOWN, () 
-> stepDown(term, reason)));
   }
 
   private void stepDown(long term, StepDownReason reason) {

Reply via email to