This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a8e3ec8e [GOBBLIN-2052] release those containers which are running 
helix task that are stuck in any of the given state (#3932)
2a8e3ec8e is described below

commit 2a8e3ec8e22c2394f0f274d9bc593aea4214621a
Author: Pradeep Pallikila <[email protected]>
AuthorDate: Thu May 30 22:44:28 2024 +0530

    [GOBBLIN-2052] release those containers which are running helix task that 
are stuck in any of the given state (#3932)
    
    * remove container which runs helix task which is stuck in INIT state
    
    * refactored
    
    * refactored
    
    * refactored
    
    * refactored
    
    * refactored
    
    * added unit tests
    
    * added GTE event
    
    * fixed unit tests
    
    * added flag to enable releasing of container or not
    
    * added feature flag enabling stuck task detection feature
    
    * added capability to check for tasks that are stuck in any given state
    
    * fixed build
    
    * added extra check to not have running state as stuck state
    
    * fixing build
    
    * resolved comments
    
    * resolved comments
    
    ---------
    
    Co-authored-by: Pradeep Pallikila <[email protected]>
---
 .../gobblin/cluster/GobblinHelixConstants.java     |   2 +
 .../gobblin/yarn/GobblinYarnEventConstants.java    |   1 +
 .../gobblin/yarn/YarnAutoScalingManager.java       | 155 +++++++++++++++++++--
 .../java/org/apache/gobblin/yarn/YarnService.java  |  32 ++++-
 .../yarn/event/ContainerReleaseRequest.java        |  23 +--
 .../gobblin/yarn/YarnAutoScalingManagerTest.java   | 100 +++++++++++--
 6 files changed, 274 insertions(+), 39 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixConstants.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixConstants.java
index 00b31a8e2..f25b5dc28 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixConstants.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixConstants.java
@@ -24,4 +24,6 @@ public class GobblinHelixConstants {
 
   public static final String SHUTDOWN_MESSAGE_TYPE = "SHUTDOWN";
 
+  public static final String HELIX_INSTANCE_NAME_KEY = "HelixInstanceName";
+
 }
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnEventConstants.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnEventConstants.java
index 80441d43e..666cc012b 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnEventConstants.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnEventConstants.java
@@ -45,5 +45,6 @@ public class GobblinYarnEventConstants {
     public static final String ERROR = "Error";
     public static final String HELIX_INSTANCE_COMPLETION = 
"HelixInstanceCompletion";
     public static final String SHUTDOWN_REQUEST = "ShutdownRequest";
+    public static final String HELIX_PARTITION_STUCK = "HelixPartitionStuck";
   }
 }
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index 21fc47c4a..82e1d0fb4 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.PriorityQueue;
@@ -32,7 +33,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.apache.commons.compress.utils.Sets;
+
+import org.apache.gobblin.cluster.GobblinHelixConstants;
 import org.apache.gobblin.stream.WorkUnitChangeEvent;
+
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -60,6 +65,7 @@ import 
org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.gobblin.cluster.HelixUtils;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
 
 import static 
org.apache.gobblin.yarn.GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX;
 
@@ -71,8 +77,7 @@ import static 
org.apache.gobblin.yarn.GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_
 @Slf4j
 public class YarnAutoScalingManager extends AbstractIdleService {
   private final String AUTO_SCALING_PREFIX = 
GobblinYarnConfigurationKeys.GOBBLIN_YARN_PREFIX + "autoScaling.";
-  private final String AUTO_SCALING_POLLING_INTERVAL_SECS =
-      AUTO_SCALING_PREFIX + "pollingIntervalSeconds";
+  private final String AUTO_SCALING_POLLING_INTERVAL_SECS = 
AUTO_SCALING_PREFIX + "pollingIntervalSeconds";
   private final String TASK_NUMBER_OF_ATTEMPTS_THRESHOLD = AUTO_SCALING_PREFIX 
+ "taskAttemptsThreshold";
   private final int DEFAULT_TASK_NUMBER_OF_ATTEMPTS_THRESHOLD = 20;
   private final String SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD = 
AUTO_SCALING_PREFIX + "splitWorkUnitReachThreshold";
@@ -82,21 +87,24 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
   private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER = 
AUTO_SCALING_PREFIX + "partitionsPerContainer";
   private final int DEFAULT_AUTO_SCALING_PARTITIONS_PER_CONTAINER = 1;
   private final String AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = 
AUTO_SCALING_PREFIX + "overProvisionFactor";
+  private final String STUCK_TASK_CONTAINER_RELEASE_THRESHOLD_MINUTES =
+      AUTO_SCALING_PREFIX + "stuckTaskContainerReleaseThresholdMinutes";
+  private final String RELEASE_CONTAINER_IF_TASK_IS_STUCK = 
AUTO_SCALING_PREFIX + "releaseContainerIfTaskIsStuck";
+  private final String DETECT_IF_TASK_IS_STUCK = AUTO_SCALING_PREFIX + 
"detectIfTaskIsStuck";
+  private final String ENABLE_DETECTION_FOR_TASK_STATES = AUTO_SCALING_PREFIX 
+ "enableDetectionForTaskStates";
   private final double DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = 
1.0;
+  private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX + 
"initialDelay";
+  private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60;
+  private final String AUTO_SCALING_WINDOW_SIZE = AUTO_SCALING_PREFIX + 
"windowSize";
+  public final static int 
DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES = 10;
+  private final static int 
DEFAULT_MAX_TIME_MINUTES_TO_RELEASE_CONTAINER_HAVING_HELIX_TASK_THAT_IS_STUCK = 
20;
+
   // The cluster level default tags for Helix instances
   private final String defaultHelixInstanceTags;
   private final int defaultContainerMemoryMbs;
   private final int defaultContainerCores;
-
-  private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX + 
"initialDelay";
-  private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60;
   private int taskAttemptsThreshold;
   private final boolean splitWorkUnitReachThreshold;
-
-  private final String AUTO_SCALING_WINDOW_SIZE = AUTO_SCALING_PREFIX + 
"windowSize";
-
-  public final static int 
DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES = 10;
-
   private final Config config;
   private final HelixManager helixManager;
   private final ScheduledExecutorService autoScalingExecutor;
@@ -105,6 +113,10 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
   private final double overProvisionFactor;
   private final SlidingWindowReservoir slidingFixedSizeWindow;
   private static int maxIdleTimeInMinutesBeforeScalingDown = 
DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES;
+  private final int maxTimeInMinutesBeforeReleasingContainerHavingStuckTask;
+  private final boolean enableReleasingContainerHavingStuckTask;
+  private final boolean enableDetectionStuckTask;
+  private final HashSet<TaskPartitionState> detectionForStuckTaskStates;
   private static final HashSet<TaskPartitionState>
       UNUSUAL_HELIX_TASK_STATES = Sets.newHashSet(TaskPartitionState.ERROR, 
TaskPartitionState.DROPPED, TaskPartitionState.COMPLETED, 
TaskPartitionState.TIMED_OUT);
 
@@ -136,6 +148,34 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
         DEFAULT_TASK_NUMBER_OF_ATTEMPTS_THRESHOLD);
     this.splitWorkUnitReachThreshold = ConfigUtils.getBoolean(this.config, 
SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD,
         DEFAULT_SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD);
+    this.maxTimeInMinutesBeforeReleasingContainerHavingStuckTask = 
ConfigUtils.getInt(this.config,
+        STUCK_TASK_CONTAINER_RELEASE_THRESHOLD_MINUTES,
+        
DEFAULT_MAX_TIME_MINUTES_TO_RELEASE_CONTAINER_HAVING_HELIX_TASK_THAT_IS_STUCK);
+    this.enableReleasingContainerHavingStuckTask = 
ConfigUtils.getBoolean(this.config,
+        RELEASE_CONTAINER_IF_TASK_IS_STUCK, false);
+    this.enableDetectionStuckTask = ConfigUtils.getBoolean(this.config, 
DETECT_IF_TASK_IS_STUCK, false);
+    this.detectionForStuckTaskStates = 
getTaskStatesForWhichDetectionIsEnabled();
+  }
+
+  private HashSet<TaskPartitionState> 
getTaskStatesForWhichDetectionIsEnabled() {
+    HashSet<TaskPartitionState> taskStates = new HashSet<>();
+    if (this.enableDetectionStuckTask) {
+      List<String> taskStatesEnabledForDetection = 
ConfigUtils.getStringList(this.config, ENABLE_DETECTION_FOR_TASK_STATES);
+      for (String taskState : taskStatesEnabledForDetection) {
+        try {
+          TaskPartitionState helixTaskState = 
TaskPartitionState.valueOf(taskState);
+          if(helixTaskState == TaskPartitionState.RUNNING) {
+            log.warn("Running state is not allowed for detection as it is not 
a stuck state, ignoring");
+            continue;
+          }
+          taskStates.add(helixTaskState);
+        } catch (IllegalArgumentException e) {
+          log.warn("Invalid task state {} provided for detection, ignoring", 
taskState);
+        }
+      }
+    }
+    log.info("Detection of task being stuck is enabled on following task 
states {}", taskStates);
+    return taskStates;
   }
 
   @Override
@@ -150,7 +190,9 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
     this.autoScalingExecutor.scheduleAtFixedRate(new 
YarnAutoScalingRunnable(new TaskDriver(this.helixManager),
             this.yarnService, this.partitionsPerContainer, 
this.overProvisionFactor,
             this.slidingFixedSizeWindow, 
this.helixManager.getHelixDataAccessor(), this.defaultHelixInstanceTags,
-            this.defaultContainerMemoryMbs, this.defaultContainerCores, 
this.taskAttemptsThreshold, this.splitWorkUnitReachThreshold),
+            this.defaultContainerMemoryMbs, this.defaultContainerCores, 
this.taskAttemptsThreshold,
+            this.splitWorkUnitReachThreshold, 
this.maxTimeInMinutesBeforeReleasingContainerHavingStuckTask,
+            this.enableReleasingContainerHavingStuckTask, 
this.enableDetectionStuckTask, this.detectionForStuckTaskStates),
         initialDelay, scheduleInterval, TimeUnit.SECONDS);
   }
 
@@ -179,13 +221,22 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
     private final int defaultContainerCores;
     private final int taskAttemptsThreshold;
     private final boolean splitWorkUnitReachThreshold;
+    private final int maxTimeInMinutesBeforeReleasingContainerHavingStuckTask;
+    private final boolean enableReleasingContainerHavingStuckTask;
+    private final boolean enableDetectionStuckTask;
+    private final HashSet<TaskPartitionState> taskStates;
 
     /**
      * A static map that keep track of an idle instance and its latest 
beginning idle time.
      * If an instance is no longer idle when inspected, it will be dropped 
from this map.
      */
     private static final Map<String, Long> instanceIdleSince = new HashMap<>();
-
+    /**
+     * A static map that keeps track of instances hosting tasks that are in any of the configured states,
+     * along with the time each instance was first observed in that condition. If an instance no longer
+     * hosts a task in the given states when inspected, it will be dropped from this map.
+     */
+    private static final Map<String, Long> instanceStuckSince = new 
HashMap<>();
 
     @Override
     public void run() {
@@ -219,6 +270,19 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
       return null;
     }
 
+
+    private String getParticipantInGivenStateForHelixPartition(final 
JobContext jobContext, final int partition,
+        final HashSet<TaskPartitionState> taskStates) {
+      if (taskStates.contains(jobContext.getPartitionState(partition))) {
+        log.info("Helix task {} is in {} state at helix participant {}",
+            jobContext.getTaskIdForPartition(partition), 
jobContext.getPartitionState(partition),
+            jobContext.getAssignedParticipant(partition));
+        return jobContext.getAssignedParticipant(partition);
+      }
+
+      return null;
+    }
+
     /**
      * Iterate through the workflows configured in Helix to figure out the 
number of required partitions
      * and request the {@link YarnService} to scale to the desired number of 
containers.
@@ -226,6 +290,10 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
     @VisibleForTesting
     void runInternal() {
       Set<String> inUseInstances = new HashSet<>();
+      // helixInstancesContainingStuckTasks maintains the set of helix 
instances/participants containing tasks that are
+      // stuck in any of the configured states.
+      final Set<String> helixInstancesContainingStuckTasks = new HashSet<>();
+
       YarnContainerRequestBundle yarnContainerRequestBundle = new 
YarnContainerRequestBundle();
       for (Map.Entry<String, WorkflowConfig> workFlowEntry : 
taskDriver.getWorkflows().entrySet()) {
         WorkflowContext workflowContext = 
taskDriver.getWorkflowContext(workFlowEntry.getKey());
@@ -259,6 +327,13 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
                 .map(i -> getInuseParticipantForHelixPartition(jobContext, i))
                 .filter(Objects::nonNull).collect(Collectors.toSet()));
 
+            if (enableDetectionStuckTask) {
+              // if feature is not enabled the set 
helixInstancesContainingStuckTasks will always be empty
+              
helixInstancesContainingStuckTasks.addAll(jobContext.getPartitionSet().stream()
+                  .map(helixPartition -> 
getParticipantInGivenStateForHelixPartition(jobContext, helixPartition, 
taskStates))
+                  .filter(Objects::nonNull).collect(Collectors.toSet()));
+            }
+
             numPartitions = jobContext.getPartitionSet().size();
             // Job level config for helix instance tags takes precedence over 
other tag configurations
             if (jobConfig != null) {
@@ -286,6 +361,8 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
       // and potentially replanner-instance.
       Set<String> allParticipants = 
HelixUtils.getParticipants(helixDataAccessor, HELIX_YARN_INSTANCE_NAME_PREFIX);
 
+      final Set<Container> containersToRelease = new HashSet<>();
+
       // Find all joined participants not in-use for this round of inspection.
       // If idle time is beyond tolerance, mark the instance as unused by 
assigning timestamp as -1.
       for (String participant : allParticipants) {
@@ -299,9 +376,49 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
           // Remove this instance if existed in the tracking map.
           instanceIdleSince.remove(participant);
         }
+
+        if(helixInstancesContainingStuckTasks.contains(participant)) {
+          instanceStuckSince.putIfAbsent(participant, 
System.currentTimeMillis());
+          if (isInstanceStuck(participant)) {
+            // release the corresponding container as the helix task is stuck 
for a long time
+            log.info("Instance {} has some helix partition that is stuck for 
{} minutes, "
+                + "releasing the container enabled : {}", participant,
+                TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis() - 
instanceStuckSince.get(participant)),
+                enableReleasingContainerHavingStuckTask);
+
+            // get container of the helix participant
+            Optional<Container> container = 
yarnService.getContainerInfoGivenHelixParticipant(participant);
+            instanceStuckSince.remove(participant);
+            String containerId = "";
+            if(container.isPresent()) {
+              if (enableReleasingContainerHavingStuckTask) {
+                containersToRelease.add(container.get());
+              }
+              containerId = container.get().getId().toString();
+            } else {
+              log.warn("Container information for participant {} is not 
found", participant);
+            }
+
+            if(this.yarnService.getEventSubmitter().isPresent()) {
+              // send GTE
+              
this.yarnService.getEventSubmitter().get().submit(GobblinYarnEventConstants.EventNames.HELIX_PARTITION_STUCK,
+                  GobblinHelixConstants.HELIX_INSTANCE_NAME_KEY, participant,
+                  GobblinYarnMetricTagNames.CONTAINER_ID, containerId);
+            }
+          }
+        } else {
+          instanceStuckSince.remove(participant);
+        }
+      }
+
+      // release the containers
+      if(!containersToRelease.isEmpty()) {
+        this.yarnService.getEventBus().post(new 
ContainerReleaseRequest(containersToRelease, true));
       }
+
       slidingWindowReservoir.add(yarnContainerRequestBundle);
 
+
       log.debug("There are {} containers being requested in total, tag-count 
map {}, tag-resource map {}",
           yarnContainerRequestBundle.getTotalContainers(), 
yarnContainerRequestBundle.getHelixTagContainerCountMap(),
           yarnContainerRequestBundle.getHelixTagResourceMap());
@@ -309,17 +426,29 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
       
this.yarnService.requestTargetNumberOfContainers(slidingWindowReservoir.getMax(),
 inUseInstances);
     }
 
-    @VisibleForTesting
     /**
      * Return true is the condition for tagging an instance as "unused" holds.
      * The condition, by default is that if an instance went back to
      * active (having partition running on it) within {@link 
#maxIdleTimeInMinutesBeforeScalingDown} minutes, we will
      * not tag that instance as "unused" and have that as the candidate for 
scaling down.
      */
+    @VisibleForTesting
     boolean isInstanceUnused(String participant){
       return System.currentTimeMillis() - instanceIdleSince.get(participant) >
           TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutesBeforeScalingDown);
     }
+
+    /**
+     * Return true if the condition for tagging an instance as stuck holds.
+     * The condition is that a task on the instance has stayed in one of the given states for more than
+     * {@link #maxTimeInMinutesBeforeReleasingContainerHavingStuckTask} minutes. If the task leaves those
+     * states before that, the instance is not tagged as stuck and its container is not released.
+     */
+    @VisibleForTesting
+    boolean isInstanceStuck(final String participant) {
+      return System.currentTimeMillis() - instanceStuckSince.get(participant) >
+          
TimeUnit.MINUTES.toMillis(maxTimeInMinutesBeforeReleasingContainerHavingStuckTask);
+    }
   }
 
   /**
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 5155c39b4..d7c482aa4 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -142,6 +142,7 @@ public class YarnService extends AbstractIdleService {
   private final FileSystem fs;
 
   private final Optional<GobblinMetrics> gobblinMetrics;
+  @Getter
   private final Optional<EventSubmitter> eventSubmitter;
 
   @VisibleForTesting
@@ -313,6 +314,21 @@ public class YarnService extends AbstractIdleService {
         newContainerRequest.getResource());
   }
 
+  /**
+   * Returns the container of the given helixParticipant if it exists, else
+   * an absent {@code Optional}.
+   * @param helixParticipant the Helix participant ID to look up
+   * @return an Optional holding the matching Container, or absent if none is found
+   */
+  public Optional<Container> getContainerInfoGivenHelixParticipant(final 
String helixParticipant) {
+    for (ContainerInfo containerInfo : this.containerMap.values()) {
+      if (containerInfo.getHelixParticipantId().equals(helixParticipant)) {
+        return Optional.fromNullable(containerInfo.getContainer());
+      }
+    }
+    return Optional.absent();
+  }
+
   protected NMClientCallbackHandler getNMClientCallbackHandler() {
     return new NMClientCallbackHandler();
   }
@@ -335,13 +351,15 @@ public class YarnService extends AbstractIdleService {
     for (Container container : containerReleaseRequest.getContainers()) {
       LOGGER.info(String.format("Releasing container %s running on %s", 
container.getId(), container.getNodeId()));
 
-      // Record that this container was explicitly released so that a new one 
is not spawned to replace it
-      // Put the container id in the releasedContainerCache before releasing 
it so that handleContainerCompletion()
-      // can check for the container id and skip spawning a replacement 
container.
-      // Note that this is the best effort since these are asynchronous 
operations and a container may abort concurrently
-      // with the release call. So in some cases a replacement container may 
have already been spawned before
-      // the container is put into the black list.
-      this.releasedContainerCache.put(container.getId(), "");
+      if(!containerReleaseRequest.isShouldSpinUpReplacementContainers()) {
+        // Record that this container was explicitly released so that a new 
one is not spawned to replace it
+        // Put the container id in the releasedContainerCache before releasing 
it so that handleContainerCompletion()
+        // can check for the container id and skip spawning a replacement 
container.
+        // Note that this is the best effort since these are asynchronous 
operations and a container may abort concurrently
+        // with the release call. So in some cases a replacement container may 
have already been spawned before
+        // the container is put into the black list.
+        this.releasedContainerCache.put(container.getId(), "");
+      }
       this.amrmClientAsync.releaseAssignedContainer(container.getId());
     }
   }
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/ContainerReleaseRequest.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/ContainerReleaseRequest.java
index 151ac2216..a824b3966 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/ContainerReleaseRequest.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/ContainerReleaseRequest.java
@@ -21,6 +21,9 @@ import java.util.Collection;
 
 import org.apache.hadoop.yarn.api.records.Container;
 
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
 
 /**
  * A type of event for container release requests to be used with a {@link 
com.google.common.eventbus.EventBus}.
@@ -28,19 +31,21 @@ import org.apache.hadoop.yarn.api.records.Container;
  * the Resource Manager, while {@link ContainerShutdownRequest} shuts down a 
container through the
  * Node Manager
  */
+@Getter
+@AllArgsConstructor
 public class ContainerReleaseRequest {
-  private final Collection<Container> containers;
-
-  public ContainerReleaseRequest(Collection<Container> containers) {
-    this.containers = containers;
-  }
-
   /**
-   * Get the IDs of the containers to release.
+   * -- GETTER --
+   *  Get the IDs of the containers to release.
    *
    * @return the IDs of the containers to release
    */
-  public Collection<Container> getContainers() {
-    return this.containers;
+  private final Collection<Container> containers;
+  // shouldSpinUpReplacementContainers is used to indicate whether to replace 
the released containers with new ones or not
+  private final boolean shouldSpinUpReplacementContainers;
+
+  public ContainerReleaseRequest(Collection<Container> containers) {
+    this.containers = containers;
+    this.shouldSpinUpReplacementContainers = false;
   }
 }
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
index d21d530d7..4ede7fb6f 100644
--- 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
+++ 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
@@ -18,11 +18,14 @@
 package org.apache.gobblin.yarn;
 
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixProperty;
@@ -32,6 +35,7 @@ import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobDag;
 import org.apache.helix.task.TargetState;
 import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskPartitionState;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
@@ -40,10 +44,13 @@ import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.eventbus.EventBus;
 
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
 
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
@@ -79,7 +86,9 @@ public class YarnAutoScalingManagerTest {
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
             1.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory,
-            defaultContainerCores, 20, false);
+            defaultContainerCores, 20, false,
+            10,
+            false, false, new HashSet<>());
 
     runnable.run();
     ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
@@ -110,7 +119,9 @@ public class YarnAutoScalingManagerTest {
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
             1.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory,
-            defaultContainerCores, 20, false);
+            defaultContainerCores, 20, false,
+            10,
+            false, false, new HashSet<>());
 
     runnable.run();
 
@@ -148,7 +159,9 @@ public class YarnAutoScalingManagerTest {
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
             1.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory,
-            defaultContainerCores, 20, false);
+            defaultContainerCores, 20, false,
+            10,
+            false, false, new HashSet<>());
 
     runnable.run();
 
@@ -188,7 +201,9 @@ public class YarnAutoScalingManagerTest {
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
             1.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory,
-            defaultContainerCores, 20, false);
+            defaultContainerCores, 20, false,
+            10,
+            false, false, new HashSet<>());
 
     runnable.run();
 
@@ -216,7 +231,9 @@ public class YarnAutoScalingManagerTest {
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 2,
             1.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory,
-            defaultContainerCores, 20, false);
+            defaultContainerCores, 20, false,
+            10,
+            false, false, new HashSet<>());
 
     runnable.run();
 
@@ -240,7 +257,9 @@ public class YarnAutoScalingManagerTest {
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable1 =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
             1.2, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory,
-            defaultContainerCores, 20, false);
+            defaultContainerCores, 20, false,
+            10,
+            false, false, new HashSet<>());
 
     runnable1.run();
 
@@ -253,7 +272,9 @@ public class YarnAutoScalingManagerTest {
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable2 =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
             0.1, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory,
-            defaultContainerCores, 20, false);
+            defaultContainerCores, 20, false,
+            10,
+            false, false, new HashSet<>());
 
     runnable2.run();
 
@@ -266,7 +287,9 @@ public class YarnAutoScalingManagerTest {
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable3 =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
             6.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory,
-            defaultContainerCores, 20, false);
+            defaultContainerCores, 20, false,
+            10,
+            false, false, new HashSet<>());
 
     runnable3.run();
 
@@ -276,6 +299,7 @@ public class YarnAutoScalingManagerTest {
     assertContainerRequest(mockYarnService, 12, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
   }
 
+
   /**
    * Test suppressed exception
    */
@@ -393,7 +417,9 @@ public class YarnAutoScalingManagerTest {
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
             1.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory,
-            defaultContainerCores, 20, false);
+            defaultContainerCores, 20, false,
+            10,
+            false, false, new HashSet<>());
 
     runnable.run();
 
@@ -414,6 +440,46 @@ public class YarnAutoScalingManagerTest {
     Assert.assertEquals((int) helixTagContainerCountMap.get(defaultHelixTag), 
3);
   }
 
+  /**
+   * Test the scenarios when an instance in cluster has any partition that is 
stuck in INIT state for too long
+   */
+  @Test
+  public void testInstanceStuckInINITState()  {
+    YarnService mockYarnService = mock(YarnService.class);
+    TaskDriver mockTaskDriver = mock(TaskDriver.class);
+    Container mockContainer = mock(Container.class);
+    EventBus mockEventBus = mock(EventBus.class);
+
+    WorkflowConfig mockWorkflowConfig = getWorkflowConfig(mockTaskDriver, 
ImmutableSet.of("job1"), TaskState.IN_PROGRESS, TargetState.START, "workflow1");
+    
Mockito.when(mockTaskDriver.getWorkflows()).thenReturn(ImmutableMap.of("workflow1",
 mockWorkflowConfig));
+
+    // Having the two partitions assigned to different instances: partition 1 to GobblinYarnTaskRunner-1 (RUNNING) and partition 2 to GobblinYarnTaskRunner-2 (INIT)
+    JobContext mockJobContext = getJobContext(mockTaskDriver, 
ImmutableMap.of(1,"GobblinYarnTaskRunner-1", 2, "GobblinYarnTaskRunner-2"), 
"job1");
+    
Mockito.when(mockJobContext.getPartitionState(1)).thenReturn(TaskPartitionState.RUNNING);
+    
Mockito.when(mockJobContext.getPartitionState(2)).thenReturn(TaskPartitionState.INIT);
+    
Mockito.when(mockYarnService.getContainerInfoGivenHelixParticipant("GobblinYarnTaskRunner-2")).thenReturn(Optional.of(mockContainer));
+    Mockito.when(mockYarnService.getEventBus()).thenReturn(mockEventBus);
+    
Mockito.when(mockYarnService.getEventSubmitter()).thenReturn(Optional.absent());
+    Set<Container> containers = new HashSet<>();
+    containers.add(mockContainer);
+    mockEventBus.post(new ContainerReleaseRequest(containers, true));
+    ContainerId mockContainerId = mock(ContainerId.class);
+    Mockito.when(mockContainer.getId()).thenReturn(mockContainerId);
+    Mockito.when(mockContainerId.toString()).thenReturn("container-1");
+    HelixDataAccessor helixDataAccessor = 
getHelixDataAccessor(Arrays.asList("GobblinYarnTaskRunner-1", 
"GobblinYarnTaskRunner-2"));
+
+    TestYarnAutoScalingRunnable runnable = new 
TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+        1, helixDataAccessor);
+
+
+
+    runnable.setAlwaysInINITState(true);
+    runnable.run();
+
+    assertContainerRequest(mockYarnService, 2, 
ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
+  }
+
+
   private HelixDataAccessor getHelixDataAccessor(List<String> taskRunners) {
     HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
     Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new 
PropertyKey.Builder("cluster"));
@@ -474,12 +540,17 @@ public class YarnAutoScalingManagerTest {
   private static class TestYarnAutoScalingRunnable extends 
YarnAutoScalingManager.YarnAutoScalingRunnable {
     boolean raiseException = false;
     boolean alwaysUnused = false;
+    boolean alwaysInINITState = false;
 
     public TestYarnAutoScalingRunnable(TaskDriver taskDriver, YarnService 
yarnService, int partitionsPerContainer,
         HelixDataAccessor helixDataAccessor) {
       super(taskDriver, yarnService, partitionsPerContainer, 1.0,
           noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory,
-          defaultContainerCores, 20, false);
+          defaultContainerCores, 20, false,
+          10,
+          true, true, new HashSet<TaskPartitionState>() {{
+            add(TaskPartitionState.INIT);
+          }});
     }
 
     @Override
@@ -499,9 +570,18 @@ public class YarnAutoScalingManagerTest {
       this.alwaysUnused = alwaysUnused;
     }
 
+    void setAlwaysInINITState(boolean alwaysInINITState) {
+      this.alwaysInINITState = alwaysInINITState;
+    }
+
     @Override
     boolean isInstanceUnused(String participant) {
       return alwaysUnused || super.isInstanceUnused(participant);
     }
+
+    @Override
+    boolean isInstanceStuck(String participant) {
+      return alwaysInINITState || super.isInstanceStuck(participant);
+    }
   }
 }


Reply via email to