szetszwo commented on code in PR #1258:
URL: https://github.com/apache/ratis/pull/1258#discussion_r2087217760
##########
ratis-docs/src/site/markdown/configurations.md:
##########
@@ -218,6 +218,14 @@ if it fails to receive any RPC responses from this peer
within this specified ti
| **Type** | TimeDuration
|
| **Default** | 60s
|
+### Event - Define event-related configuration for Leader
+
+
+| **Property** | `raft.server.read.leader.event.queue.element-limit`
|
Review Comment:
@jianghuazhu , there may be a misunderstanding -- the event queue is for the
STEP_DOWN, UPDATE_COMMIT and CHECK_STAGING events but not for read. It should
only have a few events in the queue.
I suggest making the following improvements instead:
- Remove `newTerm` from `StateUpdateEvent` in order to simplify the `equals`
method. After that, there can be only 3 distinct events (one per event type).
- Change submit(..) to avoid duplicated events and remove the duplication
checks from poll().
- The queue length is then at most 3, so the capacity (currently 4096) can be reduced to 3.
```diff
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index dd4b9dc1a9..ac7f6cb40f 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -112,12 +112,10 @@ class LeaderStateImpl implements LeaderState {
}
private final Type type;
- private final long newTerm;
private final Runnable handler;
- StateUpdateEvent(Type type, long newTerm, Runnable handler) {
+ StateUpdateEvent(Type type, Runnable handler) {
this.type = type;
- this.newTerm = newTerm;
this.handler = handler;
}
@@ -133,25 +131,29 @@ class LeaderStateImpl implements LeaderState {
return false;
}
final StateUpdateEvent that = (StateUpdateEvent)obj;
- return this.type == that.type && this.newTerm == that.newTerm;
+ return this.type == that.type;
}
@Override
public int hashCode() {
- return Objects.hash(type, newTerm);
+ return type.hashCode();
}
@Override
public String toString() {
- return type + (newTerm >= 0? ":" + newTerm: "");
+ return type.name();
}
}
private class EventQueue {
private final String name =
ServerStringUtils.generateUnifiedName(server.getMemberId(), getClass());
- private final BlockingQueue<StateUpdateEvent> queue = new
ArrayBlockingQueue<>(4096);
+ private final BlockingQueue<StateUpdateEvent> queue = new
ArrayBlockingQueue<>(StateUpdateEvent.Type.values().length);
- void submit(StateUpdateEvent event) {
+ // submit can be invoked by different threads -- need to be synchronized
+ synchronized void submit(StateUpdateEvent event) {
+ if (queue.contains(event)) { // avoid duplicated events
+ return;
+ }
try {
queue.put(event);
} catch (InterruptedException e) {
@@ -160,10 +162,10 @@ class LeaderStateImpl implements LeaderState {
}
}
+ // poll is invoked only by the EventProcessor thread -- synchronized is
not needed
StateUpdateEvent poll() {
- final StateUpdateEvent e;
try {
- e = queue.poll(server.getMaxTimeoutMs(), TimeUnit.MILLISECONDS);
+ return queue.poll(server.getMaxTimeoutMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
String s = this + ": poll() is interrupted";
@@ -174,14 +176,6 @@ class LeaderStateImpl implements LeaderState {
throw new IllegalStateException(s + " UNEXPECTEDLY", ie);
}
}
-
- if (e != null) {
- // remove duplicated events from the head.
- while(e.equals(queue.peek())) {
- queue.poll();
- }
- }
- return e;
}
@Override
@@ -323,9 +317,9 @@ class LeaderStateImpl implements LeaderState {
}
private final StateUpdateEvent updateCommitEvent =
- new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, -1,
this::updateCommit);
+ new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT,
this::updateCommit);
private final StateUpdateEvent checkStagingEvent =
- new StateUpdateEvent(StateUpdateEvent.Type.CHECK_STAGING, -1,
this::checkStaging);
+ new StateUpdateEvent(StateUpdateEvent.Type.CHECK_STAGING,
this::checkStaging);
private final String name;
private final RaftServerImpl server;
@@ -704,7 +698,7 @@ class LeaderStateImpl implements LeaderState {
}
void submitStepDownEvent(long term, StepDownReason reason) {
- eventQueue.submit(new StateUpdateEvent(StateUpdateEvent.Type.STEP_DOWN,
term, () -> stepDown(term, reason)));
+ eventQueue.submit(new StateUpdateEvent(StateUpdateEvent.Type.STEP_DOWN,
() -> stepDown(term, reason)));
}
private void stepDown(long term, StepDownReason reason) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]