Repository: tez
Updated Branches:
  refs/heads/TEZ-3334 c3a7c2127 -> 287194c20


TEZ-3682. Pass parameters instead of configuration for changes to support tez shuffle handler (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/287194c2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/287194c2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/287194c2

Branch: refs/heads/TEZ-3334
Commit: 287194c2099a934eb2c0a2520508ef538019fbdf
Parents: c3a7c21
Author: Jonathan Eagles <[email protected]>
Authored: Thu Apr 6 09:14:49 2017 -0500
Committer: Jonathan Eagles <[email protected]>
Committed: Thu Apr 6 09:14:49 2017 -0500

----------------------------------------------------------------------
 TEZ-3334-CHANGES.txt                            |  1 +
 .../tez/dag/app/launcher/DagDeleteRunnable.java | 10 +++++-----
 .../dag/app/launcher/DeletionTrackerImpl.java   |  2 +-
 .../app/rm/container/AMContainerHelpers.java    | 10 +++++-----
 .../runtime/library/common/TezRuntimeUtils.java |  6 ++----
 .../library/common/shuffle/ShuffleUtils.java    | 13 +++++-------
 .../common/shuffle/orderedgrouped/Shuffle.java  |  3 ++-
 .../ShuffleInputEventHandlerOrderedGrouped.java |  4 ++--
 .../common/sort/impl/PipelinedSorter.java       |  9 ++++++---
 .../common/sort/impl/dflt/DefaultSorter.java    |  6 +++++-
 .../output/OrderedPartitionedKVOutput.java      |  4 +++-
 .../common/shuffle/TestShuffleUtils.java        | 13 +++++++++---
 ...tShuffleInputEventHandlerOrderedGrouped.java |  3 ++-
 .../common/sort/impl/TestPipelinedSorter.java   | 10 +++++-----
 .../TestUnorderedPartitionedKVWriter.java       | 21 +++++++++++++-------
 15 files changed, 68 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index 0fb021e..7ef35c6 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 INCOMPATIBLE CHANGES:
 
 ALL CHANGES:
+  TEZ-3682. Pass parameters instead of configuration for changes to support tez shuffle handler
   TEZ-3628. Give Tez shuffle handler threads custom names
   TEZ-3621. Optimize the Shuffle Handler content length calculation for keep alive
   TEZ-3620. UnorderedPartitionedKVOutput is missing the shuffle service config in the confKeys set

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java
index 669d539..6d966b0 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java
@@ -18,11 +18,11 @@
 
 package org.apache.tez.dag.app.launcher;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.http.BaseHttpConnection;
+import org.apache.tez.http.HttpConnectionParams;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 
 import java.net.URL;
@@ -33,15 +33,15 @@ class DagDeleteRunnable implements Runnable {
   final JobTokenSecretManager jobTokenSecretManager;
   final String tezDefaultComponentName;
   final int shufflePort;
-  final Configuration conf;
+  final HttpConnectionParams httpConnectionParams;
 
   public DagDeleteRunnable(NodeId nodeId, int shufflePort, TezDAGID currentDag,
-                           Configuration conf,
+                           HttpConnectionParams httpConnectionParams,
                            JobTokenSecretManager jobTokenSecretMgr, String 
tezDefaultComponent) {
     this.nodeId = nodeId;
     this.shufflePort = shufflePort;
     this.dag = currentDag;
-    this.conf = conf;
+    this.httpConnectionParams = httpConnectionParams;
     this.jobTokenSecretManager = jobTokenSecretMgr;
     this.tezDefaultComponentName = tezDefaultComponent;
   }
@@ -53,7 +53,7 @@ class DagDeleteRunnable implements Runnable {
           nodeId.getHost(), shufflePort,
           dag.getApplicationId().toString(), dag.getId(), false);
       BaseHttpConnection httpConnection = 
TezRuntimeUtils.getHttpConnection(true, baseURL,
-          TezRuntimeUtils.getHttpConnectionParams(conf), "DAGDelete", 
jobTokenSecretManager);
+          httpConnectionParams, "DAGDelete", jobTokenSecretManager);
       httpConnection.connect();
       httpConnection.getInputStream();
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
index 4a4a4ae..625aabb 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
@@ -50,7 +50,7 @@ public class DeletionTrackerImpl extends DeletionTracker {
       //TODO: add check for healthy node
       if (shufflePort != TezRuntimeUtils.INVALID_PORT) {
         DagDeleteRunnable dagDeleteRunnable = new DagDeleteRunnable(nodeId,
-            shufflePort, dag, conf, jobTokenSecretManager, this.pluginName);
+            shufflePort, dag, TezRuntimeUtils.getHttpConnectionParams(conf), 
jobTokenSecretManager, this.pluginName);
         dagDeleteService.submit(dagDeleteRunnable);
       }
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 51e954d..ba3ecad 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -91,11 +91,11 @@ public class AMContainerHelpers {
    * Create the common {@link ContainerLaunchContext} for all attempts.
    *
    * @param applicationACLs
-   * @param conf
+   * @param auxiliaryService
    */
   private static ContainerLaunchContext createCommonContainerLaunchContext(
       Map<ApplicationAccessType, String> applicationACLs,
-      Credentials credentials, Map<String, LocalResource> localResources, 
Configuration conf) {
+      Credentials credentials, Map<String, LocalResource> localResources, 
String auxiliaryService) {
 
     // Application environment
     Map<String, String> environment = new HashMap<String, String>();
@@ -129,8 +129,6 @@ public class AMContainerHelpers {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Putting shuffle token in serviceData in common CLC");
       }
-      String auxiliaryService = 
conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
-          TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
       serviceData.put(auxiliaryService,
           
TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(containerCredentials)));
     } catch (IOException e) {
@@ -161,8 +159,10 @@ public class AMContainerHelpers {
     ContainerLaunchContext commonContainerSpec = null;
     synchronized (commonContainerSpecLock) {
       if (!commonContainerSpecs.containsKey(tezDAGID)) {
+        String auxiliaryService = 
conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
         commonContainerSpec =
-            createCommonContainerLaunchContext(acls, credentials, 
commonDAGLRs, conf);
+            createCommonContainerLaunchContext(acls, credentials, 
commonDAGLRs, auxiliaryService);
         commonContainerSpecs.put(tezDAGID, commonContainerSpec);
       } else {
         commonContainerSpec = commonContainerSpecs.get(tezDAGID);

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
index d39d554..8e13c13 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -154,13 +154,11 @@ public class TezRuntimeUtils {
     return partitioner;
   }
 
-  public static TezTaskOutput instantiateTaskOutputManager(
-      Configuration conf, OutputContext outputContext) {
+  public static TezTaskOutput instantiateTaskOutputManager(Configuration conf, 
OutputContext outputContext) {
     Class<?> clazz = conf.getClass(Constants.TEZ_RUNTIME_TASK_OUTPUT_MANAGER,
         TezTaskOutputFiles.class);
     try {
-      Constructor<?> ctor = clazz.getConstructor(Configuration.class, String
-          .class, int.class);
+      Constructor<?> ctor = clazz.getConstructor(Configuration.class, 
String.class, int.class);
       ctor.setAccessible(true);
       TezTaskOutput instance = (TezTaskOutput) ctor.newInstance(conf,
           outputContext.getUniqueIdentifier(),

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 1d644aa..25b9b4f 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -265,14 +265,14 @@ public class ShuffleUtils {
    * @param finalMergeEnabled
    * @param isLastEvent
    * @param pathComponent
-   * @param conf
+   * @param auxiliaryService
    * @param deflater
    * @return ByteBuffer
    * @throws IOException
    */
   static ByteBuffer generateDMEPayload(boolean sendEmptyPartitionDetails,
       int numPhysicalOutputs, TezSpillRecord spillRecord, OutputContext 
context,
-      int spillId, boolean finalMergeEnabled, boolean isLastEvent, String 
pathComponent, Configuration conf, Deflater deflater)
+      int spillId, boolean finalMergeEnabled, boolean isLastEvent, String 
pathComponent, String auxiliaryService, Deflater deflater)
       throws IOException {
     DataMovementEventPayloadProto.Builder payloadBuilder = 
DataMovementEventPayloadProto
         .newBuilder();
@@ -301,9 +301,6 @@ public class ShuffleUtils {
 
     if (!sendEmptyPartitionDetails || outputGenerated) {
       String host = context.getExecutionContext().getHostName();
-      String auxiliaryService = 
conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
-          TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
-
       ByteBuffer shuffleMetadata = context
           .getServiceProviderMetaData(auxiliaryService);
       int shufflePort = 
ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
@@ -391,13 +388,13 @@ public class ShuffleUtils {
    * @param numPhysicalOutputs
    * @param pathComponent
    * @param partitionStats
-   * @param conf
+   * @param auxiliaryService
    * @throws IOException
    */
   public static void generateEventOnSpill(List<Event> eventList, boolean 
finalMergeEnabled,
       boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord 
spillRecord,
       int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String 
pathComponent,
-      @Nullable long[] partitionStats, boolean reportDetailedPartitionStats, 
Configuration conf, Deflater deflater)
+      @Nullable long[] partitionStats, boolean reportDetailedPartitionStats, 
String auxiliaryService, Deflater deflater)
       throws IOException {
     Preconditions.checkArgument(eventList != null, "EventList can't be null");
 
@@ -415,7 +412,7 @@ public class ShuffleUtils {
 
     ByteBuffer payload = generateDMEPayload(sendEmptyPartitionDetails, 
numPhysicalOutputs,
         spillRecord, context, spillId,
-        finalMergeEnabled, isLastEvent, pathComponent, conf, deflater);
+        finalMergeEnabled, isLastEvent, pathComponent, auxiliaryService, 
deflater);
 
     if (finalMergeEnabled || isLastEvent) {
       VertexManagerEvent vmEvent = generateVMEvent(context, partitionStats,

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index b3d8a6f..f787c59 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.tez.runtime.api.TaskFailureType;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -188,7 +189,7 @@ public class Shuffle implements ExceptionReporter {
     eventHandler= new ShuffleInputEventHandlerOrderedGrouped(
         inputContext,
         scheduler,
-        conf);
+        ShuffleUtils.isTezShuffleHandler(conf));
     
     ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new 
ThreadFactoryBuilder()
         .setDaemon(true).setNameFormat("ShuffleAndMergeRunner {" + 
srcNameTrimmed + "}").build());

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index fda899f..116098f 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -61,10 +61,10 @@ public class ShuffleInputEventHandlerOrderedGrouped 
implements ShuffleEventHandl
 
   public ShuffleInputEventHandlerOrderedGrouped(InputContext inputContext,
                                                 ShuffleScheduler scheduler,
-                                                Configuration conf) {
+                                                boolean compositeFetch) {
     this.inputContext = inputContext;
     this.scheduler = scheduler;
-    this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);
+    this.compositeFetch = compositeFetch;
     this.inflater = TezCommonUtils.newInflater();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 5203851..755a131 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -37,6 +37,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,6 +117,7 @@ public class PipelinedSorter extends ExternalSorter {
   private final int MIN_BLOCK_SIZE;
   private final boolean lazyAllocateMem;
   private final Deflater deflater;
+  private final String auxiliaryService;
 
   // TODO Set additional countesr - total bytes written, spills etc.
 
@@ -155,7 +157,8 @@ public class PipelinedSorter extends ExternalSorter {
         .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
 
     pipelinedShuffle = !isFinalMergeEnabled() && confPipelinedShuffle;
-
+    auxiliaryService = 
conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
     //sanity checks
     final long sortmb = this.availableMemoryMb;
 
@@ -354,7 +357,7 @@ public class PipelinedSorter extends ExternalSorter {
     ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false,
         outputContext, (numSpills - 1), indexCacheList.get(numSpills - 1),
         partitions, sendEmptyPartitionDetails, pathComponent, partitionStats,
-        reportDetailedPartitionStats(), this.conf, deflater);
+        reportDetailedPartitionStats(), auxiliaryService, deflater);
     outputContext.sendEvents(events);
     LOG.info(outputContext.getDestinationVertexName() +
         ": Added spill event for spill (final update=false), spillId=" + 
(numSpills - 1));
@@ -677,7 +680,7 @@ public class PipelinedSorter extends ExternalSorter {
           ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), 
isLastEvent,
               outputContext, i, indexCacheList.get(i), partitions,
               sendEmptyPartitionDetails, pathComponent, partitionStats,
-              reportDetailedPartitionStats(), this.conf, deflater);
+              reportDetailedPartitionStats(), auxiliaryService, deflater);
           LOG.info(outputContext.getDestinationVertexName() + ": Adding spill 
event for spill (final update=" + isLastEvent + "), spillId=" + i);
         }
         outputContext.sendEvents(events);

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 8ff1c99..a6f7cf7 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -32,6 +32,7 @@ import java.util.zip.Deflater;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,6 +116,7 @@ public final class DefaultSorter extends ExternalSorter 
implements IndexedSortab
   volatile boolean spillThreadRunning = false;
   final SpillThread spillThread = new SpillThread();
   private final Deflater deflater;
+  private final String auxiliaryService;
 
   final ArrayList<TezSpillRecord> indexCacheList =
     new ArrayList<TezSpillRecord>();
@@ -153,6 +155,8 @@ public final class DefaultSorter extends ExternalSorter 
implements IndexedSortab
           TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " 
does not work "
           + "with DefaultSorter. It is supported only with PipelinedSorter.");
     }
+    auxiliaryService = 
conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
 
     // buffers and accounting
     int maxMemUsage = sortmb << 20;
@@ -1137,7 +1141,7 @@ public final class DefaultSorter extends ExternalSorter 
implements IndexedSortab
     String pathComponent = (outputContext.getUniqueIdentifier() + "_" + index);
     ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), 
isLastEvent,
         outputContext, index, spillRecord, partitions, 
sendEmptyPartitionDetails, pathComponent,
-        partitionStats, reportDetailedPartitionStats(), this.conf, deflater);
+        partitionStats, reportDetailedPartitionStats(), auxiliaryService, 
deflater);
 
     LOG.info(outputContext.getDestinationVertexName() + ": " +
         "Adding spill event for spill (final update=" + isLastEvent + "), 
spillId=" + index);

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 6b14f8d..98e14be 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -201,10 +201,12 @@ public class OrderedPartitionedKVOutput extends 
AbstractLogicalOutput {
     List<Event> eventList = Lists.newLinkedList();
     if (finalMergeEnabled && !pipelinedShuffle) {
       boolean isLastEvent = true;
+      String auxiliaryService = 
conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+          TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
       ShuffleUtils.generateEventOnSpill(eventList, finalMergeEnabled, 
isLastEvent,
           getContext(), 0, new TezSpillRecord(sorter.getFinalIndexFile(), 
conf),
           getNumPhysicalOutputs(), sendEmptyPartitionDetails, 
getContext().getUniqueIdentifier(),
-          sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), 
this.conf, deflater);
+          sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), 
auxiliaryService, deflater);
     }
     return eventList;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index ec19f67..4a28224 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -163,9 +163,12 @@ public class TestShuffleUtils {
     int spillId = 0;
     int physicalOutputs = 10;
     String pathComponent = "/attempt_x_y_0/file.out";
+    String auxiliaryService = 
conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+
     ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
         outputContext, spillId, new TezSpillRecord(indexFile, conf),
-            physicalOutputs, true, pathComponent, null, false, this.conf, 
TezCommonUtils.newBestCompressionDeflater());
+            physicalOutputs, true, pathComponent, null, false, 
auxiliaryService, TezCommonUtils.newBestCompressionDeflater());
 
     Assert.assertTrue(events.size() == 1);
     Assert.assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
@@ -200,11 +203,13 @@ public class TestShuffleUtils {
     int spillId = 0;
     int physicalOutputs = 10;
     String pathComponent = "/attempt_x_y_0/file.out";
+    String auxiliaryService = 
conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
 
     //normal code path where we do final merge all the time
     ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
         outputContext, spillId, new TezSpillRecord(indexFile, conf),
-            physicalOutputs, true, pathComponent, null, false, this.conf, 
TezCommonUtils.newBestCompressionDeflater());
+            physicalOutputs, true, pathComponent, null, false, 
auxiliaryService, TezCommonUtils.newBestCompressionDeflater());
 
     Assert.assertTrue(events.size() == 2); //one for VM
     Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
@@ -241,11 +246,13 @@ public class TestShuffleUtils {
     int spillId = 0;
     int physicalOutputs = 10;
     String pathComponent = "/attempt_x_y_0/file.out";
+    String auxiliaryService = 
conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
 
     //normal code path where we do final merge all the time
     ShuffleUtils.generateEventOnSpill(events, finalMergeDisabled, isLastEvent,
         outputContext, spillId, new TezSpillRecord(indexFile, conf),
-            physicalOutputs, true, pathComponent, null, false, this.conf, 
TezCommonUtils.newBestCompressionDeflater());
+            physicalOutputs, true, pathComponent, null, false, 
auxiliaryService, TezCommonUtils.newBestCompressionDeflater());
 
     Assert.assertTrue(events.size() == 2); //one for VM
     Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index 1d4afde..72cba80 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@ -18,6 +18,7 @@ import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.junit.Before;
 import org.junit.Test;
@@ -154,7 +155,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
         0,
         "src vertex");
     scheduler = spy(realScheduler);
-    handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, 
scheduler, config);
+    handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, 
scheduler, ShuffleUtils.isTezShuffleHandler(config));
     mergeManager = mock(MergeManager.class);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index c3f8dda..454cf22 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -100,7 +100,9 @@ public class TestPipelinedSorter {
     ApplicationId appId = ApplicationId.newInstance(10000, 1);
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
-    this.outputContext = createMockOutputContext(counters, appId, uniqueId, 
getConf());
+    String auxiliaryService = 
getConf().get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+    this.outputContext = createMockOutputContext(counters, appId, uniqueId, 
auxiliaryService);
   }
 
   public static Configuration getConf() {
@@ -754,7 +756,7 @@ public class TestPipelinedSorter {
   }
 
   private static OutputContext createMockOutputContext(TezCounters counters, 
ApplicationId appId,
-      String uniqueId, Configuration conf) throws IOException {
+      String uniqueId, String auxiliaryService) throws IOException {
     OutputContext outputContext = mock(OutputContext.class);
 
     ExecutionContext execContext = new ExecutionContextImpl("localhost");
@@ -762,9 +764,7 @@ public class TestPipelinedSorter {
     DataOutputBuffer serviceProviderMetaData = new DataOutputBuffer();
     serviceProviderMetaData.writeInt(80);
     
doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(outputContext)
-        .getServiceProviderMetaData
-            (conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
-                TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
+        .getServiceProviderMetaData(auxiliaryService);
 
     doReturn(execContext).when(outputContext).getExecutionContext();
     
doReturn(mock(OutputStatisticsReporter.class)).when(outputContext).getStatisticsReporter();

http://git-wip-us.apache.org/repos/asf/tez/blob/287194c2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 031b44d..07feb20 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -162,7 +162,9 @@ public class TestUnorderedPartitionedKVWriter {
     ApplicationId appId = ApplicationId.newInstance(10000000, 1);
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
-    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
+    String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
 
     int maxSingleBufferSizeBytes = 2047;
     Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
@@ -259,7 +261,9 @@ public class TestUnorderedPartitionedKVWriter {
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
     int dagId = 1;
-    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
+    String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
     Random random = new Random();
 
     Configuration conf = createConfiguration(outputContext, Text.class, Text.class, shouldCompress,
@@ -528,7 +532,9 @@ public class TestUnorderedPartitionedKVWriter {
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
     int dagId = 1;
-    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
+    String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
 
     Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
         shouldCompress, -1);
@@ -713,7 +719,9 @@ public class TestUnorderedPartitionedKVWriter {
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
     int dagId = 1;
-    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
+    String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
 
     Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
         shouldCompress, -1);
@@ -904,7 +912,7 @@ public class TestUnorderedPartitionedKVWriter {
   }
 
   private OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId,
-      String uniqueId, Configuration conf) {
+      String uniqueId, String auxiliaryService) {
     OutputContext outputContext = mock(OutputContext.class);
     doReturn(counters).when(outputContext).getCounters();
     doReturn(appId).when(outputContext).getApplicationId();
@@ -927,8 +935,7 @@ public class TestUnorderedPartitionedKVWriter {
         portBuffer.reset();
         return portBuffer;
       }
-    }).when(outputContext).getServiceProviderMetaData(eq(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
-        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)));
+    }).when(outputContext).getServiceProviderMetaData(eq(auxiliaryService));
 
     Path outDirBase = new Path(TEST_ROOT_DIR, "outDir_" + uniqueId);
     String[] outDirs = new String[] { outDirBase.toString() };

Reply via email to