Repository: tez Updated Branches: refs/heads/master 9f090279d -> f6ea0fb33
TEZ-1897. Create a concurrent version of AsyncDispatcher (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f6ea0fb3 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f6ea0fb3 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f6ea0fb3 Branch: refs/heads/master Commit: f6ea0fb3306faa709c445e4d76081de60545d760 Parents: 9f09027 Author: Bikas Saha <[email protected]> Authored: Sat May 2 15:21:17 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Sat May 2 15:21:17 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/api/TezConfiguration.java | 14 + tez-common/findbugs-exclude.xml | 5 + .../org/apache/tez/common/AsyncDispatcher.java | 85 ++++- .../tez/common/AsyncDispatcherConcurrent.java | 368 +++++++++++++++++++ .../org/apache/tez/common/TezAbstractEvent.java | 45 +++ .../org/apache/tez/dag/records/TezTaskID.java | 20 +- .../apache/tez/common/TestAsyncDispatcher.java | 2 +- .../common/TestAsyncDispatcherConcurrent.java | 194 ++++++++++ .../org/apache/tez/dag/app/DAGAppMaster.java | 19 +- .../org/apache/tez/dag/app/dag/TaskAttempt.java | 2 - .../tez/dag/app/dag/event/CallableEvent.java | 4 +- .../dag/app/dag/event/DAGAppMasterEvent.java | 5 +- .../apache/tez/dag/app/dag/event/DAGEvent.java | 4 +- .../tez/dag/app/dag/event/SpeculatorEvent.java | 4 +- .../tez/dag/app/dag/event/TaskAttemptEvent.java | 9 +- .../apache/tez/dag/app/dag/event/TaskEvent.java | 9 +- .../tez/dag/app/dag/event/VertexEvent.java | 4 +- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 1 - .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 29 +- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 10 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 13 +- .../tez/dag/app/TestMockDAGAppMaster.java | 50 ++- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 42 ++- .../app/dag/impl/TestTaskAttemptRecovery.java | 2 +- .../tez/dag/app/dag/impl/TestTaskImpl.java | 4 +- .../tez/dag/app/dag/impl/TestTaskRecovery.java | 2 +- 27 files changed, 857 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 609db3c..8108ac8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-1897. Create a concurrent version of AsyncDispatcher TEZ-2394. Issues when there is an error in VertexManager callbacks TEZ-2386. Tez UI: Inconsistent usage of icon colors TEZ-2395. Tez UI: Minimum/Maximum Duration show a empty bracket next to 0 secs when you purposefully failed a job. http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 14e773d..a301957 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -141,6 +141,20 @@ public class TezConfiguration extends Configuration { @ConfigurationScope(Scope.AM) public static final String TEZ_CREDENTIALS_PATH = TEZ_PREFIX + "credentials.path"; + @Private + @ConfigurationScope(Scope.AM) + public static final String TEZ_AM_USE_CONCURRENT_DISPATCHER = TEZ_AM_PREFIX + + "use.concurrent-dispatcher"; + @Private + public static boolean TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT = true; + + @Private + @ConfigurationScope(Scope.AM) + public static final String TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY = TEZ_AM_PREFIX + + "concurrent-dispatcher.concurrency"; + @Private + public static final int TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY_DEFAULT = 10; + /** * Boolean value. Execution mode for the Tez application. True implies session mode. If the client * code is written according to best practices then the same code can execute in either mode based http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/tez-common/findbugs-exclude.xml b/tez-common/findbugs-exclude.xml index 7814585..6f6253d 100644 --- a/tez-common/findbugs-exclude.xml +++ b/tez-common/findbugs-exclude.xml @@ -20,4 +20,9 @@ <Bug pattern="DM_EXIT"/> </Match> + <Match> + <Class name="org.apache.tez.common.AsyncDispatcherConcurrent$1"/> + <Method name="run" /> + <Bug pattern="DM_EXIT"/> + </Match> </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java index 5aaa4cf..4319f4f 100644 --- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java +++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java @@ -68,8 +68,11 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher { private EventHandler handlerInstance = new GenericEventHandler(); private Thread eventHandlingThread; - protected final Map<Class<? extends Enum>, EventHandler> eventHandlers; - protected final Map<Class<? extends Enum>, AsyncDispatcher> eventDispatchers; + protected final Map<Class<? extends Enum>, EventHandler> eventHandlers = Maps.newHashMap(); + protected final Map<Class<? extends Enum>, AsyncDispatcher> eventDispatchers = Maps.newHashMap(); + protected final Map<Class<? extends Enum>, AsyncDispatcherConcurrent> concurrentEventDispatchers = + Maps.newHashMap(); + private boolean exitOnDispatchException; public AsyncDispatcher(String name) { @@ -77,11 +80,9 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher { } public AsyncDispatcher(String name, BlockingQueue<Event> eventQueue) { - super("Dispatcher"); + super(name); this.name = name; this.eventQueue = eventQueue; - this.eventHandlers = Maps.newHashMap(); - this.eventDispatchers = Maps.newHashMap(); } public Runnable createThread() { @@ -195,6 +196,32 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher { } } } + + private void checkForExistingHandler(Class<? extends Enum> eventType) { + EventHandler<Event> registeredHandler = (EventHandler<Event>) eventHandlers.get(eventType); + Preconditions.checkState(registeredHandler == null, + "Cannot register same event on multiple dispatchers"); + } + + private void checkForExistingDispatcher(Class<? extends Enum> eventType) { + AsyncDispatcher registeredDispatcher = eventDispatchers.get(eventType); + Preconditions.checkState(registeredDispatcher == null, + "Multiple dispatchers cannot be registered for: " + eventType.getName()); + } + + private void checkForExistingConcurrentDispatcher(Class<? extends Enum> eventType) { + AsyncDispatcherConcurrent concurrentDispatcher = concurrentEventDispatchers.get(eventType); + Preconditions.checkState(concurrentDispatcher == null, + "Multiple concurrent dispatchers cannot be registered for: " + eventType.getName()); + } + + private void checkForExistingDispatchers(boolean checkHandler, Class<? extends Enum> eventType) { + if (checkHandler) { + checkForExistingHandler(eventType); + } + checkForExistingDispatcher(eventType); + checkForExistingConcurrentDispatcher(eventType); + } /** * Add an EventHandler for events handled inline on this dispatcher @@ -205,9 +232,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher { Preconditions.checkState(getServiceState() == STATE.NOTINITED); /* check to see if we have a listener registered */ EventHandler<Event> registeredHandler = (EventHandler<Event>) eventHandlers.get(eventType); - AsyncDispatcher registeredDispatcher = eventDispatchers.get(eventType); - Preconditions.checkState(registeredDispatcher == null, - "Cannot register same event on multiple dispatchers"); + checkForExistingDispatchers(false, eventType); LOG.info("Registering " + eventType + " for " + handler.getClass()); if (registeredHandler == null) { eventHandlers.put(eventType, handler); @@ -231,20 +256,41 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher { public void registerAndCreateDispatcher(Class<? extends Enum> eventType, EventHandler handler, String dispatcherName) { Preconditions.checkState(getServiceState() == STATE.NOTINITED); - AsyncDispatcher dispatcher = new AsyncDispatcher(dispatcherName); - dispatcher.register(eventType, handler); /* check to see if we have a listener registered */ - AsyncDispatcher registeredDispatcher = eventDispatchers.get(eventType); - EventHandler<Event> registeredHandler = (EventHandler<Event>) eventHandlers.get(eventType); - Preconditions.checkState(registeredHandler == null, - "Cannot register same event on multiple dispatchers"); + checkForExistingDispatchers(true, eventType); LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass()); - Preconditions.checkState(registeredDispatcher == null, - "Multiple dispatchers cannot be registered for: " + eventType.getName()); + AsyncDispatcher dispatcher = new AsyncDispatcher(dispatcherName); + dispatcher.register(eventType, handler); eventDispatchers.put(eventType, dispatcher); addIfService(dispatcher); } + + public AsyncDispatcherConcurrent registerAndCreateDispatcher(Class<? extends Enum> eventType, + EventHandler handler, String dispatcherName, int numThreads) { + Preconditions.checkState(getServiceState() == STATE.NOTINITED); + + /* check to see if we have a listener registered */ + checkForExistingDispatchers(true, eventType); + LOG.info("Registering " + eventType + " for concurrent dispatch using: " + handler.getClass()); + AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads); + dispatcher.register(eventType, handler); + concurrentEventDispatchers.put(eventType, dispatcher); + addIfService(dispatcher); + return dispatcher; + } + + public void registerWithExistingDispatcher(Class<? extends Enum> eventType, + EventHandler handler, AsyncDispatcherConcurrent dispatcher) { + Preconditions.checkState(getServiceState() == STATE.NOTINITED); + + /* check to see if we have a listener registered */ + checkForExistingDispatchers(true, eventType); + LOG.info("Registering " + eventType + " wit existing concurrent dispatch using: " + + handler.getClass()); + dispatcher.register(eventType, handler); + concurrentEventDispatchers.put(eventType, dispatcher); + } @Override public EventHandler getEventHandler() { @@ -261,13 +307,18 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher { } drained = false; - // offload to specific dispatcher is one exists + // offload to specific dispatcher if one exists Class<? extends Enum> type = event.getType().getDeclaringClass(); AsyncDispatcher registeredDispatcher = eventDispatchers.get(type); if (registeredDispatcher != null) { registeredDispatcher.getEventHandler().handle(event); return; } + AsyncDispatcherConcurrent concurrentDispatcher = concurrentEventDispatchers.get(type); + if (concurrentDispatcher != null) { + concurrentDispatcher.getEventHandler().handle(event); + return; + } // no registered dispatcher. use internal dispatcher. http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java new file mode 100644 index 0000000..d19bf9e --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java @@ -0,0 +1,368 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.common; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * A dispatcher that can schedule events concurrently. Uses a fixed size threadpool + * to schedule events. Events that have the same serializing hash will get scheduled + * on the same thread in the threadpool. This can be used to prevent concurrency issues + * for events that may not be independently processed. + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +@Private +public class AsyncDispatcherConcurrent extends CompositeService implements Dispatcher { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncDispatcher.class); + + private final String name; + private final ArrayList<LinkedBlockingQueue<Event>> eventQueues; + private volatile boolean stopped = false; + + // Configuration flag for enabling/disabling draining dispatcher's events on + // stop functionality. + private volatile boolean drainEventsOnStop = false; + + // Indicates all the remaining dispatcher's events on stop have been drained + // and processed. + private volatile boolean drained = true; + private Object waitForDrained = new Object(); + + // For drainEventsOnStop enabled only, block newly coming events into the + // queue while stopping. + private volatile boolean blockNewEvents = false; + private EventHandler handlerInstance = new GenericEventHandler(); + + private ExecutorService execService; + private final int numThreads; + + protected final Map<Class<? extends Enum>, EventHandler> eventHandlers = Maps.newHashMap(); + protected final Map<Class<? extends Enum>, AsyncDispatcherConcurrent> eventDispatchers = + Maps.newHashMap(); + private boolean exitOnDispatchException; + + AsyncDispatcherConcurrent(String name, int numThreads) { + super(name); + Preconditions.checkArgument(numThreads > 0); + this.name = name; + this.eventQueues = Lists.newArrayListWithCapacity(numThreads); + this.numThreads = numThreads; + } + + class DispatchRunner implements Runnable { + final LinkedBlockingQueue<Event> queue; + + public DispatchRunner(LinkedBlockingQueue<Event> queue) { + this.queue = queue; + } + + @Override + public void run() { + while (!stopped && !Thread.currentThread().isInterrupted()) { + drained = queue.isEmpty(); + // blockNewEvents is only set when dispatcher is draining to stop, + // adding this check is to avoid the overhead of acquiring the lock + // and calling notify every time in the normal run of the loop. + if (blockNewEvents) { + synchronized (waitForDrained) { + if (drained) { + waitForDrained.notify(); + } + } + } + Event event; + try { + event = queue.take(); + } catch(InterruptedException ie) { + if (!stopped) { + LOG.warn("AsyncDispatcher thread interrupted", ie); + } + return; + } + if (event != null) { + dispatch(event); + } + } + } + }; + + @Override + protected void serviceInit(Configuration conf) throws Exception { + // TODO TEZ-2049 remove YARN reference + this.exitOnDispatchException = + conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, + Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + execService = Executors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Dispatcher [" + this.name + "] #%d").build()); + for (int i=0; i<numThreads; ++i) { + eventQueues.add(new LinkedBlockingQueue<Event>()); + } + for (int i=0; i<numThreads; ++i) { + execService.execute(new DispatchRunner(eventQueues.get(i))); + } + //start all the components + super.serviceStart(); + } + + public void setDrainEventsOnStop() { + drainEventsOnStop = true; + } + + @Override + protected void serviceStop() throws Exception { + if (execService != null) { + if (drainEventsOnStop) { + blockNewEvents = true; + LOG.info("AsyncDispatcher is draining to stop, ignoring any new events."); + synchronized (waitForDrained) { + while (!drained && !execService.isShutdown()) { + LOG.info("Waiting for AsyncDispatcher to drain."); + waitForDrained.wait(1000); + } + } + } + + stopped = true; + + for (int i=0; i<numThreads; ++i) { + LOG.info("AsyncDispatcher stopping with events: " + eventQueues.get(i).size() + + " in queue: " + i); + } + execService.shutdownNow(); + } + + // stop all the components + super.serviceStop(); + } + + protected void dispatch(Event event) { + //all events go thru this loop + if (LOG.isDebugEnabled()) { + LOG.debug("Dispatching the event " + event.getClass().getName() + "." + + event.toString()); + } + + Class<? extends Enum> type = event.getType().getDeclaringClass(); + + try{ + EventHandler handler = eventHandlers.get(type); + if(handler != null) { + handler.handle(event); + } else { + throw new Exception("No handler for registered for " + type); + } + } catch (Throwable t) { + LOG.error("Error in dispatcher thread", t); + // If serviceStop is called, we should exit this thread gracefully. + if (exitOnDispatchException + && (ShutdownHookManager.get().isShutdownInProgress()) == false + && stopped == false) { + Thread shutDownThread = new Thread(createShutDownThread()); + shutDownThread.setName("AsyncDispatcher ShutDown handler"); + shutDownThread.start(); + } + } + } + + private void checkForExistingHandler(Class<? extends Enum> eventType) { + EventHandler<Event> registeredHandler = (EventHandler<Event>) eventHandlers.get(eventType); + Preconditions.checkState(registeredHandler == null, + "Cannot register same event on multiple dispatchers"); + } + + private void checkForExistingDispatcher(Class<? extends Enum> eventType) { + AsyncDispatcherConcurrent registeredDispatcher = eventDispatchers.get(eventType); + Preconditions.checkState(registeredDispatcher == null, + "Multiple dispatchers cannot be registered for: " + eventType.getName()); + } + + private void checkForExistingDispatchers(boolean checkHandler, Class<? extends Enum> eventType) { + if (checkHandler) { + checkForExistingHandler(eventType); + } + checkForExistingDispatcher(eventType); + } + + /** + * Add an EventHandler for events handled inline on this dispatcher + */ + @Override + public void register(Class<? extends Enum> eventType, + EventHandler handler) { + Preconditions.checkState(getServiceState() == STATE.NOTINITED); + /* check to see if we have a listener registered */ + EventHandler<Event> registeredHandler = (EventHandler<Event>) eventHandlers.get(eventType); + checkForExistingDispatchers(false, eventType); + LOG.info("Registering " + eventType + " for " + handler.getClass()); + if (registeredHandler == null) { + eventHandlers.put(eventType, handler); + } else if (!(registeredHandler instanceof MultiListenerHandler)){ + /* for multiple listeners of an event add the multiple listener handler */ + MultiListenerHandler multiHandler = new MultiListenerHandler(); + multiHandler.addHandler(registeredHandler); + multiHandler.addHandler(handler); + eventHandlers.put(eventType, multiHandler); + } else { + /* already a multilistener, just add to it */ + MultiListenerHandler multiHandler + = (MultiListenerHandler) registeredHandler; + multiHandler.addHandler(handler); + } + } + + /** + * Add an EventHandler for events handled in their own dispatchers with given name and threads + */ + + public AsyncDispatcherConcurrent registerAndCreateDispatcher(Class<? extends Enum> eventType, + EventHandler handler, String dispatcherName, int numThreads) { + Preconditions.checkState(getServiceState() == STATE.NOTINITED); + + /* check to see if we have a listener registered */ + checkForExistingDispatchers(true, eventType); + LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass()); + AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads); + dispatcher.register(eventType, handler); + eventDispatchers.put(eventType, dispatcher); + addIfService(dispatcher); + return dispatcher; + } + + public void registerWithExistingDispatcher(Class<? extends Enum> eventType, + EventHandler handler, AsyncDispatcherConcurrent dispatcher) { + Preconditions.checkState(getServiceState() == STATE.NOTINITED); + + /* check to see if we have a listener registered */ + checkForExistingDispatchers(true, eventType); + LOG.info("Registering " + eventType + " wit existing concurrent dispatch using: " + + handler.getClass()); + dispatcher.register(eventType, handler); + eventDispatchers.put(eventType, dispatcher); + } + + @Override + public EventHandler getEventHandler() { + return handlerInstance; + } + + class GenericEventHandler implements EventHandler<TezAbstractEvent> { + public void handle(TezAbstractEvent event) { + if (stopped) { + return; + } + if (blockNewEvents) { + return; + } + drained = false; + + // offload to specific dispatcher if one exists + Class<? extends Enum> type = event.getType().getDeclaringClass(); + AsyncDispatcherConcurrent registeredDispatcher = eventDispatchers.get(type); + if (registeredDispatcher != null) { + registeredDispatcher.getEventHandler().handle(event); + return; + } + + int index = numThreads > 1 ? event.getSerializingHash() % numThreads : 0; + + // no registered dispatcher. use internal dispatcher. + LinkedBlockingQueue<Event> queue = eventQueues.get(index); + /* all this method does is enqueue all the events onto the queue */ + int qSize = queue.size(); + if (qSize !=0 && qSize %1000 == 0) { + LOG.info("Size of event-queue is " + qSize); + } + int remCapacity = queue.remainingCapacity(); + if (remCapacity < 1000) { + LOG.warn("Very low remaining capacity in the event-queue: " + + remCapacity); + } + try { + queue.put(event); + } catch (InterruptedException e) { + if (!stopped) { + LOG.warn("AsyncDispatcher thread interrupted", e); + } + throw new YarnRuntimeException(e); + } + }; + } + + /** + * Multiplexing an event. Sending it to different handlers that + * are interested in the event. + * @param <T> the type of event these multiple handlers are interested in. + */ + static class MultiListenerHandler implements EventHandler<Event> { + List<EventHandler<Event>> listofHandlers; + + public MultiListenerHandler() { + listofHandlers = new ArrayList<EventHandler<Event>>(); + } + + @Override + public void handle(Event event) { + for (EventHandler<Event> handler: listofHandlers) { + handler.handle(event); + } + } + + void addHandler(EventHandler<Event> handler) { + listofHandlers.add(handler); + } + + } + + Runnable createShutDownThread() { + return new Runnable() { + @Override + public void run() { + LOG.info("Exiting, bbye.."); + System.exit(-1); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/src/main/java/org/apache/tez/common/TezAbstractEvent.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/TezAbstractEvent.java b/tez-common/src/main/java/org/apache/tez/common/TezAbstractEvent.java new file mode 100644 index 0000000..b736112 --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/common/TezAbstractEvent.java @@ -0,0 +1,45 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + + +package org.apache.tez.common; + +/** + * Event that allows running in parallel for different instances + * + * @param <TYPE> + * Event type + */ +public abstract class TezAbstractEvent<TYPE extends Enum<TYPE>> extends + org.apache.hadoop.yarn.event.AbstractEvent<TYPE> { + + public TezAbstractEvent(TYPE type) { + super(type); + } + + /** + * Returning a number that is identical for event instances that need to be + * serialized while processing. + * + * @return Serializing identifier. Not overriding this causes serialization + * for all events instances + */ + public int getSerializingHash() { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java index b4c7b32..3d28348 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java +++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java @@ -44,6 +44,7 @@ import com.google.common.cache.LoadingCache; @InterfaceStability.Stable public class TezTaskID extends TezID { public static final String TASK = "task"; + private final int serializingHash; static final ThreadLocal<NumberFormat> tezTaskIdFormat = new ThreadLocal<NumberFormat>() { @Override @@ -67,10 +68,6 @@ public class TezTaskID extends TezID { private TezVertexID vertexId; - // Public for Writable serialization. Verify if this is actually required. - public TezTaskID() { - } - /** * Constructs a TezTaskID object from given {@link TezVertexID}. * @param vertexID the vertexID object for this TezTaskID @@ -91,6 +88,11 @@ public class TezTaskID extends TezID { super(id); Preconditions.checkArgument(vertexID != null, "vertexID cannot be null"); this.vertexId = vertexID; + this.serializingHash = getHashCode(true); + } + + public int getSerializingHash() { + return serializingHash; } /** Returns the {@link TezVertexID} object that this task belongs to */ @@ -135,7 +137,15 @@ public class TezTaskID extends TezID { @Override public int hashCode() { - return vertexId.hashCode() * 535013 + id; + return getHashCode(false); + } + + public int getHashCode(boolean makePositive) { + int code = vertexId.hashCode() * 535013 + id; + if (makePositive) { + code = (code < 0 ? -code : code); + } + return code; } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java ---------------------------------------------------------------------- diff --git a/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java b/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java index ad7f5df..bcd1c5f 100644 --- a/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java +++ b/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java @@ -116,7 +116,7 @@ public class TestAsyncDispatcher { central.register(TestEventType1.class, new TestEventHandler1()); Assert.fail(); } catch (IllegalStateException e) { - Assert.assertTrue(e.getMessage().contains("Cannot register same event on multiple dispatchers")); + Assert.assertTrue(e.getMessage().contains("Multiple dispatchers cannot be registered for")); } finally { central.close(); } http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcherConcurrent.java ---------------------------------------------------------------------- diff --git a/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcherConcurrent.java b/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcherConcurrent.java new file mode 100644 index 0000000..1fa8123 --- /dev/null +++ b/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcherConcurrent.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.tez.common; + +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.event.EventHandler; +import org.junit.Assert; +import org.junit.Test; + +@SuppressWarnings("unchecked") +public class TestAsyncDispatcherConcurrent { + + static class CountDownEventHandler { + static CountDownLatch latch; + static void init(CountDownLatch latch) { + CountDownEventHandler.latch = latch; + } + + static void checkParallelCountersDoneAndFinish() throws Exception { + latch.countDown(); + latch.await(); + } + + public void handle() { + latch.countDown(); + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + public enum TestEventType1 { TYPE1 } + public class TestEvent1 extends TezAbstractEvent<TestEventType1> { + final int hash; + public TestEvent1(TestEventType1 type, int hash) { + super(type); + this.hash = hash; + } + + @Override + public int getSerializingHash() { + return hash; + } + } + class TestEventHandler1 extends CountDownEventHandler implements EventHandler<TestEvent1> { + @Override + public void handle(TestEvent1 event) { + handle(); + } + } + public enum TestEventType2 { TYPE2 } + public class TestEvent2 extends TezAbstractEvent<TestEventType2> { + public TestEvent2(TestEventType2 type) { + super(type); + } + } + class TestEventHandler2 extends CountDownEventHandler implements EventHandler<TestEvent2> { + @Override + public void handle(TestEvent2 event) { + handle(); + } + } + public enum TestEventType3 { TYPE3 } + public class TestEvent3 extends TezAbstractEvent<TestEventType3> { + public TestEvent3(TestEventType3 type) { + super(type); + } + } + class TestEventHandler3 extends CountDownEventHandler implements EventHandler<TestEvent3> { + @Override + public void handle(TestEvent3 event) { + handle(); + } + } + + @Test (timeout=5000) + public void testBasic() throws Exception { + CountDownLatch latch = new CountDownLatch(4); + CountDownEventHandler.init(latch); + + AsyncDispatcher central = new AsyncDispatcher("Type1"); + central.register(TestEventType1.class, new TestEventHandler1()); + central.registerAndCreateDispatcher(TestEventType2.class, new TestEventHandler2(), "Type2", 1); + central.registerAndCreateDispatcher(TestEventType3.class, new TestEventHandler3(), "Type3", 1); + + central.init(new Configuration()); + central.start(); + // 3 threads in different dispatchers will handle 3 events + central.getEventHandler().handle(new TestEvent1(TestEventType1.TYPE1, 0)); + central.getEventHandler().handle(new TestEvent2(TestEventType2.TYPE2)); + central.getEventHandler().handle(new TestEvent3(TestEventType3.TYPE3)); + // wait for all events to be run in parallel + CountDownEventHandler.checkParallelCountersDoneAndFinish(); + central.close(); + } + + @Test (timeout=5000) + public void testMultiThreads() throws Exception { + CountDownLatch latch = new CountDownLatch(4); + CountDownEventHandler.init(latch); + + AsyncDispatcherConcurrent central = new AsyncDispatcherConcurrent("Type1", 1); + central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler1(), "Type1", 3); + + central.init(new Configuration()); + central.start(); + // 3 threads in the same dispatcher will handle 3 events + central.getEventHandler().handle(new TestEvent1(TestEventType1.TYPE1, 0)); + central.getEventHandler().handle(new TestEvent1(TestEventType1.TYPE1, 1)); + central.getEventHandler().handle(new TestEvent1(TestEventType1.TYPE1, 2)); + // wait for all events to be run in parallel + CountDownEventHandler.checkParallelCountersDoneAndFinish(); + central.close(); + } + + @Test (timeout=5000) + public void testMultipleRegisterFail() throws Exception { + AsyncDispatcher central = new AsyncDispatcher("Type1"); + try { + central.register(TestEventType1.class, new TestEventHandler1()); + central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(), "Type2", 1); + Assert.fail(); + } catch (IllegalStateException e) { + Assert.assertTrue(e.getMessage().contains("Cannot register same event on multiple dispatchers")); + } finally { + central.close(); + } + + central = new AsyncDispatcher("Type1"); + try { + central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(), "Type2", 1); + central.register(TestEventType1.class, new TestEventHandler1()); + Assert.fail(); + } catch (IllegalStateException e) { + Assert.assertTrue(e.getMessage().contains("Multiple concurrent dispatchers cannot be registered")); + } finally { + central.close(); + } + + central = new AsyncDispatcher("Type1"); + try { + central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(), "Type2", 1); + central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(), "Type2", 1); + Assert.fail(); + } catch (IllegalStateException e) { + Assert.assertTrue(e.getMessage().contains("Multiple concurrent dispatchers cannot be registered")); + } finally { + central.close(); + } + + central = new AsyncDispatcher("Type1"); + try { + central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(), "Type2"); + central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(), "Type2"); + Assert.fail(); + } catch (IllegalStateException e) { + Assert.assertTrue(e.getMessage().contains("Multiple dispatchers cannot be registered for")); + } finally { + central.close(); + } + + central = new AsyncDispatcher("Type1"); + try { + AsyncDispatcherConcurrent concDispatcher = central.registerAndCreateDispatcher( + TestEventType1.class, new TestEventHandler2(), "Type2", 1); + central.registerWithExistingDispatcher(TestEventType1.class, new TestEventHandler1(), + concDispatcher); + Assert.fail(); + } catch (IllegalStateException e) { + Assert.assertTrue(e.getMessage().contains("Multiple concurrent dispatchers cannot be registered")); + } finally { + central.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 27b9c37..3e3d6f0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -91,6 +91,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.common.AsyncDispatcher; +import org.apache.tez.common.AsyncDispatcherConcurrent; import org.apache.tez.common.GcTimeUpdater; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezConverterUtils; @@ -455,12 +456,22 @@ public class DAGAppMaster extends AbstractService { dispatcher.register(DAGAppMasterEventType.class, new DAGAppMasterEventHandler()); dispatcher.register(DAGEventType.class, dagEventDispatcher); dispatcher.register(VertexEventType.class, vertexEventDispatcher); - dispatcher.register(TaskEventType.class, new TaskEventDispatcher()); - dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher()); + if (!conf.getBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER, + TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT)) { + dispatcher.register(TaskEventType.class, new TaskEventDispatcher()); + dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher()); + } else { + int concurrency = conf.getInt(TezConfiguration.TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY, + TezConfiguration.TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY_DEFAULT); + AsyncDispatcherConcurrent sharedDispatcher = dispatcher.registerAndCreateDispatcher( + TaskEventType.class, new TaskEventDispatcher(), "TaskAndAttemptEventThread", concurrency); + dispatcher.registerWithExistingDispatcher(TaskAttemptEventType.class, + new TaskAttemptEventDispatcher(), sharedDispatcher); + } // register other delegating dispatchers - dispatcher.registerAndCreateDispatcher(SpeculatorEventType.class, new SpeculatorEventHandler(), "Speculator"); - + dispatcher.registerAndCreateDispatcher(SpeculatorEventType.class, new SpeculatorEventHandler(), + "Speculator"); if (enableWebUIService()) { this.webUIService = new WebUIService(context); http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java index 3f60a4e..6c85cc2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java @@ -128,8 +128,6 @@ public interface TaskAttempt { */ long getFinishTime(); - public Task getTask(); - TaskAttemptState restoreFromEvent(HistoryEvent event); } http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java index e148fe8..7e68752 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java @@ -20,11 +20,11 @@ package org.apache.tez.dag.app.dag.event; import java.util.concurrent.Callable; -import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.tez.common.TezAbstractEvent; import com.google.common.util.concurrent.FutureCallback; -public abstract class CallableEvent extends AbstractEvent<CallableEventType> implements +public abstract class CallableEvent extends TezAbstractEvent<CallableEventType> implements Callable<Void> { private final FutureCallback<Void> callback; http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEvent.java index 0571cab..b7cb3a4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEvent.java @@ -18,9 +18,10 @@ package org.apache.tez.dag.app.dag.event; -import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.tez.common.TezAbstractEvent; -public class DAGAppMasterEvent extends AbstractEvent<DAGAppMasterEventType> { + +public class DAGAppMasterEvent extends TezAbstractEvent<DAGAppMasterEventType> { public DAGAppMasterEvent(DAGAppMasterEventType type) { super(type); http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java index 1ec0222..a0a8a1a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java @@ -18,14 +18,14 @@ package org.apache.tez.dag.app.dag.event; -import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.tez.common.TezAbstractEvent; import org.apache.tez.dag.records.TezDAGID; /** * This class encapsulates job related events. * */ -public class DAGEvent extends AbstractEvent<DAGEventType> { +public class DAGEvent extends TezAbstractEvent<DAGEventType> { private TezDAGID dagId; http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java index 16fab8e..3863a2a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java @@ -18,10 +18,10 @@ package org.apache.tez.dag.app.dag.event; -import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.tez.common.TezAbstractEvent; import org.apache.tez.dag.records.TezVertexID; -public class SpeculatorEvent extends AbstractEvent<SpeculatorEventType> { +public class SpeculatorEvent extends TezAbstractEvent<SpeculatorEventType> { private final TezVertexID vertexId; public SpeculatorEvent(SpeculatorEventType type, TezVertexID vertexId) { http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java index 56c03e3..63ef70f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java @@ -18,14 +18,14 @@ package org.apache.tez.dag.app.dag.event; -import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.tez.common.TezAbstractEvent; import org.apache.tez.dag.records.TezTaskAttemptID; /** * This class encapsulates task attempt related events. * */ -public class TaskAttemptEvent extends AbstractEvent<TaskAttemptEventType> { +public class TaskAttemptEvent extends TezAbstractEvent<TaskAttemptEventType> { private TezTaskAttemptID attemptID; @@ -42,4 +42,9 @@ public class TaskAttemptEvent extends AbstractEvent<TaskAttemptEventType> { public TezTaskAttemptID getTaskAttemptID() { return attemptID; } + + @Override + public int getSerializingHash() { + return attemptID.getTaskID().getSerializingHash(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEvent.java index c7e5faa..def9ddf 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEvent.java @@ -18,14 +18,14 @@ package org.apache.tez.dag.app.dag.event; -import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.tez.common.TezAbstractEvent; import org.apache.tez.dag.records.TezTaskID; /** * this class encapsulates task related events. * */ -public class TaskEvent extends AbstractEvent<TaskEventType> { +public class TaskEvent extends TezAbstractEvent<TaskEventType> { private TezTaskID taskId; @@ -37,4 +37,9 @@ public class TaskEvent extends AbstractEvent<TaskEventType> { public TezTaskID getTaskID() { return taskId; } + + @Override + public int getSerializingHash() { + return taskId.getSerializingHash(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEvent.java index 9e94eb5..33128e4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEvent.java @@ -18,14 +18,14 @@ package org.apache.tez.dag.app.dag.event; -import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.tez.common.TezAbstractEvent; import org.apache.tez.dag.records.TezVertexID; /** * this class encapsulates vertex related events. * */ -public class VertexEvent extends AbstractEvent<VertexEventType> { +public class VertexEvent extends TezAbstractEvent<VertexEventType> { private TezVertexID vertexId; http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index f562451..f769565 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -1261,7 +1261,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, if (finishTime == 0) { setFinishTime(); } - entityUpdateTracker.stop(); boolean recoveryError = false; http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 1f3e1cf..b1c0acc 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -144,6 +144,9 @@ public class TaskAttemptImpl implements TaskAttempt, private NodeId containerNodeId; private String nodeHttpAddress; private String nodeRackName; + + private final Task task; + private final Vertex vertex; @VisibleForTesting TaskAttemptStatus reportedStatus; @@ -406,7 +409,8 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock, TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, boolean isRescheduled, - Resource resource, ContainerContext containerContext, boolean leafVertex) { + Resource resource, ContainerContext containerContext, boolean leafVertex, + Task task) { ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); this.readLock = rwLock.readLock(); this.writeLock = rwLock.writeLock(); @@ -417,6 +421,9 @@ public class TaskAttemptImpl implements TaskAttempt, this.clock = clock; this.taskHeartbeatHandler = taskHeartbeatHandler; this.appContext = appContext; + this.task = task; + this.vertex = this.task.getVertex(); + this.reportedStatus = new TaskAttemptStatus(this.attemptId); initTaskAttemptStatus(reportedStatus); RackResolver.init(conf); @@ -649,17 +656,9 @@ public class TaskAttemptImpl implements TaskAttempt, readLock.unlock(); } } - - @Override - public Task getTask() { - return appContext.getCurrentDAG() - .getVertex(attemptId.getTaskID().getVertexID()) - .getTask(attemptId.getTaskID()); - } - + Vertex getVertex() { - return appContext.getCurrentDAG() - .getVertex(attemptId.getTaskID().getVertexID()); + return vertex; } @SuppressWarnings("unchecked") @@ -955,7 +954,7 @@ public class TaskAttemptImpl implements TaskAttempt, if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED) && conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) { - String contextStr = "v_" + getTask().getVertex().getName() + String contextStr = "v_" + getVertex().getName() + "_" + this.attemptId.toString(); completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) + "/" + containerNodeId.toString() @@ -964,7 +963,7 @@ public class TaskAttemptImpl implements TaskAttempt, + "/" + this.appContext.getUser(); } TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent( - attemptId, getTask().getVertex().getName(), + attemptId, getVertex().getName(), launchTime, containerId, containerNodeId, inProgressLogsUrl, completedLogsUrl, nodeHttpAddress); this.appContext.getHistoryHandler().handle( @@ -976,7 +975,7 @@ public class TaskAttemptImpl implements TaskAttempt, if (getLaunchTime() == 0) return; TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent( - attemptId, getTask().getVertex().getName(), getLaunchTime(), + attemptId, getVertex().getName(), getLaunchTime(), getFinishTime(), TaskAttemptState.SUCCEEDED, null, "", getCounters()); // FIXME how do we store information regd completion events @@ -987,7 +986,7 @@ public class TaskAttemptImpl implements TaskAttempt, protected void logJobHistoryAttemptUnsuccesfulCompletion( TaskAttemptState state) { TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent( - attemptId, getTask().getVertex().getName(), getLaunchTime(), + attemptId, getVertex().getName(), getLaunchTime(), clock.getTime(), state, terminationCause, StringUtils.join( http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 461339b..8b63734 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -119,6 +119,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { private Map<TezTaskAttemptID, TaskAttempt> attempts; private final int maxFailedAttempts; protected final Clock clock; + private final Vertex vertex; private final Lock readLock; private final Lock writeLock; private final List<String> diagnostics = new ArrayList<String>(); @@ -326,7 +327,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { Clock clock, TaskHeartbeatHandler thh, AppContext appContext, boolean leafVertex, Resource resource, ContainerContext containerContext, - StateChangeNotifier stateChangeNotifier) { + StateChangeNotifier stateChangeNotifier, + Vertex vertex) { this.conf = conf; this.clock = clock; ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); @@ -342,7 +344,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { this.eventHandler = eventHandler; this.appContext = appContext; this.stateChangeNotifier = stateChangeNotifier; - + this.vertex = vertex; this.leafVertex = leafVertex; this.taskResource = resource; this.containerContext = containerContext; @@ -382,7 +384,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { @Override public Vertex getVertex() { - return appContext.getCurrentDAG().getVertex(taskId.getVertexID()); + return vertex; } @Override @@ -778,7 +780,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { TaskAttemptImpl createAttempt(int attemptNumber) { return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler, taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext, - (failedAttempts > 0), taskResource, containerContext, leafVertex); + (failedAttempts > 0), taskResource, containerContext, leafVertex, this); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index c5de19b..9ed7441 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -109,9 +109,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted; import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning; import org.apache.tez.dag.app.dag.event.SpeculatorEvent; -import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEvent; import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask; @@ -164,7 +162,6 @@ import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.events.InputInitializerEvent; import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; -import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.EventType; @@ -194,8 +191,7 @@ import org.slf4j.LoggerFactory; * The read and write calls use ReadWriteLock for concurrency. */ @SuppressWarnings({ "rawtypes", "unchecked" }) -public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, - EventHandler<VertexEvent> { +public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandler<VertexEvent> { private static final String LINE_SEPARATOR = System .getProperty("line.separator"); @@ -216,6 +212,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, // TODO Metrics //private final MRAppMetrics metrics; private final AppContext appContext; + private final DAG dag; private boolean lazyTasksCopyNeeded = false; // must be a linked map for ordering @@ -867,6 +864,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, setTaskLocationHints(vertexLocationHint); this.dagUgi = appContext.getCurrentDAG().getDagUGI(); + this.dag = appContext.getCurrentDAG(); this.taskResource = DagTypeConverters .createResourceRequestFromTaskConfig(vertexPlan.getTaskConfig()); @@ -2154,7 +2152,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, this.targetVertices.isEmpty() : true), this.taskResource, conContext, - this.stateChangeNotifier); + this.stateChangeNotifier, + this); } private void createTasks() { @@ -4409,7 +4408,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, @Override public DAG getDAG() { - return appContext.getCurrentDAG(); + return dag; } private TezDAGID getDAGId() { http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java index 2a061bc..87ffead 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java @@ -136,7 +136,7 @@ public class TestMockDAGAppMaster { lrVertex.put(lrName2, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test1"), LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1)); - DAG dag = DAG.create("test").addTaskLocalFiles(lrDAG); + DAG dag = DAG.create("testLocalResourceSetup").addTaskLocalFiles(lrDAG); Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5).addTaskLocalFiles(lrVertex); dag.addVertex(vA); @@ -166,7 +166,7 @@ public class TestMockDAGAppMaster { MockContainerLauncher mockLauncher = mockApp.getContainerLauncher(); mockLauncher.startScheduling(false); // there is only 1 task whose first attempt will be preempted - DAG dag = DAG.create("test"); + DAG dag = DAG.create("testInternalPreemption"); Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 1); dag.addVertex(vA); @@ -197,7 +197,7 @@ public class TestMockDAGAppMaster { MockContainerLauncher mockLauncher = mockApp.getContainerLauncher(); mockLauncher.startScheduling(false); mockApp.sendDMEvents = true; - DAG dag = DAG.create("test"); + DAG dag = DAG.create("testBasicEvents"); Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 2); Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 2); Vertex vC = Vertex.create("C", ProcessorDescriptor.create("Proc.class"), 2); @@ -230,21 +230,27 @@ public class TestMockDAGAppMaster { List<TezEvent> tEvents = tImpl.getTaskEvents(); Assert.assertEquals(2, tEvents.size()); // 2 from vA Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName()); - Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex()); Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex()); Assert.assertEquals(vA.getName(), tEvents.get(1).getDestinationInfo().getEdgeVertexName()); - Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex()); Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(1).getEvent()).getSourceIndex()); + int targetIndex1 = ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex(); + int targetIndex2 = ((DataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex(); + // order of vA task completion can change order of events + Assert.assertTrue("t1: " + targetIndex1 + " t2: " + targetIndex2, + (targetIndex1 == 0 && targetIndex2 == 1) || (targetIndex1 == 1 && targetIndex2 == 0)); vImpl = (VertexImpl) dagImpl.getVertex(vC.getName()); tImpl = (TaskImpl) vImpl.getTask(1); tEvents = tImpl.getTaskEvents(); Assert.assertEquals(2, tEvents.size()); // 2 from vA Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName()); - Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex()); Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex()); Assert.assertEquals(vA.getName(), tEvents.get(1).getDestinationInfo().getEdgeVertexName()); - Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex()); Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(1).getEvent()).getSourceIndex()); + targetIndex1 = ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex(); + targetIndex2 = ((DataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex(); + // order of vA task completion can change order of events + Assert.assertTrue("t1: " + targetIndex1 + " t2: " + targetIndex2, + (targetIndex1 == 0 && targetIndex2 == 1) || (targetIndex1 == 1 && targetIndex2 == 0)); vImpl = (VertexImpl) dagImpl.getVertex(vD.getName()); tImpl = (TaskImpl) vImpl.getTask(1); tEvents = tImpl.getTaskEvents(); @@ -478,7 +484,7 @@ public class TestMockDAGAppMaster { final String vAName = "A"; - DAG dag = DAG.create("testBasicCounters"); + DAG dag = DAG.create("testBasicCounterMemory"); Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), 10000); dag.addVertex(vA); @@ -511,6 +517,30 @@ public class TestMockDAGAppMaster { checkMemory(dag.getName(), mockApp); tezClient.stop(); } + + @Ignore + @Test (timeout = 60000) + public void testTaskEventsProcessingSpeed() throws Exception { + Logger.getRootLogger().setLevel(Level.WARN); + TezConfiguration tezconf = new TezConfiguration(defaultConf); + tezconf.setBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER, true); + MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, + null, false, false, 30, 1000); + tezClient.start(); + + final String vAName = "A"; + + DAG dag = DAG.create("testTaskEventsProcessingSpeed"); + Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), 50000); + dag.addVertex(vA); + + MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp(); + mockApp.doSleep = false; + DAGClient dagClient = tezClient.submitDAG(dag); + DAGStatus status = dagClient.waitForCompletion(); + Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState()); + tezClient.stop(); + } @Ignore @Test (timeout = 60000) @@ -530,7 +560,7 @@ public class TestMockDAGAppMaster { ioStats.setItemsProcessed(1); TaskStatistics vAStats = new TaskStatistics(); - DAG dag = DAG.create("testBasisStatistics"); + DAG dag = DAG.create("testBasicStatisticsMemory"); Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), numTasks); for (int i=0; i<numSources; ++i) { final String sourceName = i + vAName; @@ -623,7 +653,7 @@ public class TestMockDAGAppMaster { MockContainerLauncher mockLauncher = mockApp.getContainerLauncher(); mockLauncher.startScheduling(false); - DAG dag = DAG.create("test"); + DAG dag = DAG.create("testSchedulerErrorHandling"); Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5); dag.addVertex(vA); http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 2a2df7c..50bb68c 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -81,6 +81,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; +import org.apache.tez.dag.app.dag.event.TaskEvent; import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; import org.apache.tez.dag.app.dag.event.TaskEventType; import org.apache.tez.dag.app.dag.event.SpeculatorEventTaskAttemptStatusUpdate; @@ -672,6 +673,35 @@ public class TestTaskAttempt { } @Test(timeout = 5000) + public void testEventSerializingHash() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 2); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); + TezTaskID taskID1 = TezTaskID.getInstance(vertexID, 1); + TezTaskID taskID2 = TezTaskID.getInstance(vertexID, 2); + TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(taskID1, 0); + TezTaskAttemptID taID12 = TezTaskAttemptID.getInstance(taskID1, 1); + TezTaskAttemptID taID21 = TezTaskAttemptID.getInstance(taskID2, 1); + + TaskAttemptEvent taEventFail11 = new TaskAttemptEvent(taID11, TaskAttemptEventType.TA_FAILED); + TaskAttemptEvent taEventKill11 = new TaskAttemptEvent(taID11, TaskAttemptEventType.TA_KILL_REQUEST); + TaskAttemptEvent taEventKill12 = new TaskAttemptEvent(taID12, TaskAttemptEventType.TA_KILL_REQUEST); + TaskAttemptEvent taEventKill21 = new TaskAttemptEvent(taID21, TaskAttemptEventType.TA_KILL_REQUEST); + TaskEvent tEventKill1 = new TaskEvent(taskID1, TaskEventType.T_ATTEMPT_KILLED); + TaskEvent tEventFail1 = new TaskEvent(taskID1, TaskEventType.T_ATTEMPT_FAILED); + TaskEvent tEventFail2 = new TaskEvent(taskID2, TaskEventType.T_ATTEMPT_FAILED); + + // all of them should have the same value + assertEquals(taEventFail11.getSerializingHash(), taEventKill11.getSerializingHash()); + assertEquals(taEventKill11.getSerializingHash(), taEventKill12.getSerializingHash()); + assertEquals(tEventFail1.getSerializingHash(), tEventKill1.getSerializingHash()); + assertEquals(taEventFail11.getSerializingHash(), tEventKill1.getSerializingHash()); + assertEquals(taEventKill21.getSerializingHash(), tEventFail2.getSerializingHash()); + // events from different tasks may not have the same value + assertFalse(tEventFail1.getSerializingHash() == tEventFail2.getSerializingHash()); + } + + @Test(timeout = 5000) public void testSuccess() throws Exception { ApplicationId appId = ApplicationId.newInstance(1, 2); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( @@ -695,7 +725,7 @@ public class TestTaskAttempt { Resource resource = Resource.newInstance(1024, 1); NodeId nid = NodeId.newInstance("127.0.0.1", 0); - ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); Container container = mock(Container.class); when(container.getId()).thenReturn(contId); when(container.getNodeId()).thenReturn(nid); @@ -786,7 +816,7 @@ public class TestTaskAttempt { Resource resource = Resource.newInstance(1024, 1); NodeId nid = NodeId.newInstance("127.0.0.1", 0); - ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); Container container = mock(Container.class); when(container.getId()).thenReturn(contId); when(container.getNodeId()).thenReturn(nid); @@ -881,7 +911,7 @@ public class TestTaskAttempt { Resource resource = Resource.newInstance(1024, 1); NodeId nid = NodeId.newInstance("127.0.0.1", 0); - ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); Container container = mock(Container.class); when(container.getId()).thenReturn(contId); when(container.getNodeId()).thenReturn(nid); @@ -984,7 +1014,7 @@ public class TestTaskAttempt { Resource resource = Resource.newInstance(1024, 1); NodeId nid = NodeId.newInstance("127.0.0.1", 0); - ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); Container container = mock(Container.class); when(container.getId()).thenReturn(contId); when(container.getNodeId()).thenReturn(nid); @@ -1084,7 +1114,7 @@ public class TestTaskAttempt { Resource resource = Resource.newInstance(1024, 1); NodeId nid = NodeId.newInstance("127.0.0.1", 0); - ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); Container container = mock(Container.class); when(container.getId()).thenReturn(contId); when(container.getNodeId()).thenReturn(nid); @@ -1211,7 +1241,7 @@ public class TestTaskAttempt { Resource resource, ContainerContext containerContext, boolean leafVertex) { super(taskId, attemptNumber, eventHandler, tal, conf, clock, taskHeartbeatHandler, appContext, - isRescheduled, resource, containerContext, leafVertex); + isRescheduled, resource, containerContext, leafVertex, mock(TaskImpl.class)); this.locationHint = locationHint; } http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java index b8b09d0..d6d874d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java @@ -148,7 +148,7 @@ public class TestTaskAttemptRecovery { mock(TaskAttemptListener.class), new Configuration(), new SystemClock(), mock(TaskHeartbeatHandler.class), mockAppContext, false, Resource.newInstance(1, 1), - mock(ContainerContext.class), false); + mock(ContainerContext.class), false, mockTask); taId = ta.getID(); } http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index 9509df4..66e6724 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -630,7 +630,7 @@ public class TestTaskImpl { ContainerContext containerContext, Vertex vertex) { super(vertexId, partition, eventHandler, conf, taskAttemptListener, clock, thh, appContext, leafVertex, resource, - containerContext, mock(StateChangeNotifier.class)); + containerContext, mock(StateChangeNotifier.class), vertex); this.vertex = vertex; this.locationHint = locationHint; } @@ -687,7 +687,7 @@ public class TestTaskImpl { TaskLocationHint locationHint, boolean isRescheduled, Resource resource, ContainerContext containerContext) { super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh, - appContext, isRescheduled, resource, containerContext, false); + appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class)); this.locationHint = locationHint; } http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java index e182f24..2a49826 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java @@ -190,7 +190,7 @@ public class TestTaskRecovery { new Configuration(), mock(TaskAttemptListener.class), new SystemClock(), mock(TaskHeartbeatHandler.class), mockAppContext, false, Resource.newInstance(1, 1), - mock(ContainerContext.class), mock(StateChangeNotifier.class)); + mock(ContainerContext.class), mock(StateChangeNotifier.class), vertex); Map<String, OutputCommitter> committers = new HashMap<String, OutputCommitter>();
