Repository: tez
Updated Branches:
refs/heads/branch-0.8 90519d9f7 -> 6e0c38180
TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher. (Zhiyuan Yang
via hitesh)
(cherry picked from commit a9eb9370108f1b764c27aebba853c5a5940ce938)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6e0c3818
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6e0c3818
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6e0c3818
Branch: refs/heads/branch-0.8
Commit: 6e0c38180b66518b007d1db9ec490a17858b4ad8
Parents: 90519d9
Author: Hitesh Shah <[email protected]>
Authored: Thu Aug 25 14:56:31 2016 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Thu Aug 25 14:57:39 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/common/AsyncDispatcher.java | 4 +-
.../org/apache/tez/common/DrainDispatcher.java | 123 +++++++++++++++++++
.../apache/tez/dag/app/dag/impl/TestCommit.java | 2 +-
.../tez/dag/app/dag/impl/TestDAGImpl.java | 2 +-
.../tez/dag/app/dag/impl/TestDAGRecovery.java | 2 +-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 3 +-
.../tez/dag/app/rm/node/TestAMNodeTracker.java | 2 +-
8 files changed, 132 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/6e0c3818/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 87adaa2..c2f8340 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher.
TEZ-3413. ConcurrentModificationException in HistoryEventTimelineConversion
for AppLaunchedEvent.
TEZ-3352. MRInputHelpers getStringProperty() should not fail if property
value is null.
TEZ-3409. Log dagId along with other information when submitting a dag.
http://git-wip-us.apache.org/repos/asf/tez/blob/6e0c3818/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git
a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
index 159ccd9..ec5f6c7 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
@@ -215,7 +216,8 @@ public class AsyncDispatcher extends CompositeService
implements Dispatcher {
"Multiple concurrent dispatchers cannot be registered for: " +
eventType.getName());
}
- private void checkForExistingDispatchers(boolean checkHandler, Class<?
extends Enum> eventType) {
+ @VisibleForTesting
+ protected void checkForExistingDispatchers(boolean checkHandler, Class<?
extends Enum> eventType) {
if (checkHandler) {
checkForExistingHandler(eventType);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6e0c3818/tez-common/src/test/java/org/apache/tez/common/DrainDispatcher.java
----------------------------------------------------------------------
diff --git
a/tez-common/src/test/java/org/apache/tez/common/DrainDispatcher.java
b/tez-common/src/test/java/org/apache/tez/common/DrainDispatcher.java
new file mode 100644
index 0000000..fd1fc0a
--- /dev/null
+++ b/tez-common/src/test/java/org/apache/tez/common/DrainDispatcher.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common;
+
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class DrainDispatcher extends AsyncDispatcher {
+ static final String DEFAULT_NAME = "dispatcher";
+ private volatile boolean drained = false;
+ private volatile boolean stopped = false;
+ private final BlockingQueue<Event> queue;
+ private final Object mutex;
+ private static final Logger LOG =
LoggerFactory.getLogger(DrainDispatcher.class);
+
+ public DrainDispatcher() {
+ this(DEFAULT_NAME, new LinkedBlockingQueue<Event>());
+ }
+
+ public DrainDispatcher(String name, BlockingQueue<Event> eventQueue) {
+ super(name, eventQueue);
+ this.queue = eventQueue;
+ this.mutex = this;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void register(Class<? extends Enum> eventType,
+ EventHandler handler) {
+ /* check to see if we have a listener registered */
+ EventHandler<Event> registeredHandler = (EventHandler<Event>)
eventHandlers.get(eventType);
+ checkForExistingDispatchers(false, eventType);
+ LOG.info("Registering " + eventType + " for " + handler.getClass());
+ if (registeredHandler == null) {
+ eventHandlers.put(eventType, handler);
+ } else if (!(registeredHandler instanceof MultiListenerHandler)){
+ /* for multiple listeners of an event add the multiple listener handler
*/
+ MultiListenerHandler multiHandler = new MultiListenerHandler();
+ multiHandler.addHandler(registeredHandler);
+ multiHandler.addHandler(handler);
+ eventHandlers.put(eventType, multiHandler);
+ } else {
+ /* already a multilistener, just add to it */
+ MultiListenerHandler multiHandler
+ = (MultiListenerHandler) registeredHandler;
+ multiHandler.addHandler(handler);
+ }
+ }
+
+ /**
+ * Busy loop waiting for all queued events to drain.
+ */
+ public void await() {
+ while (!drained) {
+ Thread.yield();
+ }
+ }
+
+ @Override
+ public Runnable createThread() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
+ synchronized (mutex) {
+ // !drained if dispatch queued new events on this dispatcher
+ drained = queue.isEmpty();
+ }
+ Event event;
+ try {
+ event = queue.take();
+ } catch (InterruptedException ie) {
+ return;
+ }
+ if (event != null) {
+ dispatch(event);
+ }
+ }
+ }
+ };
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public EventHandler getEventHandler() {
+ final EventHandler actual = super.getEventHandler();
+ return new EventHandler() {
+ @Override
+ public void handle(Event event) {
+ synchronized (mutex) {
+ actual.handle(event);
+ drained = false;
+ }
+ }
+ };
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ stopped = true;
+ super.serviceStop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/6e0c3818/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
index fd56495..d3b474c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
@@ -46,10 +46,10 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.DrainDispatcher;
import org.apache.tez.common.security.ACLManager;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
http://git-wip-us.apache.org/repos/asf/tez/blob/6e0c3818/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 4471278..c65f7e0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -38,6 +38,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.StringUtils;
+import org.apache.tez.common.DrainDispatcher;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConstants;
@@ -51,7 +52,6 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
http://git-wip-us.apache.org/repos/asf/tez/blob/6e0c3818/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index 3c284ec..260bd42 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -39,12 +39,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
+import org.apache.tez.common.DrainDispatcher;
import org.apache.tez.common.MockDNSToSwitchMapping;
import org.apache.tez.common.security.ACLManager;
import org.apache.tez.dag.api.TezConfiguration;
http://git-wip-us.apache.org/repos/asf/tez/blob/6e0c3818/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 06ae442..b7e63f6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -51,6 +51,7 @@ import com.google.protobuf.ByteString;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.tez.common.DrainDispatcher;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.counters.TezCounters;
@@ -79,7 +80,6 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
@@ -6909,7 +6909,6 @@ public class TestVertexImpl {
new ContainerContextMatcher(), appContext);
containers.addContainerIfNew(container, 0, 0, 0);
doReturn(containers).when(appContext).getAllContainers();
- dispatcher.register(DAGEventType.class, dagEventDispatcher);
initAllVertices(VertexState.INITED);
http://git-wip-us.apache.org/repos/asf/tez/blob/6e0c3818/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
index 25d1784..e123dd1 100644
---
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
+++
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
@@ -37,9 +37,9 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.DrainDispatcher;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklistUpdate;