iPalash commented on code in PR #4092:
URL: https://github.com/apache/gobblin/pull/4092#discussion_r1926392654


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -62,6 +79,71 @@ protected synchronized void requestInitialContainers() {
     requestNewContainersForStaffingDeltas(deltas);
   }
 
+  /**
+   * Handle the completion of a container. A new container will be requested 
to replace the one
+   * that just exited depending on the exit status.
+   * <p>
+   * A container completes in either of the following conditions:
+   * <ol>
+   *   <li> The container gets stopped by the ApplicationMaster. </li>
+   *   <li> Some error happens in the container and caused the container to 
exit </li>
+   *   <li> The container gets preempted by the ResourceManager </li>
+   *   <li> The container gets killed due to some reason, for example, if it 
runs over the allowed amount of virtual or physical memory </li>
+   * </ol>
+   * A replacement container is needed in all except the first case.
+   * </p>
+   */
+  @Override
+  protected void handleContainerCompletion(ContainerStatus containerStatus) {
+    ContainerId completedContainerId = containerStatus.getContainerId();
+    ContainerInfo completedContainerInfo = 
this.containerMap.remove(completedContainerId);
+
+    if (completedContainerInfo == null) {
+      log.warn("Container {} not found in containerMap. This container 
onContainersCompleted() likely called before onContainersAllocated()",
+          completedContainerId);
+      this.removedContainerIds.add(completedContainerId);
+      return;
+    }
+
+    log.info("Container {} running profile {} completed with exit status {}",
+        completedContainerId, completedContainerInfo.getWorkerProfileName(), 
containerStatus.getExitStatus()
+    );
+
+    if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) {
+      log.info("Container {} running profile {} completed with diagnostics: 
{}",
+          completedContainerId, completedContainerInfo.getWorkerProfileName(), 
containerStatus.getDiagnostics()
+      );
+    }
+
+    if (this.shutdownInProgress) {
+      log.info("Ignoring container completion for container {} as shutdown is 
in progress", completedContainerId);
+      return;
+    }
+
+    WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
+
+    switch (containerStatus.getExitStatus()) {
+      case(ContainerExitStatus.ABORTED):
+        handleAbortedContainer(completedContainerId, completedContainerInfo);
+        break;
+      case(ContainerExitStatus.PREEMPTED):
+        log.info("Container {} for profile {} preempted, starting to launching 
a replacement container",
+            completedContainerId, 
completedContainerInfo.getWorkerProfileName());
+        requestContainersForWorkerProfile(workerProfile, 1);
+        break;
+      case(137): // General OOM exit status
+      case(ContainerExitStatus.KILLED_EXCEEDED_VMEM):
+      case(ContainerExitStatus.KILLED_EXCEEDED_PMEM):
+        handleContainerExitedWithOOM(completedContainerId, 
completedContainerInfo);

Review Comment:
   Since we are handling Virtual and Physical Memory the same way, how does 
increasing Container memory inside this function impact the virtual memory 
param?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -62,6 +79,71 @@ protected synchronized void requestInitialContainers() {
     requestNewContainersForStaffingDeltas(deltas);
   }
 
+  /**
+   * Handle the completion of a container. A new container will be requested 
to replace the one
+   * that just exited depending on the exit status.
+   * <p>
+   * A container completes in either of the following conditions:
+   * <ol>
+   *   <li> The container gets stopped by the ApplicationMaster. </li>
+   *   <li> Some error happens in the container and caused the container to 
exit </li>
+   *   <li> The container gets preempted by the ResourceManager </li>
+   *   <li> The container gets killed due to some reason, for example, if it 
runs over the allowed amount of virtual or physical memory </li>
+   * </ol>
+   * A replacement container is needed in all except the first case.
+   * </p>
+   */
+  @Override
+  protected void handleContainerCompletion(ContainerStatus containerStatus) {
+    ContainerId completedContainerId = containerStatus.getContainerId();
+    ContainerInfo completedContainerInfo = 
this.containerMap.remove(completedContainerId);
+
+    if (completedContainerInfo == null) {
+      log.warn("Container {} not found in containerMap. This container 
onContainersCompleted() likely called before onContainersAllocated()",

Review Comment:
   Why would this warning occur? 
   ~And since we are preemptively returning, what's the consequence of not 
handling the completion properly?~  I see we handle this later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to