Repository: flink Updated Branches: refs/heads/release-1.5 41cef0da3 -> e1fcd458a
[FLINK-9567][yarn] Only restart containers if there are pending slot requests The YarnResourceManager should only restart containers if it still has some pending slot requests left. This solves the problem that upon restart of the YarnResourceManager it can happen that one recovers containers from a previous attempt which are just about to be completed (the completion was triggered by the previous attempt). These containers should not be restarted because they are no longer needed. This closes #6237. [FLINK-9567][runtime][yarn] Fix the bug that Flink does not release Yarn container when onContainerCompleted callback method happened after full restart Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b0aca56 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b0aca56 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b0aca56 Branch: refs/heads/release-1.5 Commit: 4b0aca5681e749cc8b0389d389e1fac736653eb5 Parents: 41cef0d Author: yangshimin <yangshi...@youzan.com> Authored: Mon Jul 2 11:56:00 2018 +0800 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Mon Jul 2 16:47:38 2018 +0200 ---------------------------------------------------------------------- .../resourcemanager/ResourceManager.java | 8 +++ .../slotmanager/SlotManager.java | 2 + .../apache/flink/yarn/YarnResourceManager.java | 12 +++- .../flink/yarn/YarnResourceManagerTest.java | 59 ++++++++++++++++++++ 4 files changed, 80 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4b0aca56/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 3ea5c2e..6b104ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -1125,5 +1125,13 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> return CompletableFuture.completedFuture(null); } } + + // ------------------------------------------------------------------------ + // Resource Management + // ------------------------------------------------------------------------ + + protected int getNumberPendingSlotRequests() { + return slotManager.getNumberPendingSlotRequests(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/4b0aca56/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index b2dbba8..d74979a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -168,6 +168,8 @@ public class SlotManager implements AutoCloseable { } } + public int getNumberPendingSlotRequests() {return pendingSlotRequests.size(); } + // --------------------------------------------------------------------------------------------- // Component lifecycle methods // --------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4b0aca56/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java 
---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index c498634..ab031be 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -334,7 +334,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme if (yarnWorkerNode != null) { // Container completed unexpectedly ~> start a new one final Container container = yarnWorkerNode.getContainer(); - requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority()); + internalRequestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority()); closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics())); } } @@ -510,4 +510,14 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme } } + /** + * Request new container if pending containers cannot satisfy pending slot requests. 
+ */ + private void internalRequestYarnContainer(Resource resource, Priority priority) { + int pendingSlotRequests = getNumberPendingSlotRequests(); + int pendingSlotAllocation = numPendingContainerRequests * defaultNumSlots; + if (pendingSlotRequests > pendingSlotAllocation) { + requestYarnContainer(resource, priority); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/4b0aca56/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index 4fffc2b..8c6d7f7 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -103,6 +105,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -206,6 +209,7 @@ public class YarnResourceManagerTest extends TestLogger { protected void runAsync(final Runnable runnable) { runnable.run(); } + } class Context { @@ -421,4 +425,59 @@ public class YarnResourceManagerTest extends TestLogger { assertFalse("YARN 
application directory was not removed", Files.exists(applicationDir.toPath())); }}; } + + /** + * Tests that YarnResourceManager will not request more containers than needs during + * callback from Yarn when container is Completed. + * @throws Exception + */ + @Test + public void testOnContainerCompleted() throws Exception { + new Context() {{ + startResourceManager(); + CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> { + rmServices.slotManager.registerSlotRequest( + new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost)); + return null; + }); + + // wait for the registerSlotRequest completion + registerSlotRequestFuture.get(); + + ContainerId testContainerId = ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(System.currentTimeMillis(), 1), + 1), + 1); + + // Callback from YARN when container is allocated. + Container testingContainer = mock(Container.class); + when(testingContainer.getId()).thenReturn(testContainerId); + when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234)); + when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1)); + when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED); + resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); + verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); + + // Callback from YARN when container is Completed, pending request can not be fulfilled by pending + // containers, need to request new container. 
+ ContainerStatus testingContainerStatus = mock(ContainerStatus.class); + when(testingContainerStatus.getContainerId()).thenReturn(testContainerId); + when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE); + when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit"); + when(testingContainerStatus.getExitStatus()).thenReturn(-1); + resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus)); + verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + + // Callback from YARN when container is Completed happened before global fail, pending request + // slot is already fulfilled by pending containers, no need to request new container. + when(testingContainerStatus.getContainerId()).thenReturn(testContainerId); + when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE); + when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit"); + when(testingContainerStatus.getExitStatus()).thenReturn(-1); + resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus)); + verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + }}; + } }