TEZ-3362. Delete intermediate data at DAG level for Shuffle Handler
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fec0c5c5 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fec0c5c5 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fec0c5c5 Branch: refs/heads/master Commit: fec0c5c5c5d47dc6b3f5a3bcce5513c9d261dc58 Parents: 613f1e0 Author: Jonathan Eagles <[email protected]> Authored: Tue Oct 25 13:20:40 2016 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Tue Oct 25 13:20:40 2016 -0500 ---------------------------------------------------------------------- .../apache/tez/dag/api/TezConfiguration.java | 19 +- .../org/apache/tez/dag/app/DAGAppMaster.java | 2 +- .../app/launcher/ContainerLauncherManager.java | 8 +- .../app/launcher/ContainerLauncherWrapper.java | 11 ++ .../tez/dag/app/launcher/DagDeleteRunnable.java | 60 +++++++ .../app/launcher/LocalContainerLauncher.java | 50 ++++++ .../app/launcher/TezContainerLauncherImpl.java | 58 ++++++ tez-plugins/tez-aux-services/pom.xml | 29 +++ .../apache/tez/auxservices/ShuffleHandler.java | 44 ++++- .../tez/auxservices/TestShuffleHandler.java | 81 +++++++++ .../tez/auxservices/TestShuffleHandlerJobs.java | 175 +++++++++++++++++++ .../library/common/shuffle/ShuffleUtils.java | 18 ++ .../library/input/OrderedGroupedKVInput.java | 1 + .../runtime/library/input/UnorderedKVInput.java | 1 + .../output/OrderedPartitionedKVOutput.java | 1 + .../library/output/UnorderedKVOutput.java | 1 + .../org/apache/tez/test/MiniTezCluster.java | 18 +- 17 files changed, 558 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 
732fee9..ce344bf 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -655,7 +655,24 @@ public class TezConfiguration extends Configuration { public static final String TEZ_AM_RESOURCE_CPU_VCORES = TEZ_AM_PREFIX + "resource.cpu.vcores"; public static final int TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT = 1; - + + /** Boolean value. Instructs the AM to delete the DAG directory on the nodes upon DAG completion */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="boolean") + public static final String TEZ_AM_DAG_DELETE_ENABLED = TEZ_AM_PREFIX + + "dag.delete.enabled"; + public static final boolean TEZ_AM_DAG_DELETE_ENABLED_DEFAULT = false; + + /** + * Int value. Upper limit on the number of threads used to delete DAG directories on nodes. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="integer") + public static final String TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT = + TEZ_AM_PREFIX + "dag.deletion.thread-count-limit"; + + public static final int TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT_DEFAULT = 30; + /** Int value. The amount of memory in MB to be used by tasks. This applies to all tasks across * all vertices. Setting it to the same value for all tasks is helpful for container reuse and * thus good for performance typically.
*/ http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 7ca7118..605e6f5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -842,7 +842,7 @@ public class DAGAppMaster extends AbstractService { DAGAppMasterEventDagCleanup cleanupEvent = (DAGAppMasterEventDagCleanup) event; LOG.info("Cleaning up DAG: name=" + cleanupEvent.getDag().getName() + ", with id=" + cleanupEvent.getDag().getID()); - containerLauncherManager.dagComplete(cleanupEvent.getDag()); + containerLauncherManager.dagComplete(cleanupEvent.getDag(), jobTokenSecretManager); taskCommunicatorManager.dagComplete(cleanupEvent.getDag()); nodes.dagComplete(cleanupEvent.getDag()); containers.dagComplete(cleanupEvent.getDag()); http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java index f2c1cff..3bbb602 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.Utils; import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.NamedEntityDescriptor; import 
org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; @@ -193,8 +194,11 @@ public class ContainerLauncherManager extends AbstractService } } - public void dagComplete(DAG dag) { - // Nothing required at the moment. Containers are shared across DAGs + public void dagComplete(DAG dag, JobTokenSecretManager secretManager) { + + for (int i = 0 ; i < containerLaunchers.length ; i++) { + containerLaunchers[i].dagComplete(dag, secretManager); + } } public void dagSubmitted() { http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java index 08e287e..5f5f66e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java @@ -14,6 +14,8 @@ package org.apache.tez.dag.app.launcher; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerStopRequest; @@ -37,4 +39,13 @@ public class ContainerLauncherWrapper { public ContainerLauncher getContainerLauncher() { return real; } + + public void dagComplete(DAG dag, JobTokenSecretManager jobTokenSecretManager) { + if (real instanceof TezContainerLauncherImpl) { + ((TezContainerLauncherImpl)real).dagComplete(dag, jobTokenSecretManager); + } + if (real instanceof LocalContainerLauncher) { + ((LocalContainerLauncher)real).dagComplete(dag, jobTokenSecretManager); + } + } } 
http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java new file mode 100644 index 0000000..fefaf69 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.app.launcher; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.http.BaseHttpConnection; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; + +import java.net.URL; + +class DagDeleteRunnable implements Runnable { + final NodeId nodeId; + final DAG dag; + final JobTokenSecretManager jobTokenSecretManager; + final String tezDefaultComponentName; + final int shufflePort; + + public DagDeleteRunnable(NodeId nodeId, int shufflePort, DAG currentDag, + JobTokenSecretManager jobTokenSecretMgr, String tezDefaultComponent) { + this.nodeId = nodeId; + this.shufflePort = shufflePort; + this.dag = currentDag; + this.jobTokenSecretManager = jobTokenSecretMgr; + this.tezDefaultComponentName = tezDefaultComponent; + } + + @Override + public void run() { + try { + URL baseURL = ShuffleUtils.constructBaseURIForShuffleHandlerDagComplete( + nodeId.getHost(), shufflePort, + dag.getID().getApplicationId().toString(), dag.getID().getId(), false); + BaseHttpConnection httpConnection = ShuffleUtils.getHttpConnection(true, baseURL, + ShuffleUtils.getHttpConnectionParams(dag.getConf()), "DAGDelete", jobTokenSecretManager); + httpConnection.connect(); + httpConnection.getInputStream(); + } catch (Exception e) { + TezContainerLauncherImpl.LOG.warn("Could not setup HTTP Connection to the node " + nodeId.getHost() + " for dag delete " + + e); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index 153b2e0..eb9b459 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -44,8 +44,12 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.TezUtils; import org.apache.tez.hadoop.shim.DefaultHadoopShim; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; @@ -89,6 +93,10 @@ public class LocalContainerLauncher extends ContainerLauncher { private final ExecutionContext executionContext; private final int numExecutors; private final boolean isLocalMode; + int shufflePort = ShuffleUtils.UNDEFINED_PORT; + private final Map<NodeId, Integer> nodeIdShufflePortMap = new HashMap<NodeId, Integer>(); + private ExecutorService dagDeleteService; + boolean shouldDelete; private final ConcurrentHashMap<ContainerId, RunningTaskCallback> runningContainers = @@ -137,6 +145,12 @@ public class LocalContainerLauncher extends ContainerLauncher { localEnv = Maps.newHashMap(); AuxiliaryServiceHelper.setServiceDataIntoEnv( auxiliaryService, ByteBuffer.allocate(4).putInt(0), localEnv); + try { + shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData( + AuxiliaryServiceHelper.getServiceDataFromEnv(auxiliaryService, localEnv)); + } catch (IOException e) { + LOG.warn("Could not extract shuffle aux-service port!"); + } } else { localEnv = System.getenv(); } @@ -147,6 +161,12 @@ public class LocalContainerLauncher extends ContainerLauncher { new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("LocalTaskExecutionThread #%d") .build()); this.taskExecutorService = MoreExecutors.listeningDecorator(rawExecutor); + dagDeleteService = Executors.newFixedThreadPool( + conf.getInt(TezConfiguration.TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT, + TezConfiguration.TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT_DEFAULT), new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("ShuffleDeleteService #%d").build()); + shouldDelete = conf.getBoolean(TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED, + TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED_DEFAULT); } @Override @@ -170,6 +190,10 @@ public class LocalContainerLauncher extends ContainerLauncher { taskExecutorService.shutdownNow(); } callbackExecutor.shutdownNow(); + if (dagDeleteService != null) { + dagDeleteService.shutdown(); + dagDeleteService = null; + } } @@ -247,6 +271,12 @@ public class LocalContainerLauncher extends ContainerLauncher { RunningTaskCallback callback = new RunningTaskCallback(event.getContainerId()); runningContainers.put(event.getContainerId(), callback); Futures.addCallback(runningTaskFuture, callback, callbackExecutor); + + if (isLocalMode && shufflePort != ShuffleUtils.UNDEFINED_PORT) { + if(nodeIdShufflePortMap.get(event.getNodeId()) == null) { + nodeIdShufflePortMap.put(event.getNodeId(), shufflePort); + } + } } catch (RejectedExecutionException e) { handleLaunchFailed(e, event.getContainerId()); } @@ -383,4 +413,24 @@ public class LocalContainerLauncher extends ContainerLauncher { } } + public void dagComplete(DAG dag, JobTokenSecretManager jobTokenSecretManager) { + if (!shouldDelete) { + return; + } + String tezDefaultComponentName = + isLocalMode ? 
TezConstants.getTezUberServicePluginName() : + TezConstants.getTezYarnServicePluginName(); + for (Map.Entry<NodeId, Integer> entry : nodeIdShufflePortMap.entrySet()) { + NodeId nodeId = entry.getKey(); + int shufflePort = entry.getValue(); + //TODO: add check for healthy node + if (shufflePort != ShuffleUtils.UNDEFINED_PORT) { + DagDeleteRunnable dagDeleteRunnable = new DagDeleteRunnable(nodeId, + shufflePort, dag, jobTokenSecretManager, tezDefaultComponentName); + dagDeleteService.submit(dagDeleteRunnable); + } + } + nodeIdShufflePortMap.clear(); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java index 1521dcb..0726d86 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java @@ -19,9 +19,14 @@ package org.apache.tez.dag.app.launcher; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; @@ -30,8 +35,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.TezUtils; +import 
org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; @@ -79,6 +89,8 @@ public class TezContainerLauncherImpl extends ContainerLauncher { protected BlockingQueue<ContainerOp> eventQueue = new LinkedBlockingQueue<>(); private ContainerManagementProtocolProxy cmProxy; private AtomicBoolean serviceStopped = new AtomicBoolean(false); + private final Map<NodeId, Integer> nodeIdShufflePortMap = new HashMap<NodeId, Integer>(); + private ExecutorService dagDeleteService; private Container getContainer(ContainerOp event) { ContainerId id = event.getBaseOperation().getContainerId(); @@ -164,6 +176,23 @@ public class TezContainerLauncherImpl extends ContainerLauncher { // it from ASSIGNED to RUNNING state getContext().containerLaunched(containerID); this.state = ContainerState.RUNNING; + + int shufflePort = ShuffleUtils.UNDEFINED_PORT; + ByteBuffer portInfo = + response.getAllServicesMetaData().get( + conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)); + if (portInfo != null) { + DataInputByteBuffer in = new DataInputByteBuffer(); + in.reset(portInfo); + shufflePort = in.readInt(); + } + + if (shufflePort != ShuffleUtils.UNDEFINED_PORT) { + if(nodeIdShufflePortMap.get(event.getNodeId()) == null) { + nodeIdShufflePortMap.put(event.getNodeId(), shufflePort); + } + } } catch (Throwable t) { String message = "Container launch failed for " + containerID + " : " + ExceptionUtils.getStackTrace(t); @@ -301,6 +330,10 @@ public class TezContainerLauncherImpl extends ContainerLauncher { }; eventHandlingThread.setName("ContainerLauncher Event Handler"); 
eventHandlingThread.start(); + dagDeleteService = Executors.newFixedThreadPool( + conf.getInt(TezConfiguration.TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT, + TezConfiguration.TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT_DEFAULT), new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("ShuffleDeleteService #%d").build()); } @Override @@ -315,6 +348,10 @@ public class TezContainerLauncherImpl extends ContainerLauncher { if (launcherPool != null) { launcherPool.shutdownNow(); } + if (dagDeleteService != null) { + dagDeleteService.shutdown(); + dagDeleteService = null; + } } protected EventProcessor createEventProcessor(ContainerOp event) { @@ -397,4 +434,25 @@ public class TezContainerLauncherImpl extends ContainerLauncher { throw new TezUncheckedException(e); } } + + public void dagComplete(DAG dag, JobTokenSecretManager jobTokenSecretManager) { + boolean shouldDelete = conf.getBoolean(TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED, + TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED_DEFAULT); + if (!shouldDelete) { + return; + } + String tezDefaultComponentName = TezConstants.getTezYarnServicePluginName(); + for (Map.Entry<NodeId, Integer> entry : nodeIdShufflePortMap.entrySet()) { + NodeId nodeId = entry.getKey(); + int shufflePort = entry.getValue(); + //TODO: add check for healthy node + if (shufflePort != ShuffleUtils.UNDEFINED_PORT) { + DagDeleteRunnable dagDeleteRunnable = new DagDeleteRunnable(nodeId, + shufflePort, dag, jobTokenSecretManager, tezDefaultComponentName); + dagDeleteService.submit(dagDeleteRunnable); + } + } + nodeIdShufflePortMap.clear(); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-plugins/tez-aux-services/pom.xml ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/pom.xml b/tez-plugins/tez-aux-services/pom.xml index 93fef71..e9fdd3f 100644 --- a/tez-plugins/tez-aux-services/pom.xml +++ b/tez-plugins/tez-aux-services/pom.xml @@ -103,6 +103,35 @@ 
<artifactId>mockito-all</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-tests</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-tests</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-dag</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java index b00c28f..3799cbe 100644 --- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java @@ -56,7 +56,9 @@ import java.util.regex.Pattern; import javax.crypto.SecretKey; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; @@ -899,6 +901,7 @@ public class ShuffleHandler extends AuxiliaryService { final Map<String,List<String>> q = new 
QueryStringDecoder(request.getUri()).getParameters(); final List<String> keepAliveList = q.get("keepAlive"); + final List<String> dagCompletedQ = q.get("dagCompleted"); boolean keepAliveParam = false; if (keepAliveList != null && keepAliveList.size() == 1) { keepAliveParam = Boolean.parseBoolean(keepAliveList.get(0)); @@ -919,7 +922,10 @@ public class ShuffleHandler extends AuxiliaryService { "\n dagId: " + dagIdQ + "\n keepAlive: " + keepAliveParam); } - + // If the request is for Dag Deletion, process the request and send OK. + if (deleteDagDirectories(evt, dagCompletedQ, jobQ, dagIdQ)) { + return; + } if (mapIds == null || reduceQ == null || jobQ == null || dagIdQ == null) { sendError(ctx, "Required param job, dag, map and reduce", BAD_REQUEST); return; @@ -993,6 +999,26 @@ public class ShuffleHandler extends AuxiliaryService { } } + private boolean deleteDagDirectories(MessageEvent evt, + List<String> dagCompletedQ, List<String> jobQ, + List<String> dagIdQ) { + if (dagCompletedQ != null && !dagCompletedQ.isEmpty()) { + String base = getDagLocation(jobQ.get(0), dagIdQ.get(0), userRsrc.get(jobQ.get(0))); + try { + LocalFileSystem lfs = FileSystem.getLocal(conf); + for(Path dagPath : lDirAlloc.getAllLocalPathsToRead(base, conf)) { + lfs.delete(dagPath, true); + } + } catch (IOException e) { + LOG.warn("Encountered exception during dag delete "+ e); + } + evt.getChannel().write(new DefaultHttpResponse(HTTP_1_1, OK)); + evt.getChannel().close(); + return true; + } + return false; + } + /** * Calls sendMapOutput for the mapId pointed by ReduceContext.mapsToSend * and increments it. 
This method is first called by messageReceived() @@ -1050,16 +1076,22 @@ public class ShuffleHandler extends AuxiliaryService { } private String getBaseLocation(String jobId, String dagId, String user) { + final String baseStr = + getDagLocation(jobId, dagId, user) + "output" + Path.SEPARATOR; + return baseStr; + } + + private String getDagLocation(String jobId, String dagId, String user) { final JobID jobID = JobID.forName(jobId); final ApplicationId appID = ApplicationId.newInstance(Long.parseLong(jobID.getJtIdentifier()), - jobID.getId()); - final String baseStr = + jobID.getId()); + final String dagStr = USERCACHE + Path.SEPARATOR + user + Path.SEPARATOR + APPCACHE + Path.SEPARATOR - + appID.toString() + Path.SEPARATOR + Constants.DAG_PREFIX + - dagId + Path.SEPARATOR + "output" + Path.SEPARATOR; - return baseStr; + + appID.toString() + Path.SEPARATOR + Constants.DAG_PREFIX + dagId + + Path.SEPARATOR; + return dagStr; } protected MapOutputInfo getMapOutputInfo(String dagId, String mapId, http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java index 00db3c4..3d622e6 100644 --- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java @@ -1063,6 +1063,87 @@ public class TestShuffleHandler { } } + @Test(timeout = 5000) + public void testDagDelete() throws Exception { + final ArrayList<Throwable> failures = new ArrayList<Throwable>(1); + Configuration conf = new Configuration(); + conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); + 
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "simple"); + UserGroupInformation.setConfiguration(conf); + File absLogDir = new File("target", TestShuffleHandler.class. + getSimpleName() + "LocDir").getAbsoluteFile(); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath()); + ApplicationId appId = ApplicationId.newInstance(12345, 1); + String appAttemptId = "attempt_12345_1_m_1_0"; + String user = "randomUser"; + List<File> fileMap = new ArrayList<File>(); + createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, + conf, fileMap); + ShuffleHandler shuffleHandler = new ShuffleHandler() { + @Override + protected Shuffle getShuffle(Configuration conf) { + // replace the shuffle handler with one stubbed for testing + return new Shuffle(conf) { + @Override + protected void sendError(ChannelHandlerContext ctx, String message, + HttpResponseStatus status) { + if (failures.size() == 0) { + failures.add(new Error(message)); + ctx.getChannel().close(); + } + } + }; + } + }; + shuffleHandler.init(conf); + try { + shuffleHandler.start(); + DataOutputBuffer outputBuffer = new DataOutputBuffer(); + outputBuffer.reset(); + Token<JobTokenIdentifier> jt = + new Token<JobTokenIdentifier>("identifier".getBytes(), + "password".getBytes(), new Text(user), new Text("shuffleService")); + jt.write(outputBuffer); + shuffleHandler + .initializeApplication(new ApplicationInitializationContext(user, + appId, ByteBuffer.wrap(outputBuffer.getData(), 0, + outputBuffer.getLength()))); + URL url = + new URL( + "http://127.0.0.1:" + + shuffleHandler.getConfig().get( + ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) + + "/mapOutput?job=job_12345_0001&dag=1&dagCompleted=true"); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + 
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + String dagDirStr = + StringUtils.join(Path.SEPARATOR, + new String[] { absLogDir.getAbsolutePath(), + ShuffleHandler.USERCACHE, user, + ShuffleHandler.APPCACHE, appId.toString(),"dag_1/"}); + File dagDir = new File(dagDirStr); + Assert.assertTrue("Dag Directory does not exist!", dagDir.exists()); + conn.connect(); + try { + DataInputStream is = new DataInputStream(conn.getInputStream()); + is.close(); + Assert.assertFalse("Dag Directory was not deleted!", dagDir.exists()); + } catch (EOFException e) { + // ignore + } + Assert.assertEquals("sendError called due to shuffle error", + 0, failures.size()); + } finally { + shuffleHandler.stop(); + FileUtil.fullyDelete(absLogDir); + } + } + @Test(timeout = 4000) public void testSendMapCount() throws Exception { final List<ShuffleHandler.ReduceMapFileCount> listenerList = http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java new file mode 100644 index 0000000..c409bf8 --- /dev/null +++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.auxservices; +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb + .GetApplicationReportRequestPBImpl; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.examples.OrderedWordCount; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import static org.apache.tez.test.TestTezJobs.generateOrderedWordCountInput; +import static org.apache.tez.test.TestTezJobs.verifyOutput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.tez.mapreduce.hadoop.MRJobConfig; +import org.apache.tez.test.MiniTezCluster; +import org.junit.AfterClass; +import 
org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * End-to-end test that runs OrderedWordCount with {@code TEZ_AM_DAG_DELETE_ENABLED} on
+ * NodeManagers hosting the Tez {@link ShuffleHandler}, then checks that each NodeManager's
+ * {@code dag_1} directory is gone while the application directory is still present.
+ */
+public class TestShuffleHandlerJobs {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestShuffleHandlerJobs.class);
+
+  protected static MiniTezCluster mrrTezCluster;
+  protected static MiniDFSCluster dfsCluster;
+
+  // Configuration for the mini HDFS cluster; the Tez cluster is given its own in setup().
+  private static final Configuration conf = new Configuration();
+  private static FileSystem remoteFs;
+  private static final int NUM_NMS = 5;
+  private static final int NUM_DNS = 5;
+
+  /**
+   * Starts a mini HDFS cluster and, if the Tez application jar exists, a MiniTezCluster whose
+   * NodeManagers run the Tez {@link ShuffleHandler} as their auxiliary service. When the jar is
+   * missing the Tez cluster is left null and the test method skips itself.
+   */
+  @BeforeClass
+  public static void setup() throws IOException {
+    try {
+      conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+      conf.setInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT, 22);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DNS)
+          .format(true).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    if (!(new File(MiniTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    if (mrrTezCluster == null) {
+      mrrTezCluster = new MiniTezCluster(TestShuffleHandlerJobs.class.getName(), NUM_NMS,
+          1, 1);
+      Configuration tezClusterConf = new Configuration();
+      // Setting NM_AUX_SERVICES here makes MiniTezCluster skip its default MapReduce shuffle
+      // registration, so the Tez ShuffleHandler is the only shuffle service on each NM.
+      tezClusterConf.set(YarnConfiguration.NM_AUX_SERVICES,
+          ShuffleHandler.TEZ_SHUFFLE_SERVICEID);
+      String serviceStr = String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
+          ShuffleHandler.TEZ_SHUFFLE_SERVICEID);
+      tezClusterConf.set(serviceStr, ShuffleHandler.class.getName());
+      tezClusterConf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+      tezClusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+      tezClusterConf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
+      // No debug delay on NodeManager-side deletions.
+      tezClusterConf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0L);
+      mrrTezCluster.init(tezClusterConf);
+      mrrTezCluster.start();
+    }
+  }
+
+  /** Stops the Tez and HDFS mini clusters if they were started. */
+  @AfterClass
+  public static void tearDown() {
+    if (mrrTezCluster != null) {
+      mrrTezCluster.stop();
+      mrrTezCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+  }
+
+  /**
+   * Runs OrderedWordCount in a session with DAG deletion enabled. After the application
+   * finishes, verifies on every NodeManager that {@code appcache/<appId>/dag_1} no longer
+   * exists while {@code appcache/<appId>} does.
+   */
+  @Test(timeout = 60000)
+  public void testOrderedWordCount() throws Exception {
+    // setup() leaves the Tez cluster null when the app jar is missing; skip rather than NPE.
+    org.junit.Assume.assumeTrue(mrrTezCluster != null);
+
+    String inputDirStr = "/tmp/owc-input/";
+    Path inputDir = new Path(inputDirStr);
+    Path stagingDirPath = new Path("/tmp/owc-staging-dir");
+    remoteFs.mkdirs(inputDir);
+    remoteFs.mkdirs(stagingDirPath);
+    generateOrderedWordCountInput(inputDir, remoteFs);
+
+    String outputDirStr = "/tmp/owc-output/";
+    Path outputDir = new Path(outputDirStr);
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+    tezConf.set(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        ShuffleHandler.TEZ_SHUFFLE_SERVICEID);
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED, true);
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
+    // Presumably disabled so intermediate data is served by the ShuffleHandler; confirm.
+    tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
+
+    TezClient tezSession = TezClient.create("WordCountTest", tezConf);
+    try {
+      tezSession.start();
+      final OrderedWordCount job = new OrderedWordCount();
+      try {
+        Assert.assertEquals("OrderedWordCount failed", 0, job.run(tezConf,
+            new String[]{"-counter", inputDirStr, outputDirStr, "10"}, tezSession));
+        verifyOutput(outputDir, remoteFs);
+      } finally {
+        // Stop the session even when the job or output verification fails.
+        tezSession.stop();
+      }
+
+      ApplicationId appId = job.getAppId();
+      YarnApplicationState finalState = waitForAppToTerminate(
+          mrrTezCluster.getResourceManager().getClientRMService(), appId);
+      Assert.assertEquals("Unexpected final state for " + appId,
+          YarnApplicationState.FINISHED, finalState);
+
+      String user = UserGroupInformation.getCurrentUser().getUserName();
+      for (int i = 0; i < NUM_NMS; i++) {
+        String appPath = mrrTezCluster.getTestWorkDir() + "/"
+            + TestShuffleHandlerJobs.class.getName() + "-localDir-nm-" + i + "_0/usercache/"
+            + user + "/appcache/" + appId;
+        String dagPathStr = appPath + "/dag_1";
+
+        Assert.assertFalse("DAG directory should have been deleted: " + dagPathStr,
+            new File(dagPathStr).exists());
+        Assert.assertTrue("Application directory should still exist: " + appPath,
+            new File(appPath).exists());
+      }
+    } finally {
+      remoteFs.delete(stagingDirPath, true);
+    }
+  }
+
+  /**
+   * Polls the ResourceManager every 100 ms until the application reaches FINISHED, FAILED or
+   * KILLED, so a failed application is reported instead of spinning until the test timeout.
+   *
+   * @param rmService the ResourceManager client service to query
+   * @param appId     the application to wait for
+   * @return the terminal state that was observed
+   */
+  private static YarnApplicationState waitForAppToTerminate(ClientRMService rmService,
+      ApplicationId appId) throws Exception {
+    while (true) {
+      GetApplicationReportResponse resp = rmService.getApplicationReport(
+          GetApplicationReportRequest.newInstance(appId));
+      YarnApplicationState state = resp.getApplicationReport().getYarnApplicationState();
+      if (state == YarnApplicationState.FINISHED
+          || state == YarnApplicationState.FAILED
+          || state == YarnApplicationState.KILLED) {
+        return state;
+      }
+      Thread.sleep(100);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 5d2444c..ed2e26e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -76,6 +76,7 @@ public class ShuffleUtils { private static final Logger LOG = LoggerFactory.getLogger(ShuffleUtils.class); private static final long MB = 1024l * 1024l; + public static final int UNDEFINED_PORT = -1; //Shared by multiple threads private static volatile SSLFactory sslFactory; @@ -222,6 +223,23 @@ public class ShuffleUtils { return sb; }
+  /**
+   * Builds the ShuffleHandler URL that carries {@code dagCompleted=true} for one DAG, e.g.
+   * {@code http://host:13562/mapOutput?job=job_1_0001&dag=1&dagCompleted=true}.
+   *
+   * @param host          ShuffleHandler host name
+   * @param port          ShuffleHandler port
+   * @param appId         application id string; every {@code "application"} substring is
+   *                      replaced with {@code "job"} (application_1_0001 becomes job_1_0001)
+   * @param dagIdentifier numeric DAG identifier placed in the {@code dag} parameter
+   * @param sslShuffle    {@code true} for an {@code https} URL, {@code false} for {@code http}
+   * @return the assembled URL
+   * @throws MalformedURLException if the assembled string is not a valid URL
+   */
+  public static URL constructBaseURIForShuffleHandlerDagComplete(
+      String host, int port, String appId, int dagIdentifier, boolean sslShuffle)
+      throws MalformedURLException {
+    final String protocol = sslShuffle ? "https://" : "http://";
+    StringBuilder sb = new StringBuilder(protocol)
+        .append(host)
+        .append(':')
+        .append(port)
+        .append("/mapOutput?job=")
+        .append(appId.replace("application", "job"))
+        .append("&dag=")
+        .append(dagIdentifier)
+        .append("&dagCompleted=true");
+    return new URL(sb.toString());
+  }
+
public static URL constructInputURL(String baseURI, Collection<InputAttemptIdentifier> inputs, boolean keepAlive) throws MalformedURLException { StringBuilder url = new StringBuilder(baseURI); http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java index 9a2a23e..69506af 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java @@ -390,6 +390,7 @@ public class OrderedGroupedKVInput extends
AbstractLogicalInput { confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); confKeys.add(Constants.TEZ_RUNTIME_TASK_MEMORY); + confKeys.add(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID); } // TODO Maybe add helper methods to extract keys http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java index 2d6683a..6a9dd62 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java @@ -283,6 +283,7 @@ public class UnorderedKVInput extends AbstractLogicalInput { confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); confKeys.add(Constants.TEZ_RUNTIME_TASK_MEMORY); + confKeys.add(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID); } // TODO Maybe add helper methods to extract keys http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index 13e27eb..6ebcac8 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@ -245,6 +245,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); + confKeys.add(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID); } // TODO Maybe add helper methods to extract keys http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java index 4f74f7d..db6ecda 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java @@ -173,6 +173,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput { confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS); + confKeys.add(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID); } // TODO Maybe add helper methods to extract keys http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java index d7319c5..c727a8f 100644 --- 
a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java +++ b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java @@ -173,15 +173,15 @@ public class MiniTezCluster extends MiniYARNCluster { conf.set(MRConfig.MASTER_ADDRESS, "test"); //configure the shuffle service in NM - conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, - new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID }); - conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, - ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class, - Service.class); - - // Non-standard shuffle port - conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); - + if (conf.get(YarnConfiguration.NM_AUX_SERVICES) == null) { + conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, + new String[]{ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID}); + conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, + ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class, + Service.class); + // Non-standard shuffle port + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + } conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR, DefaultContainerExecutor.class, ContainerExecutor.class);
