homatthew commented on code in PR #3685:
URL: https://github.com/apache/gobblin/pull/3685#discussion_r1175692587
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -222,8 +223,22 @@ void runInternal() {
         if (jobContext != null) {
           log.debug("JobContext {} num partitions {}", jobContext, jobContext.getPartitionSet().size());
-          inUseInstances.addAll(jobContext.getPartitionSet().stream().map(jobContext::getAssignedParticipant)
-              .filter(Objects::nonNull).collect(Collectors.toSet()));
+          inUseInstances.addAll(jobContext.getPartitionSet().stream().map(i -> {
+            if(jobContext.getPartitionState(i) == null) {
+              return jobContext.getAssignedParticipant(i);
+            }
+            if (!jobContext.getPartitionState(i).equals(TaskPartitionState.ERROR) && !jobContext.getPartitionState(i).equals(
Review Comment:
TaskPartitionState:
https://github.com/apache/helix/blob/53e583d4ff16cfafdcb06d2164cdd7a8a6a81245/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java#L25-L44
Seems like your code considers 3 edge cases:
> 1. Null partition state (this may happen when the job is not fully
initialized). We then return the assigned participant, which will probably
be null.
> 2. Non-error / non-dropped state, i.e. any state other than the 2 "internal
error" states described in the enum comments. This behavior remains unchanged.
> 3. Internal Helix error. Filter out the assigned participant by returning
null.
The null edge case feels redundant because it still just calls
`getAssignedParticipant`.
Also, for case 2, recall that in Helix the assigned participant is
responsible for modifying the task state and for assigning itself as the
assigned participant. If Helix does not think we have enough containers (e.g. a
lost container), that container will still be counted as an assigned
participant, and it would be an error to keep treating it as one.
From the linked enum class, I think we should ignore assigned participants
for `STOPPED`, `COMPLETED`, `TIMED_OUT`, `TASK_ERROR`, `TASK_ABORTED`, `ERROR`,
and `DROPPED`, i.e. anything that isn't `INIT` or `RUNNING`, especially since
the autoscaling manager is currently only used for streaming tasks and not
distributed Helix mode. We also do not expect tasks to be in any state other
than `INIT` and `RUNNING` (tasks go to `DROPPED` on any task failure, such as
an OOM or a KafkaIngestionHealthCheck failure).
The one thing to consider is whether we want to log all of these cases. I
think state transitions in streaming are not that noisy: tasks do not tend to
exit much once running (and if they do, we should really know about it). Right
now there is no transparency into Helix state after submitting.
To simplify this code, I don't think we need these 3 if/else statements. What
do you think about this code block instead?
```
// Only partitions in these states count as holding an in-use instance
List<TaskPartitionState> validStates =
    Arrays.asList(TaskPartitionState.RUNNING, TaskPartitionState.INIT);
inUseInstances.addAll(jobContext.getPartitionSet().stream().map(i -> {
  if (validStates.contains(jobContext.getPartitionState(i))) {
    return jobContext.getAssignedParticipant(i);
  }
  // adding log here now for debugging
  // todo: if this happens frequently, we should reset the status to retriable
  // or at least report the error earlier
  log.error("Helix task {} is in {} state which is unexpected, "
      + "please watch out to see if this gets recovered",
      jobContext.getTaskIdForPartition(i), jobContext.getPartitionState(i));
  return null;
}).filter(Objects::nonNull).collect(Collectors.toSet()));
```
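If it helps to sanity-check the filtering outside of Helix, here is a rough
standalone sketch of the same idea. Note that `PartitionState`,
`FakeJobContext` and `InUseInstancesSketch` are hypothetical stand-ins I made up
for illustration (they are not Helix or Gobblin classes); only the enum constant
names are taken from the linked `TaskPartitionState`. Logging is left out.

```java
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

// Stand-in for org.apache.helix.task.TaskPartitionState (same constant names).
enum PartitionState {
  INIT, RUNNING, STOPPED, COMPLETED, TIMED_OUT, TASK_ERROR, TASK_ABORTED, ERROR, DROPPED
}

// Hypothetical stand-in exposing only the JobContext-style getters used in the snippet.
class FakeJobContext {
  private final Map<Integer, PartitionState> states = new LinkedHashMap<>();
  private final Map<Integer, String> participants = new LinkedHashMap<>();

  // Both state and participant may be null, mirroring a partially initialized job.
  void put(int partition, PartitionState state, String participant) {
    states.put(partition, state);
    participants.put(partition, participant);
  }

  Set<Integer> getPartitionSet() { return states.keySet(); }
  PartitionState getPartitionState(int partition) { return states.get(partition); }
  String getAssignedParticipant(int partition) { return participants.get(partition); }
}

class InUseInstancesSketch {
  // Only partitions in these states are treated as occupying an instance.
  private static final Set<PartitionState> VALID_STATES =
      EnumSet.of(PartitionState.INIT, PartitionState.RUNNING);

  static Set<String> inUseInstances(FakeJobContext ctx) {
    return ctx.getPartitionSet().stream()
        // A null state is not a valid state, so it is filtered out as well.
        .filter(p -> {
          PartitionState state = ctx.getPartitionState(p);
          return state != null && VALID_STATES.contains(state);
        })
        .map(ctx::getAssignedParticipant)
        // A valid-state partition may still have no participant assigned yet.
        .filter(Objects::nonNull)
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    FakeJobContext ctx = new FakeJobContext();
    ctx.put(0, PartitionState.RUNNING, "container_1");
    ctx.put(1, PartitionState.INIT, "container_2");
    ctx.put(2, PartitionState.DROPPED, "container_3"); // excluded: not INIT/RUNNING
    ctx.put(3, null, "container_4");                   // excluded: null state
    ctx.put(4, PartitionState.RUNNING, null);          // excluded: no participant
    System.out.println(inUseInstances(ctx));
  }
}
```

One behavioral difference from the PR as written: with an allow-list, a null
partition state no longer falls through to `getAssignedParticipant`, so case 1
is excluded rather than special-cased.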