TEZ-3705. Modify DeletionTracker and deletion threads to be initialized only if enabled for tez_shuffle (Kuhu Shukla via jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b81592ad Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b81592ad Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b81592ad Branch: refs/heads/master Commit: b81592ad656f70fc18e613b77db9fcc633dda006 Parents: 23b4338 Author: Jonathan Eagles <[email protected]> Authored: Thu May 4 15:33:23 2017 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Thu May 4 15:33:23 2017 -0500 ---------------------------------------------------------------------- TEZ-3334-CHANGES.txt | 1 + .../apache/tez/dag/api/TezConfiguration.java | 10 +++---- .../tez/dag/app/launcher/DeletionTracker.java | 12 +------- .../dag/app/launcher/DeletionTrackerImpl.java | 29 ++++++++++++++------ .../app/launcher/LocalContainerLauncher.java | 27 ++++++++++++------ .../app/launcher/TezContainerLauncherImpl.java | 29 +++++++++++++------- .../tez/auxservices/TestShuffleHandlerJobs.java | 2 +- 7 files changed, 66 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/b81592ad/TEZ-3334-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt index e7f0211..f407ac5 100644 --- a/TEZ-3334-CHANGES.txt +++ b/TEZ-3334-CHANGES.txt @@ -4,6 +4,7 @@ Apache Tez Change Log INCOMPATIBLE CHANGES: ALL CHANGES: + TEZ-3705. Modify DeletionTracker and deletion threads to be initialized only if enabled for tez_shuffle TEZ-3685. ShuffleHandler completedInputSet off-by-one error TEZ-3684. Incorporate first pass non-essential TEZ-3334 pre-merge feedback TEZ-3683. LocalContainerLauncher#shouldDelete member variable is not used http://git-wip-us.apache.org/repos/asf/tez/blob/b81592ad/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 6c2fcf0..105c85c 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -663,19 +663,19 @@ public class TezConfiguration extends Configuration { /** Boolean value. Instructs AM to delete Dag directory upon completion */ @ConfigurationScope(Scope.AM) @ConfigurationProperty(type="boolean") - public static final String TEZ_AM_DAG_DELETE_ENABLED = TEZ_AM_PREFIX - + "dag.delete.enabled"; - public static final boolean TEZ_AM_DAG_DELETE_ENABLED_DEFAULT = false; + public static final String TEZ_AM_DAG_CLEANUP_ON_COMPLETION = TEZ_AM_PREFIX + + "dag.cleanup.on.completion"; + public static final boolean TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT = false; /** * Int value. Upper limit on the number of threads used to delete DAG directories on nodes. */ @ConfigurationScope(Scope.AM) @ConfigurationProperty(type="integer") - public static final String TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT = + public static final String TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT = TEZ_AM_PREFIX + "dag.deletion.thread-count-limit"; - public static final int TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT_DEFAULT = 30; + public static final int TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT_DEFAULT = 10; /** Int value. The amount of memory in MB to be used by tasks. This applies to all tasks across * all vertices. Setting it to the same value for all tasks is helpful for container reuse and http://git-wip-us.apache.org/repos/asf/tez/blob/b81592ad/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java index 0409a30..c12f41e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java @@ -18,27 +18,18 @@ package org.apache.tez.dag.app.launcher; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.security.JobTokenSecretManager; -import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.records.TezDAGID; public abstract class DeletionTracker { protected final Configuration conf; - protected ExecutorService dagDeleteService; protected String pluginName; public DeletionTracker(Configuration conf, String pluginName) { this.conf = conf; - dagDeleteService = Executors.newFixedThreadPool( - conf.getInt(TezConfiguration.TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT, - TezConfiguration.TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT_DEFAULT), new ThreadFactoryBuilder() - .setDaemon(true).setNameFormat("ShuffleDeleteTracker #%d").build()); this.pluginName = pluginName; } @@ -51,7 +42,6 @@ public abstract class DeletionTracker { } public void shutdown() { - dagDeleteService.shutdownNow(); - dagDeleteService = null; + // do nothing } } http://git-wip-us.apache.org/repos/asf/tez/blob/b81592ad/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java index 625aabb..f0b2818 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java @@ -20,7 +20,12 @@ package org.apache.tez.dag.app.launcher; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.records.TezDAGID; @@ -29,21 +34,21 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.library.common.TezRuntimeUtils; public class DeletionTrackerImpl extends DeletionTracker { - Map<NodeId, Integer> nodeIdShufflePortMap; - String pluginName; + private Map<NodeId, Integer> nodeIdShufflePortMap; + private ExecutorService dagCleanupService; public DeletionTrackerImpl(Map<NodeId, Integer> nodeIdShufflePortMap, Configuration conf, String pluginName) { super(conf, pluginName); this.nodeIdShufflePortMap = nodeIdShufflePortMap; + this.dagCleanupService = new ThreadPoolExecutor(0, conf.getInt(TezConfiguration.TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT, + TezConfiguration.TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT_DEFAULT), 10, + TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ShuffleDeleteTracker #%d").build()); } @Override public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) { - boolean shouldDelete = conf.getBoolean(TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED, - TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED_DEFAULT); - if (!shouldDelete) { - return; - } + super.dagComplete(dag, jobTokenSecretManager); for (Map.Entry<NodeId, Integer> entry : nodeIdShufflePortMap.entrySet()) { NodeId nodeId = entry.getKey(); int shufflePort = entry.getValue(); @@ -51,7 +56,7 @@ public class DeletionTrackerImpl extends DeletionTracker { if (shufflePort != TezRuntimeUtils.INVALID_PORT) { DagDeleteRunnable dagDeleteRunnable = new DagDeleteRunnable(nodeId, shufflePort, dag, TezRuntimeUtils.getHttpConnectionParams(conf), jobTokenSecretManager, this.pluginName); - dagDeleteService.submit(dagDeleteRunnable); + dagCleanupService.submit(dagDeleteRunnable); } } nodeIdShufflePortMap.clear(); @@ -65,4 +70,12 @@ public class DeletionTrackerImpl extends DeletionTracker { } } } + + @Override + public void shutdown() { + if (dagCleanupService != null) { + dagCleanupService.shutdownNow(); + dagCleanupService = null; + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/b81592ad/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index c977c6a..9c04781 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -52,6 +52,7 @@ import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.runtime.library.common.TezRuntimeUtils; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; @@ -163,12 +164,17 @@ public class LocalContainerLauncher extends ContainerLauncher { String tezDefaultComponentName = isLocalMode ? TezConstants.getTezUberServicePluginName() : TezConstants.getTezYarnServicePluginName(); - String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS, - TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT); - deletionTracker = ReflectionUtils.createClazzInstance( - deletionTrackerClassName,new Class[] { - Map.class, Configuration.class, String.class}, - new Object[] {new HashMap<NodeId, Integer>(), conf, tezDefaultComponentName}); + boolean cleanupDagDataOnComplete = ShuffleUtils.isTezShuffleHandler(conf) + && conf.getBoolean(TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION, + TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT); + if (cleanupDagDataOnComplete) { + String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS, + TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT); + deletionTracker = ReflectionUtils.createClazzInstance( + deletionTrackerClassName, new Class[]{ + Map.class, Configuration.class, String.class}, + new Object[]{new HashMap<NodeId, Integer>(), conf, tezDefaultComponentName}); + } } @Override @@ -272,8 +278,9 @@ public class LocalContainerLauncher extends ContainerLauncher { RunningTaskCallback callback = new RunningTaskCallback(event.getContainerId()); runningContainers.put(event.getContainerId(), callback); Futures.addCallback(runningTaskFuture, callback, callbackExecutor); - - deletionTracker.addNodeShufflePorts(event.getNodeId(), shufflePort); + if (deletionTracker != null) { + deletionTracker.addNodeShufflePorts(event.getNodeId(), shufflePort); + } } catch (RejectedExecutionException e) { handleLaunchFailed(e, event.getContainerId()); } @@ -411,7 +418,9 @@ public class LocalContainerLauncher extends ContainerLauncher { } public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) { - deletionTracker.dagComplete(dag, jobTokenSecretManager); + if (deletionTracker != null) { + deletionTracker.dagComplete(dag, jobTokenSecretManager); + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/b81592ad/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java index f6a6874..c67fc01 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java @@ -42,6 +42,7 @@ import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.runtime.library.common.TezRuntimeUtils; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; @@ -89,7 +90,7 @@ public class TezContainerLauncherImpl extends ContainerLauncher { protected BlockingQueue<ContainerOp> eventQueue = new LinkedBlockingQueue<>(); private ContainerManagementProtocolProxy cmProxy; private AtomicBoolean serviceStopped = new AtomicBoolean(false); - private DeletionTracker deletionTracker; + private DeletionTracker deletionTracker = null; private Container getContainer(ContainerOp event) { ContainerId id = event.getBaseOperation().getContainerId(); @@ -192,8 +193,9 @@ public class TezContainerLauncherImpl extends ContainerLauncher { } else { LOG.warn("Shuffle port cannot be found since services metadata response is missing"); } - - deletionTracker.addNodeShufflePorts(event.getNodeId(), shufflePort); + if (deletionTracker != null) { + deletionTracker.addNodeShufflePorts(event.getNodeId(), shufflePort); + } } catch (Throwable t) { String message = "Container launch failed for " + containerID + " : " + ExceptionUtils.getStackTrace(t); @@ -331,12 +333,17 @@ public class TezContainerLauncherImpl extends ContainerLauncher { }; eventHandlingThread.setName("ContainerLauncher Event Handler"); eventHandlingThread.start(); - String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS, - TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT); - deletionTracker = ReflectionUtils.createClazzInstance( - deletionTrackerClassName,new Class[] { - Map.class, Configuration.class, String.class}, - new Object[] {new HashMap<NodeId, Integer>(), conf, TezConstants.getTezYarnServicePluginName()}); + boolean cleanupDagDataOnComplete = ShuffleUtils.isTezShuffleHandler(conf) + && conf.getBoolean(TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION, + TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT); + if (cleanupDagDataOnComplete) { + String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS, + TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT); + deletionTracker = ReflectionUtils.createClazzInstance( + deletionTrackerClassName, new Class[]{ + Map.class, Configuration.class, String.class}, + new Object[]{new HashMap<NodeId, Integer>(), conf, TezConstants.getTezYarnServicePluginName()}); + } } @Override @@ -438,7 +445,9 @@ public class TezContainerLauncherImpl extends ContainerLauncher { } public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) { - deletionTracker.dagComplete(dag, jobTokenSecretManager); + if (deletionTracker != null) { + deletionTracker.dagComplete(dag, jobTokenSecretManager); + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/b81592ad/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java index 73c4c13..27b700a 100644 --- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java +++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java @@ -118,7 +118,7 @@ public class TestShuffleHandlerJobs { TezConfiguration tezConf = new TezConfiguration(tezCluster.getConfig()); tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString()); tezConf.set(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, ShuffleHandler.TEZ_SHUFFLE_SERVICEID); - tezConf.setBoolean(TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED, true); + tezConf.setBoolean(TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION, true); tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true); tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false); tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
