This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7aad1f9ad Calculate requested container count based on adding 
allocated count and outstanding ContainerRequests in Yarn (#3524)
7aad1f9ad is described below

commit 7aad1f9ade24ad05ab797e150b6c03a03d925c04
Author: Hanghang Nate Liu <[email protected]>
AuthorDate: Tue Jun 21 13:10:49 2022 -0700

    Calculate requested container count based on adding allocated count and 
outstanding ContainerRequests in Yarn (#3524)
---
 .../java/org/apache/gobblin/yarn/YarnService.java  | 26 +++++++++++++---------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index a81960fdf..2f8a06c88 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -191,8 +191,6 @@ public class YarnService extends AbstractIdleService {
   // instance names get picked up when replacement containers get allocated.
   private final Set<String> unusedHelixInstanceNames = 
ConcurrentHashMap.newKeySet();
 
-  // The map from helix tag to requested container count
-  private final Map<String, Integer> requestedContainerCountMap = 
Maps.newConcurrentMap();
   // The map from helix tag to allocated container count
   private final Map<String, Integer> allocatedContainerCountMap = 
Maps.newConcurrentMap();
 
@@ -444,11 +442,12 @@ public class YarnService extends AbstractIdleService {
     for (Map.Entry<String, Integer> entry : 
yarnContainerRequestBundle.getHelixTagContainerCountMap().entrySet()) {
       String currentHelixTag = entry.getKey();
       int desiredContainerCount = entry.getValue();
-      int requestedContainerCount = 
requestedContainerCountMap.getOrDefault(currentHelixTag, 0);
+      // Calculate requested container count based on adding allocated count 
and outstanding ContainerRequests in Yarn
+      int requestedContainerCount = 
allocatedContainerCountMap.getOrDefault(currentHelixTag, 0)
+          + 
getMatchingRequestsCount(yarnContainerRequestBundle.getHelixTagResourceMap().get(currentHelixTag));
       for(; requestedContainerCount < desiredContainerCount; 
requestedContainerCount++) {
         requestContainer(Optional.absent(), 
yarnContainerRequestBundle.getHelixTagResourceMap().get(currentHelixTag));
       }
-      requestedContainerCountMap.put(currentHelixTag, requestedContainerCount);
     }
 
     // If the total desired is lower than the currently allocated amount then 
release free containers.
@@ -466,8 +465,6 @@ public class YarnService extends AbstractIdleService {
         ContainerInfo containerInfo = entry.getValue();
         if (!inUseInstances.contains(containerInfo.getHelixParticipantId())) {
           containersToRelease.add(containerInfo.getContainer());
-          requestedContainerCountMap.put(containerInfo.getHelixTag(),
-              requestedContainerCountMap.get(containerInfo.getHelixTag()) - 1);
         }
 
         if (containersToRelease.size() == numToShutdown) {
@@ -480,8 +477,8 @@ public class YarnService extends AbstractIdleService {
       this.eventBus.post(new ContainerReleaseRequest(containersToRelease));
     }
     this.yarnContainerRequest = yarnContainerRequestBundle;
-    LOGGER.info("Current tag-container being requested:{}, tag-container 
allocated: {}",
-        this.requestedContainerCountMap, this.allocatedContainerCountMap);
+    LOGGER.info("Current tag-container desired count:{}, tag-container 
allocated: {}",
+        yarnContainerRequestBundle.getHelixTagContainerCountMap(), 
this.allocatedContainerCountMap);
   }
 
   // Request initial containers with default resource and helix tag
@@ -663,8 +660,8 @@ public class YarnService extends AbstractIdleService {
     String helixTag = completedContainerInfo == null ? helixInstanceTags : 
completedContainerInfo.getHelixTag();
     allocatedContainerCountMap.put(helixTag, 
allocatedContainerCountMap.get(helixTag) - 1);
 
-    LOGGER.info(String.format("Container %s running Helix instance %s has 
completed with exit status %d",
-        containerStatus.getContainerId(), completedInstanceName, 
containerStatus.getExitStatus()));
+    LOGGER.info(String.format("Container %s running Helix instance %s with tag 
%s has completed with exit status %d",
+        containerStatus.getContainerId(), completedInstanceName, helixTag, 
containerStatus.getExitStatus()));
 
     if (!Strings.isNullOrEmpty(containerStatus.getDiagnostics())) {
       LOGGER.info(String.format("Received the following diagnostics 
information for container %s: %s",
@@ -758,6 +755,15 @@ public class YarnService extends AbstractIdleService {
     return eventMetadataBuilder;
   }
 
+  /**
+   * Get the number of matching container requests for the specified resource 
memory and cores.
+   */
+  private int getMatchingRequestsCount(Resource resource) {
+    int priorityNum = resourcePriorityMap.getOrDefault(resource.toString(), 0);
+    Priority priority = Priority.newInstance(priorityNum);
+    return getAmrmClientAsync().getMatchingRequests(priority, 
ResourceRequest.ANY, resource).size();
+  }
+
   /**
    * A custom implementation of {@link AMRMClientAsync.CallbackHandler}.
    */

Reply via email to