[
https://issues.apache.org/jira/browse/FLINK-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361018#comment-16361018
]
ASF GitHub Bot commented on FLINK-8608:
---------------------------------------
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5431#discussion_r167593183
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
---
@@ -671,13 +674,68 @@ public void jobFinishedByOther() {
log.info("Job {} was finished by other JobManager.",
jobId);
runAsync(
- () -> {
- try {
- removeJob(jobId, false);
- } catch (Exception e) {
- log.warn("Could not properly
remove job {} from the dispatcher.", jobId, e);
- }
- });
+ () ->
Dispatcher.this.jobFinishedByOther(jobId));
}
}
+
+ //------------------------------------------------------
+ // Factories
+ //------------------------------------------------------
+
+ /**
+ * Factory for a {@link JobManagerRunner}.
+ */
+ @FunctionalInterface
+ public interface JobManagerRunnerFactory {
+ JobManagerRunner createJobManagerRunner(
+ ResourceID resourceId,
+ JobGraph jobGraph,
+ Configuration configuration,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ BlobServer blobServer,
+ JobManagerServices jobManagerServices,
+ MetricRegistry metricRegistry,
+ OnCompletionActions onCompleteActions,
+ FatalErrorHandler fatalErrorHandler,
+ @Nullable String restAddress) throws Exception;
+ }
+
+ /**
+ * Singleton default factory for {@link JobManagerRunner}.
+ */
+ public enum DefaultJobManagerRunnerFactory implements
JobManagerRunnerFactory {
+ INSTANCE {
--- End diff --
Since there is only one instance, it is ok to write:
```
public enum DefaultJobManagerRunnerFactory implements
JobManagerRunnerFactory {
INSTANCE;
@Override
public JobManagerRunner createJobManagerRunner(
ResourceID resourceId,
JobGraph jobGraph,
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
BlobServer blobServer,
JobManagerServices jobManagerServices,
MetricRegistry metricRegistry,
OnCompletionActions onCompleteActions,
FatalErrorHandler fatalErrorHandler,
@Nullable String restAddress) throws Exception {
return new JobManagerRunner(
resourceId,
jobGraph,
configuration,
rpcService,
highAvailabilityServices,
heartbeatServices,
blobServer,
jobManagerServices,
metricRegistry,
onCompleteActions,
fatalErrorHandler,
restAddress);
}
}
```
Saves one level of indentation.
> Add MiniDispatcher for job mode
> -------------------------------
>
> Key: FLINK-8608
> URL: https://issues.apache.org/jira/browse/FLINK-8608
> Project: Flink
> Issue Type: New Feature
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to properly support the job mode, we need a {{MiniDispatcher}} which
> is started with a pre initialized {{JobGraph}} and launches a single
> {{JobManagerRunner}} with this job. Once the job is completed and if the
> {{MiniDispatcher}} is running in detached mode, the {{MiniDispatcher}} should
> terminate.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)