This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 73bcabd TEZ-4277: AsyncDispatcher can hang on serviceStop if the
eventhandling thread is in BLOCKED state (#97) (Laszlo Bodor reviewed by Rajesh
Balamohan)
73bcabd is described below
commit 73bcabd2bca2536bf4f3673443a8dcdaaf79a4eb
Author: Bodor Laszlo <[email protected]>
AuthorDate: Mon Feb 1 11:33:19 2021 +0100
TEZ-4277: AsyncDispatcher can hang on serviceStop if the eventhandling
thread is in BLOCKED state (#97) (Laszlo Bodor reviewed by Rajesh Balamohan)
TEZ-4277: AsyncDispatcher can hang on serviceStop if the eventhandling
thread is in BLOCKED state
---
.../org/apache/tez/dag/api/TezConfiguration.java | 11 +++++++++
.../org/apache/tez/common/AsyncDispatcher.java | 28 ++++++++++++++++++----
.../java/org/apache/tez/client/LocalClient.java | 3 ++-
3 files changed, 37 insertions(+), 5 deletions(-)
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 05eb4b2..eef0d65 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -194,6 +194,17 @@ public class TezConfiguration extends Configuration {
public static final int TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY_DEFAULT =
10;
/**
+ * Integer value. Milliseconds while AsyncDispatcher should wait for events
to be processed on
+ * serviceStop. The idea is borrowed from YARN-3999.
+ */
+ @Private
+ @ConfigurationScope(Scope.AM)
+ public static final String TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT =
TEZ_AM_PREFIX
+ + "dispatcher.drain-events.timeout";
+ @Private
+ public static final int TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT_DEFAULT =
10000;
+
+ /**
* Boolean value. Execution mode for the Tez application. True implies
session mode. If the client
* code is written according to best practices then the same code can
execute in either mode based
* on this configuration. Session mode is more aggressive in reserving
execution resources and is
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
index c197f1d..f9f21ca 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.tez.dag.api.TezConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -142,19 +143,34 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
if (drainEventsOnStop) {
blockNewEvents = true;
LOG.info("AsyncDispatcher is draining to stop, ignoring any new
events.");
+ long endTime = System.currentTimeMillis() + getConfig()
+ .getInt(TezConfiguration.TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT,
+ TezConfiguration.TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT_DEFAULT);
+
synchronized (waitForDrained) {
- while (!drained && eventHandlingThread.isAlive()) {
+ while (!drained && eventHandlingThread.isAlive() &&
System.currentTimeMillis() < endTime) {
waitForDrained.wait(1000);
- LOG.info("Waiting for AsyncDispatcher to drain.");
+ LOG.info(
+ "Waiting for AsyncDispatcher to drain. Current queue size: {},
handler thread state: {}",
+ eventQueue.size(), eventHandlingThread.getState());
}
}
-
}
stopped = true;
if (eventHandlingThread != null) {
eventHandlingThread.interrupt();
try {
- eventHandlingThread.join();
+ /*
+ * The event handling thread can be alive at this point, but in
BLOCKED state, which leads
+ * to app hang, as a BLOCKED thread might never finish under some
circumstances
+ */
+ if (eventHandlingThread.getState() == Thread.State.BLOCKED) {
+ LOG.warn(
+ "eventHandlingThread is in BLOCKED state, let's not wait for it
in order to prevent app hang");
+ } else {
+ eventHandlingThread.join();
+ LOG.info("joined event handling thread, state: {}",
eventHandlingThread.getState());
+ }
} catch (InterruptedException ie) {
LOG.warn("Interrupted Exception while stopping", ie);
}
@@ -181,6 +197,10 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
throw new Exception("No handler for registered for " + type);
}
} catch (Throwable t) {
+ if (t instanceof InterruptedException) {
+ LOG.warn("Interrupted Exception while handling event: " +
event.getType(), t);
+ Thread.currentThread().interrupt();
+ }
LOG.error("Error in dispatcher thread", t);
// If serviceStop is called, we should exit this thread gracefully.
if (exitOnDispatchException
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index c76bd6b..f6d9587 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.AsyncDispatcher;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.DAGSubmissionTimedOut;
@@ -356,7 +357,7 @@ public class LocalClient extends FrameworkClient {
amCredentials,
UserGroupInformation.getCurrentUser().getShortUserName());
DAGAppMaster.initAndStartAppMaster(dagAppMaster, conf);
clientHandler = new DAGClientHandler(dagAppMaster);
-
+
((AsyncDispatcher)dagAppMaster.getDispatcher()).setDrainEventsOnStop();
} catch (Throwable t) {
LOG.error("Error starting DAGAppMaster", t);
if (dagAppMaster != null) {