Repository: hadoop
Updated Branches:
  refs/heads/branch-2 24d464a15 -> 0b5d96abb


YARN-5436. Race in AsyncDispatcher can cause random test failures in Tez (probably YARN also). (Zhiyuan Yang via gtcarrera9)

(cherry picked from commit 7086fc72eebc41fd174d91839ed703c014aac920)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0b5d96ab
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0b5d96ab
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0b5d96ab

Branch: refs/heads/branch-2
Commit: 0b5d96abb5c3c1831489588ed81e15f9bf18bbe5
Parents: 24d464a
Author: Li Lu <gtcarre...@apache.org>
Authored: Thu Jul 28 16:50:57 2016 -0700
Committer: Li Lu <gtcarre...@apache.org>
Committed: Thu Jul 28 16:52:21 2016 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/event/AsyncDispatcher.java      |  8 ++-
 .../hadoop/yarn/event/DrainDispatcher.java      | 53 +++++++++++++++++++-
 2 files changed, 55 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b5d96ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
index f5361c8..5dea1c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
@@ -59,6 +59,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
 
   // Indicates all the remaining dispatcher's events on stop have been drained
   // and processed.
+  // Race condition happens if dispatcher thread sets drained to true between
+  // handler setting drained to false and enqueueing event. YARN-3878 decided
+  // to ignore it because of its tiny impact. Also see YARN-5436.
   private volatile boolean drained = true;
   private final Object waitForDrained = new Object();
 
@@ -300,9 +303,4 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
   protected boolean isEventThreadWaiting() {
     return eventHandlingThread.getState() == Thread.State.WAITING;
   }
-
-  @VisibleForTesting
-  protected boolean isDrained() {
-    return this.drained;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b5d96ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
index e4a5a82..cf4b1b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
@@ -22,6 +22,10 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 @SuppressWarnings("rawtypes")
 public class DrainDispatcher extends AsyncDispatcher {
+  private volatile boolean drained = false;
+  private volatile boolean stopped = false;
+  private final BlockingQueue<Event> queue;
+  private final Object mutex;
 
   public DrainDispatcher() {
     this(new LinkedBlockingQueue<Event>());
@@ -29,6 +33,8 @@ public class DrainDispatcher extends AsyncDispatcher {
 
   public DrainDispatcher(BlockingQueue<Event> eventQueue) {
     super(eventQueue);
+    this.queue = eventQueue;
+    this.mutex = this;
   }
 
   /**
@@ -44,8 +50,53 @@ public class DrainDispatcher extends AsyncDispatcher {
    * Busy loop waiting for all queued events to drain.
    */
   public void await() {
-    while (!isDrained()) {
+    while (!drained) {
       Thread.yield();
     }
   }
+
+  @Override
+  Runnable createThread() {
+    return new Runnable() {
+      @Override
+      public void run() {
+        while (!stopped && !Thread.currentThread().isInterrupted()) {
+          synchronized (mutex) {
+            // !drained if dispatch queued new events on this dispatcher
+            drained = queue.isEmpty();
+          }
+          Event event;
+          try {
+            event = queue.take();
+          } catch (InterruptedException ie) {
+            return;
+          }
+          if (event != null) {
+            dispatch(event);
+          }
+        }
+      }
+    };
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public EventHandler getEventHandler() {
+    final EventHandler actual = super.getEventHandler();
+    return new EventHandler() {
+      @Override
+      public void handle(Event event) {
+        synchronized (mutex) {
+          actual.handle(event);
+          drained = false;
+        }
+      }
+    };
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    stopped = true;
+    super.serviceStop();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to