pgaref commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1188101108
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -473,26 +500,50 @@ public CompletableFuture<Acknowledge> cancel(Time
timeout) {
@Override
public CompletableFuture<Acknowledge> updateTaskExecutionState(
final TaskExecutionState taskExecutionState) {
- FlinkException taskExecutionException;
+ checkNotNull(taskExecutionState, "taskExecutionState");
+ // Use the main/caller thread for all updates to make sure they are
processed in order.
+ // (MainThreadExecutor i.e., the akka thread pool does not guarantee
that)
+ // Only detach for a FAILED state update that is terminal and may
perform io heavy labeling.
+ if
(ExecutionState.FAILED.equals(taskExecutionState.getExecutionState())) {
+ return labelFailure(taskExecutionState)
+ .thenApplyAsync(
+ taskStateWithLabels -> {
+ try {
+ return
doUpdateTaskExecutionState(taskStateWithLabels);
+ } catch (FlinkException e) {
+ throw new CompletionException(e);
+ }
+ },
+ getMainThreadExecutor());
+ }
try {
- checkNotNull(taskExecutionState, "taskExecutionState");
+ return CompletableFuture.completedFuture(
+ doUpdateTaskExecutionState(taskExecutionState));
+ } catch (FlinkException e) {
+ return FutureUtils.completedExceptionally(e);
+ }
+ }
+ private Acknowledge doUpdateTaskExecutionState(final TaskExecutionState
taskExecutionState)
+ throws FlinkException {
+ @Nullable FlinkException taskExecutionException;
+ try {
if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
Review Comment:
Looks like the main decision we have to take here is if the failure-labels
going to be used by restart strategies or not -- relying on enrichment for
restarts is what makes it crucial.
The existing implementation was [based on the
assumption](https://lists.apache.org/thread/tq8yrncg7zqtpc8ddpxrkxfpovs1wkkw)
that labels are going to be used by the custom restart strategies in the
future. Since we wanted them asynchronous, the less risky way was through
existing async calls e.g., `JobMaster#updateTaskExecutionState`, and probably
modifying the InternalFailuresListener (rather than changing SchedulerNG update
state to async).
Deciding the failure enrichment is crucial enough to be synchronous -- maybe
part of `DefaultScheduler#restartTasksWithDelay`-- is also an option.
However, decoupling failure labels completely from restart strategies sounds
like a step back here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]