Blazer-007 commented on code in PR #4092: URL: https://github.com/apache/gobblin/pull/4092#discussion_r1936737684
########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java: ########## @@ -71,29 +153,96 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<Scaling if (CollectionUtils.isEmpty(scalingDirectives)) { return; } + + // Correct the actualWorkforceStaffing in case of handleContainerCompletion() getting called before onContainersAllocated() + Iterator<ContainerId> iterator = removedContainerIds.iterator(); + while (iterator.hasNext()) { + ContainerId containerId = iterator.next(); + ContainerInfo containerInfo = this.containerMap.remove(containerId); + if (containerInfo != null) { + WorkerProfile workerProfile = containerInfo.getWorkerProfile(); + int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0); + if (currNumContainers > 0) { + this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, + System.currentTimeMillis()); + // Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile, + // otherwise extra containers will be requested when calculating deltas + scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis())); + } + iterator.remove(); + } + } + this.workforcePlan.reviseWhenNewer(scalingDirectives); StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing); requestNewContainersForStaffingDeltas(deltas); } private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) { deltas.getPerProfileDeltas().forEach(profileDelta -> { - if (profileDelta.getDelta() > 0) { // scale up! - WorkerProfile workerProfile = profileDelta.getProfile(); - String profileName = workerProfile.getName(); - int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0); - int delta = profileDelta.getDelta(); + WorkerProfile workerProfile = profileDelta.getProfile(); + String profileName = workerProfile.getName(); + int delta = profileDelta.getDelta(); + int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0); + if (delta > 0) { // scale up! log.info("Requesting {} new containers for profile {} having currently {} containers", delta, WorkforceProfiles.renderName(profileName), currNumContainers); requestContainersForWorkerProfile(workerProfile, delta); // update our staffing after requesting new containers this.actualWorkforceStaffing.reviseStaffing(profileName, currNumContainers + delta, System.currentTimeMillis()); - } else if (profileDelta.getDelta() < 0) { // scale down! - // TODO: Decide how to handle negative deltas - log.warn("Handling of Negative delta is not supported yet : Profile {} delta {} ", - profileDelta.getProfile().getName(), profileDelta.getDelta()); + } else if (delta < 0) { // scale down! + log.info("Releasing {} containers for profile {} having currently {} containers", -delta, + WorkforceProfiles.renderName(profileName), currNumContainers); + releaseContainersForWorkerProfile(profileName, delta); + // update our staffing after releasing containers + int numContainersAfterRelease = Math.max(currNumContainers + delta, 0); + this.actualWorkforceStaffing.reviseStaffing(profileName, numContainersAfterRelease, System.currentTimeMillis()); } // else, already at staffing plan (or at least have requested, so in-progress) }); } + private void handleAbortedContainer(ContainerId completedContainerId, ContainerInfo completedContainerInfo) { + // Case 1 : Container release requested while scaling down + if (this.releasedContainerCache.getIfPresent(completedContainerId) != null) { + log.info("Container {} was released while downscaling for profile {}", completedContainerId, completedContainerInfo.getWorkerProfileName()); + this.releasedContainerCache.invalidate(completedContainerId); + return; + } + + // Case 2 : Container release was not requested, we need to request a replacement container + log.info("Container {} aborted for profile {}, starting to launch a replacement container", completedContainerId, completedContainerInfo.getWorkerProfileName()); + requestContainersForWorkerProfile(completedContainerInfo.getWorkerProfile(), 1); + } + + private synchronized void handleContainerExitedWithOOM(ContainerId completedContainerId, ContainerInfo completedContainerInfo) { + log.info("Container {} for profile {} exited with OOM, starting to launch a replacement container", + completedContainerId, completedContainerInfo.getWorkerProfileName()); + + List<ScalingDirective> scalingDirectives = new ArrayList<>(); + + WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile(); + // Update the current staffing to reflect the container that exited with OOM + int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0); + if (currNumContainers > 0) { + this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis()); + // Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile, + // otherwise extra containers will be requested when calculating deltas + scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis())); + } + + // Request a replacement container + int currContainerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY); + int newContainerMemoryMbs = currContainerMemoryMbs * 2; //TODO: make it configurable or auto-tunable Review Comment: It will be based on ProfileDerivation Added a max limit of 64GB for replacement containers -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org