reswqa commented on code in PR #22422:
URL: https://github.com/apache/flink/pull/22422#discussion_r1171309501
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionEventHandler.java:
##########
@@ -36,14 +37,44 @@ public interface LeaderElectionEventHandler {
* Called by specific {@link LeaderElectionDriver} when the leadership is granted.
*
* @param newLeaderSessionId the valid leader session id
+ * @param executorService the {@link ExecutorService} this call should be executed on.
*/
- void onGrantLeadership(UUID newLeaderSessionId);
+ void onGrantLeadershipAsync(UUID newLeaderSessionId, ExecutorService executorService);
+
+ /**
+ * Called by specific {@link LeaderElectionDriver} when the leadership is granted.
+ *
+ * @param newLeaderSessionId the valid leader session id
+ */
+ void onGrantLeadershipAsync(UUID newLeaderSessionId);
Review Comment:
I'm a bit confused: why does this method have an `Async` suffix? I know the
goal is to make `DefaultMultipleComponentLeaderElectionService` use the
executor in `DefaultLeaderElectionService`, but that is hard to infer from the
interface contract alone. At the very least, the Javadoc should emphasize that
this method must not be executed on the caller thread.
For comparison, here is the Javadoc of the `thenRunAsync` method of
`CompletionStage`, which states the execution contract much more intuitively.
```
/*
* Returns a new CompletionStage that, when this stage completes
* normally, executes the given action using this stage's default
* asynchronous execution facility.
*/
public CompletionStage<Void> thenRunAsync(Runnable action);
```
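To make the comparison concrete, here is a minimal JDK-only sketch of what the `Async` suffix promises in `CompletionStage`. It is not Flink code: the class name `AsyncSuffixSketch`, the helper methods, and the thread name `leadership-ops` are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative only; none of these names exist in Flink.
final class AsyncSuffixSketch {

    /** Runs an action via thenRunAsync and returns the name of the thread that executed it. */
    static String threadUsedByThenRunAsync(ExecutorService executor) {
        AtomicReference<String> threadName = new AtomicReference<>();
        CompletableFuture.completedFuture(null)
                // The action is handed to the supplied executor, never run inline.
                .thenRunAsync(() -> threadName.set(Thread.currentThread().getName()), executor)
                .join();
        return threadName.get();
    }

    /** Runs an action via thenRun and returns the name of the thread that executed it. */
    static String threadUsedByThenRun() {
        AtomicReference<String> threadName = new AtomicReference<>();
        CompletableFuture.completedFuture(null)
                // On an already completed stage this typically runs on the calling thread.
                .thenRun(() -> threadName.set(Thread.currentThread().getName()))
                .join();
        return threadName.get();
    }

    public static void main(String[] args) {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "leadership-ops"));
        try {
            System.out.println("caller:       " + Thread.currentThread().getName());
            System.out.println("thenRun:      " + threadUsedByThenRun());
            System.out.println("thenRunAsync: " + threadUsedByThenRunAsync(executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

With an explicit executor, the suffix tells the caller exactly where the action runs. That is the kind of guarantee I would expect the Javadoc of `onGrantLeadershipAsync` to spell out.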
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -67,7 +72,20 @@
// this.running=true ensures that leaderContender != null
private LeaderElectionDriver leaderElectionDriver;
+ @GuardedBy("lock")
+ private final ExecutorService leadershipOperationExecutor;
Review Comment:
Why is this field guarded by `lock`?
##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java:
##########
@@ -195,12 +196,14 @@ private class LeaderCallbackHandlerImpl extends KubernetesLeaderElector.LeaderCa
@Override
public void isLeader() {
- leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
+ leaderElectionEventHandler.onGrantLeadershipAsync(
+ UUID.randomUUID(), Executors.newDirectExecutorService());
Review Comment:
Can `Executors.directExecutor()` be used here instead of
`Executors.newDirectExecutorService()`? 🤔
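A small JDK-only sketch of what I have in mind, assuming `directExecutor()` returns a plain `Executor` (as its name suggests) that simply runs the task on the calling thread. The class `DirectExecutorSketch` and its members are hypothetical stand-ins, not Flink APIs. Note that the new method in this PR takes an `ExecutorService`, so the swap would presumably also require widening that parameter to `Executor`.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative only; a stand-in for a direct executor, not Flink's implementation.
final class DirectExecutorSketch {

    /** Runs each task immediately on the calling thread; has no lifecycle to manage. */
    static final Executor DIRECT = Runnable::run;

    /**
     * Returns the name of the thread that executed a task on the given executor.
     * Only meaningful for executors that run the task synchronously.
     */
    static String threadUsedBy(Executor executor) {
        AtomicReference<String> threadName = new AtomicReference<>();
        executor.execute(() -> threadName.set(Thread.currentThread().getName()));
        return threadName.get();
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        // The task ran inline on the caller thread.
        System.out.println(threadUsedBy(DIRECT).equals(caller)); // prints true
    }
}
```

If the handler only ever calls `execute` on the argument, a shared `Executor` would avoid allocating a new service on every `isLeader()` call.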
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -67,7 +72,20 @@
// this.running=true ensures that leaderContender != null
private LeaderElectionDriver leaderElectionDriver;
+ @GuardedBy("lock")
+ private final ExecutorService leadershipOperationExecutor;
+
+ @VisibleForTesting
Review Comment:
Why does this need to be marked as `@VisibleForTesting`? Isn't there still
a lot of external production code calling it?