This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new c570a3d  SAMZA-2685: Add job coordinator which does not do resource 
management (#1529)
c570a3d is described below

commit c570a3ddfc91becb6269926231f5737038f02fff
Author: Cameron Lee <[email protected]>
AuthorDate: Tue Sep 28 13:06:24 2021 -0700

    SAMZA-2685: Add job coordinator which does not do resource management 
(#1529)
    
    Adding a job coordinator which does not do resource management. For Samza 
on YARN, the ClusterBasedJobCoordinator does resource management. However, for 
Samza on Kubernetes, it is not necessary to have a job coordinator which does 
resource management, because Kubernetes controllers can take care of resource 
management.
    
    API changes (all changes are backwards compatible):
    1. Set the config job.coordinator.factory to 
org.apache.samza.coordinator.staticresource.StaticResourceJobCoordinatorFactory 
in order to use the new coordinator.
    2. Set the config job.coordinator.restart.signal.factory to define how to 
restart the Samza job when an input stream changes which will change the job 
model. This plug-in is dependent on where the Samza job is running (e.g. 
Kubernetes). Currently, there is only a no-op implementation of this restart 
signal.
---
 .../clustermanager/JobCoordinatorLaunchUtil.java   |  71 +++++-
 .../apache/samza/config/JobCoordinatorConfig.java  |  14 +-
 .../apache/samza/coordinator/JobCoordinator.java   |  16 +-
 .../NoProcessorJobCoordinatorListener.java         |  62 +++++
 .../communication/CoordinatorCommunication.java    |  38 +++
 .../CoordinatorCommunicationContext.java           |  64 +++++
 .../CoordinatorToWorkerCommunicationFactory.java   |  35 +++
 .../HttpCoordinatorCommunication.java              |  60 +++++
 ...ttpCoordinatorToWorkerCommunicationFactory.java |  41 +++
 .../coordinator/communication/JobInfoProvider.java |  30 +++
 .../communication/JobInfoServingContext.java       |  48 ++++
 .../communication/JobModelHttpServlet.java         |  75 ++++++
 .../StaticResourceJobCoordinator.java              | 168 +++++++++++++
 .../StaticResourceJobCoordinatorFactory.java       |  88 +++++++
 .../apache/samza/metrics/BaseServerMetrics.java    |  34 +++
 .../TestJobCoordinatorLaunchUtil.java              | 132 ++++++++--
 .../samza/config/TestJobCoordinatorConfig.java     |  53 ++++
 .../TestNoProcessorJobCoordinatorListener.java     |  55 +++++
 .../TestHttpCoordinatorCommunication.java          |  63 +++++
 .../communication/TestJobModelHttpServlet.java     | 128 ++++++++++
 .../communication/TestJobModelServingContext.java  |  76 ++++++
 .../TestStaticResourceJobCoordinator.java          | 275 +++++++++++++++++++++
 22 files changed, 1589 insertions(+), 37 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
index 63a1f5c..cb84956 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
@@ -19,6 +19,10 @@
 package org.apache.samza.clustermanager;
 
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
@@ -26,13 +30,23 @@ import 
org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.coordinator.NoProcessorJobCoordinatorListener;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.execution.RemoteJobPlanner;
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.DiagnosticsUtil;
+import org.apache.samza.util.MetricsReporterLoader;
+import org.apache.samza.util.ReflectionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -40,6 +54,13 @@ import org.apache.samza.util.DiagnosticsUtil;
  * This util is being used by both high/low and beam API Samza jobs.
  */
 public class JobCoordinatorLaunchUtil {
+  private static final Logger LOG = 
LoggerFactory.getLogger(JobCoordinatorLaunchUtil.class);
+  private static final String JOB_COORDINATOR_SOURCE_NAME = "JobCoordinator";
+  /**
+   * There is no processor associated with this job coordinator, so adding a 
placeholder value.
+   */
+  private static final String JOB_COORDINATOR_PROCESSOR_ID_PLACEHOLDER = 
"samza-job-coordinator";
+
   /**
    * Run {@link ClusterBasedJobCoordinator} with full job config.
    *
@@ -76,11 +97,51 @@ public class JobCoordinatorLaunchUtil {
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
     DiagnosticsUtil.createDiagnosticsStream(finalConfig);
 
-    ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
-        metrics,
-        metadataStore,
-        finalConfig);
-    jc.run();
+    Optional<String> jobCoordinatorFactoryClassName =
+        new 
JobCoordinatorConfig(config).getOptionalJobCoordinatorFactoryClassName();
+    if (jobCoordinatorFactoryClassName.isPresent()) {
+      runJobCoordinator(jobCoordinatorFactoryClassName.get(), metrics, 
metadataStore, finalConfig);
+    } else {
+      ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(metrics, 
metadataStore, finalConfig);
+      jc.run();
+    }
+  }
+
+  private static void runJobCoordinator(String jobCoordinatorClassName, 
MetricsRegistryMap metrics,
+      MetadataStore metadataStore, Config finalConfig) {
+    JobCoordinatorFactory jobCoordinatorFactory =
+        ReflectionUtil.getObj(jobCoordinatorClassName, 
JobCoordinatorFactory.class);
+    JobCoordinator jobCoordinator =
+        
jobCoordinatorFactory.getJobCoordinator(JOB_COORDINATOR_PROCESSOR_ID_PLACEHOLDER,
 finalConfig, metrics,
+            metadataStore);
+    addShutdownHook(jobCoordinator);
+    Map<String, MetricsReporter> metricsReporters =
+        MetricsReporterLoader.getMetricsReporters(new 
MetricsConfig(finalConfig), JOB_COORDINATOR_SOURCE_NAME);
+    metricsReporters.values()
+        .forEach(metricsReporter -> 
metricsReporter.register(JOB_COORDINATOR_SOURCE_NAME, metrics));
+    metricsReporters.values().forEach(MetricsReporter::start);
+    CountDownLatch waitForShutdownLatch = new CountDownLatch(1);
+    jobCoordinator.setListener(new 
NoProcessorJobCoordinatorListener(waitForShutdownLatch));
+    jobCoordinator.start();
+    try {
+      waitForShutdownLatch.await();
+    } catch (InterruptedException e) {
+      String errorMessage = "Error while waiting for coordinator to complete";
+      LOG.error(errorMessage, e);
+      throw new SamzaException(errorMessage, e);
+    } finally {
+      metricsReporters.values().forEach(MetricsReporter::stop);
+    }
+  }
+
+  /**
+   * This is a separate method so it can be stubbed in tests, since adding a 
real shutdown hook will cause the hook to
+   * added to the test suite JVM.
+   */
+  @VisibleForTesting
+  static void addShutdownHook(JobCoordinator jobCoordinator) {
+    Runtime.getRuntime()
+        .addShutdownHook(new Thread(jobCoordinator::stop, "Samza Job 
Coordinator Shutdown Hook Thread"));
   }
 
   private JobCoordinatorLaunchUtil() {}
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
index 5f2ca29..c97628c 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.config;
 
+import java.util.Optional;
 import com.google.common.base.Strings;
 import org.apache.samza.SamzaException;
 import org.apache.samza.coordinator.CoordinationUtilsFactory;
@@ -70,11 +71,12 @@ public class JobCoordinatorConfig extends MapConfig {
   }
 
   public String getJobCoordinatorFactoryClassName() {
-    String jobCoordinatorFactoryClassName = get(JOB_COORDINATOR_FACTORY);
-    if (Strings.isNullOrEmpty(jobCoordinatorFactoryClassName)) {
-      return ZkJobCoordinatorFactory.class.getName();
-    } else {
-      return jobCoordinatorFactoryClassName;
-    }
+    return getOptionalJobCoordinatorFactoryClassName()
+        .filter(className -> !Strings.isNullOrEmpty(className))
+        .orElse(ZkJobCoordinatorFactory.class.getName());
+  }
+
+  public Optional<String> getOptionalJobCoordinatorFactoryClassName() {
+    return Optional.ofNullable(get(JOB_COORDINATOR_FACTORY));
   }
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
index cd10acb..f35c1fc 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
@@ -22,14 +22,10 @@ import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.job.model.JobModel;
 
 /**
- *  A JobCoordinator is a pluggable module in each process that provides the 
JobModel and the ID to the StreamProcessor.
+ * A JobCoordinator is a pluggable module in each process that coordinates 
with workers to do processing.
  *
- *  It is the responsibility of the JobCoordinator to assign a unique 
identifier to the StreamProcessor
- *  based on the underlying environment. In some cases, ID assignment is 
completely config driven, while in other
- *  cases, ID assignment may require coordination with JobCoordinators of 
other StreamProcessors.
- *
- *  StreamProcessor registers a {@link JobCoordinatorListener} in order to get 
notified about JobModel changes and
- *  Coordinator state change.
+ * The process running a {@link JobCoordinator} can register a {@link 
JobCoordinatorListener} in order to get notified
+ * about JobModel changes and coordinator state change.
  *
  * <pre>
  *   {@code
@@ -38,9 +34,9 @@ import org.apache.samza.job.model.JobModel;
  *  *                 *         onNewJobModel    ************                  
*
  *  *                 *<<------------------------* Job      *                  
*
  *  *                 *     onJobModelExpired    * Co-      *                  
*
- *  *                 *<<------------------------* ordinator*                  
*
- *  * StreamProcessor *     onCoordinatorStop    * Listener *  JobCoordinator  
*
- *  *                 *<<------------------------*          *                  
*
+ *  * StreamProcessor *<<------------------------* ordinator*                  
*
+ *  *        or       *     onCoordinatorStop    * Listener *  JobCoordinator  
*
+ *  * other JC process*<<------------------------*          *                  
*
  *  *                 *  onCoordinatorFailure    *          *                  
*
  *  *                 *<<------------------------************                  
*
  *  *                 *  stop()                             *                  
*
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/NoProcessorJobCoordinatorListener.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/NoProcessorJobCoordinatorListener.java
new file mode 100644
index 0000000..d6da8c6
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/NoProcessorJobCoordinatorListener.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.samza.job.model.JobModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link JobCoordinatorListener} for a {@link JobCoordinator} which does not 
run alongside a processor.
+ */
+public class NoProcessorJobCoordinatorListener implements 
JobCoordinatorListener {
+  private static final Logger LOG = 
LoggerFactory.getLogger(NoProcessorJobCoordinatorListener.class);
+  private final CountDownLatch waitForShutdownLatch;
+
+  public NoProcessorJobCoordinatorListener(CountDownLatch 
waitForShutdownLatch) {
+    this.waitForShutdownLatch = waitForShutdownLatch;
+  }
+
+  @Override
+  public void onJobModelExpired() {
+    // nothing to notify so far about job model expiration
+  }
+
+  @Override
+  public void onNewJobModel(String processorId, JobModel jobModel) {
+    // nothing to notify so far about new job model
+  }
+
+  @Override
+  public void onCoordinatorStop() {
+    this.waitForShutdownLatch.countDown();
+  }
+
+  /**
+   * There is currently no use case for bubbling up this exception, so just 
log the error and allow shutdown for now. If
+   * we get a future use case where it is useful to bubble up the exception, 
then we can update this class.
+   */
+  @Override
+  public void onCoordinatorFailure(Throwable t) {
+    LOG.error("Failure in coordinator, allowing shutdown to begin", t);
+    this.waitForShutdownLatch.countDown();
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunication.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunication.java
new file mode 100644
index 0000000..daf0cc7
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunication.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+/**
+ * Interface for setting up communication components on the job coordinator 
side for coordinator-to-worker
+ * communication. For example, this could be implemented by an HTTP server.
+ *
+ * See {@link CoordinatorCommunicationContext} for the communication paths 
that need to be handled by this
+ * {@link CoordinatorCommunication} component.
+ */
+public interface CoordinatorCommunication {
+  /**
+   * Start the communication components.
+   */
+  void start();
+
+  /**
+   * Stop the communication components. This may be called even if {@link 
#start()} has not yet been called.
+   */
+  void stop();
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunicationContext.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunicationContext.java
new file mode 100644
index 0000000..42fd860
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunicationContext.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+
+/**
+ * Contains components which can be used to build a {@link 
CoordinatorCommunication}.
+ * For example, provides access to job model and handling for worker states.
+ */
+public class CoordinatorCommunicationContext {
+  private final JobInfoProvider jobInfoProvider;
+  private final Config configForFactory;
+  private final MetricsRegistry metricsRegistry;
+
+  /**
+   * @param jobInfoProvider provides dynamic access to job info to pass along 
to workers (e.g. job model, configs)
+   * @param configForFactory config to use to build the {@link 
CoordinatorCommunication}
+   */
+  public CoordinatorCommunicationContext(JobInfoProvider jobInfoProvider, 
Config configForFactory,
+      MetricsRegistry metricsRegistry) {
+    this.configForFactory = configForFactory;
+    this.jobInfoProvider = jobInfoProvider;
+    this.metricsRegistry = metricsRegistry;
+  }
+
+  /**
+   * Use this to access job model.
+   */
+  public JobInfoProvider getJobInfoProvider() {
+    return jobInfoProvider;
+  }
+
+  /**
+   * Use this to access the {@link Config} for building the {@link 
CoordinatorCommunication}.
+   * Do not use this as the {@link Config} to pass along to the workers. Use 
{@link #getJobInfoProvider()} for accessing
+   * the job model and config to pass along to workers.
+   */
+  public Config getConfigForFactory() {
+    return configForFactory;
+  }
+
+  public MetricsRegistry getMetricsRegistry() {
+    return metricsRegistry;
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorToWorkerCommunicationFactory.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorToWorkerCommunicationFactory.java
new file mode 100644
index 0000000..a0b8fa1
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorToWorkerCommunicationFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+/**
+ * Factory for generating components for coordinator-to-work communication.
+ *
+ * SAMZA-2673: There should be a "WorkerCommunication" interface which pairs 
with the {@link CoordinatorCommunication},
+ * but that abstraction layer is not implemented yet. One communication path 
to be handled by WorkerCommunication is the
+ * query to get the job model.
+ */
+public interface CoordinatorToWorkerCommunicationFactory {
+  /**
+   * Build a {@link CoordinatorCommunication} to handle communication with the 
workers for the job.
+   */
+  CoordinatorCommunication 
coordinatorCommunication(CoordinatorCommunicationContext context);
+
+  // SAMZA-2673: an opportunity for improvement here is to add the 
WorkerCommunication layer
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorCommunication.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorCommunication.java
new file mode 100644
index 0000000..1b01a68
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorCommunication.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Runs an {@link HttpServer} to communicate with workers.
+ */
+public class HttpCoordinatorCommunication implements CoordinatorCommunication {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HttpCoordinatorCommunication.class);
+
+  private final HttpServer httpServer;
+
+  private final AtomicBoolean isRunning = new AtomicBoolean(false);
+
+  public HttpCoordinatorCommunication(HttpServer httpServer) {
+    this.httpServer = httpServer;
+  }
+
+  @Override
+  public void start() {
+    if (!this.isRunning.compareAndSet(false, true)) {
+      LOG.warn("Tried to start, but already started");
+    } else {
+      this.httpServer.start();
+      LOG.info("Started http server");
+    }
+  }
+
+  @Override
+  public void stop() {
+    if (!this.isRunning.compareAndSet(true, false)) {
+      LOG.warn("Tried to stop, but not currently running");
+    } else {
+      this.httpServer.stop();
+      LOG.info("Stopped http server");
+    }
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorToWorkerCommunicationFactory.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorToWorkerCommunicationFactory.java
new file mode 100644
index 0000000..d7a2363
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorToWorkerCommunicationFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+
+/**
+ * Implements HTTP-based communication between the coordinator and workers.
+ */
+public class HttpCoordinatorToWorkerCommunicationFactory implements 
CoordinatorToWorkerCommunicationFactory {
+  @Override
+  public CoordinatorCommunication 
coordinatorCommunication(CoordinatorCommunicationContext context) {
+    ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(context.getConfigForFactory());
+    JobModelHttpServlet jobModelHttpServlet = new 
JobModelHttpServlet(context.getJobInfoProvider(),
+        new JobModelHttpServlet.Metrics(context.getMetricsRegistry()));
+    HttpServer httpServer = new HttpServer("/", 
clusterManagerConfig.getCoordinatorUrlPort(), null,
+        new ServletHolder(DefaultServlet.class));
+    httpServer.addServlet("/", jobModelHttpServlet);
+    return new HttpCoordinatorCommunication(httpServer);
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/communication/JobInfoProvider.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/JobInfoProvider.java
new file mode 100644
index 0000000..32b5a33
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/JobInfoProvider.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import java.util.Optional;
+
+
+/**
+ * Used by implementors of {@link CoordinatorCommunication} to access job 
information for coordinator-to-worker
+ * communication.
+ */
+public interface JobInfoProvider {
+  Optional<byte[]> getSerializedJobModel();
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/communication/JobInfoServingContext.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/JobInfoServingContext.java
new file mode 100644
index 0000000..b1914b5
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/JobInfoServingContext.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import java.util.Optional;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+
+
+/**
+ * Provides a way to access and set the job model.
+ * The {@link JobInfoProvider} part of this class is used by a {@link 
CoordinatorCommunication} implementation, and the
+ * "set job model" part is called by the job coordinator when there is a new 
job model.
+ */
+public class JobInfoServingContext implements JobInfoProvider {
+  private volatile byte[] serializedJobModel = null;
+
+  @Override
+  public Optional<byte[]> getSerializedJobModel() {
+    return Optional.ofNullable(this.serializedJobModel);
+  }
+
+  public void setJobModel(JobModel jobModel) {
+    try {
+      this.serializedJobModel = 
SamzaObjectMapper.getObjectMapper().writeValueAsBytes(jobModel);
+    } catch (JsonProcessingException e) {
+      throw new SamzaException("Failed to serialize job model", e);
+    }
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/communication/JobModelHttpServlet.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/JobModelHttpServlet.java
new file mode 100644
index 0000000..2e946c0
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/JobModelHttpServlet.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import java.io.IOException;
+import java.util.Optional;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.samza.metrics.BaseServerMetrics;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link HttpServlet} which provides access to job model.
+ */
+public class JobModelHttpServlet extends HttpServlet {
+  private static final Logger LOG = 
LoggerFactory.getLogger(JobModelHttpServlet.class);
+  private static final String CONTENT_TYPE = "application/json;charset=UTF-8";
+
+  private final JobInfoProvider jobInfoProvider;
+  private final Metrics metrics;
+
+  public JobModelHttpServlet(JobInfoProvider jobInfoProvider, Metrics metrics) 
{
+    this.jobInfoProvider = jobInfoProvider;
+    this.metrics = metrics;
+  }
+
+  protected void doGet(HttpServletRequest request, HttpServletResponse 
response) {
+    this.metrics.incomingRequests.inc();
+    Optional<byte[]> serializedJobModel = 
this.jobInfoProvider.getSerializedJobModel();
+    if (serializedJobModel.isPresent()) {
+      try {
+        response.getOutputStream().write(serializedJobModel.get());
+        response.setContentType(CONTENT_TYPE);
+        response.setStatus(HttpServletResponse.SC_OK);
+        this.metrics.successfulResponses.inc();
+      } catch (IOException e) {
+        LOG.error("Failed to write response body", e);
+        this.metrics.failedResponses.inc();
+        response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+      }
+    } else {
+      LOG.warn("Received request for job model before job model is available");
+      this.metrics.failedResponses.inc();
+      response.setStatus(HttpServletResponse.SC_NOT_FOUND);
+    }
+  }
+
+  public static class Metrics extends BaseServerMetrics {
+    private static final String GROUP = 
JobModelHttpServlet.class.getSimpleName();
+
+    public Metrics(MetricsRegistry metricsRegistry) {
+      super(metricsRegistry, GROUP);
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
new file mode 100644
index 0000000..b9fe86f
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.staticresource;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Set;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorListener;
+import org.apache.samza.coordinator.JobModelHelper;
+import org.apache.samza.coordinator.MetadataResourceUtil;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.JobInfoServingContext;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.SystemAdmins;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes 
generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, 
but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an 
external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator implements JobCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobInfoServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  private final Optional<StartpointManager> startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final String processorId;
+  private final Config config;
+
+  private Optional<JobModel> currentJobModel = Optional.empty();
+  private Optional<JobCoordinatorListener> jobCoordinatorListener = 
Optional.empty();
+
+  StaticResourceJobCoordinator(String processorId, JobModelHelper 
jobModelHelper, JobInfoServingContext jobModelServingContext,
+      CoordinatorCommunication coordinatorCommunication, 
JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+      StartpointManager startpointManager, ChangelogStreamManager 
changelogStreamManager, MetricsRegistry metrics,
+      SystemAdmins systemAdmins, Config config) {
+    this.jobModelHelper = jobModelHelper;
+    this.jobModelServingContext = jobModelServingContext;
+    this.coordinatorCommunication = coordinatorCommunication;
+    this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+    this.startpointManager = Optional.ofNullable(startpointManager);
+    this.changelogStreamManager = changelogStreamManager;
+    this.metrics = metrics;
+    this.systemAdmins = systemAdmins;
+    this.processorId = processorId;
+    this.config = config;
+  }
+
+  @Override
+  public void start() {
+    LOG.info("Starting job coordinator");
+    this.systemAdmins.start();
+    this.startpointManager.ifPresent(StartpointManager::start);
+    try {
+      JobModel jobModel = newJobModel();
+      JobCoordinatorMetadata newMetadata =
+          
this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, 
jobModel.getConfig());
+      Set<JobMetadataChange> jobMetadataChanges = 
checkForMetadataChanges(newMetadata);
+      prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
+      this.coordinatorCommunication.start();
+      this.currentJobModel = Optional.of(jobModel);
+      this.jobCoordinatorListener.ifPresent(listener -> 
listener.onNewJobModel(this.processorId, jobModel));
+    } catch (Exception e) {
+      LOG.error("Error while running job coordinator; exiting", e);
+      throw new SamzaException("Error while running job coordinator", e);
+    }
+  }
+
+  @Override
+  public void stop() {
+    try {
+      
this.jobCoordinatorListener.ifPresent(JobCoordinatorListener::onJobModelExpired);
+      this.coordinatorCommunication.stop();
+      this.startpointManager.ifPresent(StartpointManager::stop);
+      this.systemAdmins.stop();
+    } finally {
+      
this.jobCoordinatorListener.ifPresent(JobCoordinatorListener::onCoordinatorStop);
+    }
+  }
+
+  @Override
+  public String getProcessorId() {
+    return this.processorId;
+  }
+
+  @Override
+  public void setListener(JobCoordinatorListener jobCoordinatorListener) {
+    this.jobCoordinatorListener = Optional.ofNullable(jobCoordinatorListener);
+  }
+
+  @Override
+  public JobModel getJobModel() {
+    return this.currentJobModel.orElse(null);
+  }
+
+  private JobModel newJobModel() {
+    return this.jobModelHelper.newJobModel(this.config, 
this.changelogStreamManager.readPartitionMapping());
+  }
+
+  /**
+   * Run set up steps so that workers can begin processing:
+   * 1. Persist job coordinator metadata
+   * 2. Publish new job model on coordinator-to-worker communication channel
+   * 3. Create metadata resources
+   * 4. Handle startpoints
+   */
+  private void prepareWorkerExecution(JobModel jobModel, 
JobCoordinatorMetadata newMetadata,
+      Set<JobMetadataChange> jobMetadataChanges) throws IOException {
+    if (!jobMetadataChanges.isEmpty()) {
+      
this.jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(newMetadata);
+    }
+    this.jobModelServingContext.setJobModel(jobModel);
+
+    metadataResourceUtil(jobModel).createResources();
+
+    // the fan out trigger logic comes from ClusterBasedJobCoordinator, in 
which a new job model can trigger a fan out
+    if (this.startpointManager.isPresent() && !jobMetadataChanges.isEmpty()) {
+      
this.startpointManager.get().fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+    }
+  }
+
+  @VisibleForTesting
+  MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
+    return new MetadataResourceUtil(jobModel, this.metrics, this.config);
+  }
+
+  private Set<JobMetadataChange> 
checkForMetadataChanges(JobCoordinatorMetadata newMetadata) {
+    JobCoordinatorMetadata previousMetadata = 
this.jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
+    return 
this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, 
previousMetadata);
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinatorFactory.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinatorFactory.java
new file mode 100644
index 0000000..23ee54c
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinatorFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.staticresource;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.coordinator.JobModelCalculator;
+import org.apache.samza.coordinator.JobModelHelper;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import 
org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import 
org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobInfoServingContext;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import 
org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+
+
+public class StaticResourceJobCoordinatorFactory implements 
JobCoordinatorFactory {
+  @Override
+  public JobCoordinator getJobCoordinator(String processorId, Config config, 
MetricsRegistry metricsRegistry,
+      MetadataStore metadataStore) {
+    JobInfoServingContext jobModelServingContext = new JobInfoServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, 
metricsRegistry);
+    CoordinatorCommunication coordinatorCommunication =
+        new 
HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new 
JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metricsRegistry);
+    ChangelogStreamManager changelogStreamManager =
+        new ChangelogStreamManager(new 
NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    StartpointManager startpointManager =
+        jobConfig.getStartpointEnabled() ? new 
StartpointManager(metadataStore) : null;
+    SystemAdmins systemAdmins = new SystemAdmins(config, 
StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache streamMetadataCache = new 
StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, 
streamMetadataCache);
+    return new StaticResourceJobCoordinator(processorId, jobModelHelper, 
jobModelServingContext,
+        coordinatorCommunication, jobCoordinatorMetadataManager, 
startpointManager, changelogStreamManager,
+        metricsRegistry, systemAdmins, config);
+  }
+
+  private static JobModelHelper buildJobModelHelper(MetadataStore 
metadataStore,
+      StreamMetadataCache streamMetadataCache) {
+    LocalityManager localityManager =
+        new LocalityManager(new 
NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetContainerHostMapping.TYPE));
+    TaskAssignmentManager taskAssignmentManager =
+        new TaskAssignmentManager(new 
NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetTaskContainerMapping.TYPE),
+            new NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetTaskModeMapping.TYPE));
+    TaskPartitionAssignmentManager taskPartitionAssignmentManager = new 
TaskPartitionAssignmentManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetTaskPartitionMapping.TYPE));
+    return new JobModelHelper(localityManager, taskAssignmentManager, 
taskPartitionAssignmentManager,
+        streamMetadataCache, JobModelCalculator.INSTANCE);
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/metrics/BaseServerMetrics.java 
b/samza-core/src/main/java/org/apache/samza/metrics/BaseServerMetrics.java
new file mode 100644
index 0000000..b6b822c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/metrics/BaseServerMetrics.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.metrics;
+
+/**
+ * Provides basic metrics for components that are serving data.
+ */
+public class BaseServerMetrics {
+  public final Counter incomingRequests;
+  public final Counter successfulResponses;
+  public final Counter failedResponses;
+
+  public BaseServerMetrics(MetricsRegistry metricsRegistry, String group) {
+    this.incomingRequests = metricsRegistry.newCounter(group, 
"incoming-requests");
+    this.successfulResponses = metricsRegistry.newCounter(group, 
"successful-responses");
+    this.failedResponses = metricsRegistry.newCounter(group, 
"failed-responses");
+  }
+}
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
index af27423..9a95d33 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
@@ -18,28 +18,42 @@
  */
 package org.apache.samza.clustermanager;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import com.google.common.collect.ImmutableMap;
 import org.apache.samza.application.MockStreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.coordinator.NoProcessorJobCoordinatorListener;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.execution.RemoteJobPlanner;
 import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.util.ConfigUtil;
+import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.util.CoordinatorStreamUtil;
+import org.apache.samza.util.MetricsReporterLoader;
+import org.apache.samza.util.ReflectionUtil;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -49,20 +63,19 @@ import static 
org.powermock.api.mockito.PowerMockito.verifyStatic;
 
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({
-    CoordinatorStreamUtil.class,
+@PrepareForTest({CoordinatorStreamUtil.class,
     JobCoordinatorLaunchUtil.class,
     CoordinatorStreamStore.class,
-    RemoteJobPlanner.class})
+    RemoteJobPlanner.class,
+    ReflectionUtil.class,
+    MetricsReporterLoader.class,
+    NoProcessorJobCoordinatorListener.class})
 public class TestJobCoordinatorLaunchUtil {
   @Test
-  public void testCreateFromConfigLoader() throws Exception {
-    Map<String, String> config = new HashMap<>();
-    config.put(JobConfig.CONFIG_LOADER_FACTORY, 
PropertiesConfigLoaderFactory.class.getName());
-    config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + 
"path",
-        getClass().getResource("/test.properties").getPath());
-    Config originalConfig = new JobConfig(ConfigUtil.loadConfig(new 
MapConfig(config)));
-    Config fullConfig = new MapConfig(originalConfig, 
Collections.singletonMap("isAfterPlanning", "true"));
+  public void testRunClusterBasedJobCoordinator() throws Exception {
+    Config originalConfig = buildOriginalConfig(ImmutableMap.of());
+    JobConfig fullConfig =
+        new JobConfig(new MapConfig(originalConfig, 
Collections.singletonMap("isAfterPlanning", "true")));
     Config autoSizingConfig = new 
MapConfig(Collections.singletonMap(JobConfig.JOB_AUTOSIZING_CONTAINER_COUNT, 
"10"));
     Config finalConfig = new MapConfig(autoSizingConfig, fullConfig);
 
@@ -77,15 +90,102 @@ public class TestJobCoordinatorLaunchUtil {
     
PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mockCoordinatorStreamStore);
     
PowerMockito.whenNew(RemoteJobPlanner.class).withAnyArguments().thenReturn(mockJobPlanner);
     
PowerMockito.whenNew(ClusterBasedJobCoordinator.class).withAnyArguments().thenReturn(mockJC);
-    
when(mockJobPlanner.prepareJobs()).thenReturn(Collections.singletonList(new 
JobConfig(fullConfig)));
+    
when(mockJobPlanner.prepareJobs()).thenReturn(Collections.singletonList(fullConfig));
 
     JobCoordinatorLaunchUtil.run(new MockStreamApplication(), originalConfig);
 
     
verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class),
 eq(mockCoordinatorStreamStore), eq(finalConfig));
     verify(mockJC, times(1)).run();
     verifyStatic(times(1));
-    CoordinatorStreamUtil.createCoordinatorStream(any());
+    CoordinatorStreamUtil.createCoordinatorStream(fullConfig);
     verifyStatic(times(1));
-    CoordinatorStreamUtil.writeConfigToCoordinatorStream(any(), anyBoolean());
+    CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
+  }
+
+  @Test
+  public void testRunJobCoordinator() throws Exception {
+    String jobCoordinatorFactoryClass = 
"org.apache.samza.custom.MyJobCoordinatorFactory";
+    Config originalConfig =
+        
buildOriginalConfig(ImmutableMap.of(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
 jobCoordinatorFactoryClass));
+    JobConfig fullConfig =
+        new JobConfig(new MapConfig(originalConfig, 
Collections.singletonMap("isAfterPlanning", "true")));
+    Config autoSizingConfig = new 
MapConfig(Collections.singletonMap(JobConfig.JOB_AUTOSIZING_CONTAINER_COUNT, 
"10"));
+    Config finalConfig = new MapConfig(autoSizingConfig, fullConfig);
+
+    RemoteJobPlanner remoteJobPlanner = mock(RemoteJobPlanner.class);
+    CoordinatorStreamStore coordinatorStreamStore = 
mock(CoordinatorStreamStore.class);
+    JobCoordinatorFactory jobCoordinatorFactory = 
mock(JobCoordinatorFactory.class);
+    JobCoordinator jobCoordinator = mock(JobCoordinator.class);
+    // use a latch to keep track of when start has been called
+    CountDownLatch jobCoordinatorStartedLatch = new CountDownLatch(1);
+    doAnswer(invocation -> {
+      jobCoordinatorStartedLatch.countDown();
+      return null;
+    }).when(jobCoordinator).start();
+
+    PowerMockito.mockStatic(CoordinatorStreamUtil.class);
+    PowerMockito.doNothing().when(CoordinatorStreamUtil.class, 
"createCoordinatorStream", any());
+    PowerMockito.doReturn(new MapConfig()).when(CoordinatorStreamUtil.class, 
"buildCoordinatorStreamConfig", any());
+    PowerMockito.doReturn(autoSizingConfig)
+        .when(CoordinatorStreamUtil.class, 
"readLaunchConfigFromCoordinatorStream", any(), any());
+    
PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(coordinatorStreamStore);
+    
PowerMockito.whenNew(RemoteJobPlanner.class).withAnyArguments().thenReturn(remoteJobPlanner);
+    
when(remoteJobPlanner.prepareJobs()).thenReturn(Collections.singletonList(fullConfig));
+    PowerMockito.mockStatic(ReflectionUtil.class);
+    PowerMockito.doReturn(jobCoordinatorFactory)
+        .when(ReflectionUtil.class, "getObj", jobCoordinatorFactoryClass, 
JobCoordinatorFactory.class);
+    when(jobCoordinatorFactory.getJobCoordinator(eq("samza-job-coordinator"), 
eq(finalConfig), any(),
+        eq(coordinatorStreamStore))).thenReturn(jobCoordinator);
+    PowerMockito.spy(JobCoordinatorLaunchUtil.class);
+    PowerMockito.doNothing().when(JobCoordinatorLaunchUtil.class, 
"addShutdownHook", any());
+    MetricsReporter metricsReporter = mock(MetricsReporter.class);
+    Map<String, MetricsReporter> metricsReporterMap = 
ImmutableMap.of("reporter", metricsReporter);
+    PowerMockito.mockStatic(MetricsReporterLoader.class);
+    PowerMockito.doReturn(metricsReporterMap)
+        .when(MetricsReporterLoader.class, "getMetricsReporters", new 
MetricsConfig(finalConfig), "JobCoordinator");
+    NoProcessorJobCoordinatorListener jobCoordinatorListener = 
mock(NoProcessorJobCoordinatorListener.class);
+    
PowerMockito.whenNew(NoProcessorJobCoordinatorListener.class).withAnyArguments().thenReturn(jobCoordinatorListener);
+
+    Thread runThread = new Thread(() -> JobCoordinatorLaunchUtil.run(new 
MockStreamApplication(), originalConfig));
+    runThread.start();
+    // wait for job coordinator to be started before doing verifications
+    jobCoordinatorStartedLatch.await();
+
+    verifyStatic();
+    CoordinatorStreamUtil.createCoordinatorStream(fullConfig);
+    verifyStatic();
+    CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
+    verifyStatic();
+    JobCoordinatorLaunchUtil.addShutdownHook(jobCoordinator);
+    InOrder inOrder = Mockito.inOrder(metricsReporter, jobCoordinator);
+    inOrder.verify(metricsReporter).register(eq("JobCoordinator"), any());
+    inOrder.verify(metricsReporter).start();
+    ArgumentCaptor<CountDownLatch> countDownLatchArgumentCaptor = 
ArgumentCaptor.forClass(CountDownLatch.class);
+    
verifyNew(NoProcessorJobCoordinatorListener.class).withArguments(countDownLatchArgumentCaptor.capture());
+    inOrder.verify(jobCoordinator).setListener(jobCoordinatorListener);
+    inOrder.verify(jobCoordinator).start();
+
+    // wait some time and then make sure the run thread is still alive
+    Thread.sleep(Duration.ofMillis(500).toMillis());
+    assertTrue(runThread.isAlive());
+
+    // trigger the count down latch so that the run thread can exit
+    countDownLatchArgumentCaptor.getValue().countDown();
+    runThread.join(Duration.ofSeconds(10).toMillis());
+    assertFalse(runThread.isAlive());
+
+    verify(metricsReporter).stop();
+  }
+
+  private static Config buildOriginalConfig(Map<String, String> 
additionalConfig) {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("job.factory.class", "org.apache.samza.job.MockJobFactory");
+    configMap.put("job.name", "test-job");
+    configMap.put("foo", "bar");
+    configMap.put("systems.coordinator.samza.factory",
+        
"org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory");
+    configMap.put("job.coordinator.system", "coordinator");
+    configMap.putAll(additionalConfig);
+    return new MapConfig(configMap);
   }
 }
diff --git 
a/samza-core/src/test/java/org/apache/samza/config/TestJobCoordinatorConfig.java
 
b/samza-core/src/test/java/org/apache/samza/config/TestJobCoordinatorConfig.java
new file mode 100644
index 0000000..ac7425f
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/config/TestJobCoordinatorConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.zk.ZkJobCoordinatorFactory;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestJobCoordinatorConfig {
+  @Test
+  public void getJobCoordinatorFactoryClassName() {
+    assertEquals(ZkJobCoordinatorFactory.class.getName(),
+        new JobCoordinatorConfig(new 
MapConfig()).getJobCoordinatorFactoryClassName());
+
+    JobCoordinatorConfig jobCoordinatorConfig =
+        new JobCoordinatorConfig(new 
MapConfig(ImmutableMap.of(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "")));
+    assertEquals(ZkJobCoordinatorFactory.class.getName(), 
jobCoordinatorConfig.getJobCoordinatorFactoryClassName());
+
+    jobCoordinatorConfig = new JobCoordinatorConfig(new MapConfig(
+        ImmutableMap.of(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
"org.custom.MyJobCoordinatorFactory")));
+    assertEquals("org.custom.MyJobCoordinatorFactory", 
jobCoordinatorConfig.getJobCoordinatorFactoryClassName());
+  }
+
+  @Test
+  public void getOptionalJobCoordinatorFactoryClassName() {
+    assertFalse(new JobCoordinatorConfig(new 
MapConfig()).getOptionalJobCoordinatorFactoryClassName().isPresent());
+
+    JobCoordinatorConfig jobCoordinatorConfig = new JobCoordinatorConfig(new 
MapConfig(
+        ImmutableMap.of(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
"org.custom.MyJobCoordinatorFactory")));
+    
assertTrue(jobCoordinatorConfig.getOptionalJobCoordinatorFactoryClassName().isPresent());
+  }
+}
\ No newline at end of file
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/TestNoProcessorJobCoordinatorListener.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/TestNoProcessorJobCoordinatorListener.java
new file mode 100644
index 0000000..2a4e0f5
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/TestNoProcessorJobCoordinatorListener.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.concurrent.CountDownLatch;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.mockito.Mockito.verify;
+
+
+public class TestNoProcessorJobCoordinatorListener {
+  @Mock
+  private CountDownLatch waitForShutdownLatch;
+
+  private NoProcessorJobCoordinatorListener noProcessorJobCoordinatorListener;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    this.noProcessorJobCoordinatorListener = new 
NoProcessorJobCoordinatorListener(this.waitForShutdownLatch);
+  }
+
+  @Test
+  public void testOnCoordinatorStop() {
+    this.noProcessorJobCoordinatorListener.onCoordinatorStop();
+
+    verify(this.waitForShutdownLatch).countDown();
+  }
+
+  @Test
+  public void testOnCoordinatorFailure() {
+    this.noProcessorJobCoordinatorListener.onCoordinatorFailure(new 
RuntimeException());
+
+    verify(this.waitForShutdownLatch).countDown();
+  }
+}
\ No newline at end of file
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/communication/TestHttpCoordinatorCommunication.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/communication/TestHttpCoordinatorCommunication.java
new file mode 100644
index 0000000..d53a39c
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/communication/TestHttpCoordinatorCommunication.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.coordinator.server.HttpServer;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+
+public class TestHttpCoordinatorCommunication {
+  @Mock
+  private HttpServer httpServer;
+
+  private HttpCoordinatorCommunication httpCoordinatorCommunication;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    this.httpCoordinatorCommunication = new 
HttpCoordinatorCommunication(this.httpServer);
+  }
+
+  @Test
+  public void testStartStop() {
+    this.httpCoordinatorCommunication.start();
+    verify(this.httpServer).start();
+    this.httpCoordinatorCommunication.start();
+    // consecutive stops should still only result in one start
+    verify(this.httpServer).start();
+
+    this.httpCoordinatorCommunication.stop();
+    verify(this.httpServer).stop();
+    this.httpCoordinatorCommunication.stop();
+    // consecutive stops should still only result in one stop
+    verify(this.httpServer).stop();
+  }
+
+  @Test
+  public void testStopOnly() {
+    this.httpCoordinatorCommunication.stop();
+    verify(this.httpServer, never()).stop();
+  }
+}
\ No newline at end of file
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/communication/TestJobModelHttpServlet.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/communication/TestJobModelHttpServlet.java
new file mode 100644
index 0000000..8c80d43
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/communication/TestJobModelHttpServlet.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.AdditionalMatchers.aryEq;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class TestJobModelHttpServlet {
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  @Mock
+  private JobInfoProvider jobInfoProvider;
+  @Mock
+  private HttpServletResponse httpServletResponse;
+  @Mock
+  private ServletOutputStream servletOutputStream;
+
+  private JobModelHttpServlet.Metrics metrics;
+  private JobModelHttpServlet jobModelHttpServlet;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    this.metrics = new JobModelHttpServlet.Metrics(new MetricsRegistryMap());
+    this.jobModelHttpServlet = new JobModelHttpServlet(this.jobInfoProvider, 
this.metrics);
+  }
+
+  @Test
+  public void testDoGet() throws IOException {
+    JobModel jobModel = jobModel();
+    when(this.jobInfoProvider.getSerializedJobModel()).thenReturn(
+        Optional.of(OBJECT_MAPPER.writeValueAsBytes(jobModel)));
+    
when(this.httpServletResponse.getOutputStream()).thenReturn(this.servletOutputStream);
+
+    this.jobModelHttpServlet.doGet(mock(HttpServletRequest.class), 
this.httpServletResponse);
+
+    
verify(this.httpServletResponse).setContentType("application/json;charset=UTF-8");
+    verify(this.httpServletResponse).setStatus(HttpServletResponse.SC_OK);
+    
verify(this.servletOutputStream).write(aryEq(OBJECT_MAPPER.writeValueAsBytes(jobModel)));
+    assertEquals(1, this.metrics.incomingRequests.getCount());
+    assertEquals(1, this.metrics.successfulResponses.getCount());
+    assertEquals(0, this.metrics.failedResponses.getCount());
+  }
+
+  @Test
+  public void testDoGetMissingJobModel() throws IOException {
+    
when(this.jobInfoProvider.getSerializedJobModel()).thenReturn(Optional.empty());
+
+    this.jobModelHttpServlet.doGet(mock(HttpServletRequest.class), 
this.httpServletResponse);
+
+    
verify(this.httpServletResponse).setStatus(HttpServletResponse.SC_NOT_FOUND);
+    verify(this.httpServletResponse, never()).setContentType(any());
+    verify(this.httpServletResponse, never()).getOutputStream();
+    assertEquals(1, this.metrics.incomingRequests.getCount());
+    assertEquals(0, this.metrics.successfulResponses.getCount());
+    assertEquals(1, this.metrics.failedResponses.getCount());
+  }
+
+  @Test
+  public void testDoGetFailureToWriteResponse() throws IOException {
+    when(this.jobInfoProvider.getSerializedJobModel()).thenReturn(
+        Optional.of(OBJECT_MAPPER.writeValueAsBytes(jobModel())));
+    
when(this.httpServletResponse.getOutputStream()).thenReturn(this.servletOutputStream);
+    doThrow(new IOException("failure to write to output 
stream")).when(this.servletOutputStream).write(any());
+
+    this.jobModelHttpServlet.doGet(mock(HttpServletRequest.class), 
this.httpServletResponse);
+
+    
verify(this.httpServletResponse).setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+    verify(this.httpServletResponse, never()).setContentType(any());
+    assertEquals(1, this.metrics.incomingRequests.getCount());
+    assertEquals(0, this.metrics.successfulResponses.getCount());
+    assertEquals(1, this.metrics.failedResponses.getCount());
+  }
+
+  private static JobModel jobModel() {
+    Config config = new MapConfig(ImmutableMap.of("samza.user.config", 
"config-value"));
+    Map<String, ContainerModel> containerModelMap = ImmutableMap.of("0", new 
ContainerModel("0",
+        ImmutableMap.of(new TaskName("Partition 0"), new TaskModel(new 
TaskName("Partition 0"),
+            ImmutableSet.of(new SystemStreamPartition("system", "stream", new 
Partition(0))), new Partition(0)))));
+    return new JobModel(config, containerModelMap);
+  }
+}
\ No newline at end of file
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/communication/TestJobModelServingContext.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/communication/TestJobModelServingContext.java
new file mode 100644
index 0000000..16cac04
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/communication/TestJobModelServingContext.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestJobModelServingContext {
+  private JobInfoServingContext jobModelServingContext;
+
+  @Before
+  public void setup() {
+    this.jobModelServingContext = new JobInfoServingContext();
+  }
+
+  @Test
+  public void testSetGet() throws IOException {
+    // return empty if no job model has been set
+    
assertFalse(this.jobModelServingContext.getSerializedJobModel().isPresent());
+
+    Config config = new MapConfig(ImmutableMap.of("samza.user.config", 
"config-value"));
+    Map<String, ContainerModel> containerModelMap = ImmutableMap.of("0", new 
ContainerModel("0",
+        ImmutableMap.of(new TaskName("Partition 0"), new TaskModel(new 
TaskName("Partition 0"),
+            ImmutableSet.of(new SystemStreamPartition("system", "stream", new 
Partition(0))), new Partition(0)))));
+    JobModel jobModel = new JobModel(config, containerModelMap);
+    this.jobModelServingContext.setJobModel(jobModel);
+    Optional<byte[]> serializedJobModel = 
this.jobModelServingContext.getSerializedJobModel();
+    assertTrue(serializedJobModel.isPresent());
+    assertEquals(jobModel, 
SamzaObjectMapper.getObjectMapper().readValue(serializedJobModel.get(), 
JobModel.class));
+
+    config = new MapConfig(ImmutableMap.of("samza.user.config0", 
"config-value0"));
+    containerModelMap = ImmutableMap.of("0", new ContainerModel("0", 
ImmutableMap.of(new TaskName("Partition 0"),
+        new TaskModel(new TaskName("Partition 0"),
+            ImmutableSet.of(new SystemStreamPartition("system0", "stream0", 
new Partition(0))), new Partition(0)))));
+    jobModel = new JobModel(config, containerModelMap);
+    this.jobModelServingContext.setJobModel(jobModel);
+    serializedJobModel = this.jobModelServingContext.getSerializedJobModel();
+    assertTrue(serializedJobModel.isPresent());
+    assertEquals(jobModel, 
SamzaObjectMapper.getObjectMapper().readValue(serializedJobModel.get(), 
JobModel.class));
+  }
+}
\ No newline at end of file
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
new file mode 100644
index 0000000..4cbfe3b
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.staticresource;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.JobCoordinatorListener;
+import org.apache.samza.coordinator.JobModelHelper;
+import org.apache.samza.coordinator.MetadataResourceUtil;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.JobInfoServingContext;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * In general, these tests start the {@link StaticResourceJobCoordinator} in a 
separate thread and then execute certain
+ * actions (trigger callbacks which will change the job model, trigger 
shutdown) to check the coordination flow.
+ */
+public class TestStaticResourceJobCoordinator {
+  private static final String PROCESSOR_ID = "samza-job-coordinator";
+  private static final SystemStream SYSTEM_STREAM = new SystemStream("system", 
"stream");
+  private static final TaskName TASK_NAME = new TaskName("Partition " + 0);
+  private static final Map<String, ContainerModel> CONTAINERS = 
ImmutableMap.of("0", new ContainerModel("0",
+      ImmutableMap.of(TASK_NAME,
+          new TaskModel(TASK_NAME, ImmutableSet.of(new 
SystemStreamPartition(SYSTEM_STREAM, new Partition(0))),
+              new Partition(0)))));
+  private static final Map<TaskName, Set<SystemStreamPartition>> 
SINGLE_SSP_FANOUT =
+      ImmutableMap.of(TASK_NAME, ImmutableSet.of(new 
SystemStreamPartition(SYSTEM_STREAM, new Partition(0))));
+
+  @Mock
+  private JobModelHelper jobModelHelper;
+  @Mock
+  private JobInfoServingContext jobModelServingContext;
+  @Mock
+  private CoordinatorCommunication coordinatorCommunication;
+  @Mock
+  private JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  @Mock
+  private StartpointManager startpointManager;
+  @Mock
+  private ChangelogStreamManager changelogStreamManager;
+  @Mock
+  private Map<TaskName, Integer> changelogPartitionMapping;
+  @Mock
+  private MetricsRegistryMap metrics;
+  @Mock
+  private SystemAdmins systemAdmins;
+  @Mock
+  private Config config;
+  @Mock
+  private JobCoordinatorListener jobCoordinatorListener;
+
+  private StaticResourceJobCoordinator staticResourceJobCoordinator;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    
when(this.changelogStreamManager.readPartitionMapping()).thenReturn(this.changelogPartitionMapping);
+    this.staticResourceJobCoordinator =
+        spy(new StaticResourceJobCoordinator(PROCESSOR_ID, 
this.jobModelHelper, this.jobModelServingContext,
+            this.coordinatorCommunication, this.jobCoordinatorMetadataManager, 
this.startpointManager,
+            this.changelogStreamManager, this.metrics, this.systemAdmins, 
this.config));
+    this.staticResourceJobCoordinator.setListener(this.jobCoordinatorListener);
+  }
+
+  @Test
+  public void testNoExistingJobModel() throws IOException {
+    Config jobModelConfig = mock(Config.class);
+    JobModel jobModel = setupJobModel(jobModelConfig);
+    JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel, 
jobModelConfig,
+        ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false);
+    MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
+
+    this.staticResourceJobCoordinator.start();
+    assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
+    verifyStartLifecycle();
+    verifyPrepareWorkerExecution(jobModel, metadataResourceUtil, newMetadata, 
SINGLE_SSP_FANOUT);
+    verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
+  }
+
+  @Test
+  public void testSameJobModelAsPrevious() throws IOException {
+    Config jobModelConfig = mock(Config.class);
+    JobModel jobModel = setupJobModel(jobModelConfig);
+    setupJobCoordinatorMetadata(jobModel, jobModelConfig, ImmutableSet.of(), 
true);
+    MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
+
+    this.staticResourceJobCoordinator.start();
+    assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
+    verifyStartLifecycle();
+    verifyPrepareWorkerExecution(jobModel, metadataResourceUtil, null, null);
+    verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
+  }
+
+  @Test
+  public void testNewDeploymentNewJobModel() throws IOException {
+    Config jobModelConfig = mock(Config.class);
+    JobModel jobModel = setupJobModel(jobModelConfig);
+    JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel, 
jobModelConfig,
+        ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT, 
JobMetadataChange.JOB_MODEL), true);
+    MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
+
+    this.staticResourceJobCoordinator.start();
+    assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
+    verifyStartLifecycle();
+    verifyPrepareWorkerExecution(jobModel, metadataResourceUtil, newMetadata, 
SINGLE_SSP_FANOUT);
+    verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
+  }
+
+  @Test
+  public void testStop() {
+    this.staticResourceJobCoordinator.stop();
+
+    verify(this.jobCoordinatorListener).onJobModelExpired();
+    verify(this.coordinatorCommunication).stop();
+    verify(this.startpointManager).stop();
+    verify(this.systemAdmins).stop();
+    verify(this.jobCoordinatorListener).onCoordinatorStop();
+  }
+
+  /**
+   * Missing {@link StartpointManager} and {@link JobCoordinatorListener}.
+   */
+  @Test
+  public void testStartMissingOptionalComponents() throws IOException {
+    this.staticResourceJobCoordinator =
+        spy(new StaticResourceJobCoordinator(PROCESSOR_ID, 
this.jobModelHelper, this.jobModelServingContext,
+            this.coordinatorCommunication, this.jobCoordinatorMetadataManager, 
null, this.changelogStreamManager,
+            this.metrics, this.systemAdmins, this.config));
+
+    Config jobModelConfig = mock(Config.class);
+    JobModel jobModel = setupJobModel(jobModelConfig);
+    JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel, 
jobModelConfig,
+        ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false);
+    MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
+
+    this.staticResourceJobCoordinator.start();
+    assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
+    verify(this.systemAdmins).start();
+    verifyPrepareWorkerExecution(jobModel, metadataResourceUtil, newMetadata, 
null);
+  }
+
+  @Test
+  public void testStopMissingOptionalComponents() {
+    this.staticResourceJobCoordinator =
+        spy(new StaticResourceJobCoordinator(PROCESSOR_ID, 
this.jobModelHelper, this.jobModelServingContext,
+            this.coordinatorCommunication, this.jobCoordinatorMetadataManager, 
null, this.changelogStreamManager,
+            this.metrics, this.systemAdmins, this.config));
+
+    this.staticResourceJobCoordinator.stop();
+
+    verify(this.coordinatorCommunication).stop();
+    verify(this.systemAdmins).stop();
+  }
+
+  /**
+   * Set up {@link JobModel} and {@link JobModelHelper} to return job model.
+   */
+  private JobModel setupJobModel(Config config) {
+    JobModel jobModel = mock(JobModel.class);
+    when(jobModel.getContainers()).thenReturn(CONTAINERS);
+    when(jobModel.getConfig()).thenReturn(config);
+    when(this.jobModelHelper.newJobModel(this.config, 
this.changelogPartitionMapping)).thenReturn(jobModel);
+    return jobModel;
+  }
+
+  /**
+   * Set up mocks for {@link JobCoordinatorMetadataManager}.
+   * {@code jobMetadataChanges} defines which changes should be detected by 
the {@link JobCoordinatorMetadataManager}
+   */
+  private JobCoordinatorMetadata setupJobCoordinatorMetadata(JobModel 
jobModel, Config jobModelConfig,
+      Set<JobMetadataChange> jobMetadataChanges, boolean hasPreviousMetadata) {
+    JobCoordinatorMetadata previousMetadata = hasPreviousMetadata ? 
mock(JobCoordinatorMetadata.class) : null;
+    JobCoordinatorMetadata newMetadata = mock(JobCoordinatorMetadata.class);
+    
when(this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel,
 jobModelConfig)).thenReturn(
+        newMetadata);
+    
when(this.jobCoordinatorMetadataManager.readJobCoordinatorMetadata()).thenReturn(previousMetadata);
+    if (hasPreviousMetadata) {
+      
when(this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, 
previousMetadata)).thenReturn(
+          jobMetadataChanges);
+    } else {
+      
when(this.jobCoordinatorMetadataManager.checkForMetadataChanges(eq(newMetadata),
+          
isNull(JobCoordinatorMetadata.class))).thenReturn(jobMetadataChanges);
+    }
+    return newMetadata;
+  }
+
+  private MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
+    MetadataResourceUtil metadataResourceUtil = 
mock(MetadataResourceUtil.class);
+    
doReturn(metadataResourceUtil).when(this.staticResourceJobCoordinator).metadataResourceUtil(jobModel);
+    return metadataResourceUtil;
+  }
+
+  private void verifyStartLifecycle() {
+    verify(this.systemAdmins).start();
+    verify(this.startpointManager).start();
+  }
+
+  /**
+   * Common steps to verify when preparing workers for processing.
+   * @param jobModel job model to be served for workers
+   * @param metadataResourceUtil expected to be used for creating resources
+   * @param newMetadata if not null, expected to be written to {@link 
JobCoordinatorMetadataManager}
+   * @param expectedFanOut if not null, expected to be passed to {@link 
StartpointManager} for fan out
+   */
+  private void verifyPrepareWorkerExecution(JobModel jobModel, 
MetadataResourceUtil metadataResourceUtil,
+      JobCoordinatorMetadata newMetadata, Map<TaskName, 
Set<SystemStreamPartition>> expectedFanOut) throws IOException {
+    InOrder inOrder = inOrder(this.jobCoordinatorMetadataManager, 
this.jobModelServingContext, metadataResourceUtil,
+        this.startpointManager, this.coordinatorCommunication);
+    if (newMetadata != null) {
+      
inOrder.verify(this.jobCoordinatorMetadataManager).writeJobCoordinatorMetadata(newMetadata);
+    } else {
+      verify(this.jobCoordinatorMetadataManager, 
never()).writeJobCoordinatorMetadata(any());
+    }
+    inOrder.verify(this.jobModelServingContext).setJobModel(jobModel);
+    inOrder.verify(metadataResourceUtil).createResources();
+    if (expectedFanOut != null) {
+      inOrder.verify(this.startpointManager).fanOut(expectedFanOut);
+    } else {
+      verify(this.startpointManager, never()).fanOut(any());
+    }
+    inOrder.verify(this.coordinatorCommunication).start();
+  }
+}
\ No newline at end of file

Reply via email to