This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 5a2df5490 TEZ-4501: Fix TestLocalMode timeouts (#300). (Laszlo Bodor, reviewed by Ayush Saxena)
5a2df5490 is described below
commit 5a2df5490bea37fc68224dc8130a7cf7c954febf
Author: Bodor Laszlo <[email protected]>
AuthorDate: Thu Jan 25 09:07:16 2024 +0100
TEZ-4501: Fix TestLocalMode timeouts (#300). (Laszlo Bodor, reviewed by Ayush Saxena)
---
.../main/java/org/apache/tez/common/AsyncDispatcher.java | 14 +++++++++++---
1 file changed, 11 insertions(+), 3 deletions(-)
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
index f9f21ca31..14f712183 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
@@ -105,9 +105,12 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
Event event;
try {
event = eventQueue.take();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("AsyncDispatcher taken event: {}", event);
+ }
} catch(InterruptedException ie) {
if (!stopped) {
- LOG.warn("AsyncDispatcher thread interrupted", ie);
+ LOG.warn("AsyncDispatcher thread interrupted (while taking
event)", ie);
}
return;
}
@@ -140,6 +143,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
@Override
protected void serviceStop() throws Exception {
+ LOG.info("AsyncDispatcher serviceStop called, drainEventsOnStop: {},
drained: {}, eventQueue size: {}",
+ drainEventsOnStop, drained, eventQueue.size());
if (drainEventsOnStop) {
blockNewEvents = true;
LOG.info("AsyncDispatcher is draining to stop, ignoring any new
events.");
@@ -148,7 +153,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
TezConfiguration.TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT_DEFAULT);
synchronized (waitForDrained) {
- while (!drained && eventHandlingThread.isAlive() &&
System.currentTimeMillis() < endTime) {
+ while (!eventQueue.isEmpty() && eventHandlingThread.isAlive() &&
System.currentTimeMillis() < endTime) {
waitForDrained.wait(1000);
LOG.info(
"Waiting for AsyncDispatcher to drain. Current queue size: {},
handler thread state: {}",
@@ -364,9 +369,12 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
}
try {
eventQueue.put(event);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("AsyncDispatcher put event: {}", event);
+ }
} catch (InterruptedException e) {
if (!stopped) {
- LOG.warn("AsyncDispatcher thread interrupted", e);
+ LOG.warn("AsyncDispatcher thread interrupted (while putting event):
{}", event, e);
}
throw new YarnRuntimeException(e);
}