This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 013a85e3b [GOBBLIN-1728] Fix YarnService incorrect container allocation behavior (#3586)
013a85e3b is described below
commit 013a85e3b04b986cd4ca16458b246bac5ed949fa
Author: Matthew Ho <[email protected]>
AuthorDate: Mon Oct 24 14:30:34 2022 -0700
[GOBBLIN-1728] Fix YarnService incorrect container allocation behavior (#3586)
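* [GOBBLIN-1728] Fix YarnService incorrect container allocation behavior
Due to race conditions and an incorrect API call, the YARN service both
allocated too many containers and, in rare cases, tracked per-Helix-tag
container counts that disagreed with the number of containers actually
allocated. The incorrect call counted the collections returned by
AMRMClient#getMatchingRequests instead of the container requests inside
them. The per-tag counts were updated with non-atomic get-then-put calls;
they are now AtomicInteger counters.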
* Improve logging of container counts per Helix tag
* Address review comments and add a short-circuit check for when no
priority has been assigned to a given resource yet, i.e. no container
request has ever been made with that resource. This occurs during the
initial request of containers.
Tested the change for another 6 hours with ~256 high-volume topic partitions.
---
.../gobblin/yarn/YarnAutoScalingManager.java | 2 +-
.../org/apache/gobblin/yarn/YarnHelixUtils.java | 8 +-
.../java/org/apache/gobblin/yarn/YarnService.java | 92 ++++++++++++++++------
.../org/apache/gobblin/yarn/YarnServiceTest.java | 1 +
4 files changed, 75 insertions(+), 28 deletions(-)
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index e7ab082e2..7c4da8fd8 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -359,4 +359,4 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
}
}
}
-}
\ No newline at end of file
+}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index 1ecbd4510..72f9cc336 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -23,6 +23,7 @@ import java.net.URL;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -240,12 +241,13 @@ public class YarnHelixUtils {
* @return helix tag that this container should be assigned with, if null
means need to use the default
*/
public static String findHelixTagForContainer(Container container,
- Map<String, Integer> helixTagAllocatedContainerCount,
YarnContainerRequestBundle requestedYarnContainer) {
+ Map<String, AtomicInteger> helixTagAllocatedContainerCount,
YarnContainerRequestBundle requestedYarnContainer) {
String foundTag = null;
if(requestedYarnContainer != null &&
requestedYarnContainer.getResourceHelixTagMap().containsKey(container.getResource().toString()))
{
for (String tag :
requestedYarnContainer.getResourceHelixTagMap().get(container.getResource().toString()))
{
int desiredCount =
requestedYarnContainer.getHelixTagContainerCountMap().get(tag);
- int allocatedCount = helixTagAllocatedContainerCount.getOrDefault(tag,
0);
+ helixTagAllocatedContainerCount.putIfAbsent(tag, new AtomicInteger(0));
+ int allocatedCount = helixTagAllocatedContainerCount.get(tag).get();
foundTag = tag;
if(allocatedCount < desiredCount) {
return foundTag;
@@ -254,4 +256,4 @@ public class YarnHelixUtils {
}
return foundTag;
}
-}
\ No newline at end of file
+}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 8a12bb0ce..f899bf74e 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.yarn;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -27,12 +28,14 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
import lombok.AllArgsConstructor;
import org.apache.hadoop.conf.Configuration;
@@ -111,7 +114,6 @@ import org.apache.gobblin.yarn.event.NewContainerRequest;
import static
org.apache.gobblin.yarn.GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX;
-
/**
* This class is responsible for all Yarn-related stuffs including
ApplicationMaster registration,
* ApplicationMaster un-registration, Yarn container management, etc.
@@ -194,7 +196,7 @@ public class YarnService extends AbstractIdleService {
private final Set<String> unusedHelixInstanceNames =
ConcurrentHashMap.newKeySet();
// The map from helix tag to allocated container count
- private final Map<String, Integer> allocatedContainerCountMap =
Maps.newConcurrentMap();
+ private final ConcurrentMap<String, AtomicInteger>
allocatedContainerCountMap = Maps.newConcurrentMap();
private final boolean isPurgingOfflineHelixInstancesEnabled;
private final long helixPurgeLaggingThresholdMs;
@@ -395,7 +397,8 @@ public class YarnService extends AbstractIdleService {
synchronized (this.allContainersStopped) {
try {
// Wait 5 minutes for the containers to stop
- this.allContainersStopped.wait(5 * 60 * 1000);
+ Duration waitTimeout = Duration.ofMinutes(5);
+ this.allContainersStopped.wait(waitTimeout.toMillis());
LOGGER.info("All of the containers have been stopped");
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
@@ -461,12 +464,13 @@ public class YarnService extends AbstractIdleService {
* @param inUseInstances a set of in use instances
*/
public synchronized void
requestTargetNumberOfContainers(YarnContainerRequestBundle
yarnContainerRequestBundle, Set<String> inUseInstances) {
- LOGGER.debug("Requesting numTargetContainers {}, in use instances count is
{}, container map size is {}",
- yarnContainerRequestBundle.getTotalContainers(), inUseInstances,
this.containerMap.size());
+ LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances
count is {}, container map size is {}",
+ yarnContainerRequestBundle.getTotalContainers(),
inUseInstances.size(), this.containerMap.size());
int numTargetContainers = yarnContainerRequestBundle.getTotalContainers();
// YARN can allocate more than the requested number of containers, compute
additional allocations and deallocations
// based on the max of the requested and actual allocated counts
- int numAllocatedContainers = this.containerMap.size();
+ // Represents the number of containers allocated for across all helix tags
+ int totalAllocatedContainers = this.containerMap.size();
// Request additional containers if the desired count is higher than the
max of the current allocation or previously
// requested amount. Note that there may be in-flight or additional
allocations after numContainers has been computed
@@ -474,11 +478,19 @@ public class YarnService extends AbstractIdleService {
for (Map.Entry<String, Integer> entry :
yarnContainerRequestBundle.getHelixTagContainerCountMap().entrySet()) {
String currentHelixTag = entry.getKey();
int desiredContainerCount = entry.getValue();
+ Resource resourceForHelixTag =
yarnContainerRequestBundle.getHelixTagResourceMap().get(currentHelixTag);
+
// Calculate requested container count based on adding allocated count
and outstanding ContainerRequests in Yarn
- int requestedContainerCount =
allocatedContainerCountMap.getOrDefault(currentHelixTag, 0)
- +
getMatchingRequestsCount(yarnContainerRequestBundle.getHelixTagResourceMap().get(currentHelixTag));
- for(; requestedContainerCount < desiredContainerCount;
requestedContainerCount++) {
- requestContainer(Optional.absent(),
yarnContainerRequestBundle.getHelixTagResourceMap().get(currentHelixTag));
+ allocatedContainerCountMap.putIfAbsent(currentHelixTag, new
AtomicInteger(0));
+ int allocatedContainersForHelixTag =
allocatedContainerCountMap.get(currentHelixTag).get();
+ int outstandingContainerRequests =
getMatchingRequestsCount(resourceForHelixTag);
+ int requestedContainerCount = allocatedContainersForHelixTag +
outstandingContainerRequests;
+ int numContainersNeeded = desiredContainerCount -
requestedContainerCount;
+ LOGGER.info("Container counts for helixTag={} (allocatedContainers={},
outstandingContainerRequests={}, desiredContainerCount={},
numContainersNeeded={})",
+ currentHelixTag, allocatedContainersForHelixTag,
outstandingContainerRequests, desiredContainerCount, numContainersNeeded);
+
+ if (numContainersNeeded > 0) {
+ requestContainers(numContainersNeeded, resourceForHelixTag);
}
}
@@ -486,11 +498,12 @@ public class YarnService extends AbstractIdleService {
// This is based on the currently allocated amount since containers may
still be in the process of being allocated
// and assigned work. Resizing based on numRequestedContainers at this
point may release a container right before
// or soon after it is assigned work.
- if (numTargetContainers < numAllocatedContainers) {
- LOGGER.debug("Shrinking number of containers by {}",
(numAllocatedContainers - numTargetContainers));
-
+ if (numTargetContainers < totalAllocatedContainers) {
List<Container> containersToRelease = new ArrayList<>();
- int numToShutdown = numAllocatedContainers - numTargetContainers;
+ int numToShutdown = totalAllocatedContainers - numTargetContainers;
+
+ LOGGER.info("Shrinking number of containers by {} because
numTargetContainers < totalAllocatedContainers ({} < {})",
+ numToShutdown, numTargetContainers, totalAllocatedContainers);
// Look for eligible containers to release. If a container is in use
then it is not released.
for (Map.Entry<ContainerId, ContainerInfo> entry :
this.containerMap.entrySet()) {
@@ -504,7 +517,7 @@ public class YarnService extends AbstractIdleService {
}
}
- LOGGER.debug("Shutting down containers {}", containersToRelease);
+ LOGGER.info("Shutting down {} containers. containersToRelease={}",
containersToRelease.size(), containersToRelease);
this.eventBus.post(new ContainerReleaseRequest(containersToRelease));
}
@@ -527,6 +540,18 @@ public class YarnService extends AbstractIdleService {
requestContainer(preferredNode, desiredResource);
}
+ /**
+ * Request {@param numContainers} from yarn with the specified resource.
Resources will be allocated without a preferred
+ * node
+ * @param numContainers
+ * @param resource
+ */
+ private void requestContainers(int numContainers, Resource resource) {
+ LOGGER.info("Requesting {} containers with resource={}", numContainers,
resource);
+ IntStream.range(0, numContainers)
+ .forEach(i -> requestContainer(Optional.absent(), resource));
+ }
+
// Request containers with specific resource requirement
private void requestContainer(Optional<String> preferredNode, Resource
resource) {
// Fail if Yarn cannot meet container resource requirements
@@ -690,7 +715,7 @@ public class YarnService extends AbstractIdleService {
//containerId missing from the containersMap.
String completedInstanceName = completedContainerInfo == null?
UNKNOWN_HELIX_INSTANCE : completedContainerInfo.getHelixParticipantId();
String helixTag = completedContainerInfo == null ? helixInstanceTags :
completedContainerInfo.getHelixTag();
- allocatedContainerCountMap.put(helixTag,
allocatedContainerCountMap.get(helixTag) - 1);
+ allocatedContainerCountMap.get(helixTag).decrementAndGet();
LOGGER.info(String.format("Container %s running Helix instance %s with tag
%s has completed with exit status %d",
containerStatus.getContainerId(), completedInstanceName, helixTag,
containerStatus.getExitStatus()));
@@ -789,11 +814,30 @@ public class YarnService extends AbstractIdleService {
/**
* Get the number of matching container requests for the specified resource
memory and cores.
+ * Due to YARN-1902 and YARN-660, this API is not 100% accurate. {@link
AMRMClientCallbackHandler#onContainersAllocated(List)}
+ * contains logic for best effort clean up of requests, and the resource
tend to match the allocated container. So in practice the count is pretty
accurate.
+ *
+ * This API call gets the count of container requests for containers that
are > resource if there is no request with the exact same resource
+ * The RM can return containers that are larger (because of normalization
etc).
+ * Container may be larger by memory or cpu (e.g. container (1000M, 3cpu)
can fit request (1000M, 1cpu) or request (500M, 3cpu).
+ *
+ * Thankfully since each helix tag / resource has a different priority,
matching requests for one helix tag / resource
+ * have complete isolation from another helix tag / resource
*/
private int getMatchingRequestsCount(Resource resource) {
- int priorityNum = resourcePriorityMap.getOrDefault(resource.toString(), 0);
+ Integer priorityNum = resourcePriorityMap.get(resource.toString());
+ if (priorityNum == null) { // request has never been made with this
resource
+ return 0;
+ }
Priority priority = Priority.newInstance(priorityNum);
- return getAmrmClientAsync().getMatchingRequests(priority,
ResourceRequest.ANY, resource).size();
+
+ // Each collection in the list represents a set of requests with each with
the same resource requirement.
+ // The reason for differing resources can be due to normalization
+ List<? extends Collection<AMRMClient.ContainerRequest>>
outstandingRequests = getAmrmClientAsync().getMatchingRequests(priority,
ResourceRequest.ANY, resource);
+ return outstandingRequests == null ? 0 : outstandingRequests.stream()
+ .filter(Objects::nonNull)
+ .mapToInt(Collection::size)
+ .sum();
}
/**
@@ -845,8 +889,6 @@ public class YarnService extends AbstractIdleService {
instanceName = null;
}
}
- allocatedContainerCountMap.put(containerHelixTag,
- allocatedContainerCountMap.getOrDefault(containerHelixTag, 0) +
1);
}
if (Strings.isNullOrEmpty(instanceName)) {
@@ -857,12 +899,14 @@ public class YarnService extends AbstractIdleService {
ContainerInfo containerInfo = new ContainerInfo(container,
instanceName, containerHelixTag);
containerMap.put(container.getId(), containerInfo);
+ allocatedContainerCountMap.putIfAbsent(containerHelixTag, new
AtomicInteger(0));
+ allocatedContainerCountMap.get(containerHelixTag).incrementAndGet();
- // Find matching requests and remove the request to reduce the chance
that a subsequent request
- // will request extra containers. YARN does not have a delta request
API and the requests are not
- // cleaned up automatically.
+ // Find matching requests and remove the request (YARN-660). We the
scheduler are responsible
+ // for cleaning up requests after allocation based on the design in
the described ticket.
+ // YARN does not have a delta request API and the requests are not
cleaned up automatically.
// Try finding a match first with the host as the resource name then
fall back to any resource match.
- // See YARN-1902.
+ // Also see YARN-1902. Container count will explode without this logic
for removing container requests.
List<? extends Collection<AMRMClient.ContainerRequest>>
matchingRequests = amrmClientAsync
.getMatchingRequests(container.getPriority(),
container.getNodeHttpAddress(), container.getResource());
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
index c01d91196..e143a4b35 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -316,6 +316,7 @@ public class YarnServiceTest {
Mockito.when(helixManager.getClusterName()).thenReturn(config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY));
Mockito.when(helixManager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
+
Mockito.when(helixManager.getMetadataStoreConnectionString()).thenReturn("stub");
Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(propertyKeyBuilder);
Mockito.when(propertyKeyBuilder.liveInstance(Mockito.anyString())).thenReturn(propertyKey);
Mockito.when(helixDataAccessor.getProperty(propertyKey)).thenReturn(null);