Repository: tez Updated Branches: refs/heads/TEZ-3334 397d6af39 -> 53ea6f5b3
TEZ-3238. TEZ-3238. Shuffle service name should be configureable and should not be hardcoded to 'mapreduce_shuffle' (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/53ea6f5b Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/53ea6f5b Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/53ea6f5b Branch: refs/heads/TEZ-3334 Commit: 53ea6f5b3adfa71f3c235467a50757c72528d653 Parents: 397d6af Author: Jonathan Eagles <[email protected]> Authored: Fri Aug 12 13:11:40 2016 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Fri Aug 12 13:11:40 2016 -0500 ---------------------------------------------------------------------- TEZ-3334-CHANGES.txt | 1 + .../org/apache/tez/client/TezClientUtils.java | 4 +++- .../apache/tez/dag/api/TezConfiguration.java | 10 ++++++++++ .../org/apache/tez/dag/api/TezConstants.java | 6 ------ .../app/launcher/LocalContainerLauncher.java | 16 ++++++++------- .../app/rm/container/AMContainerHelpers.java | 9 ++++++--- .../tez/service/impl/ContainerRunnerImpl.java | 8 +++++--- .../apache/tez/service/impl/TezTestService.java | 5 ++++- .../tez/mapreduce/processor/MapUtils.java | 7 +++++-- .../processor/reduce/TestReduceProcessor.java | 7 +++++-- .../org/apache/tez/runtime/task/TezChild.java | 4 +++- .../library/common/shuffle/ShuffleUtils.java | 21 ++++++++++++-------- .../common/shuffle/impl/ShuffleManager.java | 9 ++++++--- .../orderedgrouped/ShuffleScheduler.java | 7 +++++-- .../common/sort/impl/PipelinedSorter.java | 4 ++-- .../common/sort/impl/dflt/DefaultSorter.java | 2 +- .../writers/UnorderedPartitionedKVWriter.java | 5 ++++- .../output/OrderedPartitionedKVOutput.java | 2 +- .../common/shuffle/TestShuffleUtils.java | 12 ++++++----- .../impl/TestShuffleInputEventHandlerImpl.java | 7 +++++-- .../common/shuffle/impl/TestShuffleManager.java | 7 +++++-- .../common/sort/impl/TestPipelinedSorter.java | 8 +++++--- .../sort/impl/dflt/TestDefaultSorter.java | 4 
+++- .../TestUnorderedPartitionedKVWriter.java | 15 ++++++++------ .../library/output/TestOnFileSortedOutput.java | 4 +++- .../output/TestOnFileUnorderedKVOutput.java | 4 +++- 26 files changed, 123 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/TEZ-3334-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt index ef0a1cb..2122cca 100644 --- a/TEZ-3334-CHANGES.txt +++ b/TEZ-3334-CHANGES.txt @@ -4,6 +4,7 @@ Apache Tez Change Log INCOMPATIBLE CHANGES: ALL CHANGES: + TEZ-3238. Shuffle service name should be configureable and should not be hardcoded to 'mapreduce_shuffle' TEZ-3390. Package Shuffle Handler as a shaded uber-jar TEZ-3378. Move Shuffle Handler configuration into the Tez namespace TEZ-3377. Remove ShuffleHandler dependency on mapred.FadvisedChunkedFile and mapred.FadvisedFileRegion http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java index eb1a95e..fbe36f1 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java @@ -692,7 +692,9 @@ public class TezClientUtils { // provide this to AuxServices running on the AM node - in case tasks run within the AM, // and no other task runs on this node. 
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>(); - serviceData.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, + String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + serviceData.put(auxiliaryService, TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(amLaunchCredentials))); // Setup ContainerLaunchContext for AM container http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 11c50cf..732fee9 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -138,6 +138,16 @@ public class TezConfiguration extends Configuration { public static final boolean TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE_DEFAULT = true; /** + * String value. Specifies the name of the shuffle auxiliary service. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty + public static final String TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID = TEZ_AM_PREFIX + + "shuffle.auxiliary-service.id"; + public static final String TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT = + "mapreduce_shuffle"; + + /** * String value. Specifies a directory where Tez can create temporary job artifacts. 
*/ @ConfigurationScope(Scope.AM) http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java index 43f09a2..31c1e66 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java @@ -69,12 +69,6 @@ public class TezConstants { TezConfiguration.TEZ_SESSION_PREFIX + "local-resources.pb"; public static final String TEZ_APPLICATION_TYPE = "TEZ"; - - /** - * The service id for the NodeManager plugin used to share intermediate data - * between vertices. - */ - public static final String TEZ_SHUFFLE_HANDLER_SERVICE_ID = "tez_shuffle"; public static final String TEZ_PREWARM_DAG_NAME_PREFIX = "TezPreWarmDAG"; http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index 1e9d1e6..153b2e0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -118,13 +118,6 @@ public class LocalContainerLauncher extends ContainerLauncher { this.tal = taskCommunicatorManagerInterface; this.workingDirectory = workingDirectory; this.isLocalMode = isLocalMode; - if (isLocalMode) { - localEnv = Maps.newHashMap(); - AuxiliaryServiceHelper.setServiceDataIntoEnv( - ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(0), localEnv); - } else { - localEnv = System.getenv(); - } // 
Check if the hostname is set in the environment before overriding it. String host = isLocalMode ? InetAddress.getLocalHost().getHostName() : @@ -138,6 +131,15 @@ public class LocalContainerLauncher extends ContainerLauncher { throw new TezUncheckedException( "Failed to parse user payload for " + LocalContainerLauncher.class.getSimpleName(), e); } + if (isLocalMode) { + String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + localEnv = Maps.newHashMap(); + AuxiliaryServiceHelper.setServiceDataIntoEnv( + auxiliaryService, ByteBuffer.allocate(4).putInt(0), localEnv); + } else { + localEnv = System.getenv(); + } numExecutors = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT); Preconditions.checkState(numExecutors >=1, "Must have at least 1 executor"); http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java index 11b5006..51e954d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java @@ -91,10 +91,11 @@ public class AMContainerHelpers { * Create the common {@link ContainerLaunchContext} for all attempts. 
* * @param applicationACLs + * @param conf */ private static ContainerLaunchContext createCommonContainerLaunchContext( Map<ApplicationAccessType, String> applicationACLs, - Credentials credentials, Map<String, LocalResource> localResources) { + Credentials credentials, Map<String, LocalResource> localResources, Configuration conf) { // Application environment Map<String, String> environment = new HashMap<String, String>(); @@ -128,7 +129,9 @@ public class AMContainerHelpers { if (LOG.isDebugEnabled()) { LOG.debug("Putting shuffle token in serviceData in common CLC"); } - serviceData.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, + String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + serviceData.put(auxiliaryService, TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(containerCredentials))); } catch (IOException e) { throw new TezUncheckedException(e); @@ -159,7 +162,7 @@ public class AMContainerHelpers { synchronized (commonContainerSpecLock) { if (!commonContainerSpecs.containsKey(tezDAGID)) { commonContainerSpec = - createCommonContainerLaunchContext(acls, credentials, commonDAGLRs); + createCommonContainerLaunchContext(acls, credentials, commonDAGLRs, conf); commonContainerSpecs.put(tezDAGID, commonContainerSpec); } else { commonContainerSpec = commonContainerSpecs.get(tezDAGID); http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java index f9de995..b3d6d54 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java +++ 
b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java @@ -132,9 +132,9 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun public void serviceStart() { } - public void setShufflePort(int shufflePort) { + public void setShufflePort(String auxiliaryService, int shufflePort) { AuxiliaryServiceHelper.setServiceDataIntoEnv( - TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, + auxiliaryService, ByteBuffer.allocate(4).putInt(shufflePort), localEnv); } @@ -417,7 +417,9 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials); Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>(); - serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, + String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + serviceConsumerMetadata.put(auxiliaryService, TezCommonUtils.convertJobTokenToBytes(jobToken)); Multimap<String, String> startedInputsMap = HashMultimap.create(); http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java index 322be00..8d9436e 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; +import 
org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.service.ContainerRunner; import org.apache.tez.shufflehandler.ShuffleHandler; @@ -86,7 +87,9 @@ public class TezTestService extends AbstractService implements ContainerRunner { @Override public void serviceStart() throws Exception { ShuffleHandler.initializeAndStart(shuffleHandlerConf); - containerRunner.setShufflePort(ShuffleHandler.get().getPort()); + String auxiliaryService = getConfig().get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + containerRunner.setShufflePort(auxiliaryService, ShuffleHandler.get().getPort()); server.start(); containerRunner.start(); } http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java index 8309966..47ce377 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -216,12 +217,14 @@ public class MapUtils { outputSpecs, null, null); Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>(); - serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, + String auxiliaryService = jobConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + 
serviceConsumerMetadata.put(auxiliaryService, ShuffleUtils.convertJobTokenToBytes(shuffleToken)); Map<String, String> envMap = new HashMap<String, String>(); ByteBuffer shufflePortBb = ByteBuffer.allocate(4).putInt(0, 8000); AuxiliaryServiceHelper - .setServiceDataIntoEnv(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, shufflePortBb, + .setServiceDataIntoEnv(auxiliaryService, shufflePortBb, envMap); LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask( http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java index 1922c53..043bd94 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java @@ -26,6 +26,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -208,12 +209,14 @@ public class TestReduceProcessor { Collections.singletonList(reduceOutputSpec), null, null); Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>(); - serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, + String auxiliaryService = jobConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + serviceConsumerMetadata.put(auxiliaryService, ShuffleUtils.convertJobTokenToBytes(shuffleToken)); Map<String, String> serviceProviderEnvMap = new HashMap<String, String>(); ByteBuffer 
shufflePortBb = ByteBuffer.allocate(4).putInt(0, 8000); AuxiliaryServiceHelper - .setServiceDataIntoEnv(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, shufflePortBb, + .setServiceDataIntoEnv(auxiliaryService, shufflePortBb, serviceProviderEnvMap); LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask( http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 07810d9..3ee89cf 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -175,7 +175,9 @@ public class TezChild { UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(tokenIdentifier); Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials); - serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, + String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + serviceConsumerMetadata.put(auxiliaryService, TezCommonUtils.convertJobTokenToBytes(jobToken)); if (umbilical == null) { http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index e194298..fa8533c 100644 --- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -39,6 +39,7 @@ import com.google.protobuf.ByteString; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.http.BaseHttpConnection; import org.apache.tez.http.HttpConnection; import org.apache.tez.http.HttpConnectionParams; @@ -73,7 +74,6 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DetailedP public class ShuffleUtils { private static final Logger LOG = LoggerFactory.getLogger(ShuffleUtils.class); - public static final String SHUFFLE_HANDLER_SERVICE_ID = "tez_shuffle"; private static final long MB = 1024l * 1024l; //Shared by multiple threads @@ -278,12 +278,13 @@ public class ShuffleUtils { * @param finalMergeEnabled * @param isLastEvent * @param pathComponent + * @param conf * @return ByteBuffer * @throws IOException */ static ByteBuffer generateDMEPayload(boolean sendEmptyPartitionDetails, - int numPhysicalOutputs, TezSpillRecord spillRecord, OutputContext context, - int spillId, boolean finalMergeEnabled, boolean isLastEvent, String pathComponent) + int numPhysicalOutputs, TezSpillRecord spillRecord, OutputContext context, + int spillId, boolean finalMergeEnabled, boolean isLastEvent, String pathComponent, Configuration conf) throws IOException { DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto .newBuilder(); @@ -312,8 +313,11 @@ public class ShuffleUtils { if (!sendEmptyPartitionDetails || outputGenerated) { String host = context.getExecutionContext().getHostName(); + String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + ByteBuffer shuffleMetadata = context - 
.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); + .getServiceProviderMetaData(auxiliaryService); int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata); payloadBuilder.setHost(host); payloadBuilder.setPort(shufflePort); @@ -398,12 +402,13 @@ public class ShuffleUtils { * @param numPhysicalOutputs * @param pathComponent * @param partitionStats + * @param conf * @throws IOException */ public static void generateEventOnSpill(List<Event> eventList, boolean finalMergeEnabled, - boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord spillRecord, - int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent, - @Nullable long[] partitionStats, boolean reportDetailedPartitionStats) + boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord spillRecord, + int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent, + @Nullable long[] partitionStats, boolean reportDetailedPartitionStats, Configuration conf) throws IOException { Preconditions.checkArgument(eventList != null, "EventList can't be null"); @@ -421,7 +426,7 @@ public class ShuffleUtils { ByteBuffer payload = generateDMEPayload(sendEmptyPartitionDetails, numPhysicalOutputs, spillRecord, context, spillId, - finalMergeEnabled, isLastEvent, pathComponent); + finalMergeEnabled, isLastEvent, pathComponent, conf); if (finalMergeEnabled || isLastEvent) { VertexManagerEvent vmEvent = generateVMEvent(context, partitionStats, http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index c80713b..a8fea9e 100644 
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -45,6 +45,7 @@ import java.util.concurrent.locks.ReentrantLock; import javax.crypto.SecretKey; import com.google.common.annotations.VisibleForTesting; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.http.HttpConnectionParams; import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; @@ -244,10 +245,12 @@ public class ShuffleManager implements FetcherCallback { this.startTime = System.currentTimeMillis(); this.lastProgressTime = startTime; - + + String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); SecretKey shuffleSecret = ShuffleUtils .getJobTokenSecretFromTokenBytes(inputContext - .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID)); + .getServiceConsumerMetaData(auxiliaryService)); this.jobTokenSecretMgr = new JobTokenSecretManager(shuffleSecret); this.asyncHttp = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false); httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf); @@ -261,7 +264,7 @@ public class ShuffleManager implements FetcherCallback { localDirAllocator.getAllLocalPathsToRead(".", conf), Path.class); this.localhostName = inputContext.getExecutionContext().getHostName(); final ByteBuffer shuffleMetaData = - inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); + inputContext.getServiceProviderMetaData(auxiliaryService); this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetaData); /** http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java 
---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index afd280b..a83c301 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -55,6 +55,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.http.HttpConnectionParams; import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.security.JobTokenSecretManager; @@ -327,8 +328,10 @@ class ShuffleScheduler { this.applicationId = inputContext.getApplicationId().toString(); this.dagId = inputContext.getDagIdentifier(); this.localHostname = inputContext.getExecutionContext().getHostName(); + String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); final ByteBuffer shuffleMetadata = - inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); + inputContext.getServiceProviderMetaData(auxiliaryService); this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata); this.referee = new Referee(); @@ -371,7 +374,7 @@ class ShuffleScheduler { this.conf, UserGroupInformation.getCurrentUser().getShortUserName()); SecretKey jobTokenSecret = ShuffleUtils .getJobTokenSecretFromTokenBytes(inputContext - 
.getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID)); + .getServiceConsumerMetaData(auxiliaryService)); this.jobTokenSecretManager = new JobTokenSecretManager(jobTokenSecret); ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers, http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 897d7d7..e468a55 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -350,7 +350,7 @@ public class PipelinedSorter extends ExternalSorter { ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false, outputContext, (numSpills - 1), indexCacheList.get(numSpills - 1), partitions, sendEmptyPartitionDetails, pathComponent, partitionStats, - reportDetailedPartitionStats()); + reportDetailedPartitionStats(), this.conf); outputContext.sendEvents(events); LOG.info(outputContext.getDestinationVertexName() + ": Added spill event for spill (final update=false), spillId=" + (numSpills - 1)); @@ -673,7 +673,7 @@ public class PipelinedSorter extends ExternalSorter { ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent, outputContext, i, indexCacheList.get(i), partitions, sendEmptyPartitionDetails, pathComponent, partitionStats, - reportDetailedPartitionStats()); + reportDetailedPartitionStats(), this.conf); LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i); } 
outputContext.sendEvents(events); http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 69bfdb8..21c40e9 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -1133,7 +1133,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab String pathComponent = (outputContext.getUniqueIdentifier() + "_" + index); ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent, outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent, - partitionStats, reportDetailedPartitionStats()); + partitionStats, reportDetailedPartitionStats(), this.conf); LOG.info(outputContext.getDestinationVertexName() + ": " + "Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index); http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index 152096c..760daf5 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -52,6 +52,7 @@ import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.OutputContext; @@ -1084,8 +1085,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit @VisibleForTesting int getShufflePort() throws IOException { + String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); ByteBuffer shuffleMetadata = outputContext - .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); + .getServiceProviderMetaData(auxiliaryService); int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata); return shufflePort; } http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index 9a3d778..13e27eb 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@ -200,7 +200,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { ShuffleUtils.generateEventOnSpill(eventList, finalMergeEnabled, isLastEvent, getContext(), 0, 
new TezSpillRecord(sorter.getFinalIndexFile(), conf), getNumPhysicalOutputs(), sendEmptyPartitionDetails, getContext().getUniqueIdentifier(), - sorter.getPartitionStats(), sorter.reportDetailedPartitionStats()); + sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), this.conf); } return eventList; } http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java index 4233f5d..03ddfa5 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java @@ -17,6 +17,7 @@ import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.OutputContext; @@ -100,7 +101,8 @@ public class TestShuffleUtils { serviceProviderMetaData.writeInt(80); doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(outputContext) .getServiceProviderMetaData - (ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); + (conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)); doReturn(1).when(outputContext).getTaskVertexIndex(); @@ -115,8 +117,8 @@ public class TestShuffleUtils { @Before public void setup() throws Exception { - outputContext = createTezOutputContext(); conf = new 
Configuration(); + outputContext = createTezOutputContext(); conf.set("fs.defaultFS", "file:///"); localFs = FileSystem.getLocal(conf); @@ -163,7 +165,7 @@ public class TestShuffleUtils { String pathComponent = "/attempt_x_y_0/file.out"; ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext, spillId, new TezSpillRecord(indexFile, conf), - physicalOutputs, true, pathComponent, null, false); + physicalOutputs, true, pathComponent, null, false, this.conf); Assert.assertTrue(events.size() == 1); Assert.assertTrue(events.get(0) instanceof CompositeDataMovementEvent); @@ -202,7 +204,7 @@ public class TestShuffleUtils { //normal code path where we do final merge all the time ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext, spillId, new TezSpillRecord(indexFile, conf), - physicalOutputs, true, pathComponent, null, false); + physicalOutputs, true, pathComponent, null, false, this.conf); Assert.assertTrue(events.size() == 2); //one for VM Assert.assertTrue(events.get(0) instanceof VertexManagerEvent); @@ -243,7 +245,7 @@ public class TestShuffleUtils { //normal code path where we do final merge all the time ShuffleUtils.generateEventOnSpill(events, finalMergeDisabled, isLastEvent, outputContext, spillId, new TezSpillRecord(indexFile, conf), - physicalOutputs, true, pathComponent, null, false); + physicalOutputs, true, pathComponent, null, false, this.conf); Assert.assertTrue(events.size() == 2); //one for VM Assert.assertTrue(events.get(0) instanceof VertexManagerEvent); http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java index 6bcbeb6..e085d1a 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java @@ -44,6 +44,7 @@ import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.ExecutionContext; @@ -167,7 +168,8 @@ public class TestShuffleInputEventHandlerImpl { doReturn(new TezCounters()).when(inputContext).getCounters(); doReturn("sourceVertex").when(inputContext).getSourceVertexName(); doReturn(shuffleMetaData).when(inputContext) - .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); + .getServiceProviderMetaData(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)); doReturn(executionContext).when(inputContext).getExecutionContext(); return inputContext; } @@ -183,7 +185,8 @@ public class TestShuffleInputEventHandlerImpl { Token<JobTokenIdentifier> token = new Token(new JobTokenIdentifier(), new JobTokenSecretManager(null)); token.write(out); doReturn(ByteBuffer.wrap(out.getData())).when(inputContext).getServiceConsumerMetaData( - TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID); + conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)); FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class); ShuffleManager realShuffleManager = new ShuffleManager(inputContext, 
conf, 2, http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java index a5608ef..34ca13f 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java @@ -44,6 +44,7 @@ import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.ExecutionContext; @@ -144,7 +145,8 @@ public class TestShuffleManager { doReturn(new TezCounters()).when(inputContext).getCounters(); doReturn("sourceVertex").when(inputContext).getSourceVertexName(); doReturn(shuffleMetaData).when(inputContext) - .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); + .getServiceProviderMetaData(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)); doReturn(executionContext).when(inputContext).getExecutionContext(); return inputContext; } @@ -165,7 +167,8 @@ public class TestShuffleManager { token.write(out); doReturn(ByteBuffer.wrap(out.getData())).when(inputContext). 
getServiceConsumerMetaData( - TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID); + conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)); FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class); return new ShuffleManagerForTest(inputContext, conf, http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java index 80e7b14..c3f8dda 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java @@ -34,6 +34,7 @@ import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.OutputContext; @@ -99,7 +100,7 @@ public class TestPipelinedSorter { ApplicationId appId = ApplicationId.newInstance(10000, 1); TezCounters counters = new TezCounters(); String uniqueId = UUID.randomUUID().toString(); - this.outputContext = createMockOutputContext(counters, appId, uniqueId); + this.outputContext = createMockOutputContext(counters, appId, uniqueId, getConf()); } public static Configuration getConf() { @@ -753,7 +754,7 @@ public class TestPipelinedSorter { } private static OutputContext createMockOutputContext(TezCounters 
counters, ApplicationId appId, - String uniqueId) throws IOException { + String uniqueId, Configuration conf) throws IOException { OutputContext outputContext = mock(OutputContext.class); ExecutionContext execContext = new ExecutionContextImpl("localhost"); @@ -762,7 +763,8 @@ public class TestPipelinedSorter { serviceProviderMetaData.writeInt(80); doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(outputContext) .getServiceProviderMetaData - (ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); + (conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)); doReturn(execContext).when(outputContext).getExecutionContext(); doReturn(mock(OutputStatisticsReporter.class)).when(outputContext).getStatisticsReporter(); http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java index e0374a3..73d249c 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java @@ -49,6 +49,7 @@ import org.apache.tez.common.TezUtils; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.ExecutionContext; @@ -463,7 +464,8 @@ public class TestDefaultSorter { 
doReturn("v1").when(context).getDestinationVertexName(); doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(context) .getServiceProviderMetaData - (ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); + (conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)); doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { long requestedSize = (Long) invocation.getArguments()[0]; http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java index 41b2b97..4a0d1d5 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java @@ -48,6 +48,7 @@ import java.util.Set; import java.util.UUID; import com.google.protobuf.ByteString; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; @@ -117,6 +118,7 @@ public class TestUnorderedPartitionedKVWriter { private boolean shouldCompress; private ReportPartitionStats reportPartitionStats; + private Configuration defaultConf = new Configuration(); public TestUnorderedPartitionedKVWriter(boolean shouldCompress, ReportPartitionStats reportPartitionStats) { @@ -160,7 +162,7 @@ public class TestUnorderedPartitionedKVWriter { 
ApplicationId appId = ApplicationId.newInstance(10000000, 1); TezCounters counters = new TezCounters(); String uniqueId = UUID.randomUUID().toString(); - OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId); + OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf); int maxSingleBufferSizeBytes = 2047; Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class, @@ -256,7 +258,7 @@ public class TestUnorderedPartitionedKVWriter { ApplicationId appId = ApplicationId.newInstance(10000000, 1); TezCounters counters = new TezCounters(); String uniqueId = UUID.randomUUID().toString(); - OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId); + OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf); Random random = new Random(); Configuration conf = createConfiguration(outputContext, Text.class, Text.class, shouldCompress, @@ -524,7 +526,7 @@ public class TestUnorderedPartitionedKVWriter { ApplicationId appId = ApplicationId.newInstance(10000000, 1); TezCounters counters = new TezCounters(); String uniqueId = UUID.randomUUID().toString(); - OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId); + OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf); Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class, shouldCompress, -1); @@ -708,7 +710,7 @@ public class TestUnorderedPartitionedKVWriter { ApplicationId appId = ApplicationId.newInstance(10000000, 1); TezCounters counters = new TezCounters(); String uniqueId = UUID.randomUUID().toString(); - OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId); + OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf); Configuration conf = createConfiguration(outputContext, IntWritable.class, 
LongWritable.class, shouldCompress, -1); @@ -899,7 +901,7 @@ public class TestUnorderedPartitionedKVWriter { } private OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId, - String uniqueId) { + String uniqueId, Configuration conf) { OutputContext outputContext = mock(OutputContext.class); doReturn(counters).when(outputContext).getCounters(); doReturn(appId).when(outputContext).getApplicationId(); @@ -922,7 +924,8 @@ public class TestUnorderedPartitionedKVWriter { portBuffer.reset(); return portBuffer; } - }).when(outputContext).getServiceProviderMetaData(eq(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID)); + }).when(outputContext).getServiceProviderMetaData(eq(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT))); Path outDirBase = new Path(TEST_ROOT_DIR, "outDir_" + uniqueId); String[] outDirs = new String[] { outDirBase.toString() }; http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java index 93c4f92..7762025 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java @@ -28,6 +28,7 @@ import org.apache.hadoop.io.Text; import org.apache.tez.common.TezUtils; import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.runtime.api.ExecutionContext; import 
org.apache.tez.runtime.api.Event; @@ -407,7 +408,8 @@ public class TestOnFileSortedOutput { doReturn("v1").when(context).getDestinationVertexName(); doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(context) .getServiceProviderMetaData - (ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); + (conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)); doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { long requestedSize = (Long) invocation.getArguments()[0]; http://git-wip-us.apache.org/repos/asf/tez/blob/53ea6f5b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java index 38a60a2..bf55cb3 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java @@ -41,6 +41,7 @@ import java.util.Map; import com.google.protobuf.ByteString; import org.apache.commons.lang.RandomStringUtils; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -230,7 +231,8 @@ public class TestOnFileUnorderedKVOutput { ByteBuffer bb = ByteBuffer.allocate(4); bb.putInt(shufflePort); bb.position(0); - AuxiliaryServiceHelper.setServiceDataIntoEnv(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, bb, auxEnv); + AuxiliaryServiceHelper.setServiceDataIntoEnv(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + 
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT), bb, auxEnv); OutputDescriptor outputDescriptor = mock(OutputDescriptor.class);
