This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a2df5490 TEZ-4501: Fix TestLocalMode timeouts (#300). (Laszlo Bodor, reviewed by Ayush Saxena)
5a2df5490 is described below

commit 5a2df5490bea37fc68224dc8130a7cf7c954febf
Author: Bodor Laszlo <[email protected]>
AuthorDate: Thu Jan 25 09:07:16 2024 +0100

    TEZ-4501: Fix TestLocalMode timeouts (#300). (Laszlo Bodor, reviewed by Ayush Saxena)
---
 .../main/java/org/apache/tez/common/AsyncDispatcher.java   | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
index f9f21ca31..14f712183 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
@@ -105,9 +105,12 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
           Event event;
           try {
             event = eventQueue.take();
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("AsyncDispatcher taken event: {}", event);
+            }
           } catch(InterruptedException ie) {
             if (!stopped) {
-              LOG.warn("AsyncDispatcher thread interrupted", ie);
+              LOG.warn("AsyncDispatcher thread interrupted (while taking 
event)", ie);
             }
             return;
           }
@@ -140,6 +143,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
 
   @Override
   protected void serviceStop() throws Exception {
+    LOG.info("AsyncDispatcher serviceStop called, drainEventsOnStop: {}, 
drained: {}, eventQueue size: {}",
+        drainEventsOnStop, drained, eventQueue.size());
     if (drainEventsOnStop) {
       blockNewEvents = true;
       LOG.info("AsyncDispatcher is draining to stop, ignoring any new 
events.");
@@ -148,7 +153,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
               TezConfiguration.TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT_DEFAULT);
 
       synchronized (waitForDrained) {
-        while (!drained && eventHandlingThread.isAlive() && 
System.currentTimeMillis() < endTime) {
+        while (!eventQueue.isEmpty() && eventHandlingThread.isAlive() && 
System.currentTimeMillis() < endTime) {
           waitForDrained.wait(1000);
           LOG.info(
               "Waiting for AsyncDispatcher to drain. Current queue size: {}, 
handler thread state: {}",
@@ -364,9 +369,12 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
       }
       try {
         eventQueue.put(event);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("AsyncDispatcher put event: {}", event);
+        }
       } catch (InterruptedException e) {
         if (!stopped) {
-          LOG.warn("AsyncDispatcher thread interrupted", e);
+          LOG.warn("AsyncDispatcher thread interrupted (while putting event): 
{}", event, e);
         }
         throw new YarnRuntimeException(e);
       }

Reply via email to