Repository: tez Updated Branches: refs/heads/master 685fa742f -> 2c7abeb15
TEZ-2910. Set caller context for tracing ( integrate with HDFS-9184 ). (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2c7abeb1 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2c7abeb1 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2c7abeb1 Branch: refs/heads/master Commit: 2c7abeb155f6e832b3da407d4d537389778b3a02 Parents: 685fa74 Author: Hitesh Shah <[email protected]> Authored: Tue Dec 15 18:38:45 2015 -0800 Committer: Hitesh Shah <[email protected]> Committed: Tue Dec 15 18:38:45 2015 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + tez-common/pom.xml | 4 ++ .../org/apache/tez/common/TezUtilsInternal.java | 26 ++++++++ .../org/apache/tez/dag/app/DAGAppMaster.java | 32 +++++---- .../app/dag/RootInputInitializerManager.java | 25 +++++-- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 69 ++++++++++++-------- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 41 ++++++++---- .../app/launcher/LocalContainerLauncher.java | 4 +- .../dag/history/recovery/RecoveryService.java | 3 + .../apache/tez/dag/app/TestRecoveryParser.java | 14 +++- .../apache/tez/dag/app/dag/impl/TestCommit.java | 3 + .../tez/dag/app/dag/impl/TestDAGImpl.java | 12 ++++ .../tez/dag/app/dag/impl/TestDAGRecovery.java | 3 + .../tez/dag/app/dag/impl/TestVertexImpl.java | 2 + .../history/recovery/TestRecoveryService.java | 10 +++ tez-dist/pom.xml | 39 +++++++++++ .../org/apache/tez/examples/TezExampleBase.java | 14 ++++ .../tez/service/impl/ContainerRunnerImpl.java | 6 +- .../tez/mapreduce/output/TestMROutput.java | 3 +- .../tez/mapreduce/processor/MapUtils.java | 3 +- .../processor/reduce/TestReduceProcessor.java | 3 +- tez-runtime-internals/pom.xml | 4 ++ .../runtime/LogicalIOProcessorRuntimeTask.java | 9 ++- .../tez/runtime/task/TaskRunner2Callable.java | 7 +- .../org/apache/tez/runtime/task/TezChild.java | 22 +++++-- .../apache/tez/runtime/task/TezTaskRunner2.java | 8 ++- .../TestLogicalIOProcessorRuntimeTask.java | 11 ++-- .../runtime/api/impl/TestProcessorContext.java | 3 +- .../tez/runtime/task/TestTaskExecution2.java | 5 +- .../output/TestOnFileUnorderedKVOutput.java | 5 +- tez-tests/pom.xml | 1 - .../examples/TestOrderedWordCount.java | 8 ++- 32 files changed, 314 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a9d1893..8588923 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES TEZ-604. Revert temporary changes made in TEZ-603 to kill the provided tez session, if running a MapReduce job. ALL CHANGES: + TEZ-2910. Set caller context for tracing ( integrate with HDFS-9184 ). TEZ-2976. Recovery fails when InputDescriptor is changed during input initialization. TEZ-2997. Tez UI: Support searches by CallerContext ID for DAGs TEZ-2996. TestAnalyzer fails in trunk after recovery redesign http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-common/pom.xml ---------------------------------------------------------------------- diff --git a/tez-common/pom.xml b/tez-common/pom.xml index a521d54..e133c9c 100644 --- a/tez-common/pom.xml +++ b/tez-common/pom.xml @@ -58,6 +58,10 @@ <artifactId>tez-api</artifactId> </dependency> <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>hadoop-shim</artifactId> + </dependency> + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java index c2a50f5..a99bab4 100644 --- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java +++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java @@ -38,8 +38,13 @@ import com.google.protobuf.TextFormat; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.log4j.Appender; import org.apache.tez.dag.api.DagTypeConverters; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.hadoop.shim.HadoopShim; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.records.DAGProtos; @@ -327,4 +332,25 @@ public class TezUtilsInternal { return TaskAttemptEndReason.OTHER; } } + + @Private + public static void setHadoopCallerContext(HadoopShim hadoopShim, TezTaskAttemptID attemptID) { + hadoopShim.setHadoopCallerContext("tez_ta:" + attemptID.toString()); + } + + @Private + public static void setHadoopCallerContext(HadoopShim hadoopShim, TezVertexID vertexID) { + hadoopShim.setHadoopCallerContext("tez_v:" + vertexID.toString()); + } + + @Private + public static void setHadoopCallerContext(HadoopShim hadoopShim, TezDAGID dagID) { + hadoopShim.setHadoopCallerContext("tez_dag:" + dagID.toString()); + } + + @Private + public static void setHadoopCallerContext(HadoopShim hadoopShim, ApplicationId appID) { + hadoopShim.setHadoopCallerContext("tez_app:" + appID.toString()); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 5cbdab4..c0b86a5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -1408,7 +1408,7 @@ public class DAGAppMaster extends AbstractService { } } - private List<URL> processAdditionalResources(Map<String, LocalResource> lrDiff) + private List<URL> processAdditionalResources(TezDAGID dagId, Map<String, LocalResource> lrDiff) throws TezException { if (lrDiff == null || lrDiff.isEmpty()) { return Collections.emptyList(); @@ -1416,6 +1416,7 @@ public class DAGAppMaster extends AbstractService { LOG.info("Localizing additional local resources for AM : " + lrDiff); List<URL> downloadedURLs; try { + TezUtilsInternal.setHadoopCallerContext(hadoopShim, dagId); downloadedURLs = RelocalizationUtils.processAdditionalResources( Maps.transformValues(lrDiff, new Function<LocalResource, URI>() { @@ -1426,6 +1427,8 @@ public class DAGAppMaster extends AbstractService { }), getConfig(), workingDirectory); } catch (IOException e) { throw new TezException(e); + } finally { + hadoopShim.clearHadoopCallerContext(); } LOG.info("Done downloading additional AM resources"); return downloadedURLs; @@ -1850,14 +1853,19 @@ public class DAGAppMaster extends AbstractService { private DAGRecoveryData recoverDAG() throws IOException, TezException { if (recoveryEnabled) { - if (this.appAttemptID.getAttemptId() > 1) { - LOG.info("Recovering data from previous attempts" - + ", currentAttemptId=" + this.appAttemptID.getAttemptId()); - this.state = DAGAppMasterState.RECOVERING; - RecoveryParser recoveryParser = new RecoveryParser( - this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId()); - DAGRecoveryData recoveredDAGData = recoveryParser.parseRecoveryData(); - return recoveredDAGData; + try { + TezUtilsInternal.setHadoopCallerContext(hadoopShim, this.getAppID()); + if (this.appAttemptID.getAttemptId() > 1) { + LOG.info("Recovering data from previous attempts" + + ", currentAttemptId=" + this.appAttemptID.getAttemptId()); + this.state = DAGAppMasterState.RECOVERING; + RecoveryParser recoveryParser = new RecoveryParser( + this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId()); + DAGRecoveryData recoveredDAGData = recoveryParser.parseRecoveryData(); + return recoveredDAGData; + } + } finally { + hadoopShim.clearHadoopCallerContext(); } } return null; @@ -1906,7 +1914,9 @@ public class DAGAppMaster extends AbstractService { if (recoveredDAGData != null) { if (recoveredDAGData.cumulativeAdditionalResources != null) { - recoveredDAGData.additionalUrlsForClasspath = processAdditionalResources(recoveredDAGData.cumulativeAdditionalResources); + recoveredDAGData.additionalUrlsForClasspath = processAdditionalResources( + recoveredDAGData.recoveredDagID, + recoveredDAGData.cumulativeAdditionalResources); amResources.putAll(recoveredDAGData.cumulativeAdditionalResources); cumulativeAdditionalResources.putAll(recoveredDAGData.cumulativeAdditionalResources); } @@ -2397,7 +2407,7 @@ public class DAGAppMaster extends AbstractService { additionalUrlsForClasspath = dag.getDagUGI().doAs(new PrivilegedExceptionAction<List<URL>>() { @Override public List<URL> run() throws Exception { - return processAdditionalResources(additionalAmResources); + return processAdditionalResources(currentDAG.getID(), additionalAmResources); } }); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java index 13128f8..57a7172 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java @@ -45,6 +45,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.RootInputLeafOutput; @@ -112,7 +113,14 @@ public class RootInputInitializerManager { InputInitializerContext context = new TezRootInputInitializerContextImpl(input, vertex, appContext, this); - InputInitializer initializer = createInitializer(input, context); + + InputInitializer initializer; + try { + TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), vertex.getVertexId()); + initializer = createInitializer(input, context); + } finally { + appContext.getHadoopShim().clearHadoopCallerContext(); + } InitializerWrapper initializerWrapper = new InitializerWrapper(input, initializer, context, vertex, entityStateTracker, appContext); @@ -127,7 +135,7 @@ public class RootInputInitializerManager { initializerMap.put(input.getName(), initializerWrapper); ListenableFuture<List<Event>> future = executor - .submit(new InputInitializerCallable(initializerWrapper, dagUgi)); + .submit(new InputInitializerCallable(initializerWrapper, dagUgi, appContext)); Futures.addCallback(future, createInputInitializerCallback(initializerWrapper)); } } @@ -229,10 +237,13 @@ public class RootInputInitializerManager { private final InitializerWrapper initializerWrapper; private final UserGroupInformation ugi; + private final AppContext appContext; - public InputInitializerCallable(InitializerWrapper initializer, UserGroupInformation ugi) { + public InputInitializerCallable(InitializerWrapper initializer, UserGroupInformation ugi, + AppContext appContext) { this.initializerWrapper = initializer; this.ugi = ugi; + this.appContext = appContext; } @Override @@ -243,7 +254,13 @@ public class RootInputInitializerManager { LOG.info( "Starting InputInitializer for Input: " + initializerWrapper.getInput().getName() + " on vertex " + initializerWrapper.getVertexLogIdentifier()); - return initializerWrapper.getInitializer().initialize(); + try { + TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), + initializerWrapper.vertexId); + return initializerWrapper.getInitializer().initialize(); + } finally { + appContext.getHadoopShim().clearHadoopCallerContext(); + } } }); return events; http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 698f9d6..3d47450 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -41,6 +41,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.LimitExceededException; import org.apache.tez.state.OnStateChangedCallback; import org.apache.tez.state.StateMachineTez; @@ -1022,19 +1023,24 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, if (!groupInfo.outputs.isEmpty()) { groupInfo.commitStarted = true; final Vertex v = getVertex(groupInfo.groupMembers.iterator().next()); - for (final String outputName : groupInfo.outputs) { - final OutputKey outputKey = new OutputKey(outputName, groupInfo.groupName, true); - CommitCallback groupCommitCallback = new CommitCallback(outputKey); - CallableEvent groupCommitCallableEvent = new CallableEvent(groupCommitCallback) { - @Override - public Void call() throws Exception { - OutputCommitter committer = v.getOutputCommitters().get(outputName); - LOG.info("Committing output: " + outputKey); - commitOutput(committer); - return null; - } - }; - commitEvents.put(outputKey, groupCommitCallableEvent); + try { + TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), v.getVertexId()); + for (final String outputName : groupInfo.outputs) { + final OutputKey outputKey = new OutputKey(outputName, groupInfo.groupName, true); + CommitCallback groupCommitCallback = new CommitCallback(outputKey); + CallableEvent groupCommitCallableEvent = new CallableEvent(groupCommitCallback) { + @Override + public Void call() throws Exception { + OutputCommitter committer = v.getOutputCommitters().get(outputName); + LOG.info("Committing output: " + outputKey); + commitOutput(committer); + return null; + } + }; + commitEvents.put(outputKey, groupCommitCallableEvent); + } + } finally { + appContext.getHadoopShim().clearHadoopCallerContext(); } } } @@ -1061,23 +1067,28 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, LOG.info("No exclusive output committers for vertex: " + vertex.getLogIdentifier()); continue; } - for (final Map.Entry<String, OutputCommitter> entry : outputCommitters.entrySet()) { - if (vertex.getState() != VertexState.SUCCEEDED) { - throw new TezUncheckedException("Vertex: " + vertex.getLogIdentifier() + - " not in SUCCEEDED state. State= " + vertex.getState()); - } - OutputKey outputKey = new OutputKey(entry.getKey(), vertex.getName(), false); - CommitCallback commitCallback = new CommitCallback(outputKey); - CallableEvent commitCallableEvent = new CallableEvent(commitCallback) { - @Override - public Void call() throws Exception { - LOG.info("Committing output: " + entry.getKey() + " for vertex: " - + vertex.getLogIdentifier() + ", outputName: " + entry.getKey()); - commitOutput(entry.getValue()); - return null; + try { + TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), vertex.getVertexId()); + for (final Map.Entry<String, OutputCommitter> entry : outputCommitters.entrySet()) { + if (vertex.getState() != VertexState.SUCCEEDED) { + throw new TezUncheckedException("Vertex: " + vertex.getLogIdentifier() + + " not in SUCCEEDED state. State= " + vertex.getState()); } - }; - commitEvents.put(outputKey, commitCallableEvent); + OutputKey outputKey = new OutputKey(entry.getKey(), vertex.getName(), false); + CommitCallback commitCallback = new CommitCallback(outputKey); + CallableEvent commitCallableEvent = new CallableEvent(commitCallback) { + @Override + public Void call() throws Exception { + LOG.info("Committing output: " + entry.getKey() + " for vertex: " + + vertex.getLogIdentifier() + ", outputName: " + entry.getKey()); + commitOutput(entry.getValue()); + return null; + } + }; + commitEvents.put(outputKey, commitCallableEvent); + } + } finally { + appContext.getHadoopShim().clearHadoopCallerContext(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 4e82560..93baa0a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.util.Clock; import org.apache.tez.client.TezClientUtils; import org.apache.tez.common.ATSConstants; import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.LimitExceededException; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DagTypeConverters; @@ -1967,15 +1968,21 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl CallableEvent commitCallableEvent = new CallableEvent(commitCallback) { @Override public Void call() throws Exception { - vertex.dagUgi.doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { + try { + TezUtilsInternal.setHadoopCallerContext(vertex.appContext.getHadoopShim(), + vertex.vertexId); + vertex.dagUgi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { LOG.info("Invoking committer commit for output=" + outputName + ", vertexId=" + vertex.logIdentifier); committer.commitOutput(); - return null; - } - }); + return null; + } + }); + } finally { + vertex.appContext.getHadoopShim().clearHadoopCallerContext(); + } return null; } }; @@ -2208,13 +2215,20 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl LOG.debug("Invoking committer init for output=" + outputName + ", vertex=" + logIdentifier); } - outputCommitter.initialize(); - outputCommitters.put(outputName, outputCommitter); - if (LOG.isDebugEnabled()) { - LOG.debug("Invoking committer setup for output=" + outputName - + ", vertex=" + logIdentifier); + + try { + TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), vertexId); + outputCommitter.initialize(); + outputCommitters.put(outputName, outputCommitter); + if (LOG.isDebugEnabled()) { + LOG.debug("Invoking committer setup for output=" + outputName + + ", vertex=" + logIdentifier); + } + outputCommitter.setupOutput(); + } finally { + appContext.getHadoopShim().clearHadoopCallerContext(); } - outputCommitter.setupOutput(); + return null; } }); @@ -3066,6 +3080,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl if (outputCommitters != null) { LOG.info("Invoking committer abort for vertex, vertexId=" + logIdentifier); try { + TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), vertexId); dagUgi.doAs(new PrivilegedExceptionAction<Void>() { @Override public Void run() { @@ -3084,6 +3099,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl }); } catch (Exception e) { throw new TezUncheckedException("Unknown error while attempting VertexCommitter(s) abort", e); + } finally { + appContext.getHadoopShim().clearHadoopCallerContext(); } } if (finishTime == 0) { http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index 9267f00..c4ab6e3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -45,6 +45,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.tez.common.TezUtils; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; @@ -356,7 +357,8 @@ public class LocalContainerLauncher extends ContainerLauncher { TezChild tezChild = TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier, attemptNumber, localDirs, workingDirectory, containerEnv, "", executionContext, credentials, - memAvailable, context.getUser(), tezTaskUmbilicalProtocol, false); + memAvailable, context.getUser(), tezTaskUmbilicalProtocol, false, + context.getHadoopShim()); return tezChild; } http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java index fed4f3d..3eeddf5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.AbstractService; import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.app.AppContext; @@ -133,6 +134,8 @@ public class RecoveryService extends AbstractService { eventHandlingThread = new Thread(new Runnable() { @Override public void run() { + TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), + appContext.getApplicationID()); DAGHistoryEvent event; while (!stopped.get() && !Thread.currentThread().isInterrupted()) { drained = eventQueue.isEmpty(); http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java index 9c688b6..d8b620a 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java @@ -20,7 +20,6 @@ package org.apache.tez.dag.app; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -70,6 +69,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.impl.TezEvent; import org.junit.*; @@ -252,6 +252,8 @@ public class TestRecoveryParser { AppContext appContext = mock(AppContext.class); when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); when(appContext.getClock()).thenReturn(new SystemClock()); + when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); + when(appContext.getApplicationID()).thenReturn(appId); DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan(); // write data in attempt_1 @@ -430,6 +432,8 @@ public class TestRecoveryParser { when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); when(appContext.getClock()).thenReturn(new SystemClock()); when(mockDAGImpl.getID()).thenReturn(dagID); + when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); + when(appContext.getApplicationID()).thenReturn(appId); RecoveryService rService = new RecoveryService(appContext); Configuration conf = new Configuration(); @@ -465,6 +469,8 @@ public class TestRecoveryParser { when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); when(appContext.getClock()).thenReturn(new SystemClock()); when(mockDAGImpl.getID()).thenReturn(dagID); + when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); + when(appContext.getApplicationID()).thenReturn(appId); RecoveryService rService = new RecoveryService(appContext); Configuration conf = new Configuration(); @@ -497,6 +503,8 @@ public class TestRecoveryParser { when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); when(appContext.getClock()).thenReturn(new SystemClock()); when(mockDAGImpl.getID()).thenReturn(dagID); + when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); + when(appContext.getApplicationID()).thenReturn(appId); RecoveryService rService = new RecoveryService(appContext); Configuration conf = new Configuration(); @@ -543,6 +551,8 @@ public class TestRecoveryParser { when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); when(appContext.getClock()).thenReturn(new SystemClock()); when(mockDAGImpl.getID()).thenReturn(dagID); + when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); + when(appContext.getApplicationID()).thenReturn(appId); // MockRecoveryService will skip the non-summary event MockRecoveryService rService = new MockRecoveryService(appContext); @@ -617,6 +627,8 @@ public class TestRecoveryParser { when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); when(appContext.getClock()).thenReturn(new SystemClock()); when(mockDAGImpl.getID()).thenReturn(dagID); + when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); + when(appContext.getApplicationID()).thenReturn(appId); RecoveryService rService = new RecoveryService(appContext); Configuration conf = new Configuration(); http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java index f0b89c8..28670ff 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java @@ -19,6 +19,7 @@ package org.apache.tez.dag.app.dag.impl; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.ByteArrayOutputStream; import java.io.DataInput; @@ -103,6 +104,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.runtime.api.OutputCommitter; import org.apache.tez.runtime.api.OutputCommitterContext; import org.apache.tez.runtime.api.events.VertexManagerEvent; @@ -302,6 +304,7 @@ public class TestCommit { dispatcher = new DrainDispatcher(); fsTokens = new Credentials(); appContext = mock(AppContext.class); + when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder() .setDaemon(true).setNameFormat("App Shared Pool - " + "#%d").build()); execService = MoreExecutors.listeningDecorator(rawExecutor); http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index 31b4f76..1809230 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; import java.net.URL; @@ -40,6 +41,8 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang.StringUtils; import org.apache.tez.common.counters.Limits; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; +import org.apache.tez.hadoop.shim.HadoopShim; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -186,6 +189,7 @@ public class TestDAGImpl { private HistoryEventHandler historyEventHandler; private TaskAttemptEventDispatcher taskAttemptEventDispatcher; private ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10)); + private HadoopShim defaultShim = new DefaultHadoopShim(); static { Limits.reset(); @@ -845,6 +849,8 @@ public class TestDAGImpl { appContext = mock(AppContext.class); execService = mock(ListeningExecutorService.class); final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class); + when(appContext.getHadoopShim()).thenReturn(defaultShim); + when(appContext.getApplicationID()).thenReturn(appAttemptId.getApplicationId()); Mockito.doAnswer(new Answer() { public ListenableFuture<Void> answer(InvocationOnMock invocation) { @@ -864,6 +870,7 @@ public class TestDAGImpl { doReturn(dagId).when(appContext).getCurrentDAGID(); doReturn(historyEventHandler).when(appContext).getHistoryHandler(); doReturn(aclManager).when(appContext).getAMACLManager(); + doReturn(defaultShim).when(appContext).getHadoopShim(); dag = new DAGImpl(dagId, conf, dagPlan, dispatcher.getEventHandler(), taskCommunicatorManagerInterface, fsTokens, clock, "user", thh, appContext); @@ -873,6 +880,8 @@ public class TestDAGImpl { mrrAppContext = mock(AppContext.class); doReturn(aclManager).when(mrrAppContext).getAMACLManager(); doReturn(execService).when(mrrAppContext).getExecService(); + doReturn(defaultShim).when(mrrAppContext).getHadoopShim(); + mrrDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 2); mrrDagPlan = createTestMRRDAGPlan(); mrrDag = new DAGImpl(mrrDagId, conf, mrrDagPlan, @@ -889,6 +898,8 @@ public class TestDAGImpl { groupAppContext = mock(AppContext.class); doReturn(aclManager).when(groupAppContext).getAMACLManager(); doReturn(execService).when(groupAppContext).getExecService(); + doReturn(defaultShim).when(groupAppContext).getHadoopShim(); + groupDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 3); groupDagPlan = createGroupDAGPlan(); groupDag = new DAGImpl(groupDagId, conf, groupDagPlan, @@ -961,6 +972,7 @@ public class TestDAGImpl { dagPlanWithCustomEdge = createDAGWithCustomEdge(exLocation, useLegacy); dagWithCustomEdgeAppContext = mock(AppContext.class); doReturn(aclManager).when(dagWithCustomEdgeAppContext).getAMACLManager(); + when(dagWithCustomEdgeAppContext.getHadoopShim()).thenReturn(defaultShim); dagWithCustomEdge = new DAGImpl(dagWithCustomEdgeId, conf, dagPlanWithCustomEdge, dispatcher.getEventHandler(), taskCommunicatorManagerInterface, fsTokens, clock, "user", thh, dagWithCustomEdgeAppContext); http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java index 3eed717..3a602bc 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java @@ -119,6 +119,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.InputInitializer; import org.apache.tez.runtime.api.InputInitializerContext; @@ -319,6 +320,8 @@ public class TestDAGRecovery { execService = mock(ListeningExecutorService.class); thh = mock(TaskHeartbeatHandler.class); final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class); + when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); + when(appContext.getApplicationID()).thenReturn(appAttemptId.getApplicationId()); Mockito.doAnswer(new Answer() { public ListenableFuture<Void> answer(InvocationOnMock invocation) { http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 1aaf588..986f64d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -53,6 +53,7 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.counters.Limits; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.runtime.api.VertexStatistics; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; @@ -2353,6 +2354,7 @@ public class TestVertexImpl { } dispatcher = new DrainDispatcher(); appContext = mock(AppContext.class); + when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); thh = mock(TaskHeartbeatHandler.class); historyEventHandler = mock(HistoryEventHandler.class); TaskSchedulerManager taskScheduler = mock(TaskSchedulerManager.class); http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java index 99094c6..d828d6b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java @@ -36,6 +36,7 @@ import org.apache.tez.dag.history.events.TaskStartedEvent; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.junit.Before; import org.junit.Test; @@ -62,6 +63,9 @@ public class TestRecoveryService { AppContext appContext = mock(AppContext.class); when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR)); when(appContext.getClock()).thenReturn(new SystemClock()); + when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + when(appContext.getApplicationID()).thenReturn(appId); MockRecoveryService recoveryService = new MockRecoveryService(appContext); conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); @@ -85,6 +89,8 @@ public class TestRecoveryService { AppContext appContext = mock(AppContext.class); when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR)); when(appContext.getClock()).thenReturn(new SystemClock()); + when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); + when(appContext.getApplicationID()).thenReturn(appId); MockRecoveryService recoveryService = new MockRecoveryService(appContext); conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); @@ -121,6 +127,8 @@ public class TestRecoveryService { AppContext appContext = mock(AppContext.class); when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR)); when(appContext.getClock()).thenReturn(new SystemClock()); + when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); + when(appContext.getApplicationID()).thenReturn(appId); MockRecoveryService recoveryService = new MockRecoveryService(appContext); conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); @@ -147,6 +155,8 @@ public class TestRecoveryService { AppContext appContext = mock(AppContext.class); when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR)); when(appContext.getClock()).thenReturn(new SystemClock()); + when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); + when(appContext.getApplicationID()).thenReturn(appId); MockRecoveryService recoveryService = new MockRecoveryService(appContext); conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dist/pom.xml ---------------------------------------------------------------------- diff --git a/tez-dist/pom.xml b/tez-dist/pom.xml index a414448..9a9a790 100644 --- a/tez-dist/pom.xml +++ b/tez-dist/pom.xml @@ -67,6 +67,11 @@ <artifactId>tez-yarn-timeline-history</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>hadoop-shim-2.4</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> </profile> <profile> @@ -80,6 +85,40 @@ <artifactId>tez-yarn-timeline-history-with-acls</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>hadoop-shim-2.6</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + </profile> + <profile> + <id>hadoop28</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-yarn-timeline-history-with-acls</artifactId> + <version>${project.version}</version> + </dependency> + <!-- + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-yarn-timeline-history-with-fs</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-yarn-timeline-cache-plugin</artifactId> + <version>${project.version}</version> + </dependency> --> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>hadoop-shim-2.8</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> </profile> </profiles> http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java index 6960559..a3c0224 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java @@ -30,6 +30,10 @@ import com.google.common.collect.Sets; import org.apache.commons.cli.Options; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.client.CallerContext; +import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; +import org.apache.tez.hadoop.shim.HadoopShim; +import org.apache.tez.hadoop.shim.HadoopShimsLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -64,6 +68,7 @@ public abstract class TezExampleBase extends Configured implements Tool { private boolean isLocalMode = false; private boolean isCountersLog = false; private boolean generateSplitInClient = false; + private HadoopShim hadoopShim; protected boolean isCountersLog() { return isCountersLog; @@ -103,6 +108,8 @@ public abstract class TezExampleBase extends Configured implements Tool { if (optionParser.getCommandLine().hasOption(GENERATE_SPLIT_IN_CLIENT)) { generateSplitInClient = true; } + hadoopShim = new HadoopShimsLoader(conf).getHadoopShim(); + return _execute(otherArgs, null, null); } @@ -122,6 +129,7 @@ public abstract class TezExampleBase extends Configured implements Tool { public int run(TezConfiguration conf, String[] args, @Nullable TezClient tezClient) throws Exception { setConf(conf); + hadoopShim = new HadoopShimsLoader(conf).getHadoopShim(); GenericOptionsParser optionParser = new GenericOptionsParser(conf, getExtraOptions(), args); if (optionParser.getCommandLine().hasOption(LOCAL_MODE)) { isLocalMode = true; @@ -158,7 +166,13 @@ public abstract class TezExampleBase extends Configured implements Tool { CallerContext callerContext = CallerContext.create("TezExamples", "Tez Example DAG: " + dag.getName()); ApplicationId appId = tezClientInternal.getAppMasterApplicationId(); + if (hadoopShim == null) { + Configuration conf = (getConf() == null ? new Configuration(false) : getConf()); + hadoopShim = new HadoopShimsLoader(conf).getHadoopShim(); + } + if (appId != null) { + TezUtilsInternal.setHadoopCallerContext(hadoopShim, appId); callerContext.setCallerIdAndType(appId.toString(), "TezExampleApplication"); } dag.setCallerContext(callerContext); http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java index ad05af9..07dcc9b 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java @@ -58,6 +58,7 @@ import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.task.TaskReporter; import org.apache.tez.runtime.task.TaskRunner2Result; @@ -305,7 +306,8 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun request.getContainerIdString(), request.getTokenIdentifier(), request.getAppAttemptNumber(), workingDir, localDirs, envMap, objectRegistry, pid, - executionContext, credentials, memoryAvailable, request.getUser(), null, false); + executionContext, credentials, memoryAvailable, request.getUser(), null, false, + new DefaultHadoopShim()); ContainerExecutionResult result = tezChild.run(); LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" + sw.stop().elapsedMillis()); @@ -449,7 +451,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun request.getAppAttemptNumber(), serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, - executionContext, memoryAvailable, false); + executionContext, memoryAvailable, false, new DefaultHadoopShim()); boolean shouldDie; try { http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java index 0129a8b..0c1dc66 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java @@ -47,6 +47,7 @@ import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DataSinkDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.mapreduce.TestUmbilical; import org.apache.tez.mapreduce.TezTestUtils; import org.apache.tez.mapreduce.hadoop.MRConfig; @@ -207,7 +208,7 @@ public class TestMROutput { null, new HashMap<String, String>(), HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"), - Runtime.getRuntime().maxMemory(), true); + Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim()); return task; } http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java index 71aa87c..133ef9e 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -232,7 +233,7 @@ public class MapUtils { serviceConsumerMetadata, envMap, HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"), - Runtime.getRuntime().maxMemory(), true); + Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim()); return task; } } http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java index db78b6e..382bc0e 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java @@ -26,6 +26,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileSystem; @@ -224,7 +225,7 @@ public class TestReduceProcessor { serviceConsumerMetadata, serviceProviderEnvMap, HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"), - Runtime.getRuntime().maxMemory(), true); + Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim()); List<Event> destEvents = new LinkedList<Event>(); destEvents.add(dme); http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-runtime-internals/pom.xml ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/pom.xml b/tez-runtime-internals/pom.xml index 6d40b6a..9c1cf2f 100644 --- a/tez-runtime-internals/pom.xml +++ b/tez-runtime-internals/pom.xml @@ -34,6 +34,10 @@ <artifactId>tez-common</artifactId> </dependency> <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>hadoop-shim</artifactId> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-annotations</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 1a59310..df09fdb 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -45,6 +45,7 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tez.hadoop.shim.HadoopShim; import org.apache.tez.runtime.api.TaskContext; import org.apache.tez.runtime.api.impl.TezProcessorContextImpl; import org.slf4j.Logger; @@ -151,13 +152,14 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { private volatile ObjectRegistry objectRegistry; private final ExecutionContext ExecutionContext; private final long memAvailable; + private final HadoopShim hadoopShim; public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber, Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> envMap, Multimap<String, String> startedInputsMap, ObjectRegistry objectRegistry, String pid, ExecutionContext ExecutionContext, long memAvailable, - boolean updateSysCounters) throws IOException { + boolean updateSysCounters, HadoopShim hadoopShim) throws IOException { // Note: If adding any fields here, make sure they're cleaned up in the cleanupContext method. // TODO Remove jobToken from here post TEZ-421 super(taskSpec, tezConf, tezUmbilical, pid, updateSysCounters); @@ -200,6 +202,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { this.objectRegistry = objectRegistry; this.ExecutionContext = ExecutionContext; this.memAvailable = memAvailable; + this.hadoopShim = hadoopShim; } /** @@ -1017,4 +1020,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { return this.outputsMap; } + @Private + public HadoopShim getHadoopShim() { + return hadoopShim; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java index ab77635..40d4051 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.security.UserGroupInformation; import org.apache.tez.common.CallableWithNdc; +import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,8 +35,6 @@ import org.slf4j.LoggerFactory; */ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.TaskRunner2CallableResult> { - - private static final Logger LOG = LoggerFactory.getLogger(TaskRunner2Callable.class); private final LogicalIOProcessorRuntimeTask task; @@ -64,13 +63,15 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas return new TaskRunner2CallableResult(null); } LOG.info("Initializing task" + ", taskAttemptId={}", task.getTaskAttemptID()); + TezUtilsInternal.setHadoopCallerContext(task.getHadoopShim(), task.getTaskAttemptID()); task.initialize(); if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) { LOG.info("Running task, taskAttemptId={}", task.getTaskAttemptID()); task.run(); } else { - LOG.info("Stopped before running the processor taskAttemptId={}", task.getTaskAttemptID()); + LOG.info("Stopped before running the processor taskAttemptId={}", + task.getTaskAttemptID()); return new TaskRunner2CallableResult(null); } http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 784065a..07810d9 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -64,6 +64,9 @@ import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.utils.RelocalizationUtils; +import org.apache.tez.hadoop.shim.HadoopShim; +import org.apache.tez.hadoop.shim.HadoopShimProvider; +import org.apache.tez.hadoop.shim.HadoopShimsLoader; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.api.impl.TaskSpec; @@ -118,6 +121,7 @@ public class TezChild { private TaskReporterInterface taskReporter; private int taskCount = 0; private TezVertexID lastVertexID; + private final HadoopShim hadoopShim; public TezChild(Configuration conf, String host, int port, String containerIdentifier, String tokenIdentifier, int appAttemptNumber, String workingDir, String[] localDirs, @@ -125,7 +129,7 @@ public class TezChild { ObjectRegistryImpl objectRegistry, String pid, ExecutionContext executionContext, Credentials credentials, long memAvailable, String user, TezTaskUmbilicalProtocol umbilical, - boolean updateSysCounters) throws IOException, InterruptedException { + boolean updateSysCounters, HadoopShim hadoopShim) throws IOException, InterruptedException { this.defaultConf = conf; this.containerIdString = containerIdentifier; this.appAttemptNumber = appAttemptNumber; @@ -138,6 +142,7 @@ public class TezChild { this.memAvailable = memAvailable; this.user = user; this.updateSysCounters = updateSysCounters; + this.hadoopShim = hadoopShim; getTaskMaxSleepTime = defaultConf.getInt( TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX, @@ -239,6 +244,8 @@ public class TezChild { containerTask.getTaskSpec().getTaskAttemptID().toString()); System.out.println(timeStamp + " Starting to run new task attempt: " + containerTask.getTaskSpec().getTaskAttemptID().toString()); + TezUtilsInternal.setHadoopCallerContext(hadoopShim, + containerTask.getTaskSpec().getTaskAttemptID()); TezUtilsInternal.updateLoggers(loggerAddend); FileSystem.clearStatistics(); @@ -250,7 +257,8 @@ public class TezChild { TezTaskRunner2 taskRunner = new TezTaskRunner2(defaultConf, childUGI, localDirs, containerTask.getTaskSpec(), appAttemptNumber, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter, - executor, objectRegistry, pid, executionContext, memAvailable, updateSysCounters); + executor, objectRegistry, pid, executionContext, memAvailable, updateSysCounters, + hadoopShim); boolean shouldDie; try { TaskRunner2Result result = taskRunner.run(); @@ -443,7 +451,7 @@ public class TezChild { String tokenIdentifier, int attemptNumber, String[] localDirs, String workingDirectory, Map<String, String> serviceProviderEnvMap, @Nullable String pid, ExecutionContext executionContext, Credentials credentials, long memAvailable, String user, - TezTaskUmbilicalProtocol tezUmbilical, boolean updateSysCounters) + TezTaskUmbilicalProtocol tezUmbilical, boolean updateSysCounters, HadoopShim hadoopShim) throws IOException, InterruptedException, TezException { // Pull in configuration specified for the session. @@ -456,7 +464,8 @@ public class TezChild { return new TezChild(conf, host, port, containerIdentifier, tokenIdentifier, attemptNumber, workingDirectory, localDirs, serviceProviderEnvMap, objectRegistry, pid, - executionContext, credentials, memAvailable, user, tezUmbilical, updateSysCounters); + executionContext, credentials, memAvailable, user, tezUmbilical, updateSysCounters, + hadoopShim); } public static void main(String[] args) throws IOException, InterruptedException, TezException { @@ -488,11 +497,14 @@ public class TezChild { TezUtilsInternal.addUserSpecifiedTezConfiguration(defaultConf, confProto.getConfKeyValuesList()); UserGroupInformation.setConfiguration(defaultConf); Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); + + HadoopShim hadoopShim = new HadoopShimsLoader(defaultConf).getHadoopShim(); + TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier, tokenIdentifier, attemptNumber, localDirs, System.getenv(Environment.PWD.name()), System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())), credentials, Runtime.getRuntime().maxMemory(), System - .getenv(ApplicationConstants.Environment.USER.toString()), null, true); + .getenv(ApplicationConstants.Environment.USER.toString()), null, true, hadoopShim); tezChild.run(); } http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java index 4fdc17d..219cc2f 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FSError; import org.apache.hadoop.security.UserGroupInformation; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.hadoop.shim.HadoopShim; import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.ObjectRegistry; @@ -91,6 +92,8 @@ public class TezTaskRunner2 { private volatile long taskKillStartTime = 0; + private final HadoopShim hadoopShim; + // The callable which is being used to execute the task. private volatile TaskRunner2Callable taskRunnerCallable; @@ -102,15 +105,16 @@ public class TezTaskRunner2 { TaskReporterInterface taskReporter, ListeningExecutorService executor, ObjectRegistry objectRegistry, String pid, ExecutionContext executionContext, long memAvailable, - boolean updateSysCounters) throws + boolean updateSysCounters, HadoopShim hadoopShim) throws IOException { this.ugi = ugi; this.taskReporter = taskReporter; this.executor = executor; this.umbilicalAndErrorHandler = new UmbilicalAndErrorHandler(); + this.hadoopShim = hadoopShim; this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs, umbilicalAndErrorHandler, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, - objectRegistry, pid, executionContext, memAvailable, updateSysCounters); + objectRegistry, pid, executionContext, memAvailable, updateSysCounters, hadoopShim); } /** http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java index 12fec7e..00e830f 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java @@ -38,6 +38,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.AbstractLogicalInput; import org.apache.tez.runtime.api.AbstractLogicalOutput; @@ -82,7 +83,8 @@ public class TestLogicalIOProcessorRuntimeTask { LogicalIOProcessorRuntimeTask lio1 = new LogicalIOProcessorRuntimeTask(task1, 0, tezConf, null, umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null, - "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true); + "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true, + new DefaultHadoopShim()); try { lio1.initialize(); @@ -107,13 +109,12 @@ public class TestLogicalIOProcessorRuntimeTask { cleanupAndTest(lio1); } - - - // local mode + // local mode tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); LogicalIOProcessorRuntimeTask lio2 = new LogicalIOProcessorRuntimeTask(task2, 0, tezConf, null, umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null, - "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true); + "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true, + new DefaultHadoopShim()); try { lio2.initialize(); lio2.run(); http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java index f0c1e66..d16b880 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java @@ -32,6 +32,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.runtime.InputReadyTracker; import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; import org.apache.tez.runtime.api.ExecutionContext; @@ -63,7 +64,7 @@ public class TestProcessorContext { LogicalIOProcessorRuntimeTask runtimeTask = new LogicalIOProcessorRuntimeTask( mockSpec, 1, new Configuration(), new String[]{"/"}, - tezUmbilical, null, null, null, null, "", null, 1024, false); + tezUmbilical, null, null, null, null, "", null, 1024, false, new DefaultHadoopShim()); LogicalIOProcessorRuntimeTask mockTask = spy(runtimeTask); Map<String, ByteBuffer> serviceConsumerMetadata = Maps.newHashMap(); Map<String, String> auxServiceEnv = Maps.newHashMap(); http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java index 989753b..7843754 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java @@ -64,6 +64,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.ObjectRegistry; @@ -634,7 +635,7 @@ public class TestTaskExecution2 { new HashMap<String, ByteBuffer>(), new HashMap<String, String>(), HashMultimap.<String, String>create(), taskReporter, executor, null, "", new ExecutionContextImpl("localhost"), - Runtime.getRuntime().maxMemory(), updateSysCounters); + Runtime.getRuntime().maxMemory(), updateSysCounters, new DefaultHadoopShim()); } return taskRunner; @@ -662,7 +663,7 @@ public class TestTaskExecution2 { boolean updateSysCounters) throws IOException { super(tezConf, ugi, localDirs, taskSpec, appAttemptNumber, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, - executionContext, memAvailable, updateSysCounters); + executionContext, memAvailable, updateSysCounters, new DefaultHadoopShim()); } http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java index 32a3619..884f0e6 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java @@ -41,6 +41,7 @@ import java.util.Map; import com.google.protobuf.ByteString; import org.apache.commons.lang.RandomStringUtils; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -220,8 +221,8 @@ public class TestOnFileUnorderedKVOutput { when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class))); task = new LogicalIOProcessorRuntimeTask( mockSpec, appAttemptNumber, - new Configuration(), new String[]{"/"}, - tezUmbilical, null, null, null, null, "", null, 1024, false); + new Configuration(), new String[]{"/"}, + tezUmbilical, null, null, null, null, "", null, 1024, false, new DefaultHadoopShim()); LogicalIOProcessorRuntimeTask runtimeTask = spy(task); http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-tests/pom.xml ---------------------------------------------------------------------- diff --git a/tez-tests/pom.xml b/tez-tests/pom.xml index 0bac721..72538bf 100644 --- a/tez-tests/pom.xml +++ b/tez-tests/pom.xml @@ -32,7 +32,6 @@ <dependency> <groupId>org.apache.tez</groupId> <artifactId>tez-common</artifactId> - <scope>test</scope> </dependency> <dependency> <groupId>org.apache.tez</groupId> http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java index 3188c6e..6ed6d2d 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java @@ -53,6 +53,7 @@ import org.apache.tez.client.CallerContext; import org.apache.tez.client.TezClientUtils; import org.apache.tez.client.TezClient; import org.apache.tez.common.TezUtils; +import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.security.DAGAccessControls; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.DataSourceDescriptor; @@ -66,6 +67,8 @@ import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.StatusGetOpts; +import org.apache.tez.hadoop.shim.HadoopShim; +import org.apache.tez.hadoop.shim.HadoopShimsLoader; import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRInputHelpers; @@ -373,8 +376,8 @@ public class TestOrderedWordCount extends Configured implements Tool { } UserGroupInformation.setConfiguration(conf); - TezConfiguration tezConf = new TezConfiguration(conf); + HadoopShim hadoopShim = new HadoopShimsLoader(tezConf).getHadoopShim(); TestOrderedWordCount instance = new TestOrderedWordCount(); FileSystem fs = FileSystem.get(conf); @@ -407,6 +410,9 @@ public class TestOrderedWordCount extends Configured implements Tool { TezClient tezSession = TezClient.create("OrderedWordCountSession", tezConf, null, instance.credentials); tezSession.start(); + if (tezSession.getAppMasterApplicationId() != null) { + TezUtilsInternal.setHadoopCallerContext(hadoopShim, tezSession.getAppMasterApplicationId()); + } DAGStatus dagStatus = null; DAGClient dagClient = null;
