This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b42f37  SAMZA-2258: Log thread dump when container stop times out during processor rebalancing (#1087)
9b42f37 is described below

commit 9b42f379d29a92575594519f5b3ff3772c554312
Author: Cameron Lee <[email protected]>
AuthorDate: Tue Jun 25 11:40:19 2019 -0700

    SAMZA-2258: Log thread dump when container stop times out during processor rebalancing (#1087)
---
 .../apache/samza/processor/StreamProcessor.java    |   8 +-
 .../samza/processor/TestStreamProcessor.java       | 121 +++++++++++++++++----
 2 files changed, 107 insertions(+), 22 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 75dc62d..9877a62 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -59,6 +59,7 @@ import org.apache.samza.task.TaskFactory;
 import org.apache.samza.util.DiagnosticsUtil;
 import org.apache.samza.util.ReflectionUtil;
 import org.apache.samza.util.ScalaJavaUtil;
+import org.apache.samza.util.ThreadUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
@@ -388,8 +389,11 @@ public class StreamProcessor {
     // we propagate to the application runner maybe overwritten by container 
failure cause in case of interleaved execution.
     // It is acceptable since container exception is much more useful compared 
to timeout exception.
     // We can infer from the logs about the fact that container shutdown timed 
out or not for additional inference.
-    if (!hasContainerShutdown && containerException == null) {
-      containerException = new TimeoutException("Container shutdown timed out 
after " + taskShutdownMs + " ms.");
+    if (!hasContainerShutdown) {
+      ThreadUtil.logThreadDump("Thread dump at failure for stopping container 
in stream processor");
+      if (containerException == null) {
+        containerException = new TimeoutException("Container shutdown timed 
out after " + taskShutdownMs + " ms.");
+      }
     }
 
     return hasContainerShutdown;
diff --git 
a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java 
b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
index d5bce16..b809ead 100644
--- 
a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
+++ 
b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -18,7 +18,8 @@
  */
 package org.apache.samza.processor;
 
-import com.google.common.collect.ImmutableMap;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -26,13 +27,16 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.samza.container.SamzaContainerStatus;
+import com.google.common.collect.ImmutableMap;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.RunLoop;
 import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.container.SamzaContainerStatus;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
@@ -50,6 +54,8 @@ import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
@@ -84,17 +90,29 @@ public class TestStreamProcessor {
   class TestableStreamProcessor extends StreamProcessor {
     private final CountDownLatch containerStop = new CountDownLatch(1);
     private final CountDownLatch runLoopStartForMain = new CountDownLatch(1);
-    public SamzaContainer container = null;
+    private SamzaContainer container = null;
+    private final Duration runLoopShutdownDuration;
 
-    public TestableStreamProcessor(
-        Config config,
+    public TestableStreamProcessor(Config config,
         Map<String, MetricsReporter> customMetricsReporters,
         StreamTaskFactory streamTaskFactory,
         ProcessorLifecycleListener processorListener,
         JobCoordinator jobCoordinator,
         SamzaContainer container) {
-      super("TEST_PROCESSOR_ID", config,  customMetricsReporters, 
streamTaskFactory, processorListener, jobCoordinator);
+      this(config, customMetricsReporters, streamTaskFactory, 
processorListener, jobCoordinator, container,
+          Duration.ZERO);
+    }
+
+    public TestableStreamProcessor(Config config,
+        Map<String, MetricsReporter> customMetricsReporters,
+        StreamTaskFactory streamTaskFactory,
+        ProcessorLifecycleListener processorListener,
+        JobCoordinator jobCoordinator,
+        SamzaContainer container,
+        Duration runLoopShutdownDuration) {
+      super("TEST_PROCESSOR_ID", config, customMetricsReporters, 
streamTaskFactory, processorListener, jobCoordinator);
       this.container = container;
+      this.runLoopShutdownDuration = runLoopShutdownDuration;
     }
 
     @Override
@@ -103,13 +121,9 @@ public class TestStreamProcessor {
         RunLoop mockRunLoop = mock(RunLoop.class);
         doAnswer(invocation ->
           {
-            try {
-              runLoopStartForMain.countDown();
-              containerStop.await();
-            } catch (InterruptedException e) {
-              System.out.println("In exception" + e);
-              e.printStackTrace();
-            }
+            runLoopStartForMain.countDown();
+            containerStop.await();
+            Thread.sleep(this.runLoopShutdownDuration.toMillis());
             return null;
           }).when(mockRunLoop).run();
 
@@ -216,13 +230,80 @@ public class TestStreamProcessor {
     processorListenerStop.await();
 
     // Assertions on which callbacks are expected to be invoked
-    
Assert.assertTrue(processorListenerState.get(ListenerCallback.BEFORE_START));
-    
Assert.assertTrue(processorListenerState.get(ListenerCallback.AFTER_START));
-    Assert.assertTrue(processorListenerState.get(ListenerCallback.AFTER_STOP));
+    assertTrue(processorListenerState.get(ListenerCallback.BEFORE_START));
+    assertTrue(processorListenerState.get(ListenerCallback.AFTER_START));
+    assertTrue(processorListenerState.get(ListenerCallback.AFTER_STOP));
     
Assert.assertFalse(processorListenerState.get(ListenerCallback.AFTER_FAILURE));
   }
 
   /**
+   * Given that the job model expires, but the container takes too long to 
stop, a TimeoutException should be propagated
+   * to the processor lifecycle listener.
+   */
+  @Test
+  public void testJobModelExpiredContainerShutdownTimeout() throws 
InterruptedException {
+    JobCoordinator mockJobCoordinator = mock(JobCoordinator.class);
+    // use this to store the exception passed to afterFailure for the 
processor lifecycle listener
+    AtomicReference<Throwable> afterFailureException = new 
AtomicReference<>(null);
+    TestableStreamProcessor processor = new TestableStreamProcessor(
+        // set a small shutdown timeout so it triggers faster
+        new MapConfig(ImmutableMap.of(TaskConfig.TASK_SHUTDOWN_MS, "1")),
+        new HashMap<>(),
+        mock(StreamTaskFactory.class),
+        new ProcessorLifecycleListener() {
+          @Override
+          public void beforeStart() { }
+
+          @Override
+          public void afterStart() { }
+
+          @Override
+          public void afterFailure(Throwable t) {
+            afterFailureException.set(t);
+          }
+
+          @Override
+          public void afterStop() { }
+        },
+        mockJobCoordinator,
+        null,
+        // take an extra second to shut down so that task shutdown timeout 
gets reached
+        Duration.of(1, ChronoUnit.SECONDS));
+
+    Thread jcThread = new Thread(() -> {
+        // gets processor into rebalance mode so onNewJobModel creates a new 
container
+        processor.jobCoordinatorListener.onJobModelExpired();
+        processor.jobCoordinatorListener.onNewJobModel("1", getMockJobModel());
+        try {
+          // wait for the run loop to be ready before triggering rebalance
+          processor.runLoopStartForMain.await();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        processor.jobCoordinatorListener.onJobModelExpired();
+      });
+    doAnswer(invocation -> {
+        jcThread.start();
+        return null;
+      }).when(mockJobCoordinator).start();
+
+    // ensure that the coordinator stop occurred before checking the exception 
being thrown
+    CountDownLatch coordinatorStop = new CountDownLatch(1);
+    doAnswer(invocation -> {
+        processor.jobCoordinatorListener.onCoordinatorStop();
+        coordinatorStop.countDown();
+        return null;
+      }).when(mockJobCoordinator).stop();
+
+    processor.start();
+
+    // make sure the job model expired callback completed
+    assertTrue("Job coordinator stop not called", coordinatorStop.await(10, 
TimeUnit.SECONDS));
+    assertNotNull(afterFailureException.get());
+    assertTrue(afterFailureException.get() instanceof TimeoutException);
+  }
+
+  /**
    * Tests that a failure in container correctly stops a running 
JobCoordinator and propagates the exception
    * through the StreamProcessor
    *
@@ -309,15 +390,15 @@ public class TestStreamProcessor {
     // This block is required for the mockRunloop is actually started.
     // Otherwise, processor.stop gets triggered before mockRunloop begins to 
block
     runLoopStartedLatch.await();
-    Assert.assertTrue(
+    assertTrue(
         "Container failed and processor listener failed was not invoked within 
timeout!",
         processorListenerFailed.await(30, TimeUnit.SECONDS));
     assertEquals(expectedThrowable, actualThrowable.get());
 
-    
Assert.assertTrue(processorListenerState.get(ListenerCallback.BEFORE_START));
-    
Assert.assertTrue(processorListenerState.get(ListenerCallback.AFTER_START));
+    assertTrue(processorListenerState.get(ListenerCallback.BEFORE_START));
+    assertTrue(processorListenerState.get(ListenerCallback.AFTER_START));
     
Assert.assertFalse(processorListenerState.get(ListenerCallback.AFTER_STOP));
-    
Assert.assertTrue(processorListenerState.get(ListenerCallback.AFTER_FAILURE));
+    assertTrue(processorListenerState.get(ListenerCallback.AFTER_FAILURE));
   }
 
   @Test

Reply via email to