Repository: tez Updated Branches: refs/heads/TEZ-3334 b81592ad6 -> 886fac7f4
TEZ-3725. Cleanup http connections and other unnecessary fields in DAG Deletion tracker classes. (kshukla) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/886fac7f Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/886fac7f Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/886fac7f Branch: refs/heads/TEZ-3334 Commit: 886fac7f4d98c5c6542c557838ddc2f7b2c5864c Parents: b81592a Author: Kuhu Shukla <[email protected]> Authored: Tue May 16 12:40:56 2017 -0500 Committer: Kuhu Shukla <[email protected]> Committed: Tue May 16 12:40:56 2017 -0500 ---------------------------------------------------------------------- TEZ-3334-CHANGES.txt | 1 + .../tez/dag/app/launcher/DagDeleteRunnable.java | 29 +++++++++++++++----- .../tez/dag/app/launcher/DeletionTracker.java | 6 ++-- .../dag/app/launcher/DeletionTrackerImpl.java | 20 ++++++++++---- .../app/launcher/LocalContainerLauncher.java | 19 ++++--------- .../app/launcher/TezContainerLauncherImpl.java | 6 ++-- .../app/rm/container/AMContainerHelpers.java | 4 +-- .../dag/app/rm/container/AMContainerImpl.java | 6 ++-- .../dag/app/rm/container/AMContainerMap.java | 6 +++- .../dag/app/rm/container/TestAMContainer.java | 4 ++- 10 files changed, 60 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/886fac7f/TEZ-3334-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt index f407ac5..a2afb15 100644 --- a/TEZ-3334-CHANGES.txt +++ b/TEZ-3334-CHANGES.txt @@ -4,6 +4,7 @@ Apache Tez Change Log INCOMPATIBLE CHANGES: ALL CHANGES: + TEZ-3725. Cleanup http connections and other unnecessary fields in DAG Deletion tracker classes. TEZ-3705. Modify DeletionTracker and deletion threads to be initialized only if enabled for tez_shuffle TEZ-3685. ShuffleHandler completedInputSet off-by-one error TEZ-3684. Incorporate first pass non-essential TEZ-3334 pre-merge feedback http://git-wip-us.apache.org/repos/asf/tez/blob/886fac7f/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java index 6d966b0..eac745e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java @@ -24,41 +24,56 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.http.BaseHttpConnection; import org.apache.tez.http.HttpConnectionParams; import org.apache.tez.runtime.library.common.TezRuntimeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; import java.net.URL; class DagDeleteRunnable implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(DagDeleteRunnable.class); final NodeId nodeId; final TezDAGID dag; final JobTokenSecretManager jobTokenSecretManager; - final String tezDefaultComponentName; final int shufflePort; final HttpConnectionParams httpConnectionParams; public DagDeleteRunnable(NodeId nodeId, int shufflePort, TezDAGID currentDag, HttpConnectionParams httpConnectionParams, - JobTokenSecretManager jobTokenSecretMgr, String tezDefaultComponent) { + JobTokenSecretManager jobTokenSecretMgr) { this.nodeId = nodeId; this.shufflePort = shufflePort; this.dag = currentDag; this.httpConnectionParams = httpConnectionParams; this.jobTokenSecretManager = jobTokenSecretMgr; - this.tezDefaultComponentName = tezDefaultComponent; } @Override public void run() { + BaseHttpConnection httpConnection = null; try { URL baseURL = TezRuntimeUtils.constructBaseURIForShuffleHandlerDagComplete( nodeId.getHost(), shufflePort, dag.getApplicationId().toString(), dag.getId(), false); - BaseHttpConnection httpConnection = TezRuntimeUtils.getHttpConnection(true, baseURL, - httpConnectionParams, "DAGDelete", jobTokenSecretManager); + httpConnection = TezRuntimeUtils.getHttpConnection(true, baseURL, httpConnectionParams, + "DAGDelete", jobTokenSecretManager); httpConnection.connect(); httpConnection.getInputStream(); } catch (Exception e) { - TezContainerLauncherImpl.LOG.warn("Could not setup HTTP Connection to the node " + nodeId.getHost() + " for dag delete " - + e); + LOG.warn("Could not setup HTTP Connection to the node " + nodeId.getHost() + " for dag delete. ", e); + } finally { + try { + if (httpConnection != null) { + httpConnection.cleanup(true); + } + } catch (IOException ioe) { + LOG.warn("Encountered IOException for " + nodeId.getHost() + " during close. ", ioe); + } } } + + @Override + public String toString() { + return "DagDeleteRunnable nodeId=" + nodeId + ", shufflePort=" + shufflePort + ", dagId=" + dag.toString(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/886fac7f/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java index c12f41e..27ece70 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java @@ -26,18 +26,16 @@ import org.apache.tez.dag.records.TezDAGID; public abstract class DeletionTracker { protected final Configuration conf; - protected String pluginName; - public DeletionTracker(Configuration conf, String pluginName) { + public DeletionTracker(Configuration conf) { this.conf = conf; - this.pluginName = pluginName; } public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) { //do nothing } - public void addNodeShufflePorts(NodeId nodeId, int port) { + public void addNodeShufflePort(NodeId nodeId, int port) { //do nothing } http://git-wip-us.apache.org/repos/asf/tez/blob/886fac7f/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java index f0b2818..b7583ae 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java @@ -22,6 +22,7 @@ package org.apache.tez.dag.app.launcher; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -32,13 +33,16 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.hadoop.conf.Configuration; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.library.common.TezRuntimeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DeletionTrackerImpl extends DeletionTracker { + private static final Logger LOG = LoggerFactory.getLogger(DeletionTrackerImpl.class); private Map<NodeId, Integer> nodeIdShufflePortMap; private ExecutorService dagCleanupService; - public DeletionTrackerImpl(Map<NodeId, Integer> nodeIdShufflePortMap, Configuration conf, String pluginName) { - super(conf, pluginName); + public DeletionTrackerImpl(Map<NodeId, Integer> nodeIdShufflePortMap, Configuration conf) { + super(conf); this.nodeIdShufflePortMap = nodeIdShufflePortMap; this.dagCleanupService = new ThreadPoolExecutor(0, conf.getInt(TezConfiguration.TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT, TezConfiguration.TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT_DEFAULT), 10, @@ -54,16 +58,20 @@ public class DeletionTrackerImpl extends DeletionTracker { int shufflePort = entry.getValue(); //TODO: add check for healthy node if (shufflePort != TezRuntimeUtils.INVALID_PORT) { - DagDeleteRunnable dagDeleteRunnable = new DagDeleteRunnable(nodeId, - shufflePort, dag, TezRuntimeUtils.getHttpConnectionParams(conf), jobTokenSecretManager, this.pluginName); - dagCleanupService.submit(dagDeleteRunnable); + DagDeleteRunnable dagDeleteRunnable = new DagDeleteRunnable(nodeId, shufflePort, dag, + TezRuntimeUtils.getHttpConnectionParams(conf), jobTokenSecretManager); + try { + dagCleanupService.submit(dagDeleteRunnable); + } catch (RejectedExecutionException rejectedException) { + LOG.info("Ignoring deletion request for " + dagDeleteRunnable); + } } } nodeIdShufflePortMap.clear(); } @Override - public void addNodeShufflePorts(NodeId nodeId, int port) { + public void addNodeShufflePort(NodeId nodeId, int port) { if (port != TezRuntimeUtils.INVALID_PORT) { if(nodeIdShufflePortMap.get(nodeId) == null) { nodeIdShufflePortMap.put(nodeId, port); http://git-wip-us.apache.org/repos/asf/tez/blob/886fac7f/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index 9c04781..4793bd7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -50,7 +50,6 @@ import org.apache.tez.common.TezUtils; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.common.security.JobTokenSecretManager; -import org.apache.tez.dag.api.TezConstants; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; @@ -143,14 +142,9 @@ public class LocalContainerLauncher extends ContainerLauncher { String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); localEnv = Maps.newHashMap(); + shufflePort = 0; AuxiliaryServiceHelper.setServiceDataIntoEnv( - auxiliaryService, ByteBuffer.allocate(4).putInt(0), localEnv); - try { - shufflePort = TezRuntimeUtils.deserializeShuffleProviderMetaData( - AuxiliaryServiceHelper.getServiceDataFromEnv(auxiliaryService, localEnv)); - } catch (IOException e) { - LOG.warn("Could not extract shuffle aux-service port!"); - } + auxiliaryService, ByteBuffer.allocate(4).putInt(shufflePort), localEnv); } else { localEnv = System.getenv(); } @@ -161,9 +155,6 @@ public class LocalContainerLauncher extends ContainerLauncher { new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LocalTaskExecutionThread #%d") .build()); this.taskExecutorService = MoreExecutors.listeningDecorator(rawExecutor); - String tezDefaultComponentName = - isLocalMode ? TezConstants.getTezUberServicePluginName() : - TezConstants.getTezYarnServicePluginName(); boolean cleanupDagDataOnComplete = ShuffleUtils.isTezShuffleHandler(conf) && conf.getBoolean(TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION, TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT); @@ -172,8 +163,8 @@ public class LocalContainerLauncher extends ContainerLauncher { TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT); deletionTracker = ReflectionUtils.createClazzInstance( deletionTrackerClassName, new Class[]{ - Map.class, Configuration.class, String.class}, - new Object[]{new HashMap<NodeId, Integer>(), conf, tezDefaultComponentName}); + Map.class, Configuration.class}, + new Object[]{new HashMap<NodeId, Integer>(), conf}); } } @@ -279,7 +270,7 @@ public class LocalContainerLauncher extends ContainerLauncher { runningContainers.put(event.getContainerId(), callback); Futures.addCallback(runningTaskFuture, callback, callbackExecutor); if (deletionTracker != null) { - deletionTracker.addNodeShufflePorts(event.getNodeId(), shufflePort); + deletionTracker.addNodeShufflePort(event.getNodeId(), shufflePort); } } catch (RejectedExecutionException e) { handleLaunchFailed(e, event.getContainerId()); http://git-wip-us.apache.org/repos/asf/tez/blob/886fac7f/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java index c67fc01..922575f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java @@ -194,7 +194,7 @@ public class TezContainerLauncherImpl extends ContainerLauncher { LOG.warn("Shuffle port cannot be found since services metadata response is missing"); } if (deletionTracker != null) { - deletionTracker.addNodeShufflePorts(event.getNodeId(), shufflePort); + deletionTracker.addNodeShufflePort(event.getNodeId(), shufflePort); } } catch (Throwable t) { String message = "Container launch failed for " + containerID + " : " @@ -341,8 +341,8 @@ public class TezContainerLauncherImpl extends ContainerLauncher { TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT); deletionTracker = ReflectionUtils.createClazzInstance( deletionTrackerClassName, new Class[]{ - Map.class, Configuration.class, String.class}, - new Object[]{new HashMap<NodeId, Integer>(), conf, TezConstants.getTezYarnServicePluginName()}); + Map.class, Configuration.class}, + new Object[]{new HashMap<NodeId, Integer>(), conf}); } } http://git-wip-us.apache.org/repos/asf/tez/blob/886fac7f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java index ba3ecad..f959a7c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java @@ -154,13 +154,11 @@ public class AMContainerHelpers { String javaOpts, InetSocketAddress taskAttemptListenerAddress, Credentials credentials, AppContext appContext, Resource containerResource, - Configuration conf) { + Configuration conf, String auxiliaryService) { ContainerLaunchContext commonContainerSpec = null; synchronized (commonContainerSpecLock) { if (!commonContainerSpecs.containsKey(tezDAGID)) { - String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, - TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); commonContainerSpec = createCommonContainerLaunchContext(acls, credentials, commonDAGLRs, auxiliaryService); commonContainerSpecs.put(tezDAGID, commonContainerSpec); http://git-wip-us.apache.org/repos/asf/tez/blob/886fac7f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index 94c8fe0..8e7df32 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@ -91,6 +91,7 @@ public class AMContainerImpl implements AMContainer { private final int schedulerId; private final int launcherId; private final int taskCommId; + private String auxiliaryService; private final List<TezTaskAttemptID> completedAttempts = new LinkedList<TezTaskAttemptID>(); @@ -313,7 +314,7 @@ public class AMContainerImpl implements AMContainer { // additional change - JvmID, YarnChild, etc depend on TaskType. public AMContainerImpl(Container container, ContainerHeartbeatHandler chh, TaskCommunicatorManagerInterface tal, ContainerSignatureMatcher signatureMatcher, - AppContext appContext, int schedulerId, int launcherId, int taskCommId) { + AppContext appContext, int schedulerId, int launcherId, int taskCommId, String auxiliaryService) { ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); this.readLock = rwLock.readLock(); this.writeLock = rwLock.writeLock(); @@ -329,6 +330,7 @@ public class AMContainerImpl implements AMContainer { this.launcherId = launcherId; this.taskCommId = taskCommId; this.stateMachine = stateMachineFactory.make(this); + this.auxiliaryService = auxiliaryService; } @Override @@ -498,7 +500,7 @@ public class AMContainerImpl implements AMContainer { cAddress, containerContext.getCredentials(), container.appContext, container.container.getResource(), - container.appContext.getAMConf()); + container.appContext.getAMConf(), container.auxiliaryService); // Registering now, so that in case of delayed NM response, the child // task is not told to die since the TAL does not know about the container. http://git-wip-us.apache.org/repos/asf/tez/blob/886fac7f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java index ab43db1..1b2fe16 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java @@ -29,6 +29,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; @@ -42,6 +43,7 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo private final AppContext context; private final ContainerSignatureMatcher containerSignatureMatcher; private final ConcurrentHashMap<ContainerId, AMContainer> containerMap; + private String auxiliaryService; public AMContainerMap(ContainerHeartbeatHandler chh, TaskCommunicatorManagerInterface tal, ContainerSignatureMatcher containerSignatureMatcher, AppContext context) { @@ -51,6 +53,8 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo this.context = context; this.containerSignatureMatcher = containerSignatureMatcher; this.containerMap = new ConcurrentHashMap<ContainerId, AMContainer>(); + this.auxiliaryService = context.getAMConf().get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); } @Override @@ -65,7 +69,7 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo public boolean addContainerIfNew(Container container, int schedulerId, int launcherId, int taskCommId) { AMContainer amc = new AMContainerImpl(container, chh, tal, - containerSignatureMatcher, context, schedulerId, launcherId, taskCommId); + containerSignatureMatcher, context, schedulerId, launcherId, taskCommId, auxiliaryService); return (containerMap.putIfAbsent(container.getId(), amc) == null); } http://git-wip-us.apache.org/repos/asf/tez/blob/886fac7f/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java index ed14871..65883ee 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.app.TaskCommunicatorWrapper; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.serviceplugins.api.ServicePluginException; @@ -1261,7 +1262,8 @@ public class TestAMContainer { doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID(); amContainer = new AMContainerImpl(container, chh, tal, - new ContainerContextMatcher(), appContext, 0, 0, 0); + new ContainerContextMatcher(), appContext, 0, 0, 0, conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)); } public WrappedContainer() {
