[ https://issues.apache.org/jira/browse/GOBBLIN-2189?focusedWorklogId=955983&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-955983 ]
ASF GitHub Bot logged work on GOBBLIN-2189: ------------------------------------------- Author: ASF GitHub Bot Created on: 07/Feb/25 04:38 Start Date: 07/Feb/25 04:38 Worklog Time Spent: 10m Work Description: khandelwal-prateek commented on code in PR #4092: URL: https://github.com/apache/gobblin/pull/4092#discussion_r1945905660 ########## gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java: ########## @@ -69,14 +69,14 @@ public void testWhenScalingDirectivesIsNulOrEmpty() throws IOException, Interrup /** Note : this test uses {@link DummyScalingDirectiveSource}*/ @Test public void testWithDummyScalingDirectiveSource() throws IOException, InterruptedException { - // DummyScalingDirectiveSource returns 2 scaling directives in first 3 invocations and after that it returns empty list - // so the total number of invocations after three invocations should always be 3 + // DummyScalingDirectiveSource returns 2 scaling directives in first 5 invocations and after that it returns empty list + // so the total number of invocations after three invocations should always be 5 TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager( mockGobblinTemporalApplicationMaster, new DummyScalingDirectiveSource()); testDynamicScalingYarnServiceManager.startUp(); - Thread.sleep(5000); // 5 seconds sleep so that GetScalingDirectivesRunnable.run() is called for 5 times + Thread.sleep(7000); // 5 seconds sleep so that GetScalingDirectivesRunnable.run() is called for 7 times testDynamicScalingYarnServiceManager.shutDown(); - Mockito.verify(mockDynamicScalingYarnService, Mockito.times(3)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + Mockito.verify(mockDynamicScalingYarnService, Mockito.times(5)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); Review Comment: what exactly are we testing here? 
########## gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java: ########## @@ -69,14 +69,14 @@ public void testWhenScalingDirectivesIsNulOrEmpty() throws IOException, Interrup /** Note : this test uses {@link DummyScalingDirectiveSource}*/ @Test public void testWithDummyScalingDirectiveSource() throws IOException, InterruptedException { - // DummyScalingDirectiveSource returns 2 scaling directives in first 3 invocations and after that it returns empty list - // so the total number of invocations after three invocations should always be 3 + // DummyScalingDirectiveSource returns 2 scaling directives in first 5 invocations and after that it returns empty list + // so the total number of invocations after three invocations should always be 5 TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager( mockGobblinTemporalApplicationMaster, new DummyScalingDirectiveSource()); testDynamicScalingYarnServiceManager.startUp(); - Thread.sleep(5000); // 5 seconds sleep so that GetScalingDirectivesRunnable.run() is called for 5 times + Thread.sleep(7000); // 5 seconds sleep so that GetScalingDirectivesRunnable.run() is called for 7 times Review Comment: any reason to update this to 7 seconds? 
typo in comments `5 seconds sleep` ########## gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java: ########## @@ -69,14 +69,14 @@ public void testWhenScalingDirectivesIsNulOrEmpty() throws IOException, Interrup /** Note : this test uses {@link DummyScalingDirectiveSource}*/ @Test public void testWithDummyScalingDirectiveSource() throws IOException, InterruptedException { - // DummyScalingDirectiveSource returns 2 scaling directives in first 3 invocations and after that it returns empty list - // so the total number of invocations after three invocations should always be 3 + // DummyScalingDirectiveSource returns 2 scaling directives in first 5 invocations and after that it returns empty list + // so the total number of invocations after three invocations should always be 5 Review Comment: `after five invocations` ? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java: ########## @@ -71,29 +164,103 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<Scaling if (CollectionUtils.isEmpty(scalingDirectives)) { return; } + + // Correct the actualWorkforceStaffing in case of handleContainerCompletion() getting called before onContainersAllocated() + Iterator<ContainerId> iterator = removedContainerIds.iterator(); + while (iterator.hasNext()) { + ContainerId containerId = iterator.next(); + ContainerInfo containerInfo = this.containerMap.remove(containerId); + if (containerInfo != null) { + WorkerProfile workerProfile = containerInfo.getWorkerProfile(); + int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0); + if (currNumContainers > 0) { + this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, + System.currentTimeMillis()); + // Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile, + // otherwise extra containers will be 
requested when calculating deltas + scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis())); + } + iterator.remove(); + } + } + this.workforcePlan.reviseWhenNewer(scalingDirectives); StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing); requestNewContainersForStaffingDeltas(deltas); } private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) { deltas.getPerProfileDeltas().forEach(profileDelta -> { - if (profileDelta.getDelta() > 0) { // scale up! - WorkerProfile workerProfile = profileDelta.getProfile(); - String profileName = workerProfile.getName(); - int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0); - int delta = profileDelta.getDelta(); + WorkerProfile workerProfile = profileDelta.getProfile(); + String profileName = workerProfile.getName(); + int delta = profileDelta.getDelta(); + int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0); + if (delta > 0) { // scale up! log.info("Requesting {} new containers for profile {} having currently {} containers", delta, WorkforceProfiles.renderName(profileName), currNumContainers); requestContainersForWorkerProfile(workerProfile, delta); // update our staffing after requesting new containers this.actualWorkforceStaffing.reviseStaffing(profileName, currNumContainers + delta, System.currentTimeMillis()); - } else if (profileDelta.getDelta() < 0) { // scale down! - // TODO: Decide how to handle negative deltas - log.warn("Handling of Negative delta is not supported yet : Profile {} delta {} ", - profileDelta.getProfile().getName(), profileDelta.getDelta()); + } else if (delta < 0) { // scale down! 
+ log.info("Releasing {} containers for profile {} having currently {} containers", -delta, + WorkforceProfiles.renderName(profileName), currNumContainers); + releaseContainersForWorkerProfile(profileName, delta); + // update our staffing after releasing containers + int numContainersAfterRelease = Math.max(currNumContainers + delta, 0); + this.actualWorkforceStaffing.reviseStaffing(profileName, numContainersAfterRelease, System.currentTimeMillis()); } // else, already at staffing plan (or at least have requested, so in-progress) }); } + private void handleAbortedContainer(ContainerId completedContainerId, ContainerInfo completedContainerInfo) { + // Case 1 : Container release requested while scaling down + if (this.releasedContainerCache.getIfPresent(completedContainerId) != null) { + log.info("Container {} was released while downscaling for profile {}", completedContainerId, completedContainerInfo.getWorkerProfileName()); + this.releasedContainerCache.invalidate(completedContainerId); + return; + } + + // Case 2 : Container release was not requested, we need to request a replacement container + log.info("Container {} aborted for profile {}, starting to launch a replacement container", completedContainerId, completedContainerInfo.getWorkerProfileName()); + requestContainersForWorkerProfile(completedContainerInfo.getWorkerProfile(), 1); + } + + private synchronized void handleContainerExitedWithOOM(ContainerId completedContainerId, ContainerInfo completedContainerInfo) { + log.info("Container {} for profile {} exited with OOM, starting to launch a replacement container", + completedContainerId, completedContainerInfo.getWorkerProfileName()); + + List<ScalingDirective> scalingDirectives = new ArrayList<>(); + + WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile(); + // Update the current staffing to reflect the container that exited with OOM + int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0); + if 
(currNumContainers > 0) { + this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis()); + // Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile, + // otherwise extra containers will be requested when calculating deltas + scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis())); + } + + // Request a replacement container + int currContainerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY); + int newContainerMemoryMbs = currContainerMemoryMbs * DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER; + if (currContainerMemoryMbs < MAX_REPLACEMENT_CONTAINER_MEMORY_MBS && newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) { + newContainerMemoryMbs = MAX_REPLACEMENT_CONTAINER_MEMORY_MBS; + } else if (newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) { + log.warn("Expected replacement container memory exceeds the maximum allowed memory {}. 
Not requesting a replacement container.", + MAX_REPLACEMENT_CONTAINER_MEMORY_MBS); + return; Review Comment: this can be simplified to: ``` int newContainerMemoryMbs = Math.min(currContainerMemoryMbs * 2, MAX_REPLACEMENT_CONTAINER_MEMORY_MBS); if (newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) { log.warn("Replacement container memory exceeds max limit {}, not requesting a replacement container.", MAX_REPLACEMENT_CONTAINER_MEMORY_MBS); return; } ``` Issue Time Tracking ------------------- Worklog Id: (was: 955983) Time Spent: 2h 20m (was: 2h 10m) > Implement ContainerCompletion callback in DynamicScalingYarnService > ------------------------------------------------------------------- > > Key: GOBBLIN-2189 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2189 > Project: Apache Gobblin > Issue Type: Improvement > Components: gobblin-core > Reporter: Vivek Rai > Assignee: Abhishek Tiwari > Priority: Major > Time Spent: 2h 20m > Remaining Estimate: 0h > > DynamicScalingYarnService currently doesn't handle scaling down containers, > nor does it do anything if a container is killed abruptly or goes OOM. To > handle these scenarios, a containerCompletion callback should be implemented to > launch replacement containers, and scale-down handling should also be > added. -- This message was sent by Atlassian Jira (v8.20.10#820010)