[ 
https://issues.apache.org/jira/browse/GOBBLIN-2052?focusedWorklogId=915951&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-915951
 ]

ASF GitHub Bot logged work on GOBBLIN-2052:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Apr/24 05:33
            Start Date: 23/Apr/24 05:33
    Worklog Time Spent: 10m 
      Work Description: arpit09 commented on code in PR #3932:
URL: https://github.com/apache/gobblin/pull/3932#discussion_r1575621084


##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -219,13 +226,25 @@ private String 
getInuseParticipantForHelixPartition(JobContext jobContext, int p
       return null;
     }
 
+
+    private String getParticipantInInitStateForHelixPartition(JobContext 
jobContext, int partition) {
+      if 
(jobContext.getPartitionState(partition).equals(TaskPartitionState.INIT)) {

Review Comment:
   Let's swap the check to avoid an NPE:
   `TaskPartitionState.INIT.equals(jobContext.getPartitionState(partition))`



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -299,27 +325,65 @@ void runInternal() {
           // Remove this instance if existed in the tracking map.
           instanceIdleSince.remove(participant);
         }
+
+        if(instancesInInitState.contains(participant)) {
+          instanceInitStateSince.putIfAbsent(participant, 
System.currentTimeMillis());
+          if (!isInstanceStuckInInitState(participant)) {
+            // release the corresponding container as the helix task is stuck 
in INIT state for a long time
+            log.info("Instance {} is stuck in INIT state for a long time, 
releasing the container", participant);
+            // get containerInfo of the helix participant
+            YarnService.ContainerInfo containerInfo = 
yarnService.getContainerInfoGivenHelixParticipant(participant);
+            if(containerInfo != null) {
+              containersToRelease.add(containerInfo.getContainer());
+              instanceInitStateSince.remove(participant);
+              inUseInstances.remove(participant);
+            } else {
+              log.warn("ContainerInfo for participant {} is not found", 
participant);
+            }
+          }
+        } else {
+          instanceInitStateSince.remove(participant);
+        }
+      }
+
+      // release the containers
+      if(!containersToRelease.isEmpty()) {
+        this.yarnService.getEventBus().post(new 
ContainerReleaseRequest(containersToRelease, true));
       }
+
       slidingWindowReservoir.add(yarnContainerRequestBundle);
 
+
       log.debug("There are {} containers being requested in total, tag-count 
map {}, tag-resource map {}",
           yarnContainerRequestBundle.getTotalContainers(), 
yarnContainerRequestBundle.getHelixTagContainerCountMap(),
           yarnContainerRequestBundle.getHelixTagResourceMap());
 
       
this.yarnService.requestTargetNumberOfContainers(slidingWindowReservoir.getMax(),
 inUseInstances);
     }
 
-    @VisibleForTesting
     /**
      * Return true is the condition for tagging an instance as "unused" holds.
      * The condition, by default is that if an instance went back to
      * active (having partition running on it) within {@link 
#maxIdleTimeInMinutesBeforeScalingDown} minutes, we will
      * not tag that instance as "unused" and have that as the candidate for 
scaling down.
      */
+    @VisibleForTesting
     boolean isInstanceUnused(String participant){
       return System.currentTimeMillis() - instanceIdleSince.get(participant) >
           TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutesBeforeScalingDown);
     }
+
+    /**
+     * Return true is the condition for tagging an instance as stuck in INIT 
state holds.
+     * The condition, by default is that if an instance went back to
+     * active (having partition running on it) within {@link 
#maxIdleTimeInMinutesBeforeScalingDown} minutes, we will
+     * not tag that instance as stuck and the container will not be scaled 
down.
+     */
+    @VisibleForTesting
+    boolean isInstanceStuckInInitState(String participant) {
+      return System.currentTimeMillis() - 
instanceInitStateSince.get(participant) >
+          TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutesBeforeScalingDown);

Review Comment:
   How did we decide to use `maxIdleTimeInMinutesBeforeScalingDown` as the number
of minutes to wait before scaling up? Can we add a separate variable for this case
instead of reusing the scale-down one, since the two serve different purposes?



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -259,6 +278,11 @@ void runInternal() {
                 .map(i -> getInuseParticipantForHelixPartition(jobContext, i))
                 .filter(Objects::nonNull).collect(Collectors.toSet()));
 
+            instancesInInitState.addAll(jobContext.getPartitionSet().stream()
+                .map(i -> 
getParticipantInInitStateForHelixPartition(jobContext, i))

Review Comment:
   nit: Could we name the variable `helixPartition` instead of just `i`?



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java:
##########
@@ -313,6 +313,21 @@ public void handleNewContainerRequest(NewContainerRequest 
newContainerRequest) {
         newContainerRequest.getResource());
   }
 
+  /**
+   *  getContainerInfoGivenHelixParticipant returns the containerInfo of the 
given helixParticipant if it exists else
+   *  return null
+   * @param helixParticipant
+   * @return ContainerInfo
+   */
+  public ContainerInfo getContainerInfoGivenHelixParticipant(String 
helixParticipant) {
+    for (ContainerInfo containerInfo : this.containerMap.values()) {
+      if (containerInfo.getHelixParticipantId().equals(helixParticipant)) {
+        return containerInfo;
+      }
+    }
+    return null;

Review Comment:
   Consider returning an `Optional` here; it would avoid null checks and
potential NPEs in the future.



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java:
##########
@@ -313,6 +313,21 @@ public void handleNewContainerRequest(NewContainerRequest 
newContainerRequest) {
         newContainerRequest.getResource());
   }
 
+  /**
+   *  getContainerInfoGivenHelixParticipant returns the containerInfo of the 
given helixParticipant if it exists else
+   *  return null
+   * @param helixParticipant
+   * @return ContainerInfo
+   */

Review Comment:
   Please use the Apache Gobblin code formatter: 
https://gobblin.readthedocs.io/en/latest/developer-guide/CodingStyle/



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -299,27 +325,65 @@ void runInternal() {
           // Remove this instance if existed in the tracking map.
           instanceIdleSince.remove(participant);
         }
+
+        if(instancesInInitState.contains(participant)) {
+          instanceInitStateSince.putIfAbsent(participant, 
System.currentTimeMillis());
+          if (!isInstanceStuckInInitState(participant)) {
+            // release the corresponding container as the helix task is stuck 
in INIT state for a long time
+            log.info("Instance {} is stuck in INIT state for a long time, 
releasing the container", participant);

Review Comment:
   Let's also log how long the instance has been stuck in the INIT state.



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -219,13 +226,25 @@ private String 
getInuseParticipantForHelixPartition(JobContext jobContext, int p
       return null;
     }
 
+
+    private String getParticipantInInitStateForHelixPartition(JobContext 
jobContext, int partition) {
+      if 
(jobContext.getPartitionState(partition).equals(TaskPartitionState.INIT)) {
+        log.info("Helix task {} is in {} state",
+            jobContext.getTaskIdForPartition(partition), 
jobContext.getPartitionState(partition));
+        return jobContext.getAssignedParticipant(partition);
+      }
+
+      return null;
+    }
+
     /**
      * Iterate through the workflows configured in Helix to figure out the 
number of required partitions
      * and request the {@link YarnService} to scale to the desired number of 
containers.
      */
     @VisibleForTesting
     void runInternal() {
       Set<String> inUseInstances = new HashSet<>();
+      Set<String> instancesInInitState = new HashSet<>();

Review Comment:
   Let's add a comment noting that `instancesInInitState` contains only those 
containers to which a task is assigned, not all Helix containers in the INIT state. 
We could also rename the variable to reflect that.



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -299,27 +325,65 @@ void runInternal() {
           // Remove this instance if existed in the tracking map.
           instanceIdleSince.remove(participant);
         }
+
+        if(instancesInInitState.contains(participant)) {
+          instanceInitStateSince.putIfAbsent(participant, 
System.currentTimeMillis());
+          if (!isInstanceStuckInInitState(participant)) {
+            // release the corresponding container as the helix task is stuck 
in INIT state for a long time
+            log.info("Instance {} is stuck in INIT state for a long time, 
releasing the container", participant);
+            // get containerInfo of the helix participant
+            YarnService.ContainerInfo containerInfo = 
yarnService.getContainerInfoGivenHelixParticipant(participant);
+            if(containerInfo != null) {
+              containersToRelease.add(containerInfo.getContainer());
+              instanceInitStateSince.remove(participant);
+              inUseInstances.remove(participant);
+            } else {
+              log.warn("ContainerInfo for participant {} is not found", 
participant);

Review Comment:
   If the container info is not found, should we also remove the participant from 
`instanceInitStateSince`? Otherwise, when will it get removed from 
`instanceInitStateSince` in this case?



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/ContainerReleaseRequest.java:
##########
@@ -21,26 +21,33 @@
 
 import org.apache.hadoop.yarn.api.records.Container;
 
+import lombok.Getter;
+
 
 /**
  * A type of event for container release requests to be used with a {@link 
com.google.common.eventbus.EventBus}.
  * This event is different than {@link ContainerShutdownRequest} because it 
releases the container through
  * the Resource Manager, while {@link ContainerShutdownRequest} shuts down a 
container through the
  * Node Manager
  */
+@Getter
 public class ContainerReleaseRequest {
+  /**
+   * 

Issue Time Tracking
-------------------

            Worklog Id:     (was: 915951)
    Remaining Estimate: 0h
            Time Spent: 10m

> Release container which is running yarn task that is stuck in INIT state
> ------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2052
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2052
>             Project: Apache Gobblin
>          Issue Type: Improvement
>          Components: gobblin-yarn
>            Reporter: pradeep pallikila
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to