zhuzhurk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1197573074
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java:
##########
@@ -51,22 +58,39 @@ public class ExecutionFailureHandler {
/** Number of all restarts happened since this job is submitted. */
private long numberOfRestarts;
+ private final Context taskFailureCtx;
+ private final Context globalFailureCtx;
+ private final Collection<FailureEnricher> failureEnrichers;
+ private final ComponentMainThreadExecutor mainThreadExecutor;
+
/**
* Creates the handler to deal with task failures.
*
* @param schedulingTopology contains the topology info for failover
* @param failoverStrategy helps to decide tasks to restart on task failures
* @param restartBackoffTimeStrategy helps to decide whether to restart failed tasks and the
* restarting delay
+ * @param mainThreadExecutor the main thread executor of the job master
+ * @param failureEnrichers a collection of {@link FailureEnricher} that enrich failures
+ * @param taskFailureCtx Task failure Context used by FailureEnrichers
+ * @param globalFailureCtx Global failure Context used by FailureEnrichers
*/
public ExecutionFailureHandler(
final SchedulingTopology schedulingTopology,
final FailoverStrategy failoverStrategy,
- final RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
+ final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+ final ComponentMainThreadExecutor mainThreadExecutor,
+ final Collection<FailureEnricher> failureEnrichers,
+ final Context taskFailureCtx,
+ final Context globalFailureCtx) {
this.schedulingTopology = checkNotNull(schedulingTopology);
this.failoverStrategy = checkNotNull(failoverStrategy);
this.restartBackoffTimeStrategy = checkNotNull(restartBackoffTimeStrategy);
+ this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
+ this.failureEnrichers = checkNotNull(failureEnrichers);
+ this.taskFailureCtx = taskFailureCtx;
Review Comment:
We never expect this context to be null in production.
I see that it is not null-checked because it can be `null` in tests. For that
purpose we would usually introduce a `DummyFailureEnricherContext`.
But since there is a follow-up task, FLINK-32114, I think we can refine it
there (maybe by introducing `FailureEnricherContextFactory` and
`DummyFailureEnricherContextFactory`?).
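
To illustrate the dummy-context idea, here is a minimal sketch. It is not Flink's
actual API: `SketchContext`, `DummySketchContext` and `SketchFailureHandler` are
hypothetical stand-ins, and the real `Context` interface has a different shape.
The point is only that once tests can pass a no-op context, the constructor can
null-check both contexts unconditionally.

```java
import java.util.Objects;

/** Hypothetical stand-in for the enricher context; the real interface differs. */
interface SketchContext {
    String jobName();
}

/** No-op context intended for tests, so callers never need to pass null. */
final class DummySketchContext implements SketchContext {
    static final DummySketchContext INSTANCE = new DummySketchContext();

    private DummySketchContext() {}

    @Override
    public String jobName() {
        return "dummy-job";
    }
}

/** Simplified handler: both contexts are now required to be non-null. */
final class SketchFailureHandler {
    private final SketchContext taskFailureCtx;
    private final SketchContext globalFailureCtx;

    SketchFailureHandler(SketchContext taskFailureCtx, SketchContext globalFailureCtx) {
        // Fails fast with a NullPointerException if a caller forgets a context.
        this.taskFailureCtx = Objects.requireNonNull(taskFailureCtx, "taskFailureCtx");
        this.globalFailureCtx = Objects.requireNonNull(globalFailureCtx, "globalFailureCtx");
    }

    SketchContext taskFailureCtx() {
        return taskFailureCtx;
    }

    SketchContext globalFailureCtx() {
        return globalFailureCtx;
    }
}
```

A test would then construct the handler with `DummySketchContext.INSTANCE` for
both arguments instead of `null`.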
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##########
@@ -107,26 +121,33 @@ public static RootExceptionHistoryEntry fromExceptionHistoryEntry(
public static RootExceptionHistoryEntry fromGlobalFailure(ErrorInfo errorInfo) {
Preconditions.checkNotNull(errorInfo, "errorInfo");
return fromGlobalFailure(
- errorInfo.getException(), errorInfo.getTimestamp(), Collections.emptyList());
+ errorInfo.getException(),
+ errorInfo.getTimestamp(),
+ FailureEnricherUtils.EMPTY_FAILURE_LABELS,
+ Collections.emptyList());
}
private static RootExceptionHistoryEntry createRootExceptionHistoryEntry(
Throwable cause,
long timestamp,
+ CompletableFuture<Map<String, String>> failureLabels,
@Nullable String failingTaskName,
@Nullable TaskManagerLocation taskManagerLocation,
Iterable<Execution> executions) {
return new RootExceptionHistoryEntry(
cause,
timestamp,
+ failureLabels,
failingTaskName,
taskManagerLocation,
StreamSupport.stream(executions.spliterator(), false)
.filter(execution -> execution.getFailureInfo().isPresent())
.map(
execution ->
ExceptionHistoryEntry.create(
- execution, execution.getVertexWithAttempt()))
+ execution,
+ execution.getVertexWithAttempt(),
+ failureLabels))
Review Comment:
These concurrent failures can be quite different from the root one.
Assigning the same failure labels to them may be confusing, e.g. a concurrent
Flink system error would be labeled `USER` when the root failure is a `USER`
error.
Given that the concurrent failures are not root causes, I think they are not
that critical. Therefore I think it's acceptable to assign empty labels to
them.
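
A rough sketch of what I mean, under simplifying assumptions: `LabeledEntrySketch`
and its `EMPTY_LABELS` constant are hypothetical and only mirror the idea of an
already-completed empty-labels future; failures are plain strings here rather
than `Execution`s. Only the root entry carries the enricher labels, and every
concurrent entry gets the empty ones.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified exception-history entry; not Flink's actual class. */
final class LabeledEntrySketch {
    /** Assumed "empty labels" constant: a completed future holding an empty map. */
    static final CompletableFuture<Map<String, String>> EMPTY_LABELS =
            CompletableFuture.completedFuture(Collections.<String, String>emptyMap());

    final String failure;
    final CompletableFuture<Map<String, String>> labels;
    final List<LabeledEntrySketch> concurrentFailures;

    private LabeledEntrySketch(
            String failure,
            CompletableFuture<Map<String, String>> labels,
            List<LabeledEntrySketch> concurrentFailures) {
        this.failure = failure;
        this.labels = labels;
        this.concurrentFailures = concurrentFailures;
    }

    /** Builds a root entry; concurrent failures do not inherit the root labels. */
    static LabeledEntrySketch forRoot(
            String rootFailure,
            CompletableFuture<Map<String, String>> rootLabels,
            List<String> concurrent) {
        List<LabeledEntrySketch> entries = new ArrayList<>();
        for (String failure : concurrent) {
            // Empty labels: a concurrent failure may have a different cause than the root.
            entries.add(
                    new LabeledEntrySketch(
                            failure,
                            EMPTY_LABELS,
                            Collections.<LabeledEntrySketch>emptyList()));
        }
        return new LabeledEntrySketch(rootFailure, rootLabels, entries);
    }
}
```

With this shape, a root failure labeled `USER` no longer causes a concurrent
system error to show up as `USER` too.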