This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2a8e3ec8e [GOBBLIN-2052] release those containers which are running
helix task that are stuck in any of the given state (#3932)
2a8e3ec8e is described below
commit 2a8e3ec8e22c2394f0f274d9bc593aea4214621a
Author: Pradeep Pallikila <[email protected]>
AuthorDate: Thu May 30 22:44:28 2024 +0530
[GOBBLIN-2052] release those containers which are running helix task that
are stuck in any of the given state (#3932)
* remove container which runs helix task which is stuck in INIT state
* refactored
* refactored
* refactored
* refactored
* refactored
* added uts
* gte event
* fix uts
* added flag to enable releasing of container or not
* added feature flag enabling stuck task detection feature
* added capability to check for tasks that are stuck in any given state
* fixed build
* added extra check to not have running state as stuck state
* fixing build
* resolved comments
* resolved comments
---------
Co-authored-by: Pradeep Pallikila <[email protected]>
---
.../gobblin/cluster/GobblinHelixConstants.java | 2 +
.../gobblin/yarn/GobblinYarnEventConstants.java | 1 +
.../gobblin/yarn/YarnAutoScalingManager.java | 155 +++++++++++++++++++--
.../java/org/apache/gobblin/yarn/YarnService.java | 32 ++++-
.../yarn/event/ContainerReleaseRequest.java | 23 +--
.../gobblin/yarn/YarnAutoScalingManagerTest.java | 100 +++++++++++--
6 files changed, 274 insertions(+), 39 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixConstants.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixConstants.java
index 00b31a8e2..f25b5dc28 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixConstants.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixConstants.java
@@ -24,4 +24,6 @@ public class GobblinHelixConstants {
public static final String SHUTDOWN_MESSAGE_TYPE = "SHUTDOWN";
+ public static final String HELIX_INSTANCE_NAME_KEY = "HelixInstanceName";
+
}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnEventConstants.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnEventConstants.java
index 80441d43e..666cc012b 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnEventConstants.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnEventConstants.java
@@ -45,5 +45,6 @@ public class GobblinYarnEventConstants {
public static final String ERROR = "Error";
public static final String HELIX_INSTANCE_COMPLETION =
"HelixInstanceCompletion";
public static final String SHUTDOWN_REQUEST = "ShutdownRequest";
+ public static final String HELIX_PARTITION_STUCK = "HelixPartitionStuck";
}
}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index 21fc47c4a..82e1d0fb4 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
@@ -32,7 +33,11 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.compress.utils.Sets;
+
+import org.apache.gobblin.cluster.GobblinHelixConstants;
import org.apache.gobblin.stream.WorkUnitChangeEvent;
+
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
@@ -60,6 +65,7 @@ import
org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
import static
org.apache.gobblin.yarn.GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX;
@@ -71,8 +77,7 @@ import static
org.apache.gobblin.yarn.GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_
@Slf4j
public class YarnAutoScalingManager extends AbstractIdleService {
private final String AUTO_SCALING_PREFIX =
GobblinYarnConfigurationKeys.GOBBLIN_YARN_PREFIX + "autoScaling.";
- private final String AUTO_SCALING_POLLING_INTERVAL_SECS =
- AUTO_SCALING_PREFIX + "pollingIntervalSeconds";
+ private final String AUTO_SCALING_POLLING_INTERVAL_SECS =
AUTO_SCALING_PREFIX + "pollingIntervalSeconds";
private final String TASK_NUMBER_OF_ATTEMPTS_THRESHOLD = AUTO_SCALING_PREFIX
+ "taskAttemptsThreshold";
private final int DEFAULT_TASK_NUMBER_OF_ATTEMPTS_THRESHOLD = 20;
private final String SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD =
AUTO_SCALING_PREFIX + "splitWorkUnitReachThreshold";
@@ -82,21 +87,24 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER =
AUTO_SCALING_PREFIX + "partitionsPerContainer";
private final int DEFAULT_AUTO_SCALING_PARTITIONS_PER_CONTAINER = 1;
private final String AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR =
AUTO_SCALING_PREFIX + "overProvisionFactor";
+ private final String STUCK_TASK_CONTAINER_RELEASE_THRESHOLD_MINUTES =
+ AUTO_SCALING_PREFIX + "stuckTaskContainerReleaseThresholdMinutes";
+ private final String RELEASE_CONTAINER_IF_TASK_IS_STUCK =
AUTO_SCALING_PREFIX + "releaseContainerIfTaskIsStuck";
+ private final String DETECT_IF_TASK_IS_STUCK = AUTO_SCALING_PREFIX +
"detectIfTaskIsStuck";
+ private final String ENABLE_DETECTION_FOR_TASK_STATES = AUTO_SCALING_PREFIX
+ "enableDetectionForTaskStates";
private final double DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR =
1.0;
+ private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX +
"initialDelay";
+ private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60;
+ private final String AUTO_SCALING_WINDOW_SIZE = AUTO_SCALING_PREFIX +
"windowSize";
+ public final static int
DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES = 10;
+ private final static int
DEFAULT_MAX_TIME_MINUTES_TO_RELEASE_CONTAINER_HAVING_HELIX_TASK_THAT_IS_STUCK =
20;
+
// The cluster level default tags for Helix instances
private final String defaultHelixInstanceTags;
private final int defaultContainerMemoryMbs;
private final int defaultContainerCores;
-
- private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX +
"initialDelay";
- private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60;
private int taskAttemptsThreshold;
private final boolean splitWorkUnitReachThreshold;
-
- private final String AUTO_SCALING_WINDOW_SIZE = AUTO_SCALING_PREFIX +
"windowSize";
-
- public final static int
DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES = 10;
-
private final Config config;
private final HelixManager helixManager;
private final ScheduledExecutorService autoScalingExecutor;
@@ -105,6 +113,10 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
private final double overProvisionFactor;
private final SlidingWindowReservoir slidingFixedSizeWindow;
private static int maxIdleTimeInMinutesBeforeScalingDown =
DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES;
+ private final int maxTimeInMinutesBeforeReleasingContainerHavingStuckTask;
+ private final boolean enableReleasingContainerHavingStuckTask;
+ private final boolean enableDetectionStuckTask;
+ private final HashSet<TaskPartitionState> detectionForStuckTaskStates;
private static final HashSet<TaskPartitionState>
UNUSUAL_HELIX_TASK_STATES = Sets.newHashSet(TaskPartitionState.ERROR,
TaskPartitionState.DROPPED, TaskPartitionState.COMPLETED,
TaskPartitionState.TIMED_OUT);
@@ -136,6 +148,34 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
DEFAULT_TASK_NUMBER_OF_ATTEMPTS_THRESHOLD);
this.splitWorkUnitReachThreshold = ConfigUtils.getBoolean(this.config,
SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD,
DEFAULT_SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD);
+ this.maxTimeInMinutesBeforeReleasingContainerHavingStuckTask =
ConfigUtils.getInt(this.config,
+ STUCK_TASK_CONTAINER_RELEASE_THRESHOLD_MINUTES,
+
DEFAULT_MAX_TIME_MINUTES_TO_RELEASE_CONTAINER_HAVING_HELIX_TASK_THAT_IS_STUCK);
+ this.enableReleasingContainerHavingStuckTask =
ConfigUtils.getBoolean(this.config,
+ RELEASE_CONTAINER_IF_TASK_IS_STUCK, false);
+ this.enableDetectionStuckTask = ConfigUtils.getBoolean(this.config,
DETECT_IF_TASK_IS_STUCK, false);
+ this.detectionForStuckTaskStates =
getTaskStatesForWhichDetectionIsEnabled();
+ }
+
+ private HashSet<TaskPartitionState>
getTaskStatesForWhichDetectionIsEnabled() {
+ HashSet<TaskPartitionState> taskStates = new HashSet<>();
+ if (this.enableDetectionStuckTask) {
+ List<String> taskStatesEnabledForDetection =
ConfigUtils.getStringList(this.config, ENABLE_DETECTION_FOR_TASK_STATES);
+ for (String taskState : taskStatesEnabledForDetection) {
+ try {
+ TaskPartitionState helixTaskState =
TaskPartitionState.valueOf(taskState);
+ if(helixTaskState == TaskPartitionState.RUNNING) {
+ log.warn("Running state is not allowed for detection as it is not
a stuck state, ignoring");
+ continue;
+ }
+ taskStates.add(helixTaskState);
+ } catch (IllegalArgumentException e) {
+ log.warn("Invalid task state {} provided for detection, ignoring",
taskState);
+ }
+ }
+ }
+ log.info("Detection of task being stuck is enabled on following task
states {}", taskStates);
+ return taskStates;
}
@Override
@@ -150,7 +190,9 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
this.autoScalingExecutor.scheduleAtFixedRate(new
YarnAutoScalingRunnable(new TaskDriver(this.helixManager),
this.yarnService, this.partitionsPerContainer,
this.overProvisionFactor,
this.slidingFixedSizeWindow,
this.helixManager.getHelixDataAccessor(), this.defaultHelixInstanceTags,
- this.defaultContainerMemoryMbs, this.defaultContainerCores,
this.taskAttemptsThreshold, this.splitWorkUnitReachThreshold),
+ this.defaultContainerMemoryMbs, this.defaultContainerCores,
this.taskAttemptsThreshold,
+ this.splitWorkUnitReachThreshold,
this.maxTimeInMinutesBeforeReleasingContainerHavingStuckTask,
+ this.enableReleasingContainerHavingStuckTask,
this.enableDetectionStuckTask, this.detectionForStuckTaskStates),
initialDelay, scheduleInterval, TimeUnit.SECONDS);
}
@@ -179,13 +221,22 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
private final int defaultContainerCores;
private final int taskAttemptsThreshold;
private final boolean splitWorkUnitReachThreshold;
+ private final int maxTimeInMinutesBeforeReleasingContainerHavingStuckTask;
+ private final boolean enableReleasingContainerHavingStuckTask;
+ private final boolean enableDetectionStuckTask;
+ private final HashSet<TaskPartitionState> taskStates;
/**
* A static map that keeps track of an idle instance and its latest
beginning idle time.
* If an instance is no longer idle when inspected, it will be dropped
from this map.
*/
private static final Map<String, Long> instanceIdleSince = new HashMap<>();
-
+ /**
+ * A static map that keeps track of instances containing tasks that are
present in any of the
+ * configured states, along with the latest time at which each instance
entered one of those states. If an instance is no longer
+ * in the given states when inspected, it will be dropped from this map.
+ */
+ private static final Map<String, Long> instanceStuckSince = new
HashMap<>();
@Override
public void run() {
@@ -219,6 +270,19 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
return null;
}
+
+ private String getParticipantInGivenStateForHelixPartition(final
JobContext jobContext, final int partition,
+ final HashSet<TaskPartitionState> taskStates) {
+ if (taskStates.contains(jobContext.getPartitionState(partition))) {
+ log.info("Helix task {} is in {} state at helix participant {}",
+ jobContext.getTaskIdForPartition(partition),
jobContext.getPartitionState(partition),
+ jobContext.getAssignedParticipant(partition));
+ return jobContext.getAssignedParticipant(partition);
+ }
+
+ return null;
+ }
+
/**
* Iterate through the workflows configured in Helix to figure out the
number of required partitions
* and request the {@link YarnService} to scale to the desired number of
containers.
@@ -226,6 +290,10 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
@VisibleForTesting
void runInternal() {
Set<String> inUseInstances = new HashSet<>();
+ // helixInstancesContainingStuckTasks maintains the set of helix
instances/participants containing tasks that are
+ // stuck in any of the configured states.
+ final Set<String> helixInstancesContainingStuckTasks = new HashSet<>();
+
YarnContainerRequestBundle yarnContainerRequestBundle = new
YarnContainerRequestBundle();
for (Map.Entry<String, WorkflowConfig> workFlowEntry :
taskDriver.getWorkflows().entrySet()) {
WorkflowContext workflowContext =
taskDriver.getWorkflowContext(workFlowEntry.getKey());
@@ -259,6 +327,13 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
.map(i -> getInuseParticipantForHelixPartition(jobContext, i))
.filter(Objects::nonNull).collect(Collectors.toSet()));
+ if (enableDetectionStuckTask) {
+ // if feature is not enabled the set
helixInstancesContainingStuckTasks will always be empty
+
helixInstancesContainingStuckTasks.addAll(jobContext.getPartitionSet().stream()
+ .map(helixPartition ->
getParticipantInGivenStateForHelixPartition(jobContext, helixPartition,
taskStates))
+ .filter(Objects::nonNull).collect(Collectors.toSet()));
+ }
+
numPartitions = jobContext.getPartitionSet().size();
// Job level config for helix instance tags takes precedence over
other tag configurations
if (jobConfig != null) {
@@ -286,6 +361,8 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
// and potentially replanner-instance.
Set<String> allParticipants =
HelixUtils.getParticipants(helixDataAccessor, HELIX_YARN_INSTANCE_NAME_PREFIX);
+ final Set<Container> containersToRelease = new HashSet<>();
+
// Find all joined participants not in-use for this round of inspection.
// If idle time is beyond tolerance, mark the instance as unused by
assigning timestamp as -1.
for (String participant : allParticipants) {
@@ -299,9 +376,49 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
// Remove this instance if existed in the tracking map.
instanceIdleSince.remove(participant);
}
+
+ if(helixInstancesContainingStuckTasks.contains(participant)) {
+ instanceStuckSince.putIfAbsent(participant,
System.currentTimeMillis());
+ if (isInstanceStuck(participant)) {
+ // release the corresponding container as the helix task is stuck
for a long time
+ log.info("Instance {} has some helix partition that is stuck for
{} minutes, "
+ + "releasing the container enabled : {}", participant,
+ TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis() -
instanceStuckSince.get(participant)),
+ enableReleasingContainerHavingStuckTask);
+
+ // get container of the helix participant
+ Optional<Container> container =
yarnService.getContainerInfoGivenHelixParticipant(participant);
+ instanceStuckSince.remove(participant);
+ String containerId = "";
+ if(container.isPresent()) {
+ if (enableReleasingContainerHavingStuckTask) {
+ containersToRelease.add(container.get());
+ }
+ containerId = container.get().getId().toString();
+ } else {
+ log.warn("Container information for participant {} is not
found", participant);
+ }
+
+ if(this.yarnService.getEventSubmitter().isPresent()) {
+ // send GTE
+
this.yarnService.getEventSubmitter().get().submit(GobblinYarnEventConstants.EventNames.HELIX_PARTITION_STUCK,
+ GobblinHelixConstants.HELIX_INSTANCE_NAME_KEY, participant,
+ GobblinYarnMetricTagNames.CONTAINER_ID, containerId);
+ }
+ }
+ } else {
+ instanceStuckSince.remove(participant);
+ }
+ }
+
+ // release the containers
+ if(!containersToRelease.isEmpty()) {
+ this.yarnService.getEventBus().post(new
ContainerReleaseRequest(containersToRelease, true));
}
+
slidingWindowReservoir.add(yarnContainerRequestBundle);
+
log.debug("There are {} containers being requested in total, tag-count
map {}, tag-resource map {}",
yarnContainerRequestBundle.getTotalContainers(),
yarnContainerRequestBundle.getHelixTagContainerCountMap(),
yarnContainerRequestBundle.getHelixTagResourceMap());
@@ -309,17 +426,29 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
this.yarnService.requestTargetNumberOfContainers(slidingWindowReservoir.getMax(),
inUseInstances);
}
- @VisibleForTesting
/**
* Return true if the condition for tagging an instance as "unused" holds.
* The condition, by default is that if an instance went back to
* active (having partition running on it) within {@link
#maxIdleTimeInMinutesBeforeScalingDown} minutes, we will
* not tag that instance as "unused" and have that as the candidate for
scaling down.
*/
+ @VisibleForTesting
boolean isInstanceUnused(String participant){
return System.currentTimeMillis() - instanceIdleSince.get(participant) >
TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutesBeforeScalingDown);
}
+
+ /**
+ * Return true if the condition for tagging an instance as stuck holds.
+ * The condition, by default is that if a task running on an instance went
back to any other state other than given
+ * states within {@link
#maxTimeInMinutesBeforeReleasingContainerHavingStuckTask} minutes, we will
+ * not tag that instance as stuck and the container will not be scaled
down.
+ */
+ @VisibleForTesting
+ boolean isInstanceStuck(final String participant) {
+ return System.currentTimeMillis() - instanceStuckSince.get(participant) >
+
TimeUnit.MINUTES.toMillis(maxTimeInMinutesBeforeReleasingContainerHavingStuckTask);
+ }
}
/**
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 5155c39b4..d7c482aa4 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -142,6 +142,7 @@ public class YarnService extends AbstractIdleService {
private final FileSystem fs;
private final Optional<GobblinMetrics> gobblinMetrics;
+ @Getter
private final Optional<EventSubmitter> eventSubmitter;
@VisibleForTesting
@@ -313,6 +314,21 @@ public class YarnService extends AbstractIdleService {
newContainerRequest.getResource());
}
+ /**
+ * Returns the container of the given helix participant if one exists,
otherwise returns
+ * {@code Optional.absent()}.
+ * @param helixParticipant the helix participant id to look up
+ * @return an {@code Optional} containing the participant's container, or
absent if not found
+ */
+ public Optional<Container> getContainerInfoGivenHelixParticipant(final
String helixParticipant) {
+ for (ContainerInfo containerInfo : this.containerMap.values()) {
+ if (containerInfo.getHelixParticipantId().equals(helixParticipant)) {
+ return Optional.fromNullable(containerInfo.getContainer());
+ }
+ }
+ return Optional.absent();
+ }
+
protected NMClientCallbackHandler getNMClientCallbackHandler() {
return new NMClientCallbackHandler();
}
@@ -335,13 +351,15 @@ public class YarnService extends AbstractIdleService {
for (Container container : containerReleaseRequest.getContainers()) {
LOGGER.info(String.format("Releasing container %s running on %s",
container.getId(), container.getNodeId()));
- // Record that this container was explicitly released so that a new one
is not spawned to replace it
- // Put the container id in the releasedContainerCache before releasing
it so that handleContainerCompletion()
- // can check for the container id and skip spawning a replacement
container.
- // Note that this is the best effort since these are asynchronous
operations and a container may abort concurrently
- // with the release call. So in some cases a replacement container may
have already been spawned before
- // the container is put into the black list.
- this.releasedContainerCache.put(container.getId(), "");
+ if(!containerReleaseRequest.isShouldSpinUpReplacementContainers()) {
+ // Record that this container was explicitly released so that a new
one is not spawned to replace it
+ // Put the container id in the releasedContainerCache before releasing
it so that handleContainerCompletion()
+ // can check for the container id and skip spawning a replacement
container.
+ // Note that this is the best effort since these are asynchronous
operations and a container may abort concurrently
+ // with the release call. So in some cases a replacement container may
have already been spawned before
+ // the container is put into the black list.
+ this.releasedContainerCache.put(container.getId(), "");
+ }
this.amrmClientAsync.releaseAssignedContainer(container.getId());
}
}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/ContainerReleaseRequest.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/ContainerReleaseRequest.java
index 151ac2216..a824b3966 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/ContainerReleaseRequest.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/ContainerReleaseRequest.java
@@ -21,6 +21,9 @@ import java.util.Collection;
import org.apache.hadoop.yarn.api.records.Container;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
/**
* A type of event for container release requests to be used with a {@link
com.google.common.eventbus.EventBus}.
@@ -28,19 +31,21 @@ import org.apache.hadoop.yarn.api.records.Container;
* the Resource Manager, while {@link ContainerShutdownRequest} shuts down a
container through the
* Node Manager
*/
+@Getter
+@AllArgsConstructor
public class ContainerReleaseRequest {
- private final Collection<Container> containers;
-
- public ContainerReleaseRequest(Collection<Container> containers) {
- this.containers = containers;
- }
-
/**
- * Get the IDs of the containers to release.
+ * -- GETTER --
+ * Get the IDs of the containers to release.
*
* @return the IDs of the containers to release
*/
- public Collection<Container> getContainers() {
- return this.containers;
+ private final Collection<Container> containers;
+ // shouldSpinUpReplacementContainers is used to indicate whether to replace
the released containers with new ones or not
+ private final boolean shouldSpinUpReplacementContainers;
+
+ public ContainerReleaseRequest(Collection<Container> containers) {
+ this.containers = containers;
+ this.shouldSpinUpReplacementContainers = false;
}
}
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
index d21d530d7..4ede7fb6f 100644
---
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
+++
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
@@ -18,11 +18,14 @@
package org.apache.gobblin.yarn;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixProperty;
@@ -32,6 +35,7 @@ import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobDag;
import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
@@ -40,10 +44,13 @@ import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.eventbus.EventBus;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
@@ -79,7 +86,9 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory,
- defaultContainerCores, 20, false);
+ defaultContainerCores, 20, false,
+ 10,
+ false, false, new HashSet<>());
runnable.run();
ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
@@ -110,7 +119,9 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory,
- defaultContainerCores, 20, false);
+ defaultContainerCores, 20, false,
+ 10,
+ false, false, new HashSet<>());
runnable.run();
@@ -148,7 +159,9 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory,
- defaultContainerCores, 20, false);
+ defaultContainerCores, 20, false,
+ 10,
+ false, false, new HashSet<>());
runnable.run();
@@ -188,7 +201,9 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory,
- defaultContainerCores, 20, false);
+ defaultContainerCores, 20, false,
+ 10,
+ false, false, new HashSet<>());
runnable.run();
@@ -216,7 +231,9 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 2,
1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory,
- defaultContainerCores, 20, false);
+ defaultContainerCores, 20, false,
+ 10,
+ false, false, new HashSet<>());
runnable.run();
@@ -240,7 +257,9 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable1 =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
1.2, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory,
- defaultContainerCores, 20, false);
+ defaultContainerCores, 20, false,
+ 10,
+ false, false, new HashSet<>());
runnable1.run();
@@ -253,7 +272,9 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable2 =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
0.1, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory,
- defaultContainerCores, 20, false);
+ defaultContainerCores, 20, false,
+ 10,
+ false, false, new HashSet<>());
runnable2.run();
@@ -266,7 +287,9 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable3 =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
6.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory,
- defaultContainerCores, 20, false);
+ defaultContainerCores, 20, false,
+ 10,
+ false, false, new HashSet<>());
runnable3.run();
@@ -276,6 +299,7 @@ public class YarnAutoScalingManagerTest {
assertContainerRequest(mockYarnService, 12,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
}
+
/**
* Test suppressed exception
*/
@@ -393,7 +417,9 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory,
- defaultContainerCores, 20, false);
+ defaultContainerCores, 20, false,
+ 10,
+ false, false, new HashSet<>());
runnable.run();
@@ -414,6 +440,46 @@ public class YarnAutoScalingManagerTest {
Assert.assertEquals((int) helixTagContainerCountMap.get(defaultHelixTag),
3);
}
+ /**
+ * Test the scenarios when an instance in cluster has any partition that is
stuck in INIT state for too long
+ */
+ @Test
+ public void testInstanceStuckInINITState() {
+ YarnService mockYarnService = mock(YarnService.class);
+ TaskDriver mockTaskDriver = mock(TaskDriver.class);
+ Container mockContainer = mock(Container.class);
+ EventBus mockEventBus = mock(EventBus.class);
+
+ WorkflowConfig mockWorkflowConfig = getWorkflowConfig(mockTaskDriver,
ImmutableSet.of("job1"), TaskState.IN_PROGRESS, TargetState.START, "workflow1");
+
Mockito.when(mockTaskDriver.getWorkflows()).thenReturn(ImmutableMap.of("workflow1",
mockWorkflowConfig));
+
+ // Having both partition assigned to single instance initially, in this
case, GobblinYarnTaskRunner-2
+ JobContext mockJobContext = getJobContext(mockTaskDriver,
ImmutableMap.of(1,"GobblinYarnTaskRunner-1", 2, "GobblinYarnTaskRunner-2"),
"job1");
+
Mockito.when(mockJobContext.getPartitionState(1)).thenReturn(TaskPartitionState.RUNNING);
+
Mockito.when(mockJobContext.getPartitionState(2)).thenReturn(TaskPartitionState.INIT);
+
Mockito.when(mockYarnService.getContainerInfoGivenHelixParticipant("GobblinYarnTaskRunner-2")).thenReturn(Optional.of(mockContainer));
+ Mockito.when(mockYarnService.getEventBus()).thenReturn(mockEventBus);
+
Mockito.when(mockYarnService.getEventSubmitter()).thenReturn(Optional.absent());
+ Set<Container> containers = new HashSet<>();
+ containers.add(mockContainer);
+ mockEventBus.post(new ContainerReleaseRequest(containers, true));
+ ContainerId mockContainerId = mock(ContainerId.class);
+ Mockito.when(mockContainer.getId()).thenReturn(mockContainerId);
+ Mockito.when(mockContainerId.toString()).thenReturn("container-1");
+ HelixDataAccessor helixDataAccessor =
getHelixDataAccessor(Arrays.asList("GobblinYarnTaskRunner-1",
"GobblinYarnTaskRunner-2"));
+
+ TestYarnAutoScalingRunnable runnable = new
TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+ 1, helixDataAccessor);
+
+
+
+ runnable.setAlwaysInINITState(true);
+ runnable.run();
+
+ assertContainerRequest(mockYarnService, 2,
ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
+ }
+
+
private HelixDataAccessor getHelixDataAccessor(List<String> taskRunners) {
HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new
PropertyKey.Builder("cluster"));
@@ -474,12 +540,17 @@ public class YarnAutoScalingManagerTest {
private static class TestYarnAutoScalingRunnable extends
YarnAutoScalingManager.YarnAutoScalingRunnable {
boolean raiseException = false;
boolean alwaysUnused = false;
+ boolean alwaysInINITState = false;
public TestYarnAutoScalingRunnable(TaskDriver taskDriver, YarnService
yarnService, int partitionsPerContainer,
HelixDataAccessor helixDataAccessor) {
super(taskDriver, yarnService, partitionsPerContainer, 1.0,
noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory,
- defaultContainerCores, 20, false);
+ defaultContainerCores, 20, false,
+ 10,
+ true, true, new HashSet<TaskPartitionState>() {{
+ add(TaskPartitionState.INIT);
+ }});
}
@Override
@@ -499,9 +570,18 @@ public class YarnAutoScalingManagerTest {
this.alwaysUnused = alwaysUnused;
}
+ void setAlwaysInINITState(boolean alwaysInINITState) {
+ this.alwaysInINITState = alwaysInINITState;
+ }
+
@Override
boolean isInstanceUnused(String participant) {
return alwaysUnused || super.isInstanceUnused(participant);
}
+
+ @Override
+ boolean isInstanceStuck(String participant) {
+ return alwaysInINITState || super.isInstanceStuck(participant);
+ }
}
}