Repository: samza
Updated Branches:
  refs/heads/master 0fb025b2e -> 475b4654c


http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 010ff7e..bc4c47c 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -23,7 +23,7 @@ import java.util
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
 
-import org.apache.samza.Partition
+import org.apache.samza.{SamzaContainerStatus, Partition}
 import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
 import org.apache.samza.config.{Config, MapConfig}
 import org.apache.samza.coordinator.JobModelManager
@@ -37,7 +37,6 @@ import org.apache.samza.task.{ClosableTask, InitableTask, 
MessageCollector, Stre
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
 import org.junit.Assert._
 import org.junit.Test
-import org.mockito.Mockito.when
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.junit.AssertionsForJUnit
@@ -181,6 +180,11 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
       consumerMultiplexer = consumerMultiplexer,
       metrics = new SamzaContainerMetrics,
       maxThrottlingDelayMs = TimeUnit.SECONDS.toMillis(1))
+    @volatile var onContainerFailedCalled = false
+    @volatile var onContainerStopCalled = false
+    @volatile var onContainerStartCalled = false
+    @volatile var onContainerFailedThrowable: Throwable = null
+
     val container = new SamzaContainer(
       containerContext = containerContext,
       taskInstances = Map(taskName -> taskInstance),
@@ -188,19 +192,118 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       metrics = new SamzaContainerMetrics,
-      jmxServer = null
+      jmxServer = null)
+
+    val containerListener = new SamzaContainerListener {
+      override def onContainerFailed(t: Throwable): Unit = {
+        onContainerFailedCalled = true
+        onContainerFailedThrowable = t
+      }
+
+      override def onContainerStop(invokedExternally: Boolean): Unit = {
+        onContainerStopCalled = true
+      }
+
+      override def onContainerStart(): Unit = {
+        onContainerStartCalled = true
+      }
+    }
+    container.setContainerListener(containerListener)
+
+    container.run
+    assertTrue(task.wasShutdown)
+    assertFalse(onContainerStartCalled)
+    assertFalse(onContainerStopCalled)
+
+    assertTrue(onContainerFailedCalled)
+    assertNotNull(onContainerFailedThrowable)
+  }
+
+  // Exception in Runloop should cause SamzaContainer to transition to FAILED 
status, shutdown the components and then,
+  // invoke the callback
+  @Test
+  def testExceptionInTaskProcessRunLoop() {
+    val task = new StreamTask with InitableTask with ClosableTask {
+      var wasShutdown = false
+
+      def init(config: Config, context: TaskContext) {
+      }
+
+      def process(envelope: IncomingMessageEnvelope, collector: 
MessageCollector, coordinator: TaskCoordinator) {
+        throw new Exception("Trigger a shutdown, please.")
+      }
+
+      def close {
+        wasShutdown = true
+      }
+    }
+    val config = new MapConfig
+    val taskName = new TaskName("taskName")
+    val consumerMultiplexer = new SystemConsumers(
+      new RoundRobinChooser,
+      Map[String, SystemConsumer]())
+    val producerMultiplexer = new SystemProducers(
+      Map[String, SystemProducer](),
+      new SerdeManager)
+    val collector = new TaskInstanceCollector(producerMultiplexer)
+    val containerContext = new SamzaContainerContext("0", config, 
Set[TaskName](taskName))
+    val taskInstance: TaskInstance = new TaskInstance(
+      task,
+      taskName,
+      config,
+      new TaskInstanceMetrics,
+      null,
+      consumerMultiplexer,
+      collector,
+      containerContext
     )
-    try {
-      container.run
-      fail("Expected exception to be thrown in run method.")
-    } catch {
-      case e: Exception => // Expected
+
+    @volatile var onContainerFailedCalled = false
+    @volatile var onContainerStopCalled = false
+    @volatile var onContainerStartCalled = false
+    @volatile var onContainerFailedThrowable: Throwable = null
+
+    val mockRunLoop = mock[RunLoop]
+    when(mockRunLoop.run).thenThrow(new RuntimeException("Trigger a shutdown, 
please."))
+
+    val container = new SamzaContainer(
+      containerContext = containerContext,
+      taskInstances = Map(taskName -> taskInstance),
+      runLoop = mockRunLoop,
+      consumerMultiplexer = consumerMultiplexer,
+      producerMultiplexer = producerMultiplexer,
+      metrics = new SamzaContainerMetrics,
+      jmxServer = null)
+    val containerListener = new SamzaContainerListener {
+      override def onContainerFailed(t: Throwable): Unit = {
+        onContainerFailedCalled = true
+        onContainerFailedThrowable = t
+      }
+
+      override def onContainerStop(invokedExternally: Boolean): Unit = {
+        onContainerStopCalled = true
+      }
+
+      override def onContainerStart(): Unit = {
+        onContainerStartCalled = true
+      }
     }
+    container.setContainerListener(containerListener)
+
+    container.run
     assertTrue(task.wasShutdown)
+    assertTrue(onContainerStartCalled)
+
+    assertFalse(onContainerStopCalled)
+
+    assertTrue(onContainerFailedCalled)
+    assertNotNull(onContainerFailedThrowable)
+
+    assertEquals(SamzaContainerStatus.FAILED, container.getStatus())
   }
 
   @Test
-  def testErrorInTaskInitShutsDownTask {
+  def testErrorInTaskInitShutsDownTask() {
     val task = new StreamTask with InitableTask with ClosableTask {
       var wasShutdown = false
 
@@ -240,6 +343,11 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
       consumerMultiplexer = consumerMultiplexer,
       metrics = new SamzaContainerMetrics,
       maxThrottlingDelayMs = TimeUnit.SECONDS.toMillis(1))
+    @volatile var onContainerFailedCalled = false
+    @volatile var onContainerStopCalled = false
+    @volatile var onContainerStartCalled = false
+    @volatile var onContainerFailedThrowable: Throwable = null
+
     val container = new SamzaContainer(
       containerContext = containerContext,
       taskInstances = Map(taskName -> taskInstance),
@@ -247,15 +355,187 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       metrics = new SamzaContainerMetrics,
-      jmxServer = null
-    )
-    try {
-      container.run
-      fail("Expected error to be thrown in run method.")
-    } catch {
-      case e: Throwable => // Expected
+      jmxServer = null)
+    val containerListener = new SamzaContainerListener {
+      override def onContainerFailed(t: Throwable): Unit = {
+        onContainerFailedCalled = true
+        onContainerFailedThrowable = t
+      }
+
+      override def onContainerStop(invokedExternally: Boolean): Unit = {
+        onContainerStopCalled = true
+      }
+
+      override def onContainerStart(): Unit = {
+        onContainerStartCalled = true
+      }
     }
+    container.setContainerListener(containerListener)
+
+    container.run
+
     assertTrue(task.wasShutdown)
+
+    assertFalse(onContainerStopCalled)
+    assertFalse(onContainerStartCalled)
+
+    assertTrue(onContainerFailedCalled)
+    assertNotNull(onContainerFailedThrowable)
+  }
+
+  @Test
+  def testRunloopShutdownIsClean(): Unit = {
+    val task = new StreamTask with InitableTask with ClosableTask {
+      var wasShutdown = false
+
+      def init(config: Config, context: TaskContext) {
+      }
+
+      def process(envelope: IncomingMessageEnvelope, collector: 
MessageCollector, coordinator: TaskCoordinator) {
+      }
+
+      def close {
+        wasShutdown = true
+      }
+    }
+    val config = new MapConfig
+    val taskName = new TaskName("taskName")
+    val consumerMultiplexer = new SystemConsumers(
+      new RoundRobinChooser,
+      Map[String, SystemConsumer]())
+    val producerMultiplexer = new SystemProducers(
+      Map[String, SystemProducer](),
+      new SerdeManager)
+    val collector = new TaskInstanceCollector(producerMultiplexer)
+    val containerContext = new SamzaContainerContext("0", config, 
Set[TaskName](taskName))
+    val taskInstance: TaskInstance = new TaskInstance(
+      task,
+      taskName,
+      config,
+      new TaskInstanceMetrics,
+      null,
+      consumerMultiplexer,
+      collector,
+      containerContext
+    )
+
+    @volatile var onContainerFailedCalled = false
+    @volatile var onContainerStopCalled = false
+    @volatile var onContainerStartCalled = false
+    @volatile var onContainerFailedThrowable: Throwable = null
+
+    val mockRunLoop = mock[RunLoop]
+    when(mockRunLoop.run).then(new Answer[Unit] {
+      override def answer(invocation: InvocationOnMock): Unit = {
+        Thread.sleep(100)
+      }
+    })
+
+    val container = new SamzaContainer(
+      containerContext = containerContext,
+      taskInstances = Map(taskName -> taskInstance),
+      runLoop = mockRunLoop,
+      consumerMultiplexer = consumerMultiplexer,
+      producerMultiplexer = producerMultiplexer,
+      metrics = new SamzaContainerMetrics,
+      jmxServer = null)
+      val containerListener = new SamzaContainerListener {
+        override def onContainerFailed(t: Throwable): Unit = {
+          onContainerFailedCalled = true
+          onContainerFailedThrowable = t
+        }
+
+        override def onContainerStop(invokedExternally: Boolean): Unit = {
+          onContainerStopCalled = true
+        }
+
+        override def onContainerStart(): Unit = {
+          onContainerStartCalled = true
+        }
+      }
+    container.setContainerListener(containerListener)
+
+    container.run
+    assertFalse(onContainerFailedCalled)
+    assertTrue(onContainerStartCalled)
+    assertTrue(onContainerStopCalled)
+  }
+
+  @Test
+  def testFailureDuringShutdown: Unit = {
+    val task = new StreamTask with InitableTask with ClosableTask {
+      def init(config: Config, context: TaskContext) {
+      }
+
+      def process(envelope: IncomingMessageEnvelope, collector: 
MessageCollector, coordinator: TaskCoordinator) {
+
+      }
+
+      def close {
+        throw new Exception("Exception during shutdown, please.")
+      }
+    }
+    val config = new MapConfig
+    val taskName = new TaskName("taskName")
+    val consumerMultiplexer = new SystemConsumers(
+      new RoundRobinChooser,
+      Map[String, SystemConsumer]())
+    val producerMultiplexer = new SystemProducers(
+      Map[String, SystemProducer](),
+      new SerdeManager)
+    val collector = new TaskInstanceCollector(producerMultiplexer)
+    val containerContext = new SamzaContainerContext("0", config, 
Set[TaskName](taskName))
+    val taskInstance: TaskInstance = new TaskInstance(
+      task,
+      taskName,
+      config,
+      new TaskInstanceMetrics,
+      null,
+      consumerMultiplexer,
+      collector,
+      containerContext
+    )
+
+    @volatile var onContainerFailedCalled = false
+    @volatile var onContainerStopCalled = false
+    @volatile var onContainerStartCalled = false
+    @volatile var onContainerFailedThrowable: Throwable = null
+
+    val mockRunLoop = mock[RunLoop]
+    when(mockRunLoop.run).then(new Answer[Unit] {
+      override def answer(invocation: InvocationOnMock): Unit = {
+        Thread.sleep(100)
+      }
+    })
+
+    val container = new SamzaContainer(
+      containerContext = containerContext,
+      taskInstances = Map(taskName -> taskInstance),
+      runLoop = mockRunLoop,
+      consumerMultiplexer = consumerMultiplexer,
+      producerMultiplexer = producerMultiplexer,
+      metrics = new SamzaContainerMetrics,
+      jmxServer = null)
+
+    val containerListener = new SamzaContainerListener {
+        override def onContainerFailed(t: Throwable): Unit = {
+          onContainerFailedCalled = true
+          onContainerFailedThrowable = t
+        }
+
+        override def onContainerStop(invokedExternally: Boolean): Unit = {
+          onContainerStopCalled = true
+        }
+
+        override def onContainerStart(): Unit = {
+          onContainerStartCalled = true
+        }
+      }
+    container.setContainerListener(containerListener)
+
+    container.run
+
+    assertTrue(onContainerFailedCalled)
   }
 
   @Test
@@ -303,8 +583,8 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       metrics = containerMetrics,
-      jmxServer = null
-    )
+      jmxServer = null)
+
     container.startStores
     assertNotNull(containerMetrics.taskStoreRestorationMetrics)
     assertNotNull(containerMetrics.taskStoreRestorationMetrics.get(taskName))

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
 
b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
new file mode 100644
index 0000000..f5a8ee5
--- /dev/null
+++ 
b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.processor
+
+import java.util.Collections
+
+import org.apache.samza.config.MapConfig
+import org.apache.samza.container.{SamzaContainerListener, RunLoop, 
SamzaContainer, SamzaContainerContext, SamzaContainerMetrics, TaskInstance, 
TaskInstanceMetrics, TaskName}
+import org.apache.samza.serializers.SerdeManager
+import org.apache.samza.system.chooser.RoundRobinChooser
+import org.apache.samza.system.{SystemConsumer, SystemConsumers, 
SystemProducer, SystemProducers}
+import org.apache.samza.task.{StreamTask, TaskInstanceCollector}
+
+
+object StreamProcessorTestUtils {
+  def getDummyContainer(mockRunloop: RunLoop, containerListener: 
SamzaContainerListener, streamTask: StreamTask) = {
+    val config = new MapConfig
+    val taskName = new TaskName("taskName")
+    val consumerMultiplexer = new SystemConsumers(
+      new RoundRobinChooser,
+      Map[String, SystemConsumer]())
+    val producerMultiplexer = new SystemProducers(
+      Map[String, SystemProducer](),
+      new SerdeManager)
+    val collector = new TaskInstanceCollector(producerMultiplexer)
+    val containerContext = new SamzaContainerContext("0", config, 
Collections.singleton[TaskName](taskName))
+    val taskInstance: TaskInstance = new TaskInstance(
+      streamTask,
+      taskName,
+      config,
+      new TaskInstanceMetrics,
+      null,
+      consumerMultiplexer,
+      collector,
+      containerContext
+    )
+
+    val container = new SamzaContainer(
+      containerContext = containerContext,
+      taskInstances = Map(taskName -> taskInstance),
+      runLoop = mockRunloop,
+      consumerMultiplexer = consumerMultiplexer,
+      producerMultiplexer = producerMultiplexer,
+      metrics = new SamzaContainerMetrics,
+      jmxServer = null)
+    if (containerListener != null) {
+      container.setContainerListener(containerListener)
+    }
+    container
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
 
b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index a786468..f5bc73a 100644
--- 
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ 
b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -19,10 +19,6 @@
 
 package org.apache.samza.system.kafka;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.StreamValidationException;
 import org.apache.samza.system.SystemAdmin;
@@ -31,7 +27,13 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
-import static org.junit.Assert.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 
 public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
index f37a224..75609aa 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
@@ -260,7 +260,7 @@ public class TestStreamProcessor extends 
StandaloneIntegrationTestHarness {
     boolean latchResult = false;
     processor.start();
     try {
-      processor.awaitStart(10000);
+      Thread.sleep(10000);
       latchResult = latch.await(10, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
       e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
 
b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 29fb6d3..59e9a89 100644
--- 
a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ 
b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -222,7 +222,8 @@ class StreamTaskTestUtil {
   def stopJob(job: StreamJob) {
     // Shutdown task.
     job.kill
-    assertEquals(ApplicationStatus.UnsuccessfulFinish, 
job.waitForFinish(60000))
+    val status = job.waitForFinish(60000)
+    assertEquals(ApplicationStatus.UnsuccessfulFinish, status)
   }
 
   /**

Reply via email to