Repository: tez Updated Branches: refs/heads/TEZ-3334 85e77b9cc -> bee148439
TEZ-3509. Make DAG Deletion path based (Kuhu Shukla via jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bee14843 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bee14843 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bee14843 Branch: refs/heads/TEZ-3334 Commit: bee1484394e16b10e55269526e99a54748d838e0 Parents: 85e77b9 Author: Jonathan Eagles <[email protected]> Authored: Fri Nov 18 11:34:46 2016 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Fri Nov 18 11:34:46 2016 -0600 ---------------------------------------------------------------------- .../apache/tez/dag/api/TezConfiguration.java | 12 +++ .../org/apache/tez/dag/app/DAGAppMaster.java | 2 +- .../app/launcher/ContainerLauncherManager.java | 7 +- .../app/launcher/ContainerLauncherWrapper.java | 4 +- .../tez/dag/app/launcher/DagDeleteRunnable.java | 20 ++-- .../app/launcher/LocalContainerLauncher.java | 58 ++++------ .../app/launcher/TezContainerLauncherImpl.java | 56 ++++------ .../apache/tez/auxservices/ShuffleHandler.java | 8 +- .../tez/auxservices/TestShuffleHandler.java | 2 +- .../runtime/library/common/TezRuntimeUtils.java | 106 ++++++++++++++++++- .../library/common/shuffle/ShuffleUtils.java | 91 +--------------- 11 files changed, 185 insertions(+), 181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index ce344bf..22cd80e 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1655,6 +1655,18 @@ public class TezConfiguration extends Configuration { public static final String TEZ_AM_RECOVERY_SERVICE_CLASS_DEFAULT = "org.apache.tez.dag.history.recovery.RecoveryService"; /** + * String value that is a class name. + * Specify the class to use for Deletion tracking. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty + public static final String TEZ_DELETION_TRACKER_CLASS = + TEZ_PREFIX + "history.logging.service.class"; + + public static final String TEZ_DELETION_TRACKER_CLASS_DEFAULT = + "org.apache.tez.dag.app.launcher.DeletionTrackerImpl"; + + /** * Boolean value. Default false. * By default, configured values for the Summary Entity Types for Timeline will * not be respected and be overridden by the Timeline History Service. http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 605e6f5..646eefb 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -842,7 +842,7 @@ public class DAGAppMaster extends AbstractService { DAGAppMasterEventDagCleanup cleanupEvent = (DAGAppMasterEventDagCleanup) event; LOG.info("Cleaning up DAG: name=" + cleanupEvent.getDag().getName() + ", with id=" + cleanupEvent.getDag().getID()); - containerLauncherManager.dagComplete(cleanupEvent.getDag(), jobTokenSecretManager); + containerLauncherManager.dagComplete(cleanupEvent.getDag().getID(), jobTokenSecretManager); taskCommunicatorManager.dagComplete(cleanupEvent.getDag()); nodes.dagComplete(cleanupEvent.getDag()); containers.dagComplete(cleanupEvent.getDag()); http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java index 3bbb602..e3f96ea 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java @@ -34,6 +34,7 @@ import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; @@ -42,7 +43,6 @@ import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerLauncherContextImpl; import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; -import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.rm.ContainerLauncherEvent; import org.apache.tez.dag.app.rm.ContainerLauncherLaunchRequestEvent; import org.apache.tez.serviceplugins.api.DagInfo; @@ -146,7 +146,7 @@ public class ContainerLauncherManager extends AbstractService AppContext context, TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, String workingDirectory, - boolean isLocalMode) { + boolean isLocalMode) throws TezException { LOG.info("Creating LocalContainerLauncher"); // TODO Post TEZ-2003. LocalContainerLauncher is special cased, since it makes use of // extensive internals which are only available at runtime. Will likely require @@ -194,8 +194,7 @@ public class ContainerLauncherManager extends AbstractService } } - public void dagComplete(DAG dag, JobTokenSecretManager secretManager) { - + public void dagComplete(TezDAGID dag, JobTokenSecretManager secretManager) { for (int i = 0 ; i < containerLaunchers.length ; i++) { containerLaunchers[i].dagComplete(dag, secretManager); } http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java index 5f5f66e..c70ab10 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java @@ -15,7 +15,7 @@ package org.apache.tez.dag.app.launcher; import org.apache.tez.common.security.JobTokenSecretManager; -import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerStopRequest; @@ -40,7 +40,7 @@ public class ContainerLauncherWrapper { return real; } - public void dagComplete(DAG dag, JobTokenSecretManager jobTokenSecretManager) { + public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) { if (real instanceof TezContainerLauncherImpl) { ((TezContainerLauncherImpl)real).dagComplete(dag, jobTokenSecretManager); } http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java index fefaf69..669d539 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java @@ -18,26 +18,30 @@ package org.apache.tez.dag.app.launcher; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.security.JobTokenSecretManager; -import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.http.BaseHttpConnection; -import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; +import org.apache.tez.runtime.library.common.TezRuntimeUtils; import java.net.URL; class DagDeleteRunnable implements Runnable { final NodeId nodeId; - final DAG dag; + final TezDAGID dag; final JobTokenSecretManager jobTokenSecretManager; final String tezDefaultComponentName; final int shufflePort; + final Configuration conf; - public DagDeleteRunnable(NodeId nodeId, int shufflePort, DAG currentDag, + public DagDeleteRunnable(NodeId nodeId, int shufflePort, TezDAGID currentDag, + Configuration conf, JobTokenSecretManager jobTokenSecretMgr, String tezDefaultComponent) { this.nodeId = nodeId; this.shufflePort = shufflePort; this.dag = currentDag; + this.conf = conf; this.jobTokenSecretManager = jobTokenSecretMgr; this.tezDefaultComponentName = tezDefaultComponent; } @@ -45,11 +49,11 @@ class DagDeleteRunnable implements Runnable { @Override public void run() { try { - URL baseURL = ShuffleUtils.constructBaseURIForShuffleHandlerDagComplete( + URL baseURL = TezRuntimeUtils.constructBaseURIForShuffleHandlerDagComplete( nodeId.getHost(), shufflePort, - dag.getID().getApplicationId().toString(), dag.getID().getId(), false); - BaseHttpConnection httpConnection = ShuffleUtils.getHttpConnection(true, baseURL, - ShuffleUtils.getHttpConnectionParams(dag.getConf()), "DAGDelete", jobTokenSecretManager); + dag.getApplicationId().toString(), dag.getId(), false); + BaseHttpConnection httpConnection = TezRuntimeUtils.getHttpConnection(true, baseURL, + TezRuntimeUtils.getHttpConnectionParams(conf), "DAGDelete", jobTokenSecretManager); httpConnection.connect(); httpConnection.getInputStream(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index eb9b459..b6f725c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -45,11 +45,13 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TezConstants; -import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; @@ -72,7 +74,6 @@ import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.TezTaskCommunicatorImpl; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; -import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.task.TezChild; @@ -93,9 +94,8 @@ public class LocalContainerLauncher extends ContainerLauncher { private final ExecutionContext executionContext; private final int numExecutors; private final boolean isLocalMode; - int shufflePort = ShuffleUtils.UNDEFINED_PORT; - private final Map<NodeId, Integer> nodeIdShufflePortMap = new HashMap<NodeId, Integer>(); - private ExecutorService dagDeleteService; + int shufflePort = TezRuntimeUtils.INVALID_PORT; + private DeletionTracker deletionTracker; boolean shouldDelete; private final ConcurrentHashMap<ContainerId, RunningTaskCallback> @@ -116,7 +116,7 @@ public class LocalContainerLauncher extends ContainerLauncher { AppContext context, TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, String workingDirectory, - boolean isLocalMode) throws UnknownHostException { + boolean isLocalMode) throws UnknownHostException, TezException { // TODO Post TEZ-2003. Most of this information is dynamic and only available after the AM // starts up. It's not possible to set these up via a static payload. // Will need some kind of mechanism to dynamically crate payloads / bind to parameters @@ -146,7 +146,7 @@ public class LocalContainerLauncher extends ContainerLauncher { AuxiliaryServiceHelper.setServiceDataIntoEnv( auxiliaryService, ByteBuffer.allocate(4).putInt(0), localEnv); try { - shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData( + shufflePort = TezRuntimeUtils.deserializeShuffleProviderMetaData( AuxiliaryServiceHelper.getServiceDataFromEnv(auxiliaryService, localEnv)); } catch (IOException e) { LOG.warn("Could not extract shuffle aux-service port!"); @@ -161,12 +161,17 @@ public class LocalContainerLauncher extends ContainerLauncher { new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LocalTaskExecutionThread #%d") .build()); this.taskExecutorService = MoreExecutors.listeningDecorator(rawExecutor); - dagDeleteService = Executors.newFixedThreadPool( - conf.getInt(TezConfiguration.TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT, - TezConfiguration.TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT_DEFAULT), new ThreadFactoryBuilder() - .setDaemon(true).setNameFormat("ShuffleDeleteService #%d").build()); shouldDelete = conf.getBoolean(TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED, TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED_DEFAULT); + String tezDefaultComponentName = + isLocalMode ? TezConstants.getTezUberServicePluginName() : + TezConstants.getTezYarnServicePluginName(); + String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_DELETION_TRACKER_CLASS, + TezConfiguration.TEZ_DELETION_TRACKER_CLASS_DEFAULT); + deletionTracker = ReflectionUtils.createClazzInstance( + deletionTrackerClassName,new Class[] { + Map.class, Configuration.class, String.class}, + new Object[] {new HashMap<NodeId, Integer>(), conf, tezDefaultComponentName}); } @Override @@ -190,9 +195,8 @@ public class LocalContainerLauncher extends ContainerLauncher { taskExecutorService.shutdownNow(); } callbackExecutor.shutdownNow(); - if (dagDeleteService != null) { - dagDeleteService.shutdown(); - dagDeleteService = null; + if (deletionTracker != null) { + deletionTracker.shutdown(); } } @@ -272,11 +276,7 @@ public class LocalContainerLauncher extends ContainerLauncher { runningContainers.put(event.getContainerId(), callback); Futures.addCallback(runningTaskFuture, callback, callbackExecutor); - if (isLocalMode && shufflePort != ShuffleUtils.UNDEFINED_PORT) { - if(nodeIdShufflePortMap.get(event.getNodeId()) == null) { - nodeIdShufflePortMap.put(event.getNodeId(), shufflePort); - } - } + deletionTracker.addNodeShufflePorts(event.getNodeId(), shufflePort); } catch (RejectedExecutionException e) { handleLaunchFailed(e, event.getContainerId()); } @@ -413,24 +413,8 @@ public class LocalContainerLauncher extends ContainerLauncher { } } - public void dagComplete(DAG dag, JobTokenSecretManager jobTokenSecretManager) { - if (!shouldDelete) { - return; - } - String tezDefaultComponentName = - isLocalMode ? TezConstants.getTezUberServicePluginName() : - TezConstants.getTezYarnServicePluginName(); - for (Map.Entry<NodeId, Integer> entry : nodeIdShufflePortMap.entrySet()) { - NodeId nodeId = entry.getKey(); - int shufflePort = entry.getValue(); - //TODO: add check for healthy node - if (shufflePort != ShuffleUtils.UNDEFINED_PORT) { - DagDeleteRunnable dagDeleteRunnable = new DagDeleteRunnable(nodeId, - shufflePort, dag, jobTokenSecretManager, tezDefaultComponentName); - dagDeleteService.submit(dagDeleteRunnable); - } - } - nodeIdShufflePortMap.clear(); + public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) { + deletionTracker.dagComplete(dag, jobTokenSecretManager); } } http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java index 0726d86..3ad3488 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java @@ -25,8 +25,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; @@ -37,11 +35,13 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TezConstants; -import org.apache.tez.dag.app.dag.DAG; -import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; @@ -89,8 +89,7 @@ public class TezContainerLauncherImpl extends ContainerLauncher { protected BlockingQueue<ContainerOp> eventQueue = new LinkedBlockingQueue<>(); private ContainerManagementProtocolProxy cmProxy; private AtomicBoolean serviceStopped = new AtomicBoolean(false); - private final Map<NodeId, Integer> nodeIdShufflePortMap = new HashMap<NodeId, Integer>(); - private ExecutorService dagDeleteService; + private DeletionTracker deletionTracker; private Container getContainer(ContainerOp event) { ContainerId id = event.getBaseOperation().getContainerId(); @@ -177,7 +176,7 @@ public class TezContainerLauncherImpl extends ContainerLauncher { getContext().containerLaunched(containerID); this.state = ContainerState.RUNNING; - int shufflePort = ShuffleUtils.UNDEFINED_PORT; + int shufflePort = TezRuntimeUtils.INVALID_PORT; ByteBuffer portInfo = response.getAllServicesMetaData().get( conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, @@ -188,11 +187,7 @@ public class TezContainerLauncherImpl extends ContainerLauncher { shufflePort = in.readInt(); } - if (shufflePort != ShuffleUtils.UNDEFINED_PORT) { - if(nodeIdShufflePortMap.get(event.getNodeId()) == null) { - nodeIdShufflePortMap.put(event.getNodeId(), shufflePort); - } - } + deletionTracker.addNodeShufflePorts(event.getNodeId(), shufflePort); } catch (Throwable t) { String message = "Container launch failed for " + containerID + " : " + ExceptionUtils.getStackTrace(t); @@ -269,7 +264,7 @@ public class TezContainerLauncherImpl extends ContainerLauncher { } @Override - public void start() { + public void start() throws TezException { // pass a copy of config to ContainerManagementProtocolProxy until YARN-3497 is fixed cmProxy = new ContainerManagementProtocolProxy(conf); @@ -330,10 +325,12 @@ public class TezContainerLauncherImpl extends ContainerLauncher { }; eventHandlingThread.setName("ContainerLauncher Event Handler"); eventHandlingThread.start(); - dagDeleteService = Executors.newFixedThreadPool( - conf.getInt(TezConfiguration.TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT, - TezConfiguration.TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT_DEFAULT), new ThreadFactoryBuilder() - .setDaemon(true).setNameFormat("ShuffleDeleteService #%d").build()); + String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_DELETION_TRACKER_CLASS, + TezConfiguration.TEZ_DELETION_TRACKER_CLASS_DEFAULT); + deletionTracker = ReflectionUtils.createClazzInstance( + deletionTrackerClassName,new Class[] { + Map.class, Configuration.class, String.class}, + new Object[] {new HashMap<NodeId, Integer>(), conf, TezConstants.getTezYarnServicePluginName()}); } @Override @@ -348,9 +345,8 @@ public class TezContainerLauncherImpl extends ContainerLauncher { if (launcherPool != null) { launcherPool.shutdownNow(); } - if (dagDeleteService != null) { - dagDeleteService.shutdown(); - dagDeleteService = null; + if (deletionTracker != null) { + deletionTracker.shutdown(); } } @@ -435,24 +431,8 @@ public class TezContainerLauncherImpl extends ContainerLauncher { } } - public void dagComplete(DAG dag, JobTokenSecretManager jobTokenSecretManager) { - boolean shouldDelete = conf.getBoolean(TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED, - TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED_DEFAULT); - if (!shouldDelete) { - return; - } - String tezDefaultComponentName = TezConstants.getTezYarnServicePluginName(); - for (Map.Entry<NodeId, Integer> entry : nodeIdShufflePortMap.entrySet()) { - NodeId nodeId = entry.getKey(); - int shufflePort = entry.getValue(); - //TODO: add check for healthy node - if (shufflePort != ShuffleUtils.UNDEFINED_PORT) { - DagDeleteRunnable dagDeleteRunnable = new DagDeleteRunnable(nodeId, - shufflePort, dag, jobTokenSecretManager, tezDefaultComponentName); - dagDeleteService.submit(dagDeleteRunnable); - } - } - nodeIdShufflePortMap.clear(); + public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) { + deletionTracker.dagComplete(dag, jobTokenSecretManager); } } http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java index 2991a55..fdaba86 100644 --- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java @@ -905,7 +905,7 @@ public class ShuffleHandler extends AuxiliaryService { final Map<String,List<String>> q = new QueryStringDecoder(request.getUri()).getParameters(); final List<String> keepAliveList = q.get("keepAlive"); - final List<String> dagCompletedQ = q.get("dagCompleted"); + final List<String> dagCompletedQ = q.get("dagAction"); boolean keepAliveParam = false; if (keepAliveList != null && keepAliveList.size() == 1) { keepAliveParam = Boolean.parseBoolean(keepAliveList.get(0)); @@ -1006,7 +1006,11 @@ public class ShuffleHandler extends AuxiliaryService { private boolean deleteDagDirectories(MessageEvent evt, List<String> dagCompletedQ, List<String> jobQ, List<String> dagIdQ) { - if (dagCompletedQ != null && !dagCompletedQ.isEmpty()) { + if (jobQ == null || jobQ.isEmpty()) { + return false; + } + if (dagCompletedQ != null && !dagCompletedQ.isEmpty() && dagCompletedQ.get(0).contains("delete") + && dagIdQ != null && !dagIdQ.isEmpty()) { String base = getDagLocation(jobQ.get(0), dagIdQ.get(0), userRsrc.get(jobQ.get(0))); try { LocalFileSystem lfs = FileSystem.getLocal(conf); http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java index 3d622e6..ebd9c5d 100644 --- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java @@ -1115,7 +1115,7 @@ public class TestShuffleHandler { "http://127.0.0.1:" + shuffleHandler.getConfig().get( ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) - + "/mapOutput?job=job_12345_0001&dag=1&dagCompleted=true"); + + "/mapOutput?dagAction=delete&job=job_12345_0001&dag=1"); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java index c0b7210..d39d554 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java @@ -21,7 +21,17 @@ package org.apache.tez.runtime.library.common; import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.ByteBuffer; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.http.BaseHttpConnection; +import org.apache.tez.http.HttpConnection; +import org.apache.tez.http.HttpConnectionParams; +import org.apache.tez.http.SSLFactory; +import org.apache.tez.http.async.netty.AsyncHttpConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -40,7 +50,11 @@ public class TezRuntimeUtils { private static final Logger LOG = LoggerFactory .getLogger(TezRuntimeUtils.class); - + //Shared by multiple threads + private static volatile SSLFactory sslFactory; + //ShufflePort by default for ContainerLaunchers + public static final int INVALID_PORT = -1; + public static String getTaskIdentifier(String vertexName, int taskIndex) { return String.format("%s_%06d", vertexName, taskIndex); } @@ -159,4 +173,94 @@ public class TezRuntimeUtils { TezTaskOutputFiles.class.getName()), e); } } + + public static URL constructBaseURIForShuffleHandlerDagComplete( + String host, int port, String appId, int dagIdentifier, boolean sslShuffle) + throws MalformedURLException { + final String http_protocol = (sslShuffle) ? "https://" : "http://"; + StringBuilder sb = new StringBuilder(http_protocol); + sb.append(host); + sb.append(":"); + sb.append(port); + sb.append("/"); + sb.append("mapOutput?dagAction=delete"); + sb.append("&job="); + sb.append(appId.replace("application", "job")); + sb.append("&dag="); + sb.append(String.valueOf(dagIdentifier)); + return new URL(sb.toString()); + } + + public static HttpConnectionParams getHttpConnectionParams(Configuration conf) { + int connectionTimeout = + conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT_DEFAULT); + + int readTimeout = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT_DEFAULT); + + int bufferSize = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE_DEFAULT); + + boolean keepAlive = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED_DEFAULT); + + int keepAliveMaxConnections = conf.getInt( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS_DEFAULT); + + if (keepAlive) { + System.setProperty("sun.net.http.errorstream.enableBuffering", "true"); + System.setProperty("http.maxConnections", String.valueOf(keepAliveMaxConnections)); + } + + boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT); + if (sslShuffle) { + if (sslFactory == null) { + synchronized (HttpConnectionParams.class) { + //Create sslFactory if it is null or if it was destroyed earlier + if (sslFactory == null || sslFactory.getKeystoresFactory().getTrustManagers() == null) { + sslFactory = + new SSLFactory(org.apache.hadoop.security.ssl.SSLFactory.Mode.CLIENT, conf); + try { + sslFactory.init(); + } catch (Exception ex) { + sslFactory.destroy(); + sslFactory = null; + throw new RuntimeException(ex); + } + } + } + } + } + + HttpConnectionParams httpConnParams = new HttpConnectionParams(keepAlive, + keepAliveMaxConnections, connectionTimeout, readTimeout, bufferSize, sslShuffle, + sslFactory); + return httpConnParams; + } + + public static BaseHttpConnection getHttpConnection(boolean asyncHttp, URL url, + HttpConnectionParams params, String logIdentifier, JobTokenSecretManager jobTokenSecretManager) + throws IOException { + if (asyncHttp) { + //TODO: support other async packages? httpclient-async? + return new AsyncHttpConnection(url, params, logIdentifier, jobTokenSecretManager); + } else { + return new HttpConnection(url, params, logIdentifier, jobTokenSecretManager); + } + } + + public static int deserializeShuffleProviderMetaData(ByteBuffer meta) + throws IOException { + DataInputByteBuffer in = new DataInputByteBuffer(); + try { + in.reset(meta); + int port = in.readInt(); + return port; + } finally { + in.close(); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index ed2e26e..64a10d2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -41,11 +41,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.http.BaseHttpConnection; -import org.apache.tez.http.HttpConnection; import org.apache.tez.http.HttpConnectionParams; -import org.apache.tez.http.SSLFactory; -import org.apache.tez.http.async.netty.AsyncHttpConnection; import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB; import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; @@ -62,7 +60,6 @@ import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; -import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.sort.impl.IFile; import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; @@ -76,10 +73,6 @@ public class ShuffleUtils { private static final Logger LOG = LoggerFactory.getLogger(ShuffleUtils.class); private static final long MB = 1024l * 1024l; - public static final int UNDEFINED_PORT = -1; - //Shared by multiple threads - private static volatile SSLFactory sslFactory; - static final ThreadLocal<DecimalFormat> MBPS_FORMAT = new ThreadLocal<DecimalFormat>() { @Override @@ -105,14 +98,7 @@ public class ShuffleUtils { public static int deserializeShuffleProviderMetaData(ByteBuffer meta) throws IOException { - DataInputByteBuffer in = new DataInputByteBuffer(); - try { - in.reset(meta); - int port = in.readInt(); - return port; - } finally { - in.close(); - } + return TezRuntimeUtils.deserializeShuffleProviderMetaData(meta); } public static void shuffleToMemory(byte[] shuffleData, @@ -223,23 +209,6 @@ public class ShuffleUtils { return sb; } - public static URL constructBaseURIForShuffleHandlerDagComplete( - String host, int port, String appId, int dagIdentifier, boolean sslShuffle) - throws MalformedURLException{ - final String http_protocol = (sslShuffle) ? "https://" : "http://"; - StringBuilder sb = new StringBuilder(http_protocol); - sb.append(host); - sb.append(":"); - sb.append(port); - sb.append("/"); - sb.append("mapOutput?job="); - sb.append(appId.replace("application", "job")); - sb.append("&dag="); - sb.append(String.valueOf(dagIdentifier)); - sb.append("&dagCompleted=true"); - return new URL(sb.toString()); - } - public static URL constructInputURL(String baseURI, Collection<InputAttemptIdentifier> inputs, boolean keepAlive) throws MalformedURLException { StringBuilder url = new StringBuilder(baseURI); @@ -263,12 +232,7 @@ public class ShuffleUtils { public static BaseHttpConnection getHttpConnection(boolean asyncHttp, URL url, HttpConnectionParams params, String logIdentifier, JobTokenSecretManager jobTokenSecretManager) throws IOException { - if (asyncHttp) { - //TODO: support other async packages? httpclient-async? - return new AsyncHttpConnection(url, params, logIdentifier, jobTokenSecretManager); - } else { - return new HttpConnection(url, params, logIdentifier, jobTokenSecretManager); - } + return TezRuntimeUtils.getHttpConnection(asyncHttp, url, params, logIdentifier, jobTokenSecretManager); } public static String stringify(DataMovementEventPayloadProto dmProto) { @@ -589,54 +553,7 @@ public class ShuffleUtils { * @return HttpConnectionParams */ public static HttpConnectionParams getHttpConnectionParams(Configuration conf) { - int connectionTimeout = - conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, - TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT_DEFAULT); - - int readTimeout = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, - TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT_DEFAULT); - - int bufferSize = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE, - TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE_DEFAULT); - - boolean keepAlive = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED, - TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED_DEFAULT); - - int keepAliveMaxConnections = conf.getInt( - TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS, - TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS_DEFAULT); - - if (keepAlive) { - System.setProperty("sun.net.http.errorstream.enableBuffering", "true"); - System.setProperty("http.maxConnections", String.valueOf(keepAliveMaxConnections)); - } - - boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL, - TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT); - - if (sslShuffle) { - if (sslFactory == null) { - synchronized (HttpConnectionParams.class) { - //Create sslFactory if it is null or if it was destroyed earlier - if (sslFactory == null || sslFactory.getKeystoresFactory().getTrustManagers() == null) { - sslFactory = - new SSLFactory(org.apache.hadoop.security.ssl.SSLFactory.Mode.CLIENT, conf); - try { - sslFactory.init(); - } catch (Exception ex) { - sslFactory.destroy(); - sslFactory = null; - throw new RuntimeException(ex); - } - } - } - } - } - - HttpConnectionParams httpConnParams = new HttpConnectionParams(keepAlive, - keepAliveMaxConnections, connectionTimeout, readTimeout, bufferSize, sslShuffle, - sslFactory); - return httpConnParams; + return TezRuntimeUtils.getHttpConnectionParams(conf); } public static boolean isTezShuffleHandler(Configuration config) {
