asfgit closed pull request #6669: [FLINK-9567][runtime][yarn] Fix the yarn
container over allocation in…
URL: https://github.com/apache/flink/pull/6669
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index cf3588f6593..956e40fe61b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -334,7 +334,7 @@ public void onContainersCompleted(final
List<ContainerStatus> statuses) {
if (yarnWorkerNode != null) {
// Container completed
unexpectedly ~> start a new one
final Container container =
yarnWorkerNode.getContainer();
-
internalRequestYarnContainer(container.getResource(),
yarnWorkerNode.getContainer().getPriority());
+
requestYarnContainer(container.getResource(),
yarnWorkerNode.getContainer().getPriority());
}
// Eagerly close the connection with
task manager.
closeTaskManagerConnection(resourceId,
new Exception(containerStatus.getDiagnostics()));
@@ -443,17 +443,24 @@ private FinalApplicationStatus
getYarnStatus(ApplicationStatus status) {
return new Tuple2<>(host, Integer.valueOf(port));
}
+ /**
+ * Request new container if pending containers cannot satisfies pending
slot requests.
+ */
private void requestYarnContainer(Resource resource, Priority priority)
{
- resourceManagerClient.addContainerRequest(new
AMRMClient.ContainerRequest(resource, null, null, priority));
+ int pendingSlotRequests = getNumberPendingSlotRequests();
+ int pendingSlotAllocation = numPendingContainerRequests *
numberOfTaskSlots;
+ if (pendingSlotRequests > pendingSlotAllocation) {
+ resourceManagerClient.addContainerRequest(new
AMRMClient.ContainerRequest(resource, null, null, priority));
- // make sure we transmit the request fast and receive fast news
of granted allocations
-
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+ // make sure we transmit the request fast and receive
fast news of granted allocations
+
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
- numPendingContainerRequests++;
+ numPendingContainerRequests++;
- log.info("Requesting new TaskExecutor container with resources
{}. Number pending requests {}.",
- resource,
- numPendingContainerRequests);
+ log.info("Requesting new TaskExecutor container with
resources {}. Number pending requests {}.",
+ resource,
+ numPendingContainerRequests);
+ }
}
private ContainerLaunchContext createTaskExecutorLaunchContext(Resource
resource, String containerId, String host)
@@ -510,15 +517,4 @@ private int generatePriority(ResourceProfile
resourceProfile) {
return priority;
}
}
-
- /**
- * Request new container if pending containers cannot satisfies pending
slot requests.
- */
- private void internalRequestYarnContainer(Resource resource, Priority
priority) {
- int pendingSlotRequests = getNumberPendingSlotRequests();
- int pendingSlotAllocation = numPendingContainerRequests *
numberOfTaskSlots;
- if (pendingSlotRequests > pendingSlotAllocation) {
- requestYarnContainer(resource, priority);
- }
- }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services