reswqa commented on code in PR #22422:
URL: https://github.com/apache/flink/pull/22422#discussion_r1171309501


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionEventHandler.java:
##########
@@ -36,14 +37,44 @@ public interface LeaderElectionEventHandler {
      * Called by specific {@link LeaderElectionDriver} when the leadership is granted.
      *
      * @param newLeaderSessionId the valid leader session id
+     * @param executorService the {@link ExecutorService} this call should be executed on.
      */
-    void onGrantLeadership(UUID newLeaderSessionId);
+    void onGrantLeadershipAsync(UUID newLeaderSessionId, ExecutorService executorService);
+
+    /**
+     * Called by specific {@link LeaderElectionDriver} when the leadership is granted.
+     *
+     * @param newLeaderSessionId the valid leader session id
+     */
+    void onGrantLeadershipAsync(UUID newLeaderSessionId);

Review Comment:
   I'm a bit confused: why is this method named with an `Async` suffix? I know
this is to make `DefaultMultipleComponentLeaderElectionService` use the executor in
`DefaultLeaderElectionService`, but that is hard to infer from the interface
contract alone. At the very least, the Javadoc should emphasize that this
method must not be executed on the caller's thread.
   
   The following is the doc for the `thenRunAsync` method of `CompletionStage`, 
which looks much more intuitive.
   ```
       /*
        * Returns a new CompletionStage that, when this stage completes
        * normally, executes the given action using this stage's default
        * asynchronous execution facility.
        */
       public CompletionStage<Void> thenRunAsync(Runnable action);
   ```
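
To make the `Async` naming convention concrete, here is a minimal sketch using only the JDK. It runs an action through `thenRunAsync` with an explicit executor and reports which thread executed it. The class name, method name, and thread name are hypothetical and chosen only for illustration; none of this is code from the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class AsyncSuffixSketch {

    // Hypothetical thread name, used only so the executing thread is easy to recognize.
    static final String EXECUTOR_THREAD = "leadership-operation-executor";

    /** Runs an action via thenRunAsync on a dedicated executor and returns the executing thread's name. */
    static String threadUsedByThenRunAsync() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, EXECUTOR_THREAD));
        try {
            AtomicReference<String> seen = new AtomicReference<>();
            CompletableFuture.completedFuture(null)
                    // The Async variant hands the action to the supplied executor
                    // instead of running it on the calling thread.
                    .thenRunAsync(() -> seen.set(Thread.currentThread().getName()), executor)
                    .join();
            return seen.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread: " + Thread.currentThread().getName());
        System.out.println("action ran on: " + threadUsedByThenRunAsync());
    }
}
```

In `CompletionStage`, the suffix tells the caller where the action runs. An `onGrantLeadershipAsync` overload without an executor parameter gives no such signal unless its Javadoc spells it out.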



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -67,7 +72,20 @@
     // this.running=true ensures that leaderContender != null
     private LeaderElectionDriver leaderElectionDriver;
 
+    @GuardedBy("lock")
+    private final ExecutorService leadershipOperationExecutor;

Review Comment:
   Why is this field guarded by `lock`?



##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java:
##########
@@ -195,12 +196,14 @@ private class LeaderCallbackHandlerImpl extends KubernetesLeaderElector.LeaderCa
 
         @Override
         public void isLeader() {
-            leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
+            leaderElectionEventHandler.onGrantLeadershipAsync(
+                    UUID.randomUUID(), Executors.newDirectExecutorService());

Review Comment:
   Can `Executors.directExecutor()` be used here instead of
`Executors.newDirectExecutorService()`? 🤔
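
For context, a direct executor runs each task immediately on the calling thread. The sketch below shows that behavior with a plain JDK `Executor` lambda; it does not use Flink's `Executors` utility, and the class and method names are hypothetical. Note that the new `onGrantLeadershipAsync` signature in this diff takes an `ExecutorService`, so a plain `Executor` would fit only if that parameter type were relaxed (assuming `directExecutor()` returns an `Executor`).

```java
import java.util.concurrent.Executor;

public class DirectExecutorSketch {

    /** A direct executor: runs each task immediately on the calling thread. */
    static final Executor DIRECT = Runnable::run;

    /** Returns true if the task submitted to DIRECT ran on the thread that submitted it. */
    static boolean runsOnCallerThread() {
        Thread caller = Thread.currentThread();
        Thread[] ranOn = new Thread[1];
        DIRECT.execute(() -> ranOn[0] = Thread.currentThread());
        return ranOn[0] == caller;
    }

    public static void main(String[] args) {
        System.out.println("runs on caller thread: " + runsOnCallerThread());
    }
}
```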



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -67,7 +72,20 @@
     // this.running=true ensures that leaderContender != null
     private LeaderElectionDriver leaderElectionDriver;
 
+    @GuardedBy("lock")
+    private final ExecutorService leadershipOperationExecutor;
+
+    @VisibleForTesting

Review Comment:
   Why does this need to be marked `@VisibleForTesting`? Isn't there still
a lot of external production code calling it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
