Repository: incubator-samza
Updated Branches:
  refs/heads/0.7.0 477ad024e -> f9aab18c8


SAMZA-253: Consensus shutdown API and per-task commit API. Reviewed by Chris 
Riccomini.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/f9aab18c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/f9aab18c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/f9aab18c

Branch: refs/heads/0.7.0
Commit: f9aab18c81bf95bdf0256c7ccf2649342ca780a3
Parents: 477ad02
Author: Martin Kleppmann <[email protected]>
Authored: Fri May 2 18:45:13 2014 +0100
Committer: Martin Kleppmann <[email protected]>
Committed: Tue May 20 18:45:03 2014 +0100

----------------------------------------------------------------------
 .../org/apache/samza/task/TaskCoordinator.java  |  56 +++++-
 .../org/apache/samza/container/RunLoop.scala    | 169 +++++++++++++++++++
 .../apache/samza/container/SamzaContainer.scala |  77 ++-------
 .../apache/samza/container/TaskInstance.scala   |  40 ++---
 .../samza/container/TaskInstanceMetrics.scala   |   2 -
 .../apache/samza/task/ReadableCoordinator.scala |  22 ++-
 .../apache/samza/container/TestRunLoop.scala    | 164 ++++++++++++++++++
 .../samza/container/TestSamzaContainer.scala    |   7 +-
 .../samza/container/TestTaskInstance.scala      |   3 +-
 .../samza/task/TestReadableCoordinator.scala    |  50 ++++--
 .../test/integration/SimpleStatefulTask.java    |   3 +-
 .../test/integration/StatePerfTestTask.java     |   3 +-
 .../samza/test/integration/join/Emitter.java    |   3 +-
 .../test/performance/TestPerformanceTask.scala  |   3 +-
 .../test/integration/TestStatefulTask.scala     |   3 +-
 15 files changed, 476 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java 
b/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
index 192b226..5049b1b 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
@@ -20,7 +20,59 @@
 package org.apache.samza.task;
 
 public interface TaskCoordinator {
-  void commit();
+  /**
+   * Requests that Samza should write out a checkpoint, from which a task can 
restart
+   * after failure.
+   *
+   * <p>If <code>CURRENT_TASK</code> is given, a checkpoint is only written 
for the
+   * current task. If <code>ALL_TASKS_IN_CONTAINER</code> is given, a 
checkpoint is
+   * written for all tasks in the current container.
+   *
+   * <p>Note that if you have also configured your job to commit at 
regular
+   * intervals (using the <code>task.commit.ms</code> property), those 
time-based
+   * commits are not affected by calling this method. Any commits you request 
explicitly
are in addition to time-based commits. You can set 
<code>task.commit.ms=-1</code>
+   * if you don't want commits to happen automatically.
+   *
+   * @param scope Which tasks are being asked to commit.
+   */
+  void commit(RequestScope scope);
 
-  void shutdown();
+  /**
+   * Requests that the container should be shut down.
+   *
+   * <p>If <code>CURRENT_TASK</code> is given, that indicates a willingness of 
the current
+   * task to shut down. All tasks in the container (including the one that 
requested
+   * shutdown) will continue processing messages. Only when every task in the 
container
+   * has called <code>shutdown(CURRENT_TASK)</code> is the container shut 
down. Once a
+   * task has called <code>shutdown(CURRENT_TASK)</code>, it cannot change its 
mind
+   * (i.e. it cannot revoke its willingness to shut down).
+   *
+   * <p>If <code>ALL_TASKS_IN_CONTAINER</code> is given, the container will 
shut down
+   * immediately after it has finished processing the current message. Any 
buffers of
+   * pending writes are flushed, but no further messages will be processed in 
this
+   * container.
+   *
+   * @param scope The approach we should use for shutting down the container.
+   */
+  void shutdown(RequestScope scope);
+
+  /**
+   * A task can make requests to the Samza framework while processing 
messages, such as
+   * {@link TaskCoordinator#commit(RequestScope)} and
+   * {@link TaskCoordinator#shutdown(RequestScope)}. This enum is used to 
indicate
+   * whether those requests apply only to the current task, or to all tasks in 
the
+   * current container.
+   */
+  public enum RequestScope {
+    /**
+     * Indicates that a request applies only to the task making the call.
+     */
+    CURRENT_TASK,
+
+    /**
+     * Indicates that a request applies to all tasks in the current container.
+     */
+    ALL_TASKS_IN_CONTAINER;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
new file mode 100644
index 0000000..4ca340c
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container
+
+import grizzled.slf4j.Logging
+import org.apache.samza.Partition
+import org.apache.samza.system.SystemConsumers
+import org.apache.samza.task.ReadableCoordinator
+
+/**
+ * Each {@link SamzaContainer} uses a single-threaded execution model: 
activities for
+ * all {@link TaskInstance}s within a container are multiplexed onto one 
execution
+ * thread. Those activities include task callbacks (such as StreamTask.process 
and
+ * WindowableTask.window), committing checkpoints, etc.
+ *
+ * <p>This class manages the execution of that run loop, determining what 
needs to
+ * be done when.
+ */
+class RunLoop(
+  val taskInstances: Map[Partition, TaskInstance],
+  val consumerMultiplexer: SystemConsumers,
+  val metrics: SamzaContainerMetrics,
+  val windowMs: Long = -1,
+  val commitMs: Long = 60000,
+  val clock: () => Long = { System.currentTimeMillis }) extends Runnable with 
Logging {
+
+  private var lastWindowMs = 0L
+  private var lastCommitMs = 0L
+  private var taskShutdownRequests: Set[Partition] = Set()
+  private var taskCommitRequests: Set[Partition] = Set()
+  private var shutdownNow = false
+
+
+  /**
+   * Starts the run loop. Blocks until either the tasks request shutdown, or an
+   * unhandled exception is thrown.
+   */
+  def run {
+    while (!shutdownNow) {
+      process
+      window
+      send
+      commit
+    }
+  }
+
+
+  /**
+   * Chooses a message from an input stream to process, and calls the
+   * process() method on the appropriate StreamTask to handle it.
+   */
+  private def process {
+    trace("Attempting to choose a message to process.")
+    metrics.processes.inc
+
+    val envelope = consumerMultiplexer.choose
+
+    if (envelope != null) {
+      val partition = envelope.getSystemStreamPartition.getPartition
+
+      trace("Processing incoming message envelope for partition %s." format 
partition)
+      metrics.envelopes.inc
+
+      val coordinator = new ReadableCoordinator(partition)
+      taskInstances(partition).process(envelope, coordinator)
+      checkCoordinator(coordinator)
+    } else {
+      trace("No incoming message envelope was available.")
+      metrics.nullEnvelopes.inc
+    }
+  }
+
+
+  /**
+   * Invokes WindowableTask.window on all tasks if it's time to do so.
+   */
+  private def window {
+    if (windowMs >= 0 && lastWindowMs + windowMs < clock()) {
+      trace("Windowing stream tasks.")
+      lastWindowMs = clock()
+      metrics.windows.inc
+
+      taskInstances.foreach { case (partition, task) =>
+        val coordinator = new ReadableCoordinator(partition)
+        task.window(coordinator)
+        checkCoordinator(coordinator)
+      }
+    }
+  }
+
+
+  /**
+   * If task instances published any messages to output streams, this flushes
+   * them to the underlying systems.
+   */
+  private def send {
+    trace("Triggering send in task instances.")
+    metrics.sends.inc
+    taskInstances.values.foreach(_.send)
+  }
+
+
+  /**
+   * Commits task state as a checkpoint, if necessary.
+   */
+  private def commit {
+    if (commitMs >= 0 && lastCommitMs + commitMs < clock()) {
+      trace("Committing task instances because the commit interval has 
elapsed.")
+      lastCommitMs = clock()
+      metrics.commits.inc
+      taskInstances.values.foreach(_.commit)
+    } else if (!taskCommitRequests.isEmpty) {
+      trace("Committing due to explicit commit request.")
+      metrics.commits.inc
+      taskCommitRequests.foreach(partition => {
+        taskInstances(partition).commit
+      })
+    }
+
+    taskCommitRequests = Set()
+  }
+
+
+  /**
+   * A new TaskCoordinator object is passed to a task on every call to 
StreamTask.process
+   * and WindowableTask.window. This method checks whether the task requested 
that we
+   * do something that affects the run loop (such as commit or shut down), and 
updates
+   * run loop state accordingly.
+   */
+  private def checkCoordinator(coordinator: ReadableCoordinator) {
+    if (coordinator.requestedCommitTask) {
+      debug("Task %s requested commit for current task only" format 
coordinator.partition)
+      taskCommitRequests += coordinator.partition
+    }
+
+    if (coordinator.requestedCommitAll) {
+      debug("Task %s requested commit for all tasks in the container" format 
coordinator.partition)
+      taskCommitRequests ++= taskInstances.keys
+    }
+
+    if (coordinator.requestedShutdownOnConsensus) {
+      taskShutdownRequests += coordinator.partition
+      info("Shutdown has now been requested by tasks: %s" format 
taskShutdownRequests)
+    }
+
+    if (coordinator.requestedShutdownNow || taskShutdownRequests.size == 
taskInstances.size) {
+      info("Shutdown requested.")
+      shutdownNow = true
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index ef14643..c7c6bcc 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -49,7 +49,6 @@ import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskLifecycleListener
 import org.apache.samza.task.TaskLifecycleListenerFactory
 import org.apache.samza.util.Util
-import org.apache.samza.task.ReadableCoordinator
 import org.apache.samza.system.SystemProducers
 import org.apache.samza.task.ReadableCollector
 import org.apache.samza.system.SystemConsumers
@@ -424,18 +423,24 @@ object SamzaContainer extends Logging {
         reporters = reporters,
         listeners = listeners,
         inputStreams = inputStreamsForThisPartition,
-        windowMs = taskWindowMs,
-        commitMs = taskCommitMs,
         collector = collector)
 
       (partition, taskInstance)
     }).toMap
 
+    val runLoop = new RunLoop(
+      taskInstances = taskInstances,
+      consumerMultiplexer = consumerMultiplexer,
+      metrics = samzaContainerMetrics,
+      windowMs = taskWindowMs,
+      commitMs = taskCommitMs
+    )
+
     info("Samza container setup complete.")
 
     new SamzaContainer(
       taskInstances = taskInstances,
-      config = config,
+      runLoop = runLoop,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       offsetManager = offsetManager,
@@ -476,7 +481,7 @@ object SamzaContainer extends Logging {
 
 class SamzaContainer(
   taskInstances: Map[Partition, TaskInstance],
-  config: Config,
+  runLoop: RunLoop,
   consumerMultiplexer: SystemConsumers,
   producerMultiplexer: SystemProducers,
   metrics: SamzaContainerMetrics,
@@ -496,21 +501,7 @@ class SamzaContainer(
       startConsumers
 
       info("Entering run loop.")
-
-      while (true) {
-        val coordinator = new ReadableCoordinator
-
-        process(coordinator)
-        window(coordinator)
-        send
-        commit(coordinator)
-
-        if (coordinator.shutdownRequested) {
-          info("Shutdown requested.")
-
-          return
-        }
-      }
+      runLoop.run
     } catch {
       case e: Exception =>
         error("Caught exception in process loop.", e)
@@ -590,52 +581,6 @@ class SamzaContainer(
     consumerMultiplexer.start
   }
 
-  def process(coordinator: ReadableCoordinator) {
-    trace("Attempting to choose a message to process.")
-
-    metrics.processes.inc
-
-    val envelope = consumerMultiplexer.choose
-
-    if (envelope != null) {
-      val partition = envelope.getSystemStreamPartition.getPartition
-
-      trace("Processing incoming message envelope for partition %s." format 
partition)
-
-      metrics.envelopes.inc
-
-      taskInstances(partition).process(envelope, coordinator)
-    } else {
-      trace("No incoming message envelope was available.")
-
-      metrics.nullEnvelopes.inc
-    }
-  }
-
-  def window(coordinator: ReadableCoordinator) {
-    trace("Windowing stream tasks.")
-
-    metrics.windows.inc
-
-    taskInstances.values.foreach(_.window(coordinator))
-  }
-
-  def send {
-    trace("Triggering send in task instances.")
-
-    metrics.sends.inc
-
-    taskInstances.values.foreach(_.send)
-  }
-
-  def commit(coordinator: ReadableCoordinator) {
-    trace("Committing task instances.")
-
-    metrics.commits.inc
-
-    taskInstances.values.foreach(_.commit(coordinator))
-  }
-
   def shutdownConsumers {
     info("Shutting down consumer multiplexer.")
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 049fd1f..99a9841 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -57,13 +57,8 @@ class TaskInstance(
   reporters: Map[String, MetricsReporter] = Map(),
   listeners: Seq[TaskLifecycleListener] = Seq(),
   inputStreams: Set[SystemStream] = Set(),
-  windowMs: Long = -1,
-  commitMs: Long = 60000,
-  clock: () => Long = { System.currentTimeMillis },
   collector: ReadableCollector = new ReadableCollector) extends Logging {
 
-  var lastWindowMs = 0L
-  var lastCommitMs = 0L
   val isInitableTask = task.isInstanceOf[InitableTask]
   val isWindowableTask = task.isInstanceOf[WindowableTask]
   val isClosableTask = task.isInstanceOf[ClosableTask]
@@ -156,19 +151,12 @@ class TaskInstance(
   }
 
   def window(coordinator: ReadableCoordinator) {
-    if (isWindowableTask && windowMs >= 0 && lastWindowMs + windowMs < 
clock()) {
+    if (isWindowableTask) {
       trace("Windowing for partition: %s" format partition)
 
       metrics.windows.inc
 
-      lastWindowMs = clock()
       task.asInstanceOf[WindowableTask].window(collector, coordinator)
-
-      trace("Assigned last window time for partition: %s, %s" format 
(partition, lastWindowMs))
-    } else {
-      trace("Skipping window for partition: %s" format partition)
-
-      metrics.windowsSkipped.inc
     }
   }
 
@@ -191,28 +179,20 @@ class TaskInstance(
     }
   }
 
-  def commit(coordinator: ReadableCoordinator) {
-    if (lastCommitMs + commitMs < clock() || coordinator.isCommitRequested || 
coordinator.isShutdownRequested) {
-      trace("Flushing state stores for partition: %s" format partition)
-
-      metrics.commits.inc
+  def commit {
+    trace("Flushing state stores for partition: %s" format partition)
 
-      lastCommitMs = clock()
+    metrics.commits.inc
 
-      storageManager.flush
+    storageManager.flush
 
-      trace("Flushing producers for partition: %s" format partition)
+    trace("Flushing producers for partition: %s" format partition)
 
-      producerMultiplexer.flush(metrics.source)
+    producerMultiplexer.flush(metrics.source)
 
-      trace("Committing offset manager for partition: %s" format partition)
+    trace("Committing offset manager for partition: %s" format partition)
 
-      offsetManager.checkpoint(partition)
-    } else {
-      trace("Skipping commit for partition: %s" format partition)
-
-      metrics.commitsSkipped.inc
-    }
+    offsetManager.checkpoint(partition)
   }
 
   def shutdownTask {
@@ -241,6 +221,6 @@ class TaskInstance(
 
   override def toString() = "TaskInstance for class %s and partition %s." 
format (task.getClass.getName, partition)
 
-  def toDetailedString() = "TaskInstance [windowable=%s, window_time=%s, 
commit_time=%s, closable=%s, collector_size=%s]" format (isWindowableTask, 
lastWindowMs, lastCommitMs, isClosableTask, collector.envelopes.size)
+  def toDetailedString() = "TaskInstance [windowable=%s, closable=%s, 
collector_size=%s]" format (isWindowableTask, isClosableTask, 
collector.envelopes.size)
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
 
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
index 46f1f17..7502124 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
@@ -31,9 +31,7 @@ class TaskInstanceMetrics(
   val registry: ReadableMetricsRegistry = new MetricsRegistryMap) extends 
MetricsHelper {
 
   val commits = newCounter("commit-calls")
-  val commitsSkipped = newCounter("commit-skipped")
   val windows = newCounter("window-calls")
-  val windowsSkipped = newCounter("window-skipped")
   val processes = newCounter("process-calls")
   val sends = newCounter("send-calls")
   val sendsSkipped = newCounter("send-skipped")

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala 
b/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
index aaf631e..4ccd604 100644
--- a/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
@@ -19,16 +19,22 @@
 
 package org.apache.samza.task
 
-/** An in-memory implementation of TaskCoordinator that stores all 
coordination messages */
-class ReadableCoordinator extends TaskCoordinator {
-  var commitRequested = false
-  var shutdownRequested = false
+import org.apache.samza.task.TaskCoordinator.RequestScope
+import org.apache.samza.Partition
 
-  def commit { commitRequested = true }
+/**
+ * An in-memory implementation of TaskCoordinator that is specific to a single 
TaskInstance.
+ */
+class ReadableCoordinator(val partition: Partition) extends TaskCoordinator {
+  var commitRequest: Option[RequestScope] = None
+  var shutdownRequest: Option[RequestScope] = None
 
-  def isCommitRequested = commitRequested
+  override def commit(scope: RequestScope) { commitRequest = Some(scope) }
+  override def shutdown(scope: RequestScope) { shutdownRequest = Some(scope) }
 
-  def shutdown { shutdownRequested = true }
+  def requestedCommitTask = commitRequest.isDefined && commitRequest.get == 
RequestScope.CURRENT_TASK
+  def requestedCommitAll  = commitRequest.isDefined && commitRequest.get == 
RequestScope.ALL_TASKS_IN_CONTAINER
 
-  def isShutdownRequested = shutdownRequested
+  def requestedShutdownOnConsensus = shutdownRequest.isDefined && 
shutdownRequest.get == RequestScope.CURRENT_TASK
+  def requestedShutdownNow         = shutdownRequest.isDefined && 
shutdownRequest.get == RequestScope.ALL_TASKS_IN_CONTAINER
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
new file mode 100644
index 0000000..fa10231
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container
+
+import org.junit.Test
+import org.mockito.Matchers
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.junit.AssertionsForJUnit
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.mock.MockitoSugar
+import org.apache.samza.Partition
+import org.apache.samza.system.{IncomingMessageEnvelope, SystemConsumers, 
SystemStreamPartition}
+import org.apache.samza.task.ReadableCoordinator
+import org.apache.samza.task.TaskCoordinator.RequestScope
+
+class TestRunLoop extends AssertionsForJUnit with MockitoSugar with 
ShouldMatchers {
+  class StopRunLoop extends RuntimeException
+
+  val p0 = new Partition(0)
+  val p1 = new Partition(1)
+  val ssp0 = new SystemStreamPartition("testSystem", "testStream", p0)
+  val ssp1 = new SystemStreamPartition("testSystem", "testStream", p1)
+  val envelope0 = new IncomingMessageEnvelope(ssp0, "0", "key0", "value0")
+  val envelope1 = new IncomingMessageEnvelope(ssp1, "1", "key1", "value1")
+
+  @Test
+  def testProcessMessageFromChooser {
+    val taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance])
+    val consumers = mock[SystemConsumers]
+    val runLoop = new RunLoop(taskInstances, consumers, new 
SamzaContainerMetrics)
+
+    
when(consumers.choose).thenReturn(envelope0).thenReturn(envelope1).thenThrow(new
 StopRunLoop)
+    intercept[StopRunLoop] { runLoop.run }
+    verify(taskInstances(p0)).process(Matchers.eq(envelope0), anyObject)
+    verify(taskInstances(p1)).process(Matchers.eq(envelope1), anyObject)
+    runLoop.metrics.envelopes.getCount should equal(2L)
+    runLoop.metrics.nullEnvelopes.getCount should equal(0L)
+  }
+
+  @Test
+  def testNullMessageFromChooser {
+    val consumers = mock[SystemConsumers]
+    val runLoop = new RunLoop(Map(p0 -> mock[TaskInstance]), consumers, new 
SamzaContainerMetrics)
+    when(consumers.choose).thenReturn(null).thenReturn(null).thenThrow(new 
StopRunLoop)
+    intercept[StopRunLoop] { runLoop.run }
+    runLoop.metrics.envelopes.getCount should equal(0L)
+    runLoop.metrics.nullEnvelopes.getCount should equal(2L)
+  }
+
+  @Test
+  def testWindowAndCommitAreCalledRegularly {
+    var now = 1400000000000L
+    val consumers = mock[SystemConsumers]
+    when(consumers.choose).thenReturn(envelope0)
+
+    val runLoop = new RunLoop(
+      taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance]),
+      consumerMultiplexer = consumers,
+      metrics = new SamzaContainerMetrics,
+      windowMs = 60000, // call window once per minute
+      commitMs = 30000, // call commit twice per minute
+      clock = () => {
+        now += 100 // clock advances by 100 ms every time we look at it
+        if (now == 1400000290000L) throw new StopRunLoop // stop after 4 
minutes 50 seconds
+        now
+      })
+
+    intercept[StopRunLoop] { runLoop.run }
+
+    verify(runLoop.taskInstances(p0), times(5)).window(anyObject)
+    verify(runLoop.taskInstances(p1), times(5)).window(anyObject)
+    verify(runLoop.taskInstances(p0), times(10)).commit
+    verify(runLoop.taskInstances(p1), times(10)).commit
+  }
+
+  @Test
+  def testCommitCurrentTaskManually {
+    val taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance])
+    val consumers = mock[SystemConsumers]
+    val runLoop = new RunLoop(taskInstances, consumers, new 
SamzaContainerMetrics, windowMs = -1, commitMs = -1)
+
+    
when(consumers.choose).thenReturn(envelope0).thenReturn(envelope1).thenThrow(new
 StopRunLoop)
+    stubProcess(taskInstances(p0), (envelope, coordinator) => 
coordinator.commit(RequestScope.CURRENT_TASK))
+
+    intercept[StopRunLoop] { runLoop.run }
+    verify(taskInstances(p0), times(1)).commit
+    verify(taskInstances(p1), times(0)).commit
+  }
+
+  @Test
+  def testCommitAllTasksManually {
+    val taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance])
+    val consumers = mock[SystemConsumers]
+    val runLoop = new RunLoop(taskInstances, consumers, new 
SamzaContainerMetrics, windowMs = -1, commitMs = -1)
+
+    when(consumers.choose).thenReturn(envelope0).thenThrow(new StopRunLoop)
+    stubProcess(taskInstances(p0), (envelope, coordinator) => 
coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER))
+
+    intercept[StopRunLoop] { runLoop.run }
+    verify(taskInstances(p0), times(1)).commit
+    verify(taskInstances(p1), times(1)).commit
+  }
+
+  @Test
+  def testShutdownOnConsensus {
+    val taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance])
+    val consumers = mock[SystemConsumers]
+    val runLoop = new RunLoop(taskInstances, consumers, new 
SamzaContainerMetrics, windowMs = -1, commitMs = -1)
+
+    
when(consumers.choose).thenReturn(envelope0).thenReturn(envelope0).thenReturn(envelope1)
+    stubProcess(taskInstances(p0), (envelope, coordinator) => 
coordinator.shutdown(RequestScope.CURRENT_TASK))
+    stubProcess(taskInstances(p1), (envelope, coordinator) => 
coordinator.shutdown(RequestScope.CURRENT_TASK))
+
+    runLoop.run
+    verify(taskInstances(p0), times(2)).process(Matchers.eq(envelope0), 
anyObject)
+    verify(taskInstances(p1), times(1)).process(Matchers.eq(envelope1), 
anyObject)
+  }
+
+  @Test
+  def testShutdownNow {
+    val taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance])
+    val consumers = mock[SystemConsumers]
+    val runLoop = new RunLoop(taskInstances, consumers, new 
SamzaContainerMetrics, windowMs = -1, commitMs = -1)
+
+    when(consumers.choose).thenReturn(envelope0).thenReturn(envelope1)
+    stubProcess(taskInstances(p0), (envelope, coordinator) => 
coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER))
+
+    runLoop.run
+    verify(taskInstances(p0), times(1)).process(anyObject, anyObject)
+    verify(taskInstances(p1), times(0)).process(anyObject, anyObject)
+  }
+
+  def anyObject[T] = Matchers.anyObject.asInstanceOf[T]
+
+  // Stub out TaskInstance.process. Mockito really doesn't make this easy. :(
+  def stubProcess(taskInstance: TaskInstance, process: 
(IncomingMessageEnvelope, ReadableCoordinator) => Unit) {
+    when(taskInstance.process(anyObject, anyObject)).thenAnswer(new 
Answer[Unit]() {
+      override def answer(invocation: InvocationOnMock) {
+        val envelope    = 
invocation.getArguments()(0).asInstanceOf[IncomingMessageEnvelope]
+        val coordinator = 
invocation.getArguments()(1).asInstanceOf[ReadableCoordinator]
+        process(envelope, coordinator)
+      }
+    })
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index a2d5820..3c9272c 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -95,9 +95,14 @@ class TestSamzaContainer {
       new TaskInstanceMetrics,
       consumerMultiplexer: SystemConsumers,
       producerMultiplexer: SystemProducers)
+    val runLoop = new RunLoop(
+      taskInstances = Map(partition -> taskInstance),
+      consumerMultiplexer = consumerMultiplexer,
+      metrics = new SamzaContainerMetrics
+    )
     val container = new SamzaContainer(
       Map(partition -> taskInstance),
-      config,
+      runLoop,
       consumerMultiplexer,
       producerMultiplexer,
       new SamzaContainerMetrics)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 27b4ca5..1f5e3bb 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -71,7 +71,8 @@ class TestTaskInstance {
       producerMultiplexer,
       offsetManager)
     // Pretend we got a message with offset 2 and next offset 3.
-    taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, 
"2", null, null), new ReadableCoordinator)
+    val coordinator = new ReadableCoordinator(partition)
+    taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, 
"2", null, null), coordinator)
     // Check to see if the offset manager has been properly updated with 
offset 3.
     val lastProcessedOffset = 
offsetManager.getLastProcessedOffset(systemStreamPartition)
     assertTrue(lastProcessedOffset.isDefined)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala 
b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
index c45ed9b..12f1e03 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
@@ -21,25 +21,47 @@ package org.apache.samza.task
 
 import org.junit.Assert._
 import org.junit.Test
+import org.apache.samza.Partition
+import org.apache.samza.task.TaskCoordinator.RequestScope
 
 class TestReadableCoordinator {
   @Test
-  def testCommit {
-    val coord = new ReadableCoordinator
-    assert(!coord.isCommitRequested)
-    coord.commit
-    assert(coord.isCommitRequested)
-    coord.commit
-    assert(coord.isCommitRequested)
+  def testCommitTask {
+    val coord = new ReadableCoordinator(new Partition(0))
+    assertFalse(coord.requestedCommitTask)
+    assertFalse(coord.requestedCommitAll)
+    coord.commit(RequestScope.CURRENT_TASK)
+    assertTrue(coord.requestedCommitTask)
+    assertFalse(coord.requestedCommitAll)
   }
 
   @Test
-  def testShutdown {
-    val coord = new ReadableCoordinator
-    assert(!coord.isShutdownRequested)
-    coord.shutdown
-    assert(coord.isShutdownRequested)
-    coord.shutdown
-    assert(coord.isShutdownRequested)
+  def testCommitAll {
+    val coord = new ReadableCoordinator(new Partition(0))
+    assertFalse(coord.requestedCommitTask)
+    assertFalse(coord.requestedCommitAll)
+    coord.commit(RequestScope.ALL_TASKS_IN_CONTAINER)
+    assertFalse(coord.requestedCommitTask)
+    assertTrue(coord.requestedCommitAll)
+  }
+
+  @Test
+  def testShutdownNow {
+    val coord = new ReadableCoordinator(new Partition(0))
+    assertFalse(coord.requestedShutdownOnConsensus)
+    assertFalse(coord.requestedShutdownNow)
+    coord.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER)
+    assertFalse(coord.requestedShutdownOnConsensus)
+    assertTrue(coord.requestedShutdownNow)
+  }
+
+  @Test
+  def testShutdownRequest {
+    val coord = new ReadableCoordinator(new Partition(0))
+    assertFalse(coord.requestedShutdownOnConsensus)
+    assertFalse(coord.requestedShutdownNow)
+    coord.shutdown(RequestScope.CURRENT_TASK)
+    assertTrue(coord.requestedShutdownOnConsensus)
+    assertFalse(coord.requestedShutdownNow)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
index 46f6ef6..52a8059 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
@@ -29,6 +29,7 @@ import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.TaskCoordinator.RequestScope;
 
 public class SimpleStatefulTask implements StreamTask, InitableTask {
   
@@ -48,7 +49,7 @@ public class SimpleStatefulTask implements StreamTask, InitableTask {
   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
     System.out.println("Adding " + envelope.getMessage() + " => " + envelope.getMessage() + " to the store.");
     store.put((String) envelope.getMessage(), (String) envelope.getMessage());
-    coordinator.commit();
+    coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
index d84db5e..25417a6 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
@@ -27,6 +27,7 @@ import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.TaskCoordinator.RequestScope;
 
 public class StatePerfTestTask implements StreamTask, InitableTask {
   
@@ -47,7 +48,7 @@ public class StatePerfTestTask implements StreamTask, InitableTask {
       System.out.println(String.format("Throughput = %.2f messages/sec.", count/ellapsedSecs));
       start = System.currentTimeMillis();
       count = 0;
-      coordinator.commit();
+      coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
index b3be653..222c130 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
@@ -29,6 +29,7 @@ import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.TaskCoordinator.RequestScope;
 import org.apache.samza.task.WindowableTask;
 
 /**
@@ -66,7 +67,7 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
       // it's a new era, reset current epoch and count
       this.state.put(EPOCH, Integer.toString(epoch));
       this.state.put(COUNT, "0");
-      coordinator.commit();
+      coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER);
     }
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
index 49dffa1..23d122e 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
@@ -25,6 +25,7 @@ import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.task.MessageCollector
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskCoordinator
+import org.apache.samza.task.TaskCoordinator.RequestScope
 import org.apache.samza.config.Config
 import grizzled.slf4j.Logging
 import org.apache.samza.system.SystemStream
@@ -104,7 +105,7 @@ class TestPerformanceTask extends StreamTask with InitableTask with Logging {
     }
 
     if (messagesProcessed >= maxMessages) {
-      coordinator.shutdown
+      coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER)
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f9aab18c/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 6fdfcfc..10502a9 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -71,6 +71,7 @@ import java.util.concurrent.Executors
 import kafka.message.MessageAndOffset
 import kafka.message.MessageAndMetadata
 import org.apache.samza.job.StreamJob
+import org.apache.samza.task.TaskCoordinator.RequestScope
 
 object TestStatefulTask {
   val INPUT_TOPIC = "input"
@@ -448,7 +449,7 @@ class TestTask extends StreamTask with InitableTask {
       store.put(msg, msg)
     }
 
-    coordinator.commit
+    coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER)
 
     // Notify sender that we got a message.
     gotMessage.countDown

Reply via email to