mynameborat commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r709379895



##########
File path: 
samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import 
org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import 
org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignal;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignalFactory;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignalFactoryContext;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import 
org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.ReflectionUtil;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes 
generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, 
but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an 
external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  private final StreamPartitionCountMonitorFactory 
streamPartitionCountMonitorFactory;
+  private final StreamRegexMonitorFactory streamRegexMonitorFactory;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final JobRestartSignal jobRestartSignal;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, 
MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new 
JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, 
metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new 
HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new 
JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);

Review comment:
       I noticed that some places use `Kubernetes` as the ClusterType (maybe not in this 
PR). Should this be `KUBERNETES` instead of the broader `NON_YARN` classification? 
What is the benefit of modeling this more generically? It suggests that 
standalone could fall into this bucket too.

##########
File path: 
samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunicationContext.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+
+/**
+ * Contains components which can be used to build a {@link 
CoordinatorCommunication}.
+ * For example, provides access to job model and handling for worker states.
+ */
+public class CoordinatorCommunicationContext {
+  private final JobModelProvider jobModelProvider;
+  private final Config config;
+  private final MetricsRegistry metricsRegistry;
+
+  public CoordinatorCommunicationContext(JobModelProvider jobModelProvider, 
Config config,
+      MetricsRegistry metricsRegistry) {
+    this.config = config;
+    this.jobModelProvider = jobModelProvider;
+    this.metricsRegistry = metricsRegistry;

Review comment:
       Is a provider used instead of passing the job model directly because the job 
model may not exist yet? Or is it because the job model can change during the 
lifecycle, and hence the provider?
   
   If the latter, what about config? Are we treating config as immutable within 
the lifecycle of the JC?

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -76,11 +85,37 @@ public static void run(SamzaApplication app, Config config) 
{
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
     DiagnosticsUtil.createDiagnosticsStream(finalConfig);
 
-    ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
-        metrics,
-        metadataStore,
-        finalConfig);
-    jc.run();
+    if (new 
JobCoordinatorConfig(finalConfig).getUseStaticResourceJobCoordinator()) {
+      runStaticResourceJobCoordinator(metrics, metadataStore, finalConfig);
+    } else {
+      ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(metrics, 
metadataStore, finalConfig);
+      jc.run();
+    }
+  }
+
+  private static void runStaticResourceJobCoordinator(MetricsRegistryMap 
metrics, MetadataStore metadataStore,
+      Config finalConfig) {
+    StaticResourceJobCoordinator staticResourceJobCoordinator =
+        StaticResourceJobCoordinator.build(metrics, metadataStore, 
finalConfig);
+    addShutdownHook(staticResourceJobCoordinator);
+    Map<String, MetricsReporter> metricsReporters =
+        MetricsReporterLoader.getMetricsReporters(new 
MetricsConfig(finalConfig), JOB_COORDINATOR_CONTAINER_NAME);
+    metricsReporters.values()
+        .forEach(metricsReporter -> 
metricsReporter.register(JOB_COORDINATOR_CONTAINER_NAME, metrics));
+    metricsReporters.values().forEach(MetricsReporter::start);
+    staticResourceJobCoordinator.run();
+    metricsReporters.values().forEach(MetricsReporter::stop);
+  }
+

Review comment:
       Can we use a job coordinator factory instead, with a common interface for job 
coordinators that exposes a `run` method? I saw your note on consolidation, but this 
seems like an even higher layer that could be consolidated. That way, further 
iterations on `StaticResourceJobCoordinator` would require fewer changes to this class.

##########
File path: 
samza-core/src/main/java/org/apache/samza/coordinator/JobModelMonitors.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+/**
+ * Utility class for managing multiple monitors.
+ */
+public class JobModelMonitors {
+  private final StreamPartitionCountMonitor streamPartitionCountMonitor;
+  private final StreamRegexMonitor streamRegexMonitor;
+
+  public JobModelMonitors(StreamPartitionCountMonitor 
streamPartitionCountMonitor,
+      StreamRegexMonitor streamRegexMonitor) {
+    this.streamPartitionCountMonitor = streamPartitionCountMonitor;
+    this.streamRegexMonitor = streamRegexMonitor;
+  }
+
+  public void start() {
+    if (this.streamPartitionCountMonitor != null) {
+      this.streamPartitionCountMonitor.start();
+    }
+
+    if (this.streamRegexMonitor != null) {
+      this.streamRegexMonitor.start();
+    }
+  }
+
+  public void stop() {
+    if (this.streamPartitionCountMonitor != null) {
+      this.streamPartitionCountMonitor.stop();
+    }
+
+    if (this.streamRegexMonitor != null) {
+      this.streamRegexMonitor.stop();
+    }
+  }

Review comment:
       We have an implicit assumption that `start` has been invoked on any non-null 
monitors. Do we need an explicit signal to ensure that `start` has indeed been called 
and completed?
   
   If not, are we relying on the underlying monitors' contract to treat `stop` 
after an incomplete `start` as a no-op?

##########
File path: 
samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import 
org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import 
org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import 
org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes 
generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, 
but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an 
external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, 
MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new 
JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, 
metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new 
HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new 
JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);
+    ChangelogStreamManager changelogStreamManager =
+        new ChangelogStreamManager(new 
NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    StartpointManager startpointManager =
+        jobConfig.getStartpointEnabled() ? new 
StartpointManager(metadataStore) : null;
+    SystemAdmins systemAdmins = new SystemAdmins(config, 
StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache streamMetadataCache = new 
StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, 
streamMetadataCache);
+    return new StaticResourceJobCoordinator(jobModelHelper, 
jobModelServingContext, coordinatorCommunication,
+        jobCoordinatorMetadataManager, startpointManager, 
changelogStreamManager, metrics, systemAdmins, config);
+  }
+
+  @VisibleForTesting
+  StaticResourceJobCoordinator(JobModelHelper jobModelHelper, 
JobModelServingContext jobModelServingContext,
+      CoordinatorCommunication coordinatorCommunication, 
JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+      StartpointManager startpointManager, ChangelogStreamManager 
changelogStreamManager, MetricsRegistry metrics,
+      SystemAdmins systemAdmins, Config config) {
+    this.jobModelHelper = jobModelHelper;
+    this.jobModelServingContext = jobModelServingContext;
+    this.coordinatorCommunication = coordinatorCommunication;
+    this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+    this.startpointManager = startpointManager;
+    this.changelogStreamManager = changelogStreamManager;
+    this.metrics = metrics;
+    this.systemAdmins = systemAdmins;
+    this.config = config;
+  }
+
+  /**
+   * Run the coordinator.
+   */
+  public void run() {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.warn("Already running; not to going execute run() again");
+      return;
+    }
+    LOG.info("Starting job coordinator");
+    this.systemAdmins.start();
+    this.startpointManager.start();

Review comment:
       Shouldn't this handle the null case? Why not use `Optional` 
instead? 

##########
File path: 
samza-core/src/main/java/org/apache/samza/coordinator/JobModelMonitors.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+/**
+ * Utility class for managing multiple monitors.
+ */
+public class JobModelMonitors {
+  private final StreamPartitionCountMonitor streamPartitionCountMonitor;
+  private final StreamRegexMonitor streamRegexMonitor;
+
+  public JobModelMonitors(StreamPartitionCountMonitor 
streamPartitionCountMonitor,
+      StreamRegexMonitor streamRegexMonitor) {
+    this.streamPartitionCountMonitor = streamPartitionCountMonitor;
+    this.streamRegexMonitor = streamRegexMonitor;

Review comment:
       Ideally, this would take non-null monitors, or no-op monitors when a monitor 
is absent. 
   
   I noticed that some of the creation flows in the earlier code create `Optional`s 
from these factories. It seems a bit inconsistent for the factories to return 
`Optional` while these classes accept `null`. 
   
   I also don't like passing `Optional` through a constructor, since it's an 
anti-pattern. Hence no-op monitors seem cleaner. Alternatively, the instance 
variables could store an `Optional`, which isn't terrible.

##########
File path: 
samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import 
org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import 
org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import 
org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes 
generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, 
but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an 
external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, 
MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new 
JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, 
metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new 
HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new 
JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);
+    ChangelogStreamManager changelogStreamManager =
+        new ChangelogStreamManager(new 
NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    StartpointManager startpointManager =
+        jobConfig.getStartpointEnabled() ? new 
StartpointManager(metadataStore) : null;
+    SystemAdmins systemAdmins = new SystemAdmins(config, 
StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache streamMetadataCache = new 
StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, 
streamMetadataCache);
+    return new StaticResourceJobCoordinator(jobModelHelper, 
jobModelServingContext, coordinatorCommunication,
+        jobCoordinatorMetadataManager, startpointManager, 
changelogStreamManager, metrics, systemAdmins, config);
+  }
+
+  @VisibleForTesting
+  StaticResourceJobCoordinator(JobModelHelper jobModelHelper, 
JobModelServingContext jobModelServingContext,
+      CoordinatorCommunication coordinatorCommunication, 
JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+      StartpointManager startpointManager, ChangelogStreamManager 
changelogStreamManager, MetricsRegistry metrics,
+      SystemAdmins systemAdmins, Config config) {
+    this.jobModelHelper = jobModelHelper;
+    this.jobModelServingContext = jobModelServingContext;
+    this.coordinatorCommunication = coordinatorCommunication;
+    this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+    this.startpointManager = startpointManager;
+    this.changelogStreamManager = changelogStreamManager;
+    this.metrics = metrics;
+    this.systemAdmins = systemAdmins;
+    this.config = config;
+  }
+
+  /**
+   * Run the coordinator.
+   */
+  public void run() {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.warn("Already running; not to going execute run() again");
+      return;
+    }
+    LOG.info("Starting job coordinator");
+    this.systemAdmins.start();
+    this.startpointManager.start();
+    try {
+      JobModel jobModel = newJobModel();
+      JobCoordinatorMetadata newMetadata =
+          
this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, 
jobModel.getConfig());
+      Set<JobMetadataChange> jobMetadataChanges = 
checkForMetadataChanges(newMetadata);
+      prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
+      this.coordinatorCommunication.start();
+      waitForShutdownQuietly();
+    } catch (Exception e) {
+      LOG.error("Error while running job coordinator; exiting", e);
+      throw new SamzaException("Error while running job coordinator", e);
+    } finally {
+      this.coordinatorCommunication.stop();
+      this.startpointManager.stop();

Review comment:
       Same as above: this needs to handle null, or the field should be converted to 
`Optional`. I'd prefer the latter, so that people are forced to acknowledge that it can be absent.

##########
File path: 
samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import 
org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import 
org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import 
org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes 
generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, 
but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an 
external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, 
MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new 
JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, 
metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new 
HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new 
JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);
+    ChangelogStreamManager changelogStreamManager =
+        new ChangelogStreamManager(new 
NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    StartpointManager startpointManager =
+        jobConfig.getStartpointEnabled() ? new 
StartpointManager(metadataStore) : null;
+    SystemAdmins systemAdmins = new SystemAdmins(config, 
StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache streamMetadataCache = new 
StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, 
streamMetadataCache);
+    return new StaticResourceJobCoordinator(jobModelHelper, 
jobModelServingContext, coordinatorCommunication,
+        jobCoordinatorMetadataManager, startpointManager, 
changelogStreamManager, metrics, systemAdmins, config);
+  }
+
+  @VisibleForTesting
+  StaticResourceJobCoordinator(JobModelHelper jobModelHelper, 
JobModelServingContext jobModelServingContext,
+      CoordinatorCommunication coordinatorCommunication, 
JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+      StartpointManager startpointManager, ChangelogStreamManager 
changelogStreamManager, MetricsRegistry metrics,
+      SystemAdmins systemAdmins, Config config) {
+    this.jobModelHelper = jobModelHelper;
+    this.jobModelServingContext = jobModelServingContext;
+    this.coordinatorCommunication = coordinatorCommunication;
+    this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+    this.startpointManager = startpointManager;
+    this.changelogStreamManager = changelogStreamManager;
+    this.metrics = metrics;
+    this.systemAdmins = systemAdmins;
+    this.config = config;
+  }
+
+  /**
+   * Run the coordinator.
+   */
+  public void run() {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.warn("Already running; not to going execute run() again");
+      return;
+    }
+    LOG.info("Starting job coordinator");
+    this.systemAdmins.start();
+    this.startpointManager.start();
+    try {
+      JobModel jobModel = newJobModel();
+      JobCoordinatorMetadata newMetadata =
+          
this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, 
jobModel.getConfig());
+      Set<JobMetadataChange> jobMetadataChanges = 
checkForMetadataChanges(newMetadata);
+      prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
+      this.coordinatorCommunication.start();
+      waitForShutdownQuietly();
+    } catch (Exception e) {
+      LOG.error("Error while running job coordinator; exiting", e);
+      throw new SamzaException("Error while running job coordinator", e);
+    } finally {
+      this.coordinatorCommunication.stop();
+      this.startpointManager.stop();
+      this.systemAdmins.stop();
+    }
+  }
+
+  /**
+   * Set shutdown flag for coordinator and release any threads waiting for the 
shutdown.
+   */
+  public void signalShutdown() {
+    if (this.shouldShutdown.compareAndSet(false, true)) {
+      LOG.info("Shutdown signalled");
+      this.shutdownLatch.countDown();
+    }
+  }
+
+  private static JobModelHelper buildJobModelHelper(MetadataStore 
metadataStore,
+      StreamMetadataCache streamMetadataCache) {
+    LocalityManager localityManager =
+        new LocalityManager(new 
NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetContainerHostMapping.TYPE));
+    TaskAssignmentManager taskAssignmentManager =
+        new TaskAssignmentManager(new 
NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetTaskContainerMapping.TYPE),
+            new NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetTaskModeMapping.TYPE));
+    TaskPartitionAssignmentManager taskPartitionAssignmentManager = new 
TaskPartitionAssignmentManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetTaskPartitionMapping.TYPE));
+    return new JobModelHelper(localityManager, taskAssignmentManager, 
taskPartitionAssignmentManager,
+        streamMetadataCache, JobModelCalculator.INSTANCE);
+  }
+
+  private JobModel newJobModel() {
+    return this.jobModelHelper.newJobModel(this.config, 
this.changelogStreamManager.readPartitionMapping());
+  }
+
+  /**
+   * Run set up steps so that workers can begin processing:
+   * 1. Persist job coordinator metadata
+   * 2. Publish new job model on coordinator-to-worker communication channel
+   * 3. Create metadata resources
+   * 4. Handle startpoints
+   */
+  private void prepareWorkerExecution(JobModel jobModel, 
JobCoordinatorMetadata newMetadata,
+      Set<JobMetadataChange> jobMetadataChanges) throws IOException {
+    if (!jobMetadataChanges.isEmpty()) {
+      
this.jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(newMetadata);
+    }
+    this.jobModelServingContext.setJobModel(jobModel);
+
+    metadataResourceUtil(jobModel).createResources();
+
+    // the fan out trigger logic comes from ClusterBasedJobCoordinator, in 
which a new job model can trigger a fan out
+    if (this.startpointManager != null && !jobMetadataChanges.isEmpty()) {
+      
startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+    }
+  }
+
+  @VisibleForTesting
+  MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
+    return new MetadataResourceUtil(jobModel, this.metrics, this.config);
+  }
+
+  private void waitForShutdown() throws InterruptedException {
+    LOG.info("Waiting for coordinator to be signalled for shutdown");
+    boolean latchReleased = false;
+    while (!latchReleased && !this.shouldShutdown.get()) {
+      /*
+       * Using a timeout as a defensive measure in case we are waiting for a 
shutdown but the latch is not triggered
+       * for some reason.
+       */
+      latchReleased = this.shutdownLatch.await(15, TimeUnit.SECONDS);
+    }
+  }
+
+  private Set<JobMetadataChange> 
checkForMetadataChanges(JobCoordinatorMetadata newMetadata) {
+    JobCoordinatorMetadata previousMetadata = 
this.jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
+    return 
this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, 
previousMetadata);
+  }
+
+  private void waitForShutdownQuietly() {
+    try {
+      waitForShutdown();
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting to shutdown", e);
+    }
+  }

Review comment:
       What is the purpose of this wrapper around `waitForShutdown`? What 
happens if it throws an exception during shutdown?

##########
File path: 
samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorToWorkerCommunicationFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+
+/**
+ * Implements HTTP-based communication between the coordinator and workers.
+ */
+public class HttpCoordinatorToWorkerCommunicationFactory implements 
CoordinatorToWorkerCommunicationFactory {
+  @Override
+  public CoordinatorCommunication 
coordinatorCommunication(CoordinatorCommunicationContext context) {
+    ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(context.getConfig());
+    JobModelHttpServlet jobModelHttpServlet = new 
JobModelHttpServlet(context.getJobModelProvider(),
+        new JobModelHttpServlet.Metrics(context.getMetricsRegistry()));
+    HttpServer httpServer = new HttpServer("/", 
clusterManagerConfig.getCoordinatorUrlPort(), null,
+        new ServletHolder(DefaultServlet.class));
+    httpServer.addServlet("/", jobModelHttpServlet);

Review comment:
       What about the locality servlet? 

##########
File path: 
samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
##########
@@ -33,6 +33,8 @@
   public final static String DEFAULT_COORDINATOR_FACTORY = 
ZkJobCoordinatorFactory.class.getName();
   private static final String AZURE_COORDINATION_UTILS_FACTORY = 
"org.apache.samza.coordinator.AzureCoordinationUtilsFactory";
   private static final String AZURE_COORDINATOR_FACTORY = 
"org.apache.samza.coordinator.AzureJobCoordinatorFactory";
+  public static final String USE_STATIC_RESOURCE_JOB_COORDINATOR =

Review comment:
       Referring to my earlier comment, can we use the factory instead of a 
boolean here?

##########
File path: 
samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import 
org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import 
org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import 
org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes 
generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, 
but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an 
external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, 
MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new 
JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, 
metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new 
HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new 
JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);
+    ChangelogStreamManager changelogStreamManager =
+        new ChangelogStreamManager(new 
NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    StartpointManager startpointManager =
+        jobConfig.getStartpointEnabled() ? new 
StartpointManager(metadataStore) : null;
+    SystemAdmins systemAdmins = new SystemAdmins(config, 
StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache streamMetadataCache = new 
StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, 
streamMetadataCache);
+    return new StaticResourceJobCoordinator(jobModelHelper, 
jobModelServingContext, coordinatorCommunication,
+        jobCoordinatorMetadataManager, startpointManager, 
changelogStreamManager, metrics, systemAdmins, config);
+  }
+
+  @VisibleForTesting
+  StaticResourceJobCoordinator(JobModelHelper jobModelHelper, 
JobModelServingContext jobModelServingContext,
+      CoordinatorCommunication coordinatorCommunication, 
JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+      StartpointManager startpointManager, ChangelogStreamManager 
changelogStreamManager, MetricsRegistry metrics,
+      SystemAdmins systemAdmins, Config config) {
+    this.jobModelHelper = jobModelHelper;
+    this.jobModelServingContext = jobModelServingContext;
+    this.coordinatorCommunication = coordinatorCommunication;
+    this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+    this.startpointManager = startpointManager;
+    this.changelogStreamManager = changelogStreamManager;
+    this.metrics = metrics;
+    this.systemAdmins = systemAdmins;
+    this.config = config;
+  }
+
+  /**
+   * Run the coordinator.
+   */
+  public void run() {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.warn("Already running; not to going execute run() again");
+      return;
+    }
+    LOG.info("Starting job coordinator");
+    this.systemAdmins.start();
+    this.startpointManager.start();
+    try {
+      JobModel jobModel = newJobModel();
+      JobCoordinatorMetadata newMetadata =
+          
this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, 
jobModel.getConfig());
+      Set<JobMetadataChange> jobMetadataChanges = 
checkForMetadataChanges(newMetadata);
+      prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
+      this.coordinatorCommunication.start();
+      waitForShutdownQuietly();
+    } catch (Exception e) {
+      LOG.error("Error while running job coordinator; exiting", e);
+      throw new SamzaException("Error while running job coordinator", e);
+    } finally {
+      this.coordinatorCommunication.stop();
+      this.startpointManager.stop();
+      this.systemAdmins.stop();
+    }
+  }
+
+  /**
+   * Set shutdown flag for coordinator and release any threads waiting for the 
shutdown.
+   */
+  public void signalShutdown() {
+    if (this.shouldShutdown.compareAndSet(false, true)) {
+      LOG.info("Shutdown signalled");
+      this.shutdownLatch.countDown();
+    }
+  }
+
+  private static JobModelHelper buildJobModelHelper(MetadataStore 
metadataStore,
+      StreamMetadataCache streamMetadataCache) {
+    LocalityManager localityManager =
+        new LocalityManager(new 
NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetContainerHostMapping.TYPE));
+    TaskAssignmentManager taskAssignmentManager =
+        new TaskAssignmentManager(new 
NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetTaskContainerMapping.TYPE),
+            new NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetTaskModeMapping.TYPE));
+    TaskPartitionAssignmentManager taskPartitionAssignmentManager = new 
TaskPartitionAssignmentManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetTaskPartitionMapping.TYPE));
+    return new JobModelHelper(localityManager, taskAssignmentManager, 
taskPartitionAssignmentManager,
+        streamMetadataCache, JobModelCalculator.INSTANCE);
+  }
+
+  private JobModel newJobModel() {
+    return this.jobModelHelper.newJobModel(this.config, 
this.changelogStreamManager.readPartitionMapping());
+  }
+
+  /**
+   * Run set up steps so that workers can begin processing:
+   * 1. Persist job coordinator metadata
+   * 2. Publish new job model on coordinator-to-worker communication channel
+   * 3. Create metadata resources
+   * 4. Handle startpoints
+   */
+  private void prepareWorkerExecution(JobModel jobModel, 
JobCoordinatorMetadata newMetadata,
+      Set<JobMetadataChange> jobMetadataChanges) throws IOException {
+    if (!jobMetadataChanges.isEmpty()) {
+      
this.jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(newMetadata);
+    }
+    this.jobModelServingContext.setJobModel(jobModel);
+
+    metadataResourceUtil(jobModel).createResources();
+
+    // the fan out trigger logic comes from ClusterBasedJobCoordinator, in 
which a new job model can trigger a fan out
+    if (this.startpointManager != null && !jobMetadataChanges.isEmpty()) {
+      
startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+    }
+  }
+
+  @VisibleForTesting
+  MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
+    return new MetadataResourceUtil(jobModel, this.metrics, this.config);
+  }
+
+  private void waitForShutdown() throws InterruptedException {
+    LOG.info("Waiting for coordinator to be signalled for shutdown");
+    boolean latchReleased = false;
+    while (!latchReleased && !this.shouldShutdown.get()) {
+      /*
+       * Using a timeout as a defensive measure in case we are waiting for a 
shutdown but the latch is not triggered
+       * for some reason.
+       */
+      latchReleased = this.shutdownLatch.await(15, TimeUnit.SECONDS);

Review comment:
       Right now, `signalShutdown` seems to set the boolean and count down 
the latch. If so, why do we need a timeout here? Can we just await indefinitely, since 
there isn't anything within `signalShutdown` that could stall the shutdown sequence and 
would need to be timeboxed?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to