arpit09 commented on code in PR #3932:
URL: https://github.com/apache/gobblin/pull/3932#discussion_r1575621084
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -219,13 +226,25 @@ private String
getInuseParticipantForHelixPartition(JobContext jobContext, int p
return null;
}
+
+ private String getParticipantInInitStateForHelixPartition(JobContext
jobContext, int partition) {
+ if
(jobContext.getPartitionState(partition).equals(TaskPartitionState.INIT)) {
Review Comment:
Let's swap the check to avoid an NPE:
`TaskPartitionState.INIT.equals(jobContext.getPartitionState(partition))`
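A minimal sketch of why the constant-first form is safer, assuming `getPartitionState` can return `null` for a partition with no recorded state. The enum is a stand-in for Helix's `TaskPartitionState`, and `isInit` is a hypothetical helper, not code from this PR:

```java
class NullSafeStateCheck {
  // Stand-in for Helix's TaskPartitionState; only the values needed here.
  enum TaskPartitionState { INIT, RUNNING }

  // Calling equals on the constant returns false for a null state
  // instead of throwing a NullPointerException.
  static boolean isInit(TaskPartitionState state) {
    return TaskPartitionState.INIT.equals(state);
  }

  public static void main(String[] args) {
    System.out.println(isInit(null));
    System.out.println(isInit(TaskPartitionState.INIT));
  }
}
```

With the original order, `state.equals(TaskPartitionState.INIT)` would throw as soon as `state` is `null`.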
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -299,27 +325,65 @@ void runInternal() {
// Remove this instance if existed in the tracking map.
instanceIdleSince.remove(participant);
}
+
+ if(instancesInInitState.contains(participant)) {
+ instanceInitStateSince.putIfAbsent(participant,
System.currentTimeMillis());
+ if (!isInstanceStuckInInitState(participant)) {
+ // release the corresponding container as the helix task is stuck
in INIT state for a long time
+ log.info("Instance {} is stuck in INIT state for a long time,
releasing the container", participant);
+ // get containerInfo of the helix participant
+ YarnService.ContainerInfo containerInfo =
yarnService.getContainerInfoGivenHelixParticipant(participant);
+ if(containerInfo != null) {
+ containersToRelease.add(containerInfo.getContainer());
+ instanceInitStateSince.remove(participant);
+ inUseInstances.remove(participant);
+ } else {
+ log.warn("ContainerInfo for participant {} is not found",
participant);
+ }
+ }
+ } else {
+ instanceInitStateSince.remove(participant);
+ }
+ }
+
+ // release the containers
+ if(!containersToRelease.isEmpty()) {
+ this.yarnService.getEventBus().post(new
ContainerReleaseRequest(containersToRelease, true));
}
+
slidingWindowReservoir.add(yarnContainerRequestBundle);
+
log.debug("There are {} containers being requested in total, tag-count
map {}, tag-resource map {}",
yarnContainerRequestBundle.getTotalContainers(),
yarnContainerRequestBundle.getHelixTagContainerCountMap(),
yarnContainerRequestBundle.getHelixTagResourceMap());
this.yarnService.requestTargetNumberOfContainers(slidingWindowReservoir.getMax(),
inUseInstances);
}
- @VisibleForTesting
/**
* Return true is the condition for tagging an instance as "unused" holds.
* The condition, by default is that if an instance went back to
* active (having partition running on it) within {@link
#maxIdleTimeInMinutesBeforeScalingDown} minutes, we will
* not tag that instance as "unused" and have that as the candidate for
scaling down.
*/
+ @VisibleForTesting
boolean isInstanceUnused(String participant){
return System.currentTimeMillis() - instanceIdleSince.get(participant) >
TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutesBeforeScalingDown);
}
+
+ /**
+ * Return true is the condition for tagging an instance as stuck in INIT
state holds.
+ * The condition, by default is that if an instance went back to
+ * active (having partition running on it) within {@link
#maxIdleTimeInMinutesBeforeScalingDown} minutes, we will
+ * not tag that instance as stuck and the container will not be scaled
down.
+ */
+ @VisibleForTesting
+ boolean isInstanceStuckInInitState(String participant) {
+ return System.currentTimeMillis() -
instanceInitStateSince.get(participant) >
+ TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutesBeforeScalingDown);
Review Comment:
How did we decide to use `maxIdleTimeInMinutesBeforeScalingDown` as the
number of minutes to wait in this case? Can we add a separate variable for it
rather than reusing the scale-down one, since the two serve different
purposes?
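A rough sketch of keeping the two thresholds separate. The field `maxTimeInMinutesBeforeReleasingInitContainer` and the class itself are hypothetical names chosen for illustration; the timestamps are passed in explicitly so the checks are easy to test:

```java
import java.util.concurrent.TimeUnit;

class InitStateTimeout {
  // Existing knob, used only for scale-down decisions.
  private final int maxIdleTimeInMinutesBeforeScalingDown;
  // Hypothetical new knob for containers stuck in INIT; the name is an assumption.
  private final int maxTimeInMinutesBeforeReleasingInitContainer;

  InitStateTimeout(int maxIdleMinutes, int maxInitMinutes) {
    this.maxIdleTimeInMinutesBeforeScalingDown = maxIdleMinutes;
    this.maxTimeInMinutesBeforeReleasingInitContainer = maxInitMinutes;
  }

  // True once the instance has been idle longer than the scale-down threshold.
  boolean isInstanceUnused(long idleSinceMillis, long nowMillis) {
    return nowMillis - idleSinceMillis
        > TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutesBeforeScalingDown);
  }

  // True once the instance has been in INIT longer than its own threshold.
  boolean isInstanceStuckInInitState(long initSinceMillis, long nowMillis) {
    return nowMillis - initSinceMillis
        > TimeUnit.MINUTES.toMillis(maxTimeInMinutesBeforeReleasingInitContainer);
  }
}
```

This way the INIT timeout can be tuned without changing how aggressively idle containers are scaled down.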
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -259,6 +278,11 @@ void runInternal() {
.map(i -> getInuseParticipantForHelixPartition(jobContext, i))
.filter(Objects::nonNull).collect(Collectors.toSet()));
+ instancesInInitState.addAll(jobContext.getPartitionSet().stream()
+ .map(i ->
getParticipantInInitStateForHelixPartition(jobContext, i))
Review Comment:
nit: Can we name the lambda parameter `helixPartition` instead of just `i`?
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java:
##########
@@ -313,6 +313,21 @@ public void handleNewContainerRequest(NewContainerRequest
newContainerRequest) {
newContainerRequest.getResource());
}
+ /**
+ * getContainerInfoGivenHelixParticipant returns the containerInfo of the
given helixParticipant if it exists else
+ * return null
+ * @param helixParticipant
+ * @return ContainerInfo
+ */
+ public ContainerInfo getContainerInfoGivenHelixParticipant(String
helixParticipant) {
+ for (ContainerInfo containerInfo : this.containerMap.values()) {
+ if (containerInfo.getHelixParticipantId().equals(helixParticipant)) {
+ return containerInfo;
+ }
+ }
+ return null;
Review Comment:
Can we check whether returning an `Optional` would help here? It would avoid
null checks and NPEs in the future.
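A minimal sketch of the `Optional`-returning lookup, using `java.util.Optional` (an assumption; the codebase may prefer Guava's). `ContainerInfo` is a stand-in for `YarnService.ContainerInfo`, and the map is keyed by a plain string here only to keep the example self-contained:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

class ContainerLookup {
  // Stand-in for YarnService.ContainerInfo; only the field needed here.
  static final class ContainerInfo {
    private final String helixParticipantId;

    ContainerInfo(String helixParticipantId) {
      this.helixParticipantId = helixParticipantId;
    }

    String getHelixParticipantId() {
      return helixParticipantId;
    }
  }

  // Keyed by a container id string for simplicity (an assumption of this sketch).
  private final Map<String, ContainerInfo> containerMap = new ConcurrentHashMap<>();

  void register(String containerId, String helixParticipantId) {
    containerMap.put(containerId, new ContainerInfo(helixParticipantId));
  }

  // Returns the first matching ContainerInfo, or Optional.empty() if none exists.
  Optional<ContainerInfo> findByHelixParticipant(String helixParticipant) {
    return containerMap.values().stream()
        .filter(info -> info.getHelixParticipantId().equals(helixParticipant))
        .findFirst();
  }
}
```

Callers can then use `ifPresent(...)` or `isPresent()` instead of comparing against `null`.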
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java:
##########
@@ -313,6 +313,21 @@ public void handleNewContainerRequest(NewContainerRequest
newContainerRequest) {
newContainerRequest.getResource());
}
+ /**
+ * getContainerInfoGivenHelixParticipant returns the containerInfo of the
given helixParticipant if it exists else
+ * return null
+ * @param helixParticipant
+ * @return ContainerInfo
+ */
Review Comment:
Please use the Apache Gobblin code formatter:
https://gobblin.readthedocs.io/en/latest/developer-guide/CodingStyle/
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -299,27 +325,65 @@ void runInternal() {
// Remove this instance if existed in the tracking map.
instanceIdleSince.remove(participant);
}
+
+ if(instancesInInitState.contains(participant)) {
+ instanceInitStateSince.putIfAbsent(participant,
System.currentTimeMillis());
+ if (!isInstanceStuckInInitState(participant)) {
+ // release the corresponding container as the helix task is stuck
in INIT state for a long time
+ log.info("Instance {} is stuck in INIT state for a long time,
releasing the container", participant);
Review Comment:
Let's also log how long the instance has been stuck in the INIT state.
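A small sketch of a message that includes the elapsed time. `describe` is a hypothetical helper used here to keep the example runnable without a logging dependency; in the PR the same values would presumably be passed to `log.info` as placeholders:

```java
import java.util.concurrent.TimeUnit;

class StuckInstanceMessage {
  // Builds the log line, including how many whole minutes the instance has spent in INIT.
  static String describe(String participant, long initSinceMillis, long nowMillis) {
    long stuckMinutes = TimeUnit.MILLISECONDS.toMinutes(nowMillis - initSinceMillis);
    return String.format(
        "Instance %s has been stuck in INIT state for %d minutes, releasing the container",
        participant, stuckMinutes);
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    System.out.println(describe("participant_a", now - TimeUnit.MINUTES.toMillis(5), now));
  }
}
```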
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -219,13 +226,25 @@ private String
getInuseParticipantForHelixPartition(JobContext jobContext, int p
return null;
}
+
+ private String getParticipantInInitStateForHelixPartition(JobContext
jobContext, int partition) {
+ if
(jobContext.getPartitionState(partition).equals(TaskPartitionState.INIT)) {
+ log.info("Helix task {} is in {} state",
+ jobContext.getTaskIdForPartition(partition),
jobContext.getPartitionState(partition));
+ return jobContext.getAssignedParticipant(partition);
+ }
+
+ return null;
+ }
+
/**
* Iterate through the workflows configured in Helix to figure out the
number of required partitions
* and request the {@link YarnService} to scale to the desired number of
containers.
*/
@VisibleForTesting
void runInternal() {
Set<String> inUseInstances = new HashSet<>();
+ Set<String> instancesInInitState = new HashSet<>();
Review Comment:
Let's add a comment that `instancesInInitState` contains only those
containers where a task is assigned, not all the Helix containers in INIT state.
We can also rename the variable to signify that.
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -299,27 +325,65 @@ void runInternal() {
// Remove this instance if existed in the tracking map.
instanceIdleSince.remove(participant);
}
+
+ if(instancesInInitState.contains(participant)) {
+ instanceInitStateSince.putIfAbsent(participant,
System.currentTimeMillis());
+ if (!isInstanceStuckInInitState(participant)) {
+ // release the corresponding container as the helix task is stuck
in INIT state for a long time
+ log.info("Instance {} is stuck in INIT state for a long time,
releasing the container", participant);
+ // get containerInfo of the helix participant
+ YarnService.ContainerInfo containerInfo =
yarnService.getContainerInfoGivenHelixParticipant(participant);
+ if(containerInfo != null) {
+ containersToRelease.add(containerInfo.getContainer());
+ instanceInitStateSince.remove(participant);
+ inUseInstances.remove(participant);
+ } else {
+ log.warn("ContainerInfo for participant {} is not found",
participant);
Review Comment:
If the container info for the participant is not found, shall we remove the
participant from `instanceInitStateSince` as well? Otherwise, when will it get
removed from `instanceInitStateSince` in this case?
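One possible shape for this, as a sketch only: drop the tracking entry whether or not the container info was found, so the map cannot accumulate stale participants. `InitStateTracker` and `onStuckParticipant` are hypothetical names, and whether unconditional removal is the right policy is an open question for the PR:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class InitStateTracker {
  private final Map<String, Long> instanceInitStateSince = new ConcurrentHashMap<>();

  // Records the first time a participant was seen in INIT state.
  void markInInit(String participant, long nowMillis) {
    instanceInitStateSince.putIfAbsent(participant, nowMillis);
  }

  // Called once a participant is judged stuck. The tracking entry is removed in
  // both branches; the return value says whether a release can be requested.
  boolean onStuckParticipant(String participant, boolean containerInfoFound) {
    instanceInitStateSince.remove(participant);
    return containerInfoFound;
  }

  boolean isTracked(String participant) {
    return instanceInitStateSince.containsKey(participant);
  }
}
```

Note that if the participant is still in INIT on the next run, it would be re-added with a fresh timestamp.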
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/ContainerReleaseRequest.java:
##########
@@ -21,26 +21,33 @@
import org.apache.hadoop.yarn.api.records.Container;
+import lombok.Getter;
+
/**
* A type of event for container release requests to be used with a {@link
com.google.common.eventbus.EventBus}.
* This event is different than {@link ContainerShutdownRequest} because it
releases the container through
* the Resource Manager, while {@link ContainerShutdownRequest} shuts down a
container through the
* Node Manager
*/
+@Getter
public class ContainerReleaseRequest {
+ /**
+ * -- GETTER --
+ * Get the IDs of the containers to release.
+ *
+ * @return the IDs of the containers to release
+ */
private final Collection<Container> containers;
+ private final boolean spinUpReplacementContainers;
Review Comment:
For boolean variables, please start the name with `is`, `has`, or
`should`.
Please also add a comment explaining what this flag is needed for.
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -299,27 +325,65 @@ void runInternal() {
// Remove this instance if existed in the tracking map.
instanceIdleSince.remove(participant);
}
+
+ if(instancesInInitState.contains(participant)) {
+ instanceInitStateSince.putIfAbsent(participant,
System.currentTimeMillis());
Review Comment:
Did we check whether we can get the init time from jobContext or Helix
directly? If possible, we should use that time rather than tracking it ourselves.
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -299,27 +325,65 @@ void runInternal() {
// Remove this instance if existed in the tracking map.
instanceIdleSince.remove(participant);
}
+
+ if(instancesInInitState.contains(participant)) {
+ instanceInitStateSince.putIfAbsent(participant,
System.currentTimeMillis());
+ if (!isInstanceStuckInInitState(participant)) {
+ // release the corresponding container as the helix task is stuck
in INIT state for a long time
Review Comment:
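This branch runs when the instance `is not stuck`, right? The `!` looks
inverted relative to the comment and the log message.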
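A sketch of the check without the negation, assuming the intent is to release only once the instance has exceeded the threshold. `StuckInitCheck`, `shouldRelease`, and `thresholdMillis` are hypothetical names for illustration:

```java
import java.util.HashMap;
import java.util.Map;

class StuckInitCheck {
  private final Map<String, Long> instanceInitStateSince = new HashMap<>();
  private final long thresholdMillis;

  StuckInitCheck(long thresholdMillis) {
    this.thresholdMillis = thresholdMillis;
  }

  // True only when the participant is tracked and has been in INIT past the threshold.
  boolean isInstanceStuckInInitState(String participant, long nowMillis) {
    Long since = instanceInitStateSince.get(participant);
    return since != null && nowMillis - since > thresholdMillis;
  }

  // Records the first sighting, then releases only if the instance IS stuck.
  boolean shouldRelease(String participant, long nowMillis) {
    instanceInitStateSince.putIfAbsent(participant, nowMillis);
    return isInstanceStuckInInitState(participant, nowMillis);
  }
}
```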
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -219,13 +226,25 @@ private String
getInuseParticipantForHelixPartition(JobContext jobContext, int p
return null;
}
+
+ private String getParticipantInInitStateForHelixPartition(JobContext
jobContext, int partition) {
+ if
(jobContext.getPartitionState(partition).equals(TaskPartitionState.INIT)) {
+ log.info("Helix task {} is in {} state",
+ jobContext.getTaskIdForPartition(partition),
jobContext.getPartitionState(partition));
+ return jobContext.getAssignedParticipant(partition);
+ }
+
+ return null;
Review Comment:
Can we return `Optional` here instead of explicitly returning `null`?
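A minimal sketch of how the helper and its caller could look with `Optional`, written for Java 8 (so `filter`/`map` rather than `Optional::stream`). The enum and the two maps are stand-ins for `TaskPartitionState` and `JobContext`; `participantIfInit` and `collect` are hypothetical names:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

class InitParticipants {
  // Stand-in for Helix's TaskPartitionState.
  enum TaskPartitionState { INIT, RUNNING }

  // Returns the assigned participant only when the partition is in INIT state.
  static Optional<String> participantIfInit(TaskPartitionState state, String assignedParticipant) {
    if (TaskPartitionState.INIT.equals(state)) {
      return Optional.ofNullable(assignedParticipant);
    }
    return Optional.empty();
  }

  // Collects participants of INIT partitions; empty results are filtered out
  // in place of the Objects::nonNull filter used with null returns.
  static Set<String> collect(Map<Integer, TaskPartitionState> states,
      Map<Integer, String> assignments) {
    return states.keySet().stream()
        .map(helixPartition -> participantIfInit(
            states.get(helixPartition), assignments.get(helixPartition)))
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Collectors.toSet());
  }
}
```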