[ 
https://issues.apache.org/jira/browse/GOBBLIN-2189?focusedWorklogId=957862&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-957862
 ]

ASF GitHub Bot logged work on GOBBLIN-2189:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Feb/25 06:51
            Start Date: 20/Feb/25 06:51
    Worklog Time Spent: 10m 
      Work Description: khandelwal-prateek commented on code in PR #4092:
URL: https://github.com/apache/gobblin/pull/4092#discussion_r1962951305


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -72,28 +165,101 @@ public synchronized void 
reviseWorkforcePlanAndRequestNewContainers(List<Scaling
       return;
     }
     this.workforcePlan.reviseWhenNewer(scalingDirectives);
+    calcDeltasAndRequestContainers();
+  }
+
+  public synchronized void calcDeltasAndRequestContainers() {
+    // Correct the actualWorkforceStaffing in case of 
handleContainerCompletion() getting called before onContainersAllocated()
+    Iterator<ContainerId> iterator = removedContainerIds.iterator();
+    while (iterator.hasNext()) {
+      ContainerId containerId = iterator.next();
+      ContainerInfo containerInfo = this.containerMap.remove(containerId);
+      if (containerInfo != null) {
+        WorkerProfile workerProfile = containerInfo.getWorkerProfile();
+        int currNumContainers = 
this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
+        if (currNumContainers > 0) {
+          this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), 
currNumContainers - 1,
+              System.currentTimeMillis());
+        }
+        iterator.remove();
+      }
+    }
     StaffingDeltas deltas = 
this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
     requestNewContainersForStaffingDeltas(deltas);
   }
 
   private synchronized void 
requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
     deltas.getPerProfileDeltas().forEach(profileDelta -> {
-      if (profileDelta.getDelta() > 0) { // scale up!
-        WorkerProfile workerProfile = profileDelta.getProfile();
-        String profileName = workerProfile.getName();
-        int currNumContainers = 
this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
-        int delta = profileDelta.getDelta();
+      WorkerProfile workerProfile = profileDelta.getProfile();
+      String profileName = workerProfile.getName();
+      int delta = profileDelta.getDelta();
+      int currNumContainers = 
this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
+      if (delta > 0) { // scale up!
         log.info("Requesting {} new containers for profile {} having currently 
{} containers", delta,
             WorkforceProfiles.renderName(profileName), currNumContainers);
         requestContainersForWorkerProfile(workerProfile, delta);
         // update our staffing after requesting new containers
         this.actualWorkforceStaffing.reviseStaffing(profileName, 
currNumContainers + delta, System.currentTimeMillis());
-      } else if (profileDelta.getDelta() < 0) { // scale down!
-        // TODO: Decide how to handle negative deltas
-        log.warn("Handling of Negative delta is not supported yet : Profile {} 
delta {} ",
-            profileDelta.getProfile().getName(), profileDelta.getDelta());
+      } else if (delta < 0) { // scale down!
+        log.info("Releasing {} containers for profile {} having currently {} 
containers", -delta,
+            WorkforceProfiles.renderName(profileName), currNumContainers);
+        releaseContainersForWorkerProfile(profileName, delta);
+        // update our staffing after releasing containers
+        int numContainersAfterRelease = Math.max(currNumContainers + delta, 0);
+        this.actualWorkforceStaffing.reviseStaffing(profileName, 
numContainersAfterRelease, System.currentTimeMillis());
       } // else, already at staffing plan (or at least have requested, so 
in-progress)
     });
   }
 
+  private void handleAbortedContainer(ContainerId completedContainerId, 
ContainerInfo completedContainerInfo) {
+    // Case 1 : Container release requested while scaling down
+    if (this.releasedContainerCache.getIfPresent(completedContainerId) != 
null) {
+      log.info("Container {} was released while downscaling for profile {}", 
completedContainerId, completedContainerInfo.getWorkerProfileName());
+      this.releasedContainerCache.invalidate(completedContainerId);
+      return;
+    }
+
+    // Case 2 : Container release was not requested, we need to request a 
replacement container
+    log.info("Container {} aborted for profile {}, starting to launch a 
replacement container", completedContainerId, 
completedContainerInfo.getWorkerProfileName());
+    
requestContainersForWorkerProfile(completedContainerInfo.getWorkerProfile(), 1);
+  }
+
+  private synchronized void handleContainerExitedWithOOM(ContainerId 
completedContainerId, ContainerInfo completedContainerInfo) {
+    log.info("Container {} for profile {} exited with OOM, starting to launch 
a replacement container",
+        completedContainerId, completedContainerInfo.getWorkerProfileName());
+
+    List<ScalingDirective> scalingDirectives = new ArrayList<>();
+
+    WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
+    long currTimeMillis = System.currentTimeMillis();
+    // Update the current staffing to reflect the container that exited with 
OOM
+    int currNumContainers = 
this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
+    if (currNumContainers > 0) {
+      this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), 
currNumContainers - 1, currTimeMillis + 1);
+      // Add a scaling directive so that workforcePlan have uptodate setPoints 
for the workerProfile,
+      // otherwise extra containers will be requested when calculating deltas
+      scalingDirectives.add(new ScalingDirective(workerProfile.getName(), 
currNumContainers - 1, currTimeMillis + 2));
+    }
+
+    // Request a replacement container
+    int currContainerMemoryMbs = 
workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
+    if (currContainerMemoryMbs >= MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) {
+      log.warn("Container {} already had max allowed memory {} MBs. Not 
requesting a replacement container.",
+          completedContainerId, currContainerMemoryMbs);
+      return;
+    }
+    int newContainerMemoryMbs = Math.min(currContainerMemoryMbs * 
DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER,
+        MAX_REPLACEMENT_CONTAINER_MEMORY_MBS);
+    Optional<ProfileDerivation> optProfileDerivation = Optional.of(new 
ProfileDerivation(workerProfile.getName(),
+        new ProfileOverlay.Adding(new 
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, 
newContainerMemoryMbs + ""))
+    ));
+    scalingDirectives.add(new ScalingDirective(
+        DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX + "-" + 
profileNameSuffixGenerator.getAndIncrement(),
+        1,
+        currTimeMillis + 3,

Review Comment:
   Then let's define a constant `EPSILON_MILLIS = 1` and use that.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 957862)
    Time Spent: 4h 50m  (was: 4h 40m)

> Implement ContainerCompletion callback in DynamicScalingYarnService
> -------------------------------------------------------------------
>
>                 Key: GOBBLIN-2189
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2189
>             Project: Apache Gobblin
>          Issue Type: Improvement
>          Components: gobblin-core
>            Reporter: Vivek Rai
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> DynamicScalingYarnService currently does not handle scaling down containers, 
> nor does it do anything when a container is killed abruptly or goes OOM. To 
> handle these scenarios, the containerCompletion callback should be implemented 
> to launch replacement containers, and scale-down handling should also be 
> added.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to