suez1224 closed pull request #7078: [FLINK-10848][YARN] properly remove YARN
ContainerRequest upon container allocation success
URL: https://github.com/apache/flink/pull/7078
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 3763f6592af..f1e6a3a767c 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -165,6 +165,8 @@
YARN_CONFIGURATION.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
YARN_CONFIGURATION.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
YARN_CONFIGURATION.setInt(YarnConfiguration.NM_VCORES, 666); //
memory is overwritten in the MiniYARNCluster.
+
YARN_CONFIGURATION.set("yarn.scheduler.capacity.resource-calculator",
+
"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
// so we have to change the number of cores for testing.
YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000);
// 20 seconds expiry (to ensure we properly heartbeat with YARN).
}
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
index 8e686bbbe34..3327505e32d 100644
---
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
+++
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@@ -438,6 +438,8 @@ private void containersAllocated(List<Container>
containers) {
numPendingContainerRequests = Math.max(0,
numPendingContainerRequests - 1);
LOG.info("Received new container: {} - Remaining
pending container requests: {}",
container.getId(), numPendingContainerRequests);
+ resourceManagerClient.removeContainerRequest(new
AMRMClient.ContainerRequest(
+ container.getResource(), null, null,
container.getPriority()));
// decide whether to return the container, or whether
to start a TaskManager
if (numRegistered + containersInLaunch.size() <
numRequired) {
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 6ff5cd66487..6669f16fa40 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -361,7 +361,8 @@ public void onContainersAllocated(List<Container>
containers) {
"Received new container: {} - Remaining
pending container requests: {}",
container.getId(),
numPendingContainerRequests);
-
+
resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest(
+ container.getResource(), null,
null, container.getPriority()));
if (numPendingContainerRequests > 0) {
numPendingContainerRequests--;
diff --git
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
index 10b2ce97d6f..d665df6bc7c 100644
---
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
+++
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
@@ -43,6 +43,8 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@@ -69,8 +71,11 @@
import scala.concurrent.duration.FiniteDuration;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
@@ -125,6 +130,8 @@ public void
testYarnFlinkResourceManagerJobManagerLostLeadership() throws Except
1),
i));
when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container",
1234));
+
when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
+
when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED);
containerList.add(mockContainer);
}
@@ -233,6 +240,8 @@ public Object answer(InvocationOnMock invocation) throws
Throwable {
int numberOfRegisteredResources = (Integer)
Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
+ verify(resourceManagerClient,
times(numInitialTaskManagers)).removeContainerRequest(
+
any(AMRMClient.ContainerRequest.class));
assertEquals(numInitialTaskManagers,
numberOfRegisteredResources);
} finally {
if (resourceManager != null) {
diff --git
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index d41d42d7a05..ee325dad0f7 100644
---
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -401,6 +401,8 @@ public void testStopWorker() throws Exception {
resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+
verify(mockResourceManagerClient).removeContainerRequest(
+
any(AMRMClient.ContainerRequest.class));
verify(mockNMClient).startContainer(eq(testingContainer),
any(ContainerLaunchContext.class));
// Remote task executor registers with
YarnResourceManager.
@@ -496,6 +498,8 @@ public void testOnContainerCompleted() throws Exception {
resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+
verify(mockResourceManagerClient).removeContainerRequest(
+
any(AMRMClient.ContainerRequest.class));
verify(mockNMClient).startContainer(eq(testingContainer),
any(ContainerLaunchContext.class));
// Callback from YARN when container is
Completed, pending request can not be fulfilled by pending
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services