mynameborat commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r712263815
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -76,11 +85,37 @@ public static void run(SamzaApplication app, Config config)
{
CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
DiagnosticsUtil.createDiagnosticsStream(finalConfig);
- ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
- metrics,
- metadataStore,
- finalConfig);
- jc.run();
+ if (new JobCoordinatorConfig(finalConfig).getUseStaticResourceJobCoordinator()) {
+ runStaticResourceJobCoordinator(metrics, metadataStore, finalConfig);
+ } else {
+ ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(metrics, metadataStore, finalConfig);
+ jc.run();
+ }
+ }
+
+ private static void runStaticResourceJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore,
+ Config finalConfig) {
+ StaticResourceJobCoordinator staticResourceJobCoordinator =
+ StaticResourceJobCoordinator.build(metrics, metadataStore, finalConfig);
+ addShutdownHook(staticResourceJobCoordinator);
+ Map<String, MetricsReporter> metricsReporters =
+ MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig), JOB_COORDINATOR_CONTAINER_NAME);
+ metricsReporters.values()
+ .forEach(metricsReporter -> metricsReporter.register(JOB_COORDINATOR_CONTAINER_NAME, metrics));
+ metricsReporters.values().forEach(MetricsReporter::start);
+ staticResourceJobCoordinator.run();
+ metricsReporters.values().forEach(MetricsReporter::stop);
+ }
+
Review comment:
The intent behind introducing `JobCoordinator` was for
`ClusterBasedJobCoordinator` to use it as well. The interface isn't tied to
`StreamProcessor`: start, stop, the job model, and listeners are agnostic to
whether a `StreamProcessor` is involved. I wouldn't consider the deprecated
`getProcessorId` a tie to `StreamProcessor`.
The divergence here, with each job coordinator following its own contract
instead of a fixed one, takes us further from consolidating the job coordinator
flows and from sharing future features across these job coordinators.
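
A rough sketch of the shape I have in mind, to illustrate the point about a
fixed contract. All names below (`SimpleJobCoordinator`,
`RecordingCoordinator`, `launch`, `runSketch`) are hypothetical and simplified;
this is not the actual `JobCoordinator` interface or the launch code in this
PR, only an assumption about how a single launch path could look if both
coordinators shared one contract:

```java
import java.util.ArrayList;
import java.util.List;

public class CoordinatorContractSketch {
  /** Hypothetical, simplified stand-in for a shared job coordinator contract. */
  interface SimpleJobCoordinator {
    void start();

    void stop();
  }

  /** Toy implementation that records its lifecycle calls so the flow is observable. */
  static class RecordingCoordinator implements SimpleJobCoordinator {
    private final String name;
    private final List<String> events;

    RecordingCoordinator(String name, List<String> events) {
      this.name = name;
      this.events = events;
    }

    @Override
    public void start() {
      events.add(name + ":start");
    }

    @Override
    public void stop() {
      events.add(name + ":stop");
    }
  }

  /** One launch path for every implementation; reporter handling is only recorded as events. */
  static void launch(SimpleJobCoordinator coordinator, List<String> events) {
    events.add("reporters:start");
    try {
      coordinator.start();
    } finally {
      // Runs even if start() throws, so the coordinator and reporters are always stopped.
      coordinator.stop();
      events.add("reporters:stop");
    }
  }

  /** The config flag only selects the implementation; the launch flow does not branch. */
  public static List<String> runSketch(boolean useStaticResourceCoordinator) {
    List<String> events = new ArrayList<>();
    SimpleJobCoordinator coordinator = useStaticResourceCoordinator
        ? new RecordingCoordinator("static", events)
        : new RecordingCoordinator("cluster", events);
    launch(coordinator, events);
    return events;
  }

  public static void main(String[] args) {
    System.out.println(runSketch(true));
    System.out.println(runSketch(false));
  }
}
```

With this shape, the branch on the config only picks an implementation, and
anything added to `launch` later applies to both coordinators.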