Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5431#discussion_r167853480
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
---
@@ -262,32 +430,151 @@ protected void shutDown(boolean cleanupHaData)
throws FlinkException {
exception =
ExceptionUtils.firstOrSuppressed(t, exception);
}
}
-
- terminationFuture.complete(true);
}
if (exception != null) {
throw new FlinkException("Could not properly shut down
the cluster services.", exception);
}
}
+ protected void stopClusterComponents() throws Exception {
+ synchronized (lock) {
+ Throwable exception = null;
+
+ if (webMonitorEndpoint != null) {
+ webMonitorEndpoint.shutdown(Time.seconds(10L));
+ }
+
+ if (dispatcherLeaderRetrievalService != null) {
+ try {
+ dispatcherLeaderRetrievalService.stop();
+ } catch (Throwable t) {
+ exception =
ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
+ if (dispatcher != null) {
+ try {
+ dispatcher.shutDown();
+ } catch (Throwable t) {
+ exception =
ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
+ if (resourceManagerRetrievalService != null) {
+ try {
+ resourceManagerRetrievalService.stop();
+ } catch (Throwable t) {
+ exception =
ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
+ if (resourceManager != null) {
+ try {
+ resourceManager.shutDown();
+ } catch (Throwable t) {
+ exception =
ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
+ if (archivedExecutionGraphStore != null) {
+ try {
+ archivedExecutionGraphStore.close();
+ } catch (Throwable t) {
+ exception =
ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
+ if (transientBlobCache != null) {
+ try {
+ transientBlobCache.close();
+ } catch (Throwable t) {
+ exception =
ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
+ if (exception != null) {
+ throw new FlinkException("Could not properly
shut down the session cluster entry point.", exception);
+ }
+ }
+ }
+
@Override
public void onFatalError(Throwable exception) {
LOG.error("Fatal error occurred in the cluster entrypoint.",
exception);
System.exit(RUNTIME_FAILURE_RETURN_CODE);
}
- protected abstract void startClusterComponents(
+ // --------------------------------------------------
+ // Internal methods
+ // --------------------------------------------------
+
+ private void shutDownAndTerminate(
+ int returnCode,
+ ApplicationStatus applicationStatus,
+ @Nullable String diagnostics,
--- End diff --
True at the moment it won't be used. Initially I left it in because of the
Yarn based cluster entrypoints. The old implementation actively deregisters
from yarn passing this as additional information. For the moment, I'll remove
it.
---