Repository: incubator-samza Updated Branches: refs/heads/0.7.0 477ad024e -> f9aab18c8
SAMZA-253: Consensus shutdown API and per-task commit API. Reviewed by Chris Riccomini. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/f9aab18c Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/f9aab18c Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/f9aab18c Branch: refs/heads/0.7.0 Commit: f9aab18c81bf95bdf0256c7ccf2649342ca780a3 Parents: 477ad02 Author: Martin Kleppmann <[email protected]> Authored: Fri May 2 18:45:13 2014 +0100 Committer: Martin Kleppmann <[email protected]> Committed: Tue May 20 18:45:03 2014 +0100 ---------------------------------------------------------------------- .../org/apache/samza/task/TaskCoordinator.java | 56 +++++- .../org/apache/samza/container/RunLoop.scala | 169 +++++++++++++++++++ .../apache/samza/container/SamzaContainer.scala | 77 ++------- .../apache/samza/container/TaskInstance.scala | 40 ++--- .../samza/container/TaskInstanceMetrics.scala | 2 - .../apache/samza/task/ReadableCoordinator.scala | 22 ++- .../apache/samza/container/TestRunLoop.scala | 164 ++++++++++++++++++ .../samza/container/TestSamzaContainer.scala | 7 +- .../samza/container/TestTaskInstance.scala | 3 +- .../samza/task/TestReadableCoordinator.scala | 50 ++++-- .../test/integration/SimpleStatefulTask.java | 3 +- .../test/integration/StatePerfTestTask.java | 3 +- .../samza/test/integration/join/Emitter.java | 3 +- .../test/performance/TestPerformanceTask.scala | 3 +- .../test/integration/TestStatefulTask.scala | 3 +- 15 files changed, 476 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java 
b/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java index 192b226..5049b1b 100644 --- a/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java +++ b/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java @@ -20,7 +20,59 @@ package org.apache.samza.task; public interface TaskCoordinator { - void commit(); + /** + * Requests that Samza should write out a checkpoint, from which a task can restart + * after failure. + * + * <p>If <code>CURRENT_TASK</code> is given, a checkpoint is only written for the + * current task. If <code>ALL_TASKS_IN_CONTAINER</code> is given, a checkpoint is + * written for all tasks in the current container. + * + * <p>Note that if you have also configured your job to commit at regular + * intervals (using the <code>task.commit.ms</code> property), those time-based + * commits are not affected by calling this method. Any commits you request explicitly + * are in addition to timer-based commits. You can set <code>task.commit.ms=-1</code> + * if you don't want commits to happen automatically. + * + * @param scope Which tasks are being asked to commit. + */ + void commit(RequestScope scope); - void shutdown(); + /** + * Requests that the container should be shut down. + * + * <p>If <code>CURRENT_TASK</code> is given, that indicates a willingness of the current + * task to shut down. All tasks in the container (including the one that requested + * shutdown) will continue processing messages. The container is only shut down + * once every task in the container has called <code>shutdown(CURRENT_TASK)</code>. Once a + * task has called <code>shutdown(CURRENT_TASK)</code>, it cannot change its mind + * (i.e. it cannot revoke its willingness to shut down). + * + * <p>If <code>ALL_TASKS_IN_CONTAINER</code> is given, the container will shut down + * immediately after it has finished processing the current message.
Any buffers of + * pending writes are flushed, but no further messages will be processed in this + * container. + * + * @param scope The approach we should use for shutting down the container. + */ + void shutdown(RequestScope scope); + + /** + * A task can make requests to the Samza framework while processing messages, such as + * {@link TaskCoordinator#commit(RequestScope)} and + * {@link TaskCoordinator#shutdown(RequestScope)}. This enum is used to indicate + * whether those requests apply only to the current task, or to all tasks in the + * current container. + */ + public enum RequestScope { + /** + * Indicates that a request applies only to the task making the call. + */ + CURRENT_TASK, + + /** + * Indicates that a request applies to all tasks in the current container. + */ + ALL_TASKS_IN_CONTAINER; + } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala new file mode 100644 index 0000000..4ca340c --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container + +import grizzled.slf4j.Logging +import org.apache.samza.Partition +import org.apache.samza.system.SystemConsumers +import org.apache.samza.task.ReadableCoordinator + +/** + * Each {@link SamzaContainer} uses a single-threaded execution model: activities for + * all {@link TaskInstance}s within a container are multiplexed onto one execution + * thread. Those activities include task callbacks (such as StreamTask.process and + * WindowableTask.window), committing checkpoints, etc. + * + * <p>This class manages the execution of that run loop, determining what needs to + * be done when. + */ +class RunLoop( + val taskInstances: Map[Partition, TaskInstance], + val consumerMultiplexer: SystemConsumers, + val metrics: SamzaContainerMetrics, + val windowMs: Long = -1, + val commitMs: Long = 60000, + val clock: () => Long = { System.currentTimeMillis }) extends Runnable with Logging { + + private var lastWindowMs = 0L + private var lastCommitMs = 0L + private var taskShutdownRequests: Set[Partition] = Set() + private var taskCommitRequests: Set[Partition] = Set() + private var shutdownNow = false + + + /** + * Starts the run loop. Blocks until either the tasks request shutdown, or an + * unhandled exception is thrown. + */ + def run { + while (!shutdownNow) { + process + window + send + commit + } + } + + + /** + * Chooses a message from an input stream to process, and calls the + * process() method on the appropriate StreamTask to handle it. 
+ */ + private def process { + trace("Attempting to choose a message to process.") + metrics.processes.inc + + val envelope = consumerMultiplexer.choose + + if (envelope != null) { + val partition = envelope.getSystemStreamPartition.getPartition + + trace("Processing incoming message envelope for partition %s." format partition) + metrics.envelopes.inc + + val coordinator = new ReadableCoordinator(partition) + taskInstances(partition).process(envelope, coordinator) + checkCoordinator(coordinator) + } else { + trace("No incoming message envelope was available.") + metrics.nullEnvelopes.inc + } + } + + + /** + * Invokes WindowableTask.window on all tasks if it's time to do so. + */ + private def window { + if (windowMs >= 0 && lastWindowMs + windowMs < clock()) { + trace("Windowing stream tasks.") + lastWindowMs = clock() + metrics.windows.inc + + taskInstances.foreach { case (partition, task) => + val coordinator = new ReadableCoordinator(partition) + task.window(coordinator) + checkCoordinator(coordinator) + } + } + } + + + /** + * If task instances published any messages to output streams, this flushes + * them to the underlying systems. + */ + private def send { + trace("Triggering send in task instances.") + metrics.sends.inc + taskInstances.values.foreach(_.send) + } + + + /** + * Commits task state as a checkpoint, if necessary. + */ + private def commit { + if (commitMs >= 0 && lastCommitMs + commitMs < clock()) { + trace("Committing task instances because the commit interval has elapsed.") + lastCommitMs = clock() + metrics.commits.inc + taskInstances.values.foreach(_.commit) + } else if (!taskCommitRequests.isEmpty) { + trace("Committing due to explicit commit request.") + metrics.commits.inc + taskCommitRequests.foreach(partition => { + taskInstances(partition).commit + }) + } + + taskCommitRequests = Set() + } + + + /** + * A new TaskCoordinator object is passed to a task on every call to StreamTask.process + * and WindowableTask.window.
This method checks whether the task requested that we + * do something that affects the run loop (such as commit or shut down), and updates + * run loop state accordingly. + */ + private def checkCoordinator(coordinator: ReadableCoordinator) { + if (coordinator.requestedCommitTask) { + debug("Task %s requested commit for current task only" format coordinator.partition) + taskCommitRequests += coordinator.partition + } + + if (coordinator.requestedCommitAll) { + debug("Task %s requested commit for all tasks in the container" format coordinator.partition) + taskCommitRequests ++= taskInstances.keys + } + + if (coordinator.requestedShutdownOnConsensus) { + taskShutdownRequests += coordinator.partition + info("Shutdown has now been requested by tasks: %s" format taskShutdownRequests) + } + + if (coordinator.requestedShutdownNow || taskShutdownRequests.size == taskInstances.size) { + info("Shutdown requested.") + shutdownNow = true + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index ef14643..c7c6bcc 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -49,7 +49,6 @@ import org.apache.samza.task.StreamTask import org.apache.samza.task.TaskLifecycleListener import org.apache.samza.task.TaskLifecycleListenerFactory import org.apache.samza.util.Util -import org.apache.samza.task.ReadableCoordinator import org.apache.samza.system.SystemProducers import org.apache.samza.task.ReadableCollector import org.apache.samza.system.SystemConsumers @@ -424,18 +423,24 @@ object SamzaContainer extends 
Logging { reporters = reporters, listeners = listeners, inputStreams = inputStreamsForThisPartition, - windowMs = taskWindowMs, - commitMs = taskCommitMs, collector = collector) (partition, taskInstance) }).toMap + val runLoop = new RunLoop( + taskInstances = taskInstances, + consumerMultiplexer = consumerMultiplexer, + metrics = samzaContainerMetrics, + windowMs = taskWindowMs, + commitMs = taskCommitMs + ) + info("Samza container setup complete.") new SamzaContainer( taskInstances = taskInstances, - config = config, + runLoop = runLoop, consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, offsetManager = offsetManager, @@ -476,7 +481,7 @@ object SamzaContainer extends Logging { class SamzaContainer( taskInstances: Map[Partition, TaskInstance], - config: Config, + runLoop: RunLoop, consumerMultiplexer: SystemConsumers, producerMultiplexer: SystemProducers, metrics: SamzaContainerMetrics, @@ -496,21 +501,7 @@ class SamzaContainer( startConsumers info("Entering run loop.") - - while (true) { - val coordinator = new ReadableCoordinator - - process(coordinator) - window(coordinator) - send - commit(coordinator) - - if (coordinator.shutdownRequested) { - info("Shutdown requested.") - - return - } - } + runLoop.run } catch { case e: Exception => error("Caught exception in process loop.", e) @@ -590,52 +581,6 @@ class SamzaContainer( consumerMultiplexer.start } - def process(coordinator: ReadableCoordinator) { - trace("Attempting to choose a message to process.") - - metrics.processes.inc - - val envelope = consumerMultiplexer.choose - - if (envelope != null) { - val partition = envelope.getSystemStreamPartition.getPartition - - trace("Processing incoming message envelope for partition %s." 
format partition) - - metrics.envelopes.inc - - taskInstances(partition).process(envelope, coordinator) - } else { - trace("No incoming message envelope was available.") - - metrics.nullEnvelopes.inc - } - } - - def window(coordinator: ReadableCoordinator) { - trace("Windowing stream tasks.") - - metrics.windows.inc - - taskInstances.values.foreach(_.window(coordinator)) - } - - def send { - trace("Triggering send in task instances.") - - metrics.sends.inc - - taskInstances.values.foreach(_.send) - } - - def commit(coordinator: ReadableCoordinator) { - trace("Committing task instances.") - - metrics.commits.inc - - taskInstances.values.foreach(_.commit(coordinator)) - } - def shutdownConsumers { info("Shutting down consumer multiplexer.") http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 049fd1f..99a9841 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -57,13 +57,8 @@ class TaskInstance( reporters: Map[String, MetricsReporter] = Map(), listeners: Seq[TaskLifecycleListener] = Seq(), inputStreams: Set[SystemStream] = Set(), - windowMs: Long = -1, - commitMs: Long = 60000, - clock: () => Long = { System.currentTimeMillis }, collector: ReadableCollector = new ReadableCollector) extends Logging { - var lastWindowMs = 0L - var lastCommitMs = 0L val isInitableTask = task.isInstanceOf[InitableTask] val isWindowableTask = task.isInstanceOf[WindowableTask] val isClosableTask = task.isInstanceOf[ClosableTask] @@ -156,19 +151,12 @@ class TaskInstance( } def window(coordinator: ReadableCoordinator) { - if (isWindowableTask && windowMs >= 0 && 
lastWindowMs + windowMs < clock()) { + if (isWindowableTask) { trace("Windowing for partition: %s" format partition) metrics.windows.inc - lastWindowMs = clock() task.asInstanceOf[WindowableTask].window(collector, coordinator) - - trace("Assigned last window time for partition: %s, %s" format (partition, lastWindowMs)) - } else { - trace("Skipping window for partition: %s" format partition) - - metrics.windowsSkipped.inc } } @@ -191,28 +179,20 @@ class TaskInstance( } } - def commit(coordinator: ReadableCoordinator) { - if (lastCommitMs + commitMs < clock() || coordinator.isCommitRequested || coordinator.isShutdownRequested) { - trace("Flushing state stores for partition: %s" format partition) - - metrics.commits.inc + def commit { + trace("Flushing state stores for partition: %s" format partition) - lastCommitMs = clock() + metrics.commits.inc - storageManager.flush + storageManager.flush - trace("Flushing producers for partition: %s" format partition) + trace("Flushing producers for partition: %s" format partition) - producerMultiplexer.flush(metrics.source) + producerMultiplexer.flush(metrics.source) - trace("Committing offset manager for partition: %s" format partition) + trace("Committing offset manager for partition: %s" format partition) - offsetManager.checkpoint(partition) - } else { - trace("Skipping commit for partition: %s" format partition) - - metrics.commitsSkipped.inc - } + offsetManager.checkpoint(partition) } def shutdownTask { @@ -241,6 +221,6 @@ class TaskInstance( override def toString() = "TaskInstance for class %s and partition %s." 
format (task.getClass.getName, partition) - def toDetailedString() = "TaskInstance [windowable=%s, window_time=%s, commit_time=%s, closable=%s, collector_size=%s]" format (isWindowableTask, lastWindowMs, lastCommitMs, isClosableTask, collector.envelopes.size) + def toDetailedString() = "TaskInstance [windowable=%s, closable=%s, collector_size=%s]" format (isWindowableTask, isClosableTask, collector.envelopes.size) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala index 46f1f17..7502124 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala @@ -31,9 +31,7 @@ class TaskInstanceMetrics( val registry: ReadableMetricsRegistry = new MetricsRegistryMap) extends MetricsHelper { val commits = newCounter("commit-calls") - val commitsSkipped = newCounter("commit-skipped") val windows = newCounter("window-calls") - val windowsSkipped = newCounter("window-skipped") val processes = newCounter("process-calls") val sends = newCounter("send-calls") val sendsSkipped = newCounter("send-skipped") http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala index aaf631e..4ccd604 100644 --- a/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala +++ 
b/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala @@ -19,16 +19,22 @@ package org.apache.samza.task -/** An in-memory implementation of TaskCoordinator that stores all coordination messages */ -class ReadableCoordinator extends TaskCoordinator { - var commitRequested = false - var shutdownRequested = false +import org.apache.samza.task.TaskCoordinator.RequestScope +import org.apache.samza.Partition - def commit { commitRequested = true } +/** + * An in-memory implementation of TaskCoordinator that is specific to a single TaskInstance. + */ +class ReadableCoordinator(val partition: Partition) extends TaskCoordinator { + var commitRequest: Option[RequestScope] = None + var shutdownRequest: Option[RequestScope] = None - def isCommitRequested = commitRequested + override def commit(scope: RequestScope) { commitRequest = Some(scope) } + override def shutdown(scope: RequestScope) { shutdownRequest = Some(scope) } - def shutdown { shutdownRequested = true } + def requestedCommitTask = commitRequest.isDefined && commitRequest.get == RequestScope.CURRENT_TASK + def requestedCommitAll = commitRequest.isDefined && commitRequest.get == RequestScope.ALL_TASKS_IN_CONTAINER - def isShutdownRequested = shutdownRequested + def requestedShutdownOnConsensus = shutdownRequest.isDefined && shutdownRequest.get == RequestScope.CURRENT_TASK + def requestedShutdownNow = shutdownRequest.isDefined && shutdownRequest.get == RequestScope.ALL_TASKS_IN_CONTAINER } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala new file mode 100644 index 0000000..fa10231 --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala @@ -0,0 
+1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container + +import org.junit.Test +import org.mockito.Matchers +import org.mockito.Mockito._ +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.scalatest.junit.AssertionsForJUnit +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.mock.MockitoSugar +import org.apache.samza.Partition +import org.apache.samza.system.{IncomingMessageEnvelope, SystemConsumers, SystemStreamPartition} +import org.apache.samza.task.ReadableCoordinator +import org.apache.samza.task.TaskCoordinator.RequestScope + +class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ShouldMatchers { + class StopRunLoop extends RuntimeException + + val p0 = new Partition(0) + val p1 = new Partition(1) + val ssp0 = new SystemStreamPartition("testSystem", "testStream", p0) + val ssp1 = new SystemStreamPartition("testSystem", "testStream", p1) + val envelope0 = new IncomingMessageEnvelope(ssp0, "0", "key0", "value0") + val envelope1 = new IncomingMessageEnvelope(ssp1, "1", "key1", "value1") + + @Test + def testProcessMessageFromChooser { + val taskInstances = Map(p0 -> mock[TaskInstance], p1 -> 
mock[TaskInstance]) + val consumers = mock[SystemConsumers] + val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics) + + when(consumers.choose).thenReturn(envelope0).thenReturn(envelope1).thenThrow(new StopRunLoop) + intercept[StopRunLoop] { runLoop.run } + verify(taskInstances(p0)).process(Matchers.eq(envelope0), anyObject) + verify(taskInstances(p1)).process(Matchers.eq(envelope1), anyObject) + runLoop.metrics.envelopes.getCount should equal(2L) + runLoop.metrics.nullEnvelopes.getCount should equal(0L) + } + + @Test + def testNullMessageFromChooser { + val consumers = mock[SystemConsumers] + val runLoop = new RunLoop(Map(p0 -> mock[TaskInstance]), consumers, new SamzaContainerMetrics) + when(consumers.choose).thenReturn(null).thenReturn(null).thenThrow(new StopRunLoop) + intercept[StopRunLoop] { runLoop.run } + runLoop.metrics.envelopes.getCount should equal(0L) + runLoop.metrics.nullEnvelopes.getCount should equal(2L) + } + + @Test + def testWindowAndCommitAreCalledRegularly { + var now = 1400000000000L + val consumers = mock[SystemConsumers] + when(consumers.choose).thenReturn(envelope0) + + val runLoop = new RunLoop( + taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance]), + consumerMultiplexer = consumers, + metrics = new SamzaContainerMetrics, + windowMs = 60000, // call window once per minute + commitMs = 30000, // call commit twice per minute + clock = () => { + now += 100 // clock advances by 100 ms every time we look at it + if (now == 1400000290000L) throw new StopRunLoop // stop after 4 minutes 50 seconds + now + }) + + intercept[StopRunLoop] { runLoop.run } + + verify(runLoop.taskInstances(p0), times(5)).window(anyObject) + verify(runLoop.taskInstances(p1), times(5)).window(anyObject) + verify(runLoop.taskInstances(p0), times(10)).commit + verify(runLoop.taskInstances(p1), times(10)).commit + } + + @Test + def testCommitCurrentTaskManually { + val taskInstances = Map(p0 -> mock[TaskInstance], p1 -> 
mock[TaskInstance]) + val consumers = mock[SystemConsumers] + val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics, windowMs = -1, commitMs = -1) + + when(consumers.choose).thenReturn(envelope0).thenReturn(envelope1).thenThrow(new StopRunLoop) + stubProcess(taskInstances(p0), (envelope, coordinator) => coordinator.commit(RequestScope.CURRENT_TASK)) + + intercept[StopRunLoop] { runLoop.run } + verify(taskInstances(p0), times(1)).commit + verify(taskInstances(p1), times(0)).commit + } + + @Test + def testCommitAllTasksManually { + val taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance]) + val consumers = mock[SystemConsumers] + val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics, windowMs = -1, commitMs = -1) + + when(consumers.choose).thenReturn(envelope0).thenThrow(new StopRunLoop) + stubProcess(taskInstances(p0), (envelope, coordinator) => coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER)) + + intercept[StopRunLoop] { runLoop.run } + verify(taskInstances(p0), times(1)).commit + verify(taskInstances(p1), times(1)).commit + } + + @Test + def testShutdownOnConsensus { + val taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance]) + val consumers = mock[SystemConsumers] + val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics, windowMs = -1, commitMs = -1) + + when(consumers.choose).thenReturn(envelope0).thenReturn(envelope0).thenReturn(envelope1) + stubProcess(taskInstances(p0), (envelope, coordinator) => coordinator.shutdown(RequestScope.CURRENT_TASK)) + stubProcess(taskInstances(p1), (envelope, coordinator) => coordinator.shutdown(RequestScope.CURRENT_TASK)) + + runLoop.run + verify(taskInstances(p0), times(2)).process(Matchers.eq(envelope0), anyObject) + verify(taskInstances(p1), times(1)).process(Matchers.eq(envelope1), anyObject) + } + + @Test + def testShutdownNow { + val taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance]) + val 
consumers = mock[SystemConsumers] + val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics, windowMs = -1, commitMs = -1) + + when(consumers.choose).thenReturn(envelope0).thenReturn(envelope1) + stubProcess(taskInstances(p0), (envelope, coordinator) => coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER)) + + runLoop.run + verify(taskInstances(p0), times(1)).process(anyObject, anyObject) + verify(taskInstances(p1), times(0)).process(anyObject, anyObject) + } + + def anyObject[T] = Matchers.anyObject.asInstanceOf[T] + + // Stub out TaskInstance.process. Mockito really doesn't make this easy. :( + def stubProcess(taskInstance: TaskInstance, process: (IncomingMessageEnvelope, ReadableCoordinator) => Unit) { + when(taskInstance.process(anyObject, anyObject)).thenAnswer(new Answer[Unit]() { + override def answer(invocation: InvocationOnMock) { + val envelope = invocation.getArguments()(0).asInstanceOf[IncomingMessageEnvelope] + val coordinator = invocation.getArguments()(1).asInstanceOf[ReadableCoordinator] + process(envelope, coordinator) + } + }) + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index a2d5820..3c9272c 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -95,9 +95,14 @@ class TestSamzaContainer { new TaskInstanceMetrics, consumerMultiplexer: SystemConsumers, producerMultiplexer: SystemProducers) + val runLoop = new RunLoop( + taskInstances = Map(partition -> taskInstance), + consumerMultiplexer = consumerMultiplexer, + metrics = new SamzaContainerMetrics + ) val 
container = new SamzaContainer( Map(partition -> taskInstance), - config, + runLoop, consumerMultiplexer, producerMultiplexer, new SamzaContainerMetrics) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala index 27b4ca5..1f5e3bb 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala @@ -71,7 +71,8 @@ class TestTaskInstance { producerMultiplexer, offsetManager) // Pretend we got a message with offset 2 and next offset 3. - taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "2", null, null), new ReadableCoordinator) + val coordinator = new ReadableCoordinator(partition) + taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "2", null, null), coordinator) // Check to see if the offset manager has been properly updated with offset 3. 
val lastProcessedOffset = offsetManager.getLastProcessedOffset(systemStreamPartition) assertTrue(lastProcessedOffset.isDefined) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala index c45ed9b..12f1e03 100644 --- a/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala +++ b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala @@ -21,25 +21,47 @@ package org.apache.samza.task import org.junit.Assert._ import org.junit.Test +import org.apache.samza.Partition +import org.apache.samza.task.TaskCoordinator.RequestScope class TestReadableCoordinator { @Test - def testCommit { - val coord = new ReadableCoordinator - assert(!coord.isCommitRequested) - coord.commit - assert(coord.isCommitRequested) - coord.commit - assert(coord.isCommitRequested) + def testCommitTask { + val coord = new ReadableCoordinator(new Partition(0)) + assertFalse(coord.requestedCommitTask) + assertFalse(coord.requestedCommitAll) + coord.commit(RequestScope.CURRENT_TASK) + assertTrue(coord.requestedCommitTask) + assertFalse(coord.requestedCommitAll) } @Test - def testShutdown { - val coord = new ReadableCoordinator - assert(!coord.isShutdownRequested) - coord.shutdown - assert(coord.isShutdownRequested) - coord.shutdown - assert(coord.isShutdownRequested) + def testCommitAll { + val coord = new ReadableCoordinator(new Partition(0)) + assertFalse(coord.requestedCommitTask) + assertFalse(coord.requestedCommitAll) + coord.commit(RequestScope.ALL_TASKS_IN_CONTAINER) + assertFalse(coord.requestedCommitTask) + assertTrue(coord.requestedCommitAll) + } + + @Test + def testShutdownNow { + val coord = new ReadableCoordinator(new 
Partition(0)) + assertFalse(coord.requestedShutdownOnConsensus) + assertFalse(coord.requestedShutdownNow) + coord.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER) + assertFalse(coord.requestedShutdownOnConsensus) + assertTrue(coord.requestedShutdownNow) + } + + @Test + def testShutdownRequest { + val coord = new ReadableCoordinator(new Partition(0)) + assertFalse(coord.requestedShutdownOnConsensus) + assertFalse(coord.requestedShutdownNow) + coord.shutdown(RequestScope.CURRENT_TASK) + assertTrue(coord.requestedShutdownOnConsensus) + assertFalse(coord.requestedShutdownNow) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java index 46f6ef6..52a8059 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java @@ -29,6 +29,7 @@ import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamTask; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.task.TaskCoordinator.RequestScope; public class SimpleStatefulTask implements StreamTask, InitableTask { @@ -48,7 +49,7 @@ public class SimpleStatefulTask implements StreamTask, InitableTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { System.out.println("Adding " + envelope.getMessage() + " => " + envelope.getMessage() + " to the store."); store.put((String) envelope.getMessage(), (String) envelope.getMessage()); - coordinator.commit(); + coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER); } } 
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java index d84db5e..25417a6 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java @@ -27,6 +27,7 @@ import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamTask; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.task.TaskCoordinator.RequestScope; public class StatePerfTestTask implements StreamTask, InitableTask { @@ -47,7 +48,7 @@ public class StatePerfTestTask implements StreamTask, InitableTask { System.out.println(String.format("Throughput = %.2f messages/sec.", count/ellapsedSecs)); start = System.currentTimeMillis(); count = 0; - coordinator.commit(); + coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER); } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java index b3be653..222c130 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java @@ -29,6 +29,7 @@ import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamTask; import org.apache.samza.task.TaskContext; import 
org.apache.samza.task.TaskCoordinator; +import org.apache.samza.task.TaskCoordinator.RequestScope; import org.apache.samza.task.WindowableTask; /** @@ -66,7 +67,7 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask { // it's a new era, reset current epoch and count this.state.put(EPOCH, Integer.toString(epoch)); this.state.put(COUNT, "0"); - coordinator.commit(); + coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER); } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala index 49dffa1..23d122e 100644 --- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala +++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala @@ -25,6 +25,7 @@ import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.task.MessageCollector import org.apache.samza.task.StreamTask import org.apache.samza.task.TaskCoordinator +import org.apache.samza.task.TaskCoordinator.RequestScope import org.apache.samza.config.Config import grizzled.slf4j.Logging import org.apache.samza.system.SystemStream @@ -104,7 +105,7 @@ class TestPerformanceTask extends StreamTask with InitableTask with Logging { } if (messagesProcessed >= maxMessages) { - coordinator.shutdown + coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER) } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala ---------------------------------------------------------------------- diff --git 
a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala index 6fdfcfc..10502a9 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala @@ -71,6 +71,7 @@ import java.util.concurrent.Executors import kafka.message.MessageAndOffset import kafka.message.MessageAndMetadata import org.apache.samza.job.StreamJob +import org.apache.samza.task.TaskCoordinator.RequestScope object TestStatefulTask { val INPUT_TOPIC = "input" @@ -448,7 +449,7 @@ class TestTask extends StreamTask with InitableTask { store.put(msg, msg) } - coordinator.commit + coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER) // Notify sender that we got a message. gotMessage.countDown
