homatthew commented on code in PR #3685:
URL: https://github.com/apache/gobblin/pull/3685#discussion_r1175692587


##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -222,8 +223,22 @@ void runInternal() {
           if (jobContext != null) {
             log.debug("JobContext {} num partitions {}", jobContext, jobContext.getPartitionSet().size());
 
-            inUseInstances.addAll(jobContext.getPartitionSet().stream().map(jobContext::getAssignedParticipant)
-                .filter(Objects::nonNull).collect(Collectors.toSet()));
+            inUseInstances.addAll(jobContext.getPartitionSet().stream().map(i -> {
+              if(jobContext.getPartitionState(i) == null) {
+                return jobContext.getAssignedParticipant(i);
+              }
+              if (!jobContext.getPartitionState(i).equals(
+                  TaskPartitionState.ERROR) && !jobContext.getPartitionState(i).equals(

Review Comment:
   TaskPartitionState:
   
https://github.com/apache/helix/blob/53e583d4ff16cfafdcb06d2164cdd7a8a6a81245/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java#L25-L44
   
   It seems like your code considers 3 edge cases:
   > 1. Null partition state (this may happen when the job is not fully
   > initialized). We then return the assigned participant (which will
   > probably be null).
   > 2. Non-error / non-dropped state, i.e. any state other than the 2
   > "internal error" states in the comments. This behavior remains unchanged.
   > 3. Internal Helix error. Filter out the assigned participant by
   > returning null.
   
   The null edge case feels redundant because we are still calling
   `getAssignedParticipant` in that branch.
   
   Also, for case 2, recall that in Helix the assigned participant itself is
   responsible for modifying the task state and for recording itself as the
   assigned participant. If Helix does not think we have enough containers
   (e.g. a lost container), that container will still be counted as an
   assigned participant, and it would be an error to keep treating it as one.
   
   From the linked enum class, I think we should ignore assigned participants
   for `STOPPED`, `COMPLETED`, `TIMED_OUT`, `TASK_ERROR`, `TASK_ABORTED`,
   `ERROR`, and `DROPPED`, i.e. anything that isn't `INIT` or `RUNNING`. This
   is especially true since the autoscaling manager is currently only used for
   streaming tasks and not for distributed Helix mode. We also do not expect
   tasks to be in any state other than `INIT` and `RUNNING` (tasks go to
   `DROPPED` if there is any task failure, such as an OOM or a
   `KafkaIngestionHealthCheck` failure).
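   
   To make the split concrete, here is a minimal sketch of the check I have in
   mind. `PartitionState` is a local stand-in that mirrors the constants of
   Helix's `TaskPartitionState` so the snippet compiles without Helix on the
   classpath. `InUseStateSketch` and `countsAsInUse` are hypothetical names,
   not existing Gobblin code, and treating a null state as "not in use" is my
   assumption.

```java
import java.util.EnumSet;
import java.util.Set;

// Sketch only: PartitionState is a local stand-in mirroring the constants of
// Helix's TaskPartitionState; it is not the real Helix class.
final class InUseStateSketch {
  enum PartitionState {
    INIT, RUNNING, STOPPED, COMPLETED, TIMED_OUT, TASK_ERROR, TASK_ABORTED, ERROR, DROPPED
  }

  // States in which the assigned participant still counts as in use.
  static final Set<PartitionState> IN_USE_STATES =
      EnumSet.of(PartitionState.INIT, PartitionState.RUNNING);

  // Assumption: a partition with no state yet (null) is not counted as in use.
  static boolean countsAsInUse(PartitionState state) {
    return state != null && IN_USE_STATES.contains(state);
  }

  public static void main(String[] args) {
    // Print the classification of every state for a quick visual check.
    for (PartitionState state : PartitionState.values()) {
      System.out.println(state + " -> " + countsAsInUse(state));
    }
  }
}
```

   Using an allow-list of two states means any state Helix adds later is
   ignored by default instead of silently counting as in use.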
   
   The one thing to consider is whether or not we want to log all of these
   cases. I think state transitions in streaming are not that noisy: tasks do
   not tend to exit much once running (and if they do, we should really know
   about it). Logging would also help because right now there is no
   transparency into the Helix state after submitting.
   
   To simplify this code, I don't think we need these 3 if/else statements.
   What do you think about this code block instead?
   ```
   List<TaskPartitionState> validStates = Arrays.asList(TaskPartitionState.RUNNING, TaskPartitionState.INIT);
   inUseInstances.addAll(jobContext.getPartitionSet().stream().map(i -> {
     // A null partition state is not in validStates, so it falls through to the log below.
     if (validStates.contains(jobContext.getPartitionState(i))) {
       return jobContext.getAssignedParticipant(i);
     }

     // Adding a log here for now for debugging.
     // TODO: if this happens frequently, we should reset the status to retriable or at least report the error earlier
     log.error("Helix task {} is in {} state which is unexpected, please watch out to see if this gets recovered",
         jobContext.getTaskIdForPartition(i), jobContext.getPartitionState(i));
     return null;
   }).filter(Objects::nonNull).collect(Collectors.toSet()));
   ```
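   
   If you want to try the filtering outside of Gobblin, here is a rough,
   self-contained sketch of the same idea. Assumptions: two plain maps stand in
   for the Helix `JobContext` (partition id to state, partition id to
   participant), `PartitionState` is a local copy of the enum constants, and
   the class, method, and container names are made up for the example. A
   partition with no recorded state is dropped along with the
   non-`INIT`/`RUNNING` ones, because `List.contains(null)` is simply false
   for this list.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch only: plain maps replace the Helix JobContext, and PartitionState is
// a local stand-in for Helix's TaskPartitionState.
final class InUseInstancesSketch {
  enum PartitionState {
    INIT, RUNNING, STOPPED, COMPLETED, TIMED_OUT, TASK_ERROR, TASK_ABORTED, ERROR, DROPPED
  }

  static final List<PartitionState> VALID_STATES =
      Arrays.asList(PartitionState.RUNNING, PartitionState.INIT);

  // Returns the participants of partitions whose state is INIT or RUNNING.
  // Partitions with any other state, or with no state at all, are skipped.
  static Set<String> inUseInstances(Map<Integer, PartitionState> states,
      Map<Integer, String> participants) {
    return participants.keySet().stream()
        .map(partition -> VALID_STATES.contains(states.get(partition))
            ? participants.get(partition)
            : null)
        .filter(Objects::nonNull)
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    Map<Integer, PartitionState> states = new HashMap<>();
    states.put(0, PartitionState.RUNNING);
    states.put(1, PartitionState.INIT);
    states.put(2, PartitionState.DROPPED);
    // Partition 3 has a participant but no recorded state.

    Map<Integer, String> participants = new HashMap<>();
    participants.put(0, "container_a");
    participants.put(1, "container_b");
    participants.put(2, "container_c");
    participants.put(3, "container_d");

    // Only the containers behind partitions 0 and 1 are reported as in use.
    System.out.println(inUseInstances(states, participants));
  }
}
```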


