pradeepppc commented on code in PR #3932:
URL: https://github.com/apache/gobblin/pull/3932#discussion_r1576581493


##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -299,27 +325,65 @@ void runInternal() {
           // Remove this instance if existed in the tracking map.
           instanceIdleSince.remove(participant);
         }
+
+        if(instancesInInitState.contains(participant)) {
+          instanceInitStateSince.putIfAbsent(participant, System.currentTimeMillis());
+          if (!isInstanceStuckInInitState(participant)) {
+            // release the corresponding container as the helix task is stuck in INIT state for a long time
+            log.info("Instance {} is stuck in INIT state for a long time, releasing the container", participant);
+            // get containerInfo of the helix participant
+            YarnService.ContainerInfo containerInfo = yarnService.getContainerInfoGivenHelixParticipant(participant);
+            if(containerInfo != null) {
+              containersToRelease.add(containerInfo.getContainer());
+              instanceInitStateSince.remove(participant);
+              inUseInstances.remove(participant);
+            } else {
+              log.warn("ContainerInfo for participant {} is not found", participant);
+            }
+          }
+        } else {
+          instanceInitStateSince.remove(participant);
+        }
+      }
+
+      // release the containers
+      if(!containersToRelease.isEmpty()) {
+        this.yarnService.getEventBus().post(new ContainerReleaseRequest(containersToRelease, true));
       }
+
       slidingWindowReservoir.add(yarnContainerRequestBundle);
 
+
       log.debug("There are {} containers being requested in total, tag-count map {}, tag-resource map {}",
           yarnContainerRequestBundle.getTotalContainers(), yarnContainerRequestBundle.getHelixTagContainerCountMap(),
           yarnContainerRequestBundle.getHelixTagResourceMap());
 
       this.yarnService.requestTargetNumberOfContainers(slidingWindowReservoir.getMax(), inUseInstances);
     }
 
-    @VisibleForTesting
     /**
      * Return true is the condition for tagging an instance as "unused" holds.
      * The condition, by default is that if an instance went back to
      * active (having partition running on it) within {@link #maxIdleTimeInMinutesBeforeScalingDown} minutes, we will
      * not tag that instance as "unused" and have that as the candidate for scaling down.
      */
+    @VisibleForTesting
     boolean isInstanceUnused(String participant){
       return System.currentTimeMillis() - instanceIdleSince.get(participant) >
           TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutesBeforeScalingDown);
     }
+
+    /**
+     * Return true is the condition for tagging an instance as stuck in INIT state holds.
+     * The condition, by default is that if an instance went back to
+     * active (having partition running on it) within {@link #maxIdleTimeInMinutesBeforeScalingDown} minutes, we will
+     * not tag that instance as stuck and the container will not be scaled down.
+     */
+    @VisibleForTesting
+    boolean isInstanceStuckInInitState(String participant) {
+      return System.currentTimeMillis() - instanceInitStateSince.get(participant) >
+          TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutesBeforeScalingDown);

Review Comment:
   Added a new variable for this threshold instead of reusing `maxIdleTimeInMinutesBeforeScalingDown`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to