Repository: tez Updated Branches: refs/heads/branch-0.7 55df029a8 -> cbd4eacb0
TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher. (Zhiyuan Yang via hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cbd4eacb Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cbd4eacb Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cbd4eacb Branch: refs/heads/branch-0.7 Commit: cbd4eacb09b6a8ec350723848c397be42ca524c3 Parents: 55df029 Author: Hitesh Shah <[email protected]> Authored: Tue Sep 6 11:15:51 2016 -0700 Committer: Hitesh Shah <[email protected]> Committed: Tue Sep 6 11:15:51 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/common/AsyncDispatcher.java | 4 +- .../org/apache/tez/common/DrainDispatcher.java | 123 +++++++++++++++++++ .../apache/tez/dag/app/dag/impl/TestCommit.java | 2 +- .../tez/dag/app/dag/impl/TestDAGImpl.java | 2 +- .../tez/dag/app/dag/impl/TestTaskRecovery.java | 2 +- .../tez/dag/app/dag/impl/TestVertexImpl.java | 2 +- .../dag/app/dag/impl/TestVertexRecovery.java | 2 +- .../tez/dag/app/rm/node/TestAMNodeTracker.java | 2 +- 9 files changed, 133 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3bba3e4..65da496 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher. TEZ-3326. Display JVM system properties in AM and task logs. TEZ-3009. Errors that occur during container task acquisition are not logged. TEZ-3413. ConcurrentModificationException in HistoryEventTimelineConversion for AppLaunchedEvent. http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java index 159ccd9..ec5f6c7 100644 --- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java +++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; @@ -215,7 +216,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher { "Multiple concurrent dispatchers cannot be registered for: " + eventType.getName()); } - private void checkForExistingDispatchers(boolean checkHandler, Class<? extends Enum> eventType) { + @VisibleForTesting + protected void checkForExistingDispatchers(boolean checkHandler, Class<? extends Enum> eventType) { if (checkHandler) { checkForExistingHandler(eventType); } http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-common/src/test/java/org/apache/tez/common/DrainDispatcher.java ---------------------------------------------------------------------- diff --git a/tez-common/src/test/java/org/apache/tez/common/DrainDispatcher.java b/tez-common/src/test/java/org/apache/tez/common/DrainDispatcher.java new file mode 100644 index 0000000..fd1fc0a --- /dev/null +++ b/tez-common/src/test/java/org/apache/tez/common/DrainDispatcher.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.common; + +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +public class DrainDispatcher extends AsyncDispatcher { + static final String DEFAULT_NAME = "dispatcher"; + private volatile boolean drained = false; + private volatile boolean stopped = false; + private final BlockingQueue<Event> queue; + private final Object mutex; + private static final Logger LOG = LoggerFactory.getLogger(DrainDispatcher.class); + + public DrainDispatcher() { + this(DEFAULT_NAME, new LinkedBlockingQueue<Event>()); + } + + public DrainDispatcher(String name, BlockingQueue<Event> eventQueue) { + super(name, eventQueue); + this.queue = eventQueue; + this.mutex = this; + } + + @SuppressWarnings("unchecked") + @Override + public void register(Class<? extends Enum> eventType, + EventHandler handler) { + /* check to see if we have a listener registered */ + EventHandler<Event> registeredHandler = (EventHandler<Event>) eventHandlers.get(eventType); + checkForExistingDispatchers(false, eventType); + LOG.info("Registering " + eventType + " for " + handler.getClass()); + if (registeredHandler == null) { + eventHandlers.put(eventType, handler); + } else if (!(registeredHandler instanceof MultiListenerHandler)){ + /* for multiple listeners of an event add the multiple listener handler */ + MultiListenerHandler multiHandler = new MultiListenerHandler(); + multiHandler.addHandler(registeredHandler); + multiHandler.addHandler(handler); + eventHandlers.put(eventType, multiHandler); + } else { + /* already a multilistener, just add to it */ + MultiListenerHandler multiHandler + = (MultiListenerHandler) registeredHandler; + multiHandler.addHandler(handler); + } + } + + /** + * Busy loop waiting for all queued events to drain. + */ + public void await() { + while (!drained) { + Thread.yield(); + } + } + + @Override + public Runnable createThread() { + return new Runnable() { + @Override + public void run() { + while (!stopped && !Thread.currentThread().isInterrupted()) { + synchronized (mutex) { + // !drained if dispatch queued new events on this dispatcher + drained = queue.isEmpty(); + } + Event event; + try { + event = queue.take(); + } catch (InterruptedException ie) { + return; + } + if (event != null) { + dispatch(event); + } + } + } + }; + } + + @SuppressWarnings("unchecked") + @Override + public EventHandler getEventHandler() { + final EventHandler actual = super.getEventHandler(); + return new EventHandler() { + @Override + public void handle(Event event) { + synchronized (mutex) { + actual.handle(event); + drained = false; + } + } + }; + } + + @Override + protected void serviceStop() throws Exception { + stopped = true; + super.serviceStop(); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java index 83421a2..464a370 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java @@ -45,10 +45,10 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tez.common.DrainDispatcher; import org.apache.tez.common.security.ACLManager; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.DataSinkDescriptor; http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index 6d45433..52ce23f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -38,6 +38,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang.StringUtils; +import org.apache.tez.common.DrainDispatcher; import org.apache.tez.common.counters.Limits; import org.apache.tez.common.counters.TezCounters; import org.slf4j.Logger; @@ -47,7 +48,6 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java index fbf815d..2c47624 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java @@ -35,9 +35,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tez.common.DrainDispatcher; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 520d10f..da59539 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -50,6 +50,7 @@ import com.google.protobuf.ByteString; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.tez.common.DrainDispatcher; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.counters.Limits; import org.apache.tez.common.counters.TezCounters; @@ -71,7 +72,6 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java index bdb2377..79364ac 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java @@ -29,13 +29,13 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import org.apache.tez.common.DrainDispatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.common.counters.TezCounters; http://git-wip-us.apache.org/repos/asf/tez/blob/cbd4eacb/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java index 0072f6a..4f0c182 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock; import java.util.List; +import org.apache.tez.common.DrainDispatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -34,7 +35,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.dag.api.TezConfiguration;
