[ 
https://issues.apache.org/jira/browse/GOBBLIN-1822?focusedWorklogId=858772&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-858772
 ]

ASF GitHub Bot logged work on GOBBLIN-1822:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Apr/23 19:25
            Start Date: 24/Apr/23 19:25
    Worklog Time Spent: 10m 
      Work Description: homatthew commented on code in PR #3685:
URL: https://github.com/apache/gobblin/pull/3685#discussion_r1175692587


##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -222,8 +223,22 @@ void runInternal() {
           if (jobContext != null) {
             log.debug("JobContext {} num partitions {}", jobContext, jobContext.getPartitionSet().size());
 
-            inUseInstances.addAll(jobContext.getPartitionSet().stream().map(jobContext::getAssignedParticipant)
-                .filter(Objects::nonNull).collect(Collectors.toSet()));
+            inUseInstances.addAll(jobContext.getPartitionSet().stream().map(i -> {
+              if(jobContext.getPartitionState(i) == null) {
+                return jobContext.getAssignedParticipant(i);
+              }
+              if (!jobContext.getPartitionState(i).equals(
+                  TaskPartitionState.ERROR) && !jobContext.getPartitionState(i).equals(

Review Comment:
   TaskPartitionState:
   
https://github.com/apache/helix/blob/53e583d4ff16cfafdcb06d2164cdd7a8a6a81245/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java#L25-L44
   
   It seems like your code considers 3 cases:
   > 1. Null partition state (this may happen when the job is not fully
   > initialized). We then return the assigned participant (which will
   > probably be null).
   > 2. Non-error / non-dropped state, i.e. any state other than the 2
   > "internal error" states (`ERROR`, `DROPPED`) in the enum comments. This
   > behavior remains unchanged.
   > 3. Internal Helix error. Filter out the assigned participant by returning
   > null.
   
   The null case feels redundant because we still call
   `getAssignedParticipant` for it.
   
   Also, for case 2, recall that in Helix the assigned participant is
   responsible for modifying the task state and for setting itself as the
   assigned participant. If Helix does not think we have enough containers
   (e.g. a lost container), that container will still be recorded as an
   assigned participant, and it would be an error to keep treating it as one.
   
   From the linked enum class, I think we should ignore assigned participants
   for `STOPPED`, `COMPLETED`, `TIMED_OUT`, `TASK_ERROR`, `TASK_ABORTED`,
   `ERROR`, and `DROPPED`, i.e. anything that isn't `INIT` or `RUNNING`,
   especially since the autoscaling manager is currently only used for
   streaming tasks and not distributed Helix mode. Also, we do not expect tasks
   to be in any state other than `INIT` and `RUNNING` (tasks go to `DROPPED` if
   there is any task failure, like OOM or `KafkaIngestionHealthCheck`).
   
   The one thing to consider is whether we want to log for all of these cases.
   I think state transitions in streaming are not that noisy: tasks do not tend
   to exit much once running (and if they do, we should really know about it).
   Right now, there is no transparency into the Helix state after submitting.
   
   To simplify this code, I don't think we need these 3 if/else statements.
   What do you think about this code block instead?
   ```
   List<TaskPartitionState> validStates =
       Arrays.asList(TaskPartitionState.RUNNING, TaskPartitionState.INIT);
   inUseInstances.addAll(jobContext.getPartitionSet().stream().map(i -> {
     // Only partitions that are initializing or running count as in use.
     if (validStates.contains(jobContext.getPartitionState(i))) {
       return jobContext.getAssignedParticipant(i);
     }

     // Adding a log here for now, for debugging.
     // TODO: if this happens frequently, we should reset the status to
     // retriable or at least report the error earlier.
     log.error("Helix task {} is in {} state which is unexpected, please watch out to see if this gets recovered",
         jobContext.getTaskIdForPartition(i), jobContext.getPartitionState(i));
     return null;
   }).filter(Objects::nonNull).collect(Collectors.toSet()));
   ```
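
   For anyone who wants to try the filtering rule outside of Gobblin, here is a
   rough standalone sketch of the same idea. It does not use the Helix API: the
   `TaskPartitionState` enum below is a local stand-in that only mirrors the
   state names listed above, and `PartitionInfo`, `VALID_STATES`, and
   `inUseInstances` are hypothetical names made up for illustration.

```java
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

public class InUseInstancesSketch {

  // Local stand-in for org.apache.helix.task.TaskPartitionState (assumption:
  // only the state names mentioned in this review are reproduced here).
  enum TaskPartitionState {
    INIT, RUNNING, STOPPED, COMPLETED, TIMED_OUT, TASK_ERROR, TASK_ABORTED, ERROR, DROPPED
  }

  // Hypothetical simplified view of one partition. Either field may be null,
  // e.g. when the job is not fully initialized.
  static final class PartitionInfo {
    final TaskPartitionState state;
    final String participant;

    PartitionInfo(TaskPartitionState state, String participant) {
      this.state = state;
      this.participant = participant;
    }
  }

  // Only partitions in these states count their participant as in use.
  static final Set<TaskPartitionState> VALID_STATES =
      EnumSet.of(TaskPartitionState.INIT, TaskPartitionState.RUNNING);

  static Set<String> inUseInstances(Map<Integer, PartitionInfo> partitions) {
    return partitions.values().stream()
        // A null state is treated like any other non-valid state and skipped.
        .filter(p -> p.state != null && VALID_STATES.contains(p.state))
        .map(p -> p.participant)
        // A valid state with no participant assigned yet contributes nothing.
        .filter(Objects::nonNull)
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    Map<Integer, PartitionInfo> partitions = new LinkedHashMap<>();
    partitions.put(0, new PartitionInfo(TaskPartitionState.RUNNING, "container_1"));
    partitions.put(1, new PartitionInfo(TaskPartitionState.INIT, null));
    partitions.put(2, new PartitionInfo(TaskPartitionState.DROPPED, "container_2"));
    partitions.put(3, new PartitionInfo(null, "container_3"));
    System.out.println(inUseInstances(partitions));
  }
}
```

   Note that in this sketch a null state is filtered out rather than passed
   through, which differs from case 1 in the PR. Whether that is the right call
   for a job that is still initializing is worth double-checking.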





Issue Time Tracking
-------------------

    Worklog Id:     (was: 858772)
    Time Spent: 0.5h  (was: 20m)

> Logging Abnormal Helix Task States
> ----------------------------------
>
>                 Key: GOBBLIN-1822
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1822
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Zihan Li
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Currently, in the autoScalingManager, we iterate through all Helix tasks 
> without logging their statuses. This means that if any issues occur and we 
> need to restart the pipeline, we lose the Helix status information, making it 
> difficult to investigate the problem further.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
