khandelwal-prateek commented on code in PR #4092:
URL: https://github.com/apache/gobblin/pull/4092#discussion_r1945990379


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -71,29 +164,103 @@ public synchronized void 
reviseWorkforcePlanAndRequestNewContainers(List<Scaling
     if (CollectionUtils.isEmpty(scalingDirectives)) {
       return;
     }
+
+    // Correct the actualWorkforceStaffing in case of 
handleContainerCompletion() getting called before onContainersAllocated()
+    Iterator<ContainerId> iterator = removedContainerIds.iterator();
+    while (iterator.hasNext()) {
+      ContainerId containerId = iterator.next();
+      ContainerInfo containerInfo = this.containerMap.remove(containerId);
+      if (containerInfo != null) {
+        WorkerProfile workerProfile = containerInfo.getWorkerProfile();
+        int currNumContainers = 
this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
+        if (currNumContainers > 0) {
+          this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), 
currNumContainers - 1,
+              System.currentTimeMillis());
+          // Add a scaling directive so that workforcePlan have uptodate 
setPoints for the workerProfile,
+          // otherwise extra containers will be requested when calculating 
deltas
+          scalingDirectives.add(new ScalingDirective(workerProfile.getName(), 
currNumContainers - 1, System.currentTimeMillis()));
+        }
+        iterator.remove();
+      }
+    }
+
     this.workforcePlan.reviseWhenNewer(scalingDirectives);
     StaffingDeltas deltas = 
this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
     requestNewContainersForStaffingDeltas(deltas);
   }
 
   private synchronized void 
requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
     deltas.getPerProfileDeltas().forEach(profileDelta -> {
-      if (profileDelta.getDelta() > 0) { // scale up!
-        WorkerProfile workerProfile = profileDelta.getProfile();
-        String profileName = workerProfile.getName();
-        int currNumContainers = 
this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
-        int delta = profileDelta.getDelta();
+      WorkerProfile workerProfile = profileDelta.getProfile();
+      String profileName = workerProfile.getName();
+      int delta = profileDelta.getDelta();
+      int currNumContainers = 
this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
+      if (delta > 0) { // scale up!
         log.info("Requesting {} new containers for profile {} having currently 
{} containers", delta,
             WorkforceProfiles.renderName(profileName), currNumContainers);
         requestContainersForWorkerProfile(workerProfile, delta);
         // update our staffing after requesting new containers
         this.actualWorkforceStaffing.reviseStaffing(profileName, 
currNumContainers + delta, System.currentTimeMillis());
-      } else if (profileDelta.getDelta() < 0) { // scale down!
-        // TODO: Decide how to handle negative deltas
-        log.warn("Handling of Negative delta is not supported yet : Profile {} 
delta {} ",
-            profileDelta.getProfile().getName(), profileDelta.getDelta());
+      } else if (delta < 0) { // scale down!
+        log.info("Releasing {} containers for profile {} having currently {} 
containers", -delta,
+            WorkforceProfiles.renderName(profileName), currNumContainers);
+        releaseContainersForWorkerProfile(profileName, delta);
+        // update our staffing after releasing containers
+        int numContainersAfterRelease = Math.max(currNumContainers + delta, 0);
+        this.actualWorkforceStaffing.reviseStaffing(profileName, 
numContainersAfterRelease, System.currentTimeMillis());
       } // else, already at staffing plan (or at least have requested, so 
in-progress)
     });
   }
 
+  private void handleAbortedContainer(ContainerId completedContainerId, 
ContainerInfo completedContainerInfo) {
+    // Case 1 : Container release requested while scaling down
+    if (this.releasedContainerCache.getIfPresent(completedContainerId) != 
null) {
+      log.info("Container {} was released while downscaling for profile {}", 
completedContainerId, completedContainerInfo.getWorkerProfileName());
+      this.releasedContainerCache.invalidate(completedContainerId);
+      return;
+    }
+
+    // Case 2 : Container release was not requested, we need to request a 
replacement container
+    log.info("Container {} aborted for profile {}, starting to launch a 
replacement container", completedContainerId, 
completedContainerInfo.getWorkerProfileName());
+    
requestContainersForWorkerProfile(completedContainerInfo.getWorkerProfile(), 1);
+  }
+
+  private synchronized void handleContainerExitedWithOOM(ContainerId 
completedContainerId, ContainerInfo completedContainerInfo) {
+    log.info("Container {} for profile {} exited with OOM, starting to launch 
a replacement container",
+        completedContainerId, completedContainerInfo.getWorkerProfileName());
+
+    List<ScalingDirective> scalingDirectives = new ArrayList<>();
+
+    WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
+    // Update the current staffing to reflect the container that exited with 
OOM
+    int currNumContainers = 
this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
+    if (currNumContainers > 0) {
+      this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), 
currNumContainers - 1, System.currentTimeMillis());
+      // Add a scaling directive so that workforcePlan have uptodate setPoints 
for the workerProfile,
+      // otherwise extra containers will be requested when calculating deltas
+      scalingDirectives.add(new ScalingDirective(workerProfile.getName(), 
currNumContainers - 1, System.currentTimeMillis()));
+    }
+
+    // Request a replacement container
+    int currContainerMemoryMbs = 
workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
+    int newContainerMemoryMbs = currContainerMemoryMbs * 
DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER;
+    if (currContainerMemoryMbs < MAX_REPLACEMENT_CONTAINER_MEMORY_MBS && 
newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) {
+      newContainerMemoryMbs = MAX_REPLACEMENT_CONTAINER_MEMORY_MBS;
+    } else if (newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) {
+      log.warn("Expected replacement container memory exceeds the maximum 
allowed memory {}. Not requesting a replacement container.",
+          MAX_REPLACEMENT_CONTAINER_MEMORY_MBS);
+      return;

Review Comment:
   Yeah, that was a typo - the check should be on the current container's 
memory rather than the new container's. It can be updated to:
   
   ```
       if (currContainerMemoryMbs >= MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) {
           log.warn("Container {} already had max allowed memory {} MBs. Not requesting a replacement container.",
               completedContainerId, currContainerMemoryMbs);
           return;
       }
   
       int newContainerMemoryMbs = Math.min(currContainerMemoryMbs * 2, 
MAX_REPLACEMENT_CONTAINER_MEMORY_MBS);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to