HADOOP-14912. FairCallQueue may defer servicing calls. Contributed by Daryn Sharp
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1123f8f0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1123f8f0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1123f8f0 Branch: refs/heads/YARN-5881 Commit: 1123f8f0b62292197f5433cd40e66d8620044608 Parents: f29e55b Author: Jason Lowe <[email protected]> Authored: Tue Oct 10 14:47:25 2017 -0500 Committer: Jason Lowe <[email protected]> Committed: Tue Oct 10 14:47:25 2017 -0500 ---------------------------------------------------------------------- .../src/main/java/org/apache/hadoop/ipc/FairCallQueue.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1123f8f0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java index 20161b8..6d9ea3e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java @@ -122,13 +122,15 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E> private E removeNextElement() { int priority = multiplexer.getAndAdvanceCurrentIndex(); E e = queues.get(priority).poll(); - if (e == null) { + // a semaphore permit has been acquired, so an element MUST be extracted + // or the semaphore and queued elements will go out of sync. loop to + // avoid race condition if elements are added behind the current position, + // awakening other threads that poll the elements ahead of our position. + while (e == null) { for (int idx = 0; e == null && idx < queues.size(); idx++) { e = queues.get(idx).poll(); } } - // guaranteed to find an element if caller acquired permit. - assert e != null : "consumer didn't acquire semaphore!"; return e; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
