Repository: tez
Updated Branches:
  refs/heads/master 685fa742f -> 2c7abeb15


TEZ-2910. Set caller context for tracing ( integrate with HDFS-9184 ). (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2c7abeb1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2c7abeb1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2c7abeb1

Branch: refs/heads/master
Commit: 2c7abeb155f6e832b3da407d4d537389778b3a02
Parents: 685fa74
Author: Hitesh Shah <[email protected]>
Authored: Tue Dec 15 18:38:45 2015 -0800
Committer: Hitesh Shah <[email protected]>
Committed: Tue Dec 15 18:38:45 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 tez-common/pom.xml                              |  4 ++
 .../org/apache/tez/common/TezUtilsInternal.java | 26 ++++++++
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 32 +++++----
 .../app/dag/RootInputInitializerManager.java    | 25 +++++--
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 69 ++++++++++++--------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 41 ++++++++----
 .../app/launcher/LocalContainerLauncher.java    |  4 +-
 .../dag/history/recovery/RecoveryService.java   |  3 +
 .../apache/tez/dag/app/TestRecoveryParser.java  | 14 +++-
 .../apache/tez/dag/app/dag/impl/TestCommit.java |  3 +
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 12 ++++
 .../tez/dag/app/dag/impl/TestDAGRecovery.java   |  3 +
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  2 +
 .../history/recovery/TestRecoveryService.java   | 10 +++
 tez-dist/pom.xml                                | 39 +++++++++++
 .../org/apache/tez/examples/TezExampleBase.java | 14 ++++
 .../tez/service/impl/ContainerRunnerImpl.java   |  6 +-
 .../tez/mapreduce/output/TestMROutput.java      |  3 +-
 .../tez/mapreduce/processor/MapUtils.java       |  3 +-
 .../processor/reduce/TestReduceProcessor.java   |  3 +-
 tez-runtime-internals/pom.xml                   |  4 ++
 .../runtime/LogicalIOProcessorRuntimeTask.java  |  9 ++-
 .../tez/runtime/task/TaskRunner2Callable.java   |  7 +-
 .../org/apache/tez/runtime/task/TezChild.java   | 22 +++++--
 .../apache/tez/runtime/task/TezTaskRunner2.java |  8 ++-
 .../TestLogicalIOProcessorRuntimeTask.java      | 11 ++--
 .../runtime/api/impl/TestProcessorContext.java  |  3 +-
 .../tez/runtime/task/TestTaskExecution2.java    |  5 +-
 .../output/TestOnFileUnorderedKVOutput.java     |  5 +-
 tez-tests/pom.xml                               |  1 -
 .../examples/TestOrderedWordCount.java          |  8 ++-
 32 files changed, 314 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a9d1893..8588923 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES
   TEZ-604. Revert temporary changes made in TEZ-603 to kill the provided tez session, if running a MapReduce job.
 
 ALL CHANGES:
+  TEZ-2910. Set caller context for tracing ( integrate with HDFS-9184 ).
   TEZ-2976. Recovery fails when InputDescriptor is changed during input initialization.
   TEZ-2997. Tez UI: Support searches by CallerContext ID for DAGs
   TEZ-2996. TestAnalyzer fails in trunk after recovery redesign

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-common/pom.xml
----------------------------------------------------------------------
diff --git a/tez-common/pom.xml b/tez-common/pom.xml
index a521d54..e133c9c 100644
--- a/tez-common/pom.xml
+++ b/tez-common/pom.xml
@@ -58,6 +58,10 @@
       <artifactId>tez-api</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>hadoop-shim</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index c2a50f5..a99bab4 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -38,8 +38,13 @@ import com.google.protobuf.TextFormat;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.log4j.Appender;
 import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.hadoop.shim.HadoopShim;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.records.DAGProtos;
@@ -327,4 +332,25 @@ public class TezUtilsInternal {
         return TaskAttemptEndReason.OTHER;
     }
   }
+
+  @Private
+  public static void setHadoopCallerContext(HadoopShim hadoopShim, TezTaskAttemptID attemptID) {
+    hadoopShim.setHadoopCallerContext("tez_ta:" + attemptID.toString());
+  }
+
+  @Private
+  public static void setHadoopCallerContext(HadoopShim hadoopShim, TezVertexID vertexID) {
+    hadoopShim.setHadoopCallerContext("tez_v:" + vertexID.toString());
+  }
+
+  @Private
+  public static void setHadoopCallerContext(HadoopShim hadoopShim, TezDAGID dagID) {
+    hadoopShim.setHadoopCallerContext("tez_dag:" + dagID.toString());
+  }
+
+  @Private
+  public static void setHadoopCallerContext(HadoopShim hadoopShim, ApplicationId appID) {
+    hadoopShim.setHadoopCallerContext("tez_app:" + appID.toString());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 5cbdab4..c0b86a5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1408,7 +1408,7 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
-  private List<URL> processAdditionalResources(Map<String, LocalResource> lrDiff)
+  private List<URL> processAdditionalResources(TezDAGID dagId, Map<String, LocalResource> lrDiff)
       throws TezException {
     if (lrDiff == null || lrDiff.isEmpty()) {
       return Collections.emptyList();
@@ -1416,6 +1416,7 @@ public class DAGAppMaster extends AbstractService {
       LOG.info("Localizing additional local resources for AM : " + lrDiff);
       List<URL> downloadedURLs;
       try {
+        TezUtilsInternal.setHadoopCallerContext(hadoopShim, dagId);
         downloadedURLs = RelocalizationUtils.processAdditionalResources(
             Maps.transformValues(lrDiff, new Function<LocalResource, URI>() {
 
@@ -1426,6 +1427,8 @@ public class DAGAppMaster extends AbstractService {
             }), getConfig(), workingDirectory);
       } catch (IOException e) {
         throw new TezException(e);
+      } finally {
+        hadoopShim.clearHadoopCallerContext();
       }
       LOG.info("Done downloading additional AM resources");
       return downloadedURLs;
@@ -1850,14 +1853,19 @@ public class DAGAppMaster extends AbstractService {
 
   private DAGRecoveryData recoverDAG() throws IOException, TezException {
     if (recoveryEnabled) {
-      if (this.appAttemptID.getAttemptId() > 1) {
-        LOG.info("Recovering data from previous attempts"
-            + ", currentAttemptId=" + this.appAttemptID.getAttemptId());
-        this.state = DAGAppMasterState.RECOVERING;
-        RecoveryParser recoveryParser = new RecoveryParser(
-            this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId());
-        DAGRecoveryData recoveredDAGData = recoveryParser.parseRecoveryData();
-        return recoveredDAGData;
+      try {
+        TezUtilsInternal.setHadoopCallerContext(hadoopShim, this.getAppID());
+        if (this.appAttemptID.getAttemptId() > 1) {
+          LOG.info("Recovering data from previous attempts"
+              + ", currentAttemptId=" + this.appAttemptID.getAttemptId());
+          this.state = DAGAppMasterState.RECOVERING;
+          RecoveryParser recoveryParser = new RecoveryParser(
+              this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId());
+          DAGRecoveryData recoveredDAGData = recoveryParser.parseRecoveryData();
+          return recoveredDAGData;
+        }
+      } finally {
+        hadoopShim.clearHadoopCallerContext();
       }
     }
     return null;
@@ -1906,7 +1914,9 @@ public class DAGAppMaster extends AbstractService {
 
     if (recoveredDAGData != null) {
       if (recoveredDAGData.cumulativeAdditionalResources != null) {
-        recoveredDAGData.additionalUrlsForClasspath = 
processAdditionalResources(recoveredDAGData.cumulativeAdditionalResources);
+        recoveredDAGData.additionalUrlsForClasspath = 
processAdditionalResources(
+            recoveredDAGData.recoveredDagID,
+            recoveredDAGData.cumulativeAdditionalResources);
         amResources.putAll(recoveredDAGData.cumulativeAdditionalResources);
         
cumulativeAdditionalResources.putAll(recoveredDAGData.cumulativeAdditionalResources);
       }
@@ -2397,7 +2407,7 @@ public class DAGAppMaster extends AbstractService {
       additionalUrlsForClasspath = dag.getDagUGI().doAs(new 
PrivilegedExceptionAction<List<URL>>() {
         @Override
         public List<URL> run() throws Exception {
-          return processAdditionalResources(additionalAmResources);
+          return processAdditionalResources(currentDAG.getID(), 
additionalAmResources);
         }
       });
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 13128f8..57a7172 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
@@ -112,7 +113,14 @@ public class RootInputInitializerManager {
 
       InputInitializerContext context =
           new TezRootInputInitializerContextImpl(input, vertex, appContext, 
this);
-      InputInitializer initializer = createInitializer(input, context);
+
+      InputInitializer initializer;
+      try {
+        TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), 
vertex.getVertexId());
+        initializer = createInitializer(input, context);
+      } finally {
+        appContext.getHadoopShim().clearHadoopCallerContext();
+      }
 
       InitializerWrapper initializerWrapper =
           new InitializerWrapper(input, initializer, context, vertex, 
entityStateTracker, appContext);
@@ -127,7 +135,7 @@ public class RootInputInitializerManager {
 
       initializerMap.put(input.getName(), initializerWrapper);
       ListenableFuture<List<Event>> future = executor
-          .submit(new InputInitializerCallable(initializerWrapper, dagUgi));
+          .submit(new InputInitializerCallable(initializerWrapper, dagUgi, 
appContext));
       Futures.addCallback(future, 
createInputInitializerCallback(initializerWrapper));
     }
   }
@@ -229,10 +237,13 @@ public class RootInputInitializerManager {
 
     private final InitializerWrapper initializerWrapper;
     private final UserGroupInformation ugi;
+    private final AppContext appContext;
 
-    public InputInitializerCallable(InitializerWrapper initializer, 
UserGroupInformation ugi) {
+    public InputInitializerCallable(InitializerWrapper initializer, 
UserGroupInformation ugi,
+                                    AppContext appContext) {
       this.initializerWrapper = initializer;
       this.ugi = ugi;
+      this.appContext = appContext;
     }
 
     @Override
@@ -243,7 +254,13 @@ public class RootInputInitializerManager {
           LOG.info(
               "Starting InputInitializer for Input: " + 
initializerWrapper.getInput().getName() +
                   " on vertex " + initializerWrapper.getVertexLogIdentifier());
-          return initializerWrapper.getInitializer().initialize();
+          try {
+            TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(),
+                initializerWrapper.vertexId);
+            return initializerWrapper.getInitializer().initialize();
+          } finally {
+            appContext.getHadoopShim().clearHadoopCallerContext();
+          }
         }
       });
       return events;

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 698f9d6..3d47450 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -41,6 +41,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.LimitExceededException;
 import org.apache.tez.state.OnStateChangedCallback;
 import org.apache.tez.state.StateMachineTez;
@@ -1022,19 +1023,24 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
       if (!groupInfo.outputs.isEmpty()) {
         groupInfo.commitStarted = true;
         final Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
-        for (final String outputName : groupInfo.outputs) {
-          final OutputKey outputKey = new OutputKey(outputName, 
groupInfo.groupName, true);
-          CommitCallback groupCommitCallback = new CommitCallback(outputKey);
-          CallableEvent groupCommitCallableEvent = new 
CallableEvent(groupCommitCallback) {
-            @Override
-            public Void call() throws Exception {
-              OutputCommitter committer = 
v.getOutputCommitters().get(outputName);
-              LOG.info("Committing output: " + outputKey);
-              commitOutput(committer);
-              return null;
-            }
-          };
-          commitEvents.put(outputKey, groupCommitCallableEvent);
+        try {
+          TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), 
v.getVertexId());
+          for (final String outputName : groupInfo.outputs) {
+            final OutputKey outputKey = new OutputKey(outputName, 
groupInfo.groupName, true);
+            CommitCallback groupCommitCallback = new CommitCallback(outputKey);
+            CallableEvent groupCommitCallableEvent = new 
CallableEvent(groupCommitCallback) {
+              @Override
+              public Void call() throws Exception {
+                OutputCommitter committer = 
v.getOutputCommitters().get(outputName);
+                LOG.info("Committing output: " + outputKey);
+                commitOutput(committer);
+                return null;
+              }
+            };
+            commitEvents.put(outputKey, groupCommitCallableEvent);
+          }
+        } finally {
+          appContext.getHadoopShim().clearHadoopCallerContext();
         }
       }
     }
@@ -1061,23 +1067,28 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
         LOG.info("No exclusive output committers for vertex: " + 
vertex.getLogIdentifier());
         continue;
       }
-      for (final Map.Entry<String, OutputCommitter> entry : 
outputCommitters.entrySet()) {
-        if (vertex.getState() != VertexState.SUCCEEDED) {
-          throw new TezUncheckedException("Vertex: " + 
vertex.getLogIdentifier() +
-              " not in SUCCEEDED state. State= " + vertex.getState());
-        }
-        OutputKey outputKey = new OutputKey(entry.getKey(), vertex.getName(), 
false);
-        CommitCallback commitCallback = new CommitCallback(outputKey);
-        CallableEvent commitCallableEvent = new CallableEvent(commitCallback) {
-          @Override
-          public Void call() throws Exception {
-            LOG.info("Committing output: " + entry.getKey() + " for vertex: "
-                + vertex.getLogIdentifier() + ", outputName: " + 
entry.getKey());
-            commitOutput(entry.getValue());
-            return null;
+      try {
+        TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), 
vertex.getVertexId());
+        for (final Map.Entry<String, OutputCommitter> entry : 
outputCommitters.entrySet()) {
+          if (vertex.getState() != VertexState.SUCCEEDED) {
+            throw new TezUncheckedException("Vertex: " + 
vertex.getLogIdentifier() +
+                " not in SUCCEEDED state. State= " + vertex.getState());
           }
-        };
-        commitEvents.put(outputKey, commitCallableEvent);
+          OutputKey outputKey = new OutputKey(entry.getKey(), 
vertex.getName(), false);
+          CommitCallback commitCallback = new CommitCallback(outputKey);
+          CallableEvent commitCallableEvent = new 
CallableEvent(commitCallback) {
+            @Override
+            public Void call() throws Exception {
+              LOG.info("Committing output: " + entry.getKey() + " for vertex: "
+                  + vertex.getLogIdentifier() + ", outputName: " + 
entry.getKey());
+              commitOutput(entry.getValue());
+              return null;
+            }
+          };
+          commitEvents.put(outputKey, commitCallableEvent);
+        }
+      } finally {
+        appContext.getHadoopShim().clearHadoopCallerContext();
       }
     }
     

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 4e82560..93baa0a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.common.ATSConstants;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.LimitExceededException;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
@@ -1967,15 +1968,21 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
         CallableEvent commitCallableEvent = new CallableEvent(commitCallback) {
           @Override
           public Void call() throws Exception {
-            vertex.dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
-              @Override
-              public Void run() throws Exception {
+            try {
+              
TezUtilsInternal.setHadoopCallerContext(vertex.appContext.getHadoopShim(),
+                  vertex.vertexId);
+              vertex.dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
                   LOG.info("Invoking committer commit for output=" + outputName
                       + ", vertexId=" + vertex.logIdentifier);
                   committer.commitOutput();
-                return null;
-              }
-            });
+                  return null;
+                }
+              });
+            } finally {
+              vertex.appContext.getHadoopShim().clearHadoopCallerContext();
+            }
             return null;
           }
         };
@@ -2208,13 +2215,20 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
               LOG.debug("Invoking committer init for output=" + outputName
                   + ", vertex=" + logIdentifier);
             }
-            outputCommitter.initialize();
-            outputCommitters.put(outputName, outputCommitter);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Invoking committer setup for output=" + outputName
-                  + ", vertex=" + logIdentifier);
+
+            try {
+              
TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), vertexId);
+              outputCommitter.initialize();
+              outputCommitters.put(outputName, outputCommitter);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Invoking committer setup for output=" + outputName
+                    + ", vertex=" + logIdentifier);
+              }
+              outputCommitter.setupOutput();
+            } finally {
+              appContext.getHadoopShim().clearHadoopCallerContext();
             }
-            outputCommitter.setupOutput();
+
             return null;
           }
         });
@@ -3066,6 +3080,7 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
     if (outputCommitters != null) {
       LOG.info("Invoking committer abort for vertex, vertexId=" + 
logIdentifier);
       try {
+        TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), 
vertexId);
         dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
           @Override
           public Void run() {
@@ -3084,6 +3099,8 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
         });
       } catch (Exception e) {
         throw new TezUncheckedException("Unknown error while attempting 
VertexCommitter(s) abort", e);
+      } finally {
+        appContext.getHadoopShim().clearHadoopCallerContext();
       }
     }
     if (finishTime == 0) {

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 9267f00..c4ab6e3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -45,6 +45,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -356,7 +357,8 @@ public class LocalContainerLauncher extends 
ContainerLauncher {
     TezChild tezChild =
         TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), 
tokenIdentifier,
             attemptNumber, localDirs, workingDirectory, containerEnv, "", 
executionContext, credentials,
-            memAvailable, context.getUser(), tezTaskUmbilicalProtocol, false);
+            memAvailable, context.getUser(), tezTaskUmbilicalProtocol, false,
+            context.getHadoopShim());
     return tezChild;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index fed4f3d..3eeddf5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.app.AppContext;
@@ -133,6 +134,8 @@ public class RecoveryService extends AbstractService {
     eventHandlingThread = new Thread(new Runnable() {
       @Override
       public void run() {
+        TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(),
+            appContext.getApplicationID());
         DAGHistoryEvent event;
         while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
           drained = eventQueue.isEmpty();

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
index 9c688b6..d8b620a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
@@ -20,7 +20,6 @@ package org.apache.tez.dag.app;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -70,6 +69,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.junit.*;
@@ -252,6 +252,8 @@ public class TestRecoveryParser {
     AppContext appContext = mock(AppContext.class);
     when(appContext.getCurrentRecoveryDir()).thenReturn(new 
Path(recoveryPath+"/1"));
     when(appContext.getClock()).thenReturn(new SystemClock());
+    when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
+    when(appContext.getApplicationID()).thenReturn(appId);
 
     DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
     // write data in attempt_1
@@ -430,6 +432,8 @@ public class TestRecoveryParser {
     when(appContext.getCurrentRecoveryDir()).thenReturn(new 
Path(recoveryPath+"/1"));
     when(appContext.getClock()).thenReturn(new SystemClock());
     when(mockDAGImpl.getID()).thenReturn(dagID);
+    when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
+    when(appContext.getApplicationID()).thenReturn(appId);
 
     RecoveryService rService = new RecoveryService(appContext);
     Configuration conf = new Configuration();
@@ -465,6 +469,8 @@ public class TestRecoveryParser {
     when(appContext.getCurrentRecoveryDir()).thenReturn(new 
Path(recoveryPath+"/1"));
     when(appContext.getClock()).thenReturn(new SystemClock());
     when(mockDAGImpl.getID()).thenReturn(dagID);
+    when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
+    when(appContext.getApplicationID()).thenReturn(appId);
 
     RecoveryService rService = new RecoveryService(appContext);
     Configuration conf = new Configuration();
@@ -497,6 +503,8 @@ public class TestRecoveryParser {
     when(appContext.getCurrentRecoveryDir()).thenReturn(new 
Path(recoveryPath+"/1"));
     when(appContext.getClock()).thenReturn(new SystemClock());
     when(mockDAGImpl.getID()).thenReturn(dagID);
+    when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
+    when(appContext.getApplicationID()).thenReturn(appId);
 
     RecoveryService rService = new RecoveryService(appContext);
     Configuration conf = new Configuration();
@@ -543,6 +551,8 @@ public class TestRecoveryParser {
     when(appContext.getCurrentRecoveryDir()).thenReturn(new 
Path(recoveryPath+"/1"));
     when(appContext.getClock()).thenReturn(new SystemClock());
     when(mockDAGImpl.getID()).thenReturn(dagID);
+    when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
+    when(appContext.getApplicationID()).thenReturn(appId);
 
     // MockRecoveryService will skip the non-summary event
     MockRecoveryService rService = new MockRecoveryService(appContext);
@@ -617,6 +627,8 @@ public class TestRecoveryParser {
     when(appContext.getCurrentRecoveryDir()).thenReturn(new 
Path(recoveryPath+"/1"));
     when(appContext.getClock()).thenReturn(new SystemClock());
     when(mockDAGImpl.getID()).thenReturn(dagID);
+    when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
+    when(appContext.getApplicationID()).thenReturn(appId);
 
     RecoveryService rService = new RecoveryService(appContext);
     Configuration conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
index f0b89c8..28670ff 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
@@ -19,6 +19,7 @@ package org.apache.tez.dag.app.dag.impl;
 
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
@@ -103,6 +104,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
@@ -302,6 +304,7 @@ public class TestCommit {
     dispatcher = new DrainDispatcher();
     fsTokens = new Credentials();
     appContext = mock(AppContext.class);
+    when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
     rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
         .setDaemon(true).setNameFormat("App Shared Pool - " + "#%d").build());
     execService = MoreExecutors.listeningDecorator(rawExecutor);

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 31b4f76..1809230 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.net.URL;
@@ -40,6 +41,8 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.lang.StringUtils;
 import org.apache.tez.common.counters.Limits;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
+import org.apache.tez.hadoop.shim.HadoopShim;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -186,6 +189,7 @@ public class TestDAGImpl {
   private HistoryEventHandler historyEventHandler;
   private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
   private ClusterInfo clusterInfo = new 
ClusterInfo(Resource.newInstance(8192,10));
+  private HadoopShim defaultShim = new DefaultHadoopShim();
 
   static {
     Limits.reset();
@@ -845,6 +849,8 @@ public class TestDAGImpl {
     appContext = mock(AppContext.class);
     execService = mock(ListeningExecutorService.class);
     final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class);
+    when(appContext.getHadoopShim()).thenReturn(defaultShim);
+    when(appContext.getApplicationID()).thenReturn(appAttemptId.getApplicationId());
     
     Mockito.doAnswer(new Answer() {
       public ListenableFuture<Void> answer(InvocationOnMock invocation) {
@@ -864,6 +870,7 @@ public class TestDAGImpl {
     doReturn(dagId).when(appContext).getCurrentDAGID();
     doReturn(historyEventHandler).when(appContext).getHistoryHandler();
     doReturn(aclManager).when(appContext).getAMACLManager();
+    doReturn(defaultShim).when(appContext).getHadoopShim();
     dag = new DAGImpl(dagId, conf, dagPlan,
         dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
         fsTokens, clock, "user", thh, appContext);
@@ -873,6 +880,8 @@ public class TestDAGImpl {
     mrrAppContext = mock(AppContext.class);
     doReturn(aclManager).when(mrrAppContext).getAMACLManager();
     doReturn(execService).when(mrrAppContext).getExecService();
+    doReturn(defaultShim).when(mrrAppContext).getHadoopShim();
+
     mrrDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 2);
     mrrDagPlan = createTestMRRDAGPlan();
     mrrDag = new DAGImpl(mrrDagId, conf, mrrDagPlan,
@@ -889,6 +898,8 @@ public class TestDAGImpl {
     groupAppContext = mock(AppContext.class);
     doReturn(aclManager).when(groupAppContext).getAMACLManager();
     doReturn(execService).when(groupAppContext).getExecService();
+    doReturn(defaultShim).when(groupAppContext).getHadoopShim();
+
     groupDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 3);
     groupDagPlan = createGroupDAGPlan();
     groupDag = new DAGImpl(groupDagId, conf, groupDagPlan,
@@ -961,6 +972,7 @@ public class TestDAGImpl {
     dagPlanWithCustomEdge = createDAGWithCustomEdge(exLocation, useLegacy);
     dagWithCustomEdgeAppContext = mock(AppContext.class);
     doReturn(aclManager).when(dagWithCustomEdgeAppContext).getAMACLManager();
+    when(dagWithCustomEdgeAppContext.getHadoopShim()).thenReturn(defaultShim);
     dagWithCustomEdge = new DAGImpl(dagWithCustomEdgeId, conf, 
dagPlanWithCustomEdge,
         dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
         fsTokens, clock, "user", thh, dagWithCustomEdgeAppContext);

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index 3eed717..3a602bc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -119,6 +119,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputInitializer;
 import org.apache.tez.runtime.api.InputInitializerContext;
@@ -319,6 +320,8 @@ public class TestDAGRecovery {
     execService = mock(ListeningExecutorService.class);
     thh = mock(TaskHeartbeatHandler.class);
     final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class);
+    when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
+    when(appContext.getApplicationID()).thenReturn(appAttemptId.getApplicationId());
 
     Mockito.doAnswer(new Answer() {
       public ListenableFuture<Void> answer(InvocationOnMock invocation) {

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 1aaf588..986f64d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.counters.Limits;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.runtime.api.VertexStatistics;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
@@ -2353,6 +2354,7 @@ public class TestVertexImpl {
     }
     dispatcher = new DrainDispatcher();
     appContext = mock(AppContext.class);
+    when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
     thh = mock(TaskHeartbeatHandler.class);
     historyEventHandler = mock(HistoryEventHandler.class);
     TaskSchedulerManager taskScheduler = mock(TaskSchedulerManager.class);

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
 
b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
index 99094c6..d828d6b 100644
--- 
a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
+++ 
b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
@@ -36,6 +36,7 @@ import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -62,6 +63,9 @@ public class TestRecoveryService {
     AppContext appContext = mock(AppContext.class);
     when(appContext.getCurrentRecoveryDir()).thenReturn(new 
Path(TEST_ROOT_DIR));
     when(appContext.getClock()).thenReturn(new SystemClock());
+    when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    when(appContext.getApplicationID()).thenReturn(appId);
 
     MockRecoveryService recoveryService = new MockRecoveryService(appContext);
     
conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, 
true);
@@ -85,6 +89,8 @@ public class TestRecoveryService {
     AppContext appContext = mock(AppContext.class);
     when(appContext.getCurrentRecoveryDir()).thenReturn(new 
Path(TEST_ROOT_DIR));
     when(appContext.getClock()).thenReturn(new SystemClock());
+    when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
+    when(appContext.getApplicationID()).thenReturn(appId);
 
     MockRecoveryService recoveryService = new MockRecoveryService(appContext);
     
conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, 
true);
@@ -121,6 +127,8 @@ public class TestRecoveryService {
     AppContext appContext = mock(AppContext.class);
     when(appContext.getCurrentRecoveryDir()).thenReturn(new 
Path(TEST_ROOT_DIR));
     when(appContext.getClock()).thenReturn(new SystemClock());
+    when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
+    when(appContext.getApplicationID()).thenReturn(appId);
 
     MockRecoveryService recoveryService = new MockRecoveryService(appContext);
     
conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, 
true);
@@ -147,6 +155,8 @@ public class TestRecoveryService {
     AppContext appContext = mock(AppContext.class);
     when(appContext.getCurrentRecoveryDir()).thenReturn(new 
Path(TEST_ROOT_DIR));
     when(appContext.getClock()).thenReturn(new SystemClock());
+    when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
+    when(appContext.getApplicationID()).thenReturn(appId);
 
     MockRecoveryService recoveryService = new MockRecoveryService(appContext);
     
conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, 
true);

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dist/pom.xml b/tez-dist/pom.xml
index a414448..9a9a790 100644
--- a/tez-dist/pom.xml
+++ b/tez-dist/pom.xml
@@ -67,6 +67,11 @@
           <artifactId>tez-yarn-timeline-history</artifactId>
           <version>${project.version}</version>
         </dependency>
+        <dependency>
+          <groupId>org.apache.tez</groupId>
+          <artifactId>hadoop-shim-2.4</artifactId>
+          <version>${project.version}</version>
+        </dependency>
       </dependencies>
     </profile>
     <profile>
@@ -80,6 +85,40 @@
           <artifactId>tez-yarn-timeline-history-with-acls</artifactId>
           <version>${project.version}</version>
         </dependency>
+        <dependency>
+          <groupId>org.apache.tez</groupId>
+          <artifactId>hadoop-shim-2.6</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>hadoop28</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.tez</groupId>
+          <artifactId>tez-yarn-timeline-history-with-acls</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+        <!--
+        <dependency>
+          <groupId>org.apache.tez</groupId>
+          <artifactId>tez-yarn-timeline-history-with-fs</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.tez</groupId>
+          <artifactId>tez-yarn-timeline-cache-plugin</artifactId>
+          <version>${project.version}</version>
+        </dependency> -->
+        <dependency>
+          <groupId>org.apache.tez</groupId>
+          <artifactId>hadoop-shim-2.8</artifactId>
+          <version>${project.version}</version>
+        </dependency>
       </dependencies>
     </profile>
   </profiles>

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
----------------------------------------------------------------------
diff --git 
a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java 
b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
index 6960559..a3c0224 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
@@ -30,6 +30,10 @@ import com.google.common.collect.Sets;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.client.CallerContext;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
+import org.apache.tez.hadoop.shim.HadoopShim;
+import org.apache.tez.hadoop.shim.HadoopShimsLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -64,6 +68,7 @@ public abstract class TezExampleBase extends Configured 
implements Tool {
   private boolean isLocalMode = false;
   private boolean isCountersLog = false;
   private boolean generateSplitInClient = false;
+  private HadoopShim hadoopShim;
 
   protected boolean isCountersLog() {
          return isCountersLog;
@@ -103,6 +108,8 @@ public abstract class TezExampleBase extends Configured 
implements Tool {
     if (optionParser.getCommandLine().hasOption(GENERATE_SPLIT_IN_CLIENT)) {
       generateSplitInClient = true;
     }
+    hadoopShim = new HadoopShimsLoader(conf).getHadoopShim();
+
     return _execute(otherArgs, null, null);
   }
 
@@ -122,6 +129,7 @@ public abstract class TezExampleBase extends Configured 
implements Tool {
   public int run(TezConfiguration conf, String[] args, @Nullable TezClient 
tezClient) throws
       Exception {
     setConf(conf);
+    hadoopShim = new HadoopShimsLoader(conf).getHadoopShim();
     GenericOptionsParser optionParser = new GenericOptionsParser(conf, 
getExtraOptions(), args);
     if (optionParser.getCommandLine().hasOption(LOCAL_MODE)) {
       isLocalMode = true;
@@ -158,7 +166,13 @@ public abstract class TezExampleBase extends Configured 
implements Tool {
     CallerContext callerContext = CallerContext.create("TezExamples",
         "Tez Example DAG: " + dag.getName());
     ApplicationId appId = tezClientInternal.getAppMasterApplicationId();
+    if (hadoopShim == null) {
+      Configuration conf = (getConf() == null ? new Configuration(false) : getConf());
+      hadoopShim = new HadoopShimsLoader(conf).getHadoopShim();
+    }
+
     if (appId != null) {
+      TezUtilsInternal.setHadoopCallerContext(hadoopShim, appId);
       callerContext.setCallerIdAndType(appId.toString(), 
"TezExampleApplication");
     }
     dag.setCallerContext(callerContext);

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
 
b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
index ad05af9..07dcc9b 100644
--- 
a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
+++ 
b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
@@ -58,6 +58,7 @@ import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.task.TaskReporter;
 import org.apache.tez.runtime.task.TaskRunner2Result;
@@ -305,7 +306,8 @@ public class ContainerRunnerImpl extends AbstractService 
implements ContainerRun
               request.getContainerIdString(),
               request.getTokenIdentifier(), request.getAppAttemptNumber(), 
workingDir, localDirs,
               envMap, objectRegistry, pid,
-              executionContext, credentials, memoryAvailable, request.getUser(), null, false);
+              executionContext, credentials, memoryAvailable, request.getUser(), null, false,
+              new DefaultHadoopShim());
       ContainerExecutionResult result = tezChild.run();
       LOG.info("ExecutionTime for Container: " + 
request.getContainerIdString() + "=" +
           sw.stop().elapsedMillis());
@@ -449,7 +451,7 @@ public class ContainerRunnerImpl extends AbstractService 
implements ContainerRun
           request.getAppAttemptNumber(),
           serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, 
executor, objectRegistry,
           pid,
-          executionContext, memoryAvailable, false);
+          executionContext, memoryAvailable, false, new DefaultHadoopShim());
 
       boolean shouldDie;
       try {

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java 
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
index 0129a8b..0c1dc66 100644
--- 
a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
+++ 
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
@@ -47,6 +47,7 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.mapreduce.TestUmbilical;
 import org.apache.tez.mapreduce.TezTestUtils;
 import org.apache.tez.mapreduce.hadoop.MRConfig;
@@ -207,7 +208,7 @@ public class TestMROutput {
         null,
         new HashMap<String, String>(),
         HashMultimap.<String, String>create(), null, "", new 
ExecutionContextImpl("localhost"),
-        Runtime.getRuntime().maxMemory(), true);
+        Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim());
     return task;
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java 
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 71aa87c..133ef9e 100644
--- 
a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ 
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -232,7 +233,7 @@ public class MapUtils {
         serviceConsumerMetadata,
         envMap,
         HashMultimap.<String, String>create(), null, "", new 
ExecutionContextImpl("localhost"),
-        Runtime.getRuntime().maxMemory(), true);
+        Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim());
     return task;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
 
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index db78b6e..382bc0e 100644
--- 
a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ 
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -224,7 +225,7 @@ public class TestReduceProcessor {
         serviceConsumerMetadata,
         serviceProviderEnvMap,
         HashMultimap.<String, String>create(), null, "", new 
ExecutionContextImpl("localhost"),
-        Runtime.getRuntime().maxMemory(), true);
+        Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim());
 
     List<Event> destEvents = new LinkedList<Event>();
     destEvents.add(dme);

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-runtime-internals/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/pom.xml b/tez-runtime-internals/pom.xml
index 6d40b6a..9c1cf2f 100644
--- a/tez-runtime-internals/pom.xml
+++ b/tez-runtime-internals/pom.xml
@@ -34,6 +34,10 @@
       <artifactId>tez-common</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>hadoop-shim</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-annotations</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 1a59310..df09fdb 100644
--- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -45,6 +45,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.hadoop.shim.HadoopShim;
 import org.apache.tez.runtime.api.TaskContext;
 import org.apache.tez.runtime.api.impl.TezProcessorContextImpl;
 import org.slf4j.Logger;
@@ -151,13 +152,14 @@ public class LogicalIOProcessorRuntimeTask extends 
RuntimeTask {
   private volatile ObjectRegistry objectRegistry;
   private final ExecutionContext ExecutionContext;
   private final long memAvailable;
+  private final HadoopShim hadoopShim;
 
   public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
       Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical,
       Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> 
envMap,
       Multimap<String, String> startedInputsMap, ObjectRegistry objectRegistry,
       String pid, ExecutionContext ExecutionContext, long memAvailable,
-      boolean updateSysCounters) throws IOException {
+      boolean updateSysCounters, HadoopShim hadoopShim) throws IOException {
     // Note: If adding any fields here, make sure they're cleaned up in the 
cleanupContext method.
     // TODO Remove jobToken from here post TEZ-421
     super(taskSpec, tezConf, tezUmbilical, pid, updateSysCounters);
@@ -200,6 +202,7 @@ public class LogicalIOProcessorRuntimeTask extends 
RuntimeTask {
     this.objectRegistry = objectRegistry;
     this.ExecutionContext = ExecutionContext;
     this.memAvailable = memAvailable;
+    this.hadoopShim = hadoopShim;
   }
 
   /**
@@ -1017,4 +1020,8 @@ public class LogicalIOProcessorRuntimeTask extends 
RuntimeTask {
     return this.outputsMap;
   }
 
+  @Private
+  public HadoopShim getHadoopShim() {
+    return hadoopShim;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
index ab77635..40d4051 100644
--- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
@@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,8 +35,6 @@ import org.slf4j.LoggerFactory;
  */
 public class TaskRunner2Callable extends 
CallableWithNdc<TaskRunner2Callable.TaskRunner2CallableResult> {
 
-
-
   private static final Logger LOG = 
LoggerFactory.getLogger(TaskRunner2Callable.class);
 
   private final LogicalIOProcessorRuntimeTask task;
@@ -64,13 +63,15 @@ public class TaskRunner2Callable extends 
CallableWithNdc<TaskRunner2Callable.Tas
             return new TaskRunner2CallableResult(null);
           }
           LOG.info("Initializing task" + ", taskAttemptId={}", 
task.getTaskAttemptID());
+          TezUtilsInternal.setHadoopCallerContext(task.getHadoopShim(), task.getTaskAttemptID());
           task.initialize();
 
           if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) 
{
             LOG.info("Running task, taskAttemptId={}", 
task.getTaskAttemptID());
             task.run();
           } else {
-            LOG.info("Stopped before running the processor taskAttemptId={}", 
task.getTaskAttemptID());
+            LOG.info("Stopped before running the processor taskAttemptId={}",
+                task.getTaskAttemptID());
             return new TaskRunner2CallableResult(null);
           }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 784065a..07810d9 100644
--- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -64,6 +64,9 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.RelocalizationUtils;
+import org.apache.tez.hadoop.shim.HadoopShim;
+import org.apache.tez.hadoop.shim.HadoopShimProvider;
+import org.apache.tez.hadoop.shim.HadoopShimsLoader;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -118,6 +121,7 @@ public class TezChild {
   private TaskReporterInterface taskReporter;
   private int taskCount = 0;
   private TezVertexID lastVertexID;
+  private final HadoopShim hadoopShim;
 
   public TezChild(Configuration conf, String host, int port, String 
containerIdentifier,
       String tokenIdentifier, int appAttemptNumber, String workingDir, 
String[] localDirs,
@@ -125,7 +129,7 @@ public class TezChild {
       ObjectRegistryImpl objectRegistry, String pid,
       ExecutionContext executionContext,
       Credentials credentials, long memAvailable, String user, 
TezTaskUmbilicalProtocol umbilical,
-      boolean updateSysCounters) throws IOException, InterruptedException {
+      boolean updateSysCounters, HadoopShim hadoopShim) throws IOException, InterruptedException {
     this.defaultConf = conf;
     this.containerIdString = containerIdentifier;
     this.appAttemptNumber = appAttemptNumber;
@@ -138,6 +142,7 @@ public class TezChild {
     this.memAvailable = memAvailable;
     this.user = user;
     this.updateSysCounters = updateSysCounters;
+    this.hadoopShim = hadoopShim;
 
     getTaskMaxSleepTime = defaultConf.getInt(
         TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
@@ -239,6 +244,8 @@ public class TezChild {
             containerTask.getTaskSpec().getTaskAttemptID().toString());
         System.out.println(timeStamp + " Starting to run new task attempt: " +
             containerTask.getTaskSpec().getTaskAttemptID().toString());
+        TezUtilsInternal.setHadoopCallerContext(hadoopShim,
+            containerTask.getTaskSpec().getTaskAttemptID());
         TezUtilsInternal.updateLoggers(loggerAddend);
         FileSystem.clearStatistics();
 
@@ -250,7 +257,8 @@ public class TezChild {
         TezTaskRunner2 taskRunner = new TezTaskRunner2(defaultConf, childUGI,
             localDirs, containerTask.getTaskSpec(), appAttemptNumber,
             serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, 
taskReporter,
-            executor, objectRegistry, pid, executionContext, memAvailable, updateSysCounters);
+            executor, objectRegistry, pid, executionContext, memAvailable, updateSysCounters,
+            hadoopShim);
         boolean shouldDie;
         try {
           TaskRunner2Result result = taskRunner.run();
@@ -443,7 +451,7 @@ public class TezChild {
       String tokenIdentifier, int attemptNumber, String[] localDirs, String 
workingDirectory,
       Map<String, String> serviceProviderEnvMap, @Nullable String pid,
       ExecutionContext executionContext, Credentials credentials, long 
memAvailable, String user,
-      TezTaskUmbilicalProtocol tezUmbilical, boolean updateSysCounters)
+      TezTaskUmbilicalProtocol tezUmbilical, boolean updateSysCounters, HadoopShim hadoopShim)
       throws IOException, InterruptedException, TezException {
 
     // Pull in configuration specified for the session.
@@ -456,7 +464,8 @@ public class TezChild {
 
     return new TezChild(conf, host, port, containerIdentifier, tokenIdentifier,
         attemptNumber, workingDirectory, localDirs, serviceProviderEnvMap, 
objectRegistry, pid,
-        executionContext, credentials, memAvailable, user, tezUmbilical, updateSysCounters);
+        executionContext, credentials, memAvailable, user, tezUmbilical, updateSysCounters,
+        hadoopShim);
   }
 
   public static void main(String[] args) throws IOException, 
InterruptedException, TezException {
@@ -488,11 +497,14 @@ public class TezChild {
     TezUtilsInternal.addUserSpecifiedTezConfiguration(defaultConf, confProto.getConfKeyValuesList());
     UserGroupInformation.setConfiguration(defaultConf);
     Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+
+    HadoopShim hadoopShim = new HadoopShimsLoader(defaultConf).getHadoopShim();
+
     TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier,
         tokenIdentifier, attemptNumber, localDirs, System.getenv(Environment.PWD.name()),
         System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())),
         credentials, Runtime.getRuntime().maxMemory(), System
-            .getenv(ApplicationConstants.Environment.USER.toString()), null, true);
+            .getenv(ApplicationConstants.Environment.USER.toString()), null, true, hadoopShim);
     tezChild.run();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index 4fdc17d..219cc2f 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.hadoop.shim.HadoopShim;
 import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.ObjectRegistry;
@@ -91,6 +92,8 @@ public class TezTaskRunner2 {
 
   private volatile long taskKillStartTime  = 0;
 
+  private final HadoopShim hadoopShim;
+
   // The callable which is being used to execute the task.
   private volatile TaskRunner2Callable taskRunnerCallable;
 
@@ -102,15 +105,16 @@ public class TezTaskRunner2 {
                         TaskReporterInterface taskReporter, ListeningExecutorService executor,
                         ObjectRegistry objectRegistry, String pid,
                         ExecutionContext executionContext, long memAvailable,
-                        boolean updateSysCounters) throws
+                        boolean updateSysCounters, HadoopShim hadoopShim) throws
       IOException {
     this.ugi = ugi;
     this.taskReporter = taskReporter;
     this.executor = executor;
     this.umbilicalAndErrorHandler = new UmbilicalAndErrorHandler();
+    this.hadoopShim = hadoopShim;
     this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs,
         umbilicalAndErrorHandler, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap,
-        objectRegistry, pid, executionContext, memAvailable, updateSysCounters);
+        objectRegistry, pid, executionContext, memAvailable, updateSysCounters, hadoopShim);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index 12fec7e..00e830f 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -38,6 +38,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.AbstractLogicalOutput;
@@ -82,7 +83,8 @@ public class TestLogicalIOProcessorRuntimeTask {
 
     LogicalIOProcessorRuntimeTask lio1 = new LogicalIOProcessorRuntimeTask(task1, 0, tezConf, null,
         umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null,
-        "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true);
+        "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true,
+        new DefaultHadoopShim());
 
     try {
       lio1.initialize();
@@ -107,13 +109,12 @@ public class TestLogicalIOProcessorRuntimeTask {
       cleanupAndTest(lio1);
     }
 
-
-
-    // local mode 
+    // local mode
     tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
     LogicalIOProcessorRuntimeTask lio2 = new LogicalIOProcessorRuntimeTask(task2, 0, tezConf, null,
         umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null,
-        "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true);
+        "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true,
+        new DefaultHadoopShim());
     try {
       lio2.initialize();
       lio2.run();

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
index f0c1e66..d16b880 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
@@ -32,6 +32,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.runtime.InputReadyTracker;
 import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.runtime.api.ExecutionContext;
@@ -63,7 +64,7 @@ public class TestProcessorContext {
     LogicalIOProcessorRuntimeTask runtimeTask = new LogicalIOProcessorRuntimeTask(
         mockSpec, 1, 
         new Configuration(), new String[]{"/"}, 
-        tezUmbilical, null, null, null, null, "", null, 1024, false);
+        tezUmbilical, null, null, null, null, "", null, 1024, false, new DefaultHadoopShim());
     LogicalIOProcessorRuntimeTask mockTask = spy(runtimeTask);
     Map<String, ByteBuffer> serviceConsumerMetadata = Maps.newHashMap();
     Map<String, String> auxServiceEnv = Maps.newHashMap();

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
index 989753b..7843754 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
@@ -64,6 +64,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.ObjectRegistry;
@@ -634,7 +635,7 @@ public class TestTaskExecution2 {
           new HashMap<String, ByteBuffer>(), new HashMap<String, String>(),
           HashMultimap.<String, String>create(), taskReporter,
           executor, null, "", new ExecutionContextImpl("localhost"),
-          Runtime.getRuntime().maxMemory(), updateSysCounters);
+          Runtime.getRuntime().maxMemory(), updateSysCounters, new DefaultHadoopShim());
     }
 
     return taskRunner;
@@ -662,7 +663,7 @@ public class TestTaskExecution2 {
                                  boolean updateSysCounters) throws IOException 
{
       super(tezConf, ugi, localDirs, taskSpec, appAttemptNumber, serviceConsumerMetadata,
           serviceProviderEnvMap, startedInputsMap, taskReporter, executor, objectRegistry, pid,
-          executionContext, memAvailable, updateSysCounters);
+          executionContext, memAvailable, updateSysCounters, new DefaultHadoopShim());
     }
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 32a3619..884f0e6 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -41,6 +41,7 @@ import java.util.Map;
 import com.google.protobuf.ByteString;
 
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -220,8 +221,8 @@ public class TestOnFileUnorderedKVOutput {
     when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class)));
     task = new LogicalIOProcessorRuntimeTask(
         mockSpec, appAttemptNumber, 
-        new Configuration(), new String[]{"/"}, 
-        tezUmbilical, null, null, null, null, "", null, 1024, false);
+        new Configuration(), new String[]{"/"},
+        tezUmbilical, null, null, null, null, "", null, 1024, false, new DefaultHadoopShim());
     
     LogicalIOProcessorRuntimeTask runtimeTask = spy(task);
     

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tez-tests/pom.xml b/tez-tests/pom.xml
index 0bac721..72538bf 100644
--- a/tez-tests/pom.xml
+++ b/tez-tests/pom.xml
@@ -32,7 +32,6 @@
     <dependency>
       <groupId>org.apache.tez</groupId>
       <artifactId>tez-common</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.tez</groupId>

http://git-wip-us.apache.org/repos/asf/tez/blob/2c7abeb1/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index 3188c6e..6ed6d2d 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -53,6 +53,7 @@ import org.apache.tez.client.CallerContext;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.security.DAGAccessControls;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSourceDescriptor;
@@ -66,6 +67,8 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.hadoop.shim.HadoopShim;
+import org.apache.tez.hadoop.shim.HadoopShimsLoader;
 import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
@@ -373,8 +376,8 @@ public class TestOrderedWordCount extends Configured implements Tool {
     }
 
     UserGroupInformation.setConfiguration(conf);
-
     TezConfiguration tezConf = new TezConfiguration(conf);
+    HadoopShim hadoopShim = new HadoopShimsLoader(tezConf).getHadoopShim();
     TestOrderedWordCount instance = new TestOrderedWordCount();
 
     FileSystem fs = FileSystem.get(conf);
@@ -407,6 +410,9 @@ public class TestOrderedWordCount extends Configured implements Tool {
     TezClient tezSession = TezClient.create("OrderedWordCountSession", tezConf,
         null, instance.credentials);
     tezSession.start();
+    if (tezSession.getAppMasterApplicationId() != null) {
+      TezUtilsInternal.setHadoopCallerContext(hadoopShim, tezSession.getAppMasterApplicationId());
+    }
 
     DAGStatus dagStatus = null;
     DAGClient dagClient = null;

Reply via email to