This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new c570a3d SAMZA-2685: Add job coordinator which does not do resource
management (#1529)
c570a3d is described below
commit c570a3ddfc91becb6269926231f5737038f02fff
Author: Cameron Lee <[email protected]>
AuthorDate: Tue Sep 28 13:06:24 2021 -0700
SAMZA-2685: Add job coordinator which does not do resource management
(#1529)
Adding a job coordinator which does not do resource management. For Samza
on YARN, the ClusterBasedJobCoordinator does resource management. However, for
Samza on Kubernetes, it is not necessary to have a job coordinator which does
resource management, because Kubernetes controllers can take care of resource
management.
API changes (all changes are backwards compatible):
1. Set the config job.coordinator.factory to
org.apache.samza.coordinator.staticresource.StaticResourceJobCoordinatorFactory
in order to use the new coordinator.
2. Set the config job.coordinator.restart.signal.factory to define how to
restart the Samza job when an input stream changes which will change the job
model. This plug-in is dependent on where the Samza job is running (e.g.
Kubernetes). Currently, there is only a no-op implementation of this restart
signal.
---
.../clustermanager/JobCoordinatorLaunchUtil.java | 71 +++++-
.../apache/samza/config/JobCoordinatorConfig.java | 14 +-
.../apache/samza/coordinator/JobCoordinator.java | 16 +-
.../NoProcessorJobCoordinatorListener.java | 62 +++++
.../communication/CoordinatorCommunication.java | 38 +++
.../CoordinatorCommunicationContext.java | 64 +++++
.../CoordinatorToWorkerCommunicationFactory.java | 35 +++
.../HttpCoordinatorCommunication.java | 60 +++++
...ttpCoordinatorToWorkerCommunicationFactory.java | 41 +++
.../coordinator/communication/JobInfoProvider.java | 30 +++
.../communication/JobInfoServingContext.java | 48 ++++
.../communication/JobModelHttpServlet.java | 75 ++++++
.../StaticResourceJobCoordinator.java | 168 +++++++++++++
.../StaticResourceJobCoordinatorFactory.java | 88 +++++++
.../apache/samza/metrics/BaseServerMetrics.java | 34 +++
.../TestJobCoordinatorLaunchUtil.java | 132 ++++++++--
.../samza/config/TestJobCoordinatorConfig.java | 53 ++++
.../TestNoProcessorJobCoordinatorListener.java | 55 +++++
.../TestHttpCoordinatorCommunication.java | 63 +++++
.../communication/TestJobModelHttpServlet.java | 128 ++++++++++
.../communication/TestJobModelServingContext.java | 76 ++++++
.../TestStaticResourceJobCoordinator.java | 275 +++++++++++++++++++++
22 files changed, 1589 insertions(+), 37 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
index 63a1f5c..cb84956 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
@@ -19,6 +19,10 @@
package org.apache.samza.clustermanager;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.samza.SamzaException;
import org.apache.samza.application.SamzaApplication;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
@@ -26,13 +30,23 @@ import
org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.coordinator.NoProcessorJobCoordinatorListener;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.execution.RemoteJobPlanner;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.DiagnosticsUtil;
+import org.apache.samza.util.MetricsReporterLoader;
+import org.apache.samza.util.ReflectionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -40,6 +54,13 @@ import org.apache.samza.util.DiagnosticsUtil;
* This util is being used by both high/low and beam API Samza jobs.
*/
public class JobCoordinatorLaunchUtil {
+ private static final Logger LOG =
LoggerFactory.getLogger(JobCoordinatorLaunchUtil.class);
+ private static final String JOB_COORDINATOR_SOURCE_NAME = "JobCoordinator";
+ /**
+ * There is no processor associated with this job coordinator, so adding a
placeholder value.
+ */
+ private static final String JOB_COORDINATOR_PROCESSOR_ID_PLACEHOLDER =
"samza-job-coordinator";
+
/**
* Run {@link ClusterBasedJobCoordinator} with full job config.
*
@@ -76,11 +97,51 @@ public class JobCoordinatorLaunchUtil {
CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
DiagnosticsUtil.createDiagnosticsStream(finalConfig);
- ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
- metrics,
- metadataStore,
- finalConfig);
- jc.run();
+ Optional<String> jobCoordinatorFactoryClassName =
+ new
JobCoordinatorConfig(config).getOptionalJobCoordinatorFactoryClassName();
+ if (jobCoordinatorFactoryClassName.isPresent()) {
+ runJobCoordinator(jobCoordinatorFactoryClassName.get(), metrics,
metadataStore, finalConfig);
+ } else {
+ ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(metrics,
metadataStore, finalConfig);
+ jc.run();
+ }
+ }
+
+ private static void runJobCoordinator(String jobCoordinatorClassName,
MetricsRegistryMap metrics,
+ MetadataStore metadataStore, Config finalConfig) {
+ JobCoordinatorFactory jobCoordinatorFactory =
+ ReflectionUtil.getObj(jobCoordinatorClassName,
JobCoordinatorFactory.class);
+ JobCoordinator jobCoordinator =
+
jobCoordinatorFactory.getJobCoordinator(JOB_COORDINATOR_PROCESSOR_ID_PLACEHOLDER,
finalConfig, metrics,
+ metadataStore);
+ addShutdownHook(jobCoordinator);
+ Map<String, MetricsReporter> metricsReporters =
+ MetricsReporterLoader.getMetricsReporters(new
MetricsConfig(finalConfig), JOB_COORDINATOR_SOURCE_NAME);
+ metricsReporters.values()
+ .forEach(metricsReporter ->
metricsReporter.register(JOB_COORDINATOR_SOURCE_NAME, metrics));
+ metricsReporters.values().forEach(MetricsReporter::start);
+ CountDownLatch waitForShutdownLatch = new CountDownLatch(1);
+ jobCoordinator.setListener(new
NoProcessorJobCoordinatorListener(waitForShutdownLatch));
+ jobCoordinator.start();
+ try {
+ waitForShutdownLatch.await();
+ } catch (InterruptedException e) {
+ String errorMessage = "Error while waiting for coordinator to complete";
+ LOG.error(errorMessage, e);
+ throw new SamzaException(errorMessage, e);
+ } finally {
+ metricsReporters.values().forEach(MetricsReporter::stop);
+ }
+ }
+
+ /**
+ * This is a separate method so it can be stubbed in tests, since adding a
real shutdown hook will cause the hook to
+ * be added to the test suite JVM.
+ */
+ @VisibleForTesting
+ static void addShutdownHook(JobCoordinator jobCoordinator) {
+ Runtime.getRuntime()
+ .addShutdownHook(new Thread(jobCoordinator::stop, "Samza Job
Coordinator Shutdown Hook Thread"));
}
private JobCoordinatorLaunchUtil() {}
diff --git
a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
index 5f2ca29..c97628c 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
@@ -19,6 +19,7 @@
package org.apache.samza.config;
+import java.util.Optional;
import com.google.common.base.Strings;
import org.apache.samza.SamzaException;
import org.apache.samza.coordinator.CoordinationUtilsFactory;
@@ -70,11 +71,12 @@ public class JobCoordinatorConfig extends MapConfig {
}
public String getJobCoordinatorFactoryClassName() {
- String jobCoordinatorFactoryClassName = get(JOB_COORDINATOR_FACTORY);
- if (Strings.isNullOrEmpty(jobCoordinatorFactoryClassName)) {
- return ZkJobCoordinatorFactory.class.getName();
- } else {
- return jobCoordinatorFactoryClassName;
- }
+ return getOptionalJobCoordinatorFactoryClassName()
+ .filter(className -> !Strings.isNullOrEmpty(className))
+ .orElse(ZkJobCoordinatorFactory.class.getName());
+ }
+
+ public Optional<String> getOptionalJobCoordinatorFactoryClassName() {
+ return Optional.ofNullable(get(JOB_COORDINATOR_FACTORY));
}
}
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
index cd10acb..f35c1fc 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
@@ -22,14 +22,10 @@ import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.job.model.JobModel;
/**
- * A JobCoordinator is a pluggable module in each process that provides the
JobModel and the ID to the StreamProcessor.
+ * A JobCoordinator is a pluggable module in each process that coordinates
with workers to do processing.
*
- * It is the responsibility of the JobCoordinator to assign a unique
identifier to the StreamProcessor
- * based on the underlying environment. In some cases, ID assignment is
completely config driven, while in other
- * cases, ID assignment may require coordination with JobCoordinators of
other StreamProcessors.
- *
- * StreamProcessor registers a {@link JobCoordinatorListener} in order to get
notified about JobModel changes and
- * Coordinator state change.
+ * The process running a {@link JobCoordinator} can register a {@link
JobCoordinatorListener} in order to get notified
+ * about JobModel changes and coordinator state changes.
*
* <pre>
* {@code
@@ -38,9 +34,9 @@ import org.apache.samza.job.model.JobModel;
* * * onNewJobModel ************
*
* * *<<------------------------* Job *
*
* * * onJobModelExpired * Co- *
*
- * * *<<------------------------* ordinator*
*
- * * StreamProcessor * onCoordinatorStop * Listener * JobCoordinator
*
- * * *<<------------------------* *
*
+ * * StreamProcessor *<<------------------------* ordinator*
*
+ * * or * onCoordinatorStop * Listener * JobCoordinator
*
+ * * other JC process*<<------------------------* *
*
* * * onCoordinatorFailure * *
*
* * *<<------------------------************
*
* * * stop() *
*
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/NoProcessorJobCoordinatorListener.java
b/samza-core/src/main/java/org/apache/samza/coordinator/NoProcessorJobCoordinatorListener.java
new file mode 100644
index 0000000..d6da8c6
--- /dev/null
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/NoProcessorJobCoordinatorListener.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.samza.job.model.JobModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link JobCoordinatorListener} for a {@link JobCoordinator} which does not
run alongside a processor.
+ */
+public class NoProcessorJobCoordinatorListener implements
JobCoordinatorListener {
+ private static final Logger LOG =
LoggerFactory.getLogger(NoProcessorJobCoordinatorListener.class);
+ private final CountDownLatch waitForShutdownLatch;
+
+ public NoProcessorJobCoordinatorListener(CountDownLatch
waitForShutdownLatch) {
+ this.waitForShutdownLatch = waitForShutdownLatch;
+ }
+
+ @Override
+ public void onJobModelExpired() {
+ // nothing to notify so far about job model expiration
+ }
+
+ @Override
+ public void onNewJobModel(String processorId, JobModel jobModel) {
+ // nothing to notify so far about new job model
+ }
+
+ @Override
+ public void onCoordinatorStop() {
+ this.waitForShutdownLatch.countDown();
+ }
+
+ /**
+ * There is currently no use case for bubbling up this exception, so just
log the error and allow shutdown for now. If
+ * we get a future use case where it is useful to bubble up the exception,
then we can update this class.
+ */
+ @Override
+ public void onCoordinatorFailure(Throwable t) {
+ LOG.error("Failure in coordinator, allowing shutdown to begin", t);
+ this.waitForShutdownLatch.countDown();
+ }
+}
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunication.java
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunication.java
new file mode 100644
index 0000000..daf0cc7
--- /dev/null
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunication.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+/**
+ * Interface for setting up communication components on the job coordinator
side for coordinator-to-worker
+ * communication. For example, this could be implemented by an HTTP server.
+ *
+ * See {@link CoordinatorCommunicationContext} for the communication paths
that need to be handled by this
+ * {@link CoordinatorCommunication} component.
+ */
+public interface CoordinatorCommunication {
+ /**
+ * Start the communication components.
+ */
+ void start();
+
+ /**
+ * Stop the communication components. This may be called even if {@link
#start()} has not yet been called.
+ */
+ void stop();
+}
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunicationContext.java
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunicationContext.java
new file mode 100644
index 0000000..42fd860
--- /dev/null
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunicationContext.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+
+/**
+ * Contains components which can be used to build a {@link
CoordinatorCommunication}.
+ * For example, provides access to job model and handling for worker states.
+ */
+public class CoordinatorCommunicationContext {
+ private final JobInfoProvider jobInfoProvider;
+ private final Config configForFactory;
+ private final MetricsRegistry metricsRegistry;
+
+ /**
+ * @param jobInfoProvider provides dynamic access to job info to pass along
to workers (e.g. job model, configs)
+ * @param configForFactory config to use to build the {@link
CoordinatorCommunication}
+ */
+ public CoordinatorCommunicationContext(JobInfoProvider jobInfoProvider,
Config configForFactory,
+ MetricsRegistry metricsRegistry) {
+ this.configForFactory = configForFactory;
+ this.jobInfoProvider = jobInfoProvider;
+ this.metricsRegistry = metricsRegistry;
+ }
+
+ /**
+ * Use this to access job model.
+ */
+ public JobInfoProvider getJobInfoProvider() {
+ return jobInfoProvider;
+ }
+
+ /**
+ * Use this to access the {@link Config} for building the {@link
CoordinatorCommunication}.
+ * Do not use this as the {@link Config} to pass along to the workers. Use
{@link #getJobInfoProvider()} for accessing
+ * the job model and config to pass along to workers.
+ */
+ public Config getConfigForFactory() {
+ return configForFactory;
+ }
+
+ public MetricsRegistry getMetricsRegistry() {
+ return metricsRegistry;
+ }
+}
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorToWorkerCommunicationFactory.java
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorToWorkerCommunicationFactory.java
new file mode 100644
index 0000000..a0b8fa1
--- /dev/null
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorToWorkerCommunicationFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+/**
+ * Factory for generating components for coordinator-to-worker communication.
+ *
+ * SAMZA-2673: There should be a "WorkerCommunication" interface which pairs
with the {@link CoordinatorCommunication},
+ * but that abstraction layer is not implemented yet. One communication path
to be handled by WorkerCommunication is the
+ * query to get the job model.
+ */
+public interface CoordinatorToWorkerCommunicationFactory {
+ /**
+ * Build a {@link CoordinatorCommunication} to handle communication with the
workers for the job.
+ */
+ CoordinatorCommunication
coordinatorCommunication(CoordinatorCommunicationContext context);
+
+ // SAMZA-2673: an opportunity for improvement here is to add the
WorkerCommunication layer
+}
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorCommunication.java
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorCommunication.java
new file mode 100644
index 0000000..1b01a68
--- /dev/null
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorCommunication.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Runs an {@link HttpServer} to communicate with workers.
+ */
+public class HttpCoordinatorCommunication implements CoordinatorCommunication {
+ private static final Logger LOG =
LoggerFactory.getLogger(HttpCoordinatorCommunication.class);
+
+ private final HttpServer httpServer;
+
+ private final AtomicBoolean isRunning = new AtomicBoolean(false);
+
+ public HttpCoordinatorCommunication(HttpServer httpServer) {
+ this.httpServer = httpServer;
+ }
+
+ @Override
+ public void start() {
+ if (!this.isRunning.compareAndSet(false, true)) {
+ LOG.warn("Tried to start, but already started");
+ } else {
+ this.httpServer.start();
+ LOG.info("Started http server");
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (!this.isRunning.compareAndSet(true, false)) {
+ LOG.warn("Tried to stop, but not currently running");
+ } else {
+ this.httpServer.stop();
+ LOG.info("Stopped http server");
+ }
+ }
+}
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorToWorkerCommunicationFactory.java
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorToWorkerCommunicationFactory.java
new file mode 100644
index 0000000..d7a2363
--- /dev/null
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorToWorkerCommunicationFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+
+/**
+ * Implements HTTP-based communication between the coordinator and workers.
+ */
+public class HttpCoordinatorToWorkerCommunicationFactory implements
CoordinatorToWorkerCommunicationFactory {
+ @Override
+ public CoordinatorCommunication
coordinatorCommunication(CoordinatorCommunicationContext context) {
+ ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(context.getConfigForFactory());
+ JobModelHttpServlet jobModelHttpServlet = new
JobModelHttpServlet(context.getJobInfoProvider(),
+ new JobModelHttpServlet.Metrics(context.getMetricsRegistry()));
+ HttpServer httpServer = new HttpServer("/",
clusterManagerConfig.getCoordinatorUrlPort(), null,
+ new ServletHolder(DefaultServlet.class));
+ httpServer.addServlet("/", jobModelHttpServlet);
+ return new HttpCoordinatorCommunication(httpServer);
+ }
+}
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/communication/JobInfoProvider.java
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/JobInfoProvider.java
new file mode 100644
index 0000000..32b5a33
--- /dev/null
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/JobInfoProvider.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import java.util.Optional;
+
+
+/**
+ * Used by implementors of {@link CoordinatorCommunication} to access job
information for coordinator-to-worker
+ * communication.
+ */
+public interface JobInfoProvider {
+ Optional<byte[]> getSerializedJobModel();
+}
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/communication/JobInfoServingContext.java
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/JobInfoServingContext.java
new file mode 100644
index 0000000..b1914b5
--- /dev/null
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/JobInfoServingContext.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import java.util.Optional;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+
+
+/**
+ * Provides a way to access and set the job model.
+ * The {@link JobInfoProvider} part of this class is used by a {@link
CoordinatorCommunication} implementation, and the
+ * "set job model" part is called by the job coordinator when there is a new
job model.
+ */
+public class JobInfoServingContext implements JobInfoProvider {
+ private volatile byte[] serializedJobModel = null;
+
+ @Override
+ public Optional<byte[]> getSerializedJobModel() {
+ return Optional.ofNullable(this.serializedJobModel);
+ }
+
+ public void setJobModel(JobModel jobModel) {
+ try {
+ this.serializedJobModel =
SamzaObjectMapper.getObjectMapper().writeValueAsBytes(jobModel);
+ } catch (JsonProcessingException e) {
+ throw new SamzaException("Failed to serialize job model", e);
+ }
+ }
+}
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/communication/JobModelHttpServlet.java
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/JobModelHttpServlet.java
new file mode 100644
index 0000000..2e946c0
--- /dev/null
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/communication/JobModelHttpServlet.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import java.io.IOException;
+import java.util.Optional;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.samza.metrics.BaseServerMetrics;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link HttpServlet} which provides access to job model.
+ */
+public class JobModelHttpServlet extends HttpServlet {
+ private static final Logger LOG =
LoggerFactory.getLogger(JobModelHttpServlet.class);
+ private static final String CONTENT_TYPE = "application/json;charset=UTF-8";
+
+ private final JobInfoProvider jobInfoProvider;
+ private final Metrics metrics;
+
+ public JobModelHttpServlet(JobInfoProvider jobInfoProvider, Metrics metrics)
{
+ this.jobInfoProvider = jobInfoProvider;
+ this.metrics = metrics;
+ }
+
+ protected void doGet(HttpServletRequest request, HttpServletResponse
response) {
+ this.metrics.incomingRequests.inc();
+ Optional<byte[]> serializedJobModel =
this.jobInfoProvider.getSerializedJobModel();
+ if (serializedJobModel.isPresent()) {
+ try {
+ response.getOutputStream().write(serializedJobModel.get());
+ response.setContentType(CONTENT_TYPE);
+ response.setStatus(HttpServletResponse.SC_OK);
+ this.metrics.successfulResponses.inc();
+ } catch (IOException e) {
+ LOG.error("Failed to write response body", e);
+ this.metrics.failedResponses.inc();
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ }
+ } else {
+ LOG.warn("Received request for job model before job model is available");
+ this.metrics.failedResponses.inc();
+ response.setStatus(HttpServletResponse.SC_NOT_FOUND);
+ }
+ }
+
+ public static class Metrics extends BaseServerMetrics {
+ private static final String GROUP =
JobModelHttpServlet.class.getSimpleName();
+
+ public Metrics(MetricsRegistry metricsRegistry) {
+ super(metricsRegistry, GROUP);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
new file mode 100644
index 0000000..b9fe86f
--- /dev/null
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.staticresource;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Set;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorListener;
+import org.apache.samza.coordinator.JobModelHelper;
+import org.apache.samza.coordinator.MetadataResourceUtil;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.JobInfoServingContext;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.SystemAdmins;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This performs some of the same functions as the older ClusterBasedJobCoordinator, but one notable difference is
+ * that this coordinator does no management of execution resources. It relies on an external component to manage
+ * those resources for a Samza job.
+ */
+public class StaticResourceJobCoordinator implements JobCoordinator {
+ private static final Logger LOG =
LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+ private final JobModelHelper jobModelHelper;
+ // receives the job model in prepareWorkerExecution
+ private final JobInfoServingContext jobModelServingContext;
+ private final CoordinatorCommunication coordinatorCommunication;
+ private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+ // empty when the constructor was given a null StartpointManager
+ private final Optional<StartpointManager> startpointManager;
+ private final ChangelogStreamManager changelogStreamManager;
+ private final MetricsRegistry metrics;
+ private final SystemAdmins systemAdmins;
+ private final String processorId;
+ private final Config config;
+
+ // empty until start() has finished its set up steps and started the coordinator communication
+ private Optional<JobModel> currentJobModel = Optional.empty();
+ // empty until setListener is called with a non-null listener
+ private Optional<JobCoordinatorListener> jobCoordinatorListener =
Optional.empty();
+
+ /**
+ * Package-private constructor which only stores the given components; nothing is started here.
+ *
+ * @param startpointManager may be null, in which case the startpoint steps in start/stop are skipped
+ */
+ StaticResourceJobCoordinator(String processorId, JobModelHelper
jobModelHelper, JobInfoServingContext jobModelServingContext,
+ CoordinatorCommunication coordinatorCommunication,
JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+ StartpointManager startpointManager, ChangelogStreamManager
changelogStreamManager, MetricsRegistry metrics,
+ SystemAdmins systemAdmins, Config config) {
+ this.jobModelHelper = jobModelHelper;
+ this.jobModelServingContext = jobModelServingContext;
+ this.coordinatorCommunication = coordinatorCommunication;
+ this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+ this.startpointManager = Optional.ofNullable(startpointManager);
+ this.changelogStreamManager = changelogStreamManager;
+ this.metrics = metrics;
+ this.systemAdmins = systemAdmins;
+ this.processorId = processorId;
+ this.config = config;
+ }
+
+ /**
+ * Starts the system admins and the startpoint manager (if present), and then, in order:
+ * 1. Builds a new job model
+ * 2. Generates job coordinator metadata for it and compares that against the previously stored metadata
+ * 3. Runs the set up steps of prepareWorkerExecution
+ * 4. Starts the coordinator communication
+ * 5. Records the job model as the current one and passes it to the listener (if one is set)
+ * Any exception thrown by steps 1-5 is logged and rethrown wrapped in a SamzaException.
+ */
+ @Override
+ public void start() {
+ LOG.info("Starting job coordinator");
+ this.systemAdmins.start();
+ this.startpointManager.ifPresent(StartpointManager::start);
+ try {
+ JobModel jobModel = newJobModel();
+ JobCoordinatorMetadata newMetadata =
+
this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel,
jobModel.getConfig());
+ Set<JobMetadataChange> jobMetadataChanges =
checkForMetadataChanges(newMetadata);
+ prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
+ this.coordinatorCommunication.start();
+ this.currentJobModel = Optional.of(jobModel);
+ this.jobCoordinatorListener.ifPresent(listener ->
listener.onNewJobModel(this.processorId, jobModel));
+ } catch (Exception e) {
+ LOG.error("Error while running job coordinator; exiting", e);
+ throw new SamzaException("Error while running job coordinator", e);
+ }
+ }
+
+ /**
+ * Tells the listener (if one is set) that the job model has expired, and then stops the coordinator communication,
+ * the startpoint manager (if present), and the system admins, in that order.
+ * The listener's onCoordinatorStop is called from a finally block, so it is called even if one of the earlier steps
+ * throws.
+ */
+ @Override
+ public void stop() {
+ try {
+
this.jobCoordinatorListener.ifPresent(JobCoordinatorListener::onJobModelExpired);
+ this.coordinatorCommunication.stop();
+ this.startpointManager.ifPresent(StartpointManager::stop);
+ this.systemAdmins.stop();
+ } finally {
+
this.jobCoordinatorListener.ifPresent(JobCoordinatorListener::onCoordinatorStop);
+ }
+ }
+
+ @Override
+ public String getProcessorId() {
+ return this.processorId;
+ }
+
+ /**
+ * Replaces the current listener. Passing null clears the listener.
+ */
+ @Override
+ public void setListener(JobCoordinatorListener jobCoordinatorListener) {
+ this.jobCoordinatorListener = Optional.ofNullable(jobCoordinatorListener);
+ }
+
+ /**
+ * Returns the job model recorded by start(), or null if no job model has been recorded yet.
+ */
+ @Override
+ public JobModel getJobModel() {
+ return this.currentJobModel.orElse(null);
+ }
+
+ /**
+ * Builds a job model from the config and the partition mapping read from the changelog stream manager.
+ */
+ private JobModel newJobModel() {
+ return this.jobModelHelper.newJobModel(this.config,
this.changelogStreamManager.readPartitionMapping());
+ }
+
+ /**
+ * Run set up steps so that workers can begin processing:
+ * 1. Persist job coordinator metadata
+ * 2. Publish new job model on coordinator-to-worker communication channel
+ * 3. Create metadata resources
+ * 4. Handle startpoints
+ * Steps 1 and 4 only happen when jobMetadataChanges is not empty; step 4 additionally requires a startpoint manager.
+ */
+ private void prepareWorkerExecution(JobModel jobModel,
JobCoordinatorMetadata newMetadata,
+ Set<JobMetadataChange> jobMetadataChanges) throws IOException {
+ if (!jobMetadataChanges.isEmpty()) {
+
this.jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(newMetadata);
+ }
+ this.jobModelServingContext.setJobModel(jobModel);
+
+ metadataResourceUtil(jobModel).createResources();
+
+ // the fan out trigger logic comes from ClusterBasedJobCoordinator, in
which a new job model can trigger a fan out
+ if (this.startpointManager.isPresent() && !jobMetadataChanges.isEmpty()) {
+
this.startpointManager.get().fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+ }
+ }
+
+ /**
+ * Creates the MetadataResourceUtil for the given job model.
+ * NOTE(review): presumably package-private so that tests can substitute the returned object -- confirm against the
+ * unit test for this class.
+ */
+ @VisibleForTesting
+ MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
+ return new MetadataResourceUtil(jobModel, this.metrics, this.config);
+ }
+
+ /**
+ * Reads the previously stored job coordinator metadata and returns the changes between it and newMetadata, as
+ * determined by the JobCoordinatorMetadataManager.
+ */
+ private Set<JobMetadataChange>
checkForMetadataChanges(JobCoordinatorMetadata newMetadata) {
+ JobCoordinatorMetadata previousMetadata =
this.jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
+ return
this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata,
previousMetadata);
+ }
+}
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinatorFactory.java
b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinatorFactory.java
new file mode 100644
index 0000000..23ee54c
--- /dev/null
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinatorFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.staticresource;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.coordinator.JobModelCalculator;
+import org.apache.samza.coordinator.JobModelHelper;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import
org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import
org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobInfoServingContext;
+import
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import
org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+
+
+/**
+ * Wires up the components needed by a {@link StaticResourceJobCoordinator}. All coordinator stream backed components
+ * share the given {@link MetadataStore}, each through its own namespace.
+ */
+public class StaticResourceJobCoordinatorFactory implements JobCoordinatorFactory {
+  @Override
+  public JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry,
+      MetadataStore metadataStore) {
+    JobInfoServingContext servingContext = new JobInfoServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext communicationContext =
+        new CoordinatorCommunicationContext(servingContext, config, metricsRegistry);
+    CoordinatorCommunication communication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(communicationContext);
+
+    JobCoordinatorMetadataManager metadataManager =
+        new JobCoordinatorMetadataManager(namespacedStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+            JobCoordinatorMetadataManager.ClusterType.NON_YARN, metricsRegistry);
+    ChangelogStreamManager changelogManager =
+        new ChangelogStreamManager(namespacedStore(metadataStore, SetChangelogMapping.TYPE));
+
+    // the coordinator is given a null startpoint manager when startpoints are disabled
+    StartpointManager startpointManager = null;
+    if (jobConfig.getStartpointEnabled()) {
+      startpointManager = new StartpointManager(metadataStore);
+    }
+
+    SystemAdmins admins = new SystemAdmins(config, StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache metadataCache = new StreamMetadataCache(admins, 0, SystemClock.instance());
+    JobModelHelper helper = buildJobModelHelper(metadataStore, metadataCache);
+
+    return new StaticResourceJobCoordinator(processorId, helper, servingContext, communication, metadataManager,
+        startpointManager, changelogManager, metricsRegistry, admins, config);
+  }
+
+  /**
+   * Builds the helper used for job model generation, backed by locality, task assignment, and task partition
+   * assignment managers which each read from their own namespace of the metadata store.
+   */
+  private static JobModelHelper buildJobModelHelper(MetadataStore metadataStore,
+      StreamMetadataCache streamMetadataCache) {
+    LocalityManager locality = new LocalityManager(namespacedStore(metadataStore, SetContainerHostMapping.TYPE));
+    TaskAssignmentManager taskAssignment =
+        new TaskAssignmentManager(namespacedStore(metadataStore, SetTaskContainerMapping.TYPE),
+            namespacedStore(metadataStore, SetTaskModeMapping.TYPE));
+    TaskPartitionAssignmentManager taskPartitionAssignment =
+        new TaskPartitionAssignmentManager(namespacedStore(metadataStore, SetTaskPartitionMapping.TYPE));
+    return new JobModelHelper(locality, taskAssignment, taskPartitionAssignment, streamMetadataCache,
+        JobModelCalculator.INSTANCE);
+  }
+
+  /**
+   * Wraps the metadata store so that it is scoped to the given namespace.
+   */
+  private static NamespaceAwareCoordinatorStreamStore namespacedStore(MetadataStore metadataStore,
+      String namespace) {
+    return new NamespaceAwareCoordinatorStreamStore(metadataStore, namespace);
+  }
+}
diff --git
a/samza-core/src/main/java/org/apache/samza/metrics/BaseServerMetrics.java
b/samza-core/src/main/java/org/apache/samza/metrics/BaseServerMetrics.java
new file mode 100644
index 0000000..b6b822c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/metrics/BaseServerMetrics.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.metrics;
+
+/**
+ * Provides basic metrics for components that are serving data.
+ */
+public class BaseServerMetrics {
+  private static final String INCOMING_REQUESTS = "incoming-requests";
+  private static final String SUCCESSFUL_RESPONSES = "successful-responses";
+  private static final String FAILED_RESPONSES = "failed-responses";
+
+  public final Counter incomingRequests;
+  public final Counter successfulResponses;
+  public final Counter failedResponses;
+
+  /**
+   * Registers the three counters with the given registry.
+   *
+   * @param metricsRegistry registry in which the counters are created
+   * @param group metrics group under which the counters are created
+   */
+  public BaseServerMetrics(MetricsRegistry metricsRegistry, String group) {
+    this.incomingRequests = metricsRegistry.newCounter(group, INCOMING_REQUESTS);
+    this.successfulResponses = metricsRegistry.newCounter(group, SUCCESSFUL_RESPONSES);
+    this.failedResponses = metricsRegistry.newCounter(group, FAILED_RESPONSES);
+  }
+}
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
index af27423..9a95d33 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
@@ -18,28 +18,42 @@
*/
package org.apache.samza.clustermanager;
+import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import com.google.common.collect.ImmutableMap;
import org.apache.samza.application.MockStreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.coordinator.NoProcessorJobCoordinatorListener;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.execution.RemoteJobPlanner;
import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.util.ConfigUtil;
+import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.util.CoordinatorStreamUtil;
+import org.apache.samza.util.MetricsReporterLoader;
+import org.apache.samza.util.ReflectionUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -49,20 +63,19 @@ import static
org.powermock.api.mockito.PowerMockito.verifyStatic;
@RunWith(PowerMockRunner.class)
-@PrepareForTest({
- CoordinatorStreamUtil.class,
+@PrepareForTest({CoordinatorStreamUtil.class,
JobCoordinatorLaunchUtil.class,
CoordinatorStreamStore.class,
- RemoteJobPlanner.class})
+ RemoteJobPlanner.class,
+ ReflectionUtil.class,
+ MetricsReporterLoader.class,
+ NoProcessorJobCoordinatorListener.class})
public class TestJobCoordinatorLaunchUtil {
@Test
- public void testCreateFromConfigLoader() throws Exception {
- Map<String, String> config = new HashMap<>();
- config.put(JobConfig.CONFIG_LOADER_FACTORY,
PropertiesConfigLoaderFactory.class.getName());
- config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX +
"path",
- getClass().getResource("/test.properties").getPath());
- Config originalConfig = new JobConfig(ConfigUtil.loadConfig(new
MapConfig(config)));
- Config fullConfig = new MapConfig(originalConfig,
Collections.singletonMap("isAfterPlanning", "true"));
+ public void testRunClusterBasedJobCoordinator() throws Exception {
+ Config originalConfig = buildOriginalConfig(ImmutableMap.of());
+ JobConfig fullConfig =
+ new JobConfig(new MapConfig(originalConfig,
Collections.singletonMap("isAfterPlanning", "true")));
Config autoSizingConfig = new
MapConfig(Collections.singletonMap(JobConfig.JOB_AUTOSIZING_CONTAINER_COUNT,
"10"));
Config finalConfig = new MapConfig(autoSizingConfig, fullConfig);
@@ -77,15 +90,102 @@ public class TestJobCoordinatorLaunchUtil {
PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mockCoordinatorStreamStore);
PowerMockito.whenNew(RemoteJobPlanner.class).withAnyArguments().thenReturn(mockJobPlanner);
PowerMockito.whenNew(ClusterBasedJobCoordinator.class).withAnyArguments().thenReturn(mockJC);
-
when(mockJobPlanner.prepareJobs()).thenReturn(Collections.singletonList(new
JobConfig(fullConfig)));
+
when(mockJobPlanner.prepareJobs()).thenReturn(Collections.singletonList(fullConfig));
JobCoordinatorLaunchUtil.run(new MockStreamApplication(), originalConfig);
verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class),
eq(mockCoordinatorStreamStore), eq(finalConfig));
verify(mockJC, times(1)).run();
verifyStatic(times(1));
- CoordinatorStreamUtil.createCoordinatorStream(any());
+ CoordinatorStreamUtil.createCoordinatorStream(fullConfig);
verifyStatic(times(1));
- CoordinatorStreamUtil.writeConfigToCoordinatorStream(any(), anyBoolean());
+ CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
+ }
+
+ @Test
+ public void testRunJobCoordinator() throws Exception {
+ String jobCoordinatorFactoryClass =
"org.apache.samza.custom.MyJobCoordinatorFactory";
+ Config originalConfig =
+
buildOriginalConfig(ImmutableMap.of(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
jobCoordinatorFactoryClass));
+ JobConfig fullConfig =
+ new JobConfig(new MapConfig(originalConfig,
Collections.singletonMap("isAfterPlanning", "true")));
+ Config autoSizingConfig = new
MapConfig(Collections.singletonMap(JobConfig.JOB_AUTOSIZING_CONTAINER_COUNT,
"10"));
+ Config finalConfig = new MapConfig(autoSizingConfig, fullConfig);
+
+ RemoteJobPlanner remoteJobPlanner = mock(RemoteJobPlanner.class);
+ CoordinatorStreamStore coordinatorStreamStore =
mock(CoordinatorStreamStore.class);
+ JobCoordinatorFactory jobCoordinatorFactory =
mock(JobCoordinatorFactory.class);
+ JobCoordinator jobCoordinator = mock(JobCoordinator.class);
+ // use a latch to keep track of when start has been called
+ CountDownLatch jobCoordinatorStartedLatch = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ jobCoordinatorStartedLatch.countDown();
+ return null;
+ }).when(jobCoordinator).start();
+
+ PowerMockito.mockStatic(CoordinatorStreamUtil.class);
+ PowerMockito.doNothing().when(CoordinatorStreamUtil.class,
"createCoordinatorStream", any());
+ PowerMockito.doReturn(new MapConfig()).when(CoordinatorStreamUtil.class,
"buildCoordinatorStreamConfig", any());
+ PowerMockito.doReturn(autoSizingConfig)
+ .when(CoordinatorStreamUtil.class,
"readLaunchConfigFromCoordinatorStream", any(), any());
+
PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(coordinatorStreamStore);
+
PowerMockito.whenNew(RemoteJobPlanner.class).withAnyArguments().thenReturn(remoteJobPlanner);
+
when(remoteJobPlanner.prepareJobs()).thenReturn(Collections.singletonList(fullConfig));
+ PowerMockito.mockStatic(ReflectionUtil.class);
+ PowerMockito.doReturn(jobCoordinatorFactory)
+ .when(ReflectionUtil.class, "getObj", jobCoordinatorFactoryClass,
JobCoordinatorFactory.class);
+ when(jobCoordinatorFactory.getJobCoordinator(eq("samza-job-coordinator"),
eq(finalConfig), any(),
+ eq(coordinatorStreamStore))).thenReturn(jobCoordinator);
+ PowerMockito.spy(JobCoordinatorLaunchUtil.class);
+ PowerMockito.doNothing().when(JobCoordinatorLaunchUtil.class,
"addShutdownHook", any());
+ MetricsReporter metricsReporter = mock(MetricsReporter.class);
+ Map<String, MetricsReporter> metricsReporterMap =
ImmutableMap.of("reporter", metricsReporter);
+ PowerMockito.mockStatic(MetricsReporterLoader.class);
+ PowerMockito.doReturn(metricsReporterMap)
+ .when(MetricsReporterLoader.class, "getMetricsReporters", new
MetricsConfig(finalConfig), "JobCoordinator");
+ NoProcessorJobCoordinatorListener jobCoordinatorListener =
mock(NoProcessorJobCoordinatorListener.class);
+
PowerMockito.whenNew(NoProcessorJobCoordinatorListener.class).withAnyArguments().thenReturn(jobCoordinatorListener);
+
+ Thread runThread = new Thread(() -> JobCoordinatorLaunchUtil.run(new
MockStreamApplication(), originalConfig));
+ runThread.start();
+ // wait for job coordinator to be started before doing verifications
+ jobCoordinatorStartedLatch.await();
+
+ verifyStatic();
+ CoordinatorStreamUtil.createCoordinatorStream(fullConfig);
+ verifyStatic();
+ CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
+ verifyStatic();
+ JobCoordinatorLaunchUtil.addShutdownHook(jobCoordinator);
+ InOrder inOrder = Mockito.inOrder(metricsReporter, jobCoordinator);
+ inOrder.verify(metricsReporter).register(eq("JobCoordinator"), any());
+ inOrder.verify(metricsReporter).start();
+ ArgumentCaptor<CountDownLatch> countDownLatchArgumentCaptor =
ArgumentCaptor.forClass(CountDownLatch.class);
+
verifyNew(NoProcessorJobCoordinatorListener.class).withArguments(countDownLatchArgumentCaptor.capture());
+ inOrder.verify(jobCoordinator).setListener(jobCoordinatorListener);
+ inOrder.verify(jobCoordinator).start();
+
+ // wait some time and then make sure the run thread is still alive
+ Thread.sleep(Duration.ofMillis(500).toMillis());
+ assertTrue(runThread.isAlive());
+
+ // trigger the count down latch so that the run thread can exit
+ countDownLatchArgumentCaptor.getValue().countDown();
+ runThread.join(Duration.ofSeconds(10).toMillis());
+ assertFalse(runThread.isAlive());
+
+ verify(metricsReporter).stop();
+ }
+
+ private static Config buildOriginalConfig(Map<String, String>
additionalConfig) {
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("job.factory.class", "org.apache.samza.job.MockJobFactory");
+ configMap.put("job.name", "test-job");
+ configMap.put("foo", "bar");
+ configMap.put("systems.coordinator.samza.factory",
+
"org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory");
+ configMap.put("job.coordinator.system", "coordinator");
+ configMap.putAll(additionalConfig);
+ return new MapConfig(configMap);
}
}
diff --git
a/samza-core/src/test/java/org/apache/samza/config/TestJobCoordinatorConfig.java
b/samza-core/src/test/java/org/apache/samza/config/TestJobCoordinatorConfig.java
new file mode 100644
index 0000000..ac7425f
--- /dev/null
+++
b/samza-core/src/test/java/org/apache/samza/config/TestJobCoordinatorConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.zk.ZkJobCoordinatorFactory;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestJobCoordinatorConfig {
+  private static final String CUSTOM_FACTORY = "org.custom.MyJobCoordinatorFactory";
+
+  @Test
+  public void getJobCoordinatorFactoryClassName() {
+    String zkFactory = ZkJobCoordinatorFactory.class.getName();
+
+    // config is not set
+    assertEquals(zkFactory, new JobCoordinatorConfig(new MapConfig()).getJobCoordinatorFactoryClassName());
+    // config is set to an empty string
+    assertEquals(zkFactory, withFactory("").getJobCoordinatorFactoryClassName());
+    // config is set to a custom factory
+    assertEquals(CUSTOM_FACTORY, withFactory(CUSTOM_FACTORY).getJobCoordinatorFactoryClassName());
+  }
+
+  @Test
+  public void getOptionalJobCoordinatorFactoryClassName() {
+    JobCoordinatorConfig unset = new JobCoordinatorConfig(new MapConfig());
+    assertFalse(unset.getOptionalJobCoordinatorFactoryClassName().isPresent());
+
+    assertTrue(withFactory(CUSTOM_FACTORY).getOptionalJobCoordinatorFactoryClassName().isPresent());
+  }
+
+  /**
+   * Builds a config which only has the job coordinator factory set to the given value.
+   */
+  private static JobCoordinatorConfig withFactory(String factoryClassName) {
+    return new JobCoordinatorConfig(
+        new MapConfig(ImmutableMap.of(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, factoryClassName)));
+  }
+}
\ No newline at end of file
diff --git
a/samza-core/src/test/java/org/apache/samza/coordinator/TestNoProcessorJobCoordinatorListener.java
b/samza-core/src/test/java/org/apache/samza/coordinator/TestNoProcessorJobCoordinatorListener.java
new file mode 100644
index 0000000..2a4e0f5
--- /dev/null
+++
b/samza-core/src/test/java/org/apache/samza/coordinator/TestNoProcessorJobCoordinatorListener.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.concurrent.CountDownLatch;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.mockito.Mockito.verify;
+
+
+public class TestNoProcessorJobCoordinatorListener {
+  @Mock
+  private CountDownLatch shutdownLatch;
+
+  private NoProcessorJobCoordinatorListener listener;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    listener = new NoProcessorJobCoordinatorListener(shutdownLatch);
+  }
+
+  /**
+   * A coordinator stop should count down the latch.
+   */
+  @Test
+  public void testOnCoordinatorStop() {
+    listener.onCoordinatorStop();
+
+    verify(shutdownLatch).countDown();
+  }
+
+  /**
+   * A coordinator failure should count down the latch.
+   */
+  @Test
+  public void testOnCoordinatorFailure() {
+    RuntimeException failure = new RuntimeException();
+    listener.onCoordinatorFailure(failure);
+
+    verify(shutdownLatch).countDown();
+  }
+}
\ No newline at end of file
diff --git
a/samza-core/src/test/java/org/apache/samza/coordinator/communication/TestHttpCoordinatorCommunication.java
b/samza-core/src/test/java/org/apache/samza/coordinator/communication/TestHttpCoordinatorCommunication.java
new file mode 100644
index 0000000..d53a39c
--- /dev/null
+++
b/samza-core/src/test/java/org/apache/samza/coordinator/communication/TestHttpCoordinatorCommunication.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.coordinator.server.HttpServer;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+
+public class TestHttpCoordinatorCommunication {
+  @Mock
+  private HttpServer server;
+
+  private HttpCoordinatorCommunication communication;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    communication = new HttpCoordinatorCommunication(server);
+  }
+
+  @Test
+  public void testStartStop() {
+    communication.start();
+    verify(server).start();
+    // consecutive starts should still only result in one start
+    communication.start();
+    verify(server).start();
+
+    communication.stop();
+    verify(server).stop();
+    // consecutive stops should still only result in one stop
+    communication.stop();
+    verify(server).stop();
+  }
+
+  /**
+   * Stopping without having started should not stop the server.
+   */
+  @Test
+  public void testStopOnly() {
+    communication.stop();
+    verify(server, never()).stop();
+  }
+}
\ No newline at end of file
diff --git
a/samza-core/src/test/java/org/apache/samza/coordinator/communication/TestJobModelHttpServlet.java
b/samza-core/src/test/java/org/apache/samza/coordinator/communication/TestJobModelHttpServlet.java
new file mode 100644
index 0000000..8c80d43
--- /dev/null
+++
b/samza-core/src/test/java/org/apache/samza/coordinator/communication/TestJobModelHttpServlet.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.AdditionalMatchers.aryEq;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for {@link JobModelHttpServlet#doGet}. Each test checks the HTTP
+ * status, the content type / body handling, and the request and response
+ * counters in {@link JobModelHttpServlet.Metrics}.
+ */
+public class TestJobModelHttpServlet {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ @Mock
+ private JobInfoProvider jobInfoProvider;
+ @Mock
+ private HttpServletResponse httpServletResponse;
+ @Mock
+ private ServletOutputStream servletOutputStream;
+
+ private JobModelHttpServlet.Metrics metrics;
+ private JobModelHttpServlet jobModelHttpServlet;
+
+ /**
+ * Initializes the mocks and builds the servlet with a real metrics object
+ * (backed by a new {@link MetricsRegistryMap}) so counters can be asserted.
+ */
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ this.metrics = new JobModelHttpServlet.Metrics(new MetricsRegistryMap());
+ this.jobModelHttpServlet = new JobModelHttpServlet(this.jobInfoProvider,
this.metrics);
+ }
+
+ /**
+ * A serialized job model is available: expects the JSON content type,
+ * {@code SC_OK}, the serialized bytes written to the output stream, and one
+ * incoming request counted as successful. {@code aryEq} is used so the
+ * written bytes are compared by content.
+ */
+ @Test
+ public void testDoGet() throws IOException {
+ JobModel jobModel = jobModel();
+ when(this.jobInfoProvider.getSerializedJobModel()).thenReturn(
+ Optional.of(OBJECT_MAPPER.writeValueAsBytes(jobModel)));
+
when(this.httpServletResponse.getOutputStream()).thenReturn(this.servletOutputStream);
+
+ this.jobModelHttpServlet.doGet(mock(HttpServletRequest.class),
this.httpServletResponse);
+
+
verify(this.httpServletResponse).setContentType("application/json;charset=UTF-8");
+ verify(this.httpServletResponse).setStatus(HttpServletResponse.SC_OK);
+
verify(this.servletOutputStream).write(aryEq(OBJECT_MAPPER.writeValueAsBytes(jobModel)));
+ assertEquals(1, this.metrics.incomingRequests.getCount());
+ assertEquals(1, this.metrics.successfulResponses.getCount());
+ assertEquals(0, this.metrics.failedResponses.getCount());
+ }
+
+ /**
+ * The provider returns {@code Optional.empty()}: expects {@code SC_NOT_FOUND},
+ * no content type set, the output stream never requested, and the request
+ * counted as a failed response.
+ */
+ @Test
+ public void testDoGetMissingJobModel() throws IOException {
+
when(this.jobInfoProvider.getSerializedJobModel()).thenReturn(Optional.empty());
+
+ this.jobModelHttpServlet.doGet(mock(HttpServletRequest.class),
this.httpServletResponse);
+
+
verify(this.httpServletResponse).setStatus(HttpServletResponse.SC_NOT_FOUND);
+ verify(this.httpServletResponse, never()).setContentType(any());
+ verify(this.httpServletResponse, never()).getOutputStream();
+ assertEquals(1, this.metrics.incomingRequests.getCount());
+ assertEquals(0, this.metrics.successfulResponses.getCount());
+ assertEquals(1, this.metrics.failedResponses.getCount());
+ }
+
+ /**
+ * Writing to the output stream throws an {@link IOException}: expects
+ * {@code SC_INTERNAL_SERVER_ERROR}, no content type set, and the request
+ * counted as a failed response.
+ */
+ @Test
+ public void testDoGetFailureToWriteResponse() throws IOException {
+ when(this.jobInfoProvider.getSerializedJobModel()).thenReturn(
+ Optional.of(OBJECT_MAPPER.writeValueAsBytes(jobModel())));
+
when(this.httpServletResponse.getOutputStream()).thenReturn(this.servletOutputStream);
+ doThrow(new IOException("failure to write to output
stream")).when(this.servletOutputStream).write(any());
+
+ this.jobModelHttpServlet.doGet(mock(HttpServletRequest.class),
this.httpServletResponse);
+
+
verify(this.httpServletResponse).setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ verify(this.httpServletResponse, never()).setContentType(any());
+ assertEquals(1, this.metrics.incomingRequests.getCount());
+ assertEquals(0, this.metrics.successfulResponses.getCount());
+ assertEquals(1, this.metrics.failedResponses.getCount());
+ }
+
+ /**
+ * Builds a small {@link JobModel} fixture: one config entry and a single
+ * container "0" with one task ("Partition 0") that has one
+ * {@link SystemStreamPartition} (system/stream, partition 0).
+ *
+ * @return a new job model instance on every call
+ */
+ private static JobModel jobModel() {
+ Config config = new MapConfig(ImmutableMap.of("samza.user.config",
"config-value"));
+ Map<String, ContainerModel> containerModelMap = ImmutableMap.of("0", new
ContainerModel("0",
+ ImmutableMap.of(new TaskName("Partition 0"), new TaskModel(new
TaskName("Partition 0"),
+ ImmutableSet.of(new SystemStreamPartition("system", "stream", new
Partition(0))), new Partition(0)))));
+ return new JobModel(config, containerModelMap);
+ }
+}
\ No newline at end of file
diff --git
a/samza-core/src/test/java/org/apache/samza/coordinator/communication/TestJobModelServingContext.java
b/samza-core/src/test/java/org/apache/samza/coordinator/communication/TestJobModelServingContext.java
new file mode 100644
index 0000000..16cac04
--- /dev/null
+++
b/samza-core/src/test/java/org/apache/samza/coordinator/communication/TestJobModelServingContext.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Unit tests for {@link JobInfoServingContext} (note that the class under
+ * test is named differently from this test class).
+ */
+public class TestJobModelServingContext {
+ private JobInfoServingContext jobModelServingContext;
+
+ /**
+ * Creates a fresh {@link JobInfoServingContext} with no job model set.
+ */
+ @Before
+ public void setup() {
+ this.jobModelServingContext = new JobInfoServingContext();
+ }
+
+ /**
+ * Checks that nothing is served before a job model is set, and that after
+ * each {@code setJobModel} call the served bytes deserialize (through
+ * {@link SamzaObjectMapper}) to the job model that was most recently set.
+ */
+ @Test
+ public void testSetGet() throws IOException {
+ // return empty if no job model has been set
+
assertFalse(this.jobModelServingContext.getSerializedJobModel().isPresent());
+
+ // first job model: served bytes should round-trip to the same job model
+ Config config = new MapConfig(ImmutableMap.of("samza.user.config",
"config-value"));
+ Map<String, ContainerModel> containerModelMap = ImmutableMap.of("0", new
ContainerModel("0",
+ ImmutableMap.of(new TaskName("Partition 0"), new TaskModel(new
TaskName("Partition 0"),
+ ImmutableSet.of(new SystemStreamPartition("system", "stream", new
Partition(0))), new Partition(0)))));
+ JobModel jobModel = new JobModel(config, containerModelMap);
+ this.jobModelServingContext.setJobModel(jobModel);
+ Optional<byte[]> serializedJobModel =
this.jobModelServingContext.getSerializedJobModel();
+ assertTrue(serializedJobModel.isPresent());
+ assertEquals(jobModel,
SamzaObjectMapper.getObjectMapper().readValue(serializedJobModel.get(),
JobModel.class));
+
+ // second job model with different config and stream: should replace the
+ // previously served one
+ config = new MapConfig(ImmutableMap.of("samza.user.config0",
"config-value0"));
+ containerModelMap = ImmutableMap.of("0", new ContainerModel("0",
ImmutableMap.of(new TaskName("Partition 0"),
+ new TaskModel(new TaskName("Partition 0"),
+ ImmutableSet.of(new SystemStreamPartition("system0", "stream0",
new Partition(0))), new Partition(0)))));
+ jobModel = new JobModel(config, containerModelMap);
+ this.jobModelServingContext.setJobModel(jobModel);
+ serializedJobModel = this.jobModelServingContext.getSerializedJobModel();
+ assertTrue(serializedJobModel.isPresent());
+ assertEquals(jobModel,
SamzaObjectMapper.getObjectMapper().readValue(serializedJobModel.get(),
JobModel.class));
+ }
+}
\ No newline at end of file
diff --git
a/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
b/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
new file mode 100644
index 0000000..4cbfe3b
--- /dev/null
+++
b/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.staticresource;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.JobCoordinatorListener;
+import org.apache.samza.coordinator.JobModelHelper;
+import org.apache.samza.coordinator.MetadataResourceUtil;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.JobInfoServingContext;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for {@link StaticResourceJobCoordinator}. Each test calls
+ * {@code start()} or {@code stop()} directly on a spied coordinator that is
+ * built from mocked dependencies, and then verifies the interactions with
+ * those mocks to check the coordination flow.
+ */
+public class TestStaticResourceJobCoordinator {
+ private static final String PROCESSOR_ID = "samza-job-coordinator";
+ private static final SystemStream SYSTEM_STREAM = new SystemStream("system",
"stream");
+ private static final TaskName TASK_NAME = new TaskName("Partition " + 0);
+ // container map returned by the mocked job model (see setupJobModel)
+ private static final Map<String, ContainerModel> CONTAINERS =
ImmutableMap.of("0", new ContainerModel("0",
+ ImmutableMap.of(TASK_NAME,
+ new TaskModel(TASK_NAME, ImmutableSet.of(new
SystemStreamPartition(SYSTEM_STREAM, new Partition(0))),
+ new Partition(0)))));
+ // fan-out argument expected by tests which verify StartpointManager.fanOut
+ private static final Map<TaskName, Set<SystemStreamPartition>>
SINGLE_SSP_FANOUT =
+ ImmutableMap.of(TASK_NAME, ImmutableSet.of(new
SystemStreamPartition(SYSTEM_STREAM, new Partition(0))));
+
+ @Mock
+ private JobModelHelper jobModelHelper;
+ @Mock
+ private JobInfoServingContext jobModelServingContext;
+ @Mock
+ private CoordinatorCommunication coordinatorCommunication;
+ @Mock
+ private JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+ @Mock
+ private StartpointManager startpointManager;
+ @Mock
+ private ChangelogStreamManager changelogStreamManager;
+ @Mock
+ private Map<TaskName, Integer> changelogPartitionMapping;
+ @Mock
+ private MetricsRegistryMap metrics;
+ @Mock
+ private SystemAdmins systemAdmins;
+ @Mock
+ private Config config;
+ @Mock
+ private JobCoordinatorListener jobCoordinatorListener;
+
+ private StaticResourceJobCoordinator staticResourceJobCoordinator;
+
+ /**
+ * Initializes the mocks, makes the changelog stream manager return the
+ * mocked partition mapping, and builds a spied coordinator with the mocked
+ * listener registered. A spy is used so that
+ * {@code metadataResourceUtil(JobModel)} can be stubbed by the tests.
+ */
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+
when(this.changelogStreamManager.readPartitionMapping()).thenReturn(this.changelogPartitionMapping);
+ this.staticResourceJobCoordinator =
+ spy(new StaticResourceJobCoordinator(PROCESSOR_ID,
this.jobModelHelper, this.jobModelServingContext,
+ this.coordinatorCommunication, this.jobCoordinatorMetadataManager,
this.startpointManager,
+ this.changelogStreamManager, this.metrics, this.systemAdmins,
this.config));
+ this.staticResourceJobCoordinator.setListener(this.jobCoordinatorListener);
+ }
+
+ /**
+ * No previous metadata exists and every {@link JobMetadataChange} is
+ * reported: expects the new metadata to be written, startpoints to be fanned
+ * out, and the listener to be notified of the new job model.
+ */
+ @Test
+ public void testNoExistingJobModel() throws IOException {
+ Config jobModelConfig = mock(Config.class);
+ JobModel jobModel = setupJobModel(jobModelConfig);
+ JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel,
jobModelConfig,
+ ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false);
+ MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
+
+ this.staticResourceJobCoordinator.start();
+ assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
+ verifyStartLifecycle();
+ verifyPrepareWorkerExecution(jobModel, metadataResourceUtil, newMetadata,
SINGLE_SSP_FANOUT);
+ verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
+ }
+
+ /**
+ * Previous metadata exists and no changes are reported: expects no metadata
+ * write and no startpoint fan out, while the listener is still notified of
+ * the job model.
+ */
+ @Test
+ public void testSameJobModelAsPrevious() throws IOException {
+ Config jobModelConfig = mock(Config.class);
+ JobModel jobModel = setupJobModel(jobModelConfig);
+ setupJobCoordinatorMetadata(jobModel, jobModelConfig, ImmutableSet.of(),
true);
+ MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
+
+ this.staticResourceJobCoordinator.start();
+ assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
+ verifyStartLifecycle();
+ verifyPrepareWorkerExecution(jobModel, metadataResourceUtil, null, null);
+ verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
+ }
+
+ /**
+ * Previous metadata exists and both {@code NEW_DEPLOYMENT} and
+ * {@code JOB_MODEL} changes are reported: expects the new metadata to be
+ * written and startpoints to be fanned out.
+ */
+ @Test
+ public void testNewDeploymentNewJobModel() throws IOException {
+ Config jobModelConfig = mock(Config.class);
+ JobModel jobModel = setupJobModel(jobModelConfig);
+ JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel,
jobModelConfig,
+ ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT,
JobMetadataChange.JOB_MODEL), true);
+ MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
+
+ this.staticResourceJobCoordinator.start();
+ assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
+ verifyStartLifecycle();
+ verifyPrepareWorkerExecution(jobModel, metadataResourceUtil, newMetadata,
SINGLE_SSP_FANOUT);
+ verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
+ }
+
+ /**
+ * Calls {@code stop()} (without a prior start) and checks that the listener
+ * is told the job model expired and the coordinator stopped, and that the
+ * communication, startpoint manager and system admins are stopped. The
+ * relative order of these calls is not verified.
+ */
+ @Test
+ public void testStop() {
+ this.staticResourceJobCoordinator.stop();
+
+ verify(this.jobCoordinatorListener).onJobModelExpired();
+ verify(this.coordinatorCommunication).stop();
+ verify(this.startpointManager).stop();
+ verify(this.systemAdmins).stop();
+ verify(this.jobCoordinatorListener).onCoordinatorStop();
+ }
+
+ /**
+ * Missing {@link StartpointManager} and {@link JobCoordinatorListener}.
+ * The coordinator is rebuilt with a {@code null} startpoint manager and no
+ * listener is registered on it; start is still expected to prepare worker
+ * execution. {@code expectedFanOut} is passed as {@code null}, so the helper
+ * only checks that the (unused) startpoint manager mock never fanned out.
+ */
+ @Test
+ public void testStartMissingOptionalComponents() throws IOException {
+ this.staticResourceJobCoordinator =
+ spy(new StaticResourceJobCoordinator(PROCESSOR_ID,
this.jobModelHelper, this.jobModelServingContext,
+ this.coordinatorCommunication, this.jobCoordinatorMetadataManager,
null, this.changelogStreamManager,
+ this.metrics, this.systemAdmins, this.config));
+
+ Config jobModelConfig = mock(Config.class);
+ JobModel jobModel = setupJobModel(jobModelConfig);
+ JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel,
jobModelConfig,
+ ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false);
+ MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
+
+ this.staticResourceJobCoordinator.start();
+ assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
+ verify(this.systemAdmins).start();
+ verifyPrepareWorkerExecution(jobModel, metadataResourceUtil, newMetadata,
null);
+ }
+
+ /**
+ * Same construction as {@link #testStartMissingOptionalComponents()} (null
+ * startpoint manager, no listener): stop is still expected to stop the
+ * coordinator communication and the system admins.
+ */
+ @Test
+ public void testStopMissingOptionalComponents() {
+ this.staticResourceJobCoordinator =
+ spy(new StaticResourceJobCoordinator(PROCESSOR_ID,
this.jobModelHelper, this.jobModelServingContext,
+ this.coordinatorCommunication, this.jobCoordinatorMetadataManager,
null, this.changelogStreamManager,
+ this.metrics, this.systemAdmins, this.config));
+
+ this.staticResourceJobCoordinator.stop();
+
+ verify(this.coordinatorCommunication).stop();
+ verify(this.systemAdmins).stop();
+ }
+
+ /**
+ * Set up {@link JobModel} and {@link JobModelHelper} to return job model.
+ * The mocked job model returns {@code CONTAINERS} and the given config; the
+ * helper returns it when asked for a new job model with the coordinator
+ * config and the mocked changelog partition mapping.
+ *
+ * @param config config to be returned by the mocked job model
+ * @return the mocked job model
+ */
+ private JobModel setupJobModel(Config config) {
+ JobModel jobModel = mock(JobModel.class);
+ when(jobModel.getContainers()).thenReturn(CONTAINERS);
+ when(jobModel.getConfig()).thenReturn(config);
+ when(this.jobModelHelper.newJobModel(this.config,
this.changelogPartitionMapping)).thenReturn(jobModel);
+ return jobModel;
+ }
+
+ /**
+ * Set up mocks for {@link JobCoordinatorMetadataManager}.
+ *
+ * @param jobModel job model for which new metadata is generated
+ * @param jobModelConfig config passed along with the job model when
+ * generating the new metadata
+ * @param jobMetadataChanges defines which changes should be detected by the
+ * {@link JobCoordinatorMetadataManager}
+ * @param hasPreviousMetadata if true, a mocked previous metadata is returned
+ * when reading metadata; otherwise null is returned
+ * @return the mocked new metadata returned by the manager
+ */
+ private JobCoordinatorMetadata setupJobCoordinatorMetadata(JobModel
jobModel, Config jobModelConfig,
+ Set<JobMetadataChange> jobMetadataChanges, boolean hasPreviousMetadata) {
+ JobCoordinatorMetadata previousMetadata = hasPreviousMetadata ?
mock(JobCoordinatorMetadata.class) : null;
+ JobCoordinatorMetadata newMetadata = mock(JobCoordinatorMetadata.class);
+
when(this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel,
jobModelConfig)).thenReturn(
+ newMetadata);
+
when(this.jobCoordinatorMetadataManager.readJobCoordinatorMetadata()).thenReturn(previousMetadata);
+ if (hasPreviousMetadata) {
+
when(this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata,
previousMetadata)).thenReturn(
+ jobMetadataChanges);
+ } else {
+ // no previous metadata: the second argument is expected to be null
+
when(this.jobCoordinatorMetadataManager.checkForMetadataChanges(eq(newMetadata),
+
isNull(JobCoordinatorMetadata.class))).thenReturn(jobMetadataChanges);
+ }
+ return newMetadata;
+ }
+
+ /**
+ * Stubs the spied coordinator so that {@code metadataResourceUtil(jobModel)}
+ * returns a mock. The {@code doReturn(...).when(spy)} form is used for
+ * stubbing the spy.
+ *
+ * @param jobModel job model argument the stub is registered for
+ * @return the mocked {@link MetadataResourceUtil}
+ */
+ private MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
+ MetadataResourceUtil metadataResourceUtil =
mock(MetadataResourceUtil.class);
+
doReturn(metadataResourceUtil).when(this.staticResourceJobCoordinator).metadataResourceUtil(jobModel);
+ return metadataResourceUtil;
+ }
+
+ /**
+ * Verifies that the system admins and the startpoint manager were started.
+ */
+ private void verifyStartLifecycle() {
+ verify(this.systemAdmins).start();
+ verify(this.startpointManager).start();
+ }
+
+ /**
+ * Common steps to verify when preparing workers for processing. The positive
+ * checks go through a single {@link InOrder}, so they must happen in the
+ * sequence listed in the method body; the {@code never()} checks are
+ * independent of order.
+ *
+ * @param jobModel job model to be served for workers
+ * @param metadataResourceUtil expected to be used for creating resources
+ * @param newMetadata if not null, expected to be written to
+ * {@link JobCoordinatorMetadataManager}; if null, no write is expected
+ * @param expectedFanOut if not null, expected to be passed to
+ * {@link StartpointManager} for fan out; if null, no fan out is expected
+ */
+ private void verifyPrepareWorkerExecution(JobModel jobModel,
MetadataResourceUtil metadataResourceUtil,
+ JobCoordinatorMetadata newMetadata, Map<TaskName,
Set<SystemStreamPartition>> expectedFanOut) throws IOException {
+ InOrder inOrder = inOrder(this.jobCoordinatorMetadataManager,
this.jobModelServingContext, metadataResourceUtil,
+ this.startpointManager, this.coordinatorCommunication);
+ if (newMetadata != null) {
+
inOrder.verify(this.jobCoordinatorMetadataManager).writeJobCoordinatorMetadata(newMetadata);
+ } else {
+ verify(this.jobCoordinatorMetadataManager,
never()).writeJobCoordinatorMetadata(any());
+ }
+ inOrder.verify(this.jobModelServingContext).setJobModel(jobModel);
+ inOrder.verify(metadataResourceUtil).createResources();
+ if (expectedFanOut != null) {
+ inOrder.verify(this.startpointManager).fanOut(expectedFanOut);
+ } else {
+ verify(this.startpointManager, never()).fanOut(any());
+ }
+ inOrder.verify(this.coordinatorCommunication).start();
+ }
+}
\ No newline at end of file