[ https://issues.apache.org/jira/browse/GOBBLIN-2189?focusedWorklogId=953588&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-953588 ]
ASF GitHub Bot logged work on GOBBLIN-2189: ------------------------------------------- Author: ASF GitHub Bot Created on: 22/Jan/25 09:28 Start Date: 22/Jan/25 09:28 Worklog Time Spent: 10m Work Description: abhishekmjain commented on code in PR #4092: URL: https://github.com/apache/gobblin/pull/4092#discussion_r1924987468 ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java: ########## @@ -62,6 +79,71 @@ protected synchronized void requestInitialContainers() { requestNewContainersForStaffingDeltas(deltas); } + /** + * Handle the completion of a container. A new container will be requested to replace the one + * that just exited depending on the exit status. + * <p> + * A container completes in either of the following conditions: + * <ol> + * <li> The container gets stopped by the ApplicationMaster. </li> + * <li> Some error happens in the container and caused the container to exit </li> + * <li> The container gets preempted by the ResourceManager </li> + * <li> The container gets killed due to some reason, for example, if it runs over the allowed amount of virtual or physical memory </li> + * </ol> + * A replacement container is needed in all except the first case. + * </p> + */ + @Override + protected void handleContainerCompletion(ContainerStatus containerStatus) { + ContainerId completedContainerId = containerStatus.getContainerId(); + ContainerInfo completedContainerInfo = this.containerMap.remove(completedContainerId); + + if (completedContainerInfo == null) { + log.warn("Container {} not found in containerMap. 
This container onContainersCompleted() likely called before onContainersAllocated()", + completedContainerId); + this.removedContainerIds.add(completedContainerId); + return; + } + + log.info("Container {} running profile {} completed with exit status {}", + completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus() + ); + + if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) { + log.info("Container {} running profile {} completed with diagnostics: {}", + completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics() + ); + } + + if (this.shutdownInProgress) { + log.info("Ignoring container completion for container {} as shutdown is in progress", completedContainerId); + return; + } + + WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile(); + + switch (containerStatus.getExitStatus()) { + case(ContainerExitStatus.ABORTED): + handleAbortedContainer(completedContainerId, completedContainerInfo); + break; + case(ContainerExitStatus.PREEMPTED): + log.info("Container {} for profile {} preempted, starting to launching a replacement container", + completedContainerId, completedContainerInfo.getWorkerProfileName()); + requestContainersForWorkerProfile(workerProfile, 1); + break; + case(137): // General OOM exit status Review Comment: Let's create a constant for this status code Issue Time Tracking ------------------- Worklog Id: (was: 953588) Remaining Estimate: 0h Time Spent: 10m > Implement ContainerCompletion callback in DynamicScalingYarnService > ------------------------------------------------------------------- > > Key: GOBBLIN-2189 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2189 > Project: Apache Gobblin > Issue Type: Improvement > Components: gobblin-core > Reporter: Vivek Rai > Assignee: Abhishek Tiwari > Priority: Major > Time Spent: 10m > Remaining Estimate: 0h > > DynamicScalingYarnService currently doesn't handle scaling down containers > and 
does nothing if a container is killed abruptly or goes OOM. To > handle these scenarios, the containerCompletion callback should be implemented to > launch replacement containers, and scale-down handling should also be > added. -- This message was sent by Atlassian Jira (v8.20.10#820010)