iPalash commented on code in PR #4092:
URL: https://github.com/apache/gobblin/pull/4092#discussion_r1926392654


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -62,6 +79,71 @@ protected synchronized void requestInitialContainers() {
     requestNewContainersForStaffingDeltas(deltas);
   }
 
+  /**
+   * Handle the completion of a container. A new container will be requested to replace the one
+   * that just exited depending on the exit status.
+   * <p>
+   * A container completes in either of the following conditions:
+   * <ol>
+   *   <li> The container gets stopped by the ApplicationMaster. </li>
+   *   <li> Some error happens in the container and caused the container to exit </li>
+   *   <li> The container gets preempted by the ResourceManager </li>
+   *   <li> The container gets killed due to some reason, for example, if it runs over the allowed amount of virtual or physical memory </li>
+   * </ol>
+   * A replacement container is needed in all except the first case.
+   * </p>
+   */
+  @Override
+  protected void handleContainerCompletion(ContainerStatus containerStatus) {
+    ContainerId completedContainerId = containerStatus.getContainerId();
+    ContainerInfo completedContainerInfo = this.containerMap.remove(completedContainerId);
+
+    if (completedContainerInfo == null) {
+      log.warn("Container {} not found in containerMap. This container onContainersCompleted() likely called before onContainersAllocated()",
+          completedContainerId);
+      this.removedContainerIds.add(completedContainerId);
+      return;
+    }
+
+    log.info("Container {} running profile {} completed with exit status {}",
+        completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus()
+    );
+
+    if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) {
+      log.info("Container {} running profile {} completed with diagnostics: {}",
+          completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics()
+      );
+    }
+
+    if (this.shutdownInProgress) {
+      log.info("Ignoring container completion for container {} as shutdown is in progress", completedContainerId);
+      return;
+    }
+
+    WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
+
+    switch (containerStatus.getExitStatus()) {
+      case(ContainerExitStatus.ABORTED):
+        handleAbortedContainer(completedContainerId, completedContainerInfo);
+        break;
+      case(ContainerExitStatus.PREEMPTED):
+        log.info("Container {} for profile {} preempted, starting to launching a replacement container",
+            completedContainerId, completedContainerInfo.getWorkerProfileName());
+        requestContainersForWorkerProfile(workerProfile, 1);
+        break;
+      case(137): // General OOM exit status
+      case(ContainerExitStatus.KILLED_EXCEEDED_VMEM):
+      case(ContainerExitStatus.KILLED_EXCEEDED_PMEM):
+        handleContainerExitedWithOOM(completedContainerId, completedContainerInfo);

Review Comment:
   Since we handle virtual and physical memory overruns the same way, how does increasing the container memory inside this function affect the virtual memory parameter?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -62,6 +79,71 @@ protected synchronized void requestInitialContainers() {
     requestNewContainersForStaffingDeltas(deltas);
   }
 
+  /**
+   * Handle the completion of a container. A new container will be requested to replace the one
+   * that just exited depending on the exit status.
+   * <p>
+   * A container completes in either of the following conditions:
+   * <ol>
+   *   <li> The container gets stopped by the ApplicationMaster. </li>
+   *   <li> Some error happens in the container and caused the container to exit </li>
+   *   <li> The container gets preempted by the ResourceManager </li>
+   *   <li> The container gets killed due to some reason, for example, if it runs over the allowed amount of virtual or physical memory </li>
+   * </ol>
+   * A replacement container is needed in all except the first case.
+   * </p>
+   */
+  @Override
+  protected void handleContainerCompletion(ContainerStatus containerStatus) {
+    ContainerId completedContainerId = containerStatus.getContainerId();
+    ContainerInfo completedContainerInfo = this.containerMap.remove(completedContainerId);
+
+    if (completedContainerInfo == null) {
+      log.warn("Container {} not found in containerMap. This container onContainersCompleted() likely called before onContainersAllocated()",

Review Comment:
   Why would this warning occur? ~And since we are preemptively returning, what's the consequence of not handling the completion properly?~ I see we handle this later.