[ 
https://issues.apache.org/jira/browse/GOBBLIN-2189?focusedWorklogId=955661&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-955661
 ]

ASF GitHub Bot logged work on GOBBLIN-2189:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Feb/25 18:49
            Start Date: 05/Feb/25 18:49
    Worklog Time Spent: 10m 
      Work Description: khandelwal-prateek commented on code in PR #4092:
URL: https://github.com/apache/gobblin/pull/4092#discussion_r1943436413


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -62,6 +82,73 @@ protected synchronized void requestInitialContainers() {
     requestNewContainersForStaffingDeltas(deltas);
   }
 
+  /**
+   * Handle the completion of a container. A new container will be requested to replace the one
+   * that just exited, depending on the exit status.
+   * <p>
+   * A container completes under one of the following conditions:
+   * <ol>
+   *   <li> The container gets stopped by the ApplicationMaster. </li>
+   *   <li> Some error happens in the container and causes it to exit. </li>
+   *   <li> The container gets preempted by the ResourceManager. </li>
+   *   <li> The container gets killed, for example, if it runs over the allowed amount of virtual or physical memory. </li>
+   * </ol>
+   * A replacement container is needed in all except the first case.
+   * </p>
+   */
+  @Override
+  protected void handleContainerCompletion(ContainerStatus containerStatus) {
+    ContainerId completedContainerId = containerStatus.getContainerId();
+    ContainerInfo completedContainerInfo = this.containerMap.remove(completedContainerId);
+
+    // Because callbacks are processed asynchronously, we might encounter situations where handleContainerCompletion()
+    // is called before onContainersAllocated(), resulting in the containerId missing from the containerMap.
+    // We use removedContainerIds to remember these containers and remove them from containerMap later,
+    // when reviseWorkforcePlanAndRequestNewContainers() is called.
+    if (completedContainerInfo == null) {
+      log.warn("Container {} not found in containerMap; onContainersCompleted() was likely called before onContainersAllocated() for this container",
+          completedContainerId);
+      this.removedContainerIds.add(completedContainerId);
+      return;
+    }
+
+    log.info("Container {} running profile {} completed with exit status {}",
+        completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus());
+
+    if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) {
+      log.info("Container {} running profile {} completed with diagnostics: {}",
+          completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics());
+    }
+
+    if (this.shutdownInProgress) {
+      log.info("Ignoring container completion for container {} as shutdown is in progress", completedContainerId);
+      return;
+    }
+
+    WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
+
+    switch (containerStatus.getExitStatus()) {
+      case(ContainerExitStatus.ABORTED):
+        handleAbortedContainer(completedContainerId, completedContainerInfo);
+        break;
+      case(ContainerExitStatus.PREEMPTED):
+        log.info("Container {} for profile {} preempted, starting to launch a replacement container",
+            completedContainerId, completedContainerInfo.getWorkerProfileName());
+        requestContainersForWorkerProfile(workerProfile, 1);
+        break;
+      case(GENERAL_OOM_EXIT_STATUS_CODE):
+      case(ContainerExitStatus.KILLED_EXCEEDED_VMEM):
+      case(ContainerExitStatus.KILLED_EXCEEDED_PMEM):
+        handleContainerExitedWithOOM(completedContainerId, completedContainerInfo);
+        break;
+      case(1): // Same as Linux exit status 1. Often occurs when launch_container.sh fails
+        log.info("Exit status 1. CompletedContainerInfo = {}", completedContainerInfo);
+        break;
+      default:
+        break;

Review Comment:
   let's add a no-op for `KILLED_AFTER_APP_COMPLETION` & `SUCCESS`, and a log statement for the default case. `ContainerExitStatus` also defines other statuses, such as `DISKS_FAILED` and `KILLED_BY_CONTAINER_SCHEDULER`, so having a log would be useful in case one of those also needs to be handled in the future.
   
   ```
     case ContainerExitStatus.KILLED_AFTER_APP_COMPLETION:
     case ContainerExitStatus.SUCCESS:
       break;
   
     default:
       // log any other unhandled completion code
       log.warn("Container {} exited with unhandled status code {}. ContainerInfo: {}",
           completedContainerId, containerStatus.getExitStatus(), completedContainerInfo);
       break;
   ```



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -71,29 +158,101 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<Scaling
     if (CollectionUtils.isEmpty(scalingDirectives)) {
       return;
     }
+
+    // Correct the actualWorkforceStaffing in case handleContainerCompletion() was called before onContainersAllocated()
+    Iterator<ContainerId> iterator = removedContainerIds.iterator();
+    while (iterator.hasNext()) {
+      ContainerId containerId = iterator.next();
+      ContainerInfo containerInfo = this.containerMap.remove(containerId);
+      if (containerInfo != null) {
+        WorkerProfile workerProfile = containerInfo.getWorkerProfile();
+        int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
+        if (currNumContainers > 0) {
+          this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1,
+              System.currentTimeMillis());
+          // Add a scaling directive so that the workforcePlan has an up-to-date set point for the workerProfile,
+          // otherwise extra containers will be requested when calculating deltas
+          scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis()));
+        }
+        iterator.remove();
+      }
+    }
+
     this.workforcePlan.reviseWhenNewer(scalingDirectives);
     StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
     requestNewContainersForStaffingDeltas(deltas);
   }
 
   private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
     deltas.getPerProfileDeltas().forEach(profileDelta -> {
-      if (profileDelta.getDelta() > 0) { // scale up!
-        WorkerProfile workerProfile = profileDelta.getProfile();
-        String profileName = workerProfile.getName();
-        int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
-        int delta = profileDelta.getDelta();
+      WorkerProfile workerProfile = profileDelta.getProfile();
+      String profileName = workerProfile.getName();
+      int delta = profileDelta.getDelta();
+      int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
+      if (delta > 0) { // scale up!
         log.info("Requesting {} new containers for profile {} having currently {} containers", delta,
             WorkforceProfiles.renderName(profileName), currNumContainers);
         requestContainersForWorkerProfile(workerProfile, delta);
         // update our staffing after requesting new containers
         this.actualWorkforceStaffing.reviseStaffing(profileName, currNumContainers + delta, System.currentTimeMillis());
-      } else if (profileDelta.getDelta() < 0) { // scale down!
-        // TODO: Decide how to handle negative deltas
-        log.warn("Handling of Negative delta is not supported yet : Profile {} delta {} ",
-            profileDelta.getProfile().getName(), profileDelta.getDelta());
+      } else if (delta < 0) { // scale down!
+        log.info("Releasing {} containers for profile {} having currently {} containers", -delta,
+            WorkforceProfiles.renderName(profileName), currNumContainers);
+        releaseContainersForWorkerProfile(profileName, delta);
+        // update our staffing after releasing containers
+        int numContainersAfterRelease = Math.max(currNumContainers + delta, 0);
+        this.actualWorkforceStaffing.reviseStaffing(profileName, numContainersAfterRelease, System.currentTimeMillis());
       } // else, already at staffing plan (or at least have requested, so in-progress)
     });
   }
 
+  private void handleAbortedContainer(ContainerId completedContainerId, ContainerInfo completedContainerInfo) {
+    // Case 1 : Container release requested while scaling down
+    if (this.releasedContainerCache.getIfPresent(completedContainerId) != null) {
+      log.info("Container {} was released while downscaling for profile {}", completedContainerId, completedContainerInfo.getWorkerProfileName());
+      this.releasedContainerCache.invalidate(completedContainerId);
+      return;
+    }
+
+    // Case 2 : Container release was not requested; we need to request a replacement container
+    log.info("Container {} aborted for profile {}, starting to launch a replacement container", completedContainerId, completedContainerInfo.getWorkerProfileName());
+    requestContainersForWorkerProfile(completedContainerInfo.getWorkerProfile(), 1);
+  }
+
+  private synchronized void handleContainerExitedWithOOM(ContainerId completedContainerId, ContainerInfo completedContainerInfo) {
+    log.info("Container {} for profile {} exited with OOM, starting to launch a replacement container",
+        completedContainerId, completedContainerInfo.getWorkerProfileName());
+
+    List<ScalingDirective> scalingDirectives = new ArrayList<>();
+
+    WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
+    // Update the current staffing to reflect the container that exited with OOM
+    int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
+    if (currNumContainers > 0) {
+      this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis());
+      // Add a scaling directive so that the workforcePlan has an up-to-date set point for the workerProfile,
+      // otherwise extra containers will be requested when calculating deltas
+      scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis()));
+    }
+
+    // Request a replacement container
+    int currContainerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
+    int newContainerMemoryMbs = currContainerMemoryMbs * DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER;

Review Comment:
   If `currContainerMemoryMbs` is 36 GB, then `newContainerMemoryMbs` would be 72 GB (with a multiplier of 2). However, that is higher than the 64 GB `MAX_REPLACEMENT_CONTAINER_MEMORY_MBS`, so in this case we would skip the container request entirely, even though the OOM could have been handled by launching a 64 GB container. We should launch that last container with `MAX_REPLACEMENT_CONTAINER_MEMORY_MBS`.
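
   The clamping behavior the reviewer asks for can be sketched as below. This is a minimal, self-contained illustration, not the actual Gobblin code: the constant values and the `-1` "cannot grow further" convention are assumptions standing in for the real `DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER` and `MAX_REPLACEMENT_CONTAINER_MEMORY_MBS`.
   
   ```java
   public class ReplacementMemoryPolicy {
       // Hypothetical values standing in for the constants discussed above
       static final int MULTIPLIER = 2;             // DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER
       static final int MAX_MEMORY_MBS = 64 * 1024; // MAX_REPLACEMENT_CONTAINER_MEMORY_MBS (64 GB)
   
       /**
        * Doubles the previous container's memory but clamps it to the maximum,
        * so a 36 GB container is replaced by a 64 GB one instead of being skipped.
        * Returns -1 only when the container already ran at the maximum.
        */
       static int replacementMemoryMbs(int currContainerMemoryMbs) {
           if (currContainerMemoryMbs >= MAX_MEMORY_MBS) {
               return -1; // already at the cap; a bigger replacement cannot be granted
           }
           return Math.min(currContainerMemoryMbs * MULTIPLIER, MAX_MEMORY_MBS);
       }
   }
   ```
   
   With this shape, the 36 GB example resolves to a 64 GB request rather than no request at all.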



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -62,6 +82,73 @@ protected synchronized void requestInitialContainers() {
     requestNewContainersForStaffingDeltas(deltas);
   }
 
+  /**
+   * Handle the completion of a container. A new container will be requested to replace the one
+   * that just exited, depending on the exit status.
+   * <p>
+   * A container completes under one of the following conditions:
+   * <ol>
+   *   <li> The container gets stopped by the ApplicationMaster. </li>
+   *   <li> Some error happens in the container and causes it to exit. </li>
+   *   <li> The container gets preempted by the ResourceManager. </li>
+   *   <li> The container gets killed, for example, if it runs over the allowed amount of virtual or physical memory. </li>
+   * </ol>
+   * A replacement container is needed in all except the first case.
+   * </p>
+   */
+   */
+  @Override
+  protected void handleContainerCompletion(ContainerStatus containerStatus) {

Review Comment:
   this method is not `synchronized`, but it removes entries from containerMap and modifies removedContainerIds. `reviseWorkforcePlanAndRequestNewContainers`, which is synchronized, also modifies both containerMap and removedContainerIds. If a call to `handleContainerCompletion` interleaves with a call to `reviseWorkforcePlanAndRequestNewContainers`, race conditions and inconsistent state can result.
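
   One way to address this, sketched below with simplified stand-in types (not the actual Gobblin classes), is to have every path that mutates the shared `containerMap` / `removedContainerIds` synchronize on the same monitor, so a YARN completion callback can no longer interleave with the revise-plan path:
   
   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   
   // Simplified stand-in for DynamicScalingYarnService's shared state:
   // all mutating methods are synchronized on `this`, so they execute atomically
   // with respect to each other.
   public class ScalingState {
       private final Map<String, String> containerMap = new HashMap<>(); // containerId -> profileName
       private final Set<String> removedContainerIds = new HashSet<>();
   
       public synchronized void onContainerAllocated(String containerId, String profile) {
           containerMap.put(containerId, profile);
       }
   
       public synchronized void handleContainerCompletion(String containerId) {
           if (containerMap.remove(containerId) == null) {
               // completion arrived before allocation; reconcile later
               removedContainerIds.add(containerId);
           }
       }
   
       public synchronized void reviseWorkforcePlan() {
           // drain any completions that raced ahead of their allocations
           removedContainerIds.removeIf(id -> containerMap.remove(id) != null);
       }
   
       public synchronized int pendingRemovals() {
           return removedContainerIds.size();
       }
   }
   ```
   
   An alternative would be a dedicated lock object, but reusing the intrinsic lock matches the `synchronized` methods already present in the class under review.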



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -411,10 +330,24 @@ protected synchronized void requestContainersForWorkerProfile(WorkerProfile work
     requestContainers(numContainers, Resource.newInstance(containerMemoryMbs, containerCores), Optional.of(allocationRequestId));
   }
 
-  private void requestContainer(Optional<String> preferredNode, Optional<Resource> resourceOptional) {
-    Resource desiredResource = resourceOptional.or(Resource.newInstance(
-        this.requestedContainerMemoryMbs, this.requestedContainerCores));
-    requestContainer(preferredNode, desiredResource, Optional.absent());
+  protected synchronized void releaseContainersForWorkerProfile(String profileName, int numContainers) {
+    Iterator<Map.Entry<ContainerId, ContainerInfo>> containerMapIterator = this.containerMap.entrySet().iterator();
+    while (containerMapIterator.hasNext() && numContainers > 0) {
+      Map.Entry<ContainerId, ContainerInfo> entry = containerMapIterator.next();
+      if (entry.getValue().getWorkerProfile().getName().equals(profileName)) {
+        ContainerId containerId = entry.getKey();
+        LOGGER.info("Releasing container {} running profile {}", containerId, WorkforceProfiles.renderName(profileName));
+        // Record that this container was explicitly released so that a new one is not spawned to replace it.
+        // Put the container id in the releasedContainerCache before releasing it so that handleContainerCompletion()
+        // can check for the container id and skip spawning a replacement container.
+        // Note that this is best-effort, since these are asynchronous operations and a container may abort concurrently
+        // with the release call. So in some cases a replacement container may have already been spawned before
+        // the container is put into the black list.
+        this.releasedContainerCache.put(containerId, "");
+        this.amrmClientAsync.releaseAssignedContainer(containerId);
+        numContainers--;
+      }
+    }

Review Comment:
   please add a log here for how many containers were intended to be released and how many were actually released from this method
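
   The requested accounting boils down to remembering the original request count before the loop decrements it. A self-contained sketch, using a plain `Map` in place of the YARN types (the method and message wording here are illustrative assumptions, not the PR's code):
   
   ```java
   import java.util.Iterator;
   import java.util.Map;
   
   public class ReleaseAccounting {
       /**
        * Removes up to `requested` entries matching profileName and reports
        * intended vs. actual, mirroring the log the reviewer asks for.
        */
       static int release(Map<String, String> containerToProfile, String profileName, int requested) {
           int released = 0;
           Iterator<Map.Entry<String, String>> it = containerToProfile.entrySet().iterator();
           while (it.hasNext() && released < requested) {
               if (it.next().getValue().equals(profileName)) {
                   it.remove();
                   released++;
               }
           }
           // intended vs. actual: the two numbers can differ when fewer containers
           // of this profile exist than were asked to be released
           System.out.printf("Intended to release %d containers for profile %s, actually released %d%n",
               requested, profileName, released);
           return released;
       }
   }
   ```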



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -62,6 +82,73 @@ protected synchronized void requestInitialContainers() {
     requestNewContainersForStaffingDeltas(deltas);
   }
 
+  /**
+   * Handle the completion of a container. A new container will be requested to replace the one
+   * that just exited, depending on the exit status.
+   * <p>
+   * A container completes under one of the following conditions:
+   * <ol>
+   *   <li> The container gets stopped by the ApplicationMaster. </li>
+   *   <li> Some error happens in the container and causes it to exit. </li>
+   *   <li> The container gets preempted by the ResourceManager. </li>
+   *   <li> The container gets killed, for example, if it runs over the allowed amount of virtual or physical memory. </li>
+   * </ol>
+   * A replacement container is needed in all except the first case.
+   * </p>
+   */
+   */
+  @Override
+  protected void handleContainerCompletion(ContainerStatus containerStatus) {
+    ContainerId completedContainerId = containerStatus.getContainerId();
+    ContainerInfo completedContainerInfo = this.containerMap.remove(completedContainerId);
+
+    // Because callbacks are processed asynchronously, we might encounter situations where handleContainerCompletion()
+    // is called before onContainersAllocated(), resulting in the containerId missing from the containerMap.
+    // We use removedContainerIds to remember these containers and remove them from containerMap later,
+    // when reviseWorkforcePlanAndRequestNewContainers() is called.
+    if (completedContainerInfo == null) {
+      log.warn("Container {} not found in containerMap; onContainersCompleted() was likely called before onContainersAllocated() for this container",
+          completedContainerId);
+      this.removedContainerIds.add(completedContainerId);
+      return;
+    }
+
+    log.info("Container {} running profile {} completed with exit status {}",
+        completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus());
+
+    if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) {
+      log.info("Container {} running profile {} completed with diagnostics: {}",
+          completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics());
+    }
+
+    if (this.shutdownInProgress) {
+      log.info("Ignoring container completion for container {} as shutdown is in progress", completedContainerId);
+      return;
+    }
+
+    WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
+
+    switch (containerStatus.getExitStatus()) {
+      case(ContainerExitStatus.ABORTED):
+        handleAbortedContainer(completedContainerId, completedContainerInfo);
+        break;
+      case(ContainerExitStatus.PREEMPTED):
+        log.info("Container {} for profile {} preempted, starting to launch a replacement container",
+            completedContainerId, completedContainerInfo.getWorkerProfileName());
+        requestContainersForWorkerProfile(workerProfile, 1);
+        break;
+      case(GENERAL_OOM_EXIT_STATUS_CODE):
+      case(ContainerExitStatus.KILLED_EXCEEDED_VMEM):
+      case(ContainerExitStatus.KILLED_EXCEEDED_PMEM):
+        handleContainerExitedWithOOM(completedContainerId, completedContainerInfo);
+        break;
+      case(1): // Same as Linux exit status 1. Often occurs when launch_container.sh fails

Review Comment:
   use constant `private static final int LAUNCH_CONTAINER_FAILED_EXIT_CODE = 1;`
   





Issue Time Tracking
-------------------

    Worklog Id:     (was: 955661)
    Time Spent: 1h 50m  (was: 1h 40m)

> Implement ContainerCompletion callback in DynamicScalingYarnService
> -------------------------------------------------------------------
>
>                 Key: GOBBLIN-2189
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2189
>             Project: Apache Gobblin
>          Issue Type: Improvement
>          Components: gobblin-core
>            Reporter: Vivek Rai
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> DynamicScalingYarnService currently doesn't handle scaling down containers, nor does it do anything if a container 
> is killed abruptly or goes OOM. To handle these scenarios, a containerCompletion callback should be implemented to 
> launch replacement containers, and scale-down handling should also be added.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
