This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7aad1f9ad Calculate requested container count based on adding
allocated count and outstanding ContainerRequests in Yarn (#3524)
7aad1f9ad is described below
commit 7aad1f9ade24ad05ab797e150b6c03a03d925c04
Author: Hanghang Nate Liu <[email protected]>
AuthorDate: Tue Jun 21 13:10:49 2022 -0700
Calculate requested container count based on adding allocated count and
outstanding ContainerRequests in Yarn (#3524)
---
.../java/org/apache/gobblin/yarn/YarnService.java | 26 +++++++++++++---------
1 file changed, 16 insertions(+), 10 deletions(-)
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index a81960fdf..2f8a06c88 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -191,8 +191,6 @@ public class YarnService extends AbstractIdleService {
// instance names get picked up when replacement containers get allocated.
private final Set<String> unusedHelixInstanceNames =
ConcurrentHashMap.newKeySet();
- // The map from helix tag to requested container count
- private final Map<String, Integer> requestedContainerCountMap =
Maps.newConcurrentMap();
// The map from helix tag to allocated container count
private final Map<String, Integer> allocatedContainerCountMap =
Maps.newConcurrentMap();
@@ -444,11 +442,12 @@ public class YarnService extends AbstractIdleService {
for (Map.Entry<String, Integer> entry :
yarnContainerRequestBundle.getHelixTagContainerCountMap().entrySet()) {
String currentHelixTag = entry.getKey();
int desiredContainerCount = entry.getValue();
- int requestedContainerCount =
requestedContainerCountMap.getOrDefault(currentHelixTag, 0);
+ // Calculate requested container count based on adding allocated count
and outstanding ContainerRequests in Yarn
+ int requestedContainerCount =
allocatedContainerCountMap.getOrDefault(currentHelixTag, 0)
+ +
getMatchingRequestsCount(yarnContainerRequestBundle.getHelixTagResourceMap().get(currentHelixTag));
for(; requestedContainerCount < desiredContainerCount;
requestedContainerCount++) {
requestContainer(Optional.absent(),
yarnContainerRequestBundle.getHelixTagResourceMap().get(currentHelixTag));
}
- requestedContainerCountMap.put(currentHelixTag, requestedContainerCount);
}
// If the total desired is lower than the currently allocated amount then
release free containers.
@@ -466,8 +465,6 @@ public class YarnService extends AbstractIdleService {
ContainerInfo containerInfo = entry.getValue();
if (!inUseInstances.contains(containerInfo.getHelixParticipantId())) {
containersToRelease.add(containerInfo.getContainer());
- requestedContainerCountMap.put(containerInfo.getHelixTag(),
- requestedContainerCountMap.get(containerInfo.getHelixTag()) - 1);
}
if (containersToRelease.size() == numToShutdown) {
@@ -480,8 +477,8 @@ public class YarnService extends AbstractIdleService {
this.eventBus.post(new ContainerReleaseRequest(containersToRelease));
}
this.yarnContainerRequest = yarnContainerRequestBundle;
- LOGGER.info("Current tag-container being requested:{}, tag-container
allocated: {}",
- this.requestedContainerCountMap, this.allocatedContainerCountMap);
+ LOGGER.info("Current tag-container desired count:{}, tag-container
allocated: {}",
+ yarnContainerRequestBundle.getHelixTagContainerCountMap(),
this.allocatedContainerCountMap);
}
// Request initial containers with default resource and helix tag
@@ -663,8 +660,8 @@ public class YarnService extends AbstractIdleService {
String helixTag = completedContainerInfo == null ? helixInstanceTags :
completedContainerInfo.getHelixTag();
allocatedContainerCountMap.put(helixTag,
allocatedContainerCountMap.get(helixTag) - 1);
- LOGGER.info(String.format("Container %s running Helix instance %s has
completed with exit status %d",
- containerStatus.getContainerId(), completedInstanceName,
containerStatus.getExitStatus()));
+ LOGGER.info(String.format("Container %s running Helix instance %s with tag
%s has completed with exit status %d",
+ containerStatus.getContainerId(), completedInstanceName, helixTag,
containerStatus.getExitStatus()));
if (!Strings.isNullOrEmpty(containerStatus.getDiagnostics())) {
LOGGER.info(String.format("Received the following diagnostics
information for container %s: %s",
@@ -758,6 +755,15 @@ public class YarnService extends AbstractIdleService {
return eventMetadataBuilder;
}
+ /**
+ * Get the number of matching container requests for the specified resource
memory and cores.
+ */
+ private int getMatchingRequestsCount(Resource resource) {
+ int priorityNum = resourcePriorityMap.getOrDefault(resource.toString(), 0);
+ Priority priority = Priority.newInstance(priorityNum);
+ return getAmrmClientAsync().getMatchingRequests(priority,
ResourceRequest.ANY, resource).size();
+ }
+
/**
* A custom implementation of {@link AMRMClientAsync.CallbackHandler}.
*/