mynameborat commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r712263815



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -76,11 +85,37 @@ public static void run(SamzaApplication app, Config config) {
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
     DiagnosticsUtil.createDiagnosticsStream(finalConfig);
 
-    ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
-        metrics,
-        metadataStore,
-        finalConfig);
-    jc.run();
+    if (new JobCoordinatorConfig(finalConfig).getUseStaticResourceJobCoordinator()) {
+      runStaticResourceJobCoordinator(metrics, metadataStore, finalConfig);
+    } else {
+      ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(metrics, metadataStore, finalConfig);
+      jc.run();
+    }
+  }
+
+  private static void runStaticResourceJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore,
+      Config finalConfig) {
+    StaticResourceJobCoordinator staticResourceJobCoordinator =
+        StaticResourceJobCoordinator.build(metrics, metadataStore, finalConfig);
+    addShutdownHook(staticResourceJobCoordinator);
+    Map<String, MetricsReporter> metricsReporters =
+        MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig), JOB_COORDINATOR_CONTAINER_NAME);
+    metricsReporters.values()
+        .forEach(metricsReporter -> metricsReporter.register(JOB_COORDINATOR_CONTAINER_NAME, metrics));
+    metricsReporters.values().forEach(MetricsReporter::start);
+    staticResourceJobCoordinator.run();
+    metricsReporters.values().forEach(MetricsReporter::stop);
+  }
+

Review comment:
       The intent behind introducing `JobCoordinator` is for it to be used with
`ClusterBasedJobCoordinator` as well. It isn't tied to `StreamProcessor`: start,
stop, the job model, and listeners are agnostic to whether a `StreamProcessor`
is used. I would not consider the deprecated `getProcessorId` a tie to
`StreamProcessor`.
   
   When each job coordinator follows its own contract instead of a fixed one, we
move further away from consolidating the job coordinator flows and from sharing
future features across these job coordinators.
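   
   To make the point about a fixed contract concrete, here is a minimal, self-contained sketch. Everything in it is hypothetical and for illustration only: `CoordinatorContract`, `RecordingCoordinator`, and `CoordinatorLauncher` are not Samza classes, and the real `JobCoordinator` interface has more members than the two shown. The sketch assumes only that both coordinators can be driven through the same start/stop lifecycle, so the launch path picks an implementation and then runs a single shared flow.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down contract for illustration only;
// it is not the Samza JobCoordinator interface.
interface CoordinatorContract {
  void start();

  void stop();
}

// Hypothetical coordinator that only records its lifecycle calls.
final class RecordingCoordinator implements CoordinatorContract {
  private final String name;
  private final List<String> events;

  RecordingCoordinator(String name, List<String> events) {
    this.name = name;
    this.events = events;
  }

  @Override
  public void start() {
    events.add(name + ":start");
  }

  @Override
  public void stop() {
    events.add(name + ":stop");
  }
}

final class CoordinatorLauncher {
  private CoordinatorLauncher() { }

  // The flag only selects the implementation; the lifecycle below is shared.
  static List<String> launch(boolean useStaticResource) {
    List<String> events = new ArrayList<>();
    CoordinatorContract coordinator =
        new RecordingCoordinator(useStaticResource ? "static" : "cluster", events);

    events.add("reporters:start");
    try {
      coordinator.start();
    } finally {
      // Stop the coordinator and the reporters even if start() throws.
      coordinator.stop();
      events.add("reporters:stop");
    }
    return events;
  }
}

final class ContractSketch {
  public static void main(String[] args) {
    System.out.println(CoordinatorLauncher.launch(true));
    System.out.println(CoordinatorLauncher.launch(false));
  }
}
```

   With a shared contract like this, something such as metrics reporter handling is written once in the launcher rather than once per coordinator type.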
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
