Repository: samza
Updated Branches:
  refs/heads/master 603bd8eac -> 6e5e1621a


SAMZA-1832: Fix race condition between StreamProcessor and 
SamzaContainerListener

The PR addresses the following issues

- We have few scenarios where there is a race condition between 
`SamzaContainerListener` and `StreamProcessor` that results in incorrect 
application status being propagated to client

> Consider the scenario when the container runs into an exception in run loop 
> and triggers shutdown sequence and at the same time the user triggers a stop 
> on the `StreamProcessor`. The user request comes in and notices that the 
> container is already shutting down and proceeds to shutdown the job 
> coordinator which in turns shuts down successfully and the 
> `processorListener.onStop()` is invoked. The container eventually invokes the 
> `SamzaContainerListener` callback and updates the exception state after the 
> `StreamProcessor` has finished shutting down.

- Currently, we only propagate failures to `processorListener` when 
containerException is not null. It is possible for the samza container to take 
longer than `task.shutdown.ms` to shutdown in which case, we need to propagate 
a timeout exception to the `processorListener` as opposed assuming the shutdown 
was successful.

- Make container shutdown idempotent since its only setting a boolean flag to 
true and there is no need to throw an exception on subsequent invocations.
           1. It simplifies the interaction of other components 
(`StreamProcessor`) with container and prevents unnecessary check on container 
state to determine if its safe to call shutdown or not.
           2. Enables to reason about the impact of container state change on 
`StreamProcessor` easily since we restrict container interaction w/ 
`StreamProcessor` only via `SamzaContainerListener`.

Author: bharathkk <codin.mart...@gmail.com>

Reviewers: Yi Pan <nickpa...@gmail.com>, Shanthoosh Venkatraman 
<svenk...@linkedin.com>

Closes #673 from bharathkk/samza-1832-alternative and squashes the following 
commits:

131d79681 [bharathkk] Addressed PR comments
11d577dda [bharathkk] Merge with latest master; dummy commit to trigger tests
60792166e [bharathkk] Address PR comments
5e6acda5c [bharathkk] Minor edits
891f65617 [bharathkk] Randomize task.shutdown.ms to make sure the status 
returned is deterministic in all scenarios
5ef5963be [bharathkk] Minor edits
605caa572 [bharathkk] Fix race condition between SamzaContainerListener and 
StreamProcessor


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6e5e1621
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6e5e1621
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6e5e1621

Branch: refs/heads/master
Commit: 6e5e1621abdbbeccdea9906081676c552b1b213b
Parents: 603bd8e
Author: bharathkk <codin.mart...@gmail.com>
Authored: Thu Oct 4 10:56:21 2018 -0700
Committer: Yi Pan (Data Infrastructure) <nickpa...@gmail.com>
Committed: Thu Oct 4 10:56:21 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/processor/StreamProcessor.java |  64 ++++++----
 .../apache/samza/container/SamzaContainer.scala |  17 +--
 .../samza/processor/TestStreamProcessor.java    |   7 +-
 .../test/framework/FaultInjectionTest.java      | 126 +++++++++++++++++++
 ...StreamApplicationIntegrationTestHarness.java |   8 +-
 .../processor/TestZkLocalApplicationRunner.java |  10 +-
 6 files changed, 185 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/6e5e1621/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 7910216..26e52f2 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -28,11 +28,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.TaskConfigJava;
-import org.apache.samza.container.IllegalContainerStateException;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainerListener;
 import org.apache.samza.coordinator.JobCoordinator;
@@ -101,10 +101,10 @@ public class StreamProcessor {
   private final Config config;
   private final long taskShutdownMs;
   private final String processorId;
-  private final ExecutorService executorService;
+  private final ExecutorService containerExcecutorService;
   private final Object lock = new Object();
 
-  private Throwable containerException = null;
+  private volatile Throwable containerException = null;
 
   volatile CountDownLatch containerShutdownLatch = new CountDownLatch(1);
 
@@ -198,7 +198,7 @@ public class StreamProcessor {
     this.jobCoordinatorListener = createJobCoordinatorListener();
     this.jobCoordinator.setListener(jobCoordinatorListener);
     ThreadFactory threadFactory = new 
ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).setDaemon(true).build();
-    this.executorService = Executors.newSingleThreadExecutor(threadFactory);
+    this.containerExcecutorService = 
Executors.newSingleThreadExecutor(threadFactory);
     // TODO: remove the dependency on jobCoordinator for processorId after 
fixing SAMZA-1835
     this.processorId = this.jobCoordinator.getProcessorId();
     this.processorListener = listenerFactory.createInstance(this);
@@ -258,7 +258,7 @@ public class StreamProcessor {
           boolean hasContainerShutdown = stopSamzaContainer();
           if (!hasContainerShutdown) {
             LOGGER.info("Interrupting the container: {} thread to die.", 
container);
-            executorService.shutdownNow();
+            containerExcecutorService.shutdownNow();
           }
         } catch (Throwable throwable) {
           LOGGER.error(String.format("Exception occurred on container: %s 
shutdown of stream processor: %s.", container, processorId), throwable);
@@ -298,22 +298,28 @@ public class StreamProcessor {
   private boolean stopSamzaContainer() {
     boolean hasContainerShutdown = true;
     if (container != null) {
-      if (!container.hasStopped()) {
-        try {
-          container.shutdown();
-          LOGGER.info("Waiting {} ms for the container: {} to shutdown.", 
taskShutdownMs, container);
-          hasContainerShutdown = containerShutdownLatch.await(taskShutdownMs, 
TimeUnit.MILLISECONDS);
-        } catch (IllegalContainerStateException icse) {
-          LOGGER.info(String.format("Cannot shutdown container: %s for stream 
processor: %s. Container is not running.", container, processorId), icse);
-        } catch (Exception e) {
-          LOGGER.error("Exception occurred when shutting down the container: 
{}.", container, e);
-          hasContainerShutdown = false;
+      try {
+        container.shutdown();
+        LOGGER.info("Waiting {} ms for the container: {} to shutdown.", 
taskShutdownMs, container);
+        hasContainerShutdown = containerShutdownLatch.await(taskShutdownMs, 
TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        LOGGER.error("Exception occurred when shutting down the container: 
{}.", container, e);
+        hasContainerShutdown = false;
+        if (containerException != null) {
+          containerException = e;
         }
-        LOGGER.info(String.format("Shutdown status of container: %s for stream 
processor: %s is: %b.", container, processorId, hasContainerShutdown));
-      } else {
-        LOGGER.info("Container is not instantiated for stream processor: {}.", 
processorId);
       }
+      LOGGER.info(String.format("Shutdown status of container: %s for stream 
processor: %s is: %b.", container, processorId, hasContainerShutdown));
+    }
+
+    // We want to propagate TimeoutException when container shutdown times 
out. It is possible that the timeout exception
+    // we propagate to the application runner maybe overwritten by container 
failure cause in case of interleaved execution.
+    // It is acceptable since container exception is much more useful compared 
to timeout exception.
+    // We can infer from the logs about the fact that container shutdown timed 
out or not for additional inference.
+    if (!hasContainerShutdown && containerException == null) {
+      containerException = new TimeoutException("Container shutdown timed out 
after " + taskShutdownMs + " ms.");
     }
+
     return hasContainerShutdown;
   }
 
@@ -348,7 +354,7 @@ public class StreamProcessor {
             container = createSamzaContainer(processorId, jobModel);
             container.setContainerListener(new ContainerListener());
             LOGGER.info("Starting the container: {} for the stream processor: 
{}.", container, processorId);
-            executorService.submit(container);
+            containerExcecutorService.submit(container);
           } else {
             LOGGER.info("Ignoring onNewJobModel invocation since the current 
state is {} and not {}.", state, State.IN_REBALANCE);
           }
@@ -359,8 +365,12 @@ public class StreamProcessor {
       public void onCoordinatorStop() {
         synchronized (lock) {
           LOGGER.info("Shutting down the executor service of the stream 
processor: {}.", processorId);
-          stopSamzaContainer();
-          executorService.shutdownNow();
+          boolean hasContainerShutdown = stopSamzaContainer();
+
+          // we only want to interrupt when container shutdown times out.
+          if (!hasContainerShutdown) {
+            containerExcecutorService.shutdownNow();
+          }
           state = State.STOPPED;
         }
         if (containerException != null)
@@ -374,8 +384,12 @@ public class StreamProcessor {
       public void onCoordinatorFailure(Throwable throwable) {
         synchronized (lock) {
           LOGGER.info(String.format("Coordinator: %s failed with an exception. 
Stopping the stream processor: %s. Original exception:", jobCoordinator, 
processorId), throwable);
-          stopSamzaContainer();
-          executorService.shutdownNow();
+          boolean hasContainerShutdown = stopSamzaContainer();
+
+          // we only want to interrupt when container shutdown times out.
+          if (!hasContainerShutdown) {
+            containerExcecutorService.shutdownNow();
+          }
           state = State.STOPPED;
         }
         processorListener.afterFailure(throwable);
@@ -413,6 +427,7 @@ public class StreamProcessor {
     @Override
     public void afterStop() {
       containerShutdownLatch.countDown();
+
       synchronized (lock) {
         if (state == State.IN_REBALANCE) {
           LOGGER.info("Container: {} of the stream processor: {} was stopped 
by the JobCoordinator.", container, processorId);
@@ -426,11 +441,12 @@ public class StreamProcessor {
 
     @Override
     public void afterFailure(Throwable t) {
+      containerException = t;
       containerShutdownLatch.countDown();
+
       synchronized (lock) {
         LOGGER.error(String.format("Container: %s failed with an exception. 
Stopping the stream processor: %s. Original exception:", container, 
processorId), t);
         state = State.STOPPING;
-        containerException = t;
         jobCoordinator.stop();
       }
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/6e5e1621/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 417fc18..5c4723b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -878,9 +878,11 @@ class SamzaContainer(
    * @throws SamzaException, Thrown when the container has already been 
stopped or failed
    */
   def shutdown(): Unit = {
-    if (status == SamzaContainerStatus.STOPPED || status == 
SamzaContainerStatus.FAILED) {
-      throw new IllegalContainerStateException("Cannot shutdown a container 
with status " + status)
+    if (status == SamzaContainerStatus.FAILED || status == 
SamzaContainerStatus.STOPPED) {
+      warn("Shutdown is no-op since the container is already in state: " + 
status)
+      return
     }
+
     shutdownRunLoop()
   }
 
@@ -1182,14 +1184,3 @@ class SamzaContainer(
     }
   }
 }
-
-/**
- * Exception thrown when the SamzaContainer tries to transition to an illegal 
state.
- * {@link SamzaContainerStatus} has more details on the state transitions.
- *
- * @param s String, Message associated with the exception
- * @param t Throwable, Wrapped error/exception thrown, if any.
- */
-class IllegalContainerStateException(s: String, t: Throwable) extends 
SamzaException(s, t) {
-  def this(s: String) = this(s, null)
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6e5e1621/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java 
b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
index 673015a..93b157a 100644
--- 
a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
+++ 
b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -49,7 +49,7 @@ import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -392,14 +392,15 @@ public class TestStreamProcessor {
     ProcessorLifecycleListener lifecycleListener = 
Mockito.mock(ProcessorLifecycleListener.class);
     SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
     MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
-    StreamProcessor streamProcessor = PowerMockito.spy(new 
StreamProcessor(config, new HashMap<>(), null, lifecycleListener, 
mockJobCoordinator));
+    StreamProcessor streamProcessor = new TestableStreamProcessor(config, new 
HashMap<>(), null,
+        lifecycleListener, mockJobCoordinator, mockSamzaContainer);
 
-    streamProcessor.container = mockSamzaContainer;
     streamProcessor.state = State.IN_REBALANCE;
     Mockito.doNothing().when(mockSamzaContainer).run();
 
     streamProcessor.jobCoordinatorListener.onNewJobModel("TestProcessorId", 
new JobModel(new MapConfig(), new HashMap<>()));
 
+    Mockito.verify(mockSamzaContainer, 
Mockito.times(1)).setContainerListener(any());
     Mockito.verify(mockSamzaContainer, Mockito.atMost(1)).run();
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6e5e1621/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
 
b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
new file mode 100644
index 0000000..2dc76a4
--- /dev/null
+++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.framework;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.TaskApplicationDescriptor;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.task.ClosableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.test.operator.data.PageView;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class FaultInjectionTest extends 
StreamApplicationIntegrationTestHarness {
+  private static final String PAGE_VIEWS = "page-views";
+
+  @Test
+  public void testRaceCondition() throws InterruptedException {
+    int taskShutdownInMs = (int) (Math.random() * 10000);
+
+    CountDownLatch containerShutdownLatch = new CountDownLatch(1);
+    Map<String, String> configs = new HashMap<>();
+    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
"org.apache.samza.zk.ZkJobCoordinatorFactory");
+    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, 
"org.apache.samza.zk.ZkCoordinationUtilsFactory");
+    configs.put(JobConfig.PROCESSOR_ID(), "0");
+    configs.put(TaskConfig.GROUPER_FACTORY(), 
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+    configs.put(FaultInjectionStreamApp.INPUT_TOPIC_NAME_PROP, "page-views");
+    configs.put(TaskConfig.INPUT_STREAMS(), "kafka.page-views");
+    configs.put(ZkConfig.ZK_CONNECT, zkConnect());
+    configs.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), "5000");
+
+    // we purposefully randomize the task.shutdown.ms to make sure we can 
consistently verify if status is unsuccessfulFinish
+    // even though the reason for failure can either be container exception or 
container shutdown timing out.
+    configs.put("task.shutdown.ms", Integer.toString(taskShutdownInMs));
+    configs.put(JobConfig.PROCESSOR_ID(), "0");
+
+    createTopic(PAGE_VIEWS, 2);
+
+    // create events for the following user activity.
+    // userId: (viewId, pageId, (adIds))
+    // u1: (v1, p1, (a1)), (v2, p2, (a3))
+    // u2: (v3, p1, (a1)), (v4, p3, (a5))
+    produceMessage(PAGE_VIEWS, 0, "p1", 
"{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}");
+    produceMessage(PAGE_VIEWS, 1, "p2", 
"{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}");
+
+    FaultInjectionStreamApp app = new FaultInjectionStreamApp();
+    FaultInjectionStreamApp.containerShutdownLatch = containerShutdownLatch;
+    RunApplicationContext context =
+        runApplication(app, "fault-injection-app", configs);
+
+    containerShutdownLatch.await();
+    context.getRunner().kill();
+    context.getRunner().waitForFinish();
+    assertEquals(context.getRunner().status(), 
ApplicationStatus.UnsuccessfulFinish);
+  }
+
+  private static class FaultInjectionStreamApp implements TaskApplication {
+    public static final String SYSTEM = "kafka";
+    public static final String INPUT_TOPIC_NAME_PROP = "inputTopicName";
+    private static transient CountDownLatch containerShutdownLatch;
+
+    @Override
+    public void describe(TaskApplicationDescriptor appDesc) {
+      Config config = appDesc.getConfig();
+      String inputTopic = config.get(INPUT_TOPIC_NAME_PROP);
+
+      final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
+      KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
+      KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(inputTopic, 
serde);
+      appDesc.addInputStream(isd);
+      appDesc.setTaskFactory((StreamTaskFactory) () -> new 
FaultInjectionTask(containerShutdownLatch));
+    }
+
+    private static class FaultInjectionTask implements StreamTask, 
ClosableTask {
+      private final transient CountDownLatch containerShutdownLatch;
+
+      public FaultInjectionTask(CountDownLatch containerShutdownLatch) {
+        this.containerShutdownLatch = containerShutdownLatch;
+      }
+
+      @Override
+      public void close() throws Exception {
+        containerShutdownLatch.countDown();
+      }
+
+      @Override
+      public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator)
+          throws Exception {
+        throw new RuntimeException("Failed");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6e5e1621/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
index 7f13282..0c3d755 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
@@ -35,7 +35,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.KafkaConfig;
 import org.apache.samza.config.MapConfig;
@@ -48,7 +48,7 @@ import scala.Option;
 import scala.Option$;
 
 /**
- * Harness for writing integration tests for {@link StreamApplication}s.
+ * Harness for writing integration tests for {@link SamzaApplication}s.
  *
  * <p> This provides the following features for its sub-classes:
  * <ul>
@@ -74,7 +74,7 @@ import scala.Option$;
  * State persistence: {@link #tearDown()} clears all associated state 
(including topics and metadata) in Kafka and
  * Zookeeper. Hence, the state is not durable across invocations of {@link 
#tearDown()} <br/>
  *
- * Execution model: {@link StreamApplication}s are run as their own {@link 
org.apache.samza.job.local.ThreadJob}s.
+ * Execution model: {@link SamzaApplication}s are run as their own {@link 
org.apache.samza.job.local.ThreadJob}s.
  * Similarly, embedded Kafka servers and Zookeeper servers are run as their 
own threads.
  * {@link #produceMessage(String, int, String, String)} and {@link 
#consumeMessages(Collection, int)} are blocking calls.
  *
@@ -217,7 +217,7 @@ public class StreamApplicationIntegrationTestHarness 
extends AbstractIntegration
    * @return RunApplicationContext which contains objects created within 
runApplication, to be used for verification
    * if necessary
    */
-  protected RunApplicationContext runApplication(StreamApplication 
streamApplication,
+  protected RunApplicationContext runApplication(SamzaApplication 
streamApplication,
       String appName,
       Map<String, String> overriddenConfigs) {
     Map<String, String> configMap = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/samza/blob/6e5e1621/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index b249d4d..2ee17c0 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -617,6 +617,9 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     // Trigger re-balancing phase, by manually adding a new processor.
 
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]);
+
+    // Reset the task shutdown ms for 3rd application to give it ample time to 
shutdown cleanly
+    configMap.put(TaskConfig.SHUTDOWN_MS(), TASK_SHUTDOWN_MS);
     Config applicationConfig3 = new MapConfig(configMap);
 
     CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
@@ -629,13 +632,14 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * 
NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
 
     processedMessagesLatch3.await();
+    appRunner1.waitForFinish();
+    appRunner2.waitForFinish();
 
     /**
      * If the processing has started in the third stream processor, then other 
two stream processors should be stopped.
      */
-    // TODO: This is a bug! Status should be unsuccessful finish.
-    assertEquals(ApplicationStatus.SuccessfulFinish, appRunner1.status());
-    assertEquals(ApplicationStatus.SuccessfulFinish, appRunner2.status());
+    assertEquals(ApplicationStatus.UnsuccessfulFinish, appRunner1.status());
+    assertEquals(ApplicationStatus.UnsuccessfulFinish, appRunner2.status());
 
     appRunner3.kill();
     appRunner3.waitForFinish();

Reply via email to