[
https://issues.apache.org/jira/browse/FLINK-21439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315316#comment-17315316
]
John Phelan edited comment on FLINK-21439 at 4/6/21, 7:35 AM:
--------------------------------------------------------------
Hi [~mapohl]! Thank you for the heads-up about that change and
{{ExceptionHistoryEntryExtractor}}. I think it makes sense.
What exactly does "global" mean? It often seems to mean that the failure cannot
be associated with a task name, but does it also carry a separate meaning,
namely that the failure triggers a complete restart rather than one restricted
to a subgraph of the job topology?
Maybe the {{AdaptiveScheduler}} can still emit failures associated with their
task names for users to see, even if it does not yet trigger non-global
restarts. Judging from the
[testFailureReportedViaUpdateTaskExecutionStateCausesRestart
test|https://github.com/apache/flink/blob/39c959c4a1c459bd3bb52a1475adde1651f7e4be/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java#L229],
it appears that {{updateTaskExecutionState}} will be passed errors from which
the necessary information can be collected.
To your point of populating the collection/queue: both the states and the
scheduler have access to the relevant methods. As you said, many of the states
do not yet fully handle the exceptions users would likely want to see in the
GUI etc., and the states are likely to evolve and change.
Maybe collecting exceptions in the scheduler would give us that visibility
regardless of each state's behavior, and it would also be consistent with
keeping the queue in the scheduler. Based on similar code in
{{DefaultScheduler}}, it seems feasible to do something like the following in
{{AdaptiveScheduler}} to get the necessary inputs for
{{ExceptionHistoryEntryExtractor::extractLocalFailure}}, and then analogously
for {{handleGlobalFailure}}:
{code:java}
@Override
public boolean updateTaskExecutionState(
        TaskExecutionStateTransition taskExecutionState) {
    // Placeholder: in practice this would be a field initialized elsewhere.
    ExceptionHistoryEntryExtractor exceptionHistoryEntryExtractor = null;
    return state.tryCall(
                    StateWithExecutionGraph.class,
                    stateWithExecutionGraph -> {
                        boolean result =
                                stateWithExecutionGraph.updateTaskExecutionState(
                                        taskExecutionState);
                        ExecutionGraph executionGraph =
                                stateWithExecutionGraph.getExecutionGraph();
                        if (taskExecutionState.getError(userCodeClassLoader) == null) {
                            // What is the right condition to say this isn't a "failure"?
                            return result;
                        }
                        Optional<ExecutionVertexID> executionVertexID =
                                getExecutionVertexID(
                                        taskExecutionState.getID(), executionGraph);
                        // All vertices except the failed one (if it could be resolved).
                        Set<ExecutionVertexID> verticesToRestart =
                                IterableUtils.toStream(
                                                executionGraph
                                                        .getSchedulingTopology()
                                                        .getVertices())
                                        .map(SchedulingExecutionVertex::getId)
                                        .filter(v -> !executionVertexID.map(v::equals).orElse(false))
                                        .collect(Collectors.toSet());
                        exceptionHistoryEntryExtractor.extractLocalFailure(
                                executionGraph.getAllVertices(),
                                executionVertexID,
                                verticesToRestart);
                        return result;
                    },
                    "updateTaskExecutionState")
            .orElse(false);
}

private Optional<ExecutionVertexID> getExecutionVertexID(
        ExecutionAttemptID id, ExecutionGraph executionGraph) {
    return Optional.ofNullable(executionGraph.getRegisteredExecutions().get(id))
            .map(e -> e.getVertex().getID());
}
{code}
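The {{handleGlobalFailure}} side could look roughly like the following; note
that the {{extractGlobalFailure}} method name and the {{tryRun}} variant of
{{tryCall}} are just my assumptions/placeholders, since {{extractLocalFailure}}
is the only extractor method I have looked at so far:
{code:java}
// Rough sketch only: assumes an ExceptionHistoryEntryExtractor field on the
// scheduler and a hypothetical extractGlobalFailure(...) counterpart to
// extractLocalFailure, plus a tryRun variant of the tryCall used above.
@Override
public void handleGlobalFailure(Throwable cause) {
    state.tryRun(
            StateWithExecutionGraph.class,
            stateWithExecutionGraph ->
                    // Hypothetical call: record the failure without a task
                    // association, since no ExecutionVertexID is available here.
                    exceptionHistoryEntryExtractor.extractGlobalFailure(
                            stateWithExecutionGraph
                                    .getExecutionGraph()
                                    .getAllVertices(),
                            cause),
            "handleGlobalFailure (exception history)");
    // ... the existing global-failure handling of the current state would
    // remain unchanged here ...
}
{code}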
Regarding the comment in the conditional early-{{return}} block of the
{{updateTaskExecutionState}} sketch above: what is the right condition for
deciding that a transition isn't a failure that should (or can) be collected?
Is it adequate to check whether the error in the
{{TaskExecutionStateTransition}} is non-null?
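For illustration, one stricter guard I could imagine (purely an assumption on
my part, and the helper name is made up) would be to additionally require that
the reported {{ExecutionState}} is {{FAILED}}:
{code:java}
// Sketch of one possible condition (assumption only): treat a transition as a
// collectible failure only if the reported ExecutionState is FAILED and an
// error is attached to it.
private boolean isCollectibleFailure(TaskExecutionStateTransition transition) {
    return transition.getExecutionState() == ExecutionState.FAILED
            && transition.getError(userCodeClassLoader) != null;
}
{code}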
> Add support for exception history
> ---------------------------------
>
> Key: FLINK-21439
> URL: https://issues.apache.org/jira/browse/FLINK-21439
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Affects Versions: 1.13.0
> Reporter: Matthias
> Assignee: John Phelan
> Priority: Major
> Fix For: 1.13.0
>
> Time Spent: 3h
> Remaining Estimate: 0h
>
> {{SchedulerNG.requestJob}} returns an {{ExecutionGraphInfo}} that was
> introduced in FLINK-21188. This {{ExecutionGraphInfo}} holds the information
> about the {{ArchivedExecutionGraph}} and exception history information.
> Currently, it's a list of {{ErrorInfos}}. This might change due to ongoing
> work in FLINK-21190 where we might introduce a wrapper class with more
> information on the failure.
> The goal of this ticket is to implement the exception history for the
> {{AdaptiveScheduler}}, i.e. collecting the exceptions that caused restarts.
> This collection of failures should be forwarded through
> {{SchedulerNG.requestJob}}.