khandelwal-prateek commented on code in PR #4092: URL: https://github.com/apache/gobblin/pull/4092#discussion_r1945905660
########## gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java: ########## @@ -69,14 +69,14 @@ public void testWhenScalingDirectivesIsNulOrEmpty() throws IOException, Interrup /** Note : this test uses {@link DummyScalingDirectiveSource}*/ @Test public void testWithDummyScalingDirectiveSource() throws IOException, InterruptedException { - // DummyScalingDirectiveSource returns 2 scaling directives in first 3 invocations and after that it returns empty list - // so the total number of invocations after three invocations should always be 3 + // DummyScalingDirectiveSource returns 2 scaling directives in first 5 invocations and after that it returns empty list + // so the total number of invocations after three invocations should always be 5 TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager( mockGobblinTemporalApplicationMaster, new DummyScalingDirectiveSource()); testDynamicScalingYarnServiceManager.startUp(); - Thread.sleep(5000); // 5 seconds sleep so that GetScalingDirectivesRunnable.run() is called for 5 times + Thread.sleep(7000); // 5 seconds sleep so that GetScalingDirectivesRunnable.run() is called for 7 times testDynamicScalingYarnServiceManager.shutDown(); - Mockito.verify(mockDynamicScalingYarnService, Mockito.times(3)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + Mockito.verify(mockDynamicScalingYarnService, Mockito.times(5)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); Review Comment: what exactly are we testing here? 
########## gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java: ########## @@ -69,14 +69,14 @@ public void testWhenScalingDirectivesIsNulOrEmpty() throws IOException, Interrup /** Note : this test uses {@link DummyScalingDirectiveSource}*/ @Test public void testWithDummyScalingDirectiveSource() throws IOException, InterruptedException { - // DummyScalingDirectiveSource returns 2 scaling directives in first 3 invocations and after that it returns empty list - // so the total number of invocations after three invocations should always be 3 + // DummyScalingDirectiveSource returns 2 scaling directives in first 5 invocations and after that it returns empty list + // so the total number of invocations after three invocations should always be 5 TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager( mockGobblinTemporalApplicationMaster, new DummyScalingDirectiveSource()); testDynamicScalingYarnServiceManager.startUp(); - Thread.sleep(5000); // 5 seconds sleep so that GetScalingDirectivesRunnable.run() is called for 5 times + Thread.sleep(7000); // 5 seconds sleep so that GetScalingDirectivesRunnable.run() is called for 7 times Review Comment: any reason to update this to 7 seconds? 
Also, there is a typo in the inline comment: it still says `5 seconds sleep` although the sleep is now 7 seconds. ########## gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java: ########## @@ -69,14 +69,14 @@ public void testWhenScalingDirectivesIsNulOrEmpty() throws IOException, Interrup /** Note : this test uses {@link DummyScalingDirectiveSource}*/ @Test public void testWithDummyScalingDirectiveSource() throws IOException, InterruptedException { - // DummyScalingDirectiveSource returns 2 scaling directives in first 3 invocations and after that it returns empty list - // so the total number of invocations after three invocations should always be 3 + // DummyScalingDirectiveSource returns 2 scaling directives in first 5 invocations and after that it returns empty list + // so the total number of invocations after three invocations should always be 5 Review Comment: Should this read `after five invocations`? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java: ########## @@ -71,29 +164,103 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<Scaling if (CollectionUtils.isEmpty(scalingDirectives)) { return; } + + // Correct the actualWorkforceStaffing in case of handleContainerCompletion() getting called before onContainersAllocated() + Iterator<ContainerId> iterator = removedContainerIds.iterator(); + while (iterator.hasNext()) { + ContainerId containerId = iterator.next(); + ContainerInfo containerInfo = this.containerMap.remove(containerId); + if (containerInfo != null) { + WorkerProfile workerProfile = containerInfo.getWorkerProfile(); + int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0); + if (currNumContainers > 0) { + this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, + System.currentTimeMillis()); + // Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile, + // otherwise extra containers will be 
requested when calculating deltas + scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis())); + } + iterator.remove(); + } + } + this.workforcePlan.reviseWhenNewer(scalingDirectives); StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing); requestNewContainersForStaffingDeltas(deltas); } private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) { deltas.getPerProfileDeltas().forEach(profileDelta -> { - if (profileDelta.getDelta() > 0) { // scale up! - WorkerProfile workerProfile = profileDelta.getProfile(); - String profileName = workerProfile.getName(); - int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0); - int delta = profileDelta.getDelta(); + WorkerProfile workerProfile = profileDelta.getProfile(); + String profileName = workerProfile.getName(); + int delta = profileDelta.getDelta(); + int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0); + if (delta > 0) { // scale up! log.info("Requesting {} new containers for profile {} having currently {} containers", delta, WorkforceProfiles.renderName(profileName), currNumContainers); requestContainersForWorkerProfile(workerProfile, delta); // update our staffing after requesting new containers this.actualWorkforceStaffing.reviseStaffing(profileName, currNumContainers + delta, System.currentTimeMillis()); - } else if (profileDelta.getDelta() < 0) { // scale down! - // TODO: Decide how to handle negative deltas - log.warn("Handling of Negative delta is not supported yet : Profile {} delta {} ", - profileDelta.getProfile().getName(), profileDelta.getDelta()); + } else if (delta < 0) { // scale down! 
+ log.info("Releasing {} containers for profile {} having currently {} containers", -delta, + WorkforceProfiles.renderName(profileName), currNumContainers); + releaseContainersForWorkerProfile(profileName, delta); + // update our staffing after releasing containers + int numContainersAfterRelease = Math.max(currNumContainers + delta, 0); + this.actualWorkforceStaffing.reviseStaffing(profileName, numContainersAfterRelease, System.currentTimeMillis()); } // else, already at staffing plan (or at least have requested, so in-progress) }); } + private void handleAbortedContainer(ContainerId completedContainerId, ContainerInfo completedContainerInfo) { + // Case 1 : Container release requested while scaling down + if (this.releasedContainerCache.getIfPresent(completedContainerId) != null) { + log.info("Container {} was released while downscaling for profile {}", completedContainerId, completedContainerInfo.getWorkerProfileName()); + this.releasedContainerCache.invalidate(completedContainerId); + return; + } + + // Case 2 : Container release was not requested, we need to request a replacement container + log.info("Container {} aborted for profile {}, starting to launch a replacement container", completedContainerId, completedContainerInfo.getWorkerProfileName()); + requestContainersForWorkerProfile(completedContainerInfo.getWorkerProfile(), 1); + } + + private synchronized void handleContainerExitedWithOOM(ContainerId completedContainerId, ContainerInfo completedContainerInfo) { + log.info("Container {} for profile {} exited with OOM, starting to launch a replacement container", + completedContainerId, completedContainerInfo.getWorkerProfileName()); + + List<ScalingDirective> scalingDirectives = new ArrayList<>(); + + WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile(); + // Update the current staffing to reflect the container that exited with OOM + int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0); + if 
(currNumContainers > 0) { + this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis()); + // Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile, + // otherwise extra containers will be requested when calculating deltas + scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis())); + } + + // Request a replacement container + int currContainerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY); + int newContainerMemoryMbs = currContainerMemoryMbs * DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER; + if (currContainerMemoryMbs < MAX_REPLACEMENT_CONTAINER_MEMORY_MBS && newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) { + newContainerMemoryMbs = MAX_REPLACEMENT_CONTAINER_MEMORY_MBS; + } else if (newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) { + log.warn("Expected replacement container memory exceeds the maximum allowed memory {}. Not requesting a replacement container.", + MAX_REPLACEMENT_CONTAINER_MEMORY_MBS); + return; Review Comment: this can be simplified to: ``` if (currContainerMemoryMbs >= MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) { log.warn("Container memory {} is already at or above max limit {}, not requesting a replacement container.", currContainerMemoryMbs, MAX_REPLACEMENT_CONTAINER_MEMORY_MBS); return; } int newContainerMemoryMbs = Math.min(currContainerMemoryMbs * DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER, MAX_REPLACEMENT_CONTAINER_MEMORY_MBS); ``` (The limit check has to be on the current memory: once the value is capped with `Math.min`, a later `newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS` check can never be true.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org