This is an automated email from the ASF dual-hosted git repository.
shuyichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e26d90f [FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success
e26d90f is described below
commit e26d90fc86b266978b4bac84fe02ca34b62983fe
Author: Shuyi Chen <[email protected]>
AuthorDate: Sat Nov 10 00:42:49 2018 -0800
[FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success
This closes #7078
---
.../src/test/java/org/apache/flink/yarn/YarnTestBase.java | 2 ++
.../java/org/apache/flink/yarn/YarnFlinkResourceManager.java | 2 ++
.../src/main/java/org/apache/flink/yarn/YarnResourceManager.java | 3 ++-
.../java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java | 9 +++++++++
.../test/java/org/apache/flink/yarn/YarnResourceManagerTest.java | 4 ++++
5 files changed, 19 insertions(+), 1 deletion(-)
diff --git
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 3763f65..f1e6a3a 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -165,6 +165,8 @@ public abstract class YarnTestBase extends TestLogger {
YARN_CONFIGURATION.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
YARN_CONFIGURATION.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
YARN_CONFIGURATION.setInt(YarnConfiguration.NM_VCORES, 666); //
memory is overwritten in the MiniYARNCluster.
+
YARN_CONFIGURATION.set("yarn.scheduler.capacity.resource-calculator",
+
"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
// so we have to change the number of cores for testing.
YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000);
// 20 seconds expiry (to ensure we properly heartbeat with YARN).
}
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
index 8e686bb..3327505 100644
---
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
+++
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@@ -438,6 +438,8 @@ public class YarnFlinkResourceManager extends
FlinkResourceManager<RegisteredYar
numPendingContainerRequests = Math.max(0,
numPendingContainerRequests - 1);
LOG.info("Received new container: {} - Remaining
pending container requests: {}",
container.getId(), numPendingContainerRequests);
+ resourceManagerClient.removeContainerRequest(new
AMRMClient.ContainerRequest(
+ container.getResource(), null, null,
container.getPriority()));
// decide whether to return the container, or whether
to start a TaskManager
if (numRegistered + containersInLaunch.size() <
numRequired) {
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 6ff5cd6..6669f16 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -361,7 +361,8 @@ public class YarnResourceManager extends
ResourceManager<YarnWorkerNode> impleme
"Received new container: {} - Remaining
pending container requests: {}",
container.getId(),
numPendingContainerRequests);
-
+
resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest(
+ container.getResource(), null,
null, container.getPriority()));
if (numPendingContainerRequests > 0) {
numPendingContainerRequests--;
diff --git
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
index 10b2ce9..d665df6 100644
---
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
+++
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@@ -69,8 +71,11 @@ import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
@@ -125,6 +130,8 @@ public class YarnFlinkResourceManagerTest extends
TestLogger {
1),
i));
when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container",
1234));
+
when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
+
when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED);
containerList.add(mockContainer);
}
@@ -233,6 +240,8 @@ public class YarnFlinkResourceManagerTest extends
TestLogger {
int numberOfRegisteredResources = (Integer)
Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
+ verify(resourceManagerClient,
times(numInitialTaskManagers)).removeContainerRequest(
+
any(AMRMClient.ContainerRequest.class));
assertEquals(numInitialTaskManagers,
numberOfRegisteredResources);
} finally {
if (resourceManager != null) {
diff --git
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index d41d42d..ee325da 100644
---
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -401,6 +401,8 @@ public class YarnResourceManagerTest extends TestLogger {
resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+
verify(mockResourceManagerClient).removeContainerRequest(
+
any(AMRMClient.ContainerRequest.class));
verify(mockNMClient).startContainer(eq(testingContainer),
any(ContainerLaunchContext.class));
// Remote task executor registers with
YarnResourceManager.
@@ -496,6 +498,8 @@ public class YarnResourceManagerTest extends TestLogger {
resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+
verify(mockResourceManagerClient).removeContainerRequest(
+
any(AMRMClient.ContainerRequest.class));
verify(mockNMClient).startContainer(eq(testingContainer),
any(ContainerLaunchContext.class));
// Callback from YARN when container is
Completed, pending request can not be fulfilled by pending