TEZ-3726. Clean up DeletionTracker's reflection instantiation and provide ContainerLauncher with dagComplete() functionality (kshukla)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8e85c465 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8e85c465 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8e85c465 Branch: refs/heads/master Commit: 8e85c4650f2f24e7bfb9f6618cc2ed539b09185d Parents: 886fac7 Author: Kuhu Shukla <[email protected]> Authored: Wed May 17 10:28:44 2017 -0500 Committer: Kuhu Shukla <[email protected]> Committed: Wed May 17 10:28:44 2017 -0500 ---------------------------------------------------------------------- TEZ-3334-CHANGES.txt | 1 + .../apache/tez/common/DagContainerLauncher.java | 43 ++++++++++++++++++++ .../app/launcher/ContainerLauncherWrapper.java | 8 ++-- .../dag/app/launcher/DeletionTrackerImpl.java | 6 +-- .../app/launcher/LocalContainerLauncher.java | 10 ++--- .../app/launcher/TezContainerLauncherImpl.java | 10 ++--- 6 files changed, 58 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/8e85c465/TEZ-3334-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt index a2afb15..8924915 100644 --- a/TEZ-3334-CHANGES.txt +++ b/TEZ-3334-CHANGES.txt @@ -4,6 +4,7 @@ Apache Tez Change Log INCOMPATIBLE CHANGES: ALL CHANGES: + TEZ-3726. Clean up DeletionTracker's reflection instantiation and provide ContainerLauncher with dagComplete() functionality TEZ-3725. Cleanup http connections and other unnecessary fields in DAG Deletion tracker classes. TEZ-3705. Modify DeletionTracker and deletion threads to be initialized only if enabled for tez_shuffle TEZ-3685. ShuffleHandler completedInputSet off-by-one error http://git-wip-us.apache.org/repos/asf/tez/blob/8e85c465/tez-common/src/main/java/org/apache/tez/common/DagContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/DagContainerLauncher.java b/tez-common/src/main/java/org/apache/tez/common/DagContainerLauncher.java new file mode 100644 index 0000000..e3bd385 --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/common/DagContainerLauncher.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.common; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.serviceplugins.api.ContainerLauncher; +import org.apache.tez.serviceplugins.api.ContainerLauncherContext; + +/** + * Plugin to allow custom container launchers to be written to launch containers that want to + * support cleanup of DAG level directories upon DAG completion in session mode. The directories are created by + * the Tez Shuffle Handler (tez_shuffle). A typical implementation of dagComplete() method would contain logic to send + * http request(s) for dag deletion to the nodes that support this auxiliary service. + */ +@Public +@Unstable +public abstract class DagContainerLauncher extends ContainerLauncher { + + public DagContainerLauncher(ContainerLauncherContext containerLauncherContext) { + super(containerLauncherContext); + } + + public abstract void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager); +} http://git-wip-us.apache.org/repos/asf/tez/blob/8e85c465/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java index c70ab10..8ecac14 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java @@ -14,6 +14,7 @@ package org.apache.tez.dag.app.launcher; +import org.apache.tez.common.DagContainerLauncher; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; @@ -41,11 +42,8 @@ public class ContainerLauncherWrapper { } public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) { - if (real instanceof TezContainerLauncherImpl) { - ((TezContainerLauncherImpl)real).dagComplete(dag, jobTokenSecretManager); - } - if (real instanceof LocalContainerLauncher) { - ((LocalContainerLauncher)real).dagComplete(dag, jobTokenSecretManager); + if (real instanceof DagContainerLauncher) { + ((DagContainerLauncher)real).dagComplete(dag, jobTokenSecretManager); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/8e85c465/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java index b7583ae..52b6347 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java @@ -19,6 +19,7 @@ package org.apache.tez.dag.app.launcher; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -38,12 +39,11 @@ import org.slf4j.LoggerFactory; public class DeletionTrackerImpl extends DeletionTracker { private static final Logger LOG = LoggerFactory.getLogger(DeletionTrackerImpl.class); - private Map<NodeId, Integer> nodeIdShufflePortMap; + private Map<NodeId, Integer> nodeIdShufflePortMap = new HashMap<NodeId, Integer>(); private ExecutorService dagCleanupService; - public DeletionTrackerImpl(Map<NodeId, Integer> nodeIdShufflePortMap, Configuration conf) { + public DeletionTrackerImpl(Configuration conf) { super(conf); - this.nodeIdShufflePortMap = nodeIdShufflePortMap; this.dagCleanupService = new ThreadPoolExecutor(0, conf.getInt(TezConfiguration.TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT, TezConfiguration.TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT_DEFAULT), 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), http://git-wip-us.apache.org/repos/asf/tez/blob/8e85c465/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index 4793bd7..d50b49e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -44,7 +44,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.tez.common.DagContainerLauncher; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.records.TezDAGID; @@ -53,7 +53,6 @@ import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; -import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; import org.apache.tez.serviceplugins.api.ContainerStopRequest; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; @@ -82,7 +81,7 @@ import org.apache.tez.runtime.task.TezChild; * Since all (sub)tasks share the same local directory, they must be executed * sequentially in order to avoid creating/deleting the same files/dirs. */ -public class LocalContainerLauncher extends ContainerLauncher { +public class LocalContainerLauncher extends DagContainerLauncher { private static final Logger LOG = LoggerFactory.getLogger(LocalContainerLauncher.class); @@ -162,9 +161,7 @@ public class LocalContainerLauncher extends ContainerLauncher { String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS, TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT); deletionTracker = ReflectionUtils.createClazzInstance( - deletionTrackerClassName, new Class[]{ - Map.class, Configuration.class}, - new Object[]{new HashMap<NodeId, Integer>(), conf}); + deletionTrackerClassName, new Class[]{Configuration.class}, new Object[]{conf}); } } @@ -408,6 +405,7 @@ public class LocalContainerLauncher extends ContainerLauncher { } } + @Override public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) { if (deletionTracker != null) { deletionTracker.dagComplete(dag, jobTokenSecretManager); http://git-wip-us.apache.org/repos/asf/tez/blob/8e85c465/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java index 922575f..67fc4ed 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java @@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.io.DataInputByteBuffer; -import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.tez.common.DagContainerLauncher; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.common.security.JobTokenSecretManager; @@ -44,7 +44,6 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; -import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; import org.apache.tez.serviceplugins.api.ContainerStopRequest; import org.slf4j.Logger; @@ -75,7 +74,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * This class is responsible for launching of containers. */ -public class TezContainerLauncherImpl extends ContainerLauncher { +public class TezContainerLauncherImpl extends DagContainerLauncher { // TODO Ensure the same thread is used to launch / stop the same container. Or - ensure event ordering. static final Logger LOG = LoggerFactory.getLogger(TezContainerLauncherImpl.class); @@ -340,9 +339,7 @@ public class TezContainerLauncherImpl extends ContainerLauncher { String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS, TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT); deletionTracker = ReflectionUtils.createClazzInstance( - deletionTrackerClassName, new Class[]{ - Map.class, Configuration.class}, - new Object[]{new HashMap<NodeId, Integer>(), conf}); + deletionTrackerClassName, new Class[]{Configuration.class}, new Object[]{conf}); } } @@ -444,6 +441,7 @@ public class TezContainerLauncherImpl extends ContainerLauncher { } } + @Override public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) { if (deletionTracker != null) { deletionTracker.dagComplete(dag, jobTokenSecretManager);
