This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 73bcabd  TEZ-4277: AsyncDispatcher can hang on serviceStop if the eventhandling thread is in BLOCKED state (#97) (Laszlo Bodor reviewed by Rajesh Balamohan)
73bcabd is described below

commit 73bcabd2bca2536bf4f3673443a8dcdaaf79a4eb
Author: Bodor Laszlo <[email protected]>
AuthorDate: Mon Feb 1 11:33:19 2021 +0100

    TEZ-4277: AsyncDispatcher can hang on serviceStop if the eventhandling thread is in BLOCKED state (#97) (Laszlo Bodor reviewed by Rajesh Balamohan)
    
    TEZ-4277: AsyncDispatcher can hang on serviceStop if the eventhandling thread is in BLOCKED state
---
 .../org/apache/tez/dag/api/TezConfiguration.java   | 11 +++++++++
 .../org/apache/tez/common/AsyncDispatcher.java     | 28 ++++++++++++++++++----
 .../java/org/apache/tez/client/LocalClient.java    |  3 ++-
 3 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java 
b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 05eb4b2..eef0d65 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -194,6 +194,17 @@ public class TezConfiguration extends Configuration {
   public static final int TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY_DEFAULT = 
10;
 
   /**
+   * Integer value. Milliseconds while AsyncDispatcher should wait for events 
to be processed on
+   * serviceStop. The idea is borrowed from YARN-3999.
+   */
+  @Private
+  @ConfigurationScope(Scope.AM)
+  public static final String TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT = 
TEZ_AM_PREFIX
+      + "dispatcher.drain-events.timeout";
+  @Private
+  public static final int TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT_DEFAULT = 
10000;
+
+  /**
    * Boolean value. Execution mode for the Tez application. True implies 
session mode. If the client
    * code is written according to best practices then the same code can 
execute in either mode based
    * on this configuration. Session mode is more aggressive in reserving 
execution resources and is
diff --git 
a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java 
b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
index c197f1d..f9f21ca 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -142,19 +143,34 @@ public class AsyncDispatcher extends CompositeService 
implements Dispatcher {
     if (drainEventsOnStop) {
       blockNewEvents = true;
       LOG.info("AsyncDispatcher is draining to stop, ignoring any new 
events.");
+      long endTime = System.currentTimeMillis() + getConfig()
+          .getInt(TezConfiguration.TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT,
+              TezConfiguration.TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT_DEFAULT);
+
       synchronized (waitForDrained) {
-        while (!drained && eventHandlingThread.isAlive()) {
+        while (!drained && eventHandlingThread.isAlive() && 
System.currentTimeMillis() < endTime) {
           waitForDrained.wait(1000);
-          LOG.info("Waiting for AsyncDispatcher to drain.");
+          LOG.info(
+              "Waiting for AsyncDispatcher to drain. Current queue size: {}, 
handler thread state: {}",
+              eventQueue.size(), eventHandlingThread.getState());
         }
       }
-      
     }
     stopped = true;
     if (eventHandlingThread != null) {
       eventHandlingThread.interrupt();
       try {
-        eventHandlingThread.join();
+        /*
+         * The event handling thread can be alive at this point, but in 
BLOCKED state, which leads
+         * to app hang, as a BLOCKED thread might never finish under some 
circumstances
+         */
+        if (eventHandlingThread.getState() == Thread.State.BLOCKED) {
+          LOG.warn(
+              "eventHandlingThread is in BLOCKED state, let's not wait for it 
in order to prevent app hang");
+        } else {
+          eventHandlingThread.join();
+          LOG.info("joined event handling thread, state: {}", 
eventHandlingThread.getState());
+        }
       } catch (InterruptedException ie) {
         LOG.warn("Interrupted Exception while stopping", ie);
       }
@@ -181,6 +197,10 @@ public class AsyncDispatcher extends CompositeService 
implements Dispatcher {
         throw new Exception("No handler for registered for " + type);
       }
     } catch (Throwable t) {
+      if (t instanceof InterruptedException) {
+        LOG.warn("Interrupted Exception while handling event: " + 
event.getType(), t);
+        Thread.currentThread().interrupt();
+      }
       LOG.error("Error in dispatcher thread", t);
       // If serviceStop is called, we should exit this thread gracefully.
       if (exitOnDispatchException
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java 
b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index c76bd6b..f6d9587 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -47,6 +47,7 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.AsyncDispatcher;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.DAGSubmissionTimedOut;
@@ -356,7 +357,7 @@ public class LocalClient extends FrameworkClient {
                   amCredentials, 
UserGroupInformation.getCurrentUser().getShortUserName());
           DAGAppMaster.initAndStartAppMaster(dagAppMaster, conf);
           clientHandler = new DAGClientHandler(dagAppMaster);
-
+          
((AsyncDispatcher)dagAppMaster.getDispatcher()).setDrainEventsOnStop();
         } catch (Throwable t) {
           LOG.error("Error starting DAGAppMaster", t);
           if (dagAppMaster != null) {

Reply via email to