This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 9b42f37 SAMZA-2258: Log thread dump when container stop times out during processor rebalancing (#1087)
9b42f37 is described below
commit 9b42f379d29a92575594519f5b3ff3772c554312
Author: Cameron Lee <[email protected]>
AuthorDate: Tue Jun 25 11:40:19 2019 -0700
SAMZA-2258: Log thread dump when container stop times out during processor rebalancing (#1087)
---
.../apache/samza/processor/StreamProcessor.java | 8 +-
.../samza/processor/TestStreamProcessor.java | 121 +++++++++++++++++----
2 files changed, 107 insertions(+), 22 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 75dc62d..9877a62 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -59,6 +59,7 @@ import org.apache.samza.task.TaskFactory;
import org.apache.samza.util.DiagnosticsUtil;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.ScalaJavaUtil;
+import org.apache.samza.util.ThreadUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
@@ -388,8 +389,11 @@ public class StreamProcessor {
// we propagate to the application runner maybe overwritten by container
failure cause in case of interleaved execution.
// It is acceptable since container exception is much more useful compared
to timeout exception.
// We can infer from the logs about the fact that container shutdown timed
out or not for additional inference.
- if (!hasContainerShutdown && containerException == null) {
- containerException = new TimeoutException("Container shutdown timed out
after " + taskShutdownMs + " ms.");
+ if (!hasContainerShutdown) {
+ ThreadUtil.logThreadDump("Thread dump at failure for stopping container
in stream processor");
+ if (containerException == null) {
+ containerException = new TimeoutException("Container shutdown timed
out after " + taskShutdownMs + " ms.");
+ }
}
return hasContainerShutdown;
diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
index d5bce16..b809ead 100644
--- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
+++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -18,7 +18,8 @@
*/
package org.apache.samza.processor;
-import com.google.common.collect.ImmutableMap;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -26,13 +27,16 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.samza.container.SamzaContainerStatus;
+import com.google.common.collect.ImmutableMap;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.RunLoop;
import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.container.SamzaContainerStatus;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
@@ -50,6 +54,8 @@ import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
@@ -84,17 +90,29 @@ public class TestStreamProcessor {
class TestableStreamProcessor extends StreamProcessor {
private final CountDownLatch containerStop = new CountDownLatch(1);
private final CountDownLatch runLoopStartForMain = new CountDownLatch(1);
- public SamzaContainer container = null;
+ private SamzaContainer container = null;
+ private final Duration runLoopShutdownDuration;
- public TestableStreamProcessor(
- Config config,
+ public TestableStreamProcessor(Config config,
Map<String, MetricsReporter> customMetricsReporters,
StreamTaskFactory streamTaskFactory,
ProcessorLifecycleListener processorListener,
JobCoordinator jobCoordinator,
SamzaContainer container) {
- super("TEST_PROCESSOR_ID", config, customMetricsReporters,
streamTaskFactory, processorListener, jobCoordinator);
+ this(config, customMetricsReporters, streamTaskFactory,
processorListener, jobCoordinator, container,
+ Duration.ZERO);
+ }
+
+ public TestableStreamProcessor(Config config,
+ Map<String, MetricsReporter> customMetricsReporters,
+ StreamTaskFactory streamTaskFactory,
+ ProcessorLifecycleListener processorListener,
+ JobCoordinator jobCoordinator,
+ SamzaContainer container,
+ Duration runLoopShutdownDuration) {
+ super("TEST_PROCESSOR_ID", config, customMetricsReporters,
streamTaskFactory, processorListener, jobCoordinator);
this.container = container;
+ this.runLoopShutdownDuration = runLoopShutdownDuration;
}
@Override
@@ -103,13 +121,9 @@ public class TestStreamProcessor {
RunLoop mockRunLoop = mock(RunLoop.class);
doAnswer(invocation ->
{
- try {
- runLoopStartForMain.countDown();
- containerStop.await();
- } catch (InterruptedException e) {
- System.out.println("In exception" + e);
- e.printStackTrace();
- }
+ runLoopStartForMain.countDown();
+ containerStop.await();
+ Thread.sleep(this.runLoopShutdownDuration.toMillis());
return null;
}).when(mockRunLoop).run();
@@ -216,13 +230,80 @@ public class TestStreamProcessor {
processorListenerStop.await();
// Assertions on which callbacks are expected to be invoked
-
Assert.assertTrue(processorListenerState.get(ListenerCallback.BEFORE_START));
-
Assert.assertTrue(processorListenerState.get(ListenerCallback.AFTER_START));
- Assert.assertTrue(processorListenerState.get(ListenerCallback.AFTER_STOP));
+ assertTrue(processorListenerState.get(ListenerCallback.BEFORE_START));
+ assertTrue(processorListenerState.get(ListenerCallback.AFTER_START));
+ assertTrue(processorListenerState.get(ListenerCallback.AFTER_STOP));
Assert.assertFalse(processorListenerState.get(ListenerCallback.AFTER_FAILURE));
}
/**
+ * Given that the job model expires, but the container takes too long to
stop, a TimeoutException should be propagated
+ * to the processor lifecycle listener.
+ */
+ @Test
+ public void testJobModelExpiredContainerShutdownTimeout() throws
InterruptedException {
+ JobCoordinator mockJobCoordinator = mock(JobCoordinator.class);
+ // use this to store the exception passed to afterFailure for the
processor lifecycle listener
+ AtomicReference<Throwable> afterFailureException = new
AtomicReference<>(null);
+ TestableStreamProcessor processor = new TestableStreamProcessor(
+ // set a small shutdown timeout so it triggers faster
+ new MapConfig(ImmutableMap.of(TaskConfig.TASK_SHUTDOWN_MS, "1")),
+ new HashMap<>(),
+ mock(StreamTaskFactory.class),
+ new ProcessorLifecycleListener() {
+ @Override
+ public void beforeStart() { }
+
+ @Override
+ public void afterStart() { }
+
+ @Override
+ public void afterFailure(Throwable t) {
+ afterFailureException.set(t);
+ }
+
+ @Override
+ public void afterStop() { }
+ },
+ mockJobCoordinator,
+ null,
+ // take an extra second to shut down so that task shutdown timeout
gets reached
+ Duration.of(1, ChronoUnit.SECONDS));
+
+ Thread jcThread = new Thread(() -> {
+ // gets processor into rebalance mode so onNewJobModel creates a new
container
+ processor.jobCoordinatorListener.onJobModelExpired();
+ processor.jobCoordinatorListener.onNewJobModel("1", getMockJobModel());
+ try {
+ // wait for the run loop to be ready before triggering rebalance
+ processor.runLoopStartForMain.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ processor.jobCoordinatorListener.onJobModelExpired();
+ });
+ doAnswer(invocation -> {
+ jcThread.start();
+ return null;
+ }).when(mockJobCoordinator).start();
+
+ // ensure that the coordinator stop occurred before checking the exception
being thrown
+ CountDownLatch coordinatorStop = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ processor.jobCoordinatorListener.onCoordinatorStop();
+ coordinatorStop.countDown();
+ return null;
+ }).when(mockJobCoordinator).stop();
+
+ processor.start();
+
+ // make sure the job model expired callback completed
+ assertTrue("Job coordinator stop not called", coordinatorStop.await(10,
TimeUnit.SECONDS));
+ assertNotNull(afterFailureException.get());
+ assertTrue(afterFailureException.get() instanceof TimeoutException);
+ }
+
+ /**
* Tests that a failure in container correctly stops a running
JobCoordinator and propagates the exception
* through the StreamProcessor
*
@@ -309,15 +390,15 @@ public class TestStreamProcessor {
// This block is required for the mockRunloop is actually started.
// Otherwise, processor.stop gets triggered before mockRunloop begins to
block
runLoopStartedLatch.await();
- Assert.assertTrue(
+ assertTrue(
"Container failed and processor listener failed was not invoked within
timeout!",
processorListenerFailed.await(30, TimeUnit.SECONDS));
assertEquals(expectedThrowable, actualThrowable.get());
-
Assert.assertTrue(processorListenerState.get(ListenerCallback.BEFORE_START));
-
Assert.assertTrue(processorListenerState.get(ListenerCallback.AFTER_START));
+ assertTrue(processorListenerState.get(ListenerCallback.BEFORE_START));
+ assertTrue(processorListenerState.get(ListenerCallback.AFTER_START));
Assert.assertFalse(processorListenerState.get(ListenerCallback.AFTER_STOP));
-
Assert.assertTrue(processorListenerState.get(ListenerCallback.AFTER_FAILURE));
+ assertTrue(processorListenerState.get(ListenerCallback.AFTER_FAILURE));
}
@Test