Repository: flink
Updated Branches:
  refs/heads/release-1.5 41cef0da3 -> e1fcd458a


[FLINK-9567][yarn] Only restart containers if there are pending slot requests

The YarnResourceManager should only restart containers if it still has some
pending slot requests left. This solves the problem that, upon restart of the
YarnResourceManager, it can recover containers from a previous attempt which
are just about to be completed (the completion was triggered by the previous
attempt). These containers should not be restarted because they are no longer
needed.

This closes #6237.

[FLINK-9567][runtime][yarn] Fix the bug where Flink does not release YARN
containers when the onContainersCompleted callback is invoked after a full restart


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b0aca56
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b0aca56
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b0aca56

Branch: refs/heads/release-1.5
Commit: 4b0aca5681e749cc8b0389d389e1fac736653eb5
Parents: 41cef0d
Author: yangshimin <yangshi...@youzan.com>
Authored: Mon Jul 2 11:56:00 2018 +0800
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Jul 2 16:47:38 2018 +0200

----------------------------------------------------------------------
 .../resourcemanager/ResourceManager.java        |  8 +++
 .../slotmanager/SlotManager.java                |  2 +
 .../apache/flink/yarn/YarnResourceManager.java  | 12 +++-
 .../flink/yarn/YarnResourceManagerTest.java     | 59 ++++++++++++++++++++
 4 files changed, 80 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4b0aca56/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3ea5c2e..6b104ed 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -1125,5 +1125,13 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                        return CompletableFuture.completedFuture(null);
                }
        }
+
+       // 
------------------------------------------------------------------------
+       //  Resource Management
+       // 
------------------------------------------------------------------------
+
+       protected int getNumberPendingSlotRequests() {
+               return slotManager.getNumberPendingSlotRequests();
+       }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b0aca56/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index b2dbba8..d74979a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -168,6 +168,8 @@ public class SlotManager implements AutoCloseable {
                }
        }
 
+       public int getNumberPendingSlotRequests() {return 
pendingSlotRequests.size(); }
+
        // 
---------------------------------------------------------------------------------------------
        // Component lifecycle methods
        // 
---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4b0aca56/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index c498634..ab031be 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -334,7 +334,7 @@ public class YarnResourceManager extends 
ResourceManager<YarnWorkerNode> impleme
                                        if (yarnWorkerNode != null) {
                                                // Container completed 
unexpectedly ~> start a new one
                                                final Container container = 
yarnWorkerNode.getContainer();
-                                               
requestYarnContainer(container.getResource(), 
yarnWorkerNode.getContainer().getPriority());
+                                               
internalRequestYarnContainer(container.getResource(), 
yarnWorkerNode.getContainer().getPriority());
                                                
closeTaskManagerConnection(resourceId, new 
Exception(containerStatus.getDiagnostics()));
                                        }
                                }
@@ -510,4 +510,14 @@ public class YarnResourceManager extends 
ResourceManager<YarnWorkerNode> impleme
                }
        }
 
+       /**
+        * Requests a new container if the pending containers cannot satisfy the pending 
slot requests.
+        */
+       private void internalRequestYarnContainer(Resource resource, Priority 
priority) {
+               int pendingSlotRequests = getNumberPendingSlotRequests();
+               int pendingSlotAllocation = numPendingContainerRequests * 
defaultNumSlots;
+               if (pendingSlotRequests > pendingSlotAllocation) {
+                       requestYarnContainer(resource, priority);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4b0aca56/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 4fffc2b..8c6d7f7 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -103,6 +105,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -206,6 +209,7 @@ public class YarnResourceManagerTest extends TestLogger {
                protected void runAsync(final Runnable runnable) {
                        runnable.run();
                }
+
        }
 
        class Context {
@@ -421,4 +425,59 @@ public class YarnResourceManagerTest extends TestLogger {
                        assertFalse("YARN application directory was not 
removed", Files.exists(applicationDir.toPath()));
                }};
        }
+
+       /**
+        * Tests that the YarnResourceManager does not request more containers than 
needed during
+        * the callback from YARN when a container is completed.
+        * @throws Exception
+        */
+       @Test
+       public void testOnContainerCompleted() throws Exception {
+               new Context() {{
+                       startResourceManager();
+                       CompletableFuture<?> registerSlotRequestFuture = 
resourceManager.runInMainThread(() -> {
+                               rmServices.slotManager.registerSlotRequest(
+                                       new SlotRequest(new JobID(), new 
AllocationID(), resourceProfile1, taskHost));
+                               return null;
+                       });
+
+                       // wait for the registerSlotRequest completion
+                       registerSlotRequestFuture.get();
+
+                       ContainerId testContainerId = ContainerId.newInstance(
+                               ApplicationAttemptId.newInstance(
+                                       
ApplicationId.newInstance(System.currentTimeMillis(), 1),
+                                       1),
+                               1);
+
+                       // Callback from YARN when container is allocated.
+                       Container testingContainer = mock(Container.class);
+                       
when(testingContainer.getId()).thenReturn(testContainerId);
+                       
when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 
1234));
+                       
when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
+                       
when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
+                       
resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
+                       
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+                       
verify(mockNMClient).startContainer(eq(testingContainer), 
any(ContainerLaunchContext.class));
+
+                       // Callback from YARN when container is Completed, 
pending request can not be fulfilled by pending
+                       // containers, need to request new container.
+                       ContainerStatus testingContainerStatus = 
mock(ContainerStatus.class);
+                       
when(testingContainerStatus.getContainerId()).thenReturn(testContainerId);
+                       
when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE);
+                       
when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit");
+                       
when(testingContainerStatus.getExitStatus()).thenReturn(-1);
+                       
resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
+                       verify(mockResourceManagerClient, 
times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+
+                       // Callback from YARN when container is Completed 
happened before global fail, pending request
+                       // slot is already fulfilled by pending containers, no 
need to request new container.
+                       
when(testingContainerStatus.getContainerId()).thenReturn(testContainerId);
+                       
when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE);
+                       
when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit");
+                       
when(testingContainerStatus.getExitStatus()).thenReturn(-1);
+                       
resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
+                       verify(mockResourceManagerClient, 
times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+               }};
+       }
 }

Reply via email to