[
https://issues.apache.org/jira/browse/FLINK-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361030#comment-16361030
]
ASF GitHub Bot commented on FLINK-8608:
---------------------------------------
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5431#discussion_r167610671
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
---
@@ -262,32 +430,151 @@ protected void shutDown(boolean cleanupHaData)
throws FlinkException {
exception =
ExceptionUtils.firstOrSuppressed(t, exception);
}
}
-
- terminationFuture.complete(true);
}
if (exception != null) {
throw new FlinkException("Could not properly shut down
the cluster services.", exception);
}
}
+ protected void stopClusterComponents() throws Exception {
+ synchronized (lock) {
+ Throwable exception = null;
+
+ if (webMonitorEndpoint != null) {
+ webMonitorEndpoint.shutdown(Time.seconds(10L));
+ }
+
+ if (dispatcherLeaderRetrievalService != null) {
+ try {
+ dispatcherLeaderRetrievalService.stop();
+ } catch (Throwable t) {
+ exception =
ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
+ if (dispatcher != null) {
+ try {
+ dispatcher.shutDown();
+ } catch (Throwable t) {
+ exception =
ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
+ if (resourceManagerRetrievalService != null) {
+ try {
+ resourceManagerRetrievalService.stop();
+ } catch (Throwable t) {
+ exception =
ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
+ if (resourceManager != null) {
+ try {
+ resourceManager.shutDown();
+ } catch (Throwable t) {
+ exception =
ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
+ if (archivedExecutionGraphStore != null) {
+ try {
+ archivedExecutionGraphStore.close();
+ } catch (Throwable t) {
+ exception =
ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
+ if (transientBlobCache != null) {
+ try {
+ transientBlobCache.close();
+ } catch (Throwable t) {
+ exception =
ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
+ if (exception != null) {
+ throw new FlinkException("Could not properly
shut down the session cluster entry point.", exception);
+ }
+ }
+ }
+
@Override
public void onFatalError(Throwable exception) {
LOG.error("Fatal error occurred in the cluster entrypoint.",
exception);
System.exit(RUNTIME_FAILURE_RETURN_CODE);
}
- protected abstract void startClusterComponents(
+ // --------------------------------------------------
+ // Internal methods
+ // --------------------------------------------------
+
+ private void shutDownAndTerminate(
+ int returnCode,
+ ApplicationStatus applicationStatus,
+ @Nullable String diagnostics,
--- End diff --
`diagnostics` is unused. You already log errors so I don't think this
argument is needed.
> Add MiniDispatcher for job mode
> -------------------------------
>
> Key: FLINK-8608
> URL: https://issues.apache.org/jira/browse/FLINK-8608
> Project: Flink
> Issue Type: New Feature
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to properly support the job mode, we need a {{MiniDispatcher}} which
> is started with a pre initialized {{JobGraph}} and launches a single
> {{JobManagerRunner}} with this job. Once the job is completed and if the
> {{MiniDispatcher}} is running in detached mode, the {{MiniDispatcher}} should
> terminate.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)