GitHub user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5431#discussion_r167850253
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
---
@@ -671,13 +674,68 @@ public void jobFinishedByOther() {
log.info("Job {} was finished by other JobManager.", jobId);
runAsync(
- () -> {
- try {
- removeJob(jobId, false);
- } catch (Exception e) {
- log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
- }
- });
+ () -> Dispatcher.this.jobFinishedByOther(jobId));
}
}
+
+ //------------------------------------------------------
+ // Factories
+ //------------------------------------------------------
+
+ /**
+ * Factory for a {@link JobManagerRunner}.
+ */
+ @FunctionalInterface
+ public interface JobManagerRunnerFactory {
+ JobManagerRunner createJobManagerRunner(
+ ResourceID resourceId,
+ JobGraph jobGraph,
+ Configuration configuration,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ BlobServer blobServer,
+ JobManagerServices jobManagerServices,
+ MetricRegistry metricRegistry,
+ OnCompletionActions onCompleteActions,
+ FatalErrorHandler fatalErrorHandler,
+ @Nullable String restAddress) throws Exception;
+ }
+
+ /**
+ * Singleton default factory for {@link JobManagerRunner}.
+ */
+ public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
+ INSTANCE {
--- End diff --
True. Will change it.
---