SAMZA-1212 - Refactor interaction between StreamProcessor, JobCoordinator and SamzaContainer
See SAMZA-1212 for motivation toward this refactoring. Changes here are: * Removed awaitStart (blocking) method in StreamProcessor, JobCoordinator and SamzaContainer * Introduced SamzaContainerListener and JobCoordinatorListener interfaces implemented by StreamProcessor * Introduced SamzaContainerStatus to handle failures and lifecycle using Listener interfaces Author: Navina Ramesh <[email protected]> Reviewers: Xinyu Liu <[email protected]>, Prateek Maheshwari <[email protected]> Closes #148 from navina/SAMZA-1212 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/475b4654 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/475b4654 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/475b4654 Branch: refs/heads/master Commit: 475b4654cae52a4f92fe39c931a0e673048dd027 Parents: 0fb025b Author: Navina Ramesh <[email protected]> Authored: Wed May 3 15:10:13 2017 -0700 Committer: nramesh <[email protected]> Committed: Wed May 3 15:10:13 2017 -0700 ---------------------------------------------------------------------- .../org/apache/samza/SamzaContainerStatus.java | 68 ++++ .../samza/container/SamzaContainerListener.java | 55 ++++ .../samza/coordinator/JobCoordinator.java | 51 +-- .../coordinator/JobCoordinatorFactory.java | 8 +- .../coordinator/JobCoordinatorListener.java | 61 ++++ .../processor/SamzaContainerController.java | 164 ---------- .../apache/samza/processor/StreamProcessor.java | 307 ++++++++++++++---- .../StreamProcessorLifecycleListener.java | 3 + .../samza/runtime/LocalContainerRunner.java | 26 +- .../standalone/StandaloneJobCoordinator.java | 119 ++++--- .../StandaloneJobCoordinatorFactory.java | 5 +- .../samza/zk/ZkBarrierForVersionUpgrade.java | 1 - .../org/apache/samza/zk/ZkControllerImpl.java | 15 +- .../org/apache/samza/zk/ZkJobCoordinator.java | 44 +-- .../samza/zk/ZkJobCoordinatorFactory.java | 7 +- .../apache/samza/container/SamzaContainer.scala | 134 ++++++-- 
.../org/apache/samza/job/local/ThreadJob.scala | 10 +- .../samza/job/local/ThreadJobFactory.scala | 51 +-- .../samza/processor/TestStreamProcessor.java | 176 +++++++++++ .../samza/container/TestSamzaContainer.scala | 316 +++++++++++++++++-- .../processor/StreamProcessorTestUtils.scala | 67 ++++ .../system/kafka/TestKafkaSystemAdminJava.java | 12 +- .../test/processor/TestStreamProcessor.java | 2 +- .../test/integration/StreamTaskTestUtil.scala | 3 +- 24 files changed, 1259 insertions(+), 446 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java b/samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java new file mode 100644 index 0000000..4565de6 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza; + + +/** + * <pre> + * runloop completed [OR] + * container.run() runloop.run container.shutdown() + * NOT_STARTED -----------------> STARTING ------------> STARTED -----------------------> STOPPED + * | Error in runloop | + * | [OR] Error when | + * Error when | stopping | + * starting components | components | + * V | + * FAILED <-------------------| + * </pre> + */ + +/** + * Indicates the current status of a {@link org.apache.samza.container.SamzaContainer} + */ +public enum SamzaContainerStatus { + /** + * Indicates that the container has not been started + */ + NOT_STARTED, + + /** + * Indicates that the container is starting all the components required by the + * {@link org.apache.samza.container.RunLoop} for processing + */ + STARTING, + + /** + * Indicates that the container started the {@link org.apache.samza.container.RunLoop} + */ + STARTED, + + /** + * Indicates that the container was successfully stopped either due to task-initiated shutdown + * (eg. end-of-stream triggered shutdown or application-driven shutdown of all tasks and hence, the container) or + * due to external shutdown requests (eg. 
from {@link org.apache.samza.processor.StreamProcessor}) + */ + STOPPED, + + /** + * Indicates that the container failed during any of its 3 active states - + * {@link #STARTING}, {@link #STARTED}, {@link #STOPPED} + */ + FAILED +} http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java new file mode 100644 index 0000000..a9c3b2c --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container; + +/** + * A Listener for {@link org.apache.samza.container.SamzaContainer} lifecycle events. 
+ */ +public interface SamzaContainerListener { + + /** + * Method invoked when the {@link org.apache.samza.container.SamzaContainer} has successfully transitioned to + * the {@link org.apache.samza.SamzaContainerStatus#STARTED} state and is about to start the + * {@link org.apache.samza.container.RunLoop} + */ + void onContainerStart(); + + /** + * Method invoked when the {@link org.apache.samza.container.SamzaContainer} has successfully transitioned to + * {@link org.apache.samza.SamzaContainerStatus#STOPPED} state. Details on state transitions can be found in + * {@link org.apache.samza.SamzaContainerStatus} + * <br> + * <b>Note</b>: This will be the last call after completely shutting down the SamzaContainer without any + * exceptions/errors. + * @param pausedByJm boolean indicating why the container was stopped. It should be {@literal true}, iff the container + * was stopped as a result of an expired {@link org.apache.samza.job.model.JobModel}. Otherwise, + * it should be {@literal false} + */ + void onContainerStop(boolean pausedByJm); + + /** + * Method invoked when the {@link org.apache.samza.container.SamzaContainer} has transitioned to + * {@link org.apache.samza.SamzaContainerStatus#FAILED} state. Details on state transitions can be found in + * {@link org.apache.samza.SamzaContainerStatus} + * <br> + * <b>Note</b>: {@link #onContainerFailed(Throwable)} is mutually exclusive to {@link #onContainerStop(boolean)}. + * @param t Throwable that caused the container failure. 
+ */ + void onContainerFailed(Throwable t); +} http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java index af2ef6a..bd06039 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java @@ -28,39 +28,41 @@ import org.apache.samza.job.model.JobModel; * based on the underlying environment. In some cases, ID assignment is completely config driven, while in other * cases, ID assignment may require coordination with JobCoordinators of other StreamProcessors. * - * This interface contains methods required for the StreamProcessor to interact with JobCoordinator. + * StreamProcessor registers a {@link JobCoordinatorListener} in order to get notified about JobModel changes and + * Coordinator state change. + * + * <pre> + * {@code + * ******************* start() ******************** + * * *----------------------------------->>* * + * * * onNewJobModel ************ * + * * *<<------------------------* Job * * + * * * onJobModelExpired * Co- * * + * * *<<------------------------* ordinator* * + * * StreamProcessor * onCoordinatorStop * Listener * JobCoordinator * + * * *<<------------------------* * * + * * * onCoordinatorFailure * * * + * * *<<------------------------************ * + * * * stop() * * + * * *----------------------------------->>* * + * ******************* ******************** + * } + * </pre> */ @InterfaceStability.Evolving public interface JobCoordinator { /** - * Starts the JobCoordinator which involves one or more of the following: - * * LeaderElector Module initialization, if any - * * If leader, generate JobModel. 
Else, read JobModel + * Starts the JobCoordinator, which generally consists of participating in LeaderElection and listening for JobModel + * changes. */ void start(); /** - * Cleanly shutting down the JobCoordinator involves: - * * Shutting down the Container - * * Shutting down the LeaderElection module (TBD: details depending on leader or not) + * Stops the JobCoordinator and notifies the registered {@link JobCoordinatorListener}, if any */ void stop(); /** - * Waits for a specified amount of time for the JobCoordinator to fully start-up, which means it should be ready to - * process messages. - * In a Standalone use-case, it may be sufficient to wait for the container to start-up. - * In a ZK based Standalone use-case, it also includes registration with ZK, initialization of the - * leader elector module, container start-up etc. - * - * @param timeoutMs Maximum time to wait, in milliseconds - * @return {@code true}, if the JobCoordinator is started within the specified wait time and {@code false} if the - * waiting time elapsed - * @throws InterruptedException if the current thread is interrupted while waiting for the JobCoordinator to start-up - */ - boolean awaitStart(long timeoutMs) throws InterruptedException; - - /** * Returns the identifier assigned to the processor that is local to the instance of StreamProcessor. 
* * The semantics and format of the identifier returned should adhere to the specification defined in @@ -71,6 +73,13 @@ public interface JobCoordinator { String getProcessorId(); /** + * Registers a {@link JobCoordinatorListener} to receive notification on coordinator state changes and job model changes + * + * @param listener An instance of {@link JobCoordinatorListener} + */ + void setListener(JobCoordinatorListener listener); + + /** * Returns the current JobModel * The implementation of the JobCoordinator in the leader needs to know how to read the config and generate JobModel * In case of a non-leader, the JobCoordinator should simply fetch the jobmodel http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java index 7f7e1ed..784d48d 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java @@ -20,17 +20,13 @@ package org.apache.samza.coordinator; import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.config.Config; -import org.apache.samza.processor.SamzaContainerController; - @InterfaceStability.Evolving public interface JobCoordinatorFactory { /** - * @param processorId {@link org.apache.samza.processor.StreamProcessor} id + * @param processorId Identifier for {@link org.apache.samza.processor.StreamProcessor} instance * @param config Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig" - * @param containerController Controller interface for starting and stopping container. 
In future, it may simply - * pause the container and add/remove tasks * @return An instance of IJobCoordinator */ - JobCoordinator getJobCoordinator(String processorId, Config config, SamzaContainerController containerController); + JobCoordinator getJobCoordinator(String processorId, Config config); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorListener.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorListener.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorListener.java new file mode 100644 index 0000000..8e17032 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorListener.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.coordinator; + +import org.apache.samza.job.model.JobModel; + +/** + * Listener interface that can be registered with a {@link org.apache.samza.coordinator.JobCoordinator} instance in order + * to receive notifications. 
+ */ +public interface JobCoordinatorListener { + /** + * Method invoked by a {@link org.apache.samza.coordinator.JobCoordinator} in the following scenarios: + * <ul> + * <li>the existing {@link JobModel} is no longer valid due to re-balancing </li> + * <li>JobCoordinator is shutting down</li> + * </ul> + */ + void onJobModelExpired(); + + /** + * Method invoked by a {@link org.apache.samza.coordinator.JobCoordinator} when there is a new {@link JobModel} + * available for use by the processor. + * + * @param processorId String, representing the identifier of {@link org.apache.samza.processor.StreamProcessor} + * @param jobModel Current {@link JobModel} containing a {@link org.apache.samza.job.model.ContainerModel} for the + * given processorId + */ + // TODO: Can change interface to ContainerModel if maxChangelogStreamPartitions can be made a part of ContainerModel + void onNewJobModel(String processorId, JobModel jobModel); + + /** + * Method invoked by a {@link org.apache.samza.coordinator.JobCoordinator} when it is shutting down without any errors + */ + void onCoordinatorStop(); + + /** + * + * Method invoked by a {@link org.apache.samza.coordinator.JobCoordinator} when it is shutting down with an error. + * <b>Note</b>: This should be the last call after completely shutting down the JobCoordinator. 
+ * + * @param t Throwable that was the cause of the JobCoordinator failure + */ + void onCoordinatorFailure(Throwable t); +} http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java deleted file mode 100644 index 4af413a..0000000 --- a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.processor; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import org.apache.samza.config.ClusterManagerConfig; -import org.apache.samza.config.Config; -import org.apache.samza.config.TaskConfigJava; -import org.apache.samza.container.LocalityManager; -import org.apache.samza.container.SamzaContainer; -import org.apache.samza.container.SamzaContainer$; -import org.apache.samza.job.model.ContainerModel; -import org.apache.samza.metrics.JmxServer; -import org.apache.samza.metrics.MetricsReporter; -import org.apache.samza.util.Util; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SamzaContainerController { - private static final Logger log = LoggerFactory.getLogger(SamzaContainerController.class); - - private ExecutorService executorService; - private volatile SamzaContainer container; - private final Map<String, MetricsReporter> metricsReporterMap; - private final Object taskFactory; - private final long containerShutdownMs; - private final StreamProcessorLifecycleListener lifecycleListener; - - // Internal Member Variables - private Future containerFuture; - - /** - * Creates an instance of a controller for instantiating, starting and/or stopping {@link SamzaContainer} - * Requests to execute a container are submitted to the {@link ExecutorService} - * - * @param taskFactory Factory that be used create instances of {@link org.apache.samza.task.StreamTask} or - * {@link org.apache.samza.task.AsyncStreamTask} - * @param containerShutdownMs How long the Samza container should wait for an orderly shutdown of task instances - * @param metricsReporterMap Map of metric reporter name and {@link MetricsReporter} instance 
- * @param lifecycleListener {@link StreamProcessorLifecycleListener} - */ - public SamzaContainerController( - Object taskFactory, - long containerShutdownMs, - Map<String, MetricsReporter> metricsReporterMap, - StreamProcessorLifecycleListener lifecycleListener) { - this.taskFactory = taskFactory; - this.metricsReporterMap = metricsReporterMap; - if (containerShutdownMs == -1) { - this.containerShutdownMs = TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS; - } else { - this.containerShutdownMs = containerShutdownMs; - } - // life cycle callbacks when shutdown and failure happens - this.lifecycleListener = lifecycleListener; - } - - /** - * Instantiates a container and submits to the executor. This method does not actually wait for the container to - * fully start-up. For such a behavior, see {@link #awaitStart(long)} - * <p> - * <b>Note:</b> <i>This method does not stop a currently running container, if any. It is left up to the caller to - * ensure that the container has been stopped with stopContainer before invoking this method.</i> - * - * @param containerModel {@link ContainerModel} instance to use for the current run of the Container - * @param config Complete configuration map used by the Samza job - * @param maxChangelogStreamPartitions Max number of partitions expected in the changelog streams - * TODO: Try to get rid of maxChangelogStreamPartitions from method arguments - */ - public void startContainer(ContainerModel containerModel, Config config, int maxChangelogStreamPartitions) { - LocalityManager localityManager = null; - if (new ClusterManagerConfig(config).getHostAffinityEnabled()) { - localityManager = SamzaContainer$.MODULE$.getLocalityManager(containerModel.getProcessorId(), config); - } - log.info("About to create container: " + containerModel.getProcessorId()); - container = SamzaContainer$.MODULE$.apply( - containerModel.getProcessorId(), - containerModel, - config, - maxChangelogStreamPartitions, - localityManager, - new JmxServer(), - 
Util.<String, MetricsReporter>javaMapAsScalaMap(metricsReporterMap), - taskFactory); - log.info("About to start container: " + containerModel.getProcessorId()); - executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() - .setNameFormat("p-" + containerModel.getProcessorId() + "-container-thread-%d").build()); - containerFuture = executorService.submit(() -> { - try { - container.run(); - lifecycleListener.onShutdown(); - } catch (Throwable t) { - lifecycleListener.onFailure(t); - } - }); - } - - /** - * Method waits for a specified amount of time for the container to fully start-up, which consists of class-loading - * all the components and start message processing - * - * @param timeoutMs Maximum time to wait, in milliseconds - * @return {@code true}, if the container started within the specified wait time and {@code false} if the waiting - * time elapsed - * @throws InterruptedException if the current thread is interrupted while waiting for container to start-up - */ - public boolean awaitStart(long timeoutMs) throws InterruptedException { - return container.awaitStart(timeoutMs); - } - - /** - * Stops a running container, if any. Invoking this method multiple times does not have any side-effects. - */ - public void stopContainer() { - if (container == null) { - log.warn("Shutdown before a container was created."); - return; - } - - container.shutdown(); - try { - if (containerFuture != null) - containerFuture.get(containerShutdownMs, TimeUnit.MILLISECONDS); - } catch (InterruptedException | ExecutionException e) { - log.error("Ran into problems while trying to stop the container in the processor!", e); - } catch (TimeoutException e) { - log.warn("Got Timeout Exception while trying to stop the container in the processor! 
The processor may not shutdown properly", e); - } - } - - /** - * Shutsdown the controller by first stop any running container and then, shutting down the {@link ExecutorService} - */ - public void shutdown() { - stopContainer(); - if (executorService != null) { - executorService.shutdown(); - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 1910594..6329f6c 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -18,47 +18,64 @@ */ package org.apache.samza.processor; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.samza.SamzaContainerStatus; import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.config.Config; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.TaskConfigJava; +import org.apache.samza.container.IllegalContainerStateException; +import org.apache.samza.container.SamzaContainer; +import org.apache.samza.container.SamzaContainerListener; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorFactory; +import org.apache.samza.coordinator.JobCoordinatorListener; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.metrics.JmxServer; import org.apache.samza.metrics.MetricsReporter; import org.apache.samza.task.AsyncStreamTaskFactory; import org.apache.samza.task.StreamTaskFactory; import org.apache.samza.util.Util; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** * StreamProcessor can be embedded in any application or executed in a distributed environment (aka cluster) as an * independent process. * <p> - * <b>Usage Example:</b> - * <pre> - * StreamProcessor processor = new StreamProcessor(1, config); - * processor.start(); - * try { - * boolean status = processor.awaitStart(TIMEOUT_MS); // Optional - blocking call - * if (!status) { - * // Timed out - * } - * ... - * } catch (InterruptedException ie) { - * ... - * } finally { - * processor.stop(); - * } - * </pre> - * Note: A single JVM can create multiple StreamProcessor instances. It is safe to create StreamProcessor instances in + * + * <b>Note</b>: A single JVM can create multiple StreamProcessor instances. It is safe to create StreamProcessor instances in * multiple threads. 
*/ @InterfaceStability.Evolving public class StreamProcessor { + private static final Logger LOGGER = LoggerFactory.getLogger(StreamProcessor.class); + private final JobCoordinator jobCoordinator; - private final StreamProcessorLifecycleListener lifecycleListener; - private final String processorId; + private final StreamProcessorLifecycleListener processorListener; + private final Object taskFactory; + private final Map<String, MetricsReporter> customMetricsReporter; + private final Config config; + private final long taskShutdownMs; + + private ExecutorService executorService; + + private volatile SamzaContainer container = null; + // Latch used to synchronize between the JobCoordinator thread and the container thread, when the container is + // stopped due to re-balancing + private volatile CountDownLatch jcContainerShutdownLatch = new CountDownLatch(1); + private volatile boolean processorOnStartCalled = false; + + @VisibleForTesting + JobCoordinatorListener jobCoordinatorListener = null; /** * Create an instance of StreamProcessor that encapsulates a JobCoordinator and Samza Container @@ -70,84 +87,246 @@ public class StreamProcessor { * <p> * <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the StreamProcessor, and NOT exposed to the user * + * @param processorId String identifier for this processor * @param config Instance of config object - contains all configuration required for processing * @param customMetricsReporters Map of custom MetricReporter instances that are to be injected in the Samza job * @param asyncStreamTaskFactory The {@link AsyncStreamTaskFactory} to be used for creating task instances. 
- * @param lifecycleListener listener to the StreamProcessor life cycle + * @param processorListener listener to the StreamProcessor life cycle */ public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, - AsyncStreamTaskFactory asyncStreamTaskFactory, StreamProcessorLifecycleListener lifecycleListener) { - this(processorId, config, customMetricsReporters, (Object) asyncStreamTaskFactory, lifecycleListener); + AsyncStreamTaskFactory asyncStreamTaskFactory, StreamProcessorLifecycleListener processorListener) { + this(processorId, config, customMetricsReporters, (Object) asyncStreamTaskFactory, processorListener); } - /** *Same as {@link #StreamProcessor(String, Config, Map, AsyncStreamTaskFactory, StreamProcessorLifecycleListener)}, except task * instances are created using the provided {@link StreamTaskFactory}. * @param config - config * @param customMetricsReporters metric Reporter * @param streamTaskFactory task factory to instantiate the Task - * @param lifecycleListener listener to the StreamProcessor life cycle + * @param processorListener listener to the StreamProcessor life cycle */ public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, - StreamTaskFactory streamTaskFactory, StreamProcessorLifecycleListener lifecycleListener) { - this(processorId, config, customMetricsReporters, (Object) streamTaskFactory, lifecycleListener); + StreamTaskFactory streamTaskFactory, StreamProcessorLifecycleListener processorListener) { + this(processorId, config, customMetricsReporters, (Object) streamTaskFactory, processorListener); } - private StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, - Object taskFactory, StreamProcessorLifecycleListener lifecycleListener) { - this.processorId = processorId; - - SamzaContainerController containerController = new SamzaContainerController( - taskFactory, - new 
TaskConfigJava(config).getShutdownMs(), - customMetricsReporters, - lifecycleListener); - - this.jobCoordinator = Util. + /* package private */ + JobCoordinator getJobCoordinator(String processorId) { + return Util. <JobCoordinatorFactory>getObj( new JobCoordinatorConfig(config) .getJobCoordinatorFactoryClassName()) - .getJobCoordinator(processorId, config, containerController); + .getJobCoordinator(processorId, config); + } - this.lifecycleListener = lifecycleListener; + @VisibleForTesting + StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, Object taskFactory, + StreamProcessorLifecycleListener processorListener, JobCoordinator jobCoordinator) { + this.taskFactory = taskFactory; + this.config = config; + this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs(); + this.customMetricsReporter = customMetricsReporters; + this.processorListener = processorListener; + this.jobCoordinator = jobCoordinator; + this.jobCoordinatorListener = createJobCoordinatorListener(); + this.jobCoordinator.setListener(jobCoordinatorListener); + } + + private StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, + Object taskFactory, StreamProcessorLifecycleListener processorListener) { + this.taskFactory = taskFactory; + this.config = config; + this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs(); + this.customMetricsReporter = customMetricsReporters; + this.processorListener = processorListener; + this.jobCoordinator = getJobCoordinator(processorId); + this.jobCoordinator.setListener(createJobCoordinatorListener()); } /** - * StreamProcessor Lifecycle: start() - * <ul> - * <li>Starts the JobCoordinator and fetches the JobModel</li> - * <li>jobCoordinator.start returns after starting the container using ContainerModel </li> - * </ul> - * When start() returns, it only guarantees that the container is initialized and submitted by the controller to - * execute + * Asynchronously starts 
this {@link StreamProcessor}. + * <p> + * <b>Implementation</b>: + * Starts the {@link JobCoordinator}, which will eventually start the {@link SamzaContainer} when a new + * {@link JobModel} is available. + * </p> */ public void start() { jobCoordinator.start(); - lifecycleListener.onStart(); } /** - * Method that allows the user to wait for a specified amount of time for the container to initialize and start - * processing messages + * <p> + * Asynchronously stops the {@link StreamProcessor}'s running components - {@link SamzaContainer} + * and {@link JobCoordinator} + * </p> + * There are multiple ways in which the StreamProcessor stops: + * <ol> + * <li>Caller of StreamProcessor invokes stop()</li> + * <li>Samza Container completes processing (e.g. bounded input) and shuts down</li> + * <li>Samza Container fails</li> + * <li>Job Coordinator fails</li> + * </ol> + * When either container or coordinator stops (cleanly or due to exception), it will try to shut down the + * StreamProcessor. This needs to be synchronized so that only one code path gets triggered for shutdown. + * <br> + * If container is running, + * <ol> + * <li>container is shut down cleanly and {@link SamzaContainerListener#onContainerStop(boolean)} will trigger + * {@link JobCoordinator#stop()}</li> + * <li>container fails to shut down cleanly and {@link SamzaContainerListener#onContainerFailed(Throwable)} will + * trigger {@link JobCoordinator#stop()}</li> + * </ol> + * If container is not running, then this method will simply shut down the {@link JobCoordinator}. 
* - * @param timeoutMs Maximum time to wait, in milliseconds - * @return {@code true}, if the container started within the specified wait time and {@code false} if the waiting time - * elapsed - * @throws InterruptedException if the current thread is interrupted while waiting for container to start-up */ - public boolean awaitStart(long timeoutMs) throws InterruptedException { - return jobCoordinator.awaitStart(timeoutMs); + public synchronized void stop() { + boolean containerShutdownInvoked = false; + if (container != null) { + try { + LOGGER.info("Shutting down container " + container.toString() + " from StreamProcessor"); + container.shutdown(); + containerShutdownInvoked = true; + } catch (IllegalContainerStateException icse) { + LOGGER.info("Container was not running", icse); + } + } + + if (!containerShutdownInvoked) { + LOGGER.info("Shutting down JobCoordinator from StreamProcessor"); + jobCoordinator.stop(); + } + } - /** - * StreamProcessor Lifecycle: stop() - * <ul> - * <li>Stops the SamzaContainer execution</li> - * <li>Stops the JobCoordinator</li> - * </ul> - */ - public void stop() { - jobCoordinator.stop(); + SamzaContainer createSamzaContainer(ContainerModel containerModel, int maxChangelogStreamPartitions, JmxServer jmxServer) { + return SamzaContainer.apply( + containerModel, + config, + maxChangelogStreamPartitions, + jmxServer, + Util.<String, MetricsReporter>javaMapAsScalaMap(customMetricsReporter), + taskFactory); + } + + JobCoordinatorListener createJobCoordinatorListener() { + return new JobCoordinatorListener() { + + @Override + public void onJobModelExpired() { + if (container != null) { + SamzaContainerStatus status = container.getStatus(); + if (SamzaContainerStatus.NOT_STARTED.equals(status) || SamzaContainerStatus.STARTED.equals(status)) { + boolean shutdownComplete = false; + try { + LOGGER.info("Shutting down container in onJobModelExpired."); + container.pause(); + shutdownComplete = jcContainerShutdownLatch.await(taskShutdownMs, 
TimeUnit.MILLISECONDS); + } catch (IllegalContainerStateException icse) { + // Ignored since container is not running + LOGGER.info("Container was not running.", icse); + shutdownComplete = true; + } catch (InterruptedException e) { + LOGGER.warn("Container shutdown was interrupted!" + container.toString(), e); + } + if (!shutdownComplete) { + LOGGER.warn("Container " + container.toString() + " may not have shutdown successfully. " + + "Stopping the processor."); + container = null; + stop(); + } else { + LOGGER.debug("Container " + container.toString() + " shutdown successfully"); + } + } else { + LOGGER.debug("Container " + container.toString() + " is not running."); + } + } else { + LOGGER.debug("Container is not instantiated yet."); + } + } + + @Override + public void onNewJobModel(String processorId, JobModel jobModel) { + if (!jobModel.getContainers().containsKey(processorId)) { + LOGGER.warn("JobModel does not contain the processorId: " + processorId + ". Stopping the processor."); + stop(); + } else { + jcContainerShutdownLatch = new CountDownLatch(1); + + SamzaContainerListener containerListener = new SamzaContainerListener() { + @Override + public void onContainerStart() { + if (!processorOnStartCalled) { + // processorListener is called on start only the first time the container starts. + // It is not called after every re-balance of partitions among the processors + processorOnStartCalled = true; + if (processorListener != null) { + processorListener.onStart(); + } + } else { + LOGGER.debug("StreamProcessorListener was notified of container start previously. 
Hence, skipping this time."); + } + } + + @Override + public void onContainerStop(boolean pauseByJm) { + if (pauseByJm) { + LOGGER.info("Container " + container.toString() + " stopped due to a request from JobCoordinator."); + if (jcContainerShutdownLatch != null) { + jcContainerShutdownLatch.countDown(); + } + } else { // sp.stop was called or container stopped by itself + LOGGER.info("Container " + container.toString() + " stopped."); + container = null; // this guarantees that stop() doesn't try to stop container again + stop(); + } + } + + @Override + public void onContainerFailed(Throwable t) { + if (jcContainerShutdownLatch != null) { + jcContainerShutdownLatch.countDown(); + } else { + LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting."); + } + LOGGER.error("Container failed. Stopping the processor.", t); + container = null; + stop(); + } + }; + + container = createSamzaContainer( + jobModel.getContainers().get(processorId), + jobModel.maxChangeLogStreamPartitions, + new JmxServer()); + container.setContainerListener(containerListener); + LOGGER.info("Starting container " + container.toString()); + executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() + .setNameFormat("p-" + processorId + "-container-thread-%d").build()); + executorService.submit(container::run); + } + } + + @Override + public void onCoordinatorStop() { + if (executorService != null) { + LOGGER.info("Shutting down the executor service."); + executorService.shutdownNow(); + } + if (processorListener != null) { + processorListener.onShutdown(); + } + } + + @Override + public void onCoordinatorFailure(Throwable e) { + LOGGER.info("Coordinator Failed. 
Stopping the processor."); + stop(); + if (processorListener != null) { + processorListener.onFailure(e); + } + } + }; } } http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java index 7bca074..6b8e3c7 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java @@ -31,6 +31,9 @@ import org.apache.samza.annotation.InterfaceStability; public interface StreamProcessorLifecycleListener { /** * Callback when the {@link StreamProcessor} is started + * This callback is invoked only once when {@link org.apache.samza.container.SamzaContainer} starts for the first time + * in the {@link StreamProcessor}. When there is a re-balance of tasks/partitions among the processors, the container + * may temporarily be "paused" and re-started again. For such re-starts, this callback is NOT invoked. 
*/ void onStart(); http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java index 80350df..920cc3d 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java @@ -33,6 +33,7 @@ import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; import org.apache.samza.metrics.JmxServer; import org.apache.samza.metrics.MetricsReporter; +import org.apache.samza.container.SamzaContainerListener; import org.apache.samza.task.TaskFactoryUtil; import org.apache.samza.util.ScalaToJavaUtils; import org.apache.samza.util.Util; @@ -55,6 +56,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner { private static final Logger log = LoggerFactory.getLogger(LocalContainerRunner.class); private final JobModel jobModel; private final String containerId; + private volatile Throwable containerException = null; public LocalContainerRunner(JobModel jobModel, String containerId) { super(jobModel.getConfig()); @@ -71,14 +73,30 @@ public class LocalContainerRunner extends AbstractApplicationRunner { Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this); SamzaContainer container = SamzaContainer$.MODULE$.apply( - containerModel.getProcessorId(), containerModel, config, jobModel.maxChangeLogStreamPartitions, - SamzaContainer.getLocalityManager(containerId, config), jmxServer, Util.<String, MetricsReporter>javaMapAsScalaMap(new HashMap<>()), taskFactory); + container.setContainerListener( + new SamzaContainerListener() { + @Override + public void onContainerStart() { + log.info("Container Started"); + } 
+ + @Override + public void onContainerStop(boolean invokedExternally) { + log.info("Container Stopped"); + } + + @Override + public void onContainerFailed(Throwable t) { + log.info("Container Failed"); + containerException = t; + } + }); container.run(); } finally { @@ -86,6 +104,10 @@ public class LocalContainerRunner extends AbstractApplicationRunner { jmxServer.stop(); } } + if (containerException != null) { + log.error("Container stopped with Exception. Exiting process now.", containerException); + System.exit(1); + } } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java index 0d74fb8..61ead18 100644 --- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java @@ -18,18 +18,13 @@ */ package org.apache.samza.standalone; -import com.google.common.annotations.VisibleForTesting; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.JavaSystemConfig; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.job.model.JobModel; -import org.apache.samza.processor.SamzaContainerController; -import org.apache.samza.runtime.ProcessorIdGenerator; +import org.apache.samza.coordinator.JobCoordinatorListener; import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemFactory; @@ -38,6 +33,10 @@ import org.apache.samza.util.Util; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + /** * Standalone Job Coordinator does not implement any leader elector module or cluster manager * @@ -62,87 +61,79 @@ import org.slf4j.LoggerFactory; * </ul> * */ public class StandaloneJobCoordinator implements JobCoordinator { - private static final Logger log = LoggerFactory.getLogger(StandaloneJobCoordinator.class); + private static final Logger LOGGER = LoggerFactory.getLogger(StandaloneJobCoordinator.class); private final String processorId; private final Config config; - private final JobModel jobModel; - private final SamzaContainerController containerController; + private JobCoordinatorListener coordinatorListener = null; - @VisibleForTesting - StandaloneJobCoordinator( - ProcessorIdGenerator processorIdGenerator, - Config config, - SamzaContainerController containerController, - JobModel jobModel) { - this.processorId = processorIdGenerator.generateProcessorId(config); - this.config = config; - this.containerController = containerController; - this.jobModel = jobModel; - } - - public StandaloneJobCoordinator(String processorId, Config config, SamzaContainerController containerController) { - this.config = config; - this.containerController = containerController; + public StandaloneJobCoordinator(String processorId, Config config) { this.processorId = processorId; - - JavaSystemConfig systemConfig = new JavaSystemConfig(this.config); - Map<String, SystemAdmin> systemAdmins = new HashMap<>(); - for (String systemName: systemConfig.getSystemNames()) { - String systemFactoryClassName = systemConfig.getSystemFactory(systemName); - if (systemFactoryClassName == null) { - log.error(String.format("A stream uses system %s, which is missing from the configuration.", systemName)); - throw new SamzaException(String.format("A stream uses system %s, which is missing from the configuration.", systemName)); - } - SystemFactory 
systemFactory = Util.<SystemFactory>getObj(systemFactoryClassName); - systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config)); - } - - StreamMetadataCache streamMetadataCache = new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance()); - - /** TODO: - * Locality Manager seems to be required in JC for reading locality info and grouping tasks intelligently and also, - * in SamzaContainer for writing locality info to the coordinator stream. This closely couples together - * TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator - * (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper) - */ - this.jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, null); + this.config = config; } @Override public void start() { // No-op - JobModel jobModel = getJobModel(); - containerController.startContainer( - jobModel.getContainers().get(getProcessorId()), - jobModel.getConfig(), - jobModel.maxChangeLogStreamPartitions); + JobModel jobModel = null; + try { + jobModel = getJobModel(); + } catch (Exception e) { + LOGGER.error("Exception while trying to getJobModel.", e); + if (coordinatorListener != null) { + coordinatorListener.onCoordinatorFailure(e); + } + } + if (jobModel != null && jobModel.getContainers().containsKey(processorId)) { + if (coordinatorListener != null) { + coordinatorListener.onNewJobModel(processorId, jobModel); + } + } else { + stop(); + } } @Override public void stop() { // No-op - containerController.shutdown(); + if (coordinatorListener != null) { + coordinatorListener.onJobModelExpired(); + coordinatorListener.onCoordinatorStop(); + } } - /** - * Waits for a specified amount of time for the JobCoordinator to fully start-up, which means it should be ready to - * process messages. 
In a Standalone use-case, it may be sufficient to wait for the container to start-up. In case of - * ZK based Standalone use-case, it also includes registration with ZK, the initialization of leader elector module etc. - * - * @param timeoutMs Maximum time to wait, in milliseconds - */ @Override - public boolean awaitStart(long timeoutMs) throws InterruptedException { - return containerController.awaitStart(timeoutMs); + public String getProcessorId() { + return processorId; } @Override - public String getProcessorId() { - return processorId; + public void setListener(JobCoordinatorListener listener) { + this.coordinatorListener = listener; } @Override public JobModel getJobModel() { - return jobModel; + JavaSystemConfig systemConfig = new JavaSystemConfig(this.config); + Map<String, SystemAdmin> systemAdmins = new HashMap<>(); + for (String systemName: systemConfig.getSystemNames()) { + String systemFactoryClassName = systemConfig.getSystemFactory(systemName); + if (systemFactoryClassName == null) { + LOGGER.error(String.format("A stream uses system %s, which is missing from the configuration.", systemName)); + throw new SamzaException(String.format("A stream uses system %s, which is missing from the configuration.", systemName)); + } + SystemFactory systemFactory = Util.<SystemFactory>getObj(systemFactoryClassName); + systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config)); + } + + StreamMetadataCache streamMetadataCache = new StreamMetadataCache( + Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance()); + + /** TODO: + Locality Manager seems to be required in JC for reading locality info and grouping tasks intelligently and also, + in SamzaContainer for writing locality info to the coordinator stream. This closely couples together + TaskNameGrouper with the LocalityManager! 
Hence, groupers should be a property of the jobcoordinator + (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper) + */ + return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, null); } } http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java index 0faeca9..8c27ebe 100644 --- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java @@ -21,11 +21,10 @@ package org.apache.samza.standalone; import org.apache.samza.config.Config; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorFactory; -import org.apache.samza.processor.SamzaContainerController; public class StandaloneJobCoordinatorFactory implements JobCoordinatorFactory { @Override - public JobCoordinator getJobCoordinator(String processorId, Config config, SamzaContainerController containerController) { - return new StandaloneJobCoordinator(processorId, config, containerController); + public JobCoordinator getJobCoordinator(String processorId, Config config) { + return new StandaloneJobCoordinator(processorId, config); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java index 0afd840..20de43c 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java @@ -119,7 +119,6 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade { @Override public void waitForBarrier(String version, String participantName, Runnable callback) { - setPaths(version); final String barrierProcessorThis = String.format("%s/%s", barrierProcessors, participantName); http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java index 61f7876..b6e3aed 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java @@ -58,24 +58,17 @@ public class ZkControllerImpl implements ZkController { keyBuilder.getJobModelPathPrefix()}); } - private void onBecomeLeader() { - - listenToProcessorLiveness(); // subscribe for adding new processors - - // inform the caller - zkControllerListener.onBecomeLeader(); - - } - @Override public void register() { - // TODO - make a loop here with some number of attempts. 
// possibly split into two method - becomeLeader() and becomeParticipant() leaderElector.tryBecomeLeader(new LeaderElectorListener() { @Override public void onBecomingLeader() { - onBecomeLeader(); + listenToProcessorLiveness(); + + // inform the caller + zkControllerListener.onBecomeLeader(); } }); http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 1ddedbc..d2d0199 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -35,7 +35,7 @@ import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.job.model.JobModel; -import org.apache.samza.processor.SamzaContainerController; +import org.apache.samza.coordinator.JobCoordinatorListener; import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemFactory; @@ -53,23 +53,19 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { private final ZkUtils zkUtils; private final String processorId; - private final ZkController zkController; - private final SamzaContainerController containerController; private final ScheduleAfterDebounceTime debounceTimer; private final StreamMetadataCache streamMetadataCache; - private final ZkKeyBuilder keyBuilder; private final Config config; private final CoordinationUtils coordinationUtils; + private JobCoordinatorListener coordinatorListener = null; private JobModel newJobModel; - - public ZkJobCoordinator(String processorId, Config config, ScheduleAfterDebounceTime 
debounceTimer, - SamzaContainerController containerController) { + + public ZkJobCoordinator(String processorId, Config config, ScheduleAfterDebounceTime debounceTimer) { + this.processorId = processorId; this.debounceTimer = debounceTimer; - this.containerController = containerController; this.config = config; - this.processorId = processorId; this.coordinationUtils = Util. <CoordinationServiceFactory>getObj( @@ -78,7 +74,6 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { .getCoordinationService(new ApplicationConfig(config).getGlobalAppId(), String.valueOf(processorId), config); this.zkUtils = ((ZkCoordinationUtils) coordinationUtils).getZkUtils(); - this.keyBuilder = zkUtils.getKeyBuilder(); this.zkController = new ZkControllerImpl(processorId, zkUtils, debounceTimer, this); streamMetadataCache = getStreamMetadataCache(); @@ -109,20 +104,23 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { @Override public void stop() { + if (coordinatorListener != null) { + coordinatorListener.onJobModelExpired(); + } zkController.stop(); - if (containerController != null) - containerController.stopContainer(); + if (coordinatorListener != null) { + coordinatorListener.onCoordinatorStop(); + } } @Override - public boolean awaitStart(long timeoutMs) - throws InterruptedException { - return containerController.awaitStart(timeoutMs); + public String getProcessorId() { + return processorId; } @Override - public String getProcessorId() { - return processorId; + public void setListener(JobCoordinatorListener listener) { + this.coordinatorListener = listener; } @Override @@ -147,13 +145,18 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { log.info("ZkJobCoordinator::onProcessorChange - list of processors changed! 
List size=" + processors.size()); // if list of processors is empty - it means we are called from 'onBecomeLeader' generateNewJobModel(processors); + if (coordinatorListener != null) { + coordinatorListener.onJobModelExpired(); + } } @Override public void onNewJobModelAvailable(final String version) { log.info("pid=" + processorId + "new JobModel available"); // stop current work - containerController.stopContainer(); + if (coordinatorListener != null) { + coordinatorListener.onJobModelExpired(); + } log.info("pid=" + processorId + "new JobModel available.Container stopped."); // get the new job model newJobModel = zkUtils.getJobModel(version); @@ -179,8 +182,9 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { log.info("pid=" + processorId + "got the new job model in JobModelConfirmed =" + jobModel); // start the container with the new model - containerController.startContainer(jobModel.getContainers().get(processorId), jobModel.getConfig(), - jobModel.maxChangeLogStreamPartitions); + if (coordinatorListener != null) { + coordinatorListener.onNewJobModel(processorId, jobModel); + } } /** http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java index a44565c..a7239eb 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java @@ -22,7 +22,6 @@ package org.apache.samza.zk; import org.apache.samza.config.Config; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorFactory; -import org.apache.samza.processor.SamzaContainerController; public class ZkJobCoordinatorFactory 
implements JobCoordinatorFactory { /** @@ -30,17 +29,15 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory { * * @param processorId - id of this processor * @param config - configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig" - * @param containerController - controller to allow JobCoordinator control the SamzaContainer. * @return An instance of IJobCoordinator */ @Override - public JobCoordinator getJobCoordinator(String processorId, Config config, SamzaContainerController containerController) { + public JobCoordinator getJobCoordinator(String processorId, Config config) { ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime(); return new ZkJobCoordinator( processorId, config, - debounceTimer, - containerController); + debounceTimer); } } http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 8481c92..c7b2b7c 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -22,15 +22,15 @@ package org.apache.samza.container import java.io.File import java.nio.file.Path import java.util -import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit} +import java.util.concurrent.{ExecutorService, Executors, TimeUnit} import java.net.{URL, UnknownHostException} -import org.apache.samza.SamzaException +import org.apache.samza.{SamzaContainerStatus, SamzaException} import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics} import org.apache.samza.config.JobConfig.Config2Job import 
org.apache.samza.config.MetricsConfig.Config2Metrics import org.apache.samza.config.SerializerConfig.Config2Serializer -import org.apache.samza.config.{Config, ShellCommandConfig, StorageConfig} +import org.apache.samza.config.{ClusterManagerConfig, Config, ShellCommandConfig, StorageConfig} import org.apache.samza.config.StorageConfig.Config2Storage import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.config.SystemConfig.Config2System @@ -48,7 +48,6 @@ import org.apache.samza.metrics.JmxServer import org.apache.samza.metrics.JvmMetrics import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.metrics.MetricsReporter -import org.apache.samza.runtime.ApplicationRunner import org.apache.samza.serializers.SerdeFactory import org.apache.samza.serializers.SerdeManager import org.apache.samza.serializers.model.SamzaObjectMapper @@ -81,8 +80,7 @@ object SamzaContainer extends Logging { val DEFAULT_READ_JOBMODEL_DELAY_MS = 100 val DISK_POLL_INTERVAL_KEY = "container.disk.poll.interval.ms" - def getLocalityManager(containerId: String, config: Config): LocalityManager = { - val containerName = getSamzaContainerName(containerId) + def getLocalityManager(containerName: String, config: Config): LocalityManager = { val registryMap = new MetricsRegistryMap(containerName) val coordinatorSystemProducer = new CoordinatorStreamSystemFactory() @@ -108,20 +106,21 @@ object SamzaContainer extends Logging { classOf[JobModel]) } - def getSamzaContainerName(containerId: String): String = { - "samza-container-%s" format containerId - } - def apply( - containerId: String, containerModel: ContainerModel, config: Config, maxChangeLogStreamPartitions: Int, - localityManager: LocalityManager, jmxServer: JmxServer, customReporters: Map[String, MetricsReporter] = Map[String, MetricsReporter](), taskFactory: Object) = { - val containerName = getSamzaContainerName(containerId) + val containerId = containerModel.getProcessorId() + val containerName = 
"samza-container-%s" format containerId + + var localityManager: LocalityManager = null + if (new ClusterManagerConfig(config).getHostAffinityEnabled()) { + localityManager = getLocalityManager(containerName, config) + } + val containerPID = Util.getContainerPID info("Setting up Samza container: %s" format containerName) @@ -627,23 +626,25 @@ class SamzaContainer( taskThreadPool: ExecutorService = null) extends Runnable with Logging { val shutdownMs = containerContext.config.getShutdownMs.getOrElse(5000L) - private val runLoopStartLatch: CountDownLatch = new CountDownLatch(1) var shutdownHookThread: Thread = null - def awaitStart(timeoutMs: Long): Boolean = { - try { - runLoopStartLatch.await(timeoutMs, TimeUnit.MILLISECONDS) - } catch { - case ie: InterruptedException => - error("Interrupted while waiting for runloop to start!", ie) - throw ie - } + @volatile private var status = SamzaContainerStatus.NOT_STARTED + private var exceptionSeen: Throwable = null + private var paused: Boolean = false + private var containerListener: SamzaContainerListener = null + + def getStatus(): SamzaContainerStatus = status + + def setContainerListener(listener: SamzaContainerListener): Unit = { + containerListener = listener } def run { try { info("Starting container.") + status = SamzaContainerStatus.STARTING + startMetrics startOffsetManager startLocalityManager @@ -656,16 +657,24 @@ class SamzaContainer( startSecurityManger addShutdownHook - runLoopStartLatch.countDown() info("Entering run loop.") + status = SamzaContainerStatus.STARTED + if (containerListener != null) { + containerListener.onContainerStart() + } runLoop.run } catch { case e: Throwable => - error("Caught exception/error in process loop.", e) - throw e - } finally { + if (status.equals(SamzaContainerStatus.STARTED)) { + error("Caught exception/error in run loop.", e) + } else { + error("Caught exception/error while initializing container.", e) + } + status = SamzaContainerStatus.FAILED + exceptionSeen = e + } + 
try { info("Shutting down.") - removeShutdownHook shutdownConsumers @@ -679,11 +688,64 @@ class SamzaContainer( shutdownMetrics shutdownSecurityManger + if (!status.equals(SamzaContainerStatus.FAILED)) { + status = SamzaContainerStatus.STOPPED + } + info("Shutdown complete.") + } catch { + case e: Throwable => + error("Caught exception/error while shutting down container.", e) + if (exceptionSeen == null) { + exceptionSeen = e + } + status = SamzaContainerStatus.FAILED + } + + status match { + case SamzaContainerStatus.STOPPED => + if (containerListener != null) { + containerListener.onContainerStop(paused) + } + case SamzaContainerStatus.FAILED => + if (containerListener != null) { + containerListener.onContainerFailed(exceptionSeen) + } } } - def shutdown() = { + // TODO: We want to introduce a "PAUSED" state for SamzaContainer in the future so that StreamProcessor can pause and + // unpause the container when the jobmodel changes. + /** + * Marks the [[SamzaContainer]] as being paused by the caller due to a change in [[JobModel]] and then, asynchronously + * shuts down this [[SamzaContainer]] + */ + def pause(): Unit = { + paused = true + shutdown() + } + + /** + * <p> + * Asynchronously shuts down this [[SamzaContainer]] + * </p> + * <br> + * <b>Implementation</b>: Stops the [[RunLoop]], which will eventually transition the container from + * [[SamzaContainerStatus.STARTED]] to either [[SamzaContainerStatus.STOPPED]] or [[SamzaContainerStatus.FAILED]]. + * Based on the final `status`, [[SamzaContainerListener#onContainerStop(boolean)]] or + * [[SamzaContainerListener#onContainerFailed(Throwable)]] will be invoked respectively. 
+ * + * @throws IllegalContainerStateException when the container has already been stopped or failed + */ + def shutdown(): Unit = { + if (status == SamzaContainerStatus.STOPPED || status == SamzaContainerStatus.FAILED) { + throw new IllegalContainerStateException("Cannot shutdown a container with status - " + status) + } + shutdownRunLoop() + } + + // Shutdown Runloop + def shutdownRunLoop() = { runLoop match { case runLoop: RunLoop => runLoop.shutdown case asyncRunLoop: AsyncRunLoop => asyncRunLoop.shutdown() @@ -809,10 +871,7 @@ class SamzaContainer( shutdownHookThread = new Thread("CONTAINER-SHUTDOWN-HOOK") { override def run() = { info("Shutting down, will wait up to %s ms" format shutdownMs) - runLoop match { - case runLoop: RunLoop => runLoop.shutdown - case asyncRunLoop: AsyncRunLoop => asyncRunLoop.shutdown() - } + shutdownRunLoop() //TODO: Pull out shutdown hook to LocalContainerRunner or SP try { runLoopThread.join(shutdownMs) } catch { @@ -923,3 +982,14 @@ class SamzaContainer( } } } + +/** + * Exception thrown when the SamzaContainer tries to transition to an illegal state. + * {@link SamzaContainerStatus} has more details on the state transitions. + * + * @param s String, Message associated with the exception + * @param t Throwable, Wrapped error/exception thrown, if any. 
+ */ +class IllegalContainerStateException(s: String, t: Throwable) extends SamzaException(s, t) { + def this(s: String) = this(s, null) +} http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala index e0522b1..a61a297 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala @@ -19,15 +19,9 @@ package org.apache.samza.job.local -import java.lang.Thread.UncaughtExceptionHandler - +import org.apache.samza.job.ApplicationStatus.{New, Running, SuccessfulFinish, UnsuccessfulFinish} +import org.apache.samza.job.{ApplicationStatus, StreamJob} import org.apache.samza.util.Logging -import org.apache.samza.job.StreamJob -import org.apache.samza.job.ApplicationStatus -import org.apache.samza.job.ApplicationStatus.New -import org.apache.samza.job.ApplicationStatus.Running -import org.apache.samza.job.ApplicationStatus.SuccessfulFinish -import org.apache.samza.job.ApplicationStatus.UnsuccessfulFinish class ThreadJob(runnable: Runnable) extends StreamJob with Logging { @volatile var jobStatus: Option[ApplicationStatus] = None http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index dcef3af..cb36863 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ 
-20,19 +20,16 @@ package org.apache.samza.job.local -import org.apache.samza.metrics.MetricsReporter -import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap} -import org.apache.samza.runtime.LocalContainerRunner -import org.apache.samza.task.TaskFactoryUtil -import org.apache.samza.util.Logging -import org.apache.samza.SamzaException import org.apache.samza.config.Config -import org.apache.samza.config.ShellCommandConfig._ -import org.apache.samza.config.TaskConfig._ -import org.apache.samza.container.SamzaContainer -import org.apache.samza.job.{ StreamJob, StreamJobFactory } import org.apache.samza.config.JobConfig._ +import org.apache.samza.config.ShellCommandConfig._ +import org.apache.samza.container.{SamzaContainerListener, SamzaContainer} import org.apache.samza.coordinator.JobModelManager +import org.apache.samza.job.{StreamJob, StreamJobFactory} +import org.apache.samza.metrics.{JmxServer, MetricsReporter} +import org.apache.samza.runtime.LocalContainerRunner +import org.apache.samza.task.TaskFactoryUtil +import org.apache.samza.util.Logging /** * Creates a new Thread job with the given config @@ -54,18 +51,32 @@ class ThreadJobFactory extends StreamJobFactory with Logging { case _ => None } + val containerListener = new SamzaContainerListener { + override def onContainerFailed(t: Throwable): Unit = { + error("Container failed.", t) + throw t + } + + override def onContainerStop(pausedOrNot: Boolean): Unit = { + } + + override def onContainerStart(): Unit = { + + } + } try { coordinator.start - new ThreadJob( - SamzaContainer( - containerModel.getProcessorId, - containerModel, - config, - jobModel.maxChangeLogStreamPartitions, - null, - jmxServer, - Map[String, MetricsReporter](), - taskFactory)) + val container = SamzaContainer( + containerModel, + config, + jobModel.maxChangeLogStreamPartitions, + jmxServer, + Map[String, MetricsReporter](), + taskFactory) + container.setContainerListener(containerListener) + + val threadJob = new 
ThreadJob(container) + threadJob } finally { coordinator.stop jmxServer.stop http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java new file mode 100644 index 0000000..4a654dc --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.processor; + +import org.apache.samza.SamzaContainerStatus; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.container.RunLoop; +import org.apache.samza.container.SamzaContainer; +import org.apache.samza.coordinator.JobCoordinator; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.metrics.JmxServer; +import org.apache.samza.metrics.MetricsReporter; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.StreamTaskFactory; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestStreamProcessor { + + class TestableStreamProcessor extends StreamProcessor { + private final CountDownLatch containerStop = new CountDownLatch(1); + private final CountDownLatch runLoopStartForMain = new CountDownLatch(1); + private SamzaContainer containerReference = null; + + public TestableStreamProcessor( + Config config, + Map<String, MetricsReporter> customMetricsReporters, + StreamTaskFactory streamTaskFactory, + StreamProcessorLifecycleListener processorListener, + JobCoordinator jobCoordinator) { + super(config, customMetricsReporters, streamTaskFactory, processorListener, jobCoordinator); + } + + @Override + SamzaContainer createSamzaContainer( + ContainerModel containerModel, + int maxChangelogStreamPartitions, + JmxServer jmxServer) { + RunLoop mockRunLoop = mock(RunLoop.class); + doAnswer(invocation -> + { + try { + runLoopStartForMain.countDown(); + containerStop.await(); + } catch (InterruptedException e) { + System.out.println("In exception" + e); + e.printStackTrace(); + } + 
return null; + }).when(mockRunLoop).run(); + + doAnswer(invocation -> + { + containerStop.countDown(); + return null; + }).when(mockRunLoop).shutdown(); + containerReference = StreamProcessorTestUtils.getDummyContainer(mockRunLoop, null, mock(StreamTask.class)); + return containerReference; + } + } + + /** + * Tests stop() method when Container AND JobCoordinator are running + */ + @Test + public void testStopByProcessor() { + JobCoordinator mockJobCoordinator = mock(JobCoordinator.class); + + final CountDownLatch processorListenerStop = new CountDownLatch(1); + final CountDownLatch processorListenerStart = new CountDownLatch(1); + + TestableStreamProcessor processor = new TestableStreamProcessor( + new MapConfig(), + new HashMap<>(), + mock(StreamTaskFactory.class), + new StreamProcessorLifecycleListener() { + @Override + public void onStart() { + processorListenerStart.countDown(); + } + + @Override + public void onShutdown() { + processorListenerStop.countDown(); + } + + @Override + public void onFailure(Throwable t) { + + } + }, + mockJobCoordinator); + + Map containers = mock(Map.class); + doReturn(true).when(containers).containsKey(anyString()); + when(containers.get(anyString())).thenReturn(mock(ContainerModel.class)); + JobModel mockJobModel = mock(JobModel.class); + when(mockJobModel.getContainers()).thenReturn(containers); + + final CountDownLatch coordinatorStop = new CountDownLatch(1); + final Thread jcThread = new Thread(() -> + { + try { + processor.jobCoordinatorListener.onNewJobModel("1", mockJobModel); + coordinatorStop.await(); + processor.jobCoordinatorListener.onCoordinatorStop(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + + doAnswer(invocation -> + { + coordinatorStop.countDown(); + return null; + }).when(mockJobCoordinator).stop(); + + doAnswer(invocation -> + { + jcThread.start(); + return null; + }).when(mockJobCoordinator).start(); + + try { + processor.start(); + processorListenerStart.await(); + + 
Assert.assertEquals(SamzaContainerStatus.STARTED, processor.containerReference.getStatus()); + + // This block is required so that the mockRunLoop has actually started. + // Otherwise, processor.stop gets triggered before mockRunLoop begins to block + processor.runLoopStartForMain.await(); + + processor.stop(); + + processorListenerStop.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + // TODO: + // Test multiple start / stop and its ordering + // test onNewJobModel + // test onJobModelExpiry + // test Coordinator failure - correctly shuts down the StreamProcessor + // test Container failure +}
