Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5431#discussion_r167610671 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java --- @@ -262,32 +430,151 @@ protected void shutDown(boolean cleanupHaData) throws FlinkException { exception = ExceptionUtils.firstOrSuppressed(t, exception); } } - - terminationFuture.complete(true); } if (exception != null) { throw new FlinkException("Could not properly shut down the cluster services.", exception); } } + protected void stopClusterComponents() throws Exception { + synchronized (lock) { + Throwable exception = null; + + if (webMonitorEndpoint != null) { + webMonitorEndpoint.shutdown(Time.seconds(10L)); + } + + if (dispatcherLeaderRetrievalService != null) { + try { + dispatcherLeaderRetrievalService.stop(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (dispatcher != null) { + try { + dispatcher.shutDown(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (resourceManagerRetrievalService != null) { + try { + resourceManagerRetrievalService.stop(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (resourceManager != null) { + try { + resourceManager.shutDown(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (archivedExecutionGraphStore != null) { + try { + archivedExecutionGraphStore.close(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (transientBlobCache != null) { + try { + transientBlobCache.close(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (exception != null) { + throw new FlinkException("Could not properly shut down the session cluster entry point.", exception); + } + } + } + @Override public void onFatalError(Throwable exception) { LOG.error("Fatal error occurred in the cluster entrypoint.", exception); System.exit(RUNTIME_FAILURE_RETURN_CODE); } - protected abstract void startClusterComponents( + // -------------------------------------------------- + // Internal methods + // -------------------------------------------------- + + private void shutDownAndTerminate( + int returnCode, + ApplicationStatus applicationStatus, + @Nullable String diagnostics, --- End diff -- `diagnostics` is unused. You already log errors so I don't think this argument is needed.
---