[
https://issues.apache.org/jira/browse/GOBBLIN-2052?focusedWorklogId=915951&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-915951
]
ASF GitHub Bot logged work on GOBBLIN-2052:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 23/Apr/24 05:33
Start Date: 23/Apr/24 05:33
Worklog Time Spent: 10m
Work Description: arpit09 commented on code in PR #3932:
URL: https://github.com/apache/gobblin/pull/3932#discussion_r1575621084
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -219,13 +226,25 @@ private String
getInuseParticipantForHelixPartition(JobContext jobContext, int p
return null;
}
+
+ private String getParticipantInInitStateForHelixPartition(JobContext
jobContext, int partition) {
+ if
(jobContext.getPartitionState(partition).equals(TaskPartitionState.INIT)) {
Review Comment:
Let's swap the check to avoid an NPE:
`TaskPartitionState.INIT.equals(jobContext.getPartitionState(partition))`
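A minimal, self-contained sketch of why the swapped form is safer, assuming `getPartitionState` can return null for a partition with no recorded state. `PartitionState` and `lookup` below are hypothetical stand-ins for `TaskPartitionState` and `jobContext.getPartitionState`, not the Helix API itself:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Helix's TaskPartitionState; only two values are modeled.
enum PartitionState { INIT, RUNNING }

class NullSafeStateCheck {
  // Hypothetical stand-in for jobContext.getPartitionState(partition):
  // returns null when no state is recorded for the partition.
  static PartitionState lookup(Map<Integer, PartitionState> states, int partition) {
    return states.get(partition);
  }

  // Constant-first equals: a null state yields false instead of throwing an NPE.
  static boolean isInit(Map<Integer, PartitionState> states, int partition) {
    return PartitionState.INIT.equals(lookup(states, partition));
  }

  public static void main(String[] args) {
    Map<Integer, PartitionState> states = new HashMap<>();
    states.put(0, PartitionState.INIT);
    states.put(1, PartitionState.RUNNING);
    System.out.println(isInit(states, 0));
    System.out.println(isInit(states, 1));
    System.out.println(isInit(states, 2)); // no entry, so lookup returns null
  }
}
```

With the original ordering, the partition without an entry would throw when `equals` is called on the null result.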
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -299,27 +325,65 @@ void runInternal() {
// Remove this instance if existed in the tracking map.
instanceIdleSince.remove(participant);
}
+
+ if(instancesInInitState.contains(participant)) {
+ instanceInitStateSince.putIfAbsent(participant,
System.currentTimeMillis());
+ if (!isInstanceStuckInInitState(participant)) {
+ // release the corresponding container as the helix task is stuck
in INIT state for a long time
+ log.info("Instance {} is stuck in INIT state for a long time,
releasing the container", participant);
+ // get containerInfo of the helix participant
+ YarnService.ContainerInfo containerInfo =
yarnService.getContainerInfoGivenHelixParticipant(participant);
+ if(containerInfo != null) {
+ containersToRelease.add(containerInfo.getContainer());
+ instanceInitStateSince.remove(participant);
+ inUseInstances.remove(participant);
+ } else {
+ log.warn("ContainerInfo for participant {} is not found",
participant);
+ }
+ }
+ } else {
+ instanceInitStateSince.remove(participant);
+ }
+ }
+
+ // release the containers
+ if(!containersToRelease.isEmpty()) {
+ this.yarnService.getEventBus().post(new
ContainerReleaseRequest(containersToRelease, true));
}
+
slidingWindowReservoir.add(yarnContainerRequestBundle);
+
log.debug("There are {} containers being requested in total, tag-count
map {}, tag-resource map {}",
yarnContainerRequestBundle.getTotalContainers(),
yarnContainerRequestBundle.getHelixTagContainerCountMap(),
yarnContainerRequestBundle.getHelixTagResourceMap());
this.yarnService.requestTargetNumberOfContainers(slidingWindowReservoir.getMax(),
inUseInstances);
}
- @VisibleForTesting
/**
* Return true is the condition for tagging an instance as "unused" holds.
* The condition, by default is that if an instance went back to
* active (having partition running on it) within {@link
#maxIdleTimeInMinutesBeforeScalingDown} minutes, we will
* not tag that instance as "unused" and have that as the candidate for
scaling down.
*/
+ @VisibleForTesting
boolean isInstanceUnused(String participant){
return System.currentTimeMillis() - instanceIdleSince.get(participant) >
TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutesBeforeScalingDown);
}
+
+ /**
+ * Return true is the condition for tagging an instance as stuck in INIT
state holds.
+ * The condition, by default is that if an instance went back to
+ * active (having partition running on it) within {@link
#maxIdleTimeInMinutesBeforeScalingDown} minutes, we will
+ * not tag that instance as stuck and the container will not be scaled
down.
+ */
+ @VisibleForTesting
+ boolean isInstanceStuckInInitState(String participant) {
+ return System.currentTimeMillis() -
instanceInitStateSince.get(participant) >
+ TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutesBeforeScalingDown);
Review Comment:
How did we decide to use `maxIdleTimeInMinutesBeforeScalingDown` as the
number of minutes to wait before scaling up? Can we add a separate
variable for this case instead of reusing the one for scaling down, since the
two serve different purposes?
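One possible shape for this, as a sketch only: keep the scale-down threshold untouched and add a dedicated one for the INIT check. The field name `maxTimeInMinutesBeforeReleasingContainerStuckInInit` and the class around it are hypothetical and not part of the PR:

```java
import java.util.concurrent.TimeUnit;

class InitStuckThreshold {
  // Existing knob, used only for scale-down decisions.
  private final int maxIdleTimeInMinutesBeforeScalingDown;
  // Hypothetical new knob for the INIT-stuck check; the name is illustrative.
  private final int maxTimeInMinutesBeforeReleasingContainerStuckInInit;

  InitStuckThreshold(int maxIdleMinutes, int maxInitMinutes) {
    this.maxIdleTimeInMinutesBeforeScalingDown = maxIdleMinutes;
    this.maxTimeInMinutesBeforeReleasingContainerStuckInInit = maxInitMinutes;
  }

  // True when the instance has been idle longer than the scale-down threshold.
  boolean isUnused(long idleSinceMillis, long nowMillis) {
    return nowMillis - idleSinceMillis
        > TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutesBeforeScalingDown);
  }

  // True when the instance has been in INIT longer than the dedicated threshold.
  boolean isStuckInInit(long initSinceMillis, long nowMillis) {
    return nowMillis - initSinceMillis
        > TimeUnit.MINUTES.toMillis(maxTimeInMinutesBeforeReleasingContainerStuckInInit);
  }
}
```

Passing the current time as a parameter is a choice made for this sketch so that both checks can be tested without waiting on the clock.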
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -259,6 +278,11 @@ void runInternal() {
.map(i -> getInuseParticipantForHelixPartition(jobContext, i))
.filter(Objects::nonNull).collect(Collectors.toSet()));
+ instancesInInitState.addAll(jobContext.getPartitionSet().stream()
+ .map(i ->
getParticipantInInitStateForHelixPartition(jobContext, i))
Review Comment:
nit: Use `helixPartition` instead of the bare `i` variable.
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java:
##########
@@ -313,6 +313,21 @@ public void handleNewContainerRequest(NewContainerRequest
newContainerRequest) {
newContainerRequest.getResource());
}
+ /**
+ * getContainerInfoGivenHelixParticipant returns the containerInfo of the
given helixParticipant if it exists else
+ * return null
+ * @param helixParticipant
+ * @return ContainerInfo
+ */
+ public ContainerInfo getContainerInfoGivenHelixParticipant(String
helixParticipant) {
+ for (ContainerInfo containerInfo : this.containerMap.values()) {
+ if (containerInfo.getHelixParticipantId().equals(helixParticipant)) {
+ return containerInfo;
+ }
+ }
+ return null;
Review Comment:
Consider returning an `Optional` here; it would avoid null checks and
potential NPEs in the future.
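A rough sketch of what the `Optional`-based lookup could look like. `Info` is a simplified, hypothetical stand-in for `YarnService.ContainerInfo`, and `findByHelixParticipant` is an illustrative name, not an existing method:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

class ContainerLookup {
  // Simplified, hypothetical stand-in for YarnService.ContainerInfo.
  static final class Info {
    private final String helixParticipantId;

    Info(String helixParticipantId) {
      this.helixParticipantId = helixParticipantId;
    }

    String getHelixParticipantId() {
      return helixParticipantId;
    }
  }

  // Keyed by a container id string in this sketch.
  private final Map<String, Info> containerMap = new LinkedHashMap<>();

  void register(String containerId, Info info) {
    containerMap.put(containerId, info);
  }

  // Returns the first matching container info, or Optional.empty() if none matches.
  Optional<Info> findByHelixParticipant(String helixParticipant) {
    return containerMap.values().stream()
        .filter(info -> Objects.equals(info.getHelixParticipantId(), helixParticipant))
        .findFirst();
  }
}
```

A caller could then write `lookup.findByHelixParticipant(participant).ifPresent(...)` and handle the missing case explicitly instead of checking for null.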
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java:
##########
@@ -313,6 +313,21 @@ public void handleNewContainerRequest(NewContainerRequest
newContainerRequest) {
newContainerRequest.getResource());
}
+ /**
+ * getContainerInfoGivenHelixParticipant returns the containerInfo of the
given helixParticipant if it exists else
+ * return null
+ * @param helixParticipant
+ * @return ContainerInfo
+ */
Review Comment:
Please use the Apache Gobblin code formatter:
https://gobblin.readthedocs.io/en/latest/developer-guide/CodingStyle/
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -299,27 +325,65 @@ void runInternal() {
// Remove this instance if existed in the tracking map.
instanceIdleSince.remove(participant);
}
+
+ if(instancesInInitState.contains(participant)) {
+ instanceInitStateSince.putIfAbsent(participant,
System.currentTimeMillis());
+ if (!isInstanceStuckInInitState(participant)) {
+ // release the corresponding container as the helix task is stuck
in INIT state for a long time
+ log.info("Instance {} is stuck in INIT state for a long time,
releasing the container", participant);
Review Comment:
Let's also log how long the instance has been stuck in the INIT state.
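For illustration, the duration could be derived from the timestamp already stored in `instanceInitStateSince`. The sketch below builds the message with `String.format` only to stay dependency-free; in the PR it would go through the existing logger. The helper name `describe` is hypothetical:

```java
import java.util.concurrent.TimeUnit;

class StuckDurationMessage {
  // Builds the log message, including how long the instance has been in INIT.
  static String describe(String participant, long initSinceMillis, long nowMillis) {
    long stuckMinutes = TimeUnit.MILLISECONDS.toMinutes(nowMillis - initSinceMillis);
    return String.format(
        "Instance %s has been stuck in INIT state for %d minutes, releasing the container",
        participant, stuckMinutes);
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long since = now - TimeUnit.MINUTES.toMillis(15);
    System.out.println(describe("participant_a", since, now));
  }
}
```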
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -219,13 +226,25 @@ private String
getInuseParticipantForHelixPartition(JobContext jobContext, int p
return null;
}
+
+ private String getParticipantInInitStateForHelixPartition(JobContext
jobContext, int partition) {
+ if
(jobContext.getPartitionState(partition).equals(TaskPartitionState.INIT)) {
+ log.info("Helix task {} is in {} state",
+ jobContext.getTaskIdForPartition(partition),
jobContext.getPartitionState(partition));
+ return jobContext.getAssignedParticipant(partition);
+ }
+
+ return null;
+ }
+
/**
* Iterate through the workflows configured in Helix to figure out the
number of required partitions
* and request the {@link YarnService} to scale to the desired number of
containers.
*/
@VisibleForTesting
void runInternal() {
Set<String> inUseInstances = new HashSet<>();
+ Set<String> instancesInInitState = new HashSet<>();
Review Comment:
Let's add a comment noting that `instancesInInitState` contains only those
containers that have a task assigned, not all the Helix containers in the INIT
state. We could also rename the variable to reflect that.
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -299,27 +325,65 @@ void runInternal() {
// Remove this instance if existed in the tracking map.
instanceIdleSince.remove(participant);
}
+
+ if(instancesInInitState.contains(participant)) {
+ instanceInitStateSince.putIfAbsent(participant,
System.currentTimeMillis());
+ if (!isInstanceStuckInInitState(participant)) {
+ // release the corresponding container as the helix task is stuck
in INIT state for a long time
+ log.info("Instance {} is stuck in INIT state for a long time,
releasing the container", participant);
+ // get containerInfo of the helix participant
+ YarnService.ContainerInfo containerInfo =
yarnService.getContainerInfoGivenHelixParticipant(participant);
+ if(containerInfo != null) {
+ containersToRelease.add(containerInfo.getContainer());
+ instanceInitStateSince.remove(participant);
+ inUseInstances.remove(participant);
+ } else {
+ log.warn("ContainerInfo for participant {} is not found",
participant);
Review Comment:
If the container info for the participant is not found, should we remove the
participant from `instanceInitStateSince` as well? Otherwise, when does it get
removed from `instanceInitStateSince` in this case?
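If the answer is yes, one option is to drop the tracking entry in both branches. The sketch below is a simplified model, not the PR's code: container ids are plain strings, `knownContainers` stands in for the `yarnService` lookup, and `handleStuck` is a hypothetical helper:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class InitStateTracker {
  final Map<String, Long> instanceInitStateSince = new HashMap<>();
  // Strings stand in for YARN Container objects in this sketch.
  final Set<String> containersToRelease = new HashSet<>();

  // knownContainers maps participant -> container id; a missing key models
  // the "ContainerInfo not found" case.
  void handleStuck(String participant, Map<String, String> knownContainers) {
    String container = knownContainers.get(participant);
    if (container != null) {
      containersToRelease.add(container);
    }
    // Removed in both cases, so an entry without a container does not linger.
    instanceInitStateSince.remove(participant);
  }
}
```

One consequence worth weighing: if the participant is still reported in INIT on the next run, `putIfAbsent` would re-add it with a fresh timestamp, which restarts the wait.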
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/ContainerReleaseRequest.java:
##########
@@ -21,26 +21,33 @@
import org.apache.hadoop.yarn.api.records.Container;
+import lombok.Getter;
+
/**
* A type of event for container release requests to be used with a {@link
com.google.common.eventbus.EventBus}.
* This event is different than {@link ContainerShutdownRequest} because it
releases the container through
* the Resource Manager, while {@link ContainerShutdownRequest} shuts down a
container through the
* Node Manager
*/
+@Getter
public class ContainerReleaseRequest {
+ /**
+ *
Issue Time Tracking
-------------------
Worklog Id: (was: 915951)
Remaining Estimate: 0h
Time Spent: 10m
> Release container which is running yarn task that is stuck in INIT state
> ------------------------------------------------------------------------
>
> Key: GOBBLIN-2052
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2052
> Project: Apache Gobblin
> Issue Type: Improvement
> Components: gobblin-yarn
> Reporter: pradeep pallikila
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>