TEZ-2361. Propagate dag completion to TaskCommunicator. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a5f872e9 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a5f872e9 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a5f872e9 Branch: refs/heads/TEZ-2003 Commit: a5f872e908835a152466b670164bf9f46568fa05 Parents: c44b337 Author: Siddharth Seth <[email protected]> Authored: Thu Apr 23 17:26:25 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri Aug 14 13:46:43 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../apache/tez/dag/api/TaskCommunicator.java | 12 +++- .../org/apache/tez/dag/app/DAGAppMaster.java | 4 +- .../dag/app/TaskAttemptListenerImpTezDag.java | 17 +++++- .../dag/app/TaskCommunicatorContextImpl.java | 64 +++++++++++++++++--- .../tez/dag/app/TezTaskCommunicatorImpl.java | 5 ++ .../tez/dag/app/launcher/ContainerLauncher.java | 3 - .../dag/app/launcher/ContainerLauncherImpl.java | 12 ---- .../app/launcher/ContainerLauncherRouter.java | 10 +++ .../app/launcher/LocalContainerLauncher.java | 9 --- .../apache/tez/dag/app/MockDAGAppMaster.java | 11 ---- .../rm/TezTestServiceTaskSchedulerService.java | 2 +- .../TezTestServiceTaskCommunicatorImpl.java | 2 +- .../tez/tests/TestExternalTezServices.java | 2 + 14 files changed, 103 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a5f872e9/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 7c13110..f6bc8e7 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -17,5 +17,6 @@ ALL CHANGES: TEZ-2285. Allow TaskCommunicators to indicate task/container liveness. TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates. TEZ-2347. Expose additional information in TaskCommunicatorContext. + TEZ-2361. Propagate dag completion to TaskCommunicator. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/a5f872e9/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java index a2cd858..cadca0c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java @@ -14,7 +14,6 @@ package org.apache.tez.dag.api; -import java.io.IOException; import java.net.InetSocketAddress; import java.util.Map; @@ -74,4 +73,15 @@ public abstract class TaskCommunicator extends AbstractService { * @throws Exception */ public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception; + + /** + * Indicates the current running dag is complete. The TaskCommunicatorContext can be used to + * query information about the current dag during the duration of the dagComplete invocation. + * + * After this, the contents returned from querying the context may change at any point - due to + * the next dag being submitted. + */ + // TODO TEZ-2003 This is extremely difficult to use. Add the dagStarted notification, and potentially + // throw exceptions between a dagComplete and dagStart invocation. + public abstract void dagComplete(String dagName); } http://git-wip-us.apache.org/repos/asf/tez/blob/a5f872e9/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 6ab0f8e..04e72db 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -766,7 +766,7 @@ public class DAGAppMaster extends AbstractService { DAGAppMasterEventDagCleanup cleanupEvent = (DAGAppMasterEventDagCleanup) event; LOG.info("Cleaning up DAG: name=" + cleanupEvent.getDag().getName() + ", with id=" + cleanupEvent.getDag().getID()); - containerLauncher.dagComplete(cleanupEvent.getDag()); + containerLauncherRouter.dagComplete(cleanupEvent.getDag()); taskAttemptListener.dagComplete(cleanupEvent.getDag()); nodes.dagComplete(cleanupEvent.getDag()); containers.dagComplete(cleanupEvent.getDag()); @@ -780,7 +780,7 @@ public class DAGAppMaster extends AbstractService { case NEW_DAG_SUBMITTED: // Inform sub-components that a new DAG has been submitted. taskSchedulerEventHandler.dagSubmitted(); - containerLauncher.dagSubmitted(); + containerLauncherRouter.dagSubmitted(); taskAttemptListener.dagSubmitted(); break; default: http://git-wip-us.apache.org/repos/asf/tez/blob/a5f872e9/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index 386e4af..7cdf292 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -80,6 +80,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements private final AppContext context; private final TaskCommunicator[] taskCommunicators; + private final TaskCommunicatorContext[] taskCommunicatorContexts; protected final TaskHeartbeatHandler taskHeartbeatHandler; protected final ContainerHeartbeatHandler containerHeartbeatHandler; @@ -122,7 +123,9 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } } this.taskCommunicators = new TaskCommunicator[taskCommunicatorClassIdentifiers.length]; + this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorClassIdentifiers.length]; for (int i = 0 ; i < taskCommunicatorClassIdentifiers.length ; i++) { + taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, i); taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i], i); } // TODO TEZ-2118 Start using taskCommunicator indices properly @@ -147,10 +150,10 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier, int taskCommIndex) { if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { LOG.info("Using Default Task Communicator"); - return new TezTaskCommunicatorImpl(new TaskCommunicatorContextImpl(context, this, taskCommIndex)); + return new TezTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]); } else if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) { LOG.info("Using Default Local Task Communicator"); - return new TezLocalTaskCommunicatorImpl(new TaskCommunicatorContextImpl(context, this, taskCommIndex)); + return new TezLocalTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]); } else { LOG.info("Using TaskCommunicator: " + taskCommClassIdentifier); Class<? extends TaskCommunicator> taskCommClazz = (Class<? extends TaskCommunicator>) ReflectionUtils @@ -158,7 +161,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements try { Constructor<? extends TaskCommunicator> ctor = taskCommClazz.getConstructor(TaskCommunicatorContext.class); ctor.setAccessible(true); - return ctor.newInstance(new TaskCommunicatorContextImpl(context, this, taskCommIndex)); + return ctor.newInstance(taskCommunicatorContexts[taskCommIndex]); } catch (NoSuchMethodException e) { throw new TezUncheckedException(e); } catch (InvocationTargetException e) { @@ -317,6 +320,14 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements // This becomes more relevant when task kills without container kills are allowed. // TODO TEZ-2336. Send a signal to containers indicating DAG completion. + + // Inform all communicators of the dagCompletion. + for (int i = 0 ; i < taskCommunicators.length ; i++) { + ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteStart(dag); + taskCommunicators[i].dagComplete(dag.getName()); + ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteEnd(); + } + } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/a5f872e9/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java index 4cb0c93..790066f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java @@ -17,6 +17,11 @@ package org.apache.tez.dag.app; import javax.annotation.Nullable; import java.io.IOException; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -33,6 +38,7 @@ import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.VertexStateUpdateListener; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -44,6 +50,10 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver private final AppContext context; private final TaskAttemptListenerImpTezDag taskAttemptListener; private final int taskCommunicatorIndex; + private final ReentrantReadWriteLock.ReadLock dagChangedReadLock; + private final ReentrantReadWriteLock.WriteLock dagChangedWriteLock; + + private DAG dag; public TaskCommunicatorContextImpl(AppContext appContext, TaskAttemptListenerImpTezDag taskAttemptListener, @@ -51,6 +61,10 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver this.context = appContext; this.taskAttemptListener = taskAttemptListener; this.taskCommunicatorIndex = taskCommunicatorIndex; + + ReentrantReadWriteLock dagChangedLock = new ReentrantReadWriteLock(); + dagChangedReadLock = dagChangedLock.readLock(); + dagChangedWriteLock = dagChangedLock.writeLock(); } @Override @@ -111,18 +125,19 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver public void registerForVertexStateUpdates(String vertexName, @Nullable Set<VertexState> stateSet) { Preconditions.checkNotNull(vertexName, "VertexName cannot be null: " + vertexName); - context.getCurrentDAG().getStateChangeNotifier().registerForVertexUpdates(vertexName, stateSet, this); + getDag().getStateChangeNotifier().registerForVertexUpdates(vertexName, stateSet, + this); } @Override public String getCurretnDagName() { - return context.getCurrentDAG().getName(); + return getDag().getName(); } @Override public Iterable<String> getInputVertexNames(String vertexName) { Preconditions.checkNotNull(vertexName, "VertexName cannot be null: " + vertexName); - Vertex vertex = context.getCurrentDAG().getVertex(vertexName); + Vertex vertex = getDag().getVertex(vertexName); Set<Vertex> sources = vertex.getInputVertices().keySet(); return Iterables.transform(sources, new Function<Vertex, String>() { @Override @@ -135,31 +150,32 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver @Override public int getVertexTotalTaskCount(String vertexName) { Preconditions.checkArgument(vertexName != null, "VertexName must be specified"); - return context.getCurrentDAG().getVertex(vertexName).getTotalTasks(); + return getDag().getVertex(vertexName).getTotalTasks(); } @Override public int getVertexCompletedTaskCount(String vertexName) { Preconditions.checkArgument(vertexName != null, "VertexName must be specified"); - return context.getCurrentDAG().getVertex(vertexName).getCompletedTasks(); + return getDag().getVertex(vertexName).getCompletedTasks(); } @Override public int getVertexRunningTaskCount(String vertexName) { Preconditions.checkArgument(vertexName != null, "VertexName must be specified"); - return context.getCurrentDAG().getVertex(vertexName).getRunningTasks(); + return getDag().getVertex(vertexName).getRunningTasks(); } @Override public long getFirstAttemptStartTime(String vertexName, int taskIndex) { Preconditions.checkArgument(vertexName != null, "VertexName must be specified"); Preconditions.checkArgument(taskIndex >=0, "TaskIndex must be > 0"); - return context.getCurrentDAG().getVertex(vertexName).getTask(taskIndex).getFirstAttemptStartTime(); + return getDag().getVertex(vertexName).getTask( + taskIndex).getFirstAttemptStartTime(); } @Override public long getDagStartTime() { - return context.getCurrentDAG().getStartTime(); + return getDag().getStartTime(); } @Override @@ -171,4 +187,36 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver throw new TezUncheckedException(e); } } + + private DAG getDag() { + dagChangedReadLock.lock(); + try { + if (dag != null) { + return dag; + } else { + return context.getCurrentDAG(); + } + } finally { + dagChangedReadLock.unlock(); + } + } + + @InterfaceAudience.Private + public void dagCompleteStart(DAG dag) { + dagChangedWriteLock.lock(); + try { + this.dag = dag; + } finally { + dagChangedWriteLock.unlock(); + } + } + + public void dagCompleteEnd() { + dagChangedWriteLock.lock(); + try { + this.dag = null; + } finally { + dagChangedWriteLock.unlock(); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/a5f872e9/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index fa2749a..6200a5b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -260,6 +260,11 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { // Empty. Not registering, or expecting any updates. } + @Override + public void dagComplete(String dagName) { + // Nothing to do at the moment. Some of the TODOs from TaskAttemptListener apply here. + } + protected String getTokenIdentifier() { return tokenIdentifier; } http://git-wip-us.apache.org/repos/asf/tez/blob/a5f872e9/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java index 8a8498f..ea07a1d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java @@ -26,7 +26,4 @@ import org.apache.tez.dag.app.rm.NMCommunicatorEvent; public interface ContainerLauncher extends EventHandler<NMCommunicatorEvent> { - void dagComplete(DAG dag); - - void dagSubmitted(); } http://git-wip-us.apache.org/repos/asf/tez/blob/a5f872e9/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java index 94889a1..a1eb2a7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java @@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.exception.ExceptionUtils; -import org.apache.tez.dag.app.dag.DAG; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -111,17 +110,6 @@ public class ContainerLauncherImpl extends AbstractService implements } } - @Override - public void dagComplete(DAG dag) { - // Nothing required at the moment. Containers are shared across DAGs - } - - @Override - public void dagSubmitted() { - // Nothing to do right now. Indicates that a new DAG has been submitted and - // the context has updated information. - } - private static enum ContainerState { PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH } http://git-wip-us.apache.org/repos/asf/tez/blob/a5f872e9/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java index dd3571e..db145f4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java @@ -27,6 +27,7 @@ import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.rm.NMCommunicatorEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,6 +129,15 @@ public class ContainerLauncherRouter extends AbstractService } } + public void dagComplete(DAG dag) { + // Nothing required at the moment. Containers are shared across DAGs + } + + public void dagSubmitted() { + // Nothing to do right now. Indicates that a new DAG has been submitted and + // the context has updated information. + } + @Override public void handle(NMCommunicatorEvent event) { http://git-wip-us.apache.org/repos/asf/tez/blob/a5f872e9/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index 7dbf937..fe23409 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -44,7 +44,6 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.tez.dag.app.dag.DAG; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -170,14 +169,6 @@ public class LocalContainerLauncher extends AbstractService implements callbackExecutor.shutdownNow(); } - @Override - public void dagComplete(DAG dag) { - } - - @Override - public void dagSubmitted() { - } - // Thread to monitor the queue of incoming NMCommunicator events private class TezSubTaskRunner implements Runnable { @Override http://git-wip-us.apache.org/repos/asf/tez/blob/a5f872e9/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index b39eee2..9882954 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -34,10 +34,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.apache.tez.dag.app.dag.DAG; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.service.AbstractService; @@ -54,7 +52,6 @@ import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TaskHeartbeatRequest; import org.apache.tez.dag.api.TaskHeartbeatResponse; import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.launcher.ContainerLauncher; import org.apache.tez.dag.app.launcher.ContainerLauncherRouter; @@ -150,14 +147,6 @@ public class MockDAGAppMaster extends DAGAppMaster { this.goFlag = goFlag; } - @Override - public void dagComplete(DAG dag) { - } - - @Override - public void dagSubmitted() { - } - public class ContainerData { ContainerId cId; TezTaskAttemptID taId; http://git-wip-us.apache.org/repos/asf/tez/blob/a5f872e9/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java index 50dfb24..073cb50 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java @@ -158,7 +158,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { } @Override - public void resetMatchLocalityForAllHeldContainers() { + public void dagComplete() { } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/a5f872e9/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java index ef983c2..cf28b11 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java @@ -150,7 +150,7 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl t = se.getCause(); } if (t instanceof RemoteException) { - RemoteException re = (RemoteException)t; + RemoteException re = (RemoteException) t; String message = re.toString(); if (message.contains(RejectedExecutionException.class.getName())) { getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(), http://git-wip-us.apache.org/repos/asf/tez/blob/a5f872e9/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java index 4d0a610..45c70f1 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java @@ -39,6 +39,7 @@ import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl; import org.apache.tez.examples.HashJoinExample; import org.apache.tez.examples.JoinDataGen; import org.apache.tez.examples.JoinValidateConfigured; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.processor.SleepProcessor; import org.apache.tez.service.MiniTezTestServiceCluster; import org.apache.tez.service.impl.ContainerRunnerImpl; @@ -124,6 +125,7 @@ public class TestExternalTezServices { remoteFs.mkdirs(stagingDirPath); // This is currently configured to push tasks into the Service, and then use the standard RPC confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString()); + confForJobs.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false); confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS, TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
