This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 013a85e3b [GOBBLIN-1728] Fix YarnService incorrect container 
allocation behavior (#3586)
013a85e3b is described below

commit 013a85e3b04b986cd4ca16458b246bac5ed949fa
Author: Matthew Ho <[email protected]>
AuthorDate: Mon Oct 24 14:30:34 2022 -0700

    [GOBBLIN-1728] Fix YarnService incorrect container allocation behavior 
(#3586)
    
    * [GOBBLIN-1728] Fix YarnService incorrect container allocation behavior
    
    Due to a race condition and an incorrect API call, the YARN service both
    allocates too many containers and, in rare cases, ends up with inconsistent
    container-count state.
    
    * Improve logging of container counts per Helix tag
    
    * Address review comments and add a short-circuit check for when no
    priority has been recorded for a given resource (i.e. no container request
    has been made with that resource yet). This occurs during the initial
    request for containers.
    
    Tested for an additional 6 hours with ~256 high-volume topic partitions.
---
 .../gobblin/yarn/YarnAutoScalingManager.java       |  2 +-
 .../org/apache/gobblin/yarn/YarnHelixUtils.java    |  8 +-
 .../java/org/apache/gobblin/yarn/YarnService.java  | 92 ++++++++++++++++------
 .../org/apache/gobblin/yarn/YarnServiceTest.java   |  1 +
 4 files changed, 75 insertions(+), 28 deletions(-)

diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index e7ab082e2..7c4da8fd8 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -359,4 +359,4 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
       }
     }
   }
-}
\ No newline at end of file
+}
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index 1ecbd4510..72f9cc336 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -23,6 +23,7 @@ import java.net.URL;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -240,12 +241,13 @@ public class YarnHelixUtils {
    * @return helix tag that this container should be assigned with, if null 
means need to use the default
    */
   public static String findHelixTagForContainer(Container container,
-      Map<String, Integer> helixTagAllocatedContainerCount, 
YarnContainerRequestBundle requestedYarnContainer) {
+      Map<String, AtomicInteger> helixTagAllocatedContainerCount, 
YarnContainerRequestBundle requestedYarnContainer) {
     String foundTag = null;
     if(requestedYarnContainer != null && 
requestedYarnContainer.getResourceHelixTagMap().containsKey(container.getResource().toString()))
 {
       for (String tag : 
requestedYarnContainer.getResourceHelixTagMap().get(container.getResource().toString()))
 {
         int desiredCount = 
requestedYarnContainer.getHelixTagContainerCountMap().get(tag);
-        int allocatedCount = helixTagAllocatedContainerCount.getOrDefault(tag, 
0);
+        helixTagAllocatedContainerCount.putIfAbsent(tag, new AtomicInteger(0));
+        int allocatedCount = helixTagAllocatedContainerCount.get(tag).get();
         foundTag = tag;
         if(allocatedCount < desiredCount) {
           return foundTag;
@@ -254,4 +256,4 @@ public class YarnHelixUtils {
     }
     return foundTag;
   }
-}
\ No newline at end of file
+}
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 8a12bb0ce..f899bf74e 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.yarn;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -27,12 +28,14 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
 
 import lombok.AllArgsConstructor;
 import org.apache.hadoop.conf.Configuration;
@@ -111,7 +114,6 @@ import org.apache.gobblin.yarn.event.NewContainerRequest;
 
 import static 
org.apache.gobblin.yarn.GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX;
 
-
 /**
  * This class is responsible for all Yarn-related stuffs including 
ApplicationMaster registration,
  * ApplicationMaster un-registration, Yarn container management, etc.
@@ -194,7 +196,7 @@ public class YarnService extends AbstractIdleService {
   private final Set<String> unusedHelixInstanceNames = 
ConcurrentHashMap.newKeySet();
 
   // The map from helix tag to allocated container count
-  private final Map<String, Integer> allocatedContainerCountMap = 
Maps.newConcurrentMap();
+  private final ConcurrentMap<String, AtomicInteger> 
allocatedContainerCountMap = Maps.newConcurrentMap();
 
   private final boolean isPurgingOfflineHelixInstancesEnabled;
   private final long helixPurgeLaggingThresholdMs;
@@ -395,7 +397,8 @@ public class YarnService extends AbstractIdleService {
         synchronized (this.allContainersStopped) {
           try {
             // Wait 5 minutes for the containers to stop
-            this.allContainersStopped.wait(5 * 60 * 1000);
+            Duration waitTimeout = Duration.ofMinutes(5);
+            this.allContainersStopped.wait(waitTimeout.toMillis());
             LOGGER.info("All of the containers have been stopped");
           } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
@@ -461,12 +464,13 @@ public class YarnService extends AbstractIdleService {
    * @param inUseInstances  a set of in use instances
    */
   public synchronized void 
requestTargetNumberOfContainers(YarnContainerRequestBundle 
yarnContainerRequestBundle, Set<String> inUseInstances) {
-    LOGGER.debug("Requesting numTargetContainers {}, in use instances count is 
{}, container map size is {}",
-        yarnContainerRequestBundle.getTotalContainers(), inUseInstances, 
this.containerMap.size());
+    LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances 
count is {}, container map size is {}",
+        yarnContainerRequestBundle.getTotalContainers(), 
inUseInstances.size(), this.containerMap.size());
     int numTargetContainers = yarnContainerRequestBundle.getTotalContainers();
     // YARN can allocate more than the requested number of containers, compute 
additional allocations and deallocations
     // based on the max of the requested and actual allocated counts
-    int numAllocatedContainers = this.containerMap.size();
+    // Represents the number of containers allocated for across all helix tags
+    int totalAllocatedContainers = this.containerMap.size();
 
     // Request additional containers if the desired count is higher than the 
max of the current allocation or previously
     // requested amount. Note that there may be in-flight or additional 
allocations after numContainers has been computed
@@ -474,11 +478,19 @@ public class YarnService extends AbstractIdleService {
     for (Map.Entry<String, Integer> entry : 
yarnContainerRequestBundle.getHelixTagContainerCountMap().entrySet()) {
       String currentHelixTag = entry.getKey();
       int desiredContainerCount = entry.getValue();
+      Resource resourceForHelixTag = 
yarnContainerRequestBundle.getHelixTagResourceMap().get(currentHelixTag);
+
       // Calculate requested container count based on adding allocated count 
and outstanding ContainerRequests in Yarn
-      int requestedContainerCount = 
allocatedContainerCountMap.getOrDefault(currentHelixTag, 0)
-          + 
getMatchingRequestsCount(yarnContainerRequestBundle.getHelixTagResourceMap().get(currentHelixTag));
-      for(; requestedContainerCount < desiredContainerCount; 
requestedContainerCount++) {
-        requestContainer(Optional.absent(), 
yarnContainerRequestBundle.getHelixTagResourceMap().get(currentHelixTag));
+      allocatedContainerCountMap.putIfAbsent(currentHelixTag, new 
AtomicInteger(0));
+      int allocatedContainersForHelixTag = 
allocatedContainerCountMap.get(currentHelixTag).get();
+      int outstandingContainerRequests = 
getMatchingRequestsCount(resourceForHelixTag);
+      int requestedContainerCount = allocatedContainersForHelixTag + 
outstandingContainerRequests;
+      int numContainersNeeded = desiredContainerCount - 
requestedContainerCount;
+      LOGGER.info("Container counts for helixTag={} (allocatedContainers={}, 
outstandingContainerRequests={}, desiredContainerCount={}, 
numContainersNeeded={})",
+          currentHelixTag, allocatedContainersForHelixTag, 
outstandingContainerRequests, desiredContainerCount, numContainersNeeded);
+
+      if (numContainersNeeded > 0) {
+        requestContainers(numContainersNeeded, resourceForHelixTag);
       }
     }
 
@@ -486,11 +498,12 @@ public class YarnService extends AbstractIdleService {
     // This is based on the currently allocated amount since containers may 
still be in the process of being allocated
     // and assigned work. Resizing based on numRequestedContainers at this 
point may release a container right before
     // or soon after it is assigned work.
-    if (numTargetContainers < numAllocatedContainers) {
-      LOGGER.debug("Shrinking number of containers by {}", 
(numAllocatedContainers - numTargetContainers));
-
+    if (numTargetContainers < totalAllocatedContainers) {
       List<Container> containersToRelease = new ArrayList<>();
-      int numToShutdown = numAllocatedContainers - numTargetContainers;
+      int numToShutdown = totalAllocatedContainers - numTargetContainers;
+
+      LOGGER.info("Shrinking number of containers by {} because 
numTargetContainers < totalAllocatedContainers ({} < {})",
+          numToShutdown, numTargetContainers, totalAllocatedContainers);
 
       // Look for eligible containers to release. If a container is in use 
then it is not released.
       for (Map.Entry<ContainerId, ContainerInfo> entry : 
this.containerMap.entrySet()) {
@@ -504,7 +517,7 @@ public class YarnService extends AbstractIdleService {
         }
       }
 
-      LOGGER.debug("Shutting down containers {}", containersToRelease);
+      LOGGER.info("Shutting down {} containers. containersToRelease={}", 
containersToRelease.size(), containersToRelease);
 
       this.eventBus.post(new ContainerReleaseRequest(containersToRelease));
     }
@@ -527,6 +540,18 @@ public class YarnService extends AbstractIdleService {
     requestContainer(preferredNode, desiredResource);
   }
 
+  /**
+   * Request {@param numContainers} from yarn with the specified resource. 
Resources will be allocated without a preferred
+   * node
+   * @param numContainers
+   * @param resource
+   */
+  private void requestContainers(int numContainers, Resource resource) {
+    LOGGER.info("Requesting {} containers with resource={}", numContainers, 
resource);
+    IntStream.range(0, numContainers)
+        .forEach(i -> requestContainer(Optional.absent(), resource));
+  }
+
   // Request containers with specific resource requirement
   private void requestContainer(Optional<String> preferredNode, Resource 
resource) {
     // Fail if Yarn cannot meet container resource requirements
@@ -690,7 +715,7 @@ public class YarnService extends AbstractIdleService {
     //containerId missing from the containersMap.
     String completedInstanceName = completedContainerInfo == null?  
UNKNOWN_HELIX_INSTANCE : completedContainerInfo.getHelixParticipantId();
     String helixTag = completedContainerInfo == null ? helixInstanceTags : 
completedContainerInfo.getHelixTag();
-    allocatedContainerCountMap.put(helixTag, 
allocatedContainerCountMap.get(helixTag) - 1);
+    allocatedContainerCountMap.get(helixTag).decrementAndGet();
 
     LOGGER.info(String.format("Container %s running Helix instance %s with tag 
%s has completed with exit status %d",
         containerStatus.getContainerId(), completedInstanceName, helixTag, 
containerStatus.getExitStatus()));
@@ -789,11 +814,30 @@ public class YarnService extends AbstractIdleService {
 
   /**
    * Get the number of matching container requests for the specified resource 
memory and cores.
+   * Due to YARN-1902 and YARN-660, this API is not 100% accurate. {@link 
AMRMClientCallbackHandler#onContainersAllocated(List)}
+   * contains logic for best effort clean up of requests, and the resource 
tend to match the allocated container. So in practice the count is pretty 
accurate.
+   *
+   * This API call gets the count of container requests for containers that 
are > resource if there is no request with the exact same resource
+   * The RM can return containers that are larger (because of normalization 
etc).
+   * Container may be larger by memory or cpu (e.g. container (1000M, 3cpu) 
can fit request (1000M, 1cpu) or request (500M, 3cpu).
+   *
+   * Thankfully since each helix tag / resource has a different priority, 
matching requests for one helix tag / resource
+   * have complete isolation from another helix tag / resource
    */
   private int getMatchingRequestsCount(Resource resource) {
-    int priorityNum = resourcePriorityMap.getOrDefault(resource.toString(), 0);
+    Integer priorityNum = resourcePriorityMap.get(resource.toString());
+    if (priorityNum == null) { // request has never been made with this 
resource
+      return 0;
+    }
     Priority priority = Priority.newInstance(priorityNum);
-    return getAmrmClientAsync().getMatchingRequests(priority, 
ResourceRequest.ANY, resource).size();
+
+    // Each collection in the list represents a set of requests with each with 
the same resource requirement.
+    // The reason for differing resources can be due to normalization
+    List<? extends Collection<AMRMClient.ContainerRequest>> 
outstandingRequests = getAmrmClientAsync().getMatchingRequests(priority, 
ResourceRequest.ANY, resource);
+    return outstandingRequests == null ? 0 : outstandingRequests.stream()
+        .filter(Objects::nonNull)
+        .mapToInt(Collection::size)
+        .sum();
   }
 
   /**
@@ -845,8 +889,6 @@ public class YarnService extends AbstractIdleService {
               instanceName = null;
             }
           }
-          allocatedContainerCountMap.put(containerHelixTag,
-              allocatedContainerCountMap.getOrDefault(containerHelixTag, 0) + 
1);
         }
 
         if (Strings.isNullOrEmpty(instanceName)) {
@@ -857,12 +899,14 @@ public class YarnService extends AbstractIdleService {
 
         ContainerInfo containerInfo = new ContainerInfo(container, 
instanceName, containerHelixTag);
         containerMap.put(container.getId(), containerInfo);
+        allocatedContainerCountMap.putIfAbsent(containerHelixTag, new 
AtomicInteger(0));
+        allocatedContainerCountMap.get(containerHelixTag).incrementAndGet();
 
-        // Find matching requests and remove the request to reduce the chance 
that a subsequent request
-        // will request extra containers. YARN does not have a delta request 
API and the requests are not
-        // cleaned up automatically.
+        // Find matching requests and remove the request (YARN-660). We the 
scheduler are responsible
+        // for cleaning up requests after allocation based on the design in 
the described ticket.
+        // YARN does not have a delta request API and the requests are not 
cleaned up automatically.
         // Try finding a match first with the host as the resource name then 
fall back to any resource match.
-        // See YARN-1902.
+        // Also see YARN-1902. Container count will explode without this logic 
for removing container requests.
         List<? extends Collection<AMRMClient.ContainerRequest>> 
matchingRequests = amrmClientAsync
             .getMatchingRequests(container.getPriority(), 
container.getNodeHttpAddress(), container.getResource());
 
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
index c01d91196..e143a4b35 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -316,6 +316,7 @@ public class YarnServiceTest {
       
Mockito.when(helixManager.getClusterName()).thenReturn(config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY));
 
       
Mockito.when(helixManager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
+      
Mockito.when(helixManager.getMetadataStoreConnectionString()).thenReturn("stub");
       
Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(propertyKeyBuilder);
       
Mockito.when(propertyKeyBuilder.liveInstance(Mockito.anyString())).thenReturn(propertyKey);
       
Mockito.when(helixDataAccessor.getProperty(propertyKey)).thenReturn(null);

Reply via email to