tillrohrmann commented on a change in pull request #15396:
URL: https://github.com/apache/flink/pull/15396#discussion_r605059579



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
##########
@@ -579,6 +580,19 @@ protected static Configuration loadConfiguration(
 
     public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
 
+        // Register the signal handler so that we could have the chance to execute the shutdown
+        // supplier before handling the signal.
+        SignalHandler.register(
+                LOG,
+                () -> {
+                    if (clusterEntrypoint.isShutDown.get()) {
+                        LOG.info("Waiting for concurrent shutDownAsync finished.");
+                        return clusterEntrypoint.terminationFuture.thenAccept(ignored -> {});
+                    } else {
+                        return FutureUtils.completedVoidFuture();
+                    }
+                });

Review comment:
       It would also be great if we could guard this behaviour with a test.
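       A minimal sketch of what such a test could check, assuming the supplier's logic can be exercised in isolation. `FakeEntrypoint` and `ShutdownSupplierTest` are hypothetical names, not Flink classes; the fake only copies the decision made by the lambda in the diff (wait for `terminationFuture` if `isShutDown` is set, otherwise return an already completed future). A real test would more likely drive `ClusterEntrypoint` itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical stand-in for the shutdown state of ClusterEntrypoint (not Flink's real class). */
class FakeEntrypoint {
    final AtomicBoolean isShutDown = new AtomicBoolean(false);
    // String stands in for whatever result type the real termination future carries.
    final CompletableFuture<String> terminationFuture = new CompletableFuture<>();

    /** Same decision as the lambda in the diff: wait only if a shutdown is already running. */
    Supplier<CompletableFuture<Void>> shutdownSupplier() {
        return () -> isShutDown.get()
                ? terminationFuture.thenAccept(ignored -> {})
                : CompletableFuture.<Void>completedFuture(null);
    }
}

final class ShutdownSupplierTest {
    public static void main(String[] args) throws Exception {
        // No concurrent shutdown: the signal handler must not be held up.
        FakeEntrypoint idle = new FakeEntrypoint();
        if (!idle.shutdownSupplier().get().isDone()) {
            throw new AssertionError("expected an already completed future");
        }

        // Shutdown in progress: the returned future completes only after termination.
        FakeEntrypoint closing = new FakeEntrypoint();
        closing.isShutDown.set(true);
        CompletableFuture<Void> waiting = closing.shutdownSupplier().get();
        if (waiting.isDone()) {
            throw new AssertionError("expected the supplier to wait for termination");
        }
        closing.terminationFuture.complete("SUCCEEDED");
        waiting.get(1, TimeUnit.SECONDS); // throws TimeoutException if it never completes
        System.out.println("ok");
    }
}
```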

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java
##########
@@ -53,6 +63,13 @@ public void handle(Signal signal) {
                     "RECEIVED SIGNAL {}: SIG{}. Shutting down as requested.",
                     signal.getNumber(),
                     signal.getName());
+            try {
+                shutdownSupplier
+                        .get()
+                        .get(SHUTDOWN_TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);

Review comment:
       Shouldn't we avoid waiting on `SIGKILL`? Put differently: shouldn't we only wait on `SIGTERM`?
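       A sketch of the "only wait on `SIGTERM`" variant, under stated assumptions. SIGKILL can never be caught, so no JVM handler runs for it at all; the practical distinction is between SIGTERM and other catchable signals such as SIGINT. Signal names are compared without the `SIG` prefix, as `sun.misc.Signal#getName()` returns them (hence the `SIG{}` in the log line above). `SignalAwareShutdown`, `shouldWaitForShutdown` and the 10-second timeout are hypothetical, and the sketch uses `java.time.Duration` instead of the timeout type in the diff.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical helper: decides per signal whether to wait for the shutdown future. */
final class SignalAwareShutdown {
    // Assumed value; the actual SHUTDOWN_TIMEOUT in the PR is not shown in the diff.
    static final Duration SHUTDOWN_TIMEOUT = Duration.ofSeconds(10);

    /** Only SIGTERM asks for a graceful stop; SIGKILL never reaches a JVM handler anyway. */
    static boolean shouldWaitForShutdown(String signalName) {
        return "TERM".equals(signalName);
    }

    /** Returns true only if the handler waited and the shutdown future completed normally. */
    static boolean handle(String signalName, Supplier<CompletableFuture<Void>> shutdownSupplier) {
        if (!shouldWaitForShutdown(signalName)) {
            return false; // e.g. SIGINT: continue with the default handling right away
        }
        try {
            shutdownSupplier.get().get(SHUTDOWN_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false; // stop waiting and let the default handler terminate the JVM
        }
    }
}
```

Bounding the wait keeps a hanging shutdown from blocking process termination indefinitely.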

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
##########
@@ -579,6 +580,19 @@ protected static Configuration loadConfiguration(
 
     public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
 
+        // Register the signal handler so that we could have the chance to execute the shutdown
+        // supplier before handling the signal.
+        SignalHandler.register(
+                LOG,
+                () -> {
+                    if (clusterEntrypoint.isShutDown.get()) {
+                        LOG.info("Waiting for concurrent shutDownAsync finished.");
+                        return clusterEntrypoint.terminationFuture.thenAccept(ignored -> {});
+                    } else {
+                        return FutureUtils.completedVoidFuture();
+                    }
+                });

Review comment:
       I am not 100% convinced that we should use the `SignalHandler`. I still like the idea of registering a shutdown hook which simply calls `clusterEntrypoint.closeAsync().join()`. That way we don't have to distinguish between the different signals (SIGTERM and SIGKILL, for example).
   
   To avoid deregistering the application when `closeAsync` is called, we could introduce something like a shutdown reason: if Flink itself wants to shut down, we deregister the application; if somebody else kills the process, we don't. This would be similar to how we handle the HA cleanup. What do you think?
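       A simplified sketch of the shutdown-hook-plus-reason idea. `ShutdownReason`, its constants, `EntrypointSketch` and the `deregistered` flag are all hypothetical illustrations, not Flink API; the flag only stands in for deregistering the application. The JVM runs shutdown hooks on an orderly exit and on catchable termination signals such as SIGTERM, but never on SIGKILL.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical shutdown reasons; names are illustrative only. */
enum ShutdownReason {
    /** Flink itself decided to stop (e.g. the application finished): deregister it. */
    FLINK_INITIATED,
    /** Someone else stopped the process (e.g. via SIGTERM): keep the registration. */
    EXTERNAL
}

/** Hypothetical, heavily simplified entrypoint illustrating the proposal. */
class EntrypointSketch {
    private final AtomicBoolean isShutDown = new AtomicBoolean(false);
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
    volatile boolean deregistered = false; // stand-in for "application was deregistered"

    /** The first caller performs the shutdown; later callers get the same future. */
    CompletableFuture<Void> shutDownAsync(ShutdownReason reason) {
        if (isShutDown.compareAndSet(false, true)) {
            if (reason == ShutdownReason.FLINK_INITIATED) {
                deregistered = true;
            }
            // ... stop the remaining services here, then signal termination ...
            terminationFuture.complete(null);
        }
        return terminationFuture;
    }

    /** External close path: never deregisters the application. */
    CompletableFuture<Void> closeAsync() {
        return shutDownAsync(ShutdownReason.EXTERNAL);
    }

    /** The hook simply joins closeAsync(), so no per-signal logic is needed. */
    void installShutdownHook() {
        Runtime.getRuntime()
                .addShutdownHook(new Thread(() -> closeAsync().join(), "entrypoint-shutdown-hook"));
    }
}
```

Because the reason is fixed by the caller, the hook does not need to know which signal triggered it.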

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
##########
@@ -366,7 +366,7 @@ protected MetricRegistryImpl createMetricRegistry(
         return shutDownAsync(
                         ApplicationStatus.UNKNOWN,
                         "Cluster entrypoint has been closed externally.",
-                        true)
+                        false)

Review comment:
       Can we add a test for this behaviour?
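       One way to pin this down is to capture the arguments that `closeAsync` passes on. The sketch assumes a hypothetical overridable `shutDownAsync` seam; if the real method is not overridable, a test would instead have to observe the side effect controlled by the flag. `ClosableEntrypoint`, `RecordingEntrypoint` and the parameter name `flag` are hypothetical (the hunk shows only the value `false`), and a `String` stands in for `ApplicationStatus`.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical base class with an overridable shutdown seam. */
abstract class ClosableEntrypoint {
    CompletableFuture<Void> closeAsync() {
        // Mirrors the changed call in the diff: the trailing boolean is now false.
        return shutDownAsync("UNKNOWN", "Cluster entrypoint has been closed externally.", false);
    }

    // 'flag' stands for the third argument, whose name the hunk does not show.
    abstract CompletableFuture<Void> shutDownAsync(String status, String diagnostics, boolean flag);
}

/** Test double that records the arguments instead of shutting anything down. */
final class RecordingEntrypoint extends ClosableEntrypoint {
    String lastStatus;
    String lastDiagnostics;
    Boolean lastFlag;

    @Override
    CompletableFuture<Void> shutDownAsync(String status, String diagnostics, boolean flag) {
        lastStatus = status;
        lastDiagnostics = diagnostics;
        lastFlag = flag;
        return CompletableFuture.completedFuture(null);
    }
}
```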



