Repository: tez Updated Branches: refs/heads/master 536394726 -> 01db837c4
TEZ-3713. Allow dag level deletion in cases where containers are reused (Kuhu Shukla via jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/01db837c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/01db837c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/01db837c Branch: refs/heads/master Commit: 01db837c4c798cabd03bdaf3aa37ec02a68606f1 Parents: 5363947 Author: Jonathan Eagles <[email protected]> Authored: Wed May 31 14:54:37 2017 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Wed May 31 14:54:45 2017 -0500 ---------------------------------------------------------------------- .../dag/app/launcher/DeletionTrackerImpl.java | 8 ++- .../dag/app/launcher/TestDeletionTracker.java | 72 ++++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/01db837c/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java index 52b6347..06dae2d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java @@ -27,6 +27,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.security.JobTokenSecretManager; @@ -67,7 +68,6 @@ public class DeletionTrackerImpl extends DeletionTracker { } } } - nodeIdShufflePortMap.clear(); } @Override @@ -79,11 +79,17 @@ public class DeletionTrackerImpl extends DeletionTracker { } } + @VisibleForTesting + Map<NodeId, Integer> getNodeIdShufflePortMap() { + return nodeIdShufflePortMap; + } + @Override public void shutdown() { if (dagCleanupService != null) { dagCleanupService.shutdownNow(); dagCleanupService = null; } + nodeIdShufflePortMap = null; } } http://git-wip-us.apache.org/repos/asf/tez/blob/01db837c/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestDeletionTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestDeletionTracker.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestDeletionTracker.java new file mode 100644 index 0000000..fba35a6 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestDeletionTracker.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.launcher; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.records.TezDAGID; +import org.junit.Assert; +import org.junit.Test; + +public class TestDeletionTracker { + + @Test + public void testNodeIdShufflePortMap() throws Exception { + DeletionTrackerImpl deletionTracker = new DeletionTrackerImpl(new Configuration()); + // test NodeId + NodeId nodeId = new NodeId() { + @Override + public String getHost() { + return "testHost"; + } + + @Override + protected void setHost(String s) { + + } + + @Override + public int getPort() { + return 1234; + } + + @Override + protected void setPort(int i) { + + } + + @Override + protected void build() { + + } + }; + // test shuffle port for the nodeId + int shufflePort = 9999; + deletionTracker.addNodeShufflePort(nodeId, shufflePort); + Assert.assertEquals("Unexpected number of entries in NodeIdShufflePortMap!", + 1, deletionTracker.getNodeIdShufflePortMap().size()); + deletionTracker.addNodeShufflePort(nodeId, shufflePort); + Assert.assertEquals("Unexpected number of entries in NodeIdShufflePortMap!", + 1, deletionTracker.getNodeIdShufflePortMap().size()); + deletionTracker.dagComplete(new TezDAGID(), new JobTokenSecretManager()); + Assert.assertEquals("Unexpected number of entries in NodeIdShufflePortMap after dagComplete!", + 1, deletionTracker.getNodeIdShufflePortMap().size()); + } +}
