Repository: samza Updated Branches: refs/heads/master 603bd8eac -> 6e5e1621a
SAMZA-1832: Fix race condition between StreamProcessor and SamzaContainerListener The PR addresses the following issues - We have a few scenarios where there is a race condition between `SamzaContainerListener` and `StreamProcessor` that results in an incorrect application status being propagated to the client > Consider the scenario when the container runs into an exception in the run loop > and triggers the shutdown sequence and at the same time the user triggers a stop > on the `StreamProcessor`. The user request comes in and notices that the > container is already shutting down and proceeds to shut down the job > coordinator which in turn shuts down successfully and the > `processorListener.onStop()` is invoked. The container eventually invokes the > `SamzaContainerListener` callback and updates the exception state after the > `StreamProcessor` has finished shutting down. - Currently, we only propagate failures to `processorListener` when containerException is not null. It is possible for the samza container to take longer than `task.shutdown.ms` to shut down, in which case we need to propagate a timeout exception to the `processorListener` as opposed to assuming the shutdown was successful. - Make container shutdown idempotent since it's only setting a boolean flag to true and there is no need to throw an exception on subsequent invocations. 1. It simplifies the interaction of other components (`StreamProcessor`) with the container and prevents unnecessary checks on container state to determine if it's safe to call shutdown or not. 2. Enables us to reason about the impact of container state change on `StreamProcessor` easily since we restrict container interaction w/ `StreamProcessor` only via `SamzaContainerListener`. 
Author: bharathkk <codin.mart...@gmail.com> Reviewers: Yi Pan <nickpa...@gmail.com>, Shanthoosh Venkatraman <svenk...@linkedin.com> Closes #673 from bharathkk/samza-1832-alternative and squashes the following commits: 131d79681 [bharathkk] Addressed PR comments 11d577dda [bharathkk] Merge with latest master; dummy commit to trigger tests 60792166e [bharathkk] Address PR comments 5e6acda5c [bharathkk] Minor edits 891f65617 [bharathkk] Randomize task.shutdown.ms to make sure the status returned is deterministic in all scenarios 5ef5963be [bharathkk] Minor edits 605caa572 [bharathkk] Fix race condition between SamzaContainerListener and StreamProcessor Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6e5e1621 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6e5e1621 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6e5e1621 Branch: refs/heads/master Commit: 6e5e1621abdbbeccdea9906081676c552b1b213b Parents: 603bd8e Author: bharathkk <codin.mart...@gmail.com> Authored: Thu Oct 4 10:56:21 2018 -0700 Committer: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Committed: Thu Oct 4 10:56:21 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/processor/StreamProcessor.java | 64 ++++++---- .../apache/samza/container/SamzaContainer.scala | 17 +-- .../samza/processor/TestStreamProcessor.java | 7 +- .../test/framework/FaultInjectionTest.java | 126 +++++++++++++++++++ ...StreamApplicationIntegrationTestHarness.java | 8 +- .../processor/TestZkLocalApplicationRunner.java | 10 +- 6 files changed, 185 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/6e5e1621/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 7910216..26e52f2 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -28,11 +28,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.config.Config; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.TaskConfigJava; -import org.apache.samza.container.IllegalContainerStateException; import org.apache.samza.container.SamzaContainer; import org.apache.samza.container.SamzaContainerListener; import org.apache.samza.coordinator.JobCoordinator; @@ -101,10 +101,10 @@ public class StreamProcessor { private final Config config; private final long taskShutdownMs; private final String processorId; - private final ExecutorService executorService; + private final ExecutorService containerExcecutorService; private final Object lock = new Object(); - private Throwable containerException = null; + private volatile Throwable containerException = null; volatile CountDownLatch containerShutdownLatch = new CountDownLatch(1); @@ -198,7 +198,7 @@ public class StreamProcessor { this.jobCoordinatorListener = createJobCoordinatorListener(); this.jobCoordinator.setListener(jobCoordinatorListener); ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).setDaemon(true).build(); - this.executorService = Executors.newSingleThreadExecutor(threadFactory); + this.containerExcecutorService = Executors.newSingleThreadExecutor(threadFactory); // TODO: remove the dependency on jobCoordinator for processorId after 
fixing SAMZA-1835 this.processorId = this.jobCoordinator.getProcessorId(); this.processorListener = listenerFactory.createInstance(this); @@ -258,7 +258,7 @@ public class StreamProcessor { boolean hasContainerShutdown = stopSamzaContainer(); if (!hasContainerShutdown) { LOGGER.info("Interrupting the container: {} thread to die.", container); - executorService.shutdownNow(); + containerExcecutorService.shutdownNow(); } } catch (Throwable throwable) { LOGGER.error(String.format("Exception occurred on container: %s shutdown of stream processor: %s.", container, processorId), throwable); @@ -298,22 +298,28 @@ public class StreamProcessor { private boolean stopSamzaContainer() { boolean hasContainerShutdown = true; if (container != null) { - if (!container.hasStopped()) { - try { - container.shutdown(); - LOGGER.info("Waiting {} ms for the container: {} to shutdown.", taskShutdownMs, container); - hasContainerShutdown = containerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS); - } catch (IllegalContainerStateException icse) { - LOGGER.info(String.format("Cannot shutdown container: %s for stream processor: %s. 
Container is not running.", container, processorId), icse); - } catch (Exception e) { - LOGGER.error("Exception occurred when shutting down the container: {}.", container, e); - hasContainerShutdown = false; + try { + container.shutdown(); + LOGGER.info("Waiting {} ms for the container: {} to shutdown.", taskShutdownMs, container); + hasContainerShutdown = containerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOGGER.error("Exception occurred when shutting down the container: {}.", container, e); + hasContainerShutdown = false; + if (containerException != null) { + containerException = e; } - LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %b.", container, processorId, hasContainerShutdown)); - } else { - LOGGER.info("Container is not instantiated for stream processor: {}.", processorId); } + LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %b.", container, processorId, hasContainerShutdown)); + } + + // We want to propagate TimeoutException when container shutdown times out. It is possible that the timeout exception + // we propagate to the application runner maybe overwritten by container failure cause in case of interleaved execution. + // It is acceptable since container exception is much more useful compared to timeout exception. + // We can infer from the logs about the fact that container shutdown timed out or not for additional inference. 
+ if (!hasContainerShutdown && containerException == null) { + containerException = new TimeoutException("Container shutdown timed out after " + taskShutdownMs + " ms."); } + return hasContainerShutdown; } @@ -348,7 +354,7 @@ public class StreamProcessor { container = createSamzaContainer(processorId, jobModel); container.setContainerListener(new ContainerListener()); LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId); - executorService.submit(container); + containerExcecutorService.submit(container); } else { LOGGER.info("Ignoring onNewJobModel invocation since the current state is {} and not {}.", state, State.IN_REBALANCE); } @@ -359,8 +365,12 @@ public class StreamProcessor { public void onCoordinatorStop() { synchronized (lock) { LOGGER.info("Shutting down the executor service of the stream processor: {}.", processorId); - stopSamzaContainer(); - executorService.shutdownNow(); + boolean hasContainerShutdown = stopSamzaContainer(); + + // we only want to interrupt when container shutdown times out. + if (!hasContainerShutdown) { + containerExcecutorService.shutdownNow(); + } state = State.STOPPED; } if (containerException != null) @@ -374,8 +384,12 @@ public class StreamProcessor { public void onCoordinatorFailure(Throwable throwable) { synchronized (lock) { LOGGER.info(String.format("Coordinator: %s failed with an exception. Stopping the stream processor: %s. Original exception:", jobCoordinator, processorId), throwable); - stopSamzaContainer(); - executorService.shutdownNow(); + boolean hasContainerShutdown = stopSamzaContainer(); + + // we only want to interrupt when container shutdown times out. 
+ if (!hasContainerShutdown) { + containerExcecutorService.shutdownNow(); + } state = State.STOPPED; } processorListener.afterFailure(throwable); @@ -413,6 +427,7 @@ public class StreamProcessor { @Override public void afterStop() { containerShutdownLatch.countDown(); + synchronized (lock) { if (state == State.IN_REBALANCE) { LOGGER.info("Container: {} of the stream processor: {} was stopped by the JobCoordinator.", container, processorId); @@ -426,11 +441,12 @@ public class StreamProcessor { @Override public void afterFailure(Throwable t) { + containerException = t; containerShutdownLatch.countDown(); + synchronized (lock) { LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. Original exception:", container, processorId), t); state = State.STOPPING; - containerException = t; jobCoordinator.stop(); } } http://git-wip-us.apache.org/repos/asf/samza/blob/6e5e1621/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 417fc18..5c4723b 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -878,9 +878,11 @@ class SamzaContainer( * @throws SamzaException, Thrown when the container has already been stopped or failed */ def shutdown(): Unit = { - if (status == SamzaContainerStatus.STOPPED || status == SamzaContainerStatus.FAILED) { - throw new IllegalContainerStateException("Cannot shutdown a container with status " + status) + if (status == SamzaContainerStatus.FAILED || status == SamzaContainerStatus.STOPPED) { + warn("Shutdown is no-op since the container is already in state: " + status) + return } + shutdownRunLoop() } @@ -1182,14 +1184,3 @@ class 
SamzaContainer( } } } - -/** - * Exception thrown when the SamzaContainer tries to transition to an illegal state. - * {@link SamzaContainerStatus} has more details on the state transitions. - * - * @param s String, Message associated with the exception - * @param t Throwable, Wrapped error/exception thrown, if any. - */ -class IllegalContainerStateException(s: String, t: Throwable) extends SamzaException(s, t) { - def this(s: String) = this(s, null) -} http://git-wip-us.apache.org/repos/asf/samza/blob/6e5e1621/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java index 673015a..93b157a 100644 --- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java +++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java @@ -49,7 +49,7 @@ import org.mockito.Mockito; import org.powermock.api.mockito.PowerMockito; import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.*; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -392,14 +392,15 @@ public class TestStreamProcessor { ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class); SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class); MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); - StreamProcessor streamProcessor = PowerMockito.spy(new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator)); + StreamProcessor streamProcessor = new TestableStreamProcessor(config, new HashMap<>(), null, + lifecycleListener, mockJobCoordinator, mockSamzaContainer); - 
streamProcessor.container = mockSamzaContainer; streamProcessor.state = State.IN_REBALANCE; Mockito.doNothing().when(mockSamzaContainer).run(); streamProcessor.jobCoordinatorListener.onNewJobModel("TestProcessorId", new JobModel(new MapConfig(), new HashMap<>())); + Mockito.verify(mockSamzaContainer, Mockito.times(1)).setContainerListener(any()); Mockito.verify(mockSamzaContainer, Mockito.atMost(1)).run(); } http://git-wip-us.apache.org/repos/asf/samza/blob/6e5e1621/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java new file mode 100644 index 0000000..2dc76a4 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.test.framework; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import org.apache.samza.application.TaskApplication; +import org.apache.samza.application.TaskApplicationDescriptor; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.config.TaskConfig; +import org.apache.samza.config.ZkConfig; +import org.apache.samza.job.ApplicationStatus; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; +import org.apache.samza.task.ClosableTask; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.StreamTaskFactory; +import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.test.operator.data.PageView; +import org.junit.Test; + +import static org.junit.Assert.*; + + +public class FaultInjectionTest extends StreamApplicationIntegrationTestHarness { + private static final String PAGE_VIEWS = "page-views"; + + @Test + public void testRaceCondition() throws InterruptedException { + int taskShutdownInMs = (int) (Math.random() * 10000); + + CountDownLatch containerShutdownLatch = new CountDownLatch(1); + Map<String, String> configs = new HashMap<>(); + configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.zk.ZkJobCoordinatorFactory"); + configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.zk.ZkCoordinationUtilsFactory"); + configs.put(JobConfig.PROCESSOR_ID(), "0"); + configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); + configs.put(FaultInjectionStreamApp.INPUT_TOPIC_NAME_PROP, "page-views"); + 
configs.put(TaskConfig.INPUT_STREAMS(), "kafka.page-views"); + configs.put(ZkConfig.ZK_CONNECT, zkConnect()); + configs.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), "5000"); + + // we purposefully randomize the task.shutdown.ms to make sure we can consistently verify if status is unsuccessfulFinish + // even though the reason for failure can either be container exception or container shutdown timing out. + configs.put("task.shutdown.ms", Integer.toString(taskShutdownInMs)); + configs.put(JobConfig.PROCESSOR_ID(), "0"); + + createTopic(PAGE_VIEWS, 2); + + // create events for the following user activity. + // userId: (viewId, pageId, (adIds)) + // u1: (v1, p1, (a1)), (v2, p2, (a3)) + // u2: (v3, p1, (a1)), (v4, p3, (a5)) + produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}"); + produceMessage(PAGE_VIEWS, 1, "p2", "{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}"); + + FaultInjectionStreamApp app = new FaultInjectionStreamApp(); + FaultInjectionStreamApp.containerShutdownLatch = containerShutdownLatch; + RunApplicationContext context = + runApplication(app, "fault-injection-app", configs); + + containerShutdownLatch.await(); + context.getRunner().kill(); + context.getRunner().waitForFinish(); + assertEquals(context.getRunner().status(), ApplicationStatus.UnsuccessfulFinish); + } + + private static class FaultInjectionStreamApp implements TaskApplication { + public static final String SYSTEM = "kafka"; + public static final String INPUT_TOPIC_NAME_PROP = "inputTopicName"; + private static transient CountDownLatch containerShutdownLatch; + + @Override + public void describe(TaskApplicationDescriptor appDesc) { + Config config = appDesc.getConfig(); + String inputTopic = config.get(INPUT_TOPIC_NAME_PROP); + + final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class); + KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM); + KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(inputTopic, serde); + 
appDesc.addInputStream(isd); + appDesc.setTaskFactory((StreamTaskFactory) () -> new FaultInjectionTask(containerShutdownLatch)); + } + + private static class FaultInjectionTask implements StreamTask, ClosableTask { + private final transient CountDownLatch containerShutdownLatch; + + public FaultInjectionTask(CountDownLatch containerShutdownLatch) { + this.containerShutdownLatch = containerShutdownLatch; + } + + @Override + public void close() throws Exception { + containerShutdownLatch.countDown(); + } + + @Override + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) + throws Exception { + throw new RuntimeException("Failed"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/6e5e1621/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java index 7f13282..0c3d755 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java @@ -35,7 +35,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.SamzaApplication; import org.apache.samza.config.Config; import org.apache.samza.config.KafkaConfig; import org.apache.samza.config.MapConfig; @@ -48,7 +48,7 @@ import scala.Option; import scala.Option$; /** - * Harness for writing integration 
tests for {@link StreamApplication}s. + * Harness for writing integration tests for {@link SamzaApplication}s. * * <p> This provides the following features for its sub-classes: * <ul> @@ -74,7 +74,7 @@ import scala.Option$; * State persistence: {@link #tearDown()} clears all associated state (including topics and metadata) in Kafka and * Zookeeper. Hence, the state is not durable across invocations of {@link #tearDown()} <br/> * - * Execution model: {@link StreamApplication}s are run as their own {@link org.apache.samza.job.local.ThreadJob}s. + * Execution model: {@link SamzaApplication}s are run as their own {@link org.apache.samza.job.local.ThreadJob}s. * Similarly, embedded Kafka servers and Zookeeper servers are run as their own threads. * {@link #produceMessage(String, int, String, String)} and {@link #consumeMessages(Collection, int)} are blocking calls. * @@ -217,7 +217,7 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration * @return RunApplicationContext which contains objects created within runApplication, to be used for verification * if necessary */ - protected RunApplicationContext runApplication(StreamApplication streamApplication, + protected RunApplicationContext runApplication(SamzaApplication streamApplication, String appName, Map<String, String> overriddenConfigs) { Map<String, String> configMap = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/samza/blob/6e5e1621/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index b249d4d..2ee17c0 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -617,6 +617,9 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne // Trigger re-balancing phase, by manually adding a new processor. configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]); + + // Reset the task shutdown ms for 3rd application to give it ample time to shutdown cleanly + configMap.put(TaskConfig.SHUTDOWN_MS(), TASK_SHUTDOWN_MS); Config applicationConfig3 = new MapConfig(configMap); CountDownLatch processedMessagesLatch3 = new CountDownLatch(1); @@ -629,13 +632,14 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); processedMessagesLatch3.await(); + appRunner1.waitForFinish(); + appRunner2.waitForFinish(); /** * If the processing has started in the third stream processor, then other two stream processors should be stopped. */ - // TODO: This is a bug! Status should be unsuccessful finish. - assertEquals(ApplicationStatus.SuccessfulFinish, appRunner1.status()); - assertEquals(ApplicationStatus.SuccessfulFinish, appRunner2.status()); + assertEquals(ApplicationStatus.UnsuccessfulFinish, appRunner1.status()); + assertEquals(ApplicationStatus.UnsuccessfulFinish, appRunner2.status()); appRunner3.kill(); appRunner3.waitForFinish();