Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5431#discussion_r167610671
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ---
    @@ -262,32 +430,151 @@ protected void shutDown(boolean cleanupHaData) 
throws FlinkException {
                                        exception = 
ExceptionUtils.firstOrSuppressed(t, exception);
                                }
                        }
    -
    -                   terminationFuture.complete(true);
                }
     
                if (exception != null) {
                        throw new FlinkException("Could not properly shut down 
the cluster services.", exception);
                }
        }
     
    +   protected void stopClusterComponents() throws Exception {
    +           synchronized (lock) {
    +                   Throwable exception = null;
    +
    +                   if (webMonitorEndpoint != null) {
    +                           webMonitorEndpoint.shutdown(Time.seconds(10L));
    +                   }
    +
    +                   if (dispatcherLeaderRetrievalService != null) {
    +                           try {
    +                                   dispatcherLeaderRetrievalService.stop();
    +                           } catch (Throwable t) {
    +                                   exception = 
ExceptionUtils.firstOrSuppressed(t, exception);
    +                           }
    +                   }
    +
    +                   if (dispatcher != null) {
    +                           try {
    +                                   dispatcher.shutDown();
    +                           } catch (Throwable t) {
    +                                   exception = 
ExceptionUtils.firstOrSuppressed(t, exception);
    +                           }
    +                   }
    +
    +                   if (resourceManagerRetrievalService != null) {
    +                           try {
    +                                   resourceManagerRetrievalService.stop();
    +                           } catch (Throwable t) {
    +                                   exception = 
ExceptionUtils.firstOrSuppressed(t, exception);
    +                           }
    +                   }
    +
    +                   if (resourceManager != null) {
    +                           try {
    +                                   resourceManager.shutDown();
    +                           } catch (Throwable t) {
    +                                   exception = 
ExceptionUtils.firstOrSuppressed(t, exception);
    +                           }
    +                   }
    +
    +                   if (archivedExecutionGraphStore != null) {
    +                           try {
    +                                   archivedExecutionGraphStore.close();
    +                           } catch (Throwable t) {
    +                                   exception = 
ExceptionUtils.firstOrSuppressed(t, exception);
    +                           }
    +                   }
    +
    +                   if (transientBlobCache != null) {
    +                           try {
    +                                   transientBlobCache.close();
    +                           } catch (Throwable t) {
    +                                   exception = 
ExceptionUtils.firstOrSuppressed(t, exception);
    +                           }
    +                   }
    +
    +                   if (exception != null) {
    +                           throw new FlinkException("Could not properly 
shut down the session cluster entry point.", exception);
    +                   }
    +           }
    +   }
    +
        @Override
        public void onFatalError(Throwable exception) {
                LOG.error("Fatal error occurred in the cluster entrypoint.", 
exception);
     
                System.exit(RUNTIME_FAILURE_RETURN_CODE);
        }
     
    -   protected abstract void startClusterComponents(
    +   // --------------------------------------------------
    +   // Internal methods
    +   // --------------------------------------------------
    +
    +   private void shutDownAndTerminate(
    +           int returnCode,
    +           ApplicationStatus applicationStatus,
    +           @Nullable String diagnostics,
    --- End diff --
    
    `diagnostics` is unused. You already log errors so I don't think this 
argument is needed.


---

Reply via email to