This is an automated email from the ASF dual-hosted git repository. pbacsko pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new 4cd1596 YARN-10642. Race condition: AsyncDispatcher can get stuck by the changes introduced in YARN-8995. Contributed by zhengchenyu. 4cd1596 is described below commit 4cd1596fe7c8d80d7124a517ea6cdc3b3df90292 Author: Peter Bacsko <pbac...@cloudera.com> AuthorDate: Tue May 11 15:18:16 2021 +0200 YARN-10642. Race condition: AsyncDispatcher can get stuck by the changes introduced in YARN-8995. Contributed by zhengchenyu. --- .../apache/hadoop/yarn/event/AsyncDispatcher.java | 19 ++++--- .../hadoop/yarn/event/TestAsyncDispatcher.java | 58 +++++++++++++++++++++- 2 files changed, 68 insertions(+), 9 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 81448c5..acfba27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -20,11 +20,11 @@ package org.apache.hadoop.yarn.event; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.stream.Collectors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -265,11 +265,16 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { } class GenericEventHandler implements EventHandler<Event> { - private void printEventQueueDetails(BlockingQueue<Event> queue) { - Map<Enum, Long> counterMap = eventQueue.stream(). - collect(Collectors. - groupingBy(e -> e.getType(), Collectors.counting()) - ); + private void printEventQueueDetails() { + Iterator<Event> iterator = eventQueue.iterator(); + Map<Enum, Long> counterMap = new HashMap<>(); + while (iterator.hasNext()) { + Enum eventType = iterator.next().getType(); + if (!counterMap.containsKey(eventType)) { + counterMap.put(eventType, 0L); + } + counterMap.put(eventType, counterMap.get(eventType) + 1); + } for (Map.Entry<Enum, Long> entry : counterMap.entrySet()) { long num = entry.getValue(); LOG.info("Event type: " + entry.getKey() @@ -292,7 +297,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { if (qSize != 0 && qSize % detailsInterval == 0 && lastEventDetailsQueueSizeLogged != qSize) { lastEventDetailsQueueSizeLogged = qSize; - printEventQueueDetails(eventQueue); + printEventQueueDetails(); printTrigger = true; } int remCapacity = eventQueue.remainingCapacity(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java index 53e2d05..88855fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java @@ -97,12 +97,23 @@ public class TestAsyncDispatcher { } private static class TestHandler implements EventHandler<Event> { + + private long sleepTime = 1500; + + TestHandler() { + } + + TestHandler(long sleepTime) { + this.sleepTime = sleepTime; + } + @Override public void handle(Event event) { try { // As long as 10000 events queued - Thread.sleep(1500); - } catch (InterruptedException e) {} + Thread.sleep(this.sleepTime); + } catch (InterruptedException e) { + } } } @@ -170,11 +181,54 @@ public class TestAsyncDispatcher { //Make sure more than one event to take verify(log, atLeastOnce()). info("Latest dispatch event type: TestEventType"); + } finally { + //... restore logger object + logger.set(null, oldLog); dispatcher.stop(); + } + } + + //Test print dispatcher details when the blocking queue is heavy + @Test(timeout = 60000) + public void testPrintDispatcherEventDetailsAvoidDeadLoop() throws Exception { + for (int i = 0; i < 5; i++) { + testPrintDispatcherEventDetailsAvoidDeadLoopInternal(); + } + } + + public void testPrintDispatcherEventDetailsAvoidDeadLoopInternal() + throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration. + YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD, 10); + Log log = mock(Log.class); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + + Field logger = AsyncDispatcher.class.getDeclaredField("LOG"); + logger.setAccessible(true); + Field modifiers = Field.class.getDeclaredField("modifiers"); + modifiers.setAccessible(true); + modifiers.setInt(logger, logger.getModifiers() & ~Modifier.FINAL); + Object oldLog = logger.get(null); + + try { + logger.set(null, log); + dispatcher.register(TestEnum.class, new TestHandler(0)); + dispatcher.start(); + + for (int i = 0; i < 10000; ++i) { + Event event = mock(Event.class); + when(event.getType()).thenReturn(TestEnum.TestEventType); + dispatcher.getEventHandler().handle(event); + } + Thread.sleep(3000); } finally { //... restore logger object logger.set(null, oldLog); + dispatcher.stop(); } } + } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org