zentol commented on a change in pull request #15049:
URL: https://github.com/apache/flink/pull/15049#discussion_r585554691
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java
##########
@@ -156,4 +182,56 @@ public int hashCode() {
return Objects.hash(timestamp, exception, task, location);
}
}
+
+ /** Nested class to hold exception information of previous failures. */
+ public static final class JobExceptionHistoryEntry {
+ public static final String FIELD_NAME_ROOT_EXCEPTION = "root-exception";
+ public static final String FIELD_NAME_TIMESTAMP = "timestamp";
+ public static final String FIELD_NAME_ALL_EXCEPTIONS = "all-exceptions";
+ public static final String FIELD_NAME_TRUNCATED = "truncated";
Review comment:
re-use existing constants instead
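A minimal sketch of what re-using the constants could look like, assuming the history entry stays nested inside `JobExceptionsInfo`. A static nested class can refer to the enclosing class's constants directly, so each field name is declared once. `ExceptionsInfoSketch`, `HistoryEntrySketch` and `toFields()` are hypothetical. In the real classes the constants would presumably feed Jackson's `@JsonProperty`; the sketch uses a plain map instead so it stays dependency-free.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for JobExceptionsInfo; only the field-name constants matter here. */
public class ExceptionsInfoSketch {
    // Declared once on the enclosing class, like the existing JobExceptionsInfo constants.
    public static final String FIELD_NAME_ROOT_EXCEPTION = "root-exception";
    public static final String FIELD_NAME_TIMESTAMP = "timestamp";
    public static final String FIELD_NAME_TRUNCATED = "truncated";

    /** Hypothetical nested history entry that re-uses the enclosing constants. */
    public static final class HistoryEntrySketch {
        private final String rootException;
        private final long timestamp;
        private final boolean truncated;

        public HistoryEntrySketch(String rootException, long timestamp, boolean truncated) {
            this.rootException = rootException;
            this.timestamp = timestamp;
            this.truncated = truncated;
        }

        /** Stand-in for JSON serialization; every key comes from the enclosing class. */
        public Map<String, Object> toFields() {
            Map<String, Object> fields = new LinkedHashMap<>();
            fields.put(FIELD_NAME_ROOT_EXCEPTION, rootException);
            fields.put(FIELD_NAME_TIMESTAMP, timestamp);
            fields.put(FIELD_NAME_TRUNCATED, truncated);
            return fields;
        }
    }

    public static void main(String[] args) {
        HistoryEntrySketch entry =
                new HistoryEntrySketch("java.lang.RuntimeException: boom", 42L, false);
        System.out.println(entry.toFields());
    }
}
```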
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskFailureHistoryEntry.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+/**
+ * {@code TaskFailureHistoryEntry} collects information about a single task failure that should be
+ * exposed through the exception history.
+ */
+public class TaskFailureHistoryEntry implements Serializable {
+
+ private static final long serialVersionUID = -3855285510064263701L;
+
+ private final ErrorInfo errorInfo;
+ @Nullable private final String failingTaskName;
+ @Nullable private final TaskManagerLocation assignedResourceLocation;
+
+ private TaskFailureHistoryEntry(
+ ErrorInfo errorInfo,
+ @Nullable String failingTaskName,
+ @Nullable TaskManagerLocation assignedResourceLocation) {
+ this.errorInfo = Preconditions.checkNotNull(errorInfo);
+ this.failingTaskName = failingTaskName;
+ this.assignedResourceLocation = assignedResourceLocation;
+ }
+
+ public boolean isGlobalFailure() {
+ return failingTaskName == null;
+ }
+
+ public ErrorInfo getErrorInfo() {
+ return errorInfo;
+ }
+
+ @Nullable
+ public String getFailingTaskName() {
+ return failingTaskName;
+ }
+
+ @Nullable
+ public TaskManagerLocation getAssignedResourceLocation() {
+ return assignedResourceLocation;
+ }
+
+ public static TaskFailureHistoryEntry createGlobalFailureEntry(
+ Throwable failureCause, long timestamp) {
+ return new TaskFailureHistoryEntry(new ErrorInfo(failureCause, timestamp), null, null);
Review comment:
`TaskFailureHistoryEntry` doesn't seem like an appropriate name. Why not
just extend `ErrorInfo` instead?
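A rough sketch of the suggestion, assuming `ErrorInfo` can be subclassed and modeling the task manager location as a plain string. The entry inherits the exception and timestamp instead of wrapping an `ErrorInfo`. `ErrorInfoSketch`, `ExceptionHistoryEntrySketch`, `createGlobal` and `createLocal` are hypothetical stand-ins, not Flink classes. In the sketch, a global failure is the one without a failing task, hence `isGlobal()` checks for `null`.

```java
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical stand-in for Flink's ErrorInfo: a failure cause plus a timestamp. */
class ErrorInfoSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Throwable exception;
    private final long timestamp;

    ErrorInfoSketch(Throwable exception, long timestamp) {
        this.exception = Objects.requireNonNull(exception);
        this.timestamp = timestamp;
    }

    Throwable getException() {
        return exception;
    }

    long getTimestamp() {
        return timestamp;
    }
}

/** Hypothetical history entry that extends the error info instead of wrapping it. */
class ExceptionHistoryEntrySketch extends ErrorInfoSketch {
    private static final long serialVersionUID = 1L;

    // Both fields are null for global failures; the location is a String stand-in
    // for TaskManagerLocation.
    private final String failingTaskName;
    private final String taskManagerLocation;

    private ExceptionHistoryEntrySketch(
            Throwable cause, long timestamp, String failingTaskName, String taskManagerLocation) {
        super(cause, timestamp);
        this.failingTaskName = failingTaskName;
        this.taskManagerLocation = taskManagerLocation;
    }

    static ExceptionHistoryEntrySketch createGlobal(Throwable cause, long timestamp) {
        return new ExceptionHistoryEntrySketch(cause, timestamp, null, null);
    }

    static ExceptionHistoryEntrySketch createLocal(
            Throwable cause, long timestamp, String failingTaskName, String taskManagerLocation) {
        return new ExceptionHistoryEntrySketch(
                cause,
                timestamp,
                Objects.requireNonNull(failingTaskName),
                Objects.requireNonNull(taskManagerLocation));
    }

    /** A global failure is one without a failing task. */
    boolean isGlobal() {
        return failingTaskName == null;
    }

    String getFailingTaskName() {
        return failingTaskName;
    }

    String getTaskManagerLocation() {
        return taskManagerLocation;
    }

    public static void main(String[] args) {
        ExceptionHistoryEntrySketch global = createGlobal(new RuntimeException("global"), 1L);
        ExceptionHistoryEntrySketch local =
                createLocal(new RuntimeException("local"), 2L, "map (1/4)", "tm-1");
        System.out.println(global.isGlobal() + " " + local.isGlobal());
    }
}
```

Callers that only care about the error can then treat every history entry as an `ErrorInfo`.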
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
##########
@@ -87,18 +87,23 @@ protected JobExceptionsInfo handleRequest(
}
@Override
- public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
+ public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo)
throws IOException {
- ResponseBody json = createJobExceptionsInfo(graph, MAX_NUMBER_EXCEPTION_TO_REPORT);
+ ResponseBody json =
+ createJobExceptionsInfo(executionGraphInfo, MAX_NUMBER_EXCEPTION_TO_REPORT);
String path =
getMessageHeaders()
.getTargetRestEndpointURL()
- .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+ .replace(
+ ':' + JobIDPathParameter.KEY,
+ executionGraphInfo.getJobId().toString());
return Collections.singletonList(new ArchivedJson(path, json));
}
private static JobExceptionsInfo createJobExceptionsInfo(
- AccessExecutionGraph executionGraph, int exceptionToReportMaxSize) {
+ ExecutionGraphInfo executionGraphInfo, int exceptionToReportMaxSize) {
+ final ArchivedExecutionGraph executionGraph = executionGraphInfo.getArchivedExecutionGraph();
Review comment:
Why is the `ExecutionGraphInfo` storing/returning an
`ArchivedExecutionGraph` instead of an `AccessExecutionGraph`?
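If the handler only reads from the graph, the holder could expose the read-only interface instead. The sketch below assumes exactly that; `AccessGraphSketch`, `ArchivedGraphSketch`, `GraphInfoSketch` and `HandlerSketch` are hypothetical stand-ins for `AccessExecutionGraph`, `ArchivedExecutionGraph`, `ExecutionGraphInfo` and the handler logic.

```java
/** Hypothetical stand-in for the read-only AccessExecutionGraph interface. */
interface AccessGraphSketch {
    String getJobName();
}

/** Hypothetical stand-in for ArchivedExecutionGraph: one concrete implementation. */
final class ArchivedGraphSketch implements AccessGraphSketch {
    private final String jobName;

    ArchivedGraphSketch(String jobName) {
        this.jobName = jobName;
    }

    @Override
    public String getJobName() {
        return jobName;
    }
}

/** Hypothetical ExecutionGraphInfo that exposes the interface, not the concrete class. */
final class GraphInfoSketch {
    private final AccessGraphSketch executionGraph;

    GraphInfoSketch(AccessGraphSketch executionGraph) {
        this.executionGraph = executionGraph;
    }

    AccessGraphSketch getExecutionGraph() {
        return executionGraph;
    }
}

/** Hypothetical handler logic that only relies on the read-only interface. */
final class HandlerSketch {
    static String describe(GraphInfoSketch info) {
        return "job: " + info.getExecutionGraph().getJobName();
    }

    public static void main(String[] args) {
        System.out.println(describe(new GraphInfoSketch(new ArchivedGraphSketch("wordcount"))));
        // Any implementation works, e.g. a lambda stub in a test.
        System.out.println(describe(new GraphInfoSketch(() -> "stub")));
    }
}
```

Because the handler depends only on the interface, a test can pass a lambda stub instead of building an archived graph. Whether a concrete type is needed for other reasons, such as serialization, is outside what this sketch covers.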
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -775,9 +775,14 @@ public final void notifyPartitionDataAvailable(final ResultPartitionID partition
protected void notifyPartitionDataAvailableInternal(
IntermediateResultPartitionID resultPartitionId) {}
+ /**
+ * Returns a copy of the current history of task failures.
+ *
+ * @return a copy of the current history of task failures.
+ */
@VisibleForTesting
protected List<TaskFailureHistoryEntry> getExceptionHistory() {
- return taskFailureHistory;
+ return new ArrayList<>(taskFailureHistory);
Review comment:
Why do you need a copy, and why would wrapping it in an unmodifiable
list not suffice?
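A small sketch of the trade-off behind the question, using a hypothetical `HistoryViewSketch` in place of the scheduler. `Collections.unmodifiableList` returns a read-only view that avoids copying but reflects later changes to the backing list, while `new ArrayList<>(...)` takes a stable snapshot that the caller may also modify.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class HistoryViewSketch {
    // Hypothetical stand-in for the scheduler's taskFailureHistory.
    private final List<String> history = new ArrayList<>();

    void recordFailure(String failure) {
        history.add(failure);
    }

    /** Read-only view: no copy, but later additions show through. */
    List<String> historyView() {
        return Collections.unmodifiableList(history);
    }

    /** Snapshot: costs a copy, but is stable and safe for the caller to mutate. */
    List<String> historySnapshot() {
        return new ArrayList<>(history);
    }

    public static void main(String[] args) {
        HistoryViewSketch scheduler = new HistoryViewSketch();
        scheduler.recordFailure("f1");
        List<String> view = scheduler.historyView();
        List<String> snapshot = scheduler.historySnapshot();
        scheduler.recordFailure("f2");
        System.out.println(view.size()); // 2: the view tracks the live list
        System.out.println(snapshot.size()); // 1: the copy was taken before f2
        try {
            view.add("f3");
        } catch (UnsupportedOperationException e) {
            System.out.println("view is read-only");
        }
    }
}
```

A view is cheaper, but any caller that mutates the result, for example by reversing it in place, would then need its own copy.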
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java
##########
@@ -156,4 +182,56 @@ public int hashCode() {
return Objects.hash(timestamp, exception, task, location);
}
}
+
+ /** Nested class to hold exception information of previous failures. */
+ public static final class JobExceptionHistoryEntry {
Review comment:
We could move this class to the top level and have `JobExceptionsInfo` extend
it; then we wouldn't duplicate the entire thing.
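A sketch of the proposed layout, with hypothetical names and `String` stand-ins for the per-task exception infos. The shared fields (`root-exception`, `timestamp`, `all-exceptions`, `truncated`) live in one top-level class; the response type extends it and adds only the history.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical top-level class with the fields every exception entry carries. */
class ExceptionEntrySketch {
    private final String rootException;
    private final long timestamp;
    // String stand-ins for the per-task exception infos.
    private final List<String> allExceptions;
    private final boolean truncated;

    ExceptionEntrySketch(
            String rootException, long timestamp, List<String> allExceptions, boolean truncated) {
        this.rootException = rootException;
        this.timestamp = timestamp;
        this.allExceptions = Collections.unmodifiableList(new ArrayList<>(allExceptions));
        this.truncated = truncated;
    }

    String getRootException() { return rootException; }

    long getTimestamp() { return timestamp; }

    List<String> getAllExceptions() { return allExceptions; }

    boolean isTruncated() { return truncated; }
}

/** Hypothetical response type: inherits the current failure's fields and adds the history. */
final class ExceptionsResponseSketch extends ExceptionEntrySketch {
    private final List<ExceptionEntrySketch> exceptionHistory;

    ExceptionsResponseSketch(
            String rootException,
            long timestamp,
            List<String> allExceptions,
            boolean truncated,
            List<ExceptionEntrySketch> exceptionHistory) {
        super(rootException, timestamp, allExceptions, truncated);
        this.exceptionHistory = Collections.unmodifiableList(new ArrayList<>(exceptionHistory));
    }

    List<ExceptionEntrySketch> getExceptionHistory() {
        return exceptionHistory;
    }

    public static void main(String[] args) {
        ExceptionEntrySketch previous =
                new ExceptionEntrySketch(
                        "IOException: disk full", 1_000L, Arrays.asList("source (1/2)"), false);
        ExceptionsResponseSketch response =
                new ExceptionsResponseSketch(
                        "RuntimeException: boom",
                        2_000L,
                        Arrays.asList("map (1/4)", "map (2/4)"),
                        false,
                        Collections.singletonList(previous));
        System.out.println(
                response.getRootException()
                        + ", history size "
                        + response.getExceptionHistory().size());
    }
}
```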
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/OnlyExecutionGraphJsonArchivist.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for all classes that want to participate in the archiving of job-related json responses
+ * but only provide {@link AccessExecutionGraph}-related information.
+ */
+public interface OnlyExecutionGraphJsonArchivist extends JsonArchivist {
Review comment:
Why are we bothering with this distinction in the first place? Any
archivist working on the `ExecutionGraphInfo` has access to the
`AccessExecutionGraph` anyway. It should be trivial to adjust existing
`JsonArchivist` implementations accordingly.
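A sketch of the single-interface alternative, with hypothetical stand-ins: `JobInfoSketch` plays `ExecutionGraphInfo`, `JobGraphViewSketch` plays `AccessExecutionGraph`, and `ArchivedEntrySketch` replaces `ArchivedJson`. The `IOException` of the real `archiveJsonWithPath` is dropped for brevity, and the paths and JSON payloads are made up. An archivist that only needs the graph simply calls `getExecutionGraph()`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for AccessExecutionGraph. */
interface JobGraphViewSketch {
    String getJobId();
}

/** Hypothetical stand-in for ExecutionGraphInfo: the graph plus the exception history. */
final class JobInfoSketch {
    private final JobGraphViewSketch executionGraph;
    private final List<String> exceptionHistory;

    JobInfoSketch(JobGraphViewSketch executionGraph, List<String> exceptionHistory) {
        this.executionGraph = executionGraph;
        this.exceptionHistory = exceptionHistory;
    }

    JobGraphViewSketch getExecutionGraph() {
        return executionGraph;
    }

    List<String> getExceptionHistory() {
        return exceptionHistory;
    }
}

/** Hypothetical stand-in for ArchivedJson: a REST path and its JSON payload. */
final class ArchivedEntrySketch {
    final String path;
    final String json;

    ArchivedEntrySketch(String path, String json) {
        this.path = path;
        this.json = json;
    }
}

/** One archivist interface for everyone; it always receives the full info object. */
interface ArchivistSketch {
    Collection<ArchivedEntrySketch> archiveJsonWithPath(JobInfoSketch info);
}

/** Only needs the graph, so it just calls getExecutionGraph(). */
final class GraphOnlyArchivistSketch implements ArchivistSketch {
    @Override
    public Collection<ArchivedEntrySketch> archiveJsonWithPath(JobInfoSketch info) {
        String jobId = info.getExecutionGraph().getJobId();
        return Collections.singletonList(
                new ArchivedEntrySketch("/jobs/" + jobId, "{\"jid\":\"" + jobId + "\"}"));
    }
}

/** Also needs the exception history, through the same interface. */
final class ExceptionsArchivistSketch implements ArchivistSketch {
    @Override
    public Collection<ArchivedEntrySketch> archiveJsonWithPath(JobInfoSketch info) {
        String jobId = info.getExecutionGraph().getJobId();
        int historySize = info.getExceptionHistory().size();
        return Collections.singletonList(
                new ArchivedEntrySketch(
                        "/jobs/" + jobId + "/exceptions", "{\"history-size\":" + historySize + "}"));
    }
}

final class ArchivistDemo {
    public static void main(String[] args) {
        JobInfoSketch info = new JobInfoSketch(() -> "abc", Arrays.asList("e1", "e2"));
        List<ArchivistSketch> archivists =
                Arrays.asList(new GraphOnlyArchivistSketch(), new ExceptionsArchivistSketch());
        for (ArchivistSketch archivist : archivists) {
            for (ArchivedEntrySketch entry : archivist.archiveJsonWithPath(info)) {
                System.out.println(entry.path + " -> " + entry.json);
            }
        }
    }
}
```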
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
##########
@@ -104,8 +104,12 @@ protected JobExceptionsInfo handleRequest(
private static JobExceptionsInfo createJobExceptionsInfo(
ExecutionGraphInfo executionGraphInfo, int exceptionToReportMaxSize) {
+ final List<TaskFailureHistoryEntry> rootCauseFirstExceptionHistory =
+ executionGraphInfo.getExceptionHistory();
+ Collections.reverse(rootCauseFirstExceptionHistory);
Review comment:
I feel like this kind of reordering should be handled in the UI.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure.