Repository: samza Updated Branches: refs/heads/master 0fb025b2e -> 475b4654c
http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index 010ff7e..bc4c47c 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -23,7 +23,7 @@ import java.util import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference -import org.apache.samza.Partition +import org.apache.samza.{SamzaContainerStatus, Partition} import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager} import org.apache.samza.config.{Config, MapConfig} import org.apache.samza.coordinator.JobModelManager @@ -37,7 +37,6 @@ import org.apache.samza.task.{ClosableTask, InitableTask, MessageCollector, Stre import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin import org.junit.Assert._ import org.junit.Test -import org.mockito.Mockito.when import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer import org.scalatest.junit.AssertionsForJUnit @@ -181,6 +180,11 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { consumerMultiplexer = consumerMultiplexer, metrics = new SamzaContainerMetrics, maxThrottlingDelayMs = TimeUnit.SECONDS.toMillis(1)) + @volatile var onContainerFailedCalled = false + @volatile var onContainerStopCalled = false + @volatile var onContainerStartCalled = false + @volatile var onContainerFailedThrowable: Throwable = null + val container = new SamzaContainer( containerContext = containerContext, taskInstances = Map(taskName -> taskInstance), @@ -188,19 +192,118 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, metrics = new SamzaContainerMetrics, - jmxServer = null + jmxServer = null) + + val containerListener = new SamzaContainerListener { + override def onContainerFailed(t: Throwable): Unit = { + onContainerFailedCalled = true + onContainerFailedThrowable = t + } + + override def onContainerStop(invokedExternally: Boolean): Unit = { + onContainerStopCalled = true + } + + override def onContainerStart(): Unit = { + onContainerStartCalled = true + } + } + container.setContainerListener(containerListener) + + container.run + assertTrue(task.wasShutdown) + assertFalse(onContainerStartCalled) + assertFalse(onContainerStopCalled) + + assertTrue(onContainerFailedCalled) + assertNotNull(onContainerFailedThrowable) + } + + // Exception in Runloop should cause SamzaContainer to transition to FAILED status, shutdown the components and then, + // invoke the callback + @Test + def testExceptionInTaskProcessRunLoop() { + val task = new StreamTask with InitableTask with ClosableTask { + var wasShutdown = false + + def init(config: Config, context: TaskContext) { + } + + def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) { + throw new Exception("Trigger a shutdown, please.") + } + + def close { + wasShutdown = true + } + } + val config = new MapConfig + val taskName = new TaskName("taskName") + val consumerMultiplexer = new SystemConsumers( + new RoundRobinChooser, + Map[String, SystemConsumer]()) + val producerMultiplexer = new SystemProducers( + Map[String, SystemProducer](), + new SerdeManager) + val collector = new TaskInstanceCollector(producerMultiplexer) + val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName)) + val taskInstance: TaskInstance = new TaskInstance( + task, + taskName, + config, + new TaskInstanceMetrics, + null, + consumerMultiplexer, + collector, + containerContext ) - try { - container.run - fail("Expected exception to be thrown in run method.") - } catch { - case e: Exception => // Expected + + @volatile var onContainerFailedCalled = false + @volatile var onContainerStopCalled = false + @volatile var onContainerStartCalled = false + @volatile var onContainerFailedThrowable: Throwable = null + + val mockRunLoop = mock[RunLoop] + when(mockRunLoop.run).thenThrow(new RuntimeException("Trigger a shutdown, please.")) + + val container = new SamzaContainer( + containerContext = containerContext, + taskInstances = Map(taskName -> taskInstance), + runLoop = mockRunLoop, + consumerMultiplexer = consumerMultiplexer, + producerMultiplexer = producerMultiplexer, + metrics = new SamzaContainerMetrics, + jmxServer = null) + val containerListener = new SamzaContainerListener { + override def onContainerFailed(t: Throwable): Unit = { + onContainerFailedCalled = true + onContainerFailedThrowable = t + } + + override def onContainerStop(invokedExternally: Boolean): Unit = { + onContainerStopCalled = true + } + + override def onContainerStart(): Unit = { + onContainerStartCalled = true + } } + container.setContainerListener(containerListener) + + container.run assertTrue(task.wasShutdown) + assertTrue(onContainerStartCalled) + + assertFalse(onContainerStopCalled) + + assertTrue(onContainerFailedCalled) + assertNotNull(onContainerFailedThrowable) + + assertEquals(SamzaContainerStatus.FAILED, container.getStatus()) } @Test - def testErrorInTaskInitShutsDownTask { + def testErrorInTaskInitShutsDownTask() { val task = new StreamTask with InitableTask with ClosableTask { var wasShutdown = false @@ -240,6 +343,11 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { consumerMultiplexer = consumerMultiplexer, metrics = new SamzaContainerMetrics, maxThrottlingDelayMs = TimeUnit.SECONDS.toMillis(1)) + @volatile var onContainerFailedCalled = false + @volatile var onContainerStopCalled = false + @volatile var onContainerStartCalled = false + @volatile var onContainerFailedThrowable: Throwable = null + val container = new SamzaContainer( containerContext = containerContext, taskInstances = Map(taskName -> taskInstance), @@ -247,15 +355,187 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, metrics = new SamzaContainerMetrics, - jmxServer = null - ) - try { - container.run - fail("Expected error to be thrown in run method.") - } catch { - case e: Throwable => // Expected + jmxServer = null) + val containerListener = new SamzaContainerListener { + override def onContainerFailed(t: Throwable): Unit = { + onContainerFailedCalled = true + onContainerFailedThrowable = t + } + + override def onContainerStop(invokedExternally: Boolean): Unit = { + onContainerStopCalled = true + } + + override def onContainerStart(): Unit = { + onContainerStartCalled = true + } } + container.setContainerListener(containerListener) + + container.run + assertTrue(task.wasShutdown) + + assertFalse(onContainerStopCalled) + assertFalse(onContainerStartCalled) + + assertTrue(onContainerFailedCalled) + assertNotNull(onContainerFailedThrowable) + } + + @Test + def testRunloopShutdownIsClean(): Unit = { + val task = new StreamTask with InitableTask with ClosableTask { + var wasShutdown = false + + def init(config: Config, context: TaskContext) { + } + + def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) { + } + + def close { + wasShutdown = true + } + } + val config = new MapConfig + val taskName = new TaskName("taskName") + val consumerMultiplexer = new SystemConsumers( + new RoundRobinChooser, + Map[String, SystemConsumer]()) + val producerMultiplexer = new SystemProducers( + Map[String, SystemProducer](), + new SerdeManager) + val collector = new TaskInstanceCollector(producerMultiplexer) + val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName)) + val taskInstance: TaskInstance = new TaskInstance( + task, + taskName, + config, + new TaskInstanceMetrics, + null, + consumerMultiplexer, + collector, + containerContext + ) + + @volatile var onContainerFailedCalled = false + @volatile var onContainerStopCalled = false + @volatile var onContainerStartCalled = false + @volatile var onContainerFailedThrowable: Throwable = null + + val mockRunLoop = mock[RunLoop] + when(mockRunLoop.run).then(new Answer[Unit] { + override def answer(invocation: InvocationOnMock): Unit = { + Thread.sleep(100) + } + }) + + val container = new SamzaContainer( + containerContext = containerContext, + taskInstances = Map(taskName -> taskInstance), + runLoop = mockRunLoop, + consumerMultiplexer = consumerMultiplexer, + producerMultiplexer = producerMultiplexer, + metrics = new SamzaContainerMetrics, + jmxServer = null) + val containerListener = new SamzaContainerListener { + override def onContainerFailed(t: Throwable): Unit = { + onContainerFailedCalled = true + onContainerFailedThrowable = t + } + + override def onContainerStop(invokedExternally: Boolean): Unit = { + onContainerStopCalled = true + } + + override def onContainerStart(): Unit = { + onContainerStartCalled = true + } + } + container.setContainerListener(containerListener) + + container.run + assertFalse(onContainerFailedCalled) + assertTrue(onContainerStartCalled) + assertTrue(onContainerStopCalled) + } + + @Test + def testFailureDuringShutdown: Unit = { + val task = new StreamTask with InitableTask with ClosableTask { + def init(config: Config, context: TaskContext) { + } + + def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) { + + } + + def close { + throw new Exception("Exception during shutdown, please.") + } + } + val config = new MapConfig + val taskName = new TaskName("taskName") + val consumerMultiplexer = new SystemConsumers( + new RoundRobinChooser, + Map[String, SystemConsumer]()) + val producerMultiplexer = new SystemProducers( + Map[String, SystemProducer](), + new SerdeManager) + val collector = new TaskInstanceCollector(producerMultiplexer) + val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName)) + val taskInstance: TaskInstance = new TaskInstance( + task, + taskName, + config, + new TaskInstanceMetrics, + null, + consumerMultiplexer, + collector, + containerContext + ) + + @volatile var onContainerFailedCalled = false + @volatile var onContainerStopCalled = false + @volatile var onContainerStartCalled = false + @volatile var onContainerFailedThrowable: Throwable = null + + val mockRunLoop = mock[RunLoop] + when(mockRunLoop.run).then(new Answer[Unit] { + override def answer(invocation: InvocationOnMock): Unit = { + Thread.sleep(100) + } + }) + + val container = new SamzaContainer( + containerContext = containerContext, + taskInstances = Map(taskName -> taskInstance), + runLoop = mockRunLoop, + consumerMultiplexer = consumerMultiplexer, + producerMultiplexer = producerMultiplexer, + metrics = new SamzaContainerMetrics, + jmxServer = null) + + val containerListener = new SamzaContainerListener { + override def onContainerFailed(t: Throwable): Unit = { + onContainerFailedCalled = true + onContainerFailedThrowable = t + } + + override def onContainerStop(invokedExternally: Boolean): Unit = { + onContainerStopCalled = true + } + + override def onContainerStart(): Unit = { + onContainerStartCalled = true + } + } + container.setContainerListener(containerListener) + + container.run + + assertTrue(onContainerFailedCalled) } @Test @@ -303,8 +583,8 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, metrics = containerMetrics, - jmxServer = null - ) + jmxServer = null) + container.startStores assertNotNull(containerMetrics.taskStoreRestorationMetrics) assertNotNull(containerMetrics.taskStoreRestorationMetrics.get(taskName)) http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala new file mode 100644 index 0000000..f5a8ee5 --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.processor + +import java.util.Collections + +import org.apache.samza.config.MapConfig +import org.apache.samza.container.{SamzaContainerListener, RunLoop, SamzaContainer, SamzaContainerContext, SamzaContainerMetrics, TaskInstance, TaskInstanceMetrics, TaskName} +import org.apache.samza.serializers.SerdeManager +import org.apache.samza.system.chooser.RoundRobinChooser +import org.apache.samza.system.{SystemConsumer, SystemConsumers, SystemProducer, SystemProducers} +import org.apache.samza.task.{StreamTask, TaskInstanceCollector} + + +object StreamProcessorTestUtils { + def getDummyContainer(mockRunloop: RunLoop, containerListener: SamzaContainerListener, streamTask: StreamTask) = { + val config = new MapConfig + val taskName = new TaskName("taskName") + val consumerMultiplexer = new SystemConsumers( + new RoundRobinChooser, + Map[String, SystemConsumer]()) + val producerMultiplexer = new SystemProducers( + Map[String, SystemProducer](), + new SerdeManager) + val collector = new TaskInstanceCollector(producerMultiplexer) + val containerContext = new SamzaContainerContext("0", config, Collections.singleton[TaskName](taskName)) + val taskInstance: TaskInstance = new TaskInstance( + streamTask, + taskName, + config, + new TaskInstanceMetrics, + null, + consumerMultiplexer, + collector, + containerContext + ) + + val container = new SamzaContainer( + containerContext = containerContext, + taskInstances = Map(taskName -> taskInstance), + runLoop = mockRunloop, + consumerMultiplexer = consumerMultiplexer, + producerMultiplexer = producerMultiplexer, + metrics = new SamzaContainerMetrics, + jmxServer = null) + if (containerListener != null) { + container.setContainerListener(containerListener) + } + container + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java index a786468..f5bc73a 100644 --- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java @@ -19,10 +19,6 @@ package org.apache.samza.system.kafka; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.StreamValidationException; import org.apache.samza.system.SystemAdmin; @@ -31,7 +27,13 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; -import static org.junit.Assert.*; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin { http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java index f37a224..75609aa 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java @@ -260,7 +260,7 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness { boolean latchResult = false; processor.start(); try { - processor.awaitStart(10000); + Thread.sleep(10000); latchResult = latch.await(10, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala index 29fb6d3..59e9a89 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala @@ -222,7 +222,8 @@ class StreamTaskTestUtil { def stopJob(job: StreamJob) { // Shutdown task. job.kill - assertEquals(ApplicationStatus.UnsuccessfulFinish, job.waitForFinish(60000)) + val status = job.waitForFinish(60000) + assertEquals(ApplicationStatus.UnsuccessfulFinish, status) } /**
