XComp commented on a change in pull request #15049:
URL: https://github.com/apache/flink/pull/15049#discussion_r593341506



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
##########
@@ -132,7 +142,62 @@ private static JobExceptionsInfo createJobExceptionsInfo(
             }
         }
 
+        final ErrorInfo rootCause = executionGraph.getFailureInfo();
+        return new JobExceptionsInfoWithHistory(
+                rootCause.getException().getOriginalErrorClassName(),
+                rootCause.getExceptionAsString(),
+                rootCause.getTimestamp(),
+                taskExceptionList,
+                truncated,
+                createJobExceptionHistory(
+                        executionGraphInfo.getExceptionHistory(), exceptionToReportMaxSize));
+    }
+
+    static JobExceptionsInfoWithHistory.JobExceptionHistory createJobExceptionHistory(
+            Iterable<ExceptionHistoryEntry> historyEntries, int limit) {
+        // we need to reverse the history to have a stable result when doing paging on it
+        final List<ExceptionHistoryEntry> reversedHistoryEntries = new ArrayList<>();
+        Iterables.addAll(reversedHistoryEntries, historyEntries);
+        Collections.reverse(reversedHistoryEntries);
+
+        List<JobExceptionsInfo> exceptionHistoryEntries =
+                reversedHistoryEntries.stream()
+                        .limit(limit)
+                        .map(JobExceptionsHandler::createJobExceptionInfo)
+                        .collect(Collectors.toList());
+
+        return new JobExceptionsInfoWithHistory.JobExceptionHistory(
+                exceptionHistoryEntries,
+                exceptionHistoryEntries.size() < reversedHistoryEntries.size());
+    }
+
+    private static JobExceptionsInfo createJobExceptionInfo(ExceptionHistoryEntry historyEntry) {
         return new JobExceptionsInfo(
-                rootExceptionMessage, rootTimestamp, taskExceptionList, truncated);
+                historyEntry.getException().getOriginalErrorClassName(),
+                historyEntry.getExceptionAsString(),
+                historyEntry.getTimestamp(),
+                Collections.singletonList(

Review comment:
       You have a good point here. I refactored the schema so that it no
longer relies on the old structure. The refactoring also addresses your
concern: `taskName` and `location` are no longer included when they are `null`.
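
For readers following the hunk above, the reverse-then-limit step in
`createJobExceptionHistory` can be sketched in isolation roughly as follows.
This is only an illustration in plain Java, not the code from the PR: the
class `HistoryPagingSketch`, the holder `Page`, and the method
`reverseAndLimit` are hypothetical names, and the sketch assumes `limit` is
meant to be non-negative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class HistoryPagingSketch {

    /** Hypothetical result holder: the kept entries plus a truncation flag. */
    static final class Page<T> {
        final List<T> entries;
        final boolean truncated;

        Page(List<T> entries, boolean truncated) {
            this.entries = entries;
            this.truncated = truncated;
        }
    }

    /** Copies the history, reverses the copy, and keeps at most {@code limit} entries. */
    static <T> Page<T> reverseAndLimit(Iterable<T> history, int limit) {
        // Copy first so the caller's collection is never mutated.
        List<T> reversed = new ArrayList<>();
        for (T entry : history) {
            reversed.add(entry);
        }
        Collections.reverse(reversed);

        // Assumption: a negative limit is treated like zero.
        int kept = Math.max(0, Math.min(limit, reversed.size()));
        List<T> entries = new ArrayList<>(reversed.subList(0, kept));

        // Truncated exactly when some entries were dropped by the limit.
        return new Page<>(entries, entries.size() < reversed.size());
    }

    public static void main(String[] args) {
        Page<String> page = reverseAndLimit(Arrays.asList("first", "second", "third"), 2);
        System.out.println(page.entries + " truncated=" + page.truncated);
    }
}
```

The truncation flag is derived the same way as in the hunk, by comparing the
number of kept entries with the size of the full reversed list.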




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

