Repository: tez Updated Branches: refs/heads/master ee4a9a908 -> 605154203
TEZ-3643. Long running AMs can go out of memory due to retained AMContainer instances. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/60515420 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/60515420 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/60515420 Branch: refs/heads/master Commit: 6051542030101e42738fa2c2da984bb2c744b9c5 Parents: ee4a9a9 Author: Siddharth Seth <[email protected]> Authored: Wed Mar 1 09:05:58 2017 -0800 Committer: Siddharth Seth <[email protected]> Committed: Wed Mar 1 09:05:58 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/dag/app/rm/container/AMContainer.java | 1 + .../dag/app/rm/container/AMContainerImpl.java | 1 + .../dag/app/rm/container/AMContainerMap.java | 40 +++++- .../dag/app/rm/container/TestAMContainer.java | 8 +- .../app/rm/container/TestAMContainerMap.java | 126 +++++++++++++++---- 6 files changed, 146 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/60515420/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3806e27..7538f3e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3643. Long running AMs can go out of memory due to retained AMContainer instances. TEZ-3637. TezMerger logs too much at INFO level TEZ-3638. VertexImpl logs too much at info when removing tasks after auto-reduce parallelism TEZ-3634. reduce the default buffer sizes in PipelinedSorter by a small amount. 
http://git-wip-us.apache.org/repos/asf/tez/blob/60515420/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java index 8f5034e..5f90a89 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java @@ -36,4 +36,5 @@ public interface AMContainer extends EventHandler<AMContainerEvent>{ public int getTaskSchedulerIdentifier(); public int getContainerLauncherIdentifier(); public int getTaskCommunicatorIdentifier(); + public boolean isInErrorState(); } http://git-wip-us.apache.org/repos/asf/tez/blob/60515420/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index 5d73a7b..ac429c7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@ -401,6 +401,7 @@ public class AMContainerImpl implements AMContainer { return this.taskCommId; } + @Override public boolean isInErrorState() { return inError; } http://git-wip-us.apache.org/repos/asf/tez/blob/60515420/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java index ab43db1..050ffb6 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java @@ -19,8 +19,11 @@ package org.apache.tez.dag.app.rm.container; import java.util.Collection; +import java.util.Iterator; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import com.google.common.annotations.VisibleForTesting; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.common.ContainerSignatureMatcher; import org.slf4j.Logger; @@ -41,7 +44,8 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo private final TaskCommunicatorManagerInterface tal; private final AppContext context; private final ContainerSignatureMatcher containerSignatureMatcher; - private final ConcurrentHashMap<ContainerId, AMContainer> containerMap; + @VisibleForTesting + final ConcurrentHashMap<ContainerId, AMContainer> containerMap; public AMContainerMap(ContainerHeartbeatHandler chh, TaskCommunicatorManagerInterface tal, ContainerSignatureMatcher containerSignatureMatcher, AppContext context) { @@ -64,11 +68,23 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo } public boolean addContainerIfNew(Container container, int schedulerId, int launcherId, int taskCommId) { - AMContainer amc = new AMContainerImpl(container, chh, tal, - containerSignatureMatcher, context, schedulerId, launcherId, taskCommId); + AMContainer amc = createAmContainer(container, chh, tal, + containerSignatureMatcher, context, schedulerId, launcherId, taskCommId); + return (containerMap.putIfAbsent(container.getId(), amc) == null); } + AMContainer createAmContainer(Container container, + ContainerHeartbeatHandler chh, + TaskCommunicatorManagerInterface tal, + ContainerSignatureMatcher signatureMatcher, + AppContext appContext, int schedulerId, + int launcherId, int taskCommId) { + AMContainer amc = new AMContainerImpl(container, chh, tal, + signatureMatcher, 
appContext, schedulerId, launcherId, taskCommId); + return amc; + } + public AMContainer get(ContainerId containerId) { return containerMap.get(containerId); } @@ -79,6 +95,24 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo public void dagComplete(DAG dag){ AMContainerHelpers.dagComplete(dag.getID()); + // Clean up completed and errored containers once a DAG completes. + cleanupCompletedContainers(); + } + + private void cleanupCompletedContainers() { + Iterator<Map.Entry<ContainerId, AMContainer>> iterator = containerMap.entrySet().iterator(); + int count = 0; + while (iterator.hasNext()) { + Map.Entry<ContainerId, AMContainer> entry = iterator.next(); + AMContainer amContainer = entry.getValue(); + if (AMContainerState.COMPLETED.equals(amContainer.getState()) || amContainer.isInErrorState()) { + iterator.remove(); + count++; + } + } + LOG.info( + "Cleaned up completed containers on dagComplete. Removed={}, Remaining={}", + count, containerMap.size()); } } http://git-wip-us.apache.org/repos/asf/tez/blob/60515420/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java index ed14871..4d1bbae 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@ -1183,7 +1183,7 @@ public class TestAMContainer { // TODO Verify diagnostics in most of the tests.
- private static class WrappedContainer { + static class WrappedContainer { long rmIdentifier = 2000; static final int taskPriority = 10; @@ -1215,10 +1215,10 @@ public class TestAMContainer { public AMContainerImpl amContainer; @SuppressWarnings("deprecation") // ContainerId - public WrappedContainer(boolean shouldProfile, String profileString) { + public WrappedContainer(boolean shouldProfile, String profileString, int cIdInt) { applicationID = ApplicationId.newInstance(rmIdentifier, 1); appAttemptID = ApplicationAttemptId.newInstance(applicationID, 1); - containerID = ContainerId.newInstance(appAttemptID, 1); + containerID = ContainerId.newInstance(appAttemptID, cIdInt); nodeID = NodeId.newInstance("host", 12500); nodeHttpAddress = "host:12501"; resource = Resource.newInstance(1024, 1); @@ -1265,7 +1265,7 @@ public class TestAMContainer { } public WrappedContainer() { - this(false, null); + this(false, null, 1); } protected void mockDAGID() { http://git-wip-us.apache.org/repos/asf/tez/blob/60515420/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java index 2fcd0c8..efea327 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java @@ -18,11 +18,15 @@ package org.apache.tez.dag.app.rm.container; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import java.net.InetSocketAddress; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import 
org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -31,43 +35,117 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.tez.common.ContainerSignatureMatcher; +import org.apache.tez.dag.app.TaskCommunicatorWrapper; +import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.dag.app.rm.container.TestAMContainer.WrappedContainer; import org.apache.tez.serviceplugins.api.TaskCommunicator; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.serviceplugins.api.ServicePluginException; +import org.junit.Test; public class TestAMContainerMap { - private ContainerHeartbeatHandler mockContainerHeartBeatHandler() { - return mock(ContainerHeartbeatHandler.class); - } - private TaskCommunicatorManagerInterface mockTaskAttemptListener() throws ServicePluginException { - TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class); - TaskCommunicator taskComm = mock(TaskCommunicator.class); - doReturn(new InetSocketAddress("localhost", 21000)).when(taskComm).getAddress(); - doReturn(taskComm).when(tal).getTaskCommunicator(0); - return tal; - } + @Test (timeout = 10000) + public void testCleanupOnDagComplete() { - private AppContext mockAppContext() { + ContainerHeartbeatHandler chh = mock(ContainerHeartbeatHandler.class); + TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class); AppContext appContext = mock(AppContext.class); - return appContext; - } - @SuppressWarnings("deprecation") - private ContainerId mockContainerId(int cId) { - ApplicationId appId = ApplicationId.newInstance(1000, 1); - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); - 
ContainerId containerId = ContainerId.newInstance(appAttemptId, cId); - return containerId; + + + int numContainers = 7; + WrappedContainer[] wContainers = new WrappedContainer[numContainers]; + for (int i = 0 ; i < numContainers ; i++) { + WrappedContainer wc = + new WrappedContainer(false, null, i); + wContainers[i] = wc; + } + + AMContainerMap amContainerMap = new AMContainerMapForTest(chh, tal, mock( + ContainerSignatureMatcher.class), appContext, wContainers); + + for (int i = 0 ; i < numContainers ; i++) { + amContainerMap.addContainerIfNew(wContainers[i].container, 0, 0, 0); + } + + + // Container 1 in LAUNCHING state + wContainers[0].launchContainer(); + wContainers[0].verifyState(AMContainerState.LAUNCHING); + + // Container 2 in IDLE state + wContainers[1].launchContainer(); + wContainers[1].containerLaunched(); + wContainers[1].verifyState(AMContainerState.IDLE); + + // Container 3 RUNNING state + wContainers[2].launchContainer(); + wContainers[2].containerLaunched(); + wContainers[2].assignTaskAttempt(wContainers[2].taskAttemptID); + wContainers[2].verifyState(AMContainerState.RUNNING); + + // Container 4 STOP_REQUESTED + wContainers[3].launchContainer(); + wContainers[3].containerLaunched(); + wContainers[3].stopRequest(); + wContainers[3].verifyState(AMContainerState.STOP_REQUESTED); + + // Container 5 STOPPING + wContainers[4].launchContainer(); + wContainers[4].containerLaunched(); + wContainers[4].stopRequest(); + wContainers[4].nmStopSent(); + wContainers[4].verifyState(AMContainerState.STOPPING); + + // Container 6 COMPLETED + wContainers[5].launchContainer(); + wContainers[5].containerLaunched(); + wContainers[5].stopRequest(); + wContainers[5].nmStopSent(); + wContainers[5].containerCompleted(); + wContainers[5].verifyState(AMContainerState.COMPLETED); + + // Container 7 STOP_REQUESTED + ERROR + wContainers[6].launchContainer(); + wContainers[6].containerLaunched(); + wContainers[6].containerLaunched(); +
assertTrue(wContainers[6].amContainer.isInErrorState()); + wContainers[6].verifyState(AMContainerState.STOP_REQUESTED); + + // 7 containers present, and registered with AMContainerMap at this point. + + assertEquals(7, amContainerMap.containerMap.size()); + amContainerMap.dagComplete(mock(DAG.class)); + assertEquals(5, amContainerMap.containerMap.size()); } - private Container mockContainer(ContainerId containerId) { - NodeId nodeId = NodeId.newInstance("localhost", 43255); - Container container = Container.newInstance(containerId, nodeId, "localhost:33333", - Resource.newInstance(1024, 1), Priority.newInstance(1), mock(Token.class)); - return container; + private static class AMContainerMapForTest extends AMContainerMap { + + + private WrappedContainer[] wrappedContainers; + + public AMContainerMapForTest(ContainerHeartbeatHandler chh, + TaskCommunicatorManagerInterface tal, + ContainerSignatureMatcher containerSignatureMatcher, + AppContext context, WrappedContainer[] wrappedContainers) { + super(chh, tal, containerSignatureMatcher, context); + this.wrappedContainers = wrappedContainers; + } + + @Override + AMContainer createAmContainer(Container container, + ContainerHeartbeatHandler chh, + TaskCommunicatorManagerInterface tal, + ContainerSignatureMatcher signatureMatcher, + AppContext appContext, int schedulerId, + int launcherId, int taskCommId) { + return wrappedContainers[container.getId().getId()].amContainer; + } + } }
