Repository: tez Updated Branches: refs/heads/master bb4fb6471 -> ad8a80d2b
TEZ-2049. Remove YARN references from Tez AsyncDispatcher (zhiyuany) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ad8a80d2 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ad8a80d2 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ad8a80d2 Branch: refs/heads/master Commit: ad8a80d2b4acbb481949c4899b406cc1cc9ddec4 Parents: bb4fb64 Author: Zhiyuan Yang <[email protected]> Authored: Mon Apr 24 16:58:42 2017 -0700 Committer: Zhiyuan Yang <[email protected]> Committed: Mon Apr 24 16:58:42 2017 -0700 ---------------------------------------------------------------------- .../org/apache/tez/common/AsyncDispatcher.java | 17 ++++++++++++----- .../tez/common/AsyncDispatcherConcurrent.java | 12 +++++++----- .../java/org/apache/tez/dag/app/DAGAppMaster.java | 6 ++++-- 3 files changed, 23 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/ad8a80d2/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java index ec5f6c7..3a59ff6 100644 --- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java +++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java @@ -74,7 +74,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher { protected final Map<Class<? extends Enum>, AsyncDispatcherConcurrent> concurrentEventDispatchers = Maps.newHashMap(); - private boolean exitOnDispatchException; + private boolean exitOnDispatchException = false; public AsyncDispatcher(String name) { this(name, new LinkedBlockingQueue<Event>()); @@ -121,10 +121,6 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher { @Override protected void serviceInit(Configuration conf) throws Exception { - // TODO TEZ-2049 remove YARN reference - this.exitOnDispatchException = - conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, - Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); super.serviceInit(conf); } @@ -225,6 +221,11 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher { checkForExistingConcurrentDispatcher(eventType); } + @VisibleForTesting + public void enableExitOnDispatchException() { + exitOnDispatchException = true; + } + /** * Add an EventHandler for events handled inline on this dispatcher */ @@ -278,6 +279,9 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher { LOG.info( "Registering " + eventType + " for concurrent dispatch using: " + handler.getClass()); AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads); + if (exitOnDispatchException) { + dispatcher.enableExitOnDispatchException(); + } dispatcher.register(eventType, handler); concurrentEventDispatchers.put(eventType, dispatcher); addIfService(dispatcher); @@ -292,6 +296,9 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher { checkForExistingDispatchers(true, eventType); LOG.info("Registering " + eventType + " with existing concurrent dispatch using: " + handler.getClass()); + if (exitOnDispatchException) { + dispatcher.enableExitOnDispatchException(); + } dispatcher.register(eventType, handler); concurrentEventDispatchers.put(eventType, dispatcher); } http://git-wip-us.apache.org/repos/asf/tez/blob/ad8a80d2/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java index 321ea8b..4a632f5 100644 --- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java +++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java @@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; @@ -77,7 +78,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa protected final Map<Class<? extends Enum>, EventHandler> eventHandlers = Maps.newHashMap(); protected final Map<Class<? extends Enum>, AsyncDispatcherConcurrent> eventDispatchers = Maps.newHashMap(); - private boolean exitOnDispatchException; + private boolean exitOnDispatchException = false; AsyncDispatcherConcurrent(String name, int numThreads) { super(name); @@ -126,10 +127,6 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa @Override protected void serviceInit(Configuration conf) throws Exception { - // TODO TEZ-2049 remove YARN reference - this.exitOnDispatchException = - conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, - Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); super.serviceInit(conf); } @@ -284,6 +281,11 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa eventDispatchers.put(eventType, dispatcher); } + @VisibleForTesting + public void enableExitOnDispatchException() { + this.exitOnDispatchException = true; + } + @Override public EventHandler getEventHandler() { return handlerInstance; http://git-wip-us.apache.org/repos/asf/tez/blob/ad8a80d2/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index fc24f04..76d9fdb 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -478,16 +478,18 @@ public class DAGAppMaster extends AbstractService { } } + dispatcher = createDispatcher(); + if (isLocal) { conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false); conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT); + } else { + dispatcher.enableExitOnDispatchException(); } - conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, !isLocal); String strAppId = this.appAttemptID.getApplicationId().toString(); this.tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId); - dispatcher = createDispatcher(); context = new RunningAppContext(conf); this.aclManager = new ACLManager(appMasterUgi.getShortUserName(), this.amConf);
