[ https://issues.apache.org/jira/browse/GOBBLIN-2189?focusedWorklogId=955661&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-955661 ]
ASF GitHub Bot logged work on GOBBLIN-2189:
-------------------------------------------
                Author: ASF GitHub Bot
            Created on: 05/Feb/25 18:49
            Start Date: 05/Feb/25 18:49
    Worklog Time Spent: 10m
      Work Description: khandelwal-prateek commented on code in PR #4092:
URL: https://github.com/apache/gobblin/pull/4092#discussion_r1943436413


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -62,6 +82,73 @@ protected synchronized void requestInitialContainers() {
     requestNewContainersForStaffingDeltas(deltas);
   }
+  /**
+   * Handle the completion of a container. A new container will be requested to replace the one
+   * that just exited depending on the exit status.
+   * <p>
+   * A container completes in either of the following conditions:
+   * <ol>
+   * <li> The container gets stopped by the ApplicationMaster. </li>
+   * <li> Some error happens in the container and caused the container to exit </li>
+   * <li> The container gets preempted by the ResourceManager </li>
+   * <li> The container gets killed due to some reason, for example, if it runs over the allowed amount of virtual or physical memory </li>
+   * </ol>
+   * A replacement container is needed in all except the first case.
+   * </p>
+   */
+  @Override
+  protected void handleContainerCompletion(ContainerStatus containerStatus) {
+    ContainerId completedContainerId = containerStatus.getContainerId();
+    ContainerInfo completedContainerInfo = this.containerMap.remove(completedContainerId);
+
+    // Because callbacks are processed asynchronously, we might encounter situations where handleContainerCompletion()
+    // is called before onContainersAllocated(), resulting in the containerId missing from the containersMap.
+    // We use removedContainerIds to remember these containers and remove them from containerMap later
+    // when we call reviseWorkforcePlanAndRequestNewContainers method
+    if (completedContainerInfo == null) {
+      log.warn("Container {} not found in containerMap. This container onContainersCompleted() likely called before onContainersAllocated()",
+          completedContainerId);
+      this.removedContainerIds.add(completedContainerId);
+      return;
+    }
+
+    log.info("Container {} running profile {} completed with exit status {}",
+        completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus());
+
+    if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) {
+      log.info("Container {} running profile {} completed with diagnostics: {}",
+          completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics());
+    }
+
+    if (this.shutdownInProgress) {
+      log.info("Ignoring container completion for container {} as shutdown is in progress", completedContainerId);
+      return;
+    }
+
+    WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
+
+    switch (containerStatus.getExitStatus()) {
+      case(ContainerExitStatus.ABORTED):
+        handleAbortedContainer(completedContainerId, completedContainerInfo);
+        break;
+      case(ContainerExitStatus.PREEMPTED):
+        log.info("Container {} for profile {} preempted, starting to launching a replacement container",
+            completedContainerId, completedContainerInfo.getWorkerProfileName());
+        requestContainersForWorkerProfile(workerProfile, 1);
+        break;
+      case(GENERAL_OOM_EXIT_STATUS_CODE):
+      case(ContainerExitStatus.KILLED_EXCEEDED_VMEM):
+      case(ContainerExitStatus.KILLED_EXCEEDED_PMEM):
+        handleContainerExitedWithOOM(completedContainerId, completedContainerInfo);
+        break;
+      case(1): // Same as linux exit status 1 Often occurs when launch_container.sh failed
+        log.info("Exit status 1.CompletedContainerInfo = {}", completedContainerInfo);
+        break;
+      default:
+        break;

Review Comment:
   let's add a no-op for `KILLED_AFTER_APP_COMPLETION` & `SUCCESS` and add a log statement for the default case. There are other statuses in ContainerExitStatus, such as `DISKS_FAILED` and `KILLED_BY_CONTAINER_SCHEDULER`, so having a log would be useful in case one of those also needs to be handled in the future.
   ```
   case ContainerExitStatus.KILLED_AFTER_APP_COMPLETION:
   case ContainerExitStatus.SUCCESS:
     break;
   default:
     // log any other unhandled completion code
     log.warn("Container {} exited with unhandled status code {}. ContainerInfo: {}",
         completedContainerId, containerStatus.getExitStatus(), completedContainerInfo);
     break;
   ```
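For illustration only, a sketch of how the complete switch could read if that suggestion were folded in (this is not the PR's final code; all case labels, constants, and helper methods are the ones already shown in the diff above):

```
switch (containerStatus.getExitStatus()) {
  case ContainerExitStatus.ABORTED:
    handleAbortedContainer(completedContainerId, completedContainerInfo);
    break;
  case ContainerExitStatus.PREEMPTED:
    log.info("Container {} for profile {} preempted, starting to launch a replacement container",
        completedContainerId, completedContainerInfo.getWorkerProfileName());
    requestContainersForWorkerProfile(workerProfile, 1);
    break;
  case GENERAL_OOM_EXIT_STATUS_CODE:
  case ContainerExitStatus.KILLED_EXCEEDED_VMEM:
  case ContainerExitStatus.KILLED_EXCEEDED_PMEM:
    handleContainerExitedWithOOM(completedContainerId, completedContainerInfo);
    break;
  case 1: // same as Linux exit status 1; often occurs when launch_container.sh fails
    log.info("Exit status 1. CompletedContainerInfo = {}", completedContainerInfo);
    break;
  case ContainerExitStatus.KILLED_AFTER_APP_COMPLETION:
  case ContainerExitStatus.SUCCESS:
    break; // expected completions; nothing to do
  default:
    // surface any other completion code (e.g. DISKS_FAILED, KILLED_BY_CONTAINER_SCHEDULER)
    log.warn("Container {} exited with unhandled status code {}. ContainerInfo: {}",
        completedContainerId, containerStatus.getExitStatus(), completedContainerInfo);
    break;
}
```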
ContainerInfo: {}", completedContainerId, containerStatus.getExitStatus(), completedContainerInfo); break; ``` ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java: ########## @@ -71,29 +158,101 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<Scaling if (CollectionUtils.isEmpty(scalingDirectives)) { return; } + + // Correct the actualWorkforceStaffing in case of handleContainerCompletion() getting called before onContainersAllocated() + Iterator<ContainerId> iterator = removedContainerIds.iterator(); + while (iterator.hasNext()) { + ContainerId containerId = iterator.next(); + ContainerInfo containerInfo = this.containerMap.remove(containerId); + if (containerInfo != null) { + WorkerProfile workerProfile = containerInfo.getWorkerProfile(); + int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0); + if (currNumContainers > 0) { + this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, + System.currentTimeMillis()); + // Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile, + // otherwise extra containers will be requested when calculating deltas + scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis())); + } + iterator.remove(); + } + } + this.workforcePlan.reviseWhenNewer(scalingDirectives); StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing); requestNewContainersForStaffingDeltas(deltas); } private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) { deltas.getPerProfileDeltas().forEach(profileDelta -> { - if (profileDelta.getDelta() > 0) { // scale up! - WorkerProfile workerProfile = profileDelta.getProfile(); - String profileName = workerProfile.getName(); - int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0); - int delta = profileDelta.getDelta(); + WorkerProfile workerProfile = profileDelta.getProfile(); + String profileName = workerProfile.getName(); + int delta = profileDelta.getDelta(); + int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0); + if (delta > 0) { // scale up! log.info("Requesting {} new containers for profile {} having currently {} containers", delta, WorkforceProfiles.renderName(profileName), currNumContainers); requestContainersForWorkerProfile(workerProfile, delta); // update our staffing after requesting new containers this.actualWorkforceStaffing.reviseStaffing(profileName, currNumContainers + delta, System.currentTimeMillis()); - } else if (profileDelta.getDelta() < 0) { // scale down! - // TODO: Decide how to handle negative deltas - log.warn("Handling of Negative delta is not supported yet : Profile {} delta {} ", - profileDelta.getProfile().getName(), profileDelta.getDelta()); + } else if (delta < 0) { // scale down! 
+ log.info("Releasing {} containers for profile {} having currently {} containers", -delta, + WorkforceProfiles.renderName(profileName), currNumContainers); + releaseContainersForWorkerProfile(profileName, delta); + // update our staffing after releasing containers + int numContainersAfterRelease = Math.max(currNumContainers + delta, 0); + this.actualWorkforceStaffing.reviseStaffing(profileName, numContainersAfterRelease, System.currentTimeMillis()); } // else, already at staffing plan (or at least have requested, so in-progress) }); } + private void handleAbortedContainer(ContainerId completedContainerId, ContainerInfo completedContainerInfo) { + // Case 1 : Container release requested while scaling down + if (this.releasedContainerCache.getIfPresent(completedContainerId) != null) { + log.info("Container {} was released while downscaling for profile {}", completedContainerId, completedContainerInfo.getWorkerProfileName()); + this.releasedContainerCache.invalidate(completedContainerId); + return; + } + + // Case 2 : Container release was not requested, we need to request a replacement container + log.info("Container {} aborted for profile {}, starting to launch a replacement container", completedContainerId, completedContainerInfo.getWorkerProfileName()); + requestContainersForWorkerProfile(completedContainerInfo.getWorkerProfile(), 1); + } + + private synchronized void handleContainerExitedWithOOM(ContainerId completedContainerId, ContainerInfo completedContainerInfo) { + log.info("Container {} for profile {} exited with OOM, starting to launch a replacement container", + completedContainerId, completedContainerInfo.getWorkerProfileName()); + + List<ScalingDirective> scalingDirectives = new ArrayList<>(); + + WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile(); + // Update the current staffing to reflect the container that exited with OOM + int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0); + if (currNumContainers > 0) { + this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis()); + // Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile, + // otherwise extra containers will be requested when calculating deltas + scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis())); + } + + // Request a replacement container + int currContainerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY); + int newContainerMemoryMbs = currContainerMemoryMbs * DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER; Review Comment: If `currContainerMemoryMbs` is 36 GB, then `newContainerMemoryMbs` would be 72GB(considering 2 as multiplier). However, that is higher than 64GB(`MAX_REPLACEMENT_CONTAINER_MEMORY_MBS`) and in this case we would be skipping the container request entirely while OOM could have been handled by launching 64 GB container. We should launch the last container with `MAX_REPLACEMENT_CONTAINER_MEMORY_MBS` ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java: ########## @@ -62,6 +82,73 @@ protected synchronized void requestInitialContainers() { requestNewContainersForStaffingDeltas(deltas); } + /** + * Handle the completion of a container. A new container will be requested to replace the one + * that just exited depending on the exit status. 
+ * <p> + * A container completes in either of the following conditions: + * <ol> + * <li> The container gets stopped by the ApplicationMaster. </li> + * <li> Some error happens in the container and caused the container to exit </li> + * <li> The container gets preempted by the ResourceManager </li> + * <li> The container gets killed due to some reason, for example, if it runs over the allowed amount of virtual or physical memory </li> + * </ol> + * A replacement container is needed in all except the first case. + * </p> + */ + @Override + protected void handleContainerCompletion(ContainerStatus containerStatus) { Review Comment: this method is not `synchronized`, but it removes entries from containerMap and modifies removedContainerIds, whereas, `reviseWorkforcePlanAndRequestNewContainers` is synchronized and also modifies both containerMap and removedContainerIds. If request for `handleContainerCompletion` interleaves with a call to `reviseWorkforcePlanAndRequestNewContainers`, race conditions/inconsistent state can happen ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -411,10 +330,24 @@ protected synchronized void requestContainersForWorkerProfile(WorkerProfile work requestContainers(numContainers, Resource.newInstance(containerMemoryMbs, containerCores), Optional.of(allocationRequestId)); } - private void requestContainer(Optional<String> preferredNode, Optional<Resource> resourceOptional) { - Resource desiredResource = resourceOptional.or(Resource.newInstance( - this.requestedContainerMemoryMbs, this.requestedContainerCores)); - requestContainer(preferredNode, desiredResource, Optional.absent()); + protected synchronized void releaseContainersForWorkerProfile(String profileName, int numContainers) { + Iterator<Map.Entry<ContainerId, ContainerInfo>> containerMapIterator = this.containerMap.entrySet().iterator(); + while (containerMapIterator.hasNext() && numContainers > 0) { + Map.Entry<ContainerId, ContainerInfo> entry = containerMapIterator.next(); + if (entry.getValue().getWorkerProfile().getName().equals(profileName)) { + ContainerId containerId = entry.getKey(); + LOGGER.info("Releasing container {} running profile {}", containerId, WorkforceProfiles.renderName(profileName)); + // Record that this container was explicitly released so that a new one is not spawned to replace it + // Put the container id in the releasedContainerCache before releasing it so that handleContainerCompletion() + // can check for the container id and skip spawning a replacement container. + // Note that this is the best effort since these are asynchronous operations and a container may abort concurrently + // with the release call. So in some cases a replacement container may have already been spawned before + // the container is put into the black list. + this.releasedContainerCache.put(containerId, ""); + this.amrmClientAsync.releaseAssignedContainer(containerId); + numContainers--; + } + } Review Comment: please add a log here for how many containers were intended to be released and how many were actually released from this method ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java: ########## @@ -62,6 +82,73 @@ protected synchronized void requestInitialContainers() { requestNewContainersForStaffingDeltas(deltas); } + /** + * Handle the completion of a container. A new container will be requested to replace the one + * that just exited depending on the exit status. 
+ * <p> + * A container completes in either of the following conditions: + * <ol> + * <li> The container gets stopped by the ApplicationMaster. </li> + * <li> Some error happens in the container and caused the container to exit </li> + * <li> The container gets preempted by the ResourceManager </li> + * <li> The container gets killed due to some reason, for example, if it runs over the allowed amount of virtual or physical memory </li> + * </ol> + * A replacement container is needed in all except the first case. + * </p> + */ + @Override + protected void handleContainerCompletion(ContainerStatus containerStatus) { + ContainerId completedContainerId = containerStatus.getContainerId(); + ContainerInfo completedContainerInfo = this.containerMap.remove(completedContainerId); + + // Because callbacks are processed asynchronously, we might encounter situations where handleContainerCompletion() + // is called before onContainersAllocated(), resulting in the containerId missing from the containersMap. + // We use removedContainerIds to remember these containers and remove them from containerMap later + // when we call reviseWorkforcePlanAndRequestNewContainers method + if (completedContainerInfo == null) { + log.warn("Container {} not found in containerMap. This container onContainersCompleted() likely called before onContainersAllocated()", + completedContainerId); + this.removedContainerIds.add(completedContainerId); + return; + } + + log.info("Container {} running profile {} completed with exit status {}", + completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus()); + + if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) { + log.info("Container {} running profile {} completed with diagnostics: {}", + completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics()); + } + + if (this.shutdownInProgress) { + log.info("Ignoring container completion for container {} as shutdown is in progress", completedContainerId); + return; + } + + WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile(); + + switch (containerStatus.getExitStatus()) { + case(ContainerExitStatus.ABORTED): + handleAbortedContainer(completedContainerId, completedContainerInfo); + break; + case(ContainerExitStatus.PREEMPTED): + log.info("Container {} for profile {} preempted, starting to launching a replacement container", + completedContainerId, completedContainerInfo.getWorkerProfileName()); + requestContainersForWorkerProfile(workerProfile, 1); + break; + case(GENERAL_OOM_EXIT_STATUS_CODE): + case(ContainerExitStatus.KILLED_EXCEEDED_VMEM): + case(ContainerExitStatus.KILLED_EXCEEDED_PMEM): + handleContainerExitedWithOOM(completedContainerId, completedContainerInfo); + break; + case(1): // Same as linux exit status 1 Often occurs when launch_container.sh failed Review Comment: use constant `private static final int LAUNCH_CONTAINER_FAILED_EXIT_CODE = 1;` Issue Time Tracking ------------------- Worklog Id: (was: 955661) Time Spent: 1h 50m (was: 1h 40m) > Implement ContainerCompletion callback in DynamicScalingYarnService > ------------------------------------------------------------------- > > Key: GOBBLIN-2189 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2189 > Project: Apache Gobblin > Issue Type: Improvement > Components: gobblin-core > Reporter: Vivek Rai > Assignee: Abhishek Tiwari > Priority: Major > Time Spent: 1h 50m > Remaining Estimate: 0h > > DynamicScalingYarnService currently doesn't handle scaling down 
> Implement ContainerCompletion callback in DynamicScalingYarnService
> -------------------------------------------------------------------
>
>                 Key: GOBBLIN-2189
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2189
>             Project: Apache Gobblin
>          Issue Type: Improvement
>          Components: gobblin-core
>            Reporter: Vivek Rai
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> DynamicScalingYarnService currently does not handle scaling down containers, nor does it do anything if a container is killed abruptly or goes OOM. To handle these scenarios, a container-completion callback should be implemented that launches replacement containers, and scale-down handling should be added as well.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)