zhuzhurk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1184723091
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -473,26 +500,50 @@ public CompletableFuture<Acknowledge> cancel(Time timeout) {
@Override
public CompletableFuture<Acknowledge> updateTaskExecutionState(
final TaskExecutionState taskExecutionState) {
- FlinkException taskExecutionException;
+ checkNotNull(taskExecutionState, "taskExecutionState");
+ // Use the main/caller thread for all updates to make sure they are processed in order.
+ // (MainThreadExecutor i.e., the akka thread pool does not guarantee that)
+ // Only detach for a FAILED state update that is terminal and may perform io heavy labeling.
+ if (ExecutionState.FAILED.equals(taskExecutionState.getExecutionState())) {
+ return labelFailure(taskExecutionState)
+ .thenApplyAsync(
+ taskStateWithLabels -> {
+ try {
+ return doUpdateTaskExecutionState(taskStateWithLabels);
+ } catch (FlinkException e) {
+ throw new CompletionException(e);
+ }
+ },
+ getMainThreadExecutor());
+ }
try {
- checkNotNull(taskExecutionState, "taskExecutionState");
+ return CompletableFuture.completedFuture(
+ doUpdateTaskExecutionState(taskExecutionState));
+ } catch (FlinkException e) {
+ return FutureUtils.completedExceptionally(e);
+ }
+ }
+ private Acknowledge doUpdateTaskExecutionState(final TaskExecutionState taskExecutionState)
+ throws FlinkException {
+ @Nullable FlinkException taskExecutionException;
+ try {
if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
Review Comment:
This is not the only entry point through which a task failure is reported to
the scheduler. There are others, e.g.
`UpdateSchedulerNgOnInternalFailuresListener#notifyTaskFailure(...)`.
Those task failures may originate from within the JobMaster, e.g. from the
source coordinators or from the scheduling process. It seems these failures are
not labeled.
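To illustrate the concern, here is a rough sketch in which every entry path goes through one shared labeling step, so no failure bypasses it. All class and method names below are hypothetical and are not Flink APIs; the labeling logic is a placeholder.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink code: every name here is invented for illustration.
class FailureEntryPointsSketch {

    /** A failure together with the labels attached to it. */
    static final class LabeledFailure {
        final Throwable cause;
        final Map<String, String> labels;

        LabeledFailure(Throwable cause, Map<String, String> labels) {
            this.cause = cause;
            this.labels = labels;
        }
    }

    /** Failures that reached the stand-in scheduler. */
    final List<LabeledFailure> handled = new ArrayList<>();

    /** Placeholder for the labeling step; a real one might perform I/O. */
    private Map<String, String> label(Throwable cause) {
        Map<String, String> labels = new HashMap<>();
        labels.put("exceptionClass", cause.getClass().getSimpleName());
        return labels;
    }

    /** Single choke point: every entry path calls this, so nothing skips labeling. */
    private void handleFailure(Throwable cause, String origin) {
        Map<String, String> labels = label(cause);
        labels.put("origin", origin);
        handled.add(new LabeledFailure(cause, labels));
    }

    /** Entry 1: failure reported from a task executor. */
    void onTaskExecutorReport(Throwable cause) {
        handleFailure(cause, "task-executor");
    }

    /** Entry 2: failure raised inside the job master, e.g. by a coordinator. */
    void onInternalFailure(Throwable cause) {
        handleFailure(cause, "internal");
    }

    public static void main(String[] args) {
        FailureEntryPointsSketch sketch = new FailureEntryPointsSketch();
        sketch.onTaskExecutorReport(new IllegalStateException("task failed"));
        sketch.onInternalFailure(new RuntimeException("coordinator failed"));
        for (LabeledFailure failure : sketch.handled) {
            System.out.println(failure.labels);
        }
    }
}
```

Whether the real code should share one step like this, or label separately at each entry, is a design choice for the PR author; the sketch only shows that both paths end up labeled.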
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java:
##########
@@ -98,4 +112,13 @@ public String getExceptionAsString() {
public long getTimestamp() {
return timestamp;
}
+
+ /**
+ * Returns the labels associated with the exception.
+ *
+ * @return Map of exception labels
+ */
+ public Map<String, String> getLabels() {
+ return labels;
Review Comment:
It's better to make it an unmodifiable map.
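A minimal sketch of what this could look like, using a hypothetical stand-in class rather than the real `ErrorInfo` (the constructor shape and the defensive copy are assumptions, not part of the PR):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ErrorInfo; only the label handling is shown.
class ErrorInfoSketch {
    private final Map<String, String> labels;

    ErrorInfoSketch(Map<String, String> labels) {
        // Copy first so later changes to the caller's map do not leak in,
        // then wrap so callers of getLabels() cannot modify the result.
        this.labels = Collections.unmodifiableMap(new HashMap<>(labels));
    }

    /** Returns a read-only view of the labels. */
    Map<String, String> getLabels() {
        return labels;
    }

    public static void main(String[] args) {
        Map<String, String> source = new HashMap<>();
        source.put("type", "USER");
        ErrorInfoSketch info = new ErrorInfoSketch(source);
        try {
            info.getLabels().put("type", "SYSTEM");
        } catch (UnsupportedOperationException e) {
            System.out.println("labels are read-only");
        }
    }
}
```

Wrapping once in the constructor avoids allocating a new wrapper on every `getLabels()` call.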
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:
##########
@@ -778,7 +778,7 @@ private static PartitionInfo createFinishedPartitionInfo(
*/
@Override
public void fail(Throwable t) {
- processFail(t, true);
+ processFail(t, true, Collections.emptyMap());
Review Comment:
Why not enrich this failure?