szetszwo commented on code in PR #1258:
URL: https://github.com/apache/ratis/pull/1258#discussion_r2087217760


##########
ratis-docs/src/site/markdown/configurations.md:
##########
@@ -218,6 +218,14 @@ if it fails to receive any RPC responses from this peer 
within this specified ti
 | **Type**        | TimeDuration                                               
                    |
 | **Default**     | 60s                                                        
                    |
 
+### Event - Define event-related configuration for Leader
+
+
+| **Property**    | `raft.server.read.leader.event.queue.element-limit`        
                                             |

Review Comment:
   @jianghuazhu , there may be a misunderstanding -- the event queue is for the 
STEP_DOWN, UPDATE_COMMIT and CHECK_STAGING events but not for read.  It should 
only have a few events in the queue. 
   
   I suggest to have the following improvements instead:
   - Remove `newTerm` from `StateUpdateEvent` in order to make the equals 
method simple.  After that, it could only have 3 different events.
   - Change submit(..) to avoid duplicated events and remove the duplication 
checks from poll().
   - The queue length (4096) is at most 3 and the capacity can be reduced to 3.
   
   ```diff
   diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
   index dd4b9dc1a9..ac7f6cb40f 100644
   --- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
   +++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
   @@ -112,12 +112,10 @@ class LeaderStateImpl implements LeaderState {
        }
    
        private final Type type;
   -    private final long newTerm;
        private final Runnable handler;
    
   -    StateUpdateEvent(Type type, long newTerm, Runnable handler) {
   +    StateUpdateEvent(Type type, Runnable handler) {
          this.type = type;
   -      this.newTerm = newTerm;
          this.handler = handler;
        }
    
   @@ -133,25 +131,29 @@ class LeaderStateImpl implements LeaderState {
            return false;
          }
          final StateUpdateEvent that = (StateUpdateEvent)obj;
   -      return this.type == that.type && this.newTerm == that.newTerm;
   +      return this.type == that.type;
        }
    
        @Override
        public int hashCode() {
   -      return Objects.hash(type, newTerm);
   +      return type.hashCode();
        }
    
        @Override
        public String toString() {
   -      return type + (newTerm >= 0? ":" + newTerm: "");
   +      return type.name();
        }
      }
    
      private class EventQueue {
        private final String name = 
ServerStringUtils.generateUnifiedName(server.getMemberId(), getClass());
   -    private final BlockingQueue<StateUpdateEvent> queue = new 
ArrayBlockingQueue<>(4096);
   +    private final BlockingQueue<StateUpdateEvent> queue = new 
ArrayBlockingQueue<>(StateUpdateEvent.Type.values().length);
    
   -    void submit(StateUpdateEvent event) {
   +    // submit can be invoked by different threads -- need to be synchronized
   +    synchronized void submit(StateUpdateEvent event) {
   +      if (queue.contains(event)) { // avoid duplicated events
   +        return;
   +      }
          try {
            queue.put(event);
          } catch (InterruptedException e) {
   @@ -160,10 +162,10 @@ class LeaderStateImpl implements LeaderState {
          }
        }
    
   +    // poll is invoked only by the EventProcessor thread -- synchronized is 
not needed
        StateUpdateEvent poll() {
   -      final StateUpdateEvent e;
          try {
   -        e = queue.poll(server.getMaxTimeoutMs(), TimeUnit.MILLISECONDS);
   +        return queue.poll(server.getMaxTimeoutMs(), TimeUnit.MILLISECONDS);
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            String s = this + ": poll() is interrupted";
   @@ -174,14 +176,6 @@ class LeaderStateImpl implements LeaderState {
              throw new IllegalStateException(s + " UNEXPECTEDLY", ie);
            }
          }
   -
   -      if (e != null) {
   -        // remove duplicated events from the head.
   -        while(e.equals(queue.peek())) {
   -          queue.poll();
   -        }
   -      }
   -      return e;
        }
    
        @Override
   @@ -323,9 +317,9 @@ class LeaderStateImpl implements LeaderState {
      }
    
      private final StateUpdateEvent updateCommitEvent =
   -      new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, -1, 
this::updateCommit);
   +      new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, 
this::updateCommit);
      private final StateUpdateEvent checkStagingEvent =
   -      new StateUpdateEvent(StateUpdateEvent.Type.CHECK_STAGING, -1, 
this::checkStaging);
   +      new StateUpdateEvent(StateUpdateEvent.Type.CHECK_STAGING, 
this::checkStaging);
    
      private final String name;
      private final RaftServerImpl server;
   @@ -704,7 +698,7 @@ class LeaderStateImpl implements LeaderState {
      }
    
      void submitStepDownEvent(long term, StepDownReason reason) {
   -    eventQueue.submit(new StateUpdateEvent(StateUpdateEvent.Type.STEP_DOWN, 
term, () -> stepDown(term, reason)));
   +    eventQueue.submit(new StateUpdateEvent(StateUpdateEvent.Type.STEP_DOWN, 
() -> stepDown(term, reason)));
      }
    
      private void stepDown(long term, StepDownReason reason) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to