This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dd80c072b5deeec6c91d8c0419b3108a4942f889
Author: Matthias Pohl <[email protected]>
AuthorDate: Mon Feb 22 19:25:21 2021 +0100

    [FLINK-21190][runtime] Introduces exception history to web UI
---
 .../generated/all_jobmanager_section.html          |   6 +
 .../generated/job_manager_configuration.html       |   6 +
 .../shortcodes/generated/rest_v1_dispatcher.html   |  37 +-
 .../flink/configuration/JobManagerOptions.java     |   9 +
 .../src/test/resources/rest_api_v1.snapshot        |  35 +-
 .../src/app/interfaces/job-exception.ts            |  14 +
 .../job/exceptions/job-exceptions.component.html   |  18 +-
 .../job/exceptions/job-exceptions.component.ts     |  17 +-
 .../rest/handler/job/JobExceptionsHandler.java     | 130 +++++--
 .../rest/messages/JobExceptionsHeaders.java        |  18 +-
 .../runtime/rest/messages/JobExceptionsInfo.java   |  54 ++-
 .../messages/JobExceptionsInfoWithHistory.java     | 273 +++++++++++++++
 .../runtime/scheduler/ExceptionHistoryEntry.java   | 189 ++++++++++
 .../runtime/scheduler/ExecutionGraphInfo.java      |   9 +-
 .../flink/runtime/scheduler/SchedulerBase.java     |  42 ++-
 .../flink/runtime/util/BoundedFIFOQueue.java       |  87 +++++
 .../rest/handler/job/JobExceptionsHandlerTest.java | 380 +++++++++++++++++++--
 .../rest/messages/JobExceptionsInfoTest.java       |  47 ---
 ...=> JobExceptionsInfoWithHistoryNoRootTest.java} |  30 +-
 .../messages/JobExceptionsInfoWithHistoryTest.java |  79 +++++
 .../runtime/scheduler/DefaultSchedulerTest.java    | 109 +++++-
 .../scheduler/ExceptionHistoryEntryTest.java       | 222 ++++++++++++
 .../scheduler/TestExecutionSlotAllocator.java      |  12 +-
 .../TestExecutionSlotAllocatorFactory.java         |   5 +
 .../flink/runtime/util/BoundedFIFOQueueTest.java   | 107 ++++++
 25 files changed, 1770 insertions(+), 165 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html 
b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
index 707596c..8a5b586 100644
--- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html
+++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
@@ -27,6 +27,12 @@
             <td>Dictionary for JobManager to store the archives of completed 
jobs.</td>
         </tr>
         <tr>
+            <td><h5>jobmanager.exception-history-size</h5></td>
+            <td style="word-wrap: break-word;">16</td>
+            <td>Integer</td>
+            <td>The maximum number of failures collected by the exception 
history per job.</td>
+        </tr>
+        <tr>
             <td><h5>jobmanager.execution.attempts-history-size</h5></td>
             <td style="word-wrap: break-word;">16</td>
             <td>Integer</td>
diff --git a/docs/layouts/shortcodes/generated/job_manager_configuration.html 
b/docs/layouts/shortcodes/generated/job_manager_configuration.html
index ba83988..b0e7e7e 100644
--- a/docs/layouts/shortcodes/generated/job_manager_configuration.html
+++ b/docs/layouts/shortcodes/generated/job_manager_configuration.html
@@ -33,6 +33,12 @@
             <td>The local address of the network interface that the job 
manager binds to. If not configured, '0.0.0.0' will be used.</td>
         </tr>
         <tr>
+            <td><h5>jobmanager.exception-history-size</h5></td>
+            <td style="word-wrap: break-word;">16</td>
+            <td>Integer</td>
+            <td>The maximum number of failures collected by the exception 
history per job.</td>
+        </tr>
+        <tr>
             <td><h5>jobmanager.execution.attempts-history-size</h5></td>
             <td style="word-wrap: break-word;">16</td>
             <td>Integer</td>
diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html 
b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 82e0f0b..181dd68 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -2630,7 +2630,7 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
       <td class="text-left">Response code: <code>200 OK</code></td>
     </tr>
     <tr>
-      <td colspan="2">Returns the non-recoverable exceptions that have been 
observed by the job. The truncated flag defines whether more exceptions 
occurred, but are not listed, because the response would otherwise get too 
big.</td>
+      <td colspan="2">Returns the most recent exceptions that have been 
handled by Flink for this job. The 'exceptionHistory.truncated' flag defines 
whether exceptions were filtered out through the GET parameter. The backend 
collects only a specific amount of most recent exceptions per job. This can be 
configured through jobmanager.exception-history-size in the Flink 
configuration. The following first-level members are deprecated: 
'root-exception', 'timestamp', 'all-exceptions', 'truncated'.  [...]
     </tr>
     <tr>
       <td colspan="2">Path parameters</td>
@@ -2685,7 +2685,7 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
             <code>
 {
   "type" : "object",
-  "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfo",
+  "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory",
   "properties" : {
     "all-exceptions" : {
       "type" : "array",
@@ -2708,6 +2708,39 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
         }
       }
     },
+    "exceptionHistory" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:JobExceptionHistory",
+      "properties" : {
+        "entries" : {
+          "type" : "array",
+          "items" : {
+            "type" : "object",
+            "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:ExceptionInfo",
+            "properties" : {
+              "exceptionName" : {
+                "type" : "string"
+              },
+              "location" : {
+                "type" : "string"
+              },
+              "stacktrace" : {
+                "type" : "string"
+              },
+              "taskName" : {
+                "type" : "string"
+              },
+              "timestamp" : {
+                "type" : "integer"
+              }
+            }
+          }
+        },
+        "truncated" : {
+          "type" : "boolean"
+        }
+      }
+    },
     "root-exception" : {
       "type" : "string"
     },
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index d81f018..0438730 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -257,6 +257,15 @@ public class JobManagerOptions {
                     .withDescription(
                             "The maximum number of prior execution attempts 
kept in history.");
 
+    /** The maximum number of failures kept in the exception history. */
+    @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
+    public static final ConfigOption<Integer> MAX_EXCEPTION_HISTORY_SIZE =
+            key("jobmanager.exception-history-size")
+                    .intType()
+                    .defaultValue(16)
+                    .withDescription(
+                            "The maximum number of failures collected by the 
exception history per job.");
+
     /**
      * This option specifies the failover strategy, i.e. how the job 
computation recovers from task
      * failures.
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index ae96ce3..847949e 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -1572,7 +1572,7 @@
     },
     "response" : {
       "type" : "object",
-      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfo",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory",
       "properties" : {
         "root-exception" : {
           "type" : "string"
@@ -1603,6 +1603,39 @@
         },
         "truncated" : {
           "type" : "boolean"
+        },
+        "exceptionHistory" : {
+          "type" : "object",
+          "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:JobExceptionHistory",
+          "properties" : {
+            "entries" : {
+              "type" : "array",
+              "items" : {
+                "type" : "object",
+                "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:ExceptionInfo",
+                "properties" : {
+                  "exceptionName" : {
+                    "type" : "string"
+                  },
+                  "stacktrace" : {
+                    "type" : "string"
+                  },
+                  "timestamp" : {
+                    "type" : "integer"
+                  },
+                  "taskName" : {
+                    "type" : "string"
+                  },
+                  "location" : {
+                    "type" : "string"
+                  }
+                }
+              }
+            },
+            "truncated" : {
+              "type" : "boolean"
+            }
+          }
         }
       }
     }
diff --git 
a/flink-runtime-web/web-dashboard/src/app/interfaces/job-exception.ts 
b/flink-runtime-web/web-dashboard/src/app/interfaces/job-exception.ts
index 83d831b..79a86c3 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-exception.ts
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-exception.ts
@@ -21,6 +21,7 @@ export interface JobExceptionInterface {
   timestamp: number;
   truncated: boolean;
   'all-exceptions': JobExceptionItemInterface[];
+  'exceptionHistory': JobExceptionHistoryInterface;
 }
 
 export interface JobExceptionItemInterface {
@@ -32,3 +33,16 @@ export interface JobExceptionItemInterface {
   timestamp: number;
   'vertex-id': string;
 }
+
+export interface JobExceptionHistoryInterface {
+  entries: ExceptionInfoInterface[];
+  truncated: boolean;
+}
+
+export interface ExceptionInfoInterface {
+  exceptionName: string;
+  stacktrace: string;
+  timestamp: number;
+  taskName: string;
+  location: string;
+}
diff --git 
a/flink-runtime-web/web-dashboard/src/app/pages/job/exceptions/job-exceptions.component.html
 
b/flink-runtime-web/web-dashboard/src/app/pages/job/exceptions/job-exceptions.component.html
index ea58a7f..8da139f 100644
--- 
a/flink-runtime-web/web-dashboard/src/app/pages/job/exceptions/job-exceptions.component.html
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/job/exceptions/job-exceptions.component.html
@@ -31,6 +31,7 @@
         <tr>
           <th nzShowExpand></th>
           <th>Time</th>
+          <th>Exception</th>
           <th>Name</th>
           <th>Location</th>
         </tr>
@@ -40,21 +41,28 @@
           <tr>
             <td nzShowExpand [(nzExpand)]="exception.expand"></td>
             <td>{{exception.timestamp | date:'yyyy-MM-dd HH:mm:ss'}}</td>
+            <td><div class="name">{{exception.exceptionName}}</div></td>
             <td>
               <div class="name">
-                {{exception.task}}
+                  {{exception.taskName || "(global failure)"}}
               </div>
             </td>
-            <td>{{exception.location}}</td>
+            <td>{{exception.location || "(unassigned)"}}</td>
           </tr>
           <tr [nzExpand]="exception.expand">
-            <td colspan="6" class="expand-td">
-              <flink-monaco-editor *ngIf="exception.expand" class="subtask" 
[value]="exception.exception"></flink-monaco-editor>
+            <td colspan="5" class="expand-td">
+              <flink-monaco-editor *ngIf="exception.expand" class="subtask" 
[value]="exception.stacktrace"></flink-monaco-editor>
             </td>
           </tr>
         </ng-container>
-        <tr *ngIf="truncated">
+        <tr *ngIf="listOfException.length > 0">
           <td colspan="6">
+            <i nz-icon nzType="info-circle" nzTheme="fill"></i>&nbsp;
+            <i>The exception history is limited to the most recent failures 
that caused parts of the job or the entire job to restart. The maximum history 
size can be configured through the Flink configuration.</i>
+          </td>
+        </tr>
+        <tr *ngIf="truncated">
+          <td colspan="5">
             <button nz-button nzBlock nzType="primary" nzGhost 
(click)="loadMore()" [nzLoading]="isLoading">Load More</button>
           </td>
         </tr>
diff --git 
a/flink-runtime-web/web-dashboard/src/app/pages/job/exceptions/job-exceptions.component.ts
 
b/flink-runtime-web/web-dashboard/src/app/pages/job/exceptions/job-exceptions.component.ts
index cb72604..0513993 100644
--- 
a/flink-runtime-web/web-dashboard/src/app/pages/job/exceptions/job-exceptions.component.ts
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/job/exceptions/job-exceptions.component.ts
@@ -18,7 +18,7 @@
 
 import { formatDate } from '@angular/common';
 import { Component, OnInit, ChangeDetectionStrategy, ChangeDetectorRef } from 
'@angular/core';
-import { JobExceptionItemInterface } from 'interfaces';
+import { ExceptionInfoInterface } from 'interfaces';
 import { distinctUntilChanged, flatMap, tap } from 'rxjs/operators';
 import { JobService } from 'services';
 
@@ -30,12 +30,13 @@ import { JobService } from 'services';
 })
 export class JobExceptionsComponent implements OnInit {
   rootException = '';
-  listOfException: JobExceptionItemInterface[] = [];
+  listOfException: ExceptionInfoInterface[] = [];
   truncated = false;
   isLoading = false;
   maxExceptions = 0;
+  total = 0;
 
-  trackExceptionBy(_: number, node: JobExceptionItemInterface) {
+  trackExceptionBy(_: number, node: ExceptionInfoInterface) {
     return node.timestamp;
   }
   loadMore() {
@@ -52,13 +53,15 @@ export class JobExceptionsComponent implements OnInit {
       )
       .subscribe(data => {
         // @ts-ignore
-        if (data['root-exception']) {
-          this.rootException = formatDate(data.timestamp, 'yyyy-MM-dd 
HH:mm:ss', 'en') + '\n' + data['root-exception'];
+        var exceptionHistory = data.exceptionHistory
+        if (exceptionHistory.entries.length > 0) {
+          var mostRecentException = exceptionHistory.entries[0]
+          this.rootException = formatDate(mostRecentException.timestamp, 
'yyyy-MM-dd HH:mm:ss', 'en') + '\n' + mostRecentException.stacktrace;
         } else {
           this.rootException = 'No Root Exception';
         }
-        this.truncated = data.truncated;
-        this.listOfException = data['all-exceptions'];
+        this.truncated = exceptionHistory.truncated;
+        this.listOfException = exceptionHistory.entries;
       });
   }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index bb817df..cf9a3cd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -18,25 +18,34 @@
 
 package org.apache.flink.runtime.rest.handler.job;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobExceptionsInfo;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import 
org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
 import org.apache.flink.runtime.rest.messages.job.UpperLimitExceptionParameter;
+import org.apache.flink.runtime.scheduler.ExceptionHistoryEntry;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import 
org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.curator4.com.google.common.collect.Iterables;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -46,12 +55,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /** Handler serving the job exceptions. */
 public class JobExceptionsHandler
-        extends AbstractAccessExecutionGraphHandler<
-                JobExceptionsInfo, JobExceptionsMessageParameters>
-        implements OnlyExecutionGraphJsonArchivist {
+        extends AbstractExecutionGraphHandler<
+                JobExceptionsInfoWithHistory, JobExceptionsMessageParameters>
+        implements JsonArchivist {
 
     static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
 
@@ -59,7 +69,10 @@ public class JobExceptionsHandler
             GatewayRetriever<? extends RestfulGateway> leaderRetriever,
             Time timeout,
             Map<String, String> responseHeaders,
-            MessageHeaders<EmptyRequestBody, JobExceptionsInfo, 
JobExceptionsMessageParameters>
+            MessageHeaders<
+                            EmptyRequestBody,
+                            JobExceptionsInfoWithHistory,
+                            JobExceptionsMessageParameters>
                     messageHeaders,
             ExecutionGraphCache executionGraphCache,
             Executor executor) {
@@ -74,9 +87,9 @@ public class JobExceptionsHandler
     }
 
     @Override
-    protected JobExceptionsInfo handleRequest(
+    protected JobExceptionsInfoWithHistory handleRequest(
             HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters> 
request,
-            AccessExecutionGraph executionGraph) {
+            ExecutionGraphInfo executionGraph) {
         List<Integer> exceptionToReportMaxSizes =
                 request.getQueryParameter(UpperLimitExceptionParameter.class);
         final int exceptionToReportMaxSize =
@@ -87,24 +100,25 @@ public class JobExceptionsHandler
     }
 
     @Override
-    public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph 
graph)
+    public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo 
executionGraphInfo)
             throws IOException {
-        ResponseBody json = createJobExceptionsInfo(graph, 
MAX_NUMBER_EXCEPTION_TO_REPORT);
+        ResponseBody json =
+                createJobExceptionsInfo(executionGraphInfo, 
MAX_NUMBER_EXCEPTION_TO_REPORT);
         String path =
                 getMessageHeaders()
                         .getTargetRestEndpointURL()
-                        .replace(':' + JobIDPathParameter.KEY, 
graph.getJobID().toString());
+                        .replace(
+                                ':' + JobIDPathParameter.KEY,
+                                executionGraphInfo.getJobId().toString());
         return Collections.singletonList(new ArchivedJson(path, json));
     }
 
-    private static JobExceptionsInfo createJobExceptionsInfo(
-            AccessExecutionGraph executionGraph, int exceptionToReportMaxSize) 
{
-        ErrorInfo rootException = executionGraph.getFailureInfo();
-        String rootExceptionMessage = null;
-        Long rootTimestamp = null;
-        if (rootException != null) {
-            rootExceptionMessage = rootException.getExceptionAsString();
-            rootTimestamp = rootException.getTimestamp();
+    private static JobExceptionsInfoWithHistory createJobExceptionsInfo(
+            ExecutionGraphInfo executionGraphInfo, int 
exceptionToReportMaxSize) {
+        final ArchivedExecutionGraph executionGraph =
+                executionGraphInfo.getArchivedExecutionGraph();
+        if (executionGraph.getFailureInfo() == null) {
+            return new JobExceptionsInfoWithHistory();
         }
 
         List<JobExceptionsInfo.ExecutionExceptionInfo> taskExceptionList = new 
ArrayList<>();
@@ -118,10 +132,7 @@ public class JobExceptionsHandler
                 }
 
                 TaskManagerLocation location = 
task.getCurrentAssignedResourceLocation();
-                String locationString =
-                        location != null
-                                ? location.getFQDNHostname() + ':' + 
location.dataPort()
-                                : "(unassigned)";
+                String locationString = toString(location);
                 long timestamp = task.getStateTimestamp(ExecutionState.FAILED);
                 taskExceptionList.add(
                         new JobExceptionsInfo.ExecutionExceptionInfo(
@@ -132,7 +143,76 @@ public class JobExceptionsHandler
             }
         }
 
-        return new JobExceptionsInfo(
-                rootExceptionMessage, rootTimestamp, taskExceptionList, 
truncated);
+        final ErrorInfo rootCause = executionGraph.getFailureInfo();
+        return new JobExceptionsInfoWithHistory(
+                rootCause.getExceptionAsString(),
+                rootCause.getTimestamp(),
+                taskExceptionList,
+                truncated,
+                createJobExceptionHistory(
+                        executionGraphInfo.getExceptionHistory(), 
exceptionToReportMaxSize));
+    }
+
+    static JobExceptionsInfoWithHistory.JobExceptionHistory 
createJobExceptionHistory(
+            Iterable<ExceptionHistoryEntry> historyEntries, int limit) {
+        // we need to reverse the history to have a stable result when doing 
paging on it
+        final List<ExceptionHistoryEntry> reversedHistoryEntries = new 
ArrayList<>();
+        Iterables.addAll(reversedHistoryEntries, historyEntries);
+        Collections.reverse(reversedHistoryEntries);
+
+        List<JobExceptionsInfoWithHistory.ExceptionInfo> 
exceptionHistoryEntries =
+                reversedHistoryEntries.stream()
+                        .limit(limit)
+                        .map(JobExceptionsHandler::createExceptionInfo)
+                        .collect(Collectors.toList());
+
+        return new JobExceptionsInfoWithHistory.JobExceptionHistory(
+                exceptionHistoryEntries,
+                exceptionHistoryEntries.size() < 
reversedHistoryEntries.size());
+    }
+
+    private static JobExceptionsInfoWithHistory.ExceptionInfo 
createExceptionInfo(
+            ExceptionHistoryEntry historyEntry) {
+        if (historyEntry.isGlobal()) {
+            return new JobExceptionsInfoWithHistory.ExceptionInfo(
+                    historyEntry.getException().getOriginalErrorClassName(),
+                    historyEntry.getExceptionAsString(),
+                    historyEntry.getTimestamp());
+        }
+
+        Preconditions.checkArgument(
+                historyEntry.getFailingTaskName() != null,
+                "The taskName must not be null for a non-global failure.");
+        Preconditions.checkArgument(
+                historyEntry.getTaskManagerLocation() != null,
+                "The location must not be null for a non-global failure.");
+
+        return new JobExceptionsInfoWithHistory.ExceptionInfo(
+                historyEntry.getException().getOriginalErrorClassName(),
+                historyEntry.getExceptionAsString(),
+                historyEntry.getTimestamp(),
+                historyEntry.getFailingTaskName(),
+                toString(historyEntry.getTaskManagerLocation()));
+    }
+
+    @VisibleForTesting
+    static String toString(@Nullable TaskManagerLocation location) {
+        // '(unassigned)' being the default value is added to support 
backward-compatibility for the
+        // deprecated fields
+        return location != null
+                ? taskManagerLocationToString(location.getFQDNHostname(), 
location.dataPort())
+                : "(unassigned)";
+    }
+
+    @VisibleForTesting
+    @Nullable
+    static String toString(@Nullable 
ExceptionHistoryEntry.ArchivedTaskManagerLocation location) {
+        return location != null
+                ? taskManagerLocationToString(location.getFQDNHostname(), 
location.getPort())
+                : null;
+    }
+
+    private static String taskManagerLocationToString(String fqdnHostname, int 
port) {
+        return String.format("%s:%d", fqdnHostname, port);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java
index e6b8018..364809b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rest.messages;
 
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
 import 
org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
@@ -27,7 +28,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 /** Message headers for the {@link JobExceptionsHandler}. */
 public class JobExceptionsHeaders
         implements MessageHeaders<
-                EmptyRequestBody, JobExceptionsInfo, 
JobExceptionsMessageParameters> {
+                EmptyRequestBody, JobExceptionsInfoWithHistory, 
JobExceptionsMessageParameters> {
 
     private static final JobExceptionsHeaders INSTANCE = new 
JobExceptionsHeaders();
 
@@ -41,8 +42,8 @@ public class JobExceptionsHeaders
     }
 
     @Override
-    public Class<JobExceptionsInfo> getResponseClass() {
-        return JobExceptionsInfo.class;
+    public Class<JobExceptionsInfoWithHistory> getResponseClass() {
+        return JobExceptionsInfoWithHistory.class;
     }
 
     @Override
@@ -71,7 +72,14 @@ public class JobExceptionsHeaders
 
     @Override
     public String getDescription() {
-        return "Returns the non-recoverable exceptions that have been observed 
by the job. The truncated flag defines "
-                + "whether more exceptions occurred, but are not listed, 
because the response would otherwise get too big.";
+        return String.format(
+                "Returns the most recent exceptions that have been handled by 
Flink for this job. The "
+                        + "'exceptionHistory.truncated' flag defines whether 
exceptions were filtered "
+                        + "out through the GET parameter. The backend collects 
only a specific amount "
+                        + "of most recent exceptions per job. This can be 
configured through %s in the "
+                        + "Flink configuration. The following first-level 
members are deprecated: "
+                        + "'root-exception', 'timestamp', 'all-exceptions', 
'truncated'. Use the data provided "
+                        + "through 'exceptionHistory', instead.",
+                JobManagerOptions.MAX_EXCEPTION_HISTORY_SIZE.key());
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java
index 7450390..2defb01 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.rest.messages;
 
-import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
 import org.apache.flink.util.Preconditions;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -27,24 +26,44 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 
 import java.util.List;
 import java.util.Objects;
+import java.util.StringJoiner;
 
-/** Response type of the {@link JobExceptionsHandler}. */
-public class JobExceptionsInfo implements ResponseBody {
+/**
+ * {@code JobExceptionsInfo} holds the information for a single failure which 
caused a (maybe partial)
+ * job restart.
+ */
+public class JobExceptionsInfo {
 
     public static final String FIELD_NAME_ROOT_EXCEPTION = "root-exception";
     public static final String FIELD_NAME_TIMESTAMP = "timestamp";
     public static final String FIELD_NAME_ALL_EXCEPTIONS = "all-exceptions";
     public static final String FIELD_NAME_TRUNCATED = "truncated";
 
+    /**
+     * @deprecated Use {@link 
JobExceptionsInfoWithHistory#getExceptionHistory()}'s entries instead.
+     */
+    @Deprecated
     @JsonProperty(FIELD_NAME_ROOT_EXCEPTION)
     private final String rootException;
 
+    /**
+     * @deprecated Use {@link 
JobExceptionsInfoWithHistory#getExceptionHistory()}'s entries instead.
+     */
+    @Deprecated
     @JsonProperty(FIELD_NAME_TIMESTAMP)
     private final Long rootTimestamp;
 
+    /**
+     * @deprecated Use {@link 
JobExceptionsInfoWithHistory#getExceptionHistory()}'s entries instead.
+     */
+    @Deprecated
     @JsonProperty(FIELD_NAME_ALL_EXCEPTIONS)
     private final List<ExecutionExceptionInfo> allExceptions;
 
+    /**
+     * @deprecated Use {@link 
JobExceptionsInfoWithHistory#getExceptionHistory()}'s entries instead.
+     */
+    @Deprecated
     @JsonProperty(FIELD_NAME_TRUNCATED)
     private final boolean truncated;
 
@@ -80,6 +99,16 @@ public class JobExceptionsInfo implements ResponseBody {
         return Objects.hash(rootException, rootTimestamp, allExceptions, 
truncated);
     }
 
+    @Override
+    public String toString() {
+        return new StringJoiner(", ", JobExceptionsInfo.class.getSimpleName() 
+ "[", "]")
+                .add("rootException='" + rootException + "'")
+                .add("rootTimestamp=" + rootTimestamp)
+                .add("allExceptions=" + allExceptions)
+                .add("truncated=" + truncated)
+                .toString();
+    }
+
     @JsonIgnore
     public String getRootException() {
         return rootException;
@@ -104,7 +133,14 @@ public class JobExceptionsInfo implements ResponseBody {
     // Static helper classes
     // 
---------------------------------------------------------------------------------
 
-    /** Nested class to encapsulate the task execution exception. */
+    /**
+     * Nested class to encapsulate the task execution exception.
+     *
+     * @deprecated {@code ExecutionExceptionInfo} will be replaced by {@link
+     *     JobExceptionsInfoWithHistory.ExceptionInfo} as part of the effort 
of deprecating {@link
+     *     JobExceptionsInfo#allExceptions}.
+     */
+    @Deprecated
     public static final class ExecutionExceptionInfo {
         public static final String FIELD_NAME_EXCEPTION = "exception";
         public static final String FIELD_NAME_TASK = "task";
@@ -155,5 +191,15 @@ public class JobExceptionsInfo implements ResponseBody {
         public int hashCode() {
             return Objects.hash(timestamp, exception, task, location);
         }
+
+        @Override
+        public String toString() {
+            return new StringJoiner(", ", 
ExecutionExceptionInfo.class.getSimpleName() + "[", "]")
+                    .add("exception='" + exception + "'")
+                    .add("task='" + task + "'")
+                    .add("location='" + location + "'")
+                    .add("timestamp=" + timestamp)
+                    .toString();
+        }
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
new file mode 100644
index 0000000..bd0b2aa
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.StringJoiner;
+
+import static 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code JobExceptionsInfoWithHistory} extends {@link JobExceptionsInfo} 
providing a history of
+ * previously caused failures. It's the response type of the {@link 
JobExceptionsHandler}.
+ */
+public class JobExceptionsInfoWithHistory extends JobExceptionsInfo implements 
ResponseBody {
+
+    public static final String FIELD_NAME_EXCEPTION_HISTORY = 
"exceptionHistory";
+
+    @JsonProperty(FIELD_NAME_EXCEPTION_HISTORY)
+    private final JobExceptionHistory exceptionHistory;
+
+    @JsonCreator
+    public JobExceptionsInfoWithHistory(
+            @JsonProperty(FIELD_NAME_ROOT_EXCEPTION) String rootException,
+            @JsonProperty(FIELD_NAME_TIMESTAMP) Long rootTimestamp,
+            @JsonProperty(FIELD_NAME_ALL_EXCEPTIONS) 
List<ExecutionExceptionInfo> allExceptions,
+            @JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated,
+            @JsonProperty(FIELD_NAME_EXCEPTION_HISTORY) JobExceptionHistory 
exceptionHistory) {
+        super(rootException, rootTimestamp, allExceptions, truncated);
+        this.exceptionHistory = exceptionHistory;
+    }
+
+    public JobExceptionsInfoWithHistory() {
+        this(
+                null,
+                null,
+                Collections.emptyList(),
+                false,
+                new JobExceptionHistory(Collections.emptyList(), false));
+    }
+
+    @JsonIgnore
+    public JobExceptionHistory getExceptionHistory() {
+        return exceptionHistory;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        JobExceptionsInfoWithHistory that = (JobExceptionsInfoWithHistory) o;
+        return this.isTruncated() == that.isTruncated()
+                && Objects.equals(this.getRootException(), 
that.getRootException())
+                && Objects.equals(this.getRootTimestamp(), 
that.getRootTimestamp())
+                && Objects.equals(this.getAllExceptions(), 
that.getAllExceptions())
+                && Objects.equals(exceptionHistory, that.exceptionHistory);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                isTruncated(),
+                getRootException(),
+                getRootTimestamp(),
+                getAllExceptions(),
+                exceptionHistory);
+    }
+
+    @Override
+    public String toString() {
+        return new StringJoiner(", ", 
JobExceptionsInfoWithHistory.class.getSimpleName() + "[", "]")
+                .add("rootException='" + getRootException() + "'")
+                .add("rootTimestamp=" + getRootTimestamp())
+                .add("allExceptions=" + getAllExceptions())
+                .add("truncated=" + isTruncated())
+                .add("exceptionHistory=" + exceptionHistory)
+                .toString();
+    }
+
+    /** {@code JobExceptionHistory} collects all previously caught errors. */
+    public static final class JobExceptionHistory {
+
+        public static final String FIELD_NAME_ENTRIES = "entries";
+        public static final String FIELD_NAME_TRUNCATED = "truncated";
+
+        @JsonProperty(FIELD_NAME_ENTRIES)
+        private final List<ExceptionInfo> entries;
+
+        @JsonProperty(FIELD_NAME_TRUNCATED)
+        private final boolean truncated;
+
+        @JsonCreator
+        public JobExceptionHistory(
+                @JsonProperty(FIELD_NAME_ENTRIES) List<ExceptionInfo> entries,
+                @JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated) {
+            this.entries = entries;
+            this.truncated = truncated;
+        }
+
+        @JsonIgnore
+        public List<ExceptionInfo> getEntries() {
+            return entries;
+        }
+
+        @JsonIgnore
+        public boolean isTruncated() {
+            return truncated;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            JobExceptionHistory that = (JobExceptionHistory) o;
+            return this.isTruncated() == that.isTruncated()
+                    && Objects.equals(entries, that.entries);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(entries, truncated);
+        }
+
+        @Override
+        public String toString() {
+            return new StringJoiner(", ", 
JobExceptionHistory.class.getSimpleName() + "[", "]")
+                    .add("entries=" + entries)
+                    .add("truncated=" + truncated)
+                    .toString();
+        }
+    }
+
+    /** Collects the information of a single exception. */
+    public static class ExceptionInfo {
+
+        public static final String FIELD_NAME_EXCEPTION_NAME = "exceptionName";
+        public static final String FIELD_NAME_EXCEPTION_STACKTRACE = 
"stacktrace";
+        public static final String FIELD_NAME_EXCEPTION_TIMESTAMP = 
"timestamp";
+        public static final String FIELD_NAME_TASK_NAME = "taskName";
+        public static final String FIELD_NAME_LOCATION = "location";
+
+        @JsonProperty(FIELD_NAME_EXCEPTION_NAME)
+        private final String exceptionName;
+
+        @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE)
+        private final String stacktrace;
+
+        @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP)
+        private final long timestamp;
+
+        @JsonInclude(NON_NULL)
+        @JsonProperty(FIELD_NAME_TASK_NAME)
+        @Nullable
+        private final String taskName;
+
+        @JsonInclude(NON_NULL)
+        @JsonProperty(FIELD_NAME_LOCATION)
+        @Nullable
+        private final String location;
+
+        public ExceptionInfo(String exceptionName, String stacktrace, long 
timestamp) {
+            this(exceptionName, stacktrace, timestamp, null, null);
+        }
+
+        @JsonCreator
+        public ExceptionInfo(
+                @JsonProperty(FIELD_NAME_EXCEPTION_NAME) String exceptionName,
+                @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE) String 
stacktrace,
+                @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP) long timestamp,
+                @JsonProperty(FIELD_NAME_TASK_NAME) @Nullable String taskName,
+                @JsonProperty(FIELD_NAME_LOCATION) @Nullable String location) {
+            this.exceptionName = checkNotNull(exceptionName);
+            this.stacktrace = checkNotNull(stacktrace);
+            this.timestamp = timestamp;
+            this.taskName = taskName;
+            this.location = location;
+        }
+
+        @JsonIgnore
+        public String getExceptionName() {
+            return exceptionName;
+        }
+
+        @JsonIgnore
+        public String getStacktrace() {
+            return stacktrace;
+        }
+
+        @JsonIgnore
+        public long getTimestamp() {
+            return timestamp;
+        }
+
+        @JsonIgnore
+        @Nullable
+        public String getTaskName() {
+            return taskName;
+        }
+
+        @JsonIgnore
+        @Nullable
+        public String getLocation() {
+            return location;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ExceptionInfo that = (ExceptionInfo) o;
+            return exceptionName.equals(that.exceptionName)
+                    && stacktrace.equals(that.stacktrace)
+                    && Objects.equals(timestamp, that.timestamp)
+                    && Objects.equals(taskName, that.taskName)
+                    && Objects.equals(location, that.location);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(exceptionName, stacktrace, timestamp, 
taskName, location);
+        }
+
+        @Override
+        public String toString() {
+            return new StringJoiner(", ", ExceptionInfo.class.getSimpleName() 
+ "[", "]")
+                    .add("exceptionName='" + exceptionName + "'")
+                    .add("stacktrace='" + stacktrace + "'")
+                    .add("timestamp=" + timestamp)
+                    .add("taskName='" + taskName + "'")
+                    .add("location='" + location + "'")
+                    .toString();
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntry.java
new file mode 100644
index 0000000..83f6154
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntry.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.StringJoiner;
+
+/**
+ * {@code ExceptionHistoryEntry} collects information about a single failure 
that triggered the
+ * scheduler's failure handling.
+ */
+public class ExceptionHistoryEntry extends ErrorInfo {
+
+    private static final long serialVersionUID = -3855285510064263701L;
+
+    @Nullable private final String failingTaskName;
+    @Nullable private final ArchivedTaskManagerLocation taskManagerLocation;
+
+    /**
+     * Creates a {@code ExceptionHistoryEntry} representing a global failure 
from the passed {@code
+     * Throwable} and timestamp.
+     *
+     * @param cause The reason for the failure.
+     * @param timestamp The time the failure was caught.
+     * @return The {@code ExceptionHistoryEntry} instance.
+     */
+    public static ExceptionHistoryEntry fromGlobalFailure(Throwable cause, 
long timestamp) {
+        return new ExceptionHistoryEntry(cause, timestamp, null, null);
+    }
+
+    /**
+     * Creates a {@code ExceptionHistoryEntry} representing a local failure 
using the passed
+     * information.
+     *
+     * @param execution The {@link AccessExecution} that caused the failure.
+     * @param failingTaskName The name of the task the {@code execution} is 
connected to.
+     * @return The {@code ExceptionHistoryEntry} instance.
+     */
+    public static ExceptionHistoryEntry fromFailedExecution(
+            AccessExecution execution, String failingTaskName) {
+        ErrorInfo failureInfo =
+                execution
+                        .getFailureInfo()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalArgumentException(
+                                                "The passed Execution does not 
provide a failureCause."));
+        return new ExceptionHistoryEntry(
+                failureInfo.getException(),
+                failureInfo.getTimestamp(),
+                failingTaskName,
+                ArchivedTaskManagerLocation.fromTaskManagerLocation(
+                        execution.getAssignedResourceLocation()));
+    }
+
+    @VisibleForTesting
+    public ExceptionHistoryEntry(
+            Throwable cause,
+            long timestamp,
+            @Nullable String failingTaskName,
+            @Nullable ArchivedTaskManagerLocation taskManagerLocation) {
+        super(cause, timestamp);
+        this.failingTaskName = failingTaskName;
+        this.taskManagerLocation = taskManagerLocation;
+    }
+
+    public boolean isGlobal() {
+        return failingTaskName == null;
+    }
+
+    @Nullable
+    public String getFailingTaskName() {
+        return failingTaskName;
+    }
+
+    @Nullable
+    public ArchivedTaskManagerLocation getTaskManagerLocation() {
+        return taskManagerLocation;
+    }
+
+    /**
+     * {@code ArchivedTaskManagerLocation} represents a archived (static) 
version of a {@link
+     * TaskManagerLocation}. It overcomes the issue with {@link 
TaskManagerLocation#inetAddress}
+     * being partially transient due to the cache becoming out-dated.
+     */
+    public static class ArchivedTaskManagerLocation implements Serializable {
+
+        private static final long serialVersionUID = -6596854145482446664L;
+
+        private final ResourceID resourceID;
+        private final String addressStr;
+        private final int port;
+        private final String hostname;
+        private final String fqdnHostname;
+
+        /**
+         * Creates a {@code ArchivedTaskManagerLocation} copy of the passed 
{@link
+         * TaskManagerLocation}.
+         *
+         * @param taskManagerLocation The {@code TaskManagerLocation} that's 
going to be copied.
+         * @return The corresponding {@code ArchivedTaskManagerLocation} or 
{@code null} if {@code
+         *     null} was passed.
+         */
+        @VisibleForTesting
+        @Nullable
+        public static ArchivedTaskManagerLocation fromTaskManagerLocation(
+                TaskManagerLocation taskManagerLocation) {
+            if (taskManagerLocation == null) {
+                return null;
+            }
+
+            return new ArchivedTaskManagerLocation(
+                    taskManagerLocation.getResourceID(),
+                    taskManagerLocation.addressString(),
+                    taskManagerLocation.dataPort(),
+                    taskManagerLocation.getHostname(),
+                    taskManagerLocation.getFQDNHostname());
+        }
+
+        private ArchivedTaskManagerLocation(
+                ResourceID resourceID,
+                String addressStr,
+                int port,
+                String hostname,
+                String fqdnHost) {
+            this.resourceID = resourceID;
+            this.addressStr = addressStr;
+            this.port = port;
+            this.hostname = hostname;
+            this.fqdnHostname = fqdnHost;
+        }
+
+        public ResourceID getResourceID() {
+            return resourceID;
+        }
+
+        public String getAddress() {
+            return addressStr;
+        }
+
+        public int getPort() {
+            return port;
+        }
+
+        public String getHostname() {
+            return hostname;
+        }
+
+        public String getFQDNHostname() {
+            return fqdnHostname;
+        }
+
+        @Override
+        public String toString() {
+            return new StringJoiner(
+                            ", ", 
ArchivedTaskManagerLocation.class.getSimpleName() + "[", "]")
+                    .add("resourceID=" + resourceID)
+                    .add("addressStr='" + addressStr + "'")
+                    .add("port=" + port)
+                    .add("hostname='" + hostname + "'")
+                    .add("fqdnHostname='" + fqdnHostname + "'")
+                    .toString();
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
index 4b7b746..d43a355 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
@@ -20,12 +20,10 @@ package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 
 import java.io.Serializable;
 import java.util.Collections;
-import java.util.List;
 
 /**
  * {@code ExecutionGraphInfo} serves as a composite class that provides 
different {@link
@@ -36,14 +34,15 @@ public class ExecutionGraphInfo implements Serializable {
     private static final long serialVersionUID = -6134203195124124202L;
 
     private final ArchivedExecutionGraph executionGraph;
-    private final List<ErrorInfo> exceptionHistory;
+    private final Iterable<ExceptionHistoryEntry> exceptionHistory;
 
    /**
     * Creates an {@code ExecutionGraphInfo} with an empty exception history.
     *
     * @param executionGraph The {@link ArchivedExecutionGraph} this instance wraps.
     */
    public ExecutionGraphInfo(ArchivedExecutionGraph executionGraph) {
        this(executionGraph, Collections.emptyList());
    }
 
    /**
     * Creates an {@code ExecutionGraphInfo}.
     *
     * @param executionGraph The {@link ArchivedExecutionGraph} this instance wraps.
     * @param exceptionHistory The exception history entries associated with the graph.
     */
    public ExecutionGraphInfo(
            ArchivedExecutionGraph executionGraph,
            Iterable<ExceptionHistoryEntry> exceptionHistory) {
        this.executionGraph = executionGraph;
        this.exceptionHistory = exceptionHistory;
    }
@@ -56,7 +55,7 @@ public class ExecutionGraphInfo implements Serializable {
         return executionGraph;
     }
 
    /**
     * Returns the exception history entries associated with this execution graph.
     *
     * @return The {@link ExceptionHistoryEntry} instances.
     */
    public Iterable<ExceptionHistoryEntry> getExceptionHistory() {
        return exceptionHistory;
    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 9aeabd4..c08ea6a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
@@ -43,7 +44,6 @@ import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore;
-import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -79,6 +79,7 @@ import 
org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.util.BoundedFIFOQueue;
 import org.apache.flink.runtime.util.IntArrayList;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -141,7 +142,7 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
 
     private final ComponentMainThreadExecutor mainThreadExecutor;
 
-    private final List<ErrorInfo> taskFailureHistory = new ArrayList<>();
+    private final BoundedFIFOQueue<ExceptionHistoryEntry> exceptionHistory;
 
     private final ExecutionGraphFactory executionGraphFactory;
 
@@ -206,6 +207,10 @@ public abstract class SchedulerBase implements 
SchedulerNG, CheckpointScheduling
         this.operatorCoordinatorHandler =
                 new OperatorCoordinatorHandler(executionGraph, 
this::handleGlobalFailure);
         
operatorCoordinatorHandler.initializeOperatorCoordinators(this.mainThreadExecutor);
+        exceptionHistory =
+                new BoundedFIFOQueue<>(
+                        jobMasterConfiguration.getInteger(
+                                JobManagerOptions.MAX_EXCEPTION_HISTORY_SIZE));
     }
 
     private void registerShutDownCheckpointServicesOnExecutionGraphTermination(
@@ -535,7 +540,7 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
     }
 
     protected final void archiveGlobalFailure(@Nullable Throwable failure, 
long timestamp) {
-        
taskFailureHistory.add(ErrorInfo.createErrorInfoWithNullableCause(failure, 
timestamp));
+        exceptionHistory.add(ExceptionHistoryEntry.fromGlobalFailure(failure, 
timestamp));
         log.debug("Archive global failure.", failure);
     }
 
@@ -549,16 +554,15 @@ public abstract class SchedulerBase implements 
SchedulerNG, CheckpointScheduling
 
         if (executionOptional.isPresent()) {
             final Execution failedExecution = executionOptional.get();
-            failedExecution
-                    .getFailureInfo()
-                    .ifPresent(
-                            failureInfo -> {
-                                taskFailureHistory.add(failureInfo);
-                                log.debug(
-                                        "Archive local failure causing attempt 
{} to fail: {}",
-                                        failedExecution.getAttemptId(),
-                                        failureInfo.getExceptionAsString());
-                            });
+            final ExceptionHistoryEntry exceptionHistoryEntry =
+                    ExceptionHistoryEntry.fromFailedExecution(
+                            failedExecution,
+                            
failedExecution.getVertex().getTaskNameWithSubtaskIndex());
+            exceptionHistory.add(exceptionHistoryEntry);
+            log.debug(
+                    "Archive local failure causing attempt {} to fail: {}",
+                    failedExecution.getAttemptId(),
+                    exceptionHistoryEntry.getExceptionAsString());
         } else {
             // fallback in case of a global fail over - no failed state is set 
and, therefore, no
             // timestamp was taken
@@ -646,9 +650,17 @@ public abstract class SchedulerBase implements 
SchedulerNG, CheckpointScheduling
     protected void notifyPartitionDataAvailableInternal(
             IntermediateResultPartitionID resultPartitionId) {}
 
+    /**
+     * Returns a copy of the current history of task failures.
+     *
+     * @return a copy of the current history of task failures.
+     */
     @VisibleForTesting
-    protected List<ErrorInfo> getExceptionHistory() {
-        return taskFailureHistory;
+    protected Iterable<ExceptionHistoryEntry> getExceptionHistory() {
+        final Collection<ExceptionHistoryEntry> copy = new 
ArrayList<>(exceptionHistory.size());
+        exceptionHistory.forEach(copy::add);
+
+        return copy;
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/BoundedFIFOQueue.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/BoundedFIFOQueue.java
new file mode 100644
index 0000000..aa8f81a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/BoundedFIFOQueue.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
import org.apache.flink.util.Preconditions;

import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Queue;
+
/**
 * {@code BoundedFIFOQueue} collects elements up to given amount. Reaching this limit will result in
 * removing the oldest element from this queue (First-In/First-Out; FIFO).
 *
 * <p>This class is not thread-safe; external synchronization is required for concurrent use.
 *
 * @param <T> The type of elements collected.
 */
public class BoundedFIFOQueue<T> implements Iterable<T>, Serializable {

    private static final long serialVersionUID = -890727339944580409L;

    private final int maxSize;
    // ArrayDeque (rather than LinkedList) is the idiomatic JDK queue: same FIFO
    // semantics, Serializable, and no per-element node allocation.
    private final Queue<T> elements;

    /**
     * Creates a {@code BoundedFIFOQueue} with the given maximum size.
     *
     * @param maxSize The maximum size of this queue. Exceeding this limit would result in removing
     *     the oldest element (FIFO).
     * @throws IllegalArgumentException If {@code maxSize} is less than 0.
     */
    public BoundedFIFOQueue(int maxSize) {
        if (maxSize < 0) {
            throw new IllegalArgumentException("The maximum size should be at least 0.");
        }

        this.maxSize = maxSize;
        this.elements = new ArrayDeque<>();
    }

    /**
     * Adds an element to the end of the queue. An element will be removed from the head of the
     * queue if the queue would exceed its maximum size by adding the new element.
     *
     * @param element The element that should be added to the end of the queue.
     * @throws NullPointerException If {@code null} is passed as an element.
     */
    public void add(T element) {
        Objects.requireNonNull(element);
        // Queue#add returns true on success; evict the oldest element if the
        // newly added one pushed the queue over its limit (maxSize == 0 means
        // every added element is evicted immediately).
        if (elements.add(element) && elements.size() > maxSize) {
            elements.poll();
        }
    }

    /**
     * Returns the number of currently stored elements.
     *
     * @return The number of currently stored elements.
     */
    public int size() {
        return this.elements.size();
    }

    /**
     * Returns the {@code BoundedFIFOQueue}'s {@link Iterator}, iterating from the oldest to the
     * newest element.
     *
     * @return The queue's {@code Iterator}.
     */
    @Override
    public Iterator<T> iterator() {
        return elements.iterator();
    }
}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index bdbd8ba..dd1e5ab 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -23,7 +23,6 @@ import 
org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ArchivedExecution;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
@@ -37,68 +36,245 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraph
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
 import org.apache.flink.runtime.rest.messages.JobExceptionsInfo;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
+import 
org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.ExceptionInfo;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import 
org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
 import org.apache.flink.runtime.rest.messages.job.UpperLimitExceptionParameter;
+import org.apache.flink.runtime.scheduler.ExceptionHistoryEntry;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.EvictingBoundedList;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.collection.IsEmptyCollection.empty;
+import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.hamcrest.collection.IsIterableWithSize.iterableWithSize;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 /** Test for the {@link JobExceptionsHandler}. */
 public class JobExceptionsHandlerTest extends TestLogger {
 
+    private final JobExceptionsHandler testInstance =
+            new JobExceptionsHandler(
+                    CompletableFuture::new,
+                    TestingUtils.TIMEOUT(),
+                    Collections.emptyMap(),
+                    JobExceptionsHeaders.getInstance(),
+                    new DefaultExecutionGraphCache(TestingUtils.TIMEOUT(), 
TestingUtils.TIMEOUT()),
+                    TestingUtils.defaultExecutor());
+
+    @Test
+    public void testNoExceptions() throws HandlerRequestException {
+        final ExecutionGraphInfo executionGraphInfo =
+                new ExecutionGraphInfo(new 
ArchivedExecutionGraphBuilder().build());
+
+        final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters> 
request =
+                createRequest(executionGraphInfo.getJobId(), 10);
+        final JobExceptionsInfoWithHistory response =
+                testInstance.handleRequest(request, executionGraphInfo);
+
+        assertThat(response.getRootException(), is(nullValue()));
+        assertThat(response.getRootTimestamp(), is(nullValue()));
+        assertFalse(response.isTruncated());
+        assertThat(response.getAllExceptions(), empty());
+        assertThat(response.getExceptionHistory().getEntries(), empty());
+    }
+
+    @Test
+    public void testOnlyRootCause() throws HandlerRequestException {
+        final Throwable rootCause = new RuntimeException("root cause");
+        final long rootCauseTimestamp = System.currentTimeMillis();
+
+        final ExecutionGraphInfo executionGraphInfo =
+                createExecutionGraphInfo(
+                        ExceptionHistoryEntry.fromGlobalFailure(rootCause, 
rootCauseTimestamp));
+        final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters> 
request =
+                createRequest(executionGraphInfo.getJobId(), 10);
+        final JobExceptionsInfoWithHistory response =
+                testInstance.handleRequest(request, executionGraphInfo);
+
+        assertThat(response.getRootException(), 
is(ExceptionUtils.stringifyException(rootCause)));
+        assertThat(response.getRootTimestamp(), is(rootCauseTimestamp));
+        assertFalse(response.isTruncated());
+        assertThat(response.getAllExceptions(), empty());
+
+        assertThat(
+                response.getExceptionHistory().getEntries(),
+                contains(historyContainsGlobalFailure(rootCause, 
rootCauseTimestamp)));
+    }
+
+    @Test
+    public void testWithExceptionHistory() throws HandlerRequestException {
+        final ExceptionHistoryEntry rootCause =
+                ExceptionHistoryEntry.fromGlobalFailure(
+                        new RuntimeException("exception #0"), 
System.currentTimeMillis());
+        final ExceptionHistoryEntry otherFailure =
+                new ExceptionHistoryEntry(
+                        new RuntimeException("exception #1"),
+                        System.currentTimeMillis(),
+                        "task name",
+                        
ExceptionHistoryEntry.ArchivedTaskManagerLocation.fromTaskManagerLocation(
+                                new LocalTaskManagerLocation()));
+
+        final ExecutionGraphInfo executionGraphInfo =
+                createExecutionGraphInfo(rootCause, otherFailure);
+        final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters> 
request =
+                createRequest(executionGraphInfo.getJobId(), 10);
+        final JobExceptionsInfoWithHistory response =
+                testInstance.handleRequest(request, executionGraphInfo);
+
+        assertThat(
+                response.getExceptionHistory().getEntries(),
+                contains(
+                        historyContainsGlobalFailure(
+                                rootCause.getException(), 
rootCause.getTimestamp()),
+                        historyContainsJobExceptionInfo(
+                                otherFailure.getException(),
+                                otherFailure.getTimestamp(),
+                                otherFailure.getFailingTaskName(),
+                                JobExceptionsHandler.toString(
+                                        
otherFailure.getTaskManagerLocation()))));
+        assertFalse(response.getExceptionHistory().isTruncated());
+    }
+
+    @Test
+    public void testWithExceptionHistoryWithTruncationThroughParameter()
+            throws HandlerRequestException {
+        final ExceptionHistoryEntry rootCause =
+                ExceptionHistoryEntry.fromGlobalFailure(
+                        new RuntimeException("exception #0"), 
System.currentTimeMillis());
+        final ExceptionHistoryEntry otherFailure =
+                new ExceptionHistoryEntry(
+                        new RuntimeException("exception #1"),
+                        System.currentTimeMillis(),
+                        "task name",
+                        
ExceptionHistoryEntry.ArchivedTaskManagerLocation.fromTaskManagerLocation(
+                                new LocalTaskManagerLocation()));
+
+        final ExecutionGraphInfo executionGraphInfo =
+                createExecutionGraphInfo(rootCause, otherFailure);
+        final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters> 
request =
+                createRequest(executionGraphInfo.getJobId(), 1);
+        final JobExceptionsInfoWithHistory response =
+                testInstance.handleRequest(request, executionGraphInfo);
+
+        assertThat(
+                response.getExceptionHistory().getEntries(),
+                contains(
+                        historyContainsGlobalFailure(
+                                rootCause.getException(), 
rootCause.getTimestamp())));
+        assertThat(response.getExceptionHistory().getEntries(), 
iterableWithSize(1));
+        assertTrue(response.getExceptionHistory().isTruncated());
+    }
+
+    @Test
+    public void testTaskManagerLocationFallbackHandling() {
+        assertThat(JobExceptionsHandler.toString((TaskManagerLocation) null), 
is("(unassigned)"));
+    }
+
+    @Test
+    public void testTaskManagerLocationHandling() {
+        final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+        assertThat(
+                JobExceptionsHandler.toString(taskManagerLocation),
+                is(
+                        String.format(
+                                "%s:%s",
+                                taskManagerLocation.getFQDNHostname(),
+                                taskManagerLocation.dataPort())));
+    }
+
+    @Test
+    public void testArchivedTaskManagerLocationFallbackHandling() {
+        assertThat(
+                JobExceptionsHandler.toString(
+                        (ExceptionHistoryEntry.ArchivedTaskManagerLocation) 
null),
+                is(nullValue()));
+    }
+
+    @Test
+    public void testArchivedTaskManagerLocationHandling() {
+        final ExceptionHistoryEntry.ArchivedTaskManagerLocation 
taskManagerLocation =
+                
ExceptionHistoryEntry.ArchivedTaskManagerLocation.fromTaskManagerLocation(
+                        new LocalTaskManagerLocation());
+        assertThat(
+                JobExceptionsHandler.toString(taskManagerLocation),
+                is(
+                        String.format(
+                                "%s:%s",
+                                taskManagerLocation.getFQDNHostname(),
+                                taskManagerLocation.getPort())));
+    }
+
     @Test
     public void testGetJobExceptionsInfo() throws HandlerRequestException {
-        final JobExceptionsHandler jobExceptionsHandler =
-                new JobExceptionsHandler(
-                        () -> null,
-                        TestingUtils.TIMEOUT(),
-                        Collections.emptyMap(),
-                        JobExceptionsHeaders.getInstance(),
-                        new DefaultExecutionGraphCache(
-                                TestingUtils.TIMEOUT(), 
TestingUtils.TIMEOUT()),
-                        TestingUtils.defaultExecutor());
         final int numExceptions = 20;
-        final AccessExecutionGraph archivedExecutionGraph =
-                createAccessExecutionGraph(numExceptions);
-        checkExceptionLimit(jobExceptionsHandler, archivedExecutionGraph, 
numExceptions, 10);
-        checkExceptionLimit(
-                jobExceptionsHandler, archivedExecutionGraph, numExceptions, 
numExceptions);
-        checkExceptionLimit(jobExceptionsHandler, archivedExecutionGraph, 
numExceptions, 30);
+        final ExecutionGraphInfo archivedExecutionGraph = 
createAccessExecutionGraph(numExceptions);
+        checkExceptionLimit(testInstance, archivedExecutionGraph, 
numExceptions, 10);
+        checkExceptionLimit(testInstance, archivedExecutionGraph, 
numExceptions, numExceptions);
+        checkExceptionLimit(testInstance, archivedExecutionGraph, 
numExceptions, 30);
     }
 
     private static void checkExceptionLimit(
             JobExceptionsHandler jobExceptionsHandler,
-            AccessExecutionGraph graph,
+            ExecutionGraphInfo graph,
             int maxNumExceptions,
             int numExpectedException)
             throws HandlerRequestException {
         final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters> 
handlerRequest =
-                createRequest(graph.getJobID(), numExpectedException);
+                createRequest(graph.getJobId(), numExpectedException);
         final JobExceptionsInfo jobExceptionsInfo =
                 jobExceptionsHandler.handleRequest(handlerRequest, graph);
-        final int numReportedException =
-                maxNumExceptions >= numExpectedException ? 
numExpectedException : maxNumExceptions;
+        final int numReportedException = Math.min(maxNumExceptions, 
numExpectedException);
         assertEquals(jobExceptionsInfo.getAllExceptions().size(), 
numReportedException);
     }
 
-    private static AccessExecutionGraph createAccessExecutionGraph(int 
numTasks) {
+    private static ExecutionGraphInfo createAccessExecutionGraph(int numTasks) 
{
         Map<JobVertexID, ArchivedExecutionJobVertex> tasks = new HashMap<>();
         for (int i = 0; i < numTasks; i++) {
             final JobVertexID jobVertexId = new JobVertexID();
             tasks.put(jobVertexId, 
createArchivedExecutionJobVertex(jobVertexId));
         }
-        return new ArchivedExecutionGraphBuilder().setTasks(tasks).build();
+
+        final Throwable failureCause = new RuntimeException("root cause");
+        final long failureTimestamp = System.currentTimeMillis();
+        final List<ExceptionHistoryEntry> exceptionHistory =
+                Collections.singletonList(
+                        new ExceptionHistoryEntry(
+                                failureCause,
+                                failureTimestamp,
+                                "test task #1",
+                                
ExceptionHistoryEntry.ArchivedTaskManagerLocation
+                                        .fromTaskManagerLocation(new 
LocalTaskManagerLocation())));
+        return new ExecutionGraphInfo(
+                new ArchivedExecutionGraphBuilder()
+                        .setFailureCause(new ErrorInfo(failureCause, 
failureTimestamp))
+                        .setTasks(tasks)
+                        .build(),
+                exceptionHistory);
     }
 
     private static ArchivedExecutionJobVertex createArchivedExecutionJobVertex(
@@ -141,6 +317,32 @@ public class JobExceptionsHandlerTest extends TestLogger {
                 emptyAccumulators);
     }
 
+    // -------- exception history related utility methods for creating the 
input data --------
+
+    private static ExecutionGraphInfo createExecutionGraphInfo(
+            ExceptionHistoryEntry... historyEntries) {
+        final ArchivedExecutionGraphBuilder executionGraphBuilder =
+                new ArchivedExecutionGraphBuilder();
+        final List<ExceptionHistoryEntry> historyEntryCollection = new 
ArrayList<>();
+
+        for (int i = 0; i < historyEntries.length; i++) {
+            if (i == 0) {
+                // first entry is root cause
+                executionGraphBuilder.setFailureCause(
+                        new ErrorInfo(
+                                historyEntries[i].getException(),
+                                historyEntries[i].getTimestamp()));
+            }
+
+            historyEntryCollection.add(historyEntries[i]);
+        }
+
+        // we have to reverse it to simulate how the Scheduler collects it
+        Collections.reverse(historyEntryCollection);
+
+        return new ExecutionGraphInfo(executionGraphBuilder.build(), 
historyEntryCollection);
+    }
+
     private static HandlerRequest<EmptyRequestBody, 
JobExceptionsMessageParameters> createRequest(
             JobID jobId, int size) throws HandlerRequestException {
         final Map<String, String> pathParameters = new HashMap<>();
@@ -154,4 +356,138 @@ public class JobExceptionsHandlerTest extends TestLogger {
                 pathParameters,
                 queryParameters);
     }
+
+    // -------- factory methods for instantiating new Matchers --------
+
+    private static Matcher<ExceptionInfo> historyContainsJobExceptionInfo(
+            Throwable expectedFailureCause,
+            long expectedFailureTimestamp,
+            String expectedTaskNameWithSubtaskId,
+            String expectedTaskManagerLocation) {
+        return new ExceptionInfoMatcher(
+                expectedFailureCause,
+                expectedFailureTimestamp,
+                expectedTaskNameWithSubtaskId,
+                expectedTaskManagerLocation);
+    }
+
+    private static Matcher<ExceptionInfo> historyContainsGlobalFailure(
+            Throwable expectedFailureCause, long expectedFailureTimestamp) {
+        return historyContainsJobExceptionInfo(
+                expectedFailureCause, expectedFailureTimestamp, null, null);
+    }
+
+    // -------- Matcher implementations used in this test class --------
+
+    /** Checks the given {@link ExceptionInfo} instance. */
+    private static class ExceptionInfoMatcher extends 
TypeSafeDiagnosingMatcher<ExceptionInfo> {
+
+        private final Throwable expectedException;
+        private final long expectedTimestamp;
+        private final String expectedTaskName;
+        private final String expectedLocation;
+
+        public ExceptionInfoMatcher(
+                Throwable expectedException,
+                long expectedTimestamp,
+                String expectedTaskName,
+                String expectedLocation) {
+            this.expectedException = 
deserializeSerializedThrowable(expectedException);
+            this.expectedTimestamp = expectedTimestamp;
+            this.expectedTaskName = expectedTaskName;
+            this.expectedLocation = expectedLocation;
+        }
+
+        @Override
+        public void describeTo(Description description) {
+            description
+                    .appendText("exceptionName=")
+                    .appendText(getExpectedExceptionName())
+                    .appendText(", exceptionStacktrace=")
+                    .appendText(getExpectedStacktrace())
+                    .appendText(", timestamp=")
+                    .appendText(String.valueOf(expectedTimestamp))
+                    .appendText(", taskName=")
+                    .appendText(expectedTaskName)
+                    .appendText(", location=")
+                    .appendText(expectedLocation);
+        }
+
+        private String getExpectedExceptionName() {
+            return expectedException.getClass().getName();
+        }
+
+        private String getExpectedStacktrace() {
+            return ExceptionUtils.stringifyException(expectedException);
+        }
+
+        @Override
+        protected boolean matchesSafely(ExceptionInfo info, Description 
description) {
+            return matches(
+                            info,
+                            description,
+                            ExceptionInfo::getExceptionName,
+                            getExpectedExceptionName(),
+                            "exceptionName")
+                    && matches(
+                            info,
+                            description,
+                            ExceptionInfo::getStacktrace,
+                            
ExceptionUtils.stringifyException(expectedException),
+                            "stacktrace")
+                    && matches(
+                            info,
+                            description,
+                            ExceptionInfo::getTimestamp,
+                            expectedTimestamp,
+                            "timestamp")
+                    && matches(
+                            info,
+                            description,
+                            ExceptionInfo::getTaskName,
+                            expectedTaskName,
+                            "taskName")
+                    && matches(
+                            info,
+                            description,
+                            ExceptionInfo::getLocation,
+                            expectedLocation,
+                            "location");
+        }
+
+        private <R> boolean matches(
+                ExceptionInfo info,
+                Description desc,
+                Function<ExceptionInfo, R> extractor,
+                R expectedValue,
+                String attributeName) {
+            final R actualValue = extractor.apply(info);
+            if (actualValue == null) {
+                return expectedValue == null;
+            }
+
+            final boolean match = actualValue.equals(expectedValue);
+            if (!match) {
+                desc.appendText(attributeName)
+                        .appendText("=")
+                        .appendText(String.valueOf(actualValue));
+            }
+
+            return match;
+        }
+
+        /**
+         * Utility method for unwrapping a {@link SerializedThrowable} again.
+         *
+         * @param throwable The {@code Throwable} that might be unwrapped.
+         * @return The unwrapped {@code Throwable} if the {@code throwable} 
was actually a {@code
+         *     SerializedThrowable}; otherwise the {@code throwable} itself.
+         */
+        protected Throwable deserializeSerializedThrowable(Throwable 
throwable) {
+            return throwable instanceof SerializedThrowable
+                    ? ((SerializedThrowable) throwable)
+                            
.deserializeError(ClassLoader.getSystemClassLoader())
+                    : throwable;
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoTest.java
deleted file mode 100644
index d9b0627..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/** Tests that the {@link JobExceptionsInfo} can be marshalled and 
unmarshalled. */
-public class JobExceptionsInfoTest extends 
RestResponseMarshallingTestBase<JobExceptionsInfo> {
-    @Override
-    protected Class<JobExceptionsInfo> getTestResponseClass() {
-        return JobExceptionsInfo.class;
-    }
-
-    @Override
-    protected JobExceptionsInfo getTestResponseInstance() throws Exception {
-        List<JobExceptionsInfo.ExecutionExceptionInfo> 
executionTaskExceptionInfoList =
-                new ArrayList<>();
-        executionTaskExceptionInfoList.add(
-                new JobExceptionsInfo.ExecutionExceptionInfo(
-                        "exception1", "task1", "location1", 
System.currentTimeMillis()));
-        executionTaskExceptionInfoList.add(
-                new JobExceptionsInfo.ExecutionExceptionInfo(
-                        "exception2", "task2", "location2", 
System.currentTimeMillis()));
-        return new JobExceptionsInfo(
-                "root exception",
-                System.currentTimeMillis(),
-                executionTaskExceptionInfoList,
-                false);
-    }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoNoRootTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryNoRootTest.java
similarity index 55%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoNoRootTest.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryNoRootTest.java
index d2b9bfd..3adf942 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoNoRootTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryNoRootTest.java
@@ -19,21 +19,22 @@
 package org.apache.flink.runtime.rest.messages;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 /**
- * Tests that the {@link JobExceptionsInfo} with no root exception can be 
marshalled and
+ * Tests that the {@link JobExceptionsInfoWithHistory} with no root exception 
can be marshalled and
  * unmarshalled.
  */
-public class JobExceptionsInfoNoRootTest
-        extends RestResponseMarshallingTestBase<JobExceptionsInfo> {
+public class JobExceptionsInfoWithHistoryNoRootTest
+        extends RestResponseMarshallingTestBase<JobExceptionsInfoWithHistory> {
     @Override
-    protected Class<JobExceptionsInfo> getTestResponseClass() {
-        return JobExceptionsInfo.class;
+    protected Class<JobExceptionsInfoWithHistory> getTestResponseClass() {
+        return JobExceptionsInfoWithHistory.class;
     }
 
     @Override
-    protected JobExceptionsInfo getTestResponseInstance() throws Exception {
+    protected JobExceptionsInfoWithHistory getTestResponseInstance() throws 
Exception {
         List<JobExceptionsInfo.ExecutionExceptionInfo> 
executionTaskExceptionInfoList =
                 new ArrayList<>();
         executionTaskExceptionInfoList.add(
@@ -42,6 +43,21 @@ public class JobExceptionsInfoNoRootTest
         executionTaskExceptionInfoList.add(
                 new JobExceptionsInfo.ExecutionExceptionInfo(
                         "exception2", "task2", "location2", 
System.currentTimeMillis()));
-        return new JobExceptionsInfo(null, null, 
executionTaskExceptionInfoList, false);
+        return new JobExceptionsInfoWithHistory(
+                null,
+                null,
+                executionTaskExceptionInfoList,
+                false,
+                new JobExceptionsInfoWithHistory.JobExceptionHistory(
+                        Arrays.asList(
+                                new JobExceptionsInfoWithHistory.ExceptionInfo(
+                                        "global failure #0", "stacktrace #0", 
0L),
+                                new JobExceptionsInfoWithHistory.ExceptionInfo(
+                                        "local task failure #1",
+                                        "stacktrace #1",
+                                        1L,
+                                        "task name",
+                                        "location")),
+                        false));
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryTest.java
new file mode 100644
index 0000000..b10bff2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertThat;
+
+/** Tests that the {@link JobExceptionsInfoWithHistory} can be marshalled and 
unmarshalled. */
+public class JobExceptionsInfoWithHistoryTest
+        extends RestResponseMarshallingTestBase<JobExceptionsInfoWithHistory> {
+    @Override
+    protected Class<JobExceptionsInfoWithHistory> getTestResponseClass() {
+        return JobExceptionsInfoWithHistory.class;
+    }
+
+    @Override
+    protected JobExceptionsInfoWithHistory getTestResponseInstance() throws 
Exception {
+        List<JobExceptionsInfo.ExecutionExceptionInfo> 
executionTaskExceptionInfoList =
+                new ArrayList<>();
+        executionTaskExceptionInfoList.add(
+                new JobExceptionsInfo.ExecutionExceptionInfo(
+                        "exception1", "task1", "location1", 
System.currentTimeMillis()));
+        executionTaskExceptionInfoList.add(
+                new JobExceptionsInfo.ExecutionExceptionInfo(
+                        "exception2", "task2", "location2", 
System.currentTimeMillis()));
+        return new JobExceptionsInfoWithHistory(
+                "root exception",
+                System.currentTimeMillis(),
+                executionTaskExceptionInfoList,
+                false,
+                new JobExceptionsInfoWithHistory.JobExceptionHistory(
+                        Collections.emptyList(), false));
+    }
+
+    /**
+     * {@code taskName} and {@code location} should not be exposed if not set.
+     *
+     * @throws JsonProcessingException is not expected to be thrown
+     */
+    @Test
+    public void testNullFieldsNotSet() throws JsonProcessingException {
+        ObjectMapper objMapper = RestMapperUtils.getStrictObjectMapper();
+        String json =
+                objMapper.writeValueAsString(
+                        new JobExceptionsInfoWithHistory.ExceptionInfo(
+                                "exception name", "stacktrace", 0L));
+
+        assertThat(json, not(CoreMatchers.containsString("taskName")));
+        assertThat(json, not(CoreMatchers.containsString("location")));
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index cd2c06f..e1015e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
@@ -48,13 +49,16 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import 
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.scheduler.strategy.TestSchedulingStrategy;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.ExecutorUtils;
@@ -65,6 +69,7 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 import org.apache.flink.shaded.guava18.com.google.common.collect.Range;
 
+import org.hamcrest.collection.IsIterableWithSize;
 import org.hamcrest.core.Is;
 import org.junit.After;
 import org.junit.Before;
@@ -1052,36 +1057,51 @@ public class DefaultSchedulerTest extends TestLogger {
         taskRestartExecutor.triggerScheduledTasks();
         final long end = System.currentTimeMillis();
 
-        final List<ErrorInfo> actualExceptionHistory = 
scheduler.getExceptionHistory();
+        final Iterable<ExceptionHistoryEntry> actualExceptionHistory =
+                scheduler.getExceptionHistory();
 
-        assertThat(actualExceptionHistory, hasSize(1));
+        assertThat(actualExceptionHistory, 
IsIterableWithSize.iterableWithSize(1));
 
-        final ErrorInfo failure = actualExceptionHistory.get(0);
+        final ExceptionHistoryEntry failure = 
actualExceptionHistory.iterator().next();
         assertThat(
                 
failure.getException().deserializeError(ClassLoader.getSystemClassLoader()),
                 is(expectedException));
         assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
         assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
+        assertThat(failure.getTaskManagerLocation(), is(nullValue()));
+        assertThat(failure.getFailingTaskName(), is(nullValue()));
     }
 
     @Test
     public void testExceptionHistoryWithRestartableFailure() {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
 
+        final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+        final TestingLogicalSlotBuilder logicalSlotBuilder = new 
TestingLogicalSlotBuilder();
+        logicalSlotBuilder.setTaskManagerLocation(taskManagerLocation);
+        final ExceptionHistoryEntry.ArchivedTaskManagerLocation
+                expectedArchivedTaskManagerLocation =
+                        
ExceptionHistoryEntry.ArchivedTaskManagerLocation.fromTaskManagerLocation(
+                                taskManagerLocation);
+
+        executionSlotAllocatorFactory = new 
TestExecutionSlotAllocatorFactory(logicalSlotBuilder);
+
         final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
 
         // initiate restartable failure
-        final ExecutionAttemptID restartableAttemptId =
+        final ArchivedExecutionVertex taskFailureExecutionVertex =
                 Iterables.getOnlyElement(
-                                scheduler
-                                        .requestJob()
-                                        .getArchivedExecutionGraph()
-                                        .getAllExecutionVertices())
-                        .getCurrentExecutionAttempt()
-                        .getAttemptId();
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices());
+        final String expectedTaskName = 
taskFailureExecutionVertex.getTaskNameWithSubtaskIndex();
         final RuntimeException restartableException = new 
RuntimeException("restartable exception");
         Range<Long> updateStateTriggeringRestartTimeframe =
-                initiateFailure(scheduler, restartableAttemptId, 
restartableException);
+                initiateFailure(
+                        scheduler,
+                        
taskFailureExecutionVertex.getCurrentExecutionAttempt().getAttemptId(),
+                        restartableException);
 
         taskRestartExecutor.triggerNonPeriodicScheduledTask();
 
@@ -1097,14 +1117,17 @@ public class DefaultSchedulerTest extends TestLogger {
                         .getCurrentExecutionAttempt()
                         .getAttemptId();
         final RuntimeException failingException = new 
RuntimeException("failing exception");
-        Range<Long> updateStateTriggeringJobFailureTimeframe =
+        final Range<Long> updateStateTriggeringJobFailureTimeframe =
                 initiateFailure(scheduler, failingAttemptId, failingException);
 
-        List<ErrorInfo> actualExceptionHistory = 
scheduler.getExceptionHistory();
-        assertThat(actualExceptionHistory.size(), is(2));
+        final Iterable<ExceptionHistoryEntry> actualExceptionHistory =
+                scheduler.getExceptionHistory();
+        assertThat(actualExceptionHistory, 
IsIterableWithSize.iterableWithSize(2));
+        final Iterator<ExceptionHistoryEntry> exceptionHistoryIterator =
+                actualExceptionHistory.iterator();
 
         // assert restarted attempt
-        ErrorInfo restartableFailure = actualExceptionHistory.get(0);
+        final ExceptionHistoryEntry restartableFailure = 
exceptionHistoryIterator.next();
         assertThat(
                 restartableFailure
                         .getException()
@@ -1116,10 +1139,15 @@ public class DefaultSchedulerTest extends TestLogger {
         assertThat(
                 restartableFailure.getTimestamp(),
                 
lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint()));
+        assertThat(restartableFailure.getFailingTaskName(), 
is(expectedTaskName));
+        assertThat(
+                restartableFailure.getTaskManagerLocation(),
+                ExceptionHistoryEntryTest.isArchivedTaskManagerLocation(
+                        expectedArchivedTaskManagerLocation));
 
         // assert job failure attempt
-        ErrorInfo globalFailure = actualExceptionHistory.get(1);
-        Throwable actualException =
+        final ExceptionHistoryEntry globalFailure = 
exceptionHistoryIterator.next();
+        final Throwable actualException =
                 
globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader());
         assertThat(actualException, instanceOf(JobException.class));
         assertThat(actualException, 
FlinkMatchers.containsCause(failingException));
@@ -1129,6 +1157,53 @@ public class DefaultSchedulerTest extends TestLogger {
         assertThat(
                 globalFailure.getTimestamp(),
                 
lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint()));
+        assertThat(globalFailure.getFailingTaskName(), is(nullValue()));
+        assertThat(globalFailure.getTaskManagerLocation(), is(nullValue()));
+    }
+
+    @Test
+    public void testExceptionHistoryTruncation() {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+
+        configuration.set(JobManagerOptions.MAX_EXCEPTION_HISTORY_SIZE, 1);
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID attemptId0 =
+                Iterables.getOnlyElement(
+                                scheduler
+                                        .requestJob()
+                                        .getArchivedExecutionGraph()
+                                        .getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        attemptId0, ExecutionState.FAILED, new 
RuntimeException("old exception")));
+        taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+        final ExecutionAttemptID attemptId1 =
+                Iterables.getOnlyElement(
+                                scheduler
+                                        .requestJob()
+                                        .getArchivedExecutionGraph()
+                                        .getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException exception = new RuntimeException("relevant 
exception");
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(attemptId1, ExecutionState.FAILED, 
exception));
+        taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+        final Iterator<ExceptionHistoryEntry> entryIterator =
+                scheduler.getExceptionHistory().iterator();
+        assertTrue(entryIterator.hasNext());
+        assertThat(
+                entryIterator
+                        .next()
+                        .getException()
+                        .deserializeError(ClassLoader.getSystemClassLoader()),
+                is(exception));
+        assertFalse(entryIterator.hasNext());
     }
 
     private static TaskExecutionState createFailedTaskExecutionState(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntryTest.java
new file mode 100644
index 0000000..141c5c9
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntryTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.scheduler.ExceptionHistoryEntry.ArchivedTaskManagerLocation.fromTaskManagerLocation;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/** {@code ExceptionHistoryEntryTest} tests the instantiation of {@link 
ExceptionHistoryEntry}. */
+public class ExceptionHistoryEntryTest extends TestLogger {
+
+    @Test
+    public void testFromGlobalFailure() {
+        final Throwable failureCause = new RuntimeException("failure cause");
+        final long timestamp = System.currentTimeMillis();
+
+        final ExceptionHistoryEntry testInstance =
+                ExceptionHistoryEntry.fromGlobalFailure(failureCause, 
timestamp);
+
+        assertThat(
+                
testInstance.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+                is(failureCause));
+        assertThat(testInstance.getTimestamp(), is(timestamp));
+        assertThat(testInstance.getFailingTaskName(), is(nullValue()));
+        assertThat(testInstance.getTaskManagerLocation(), is(nullValue()));
+    }
+
+    @Test
+    public void testFromFailedExecution() {
+        final Throwable failureCause = new RuntimeException("Expected 
failure");
+        final long failureTimestamp = System.currentTimeMillis();
+        final String taskNameWithSubTaskIndex = "task name";
+        final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+
+        final AccessExecution failedExecution =
+                new TestingExecution(
+                        new ErrorInfo(failureCause, failureTimestamp), 
taskManagerLocation);
+        final ExceptionHistoryEntry testInstance =
+                ExceptionHistoryEntry.fromFailedExecution(
+                        failedExecution, taskNameWithSubTaskIndex);
+
+        assertThat(
+                
testInstance.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+                is(failureCause));
+        assertThat(testInstance.getTimestamp(), is(failureTimestamp));
+        assertThat(testInstance.getFailingTaskName(), 
is(taskNameWithSubTaskIndex));
+        assertThat(
+                testInstance.getTaskManagerLocation(),
+                
isArchivedTaskManagerLocation(fromTaskManagerLocation(taskManagerLocation)));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testFromFailedExecutionWithoutFailure() {
+        final AccessExecution executionWithoutFailure =
+                new TestingExecution(null, new LocalTaskManagerLocation());
+        ExceptionHistoryEntry.fromFailedExecution(executionWithoutFailure, 
"task name");
+    }
+
+    /**
+     * {@code TestingExecution} mocks {@link AccessExecution} to provide the 
relevant methods for
+     * testing {@link 
ExceptionHistoryEntry#fromFailedExecution(AccessExecution, String)}.
+     */
+    private static class TestingExecution implements AccessExecution {
+
+        private final ErrorInfo failureInfo;
+        private final TaskManagerLocation taskManagerLocation;
+
+        private TestingExecution(
+                @Nullable ErrorInfo failureInfo,
+                @Nullable TaskManagerLocation taskManagerLocation) {
+            this.failureInfo = failureInfo;
+            this.taskManagerLocation = taskManagerLocation;
+        }
+
+        @Override
+        public Optional<ErrorInfo> getFailureInfo() {
+            return Optional.ofNullable(failureInfo);
+        }
+
+        @Override
+        public TaskManagerLocation getAssignedResourceLocation() {
+            return taskManagerLocation;
+        }
+
+        @Override
+        public long getStateTimestamp(ExecutionState state) {
+            throw new UnsupportedOperationException("Method should not be 
triggered.");
+        }
+
+        @Override
+        public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() 
{
+            throw new UnsupportedOperationException("Method should not be 
triggered.");
+        }
+
+        @Override
+        public int getParallelSubtaskIndex() {
+            throw new UnsupportedOperationException("Method should not be 
triggered.");
+        }
+
+        @Override
+        public IOMetrics getIOMetrics() {
+            throw new UnsupportedOperationException("Method should not be 
triggered.");
+        }
+
+        @Override
+        public ExecutionAttemptID getAttemptId() {
+            throw new UnsupportedOperationException("Method should not be 
triggered.");
+        }
+
+        @Override
+        public int getAttemptNumber() {
+            throw new UnsupportedOperationException("Method should not be 
triggered.");
+        }
+
+        @Override
+        public long[] getStateTimestamps() {
+            throw new UnsupportedOperationException("Method should not be 
triggered.");
+        }
+
+        @Override
+        public ExecutionState getState() {
+            throw new UnsupportedOperationException("Method should not be 
triggered.");
+        }
+    }
+
+    public static Matcher<ExceptionHistoryEntry.ArchivedTaskManagerLocation>
+            isArchivedTaskManagerLocation(
+                    ExceptionHistoryEntry.ArchivedTaskManagerLocation 
actualLocation) {
+        return new ArchivedTaskManagerLocationMatcher(actualLocation);
+    }
+
+    private static class ArchivedTaskManagerLocationMatcher
+            extends 
TypeSafeDiagnosingMatcher<ExceptionHistoryEntry.ArchivedTaskManagerLocation> {
+
+        private final ExceptionHistoryEntry.ArchivedTaskManagerLocation 
expectedLocation;
+
+        public ArchivedTaskManagerLocationMatcher(
+                ExceptionHistoryEntry.ArchivedTaskManagerLocation 
expectedLocation) {
+            this.expectedLocation = expectedLocation;
+        }
+
+        @Override
+        protected boolean matchesSafely(
+                ExceptionHistoryEntry.ArchivedTaskManagerLocation actual, 
Description description) {
+            if (actual == null) {
+                return expectedLocation == null;
+            }
+
+            boolean match = true;
+            if (!Objects.equals(actual.getAddress(), 
expectedLocation.getAddress())) {
+                description.appendText(" 
address=").appendText(actual.getAddress());
+                match = false;
+            }
+
+            if (!Objects.equals(actual.getFQDNHostname(), 
expectedLocation.getFQDNHostname())) {
+                description.appendText(" 
FQDNHostname=").appendText(actual.getFQDNHostname());
+                match = false;
+            }
+
+            if (!Objects.equals(actual.getHostname(), 
expectedLocation.getHostname())) {
+                description.appendText(" 
hostname=").appendText(actual.getHostname());
+                match = false;
+            }
+
+            if (!Objects.equals(actual.getResourceID(), 
expectedLocation.getResourceID())) {
+                description
+                        .appendText(" resourceID=")
+                        .appendText(actual.getResourceID().toString());
+                match = false;
+            }
+
+            if (!Objects.equals(actual.getPort(), expectedLocation.getPort())) 
{
+                description.appendText(" 
port=").appendText(String.valueOf(actual.getPort()));
+                match = false;
+            }
+
+            return match;
+        }
+
+        @Override
+        public void describeTo(Description description) {
+            description.appendText(String.valueOf(expectedLocation));
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
index 5c938c3..59fea52 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
@@ -42,16 +42,22 @@ public class TestExecutionSlotAllocator implements 
ExecutionSlotAllocator, SlotO
     private final Map<ExecutionVertexID, SlotExecutionVertexAssignment> 
pendingRequests =
             new HashMap<>();
 
-    private final TestingLogicalSlotBuilder logicalSlotBuilder = new 
TestingLogicalSlotBuilder();
+    private final TestingLogicalSlotBuilder logicalSlotBuilder;
 
     private boolean autoCompletePendingRequests = true;
 
     private final List<LogicalSlot> returnedSlots = new ArrayList<>();
 
-    public TestExecutionSlotAllocator() {}
+    public TestExecutionSlotAllocator() {
+        this(new TestingLogicalSlotBuilder());
+    }
 
     public TestExecutionSlotAllocator(TaskManagerGateway taskManagerGateway) {
-        logicalSlotBuilder.setTaskManagerGateway(taskManagerGateway);
+        this(new 
TestingLogicalSlotBuilder().setTaskManagerGateway(taskManagerGateway));
+    }
+
+    public TestExecutionSlotAllocator(TestingLogicalSlotBuilder 
logicalSlotBuilder) {
+        this.logicalSlotBuilder = logicalSlotBuilder;
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java
index 59f3ab2..9d71af5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java
@@ -20,6 +20,7 @@
 package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 
 /** Factory for {@link TestExecutionSlotAllocator}. */
 public class TestExecutionSlotAllocatorFactory implements 
ExecutionSlotAllocatorFactory {
@@ -34,6 +35,10 @@ public class TestExecutionSlotAllocatorFactory implements 
ExecutionSlotAllocator
         this.testExecutionSlotAllocator = new 
TestExecutionSlotAllocator(gateway);
     }
 
+    public TestExecutionSlotAllocatorFactory(TestingLogicalSlotBuilder 
logicalSlotBuilder) {
+        this.testExecutionSlotAllocator = new 
TestExecutionSlotAllocator(logicalSlotBuilder);
+    }
+
     @Override
     public ExecutionSlotAllocator createInstance(final 
ExecutionSlotAllocationContext context) {
         return testExecutionSlotAllocator;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BoundedFIFOQueueTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BoundedFIFOQueueTest.java
new file mode 100644
index 0000000..818b7e1
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BoundedFIFOQueueTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.hamcrest.collection.IsIterableWithSize.iterableWithSize;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/** {@code BoundedFIFOQueueTest} tests {@link BoundedFIFOQueue}. */
+public class BoundedFIFOQueueTest extends TestLogger {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConstructorFailing() {
+        new BoundedFIFOQueue<>(-1);
+    }
+
+    @Test
+    public void testQueueWithMaxSize0() {
+        final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(0);
+        assertThat(testInstance, iterableWithSize(0));
+        testInstance.add(1);
+        assertThat(testInstance, iterableWithSize(0));
+    }
+
+    @Test
+    public void testQueueWithMaxSize2() {
+        final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(2);
+        assertThat(testInstance, iterableWithSize(0));
+
+        testInstance.add(1);
+        assertThat(testInstance, contains(1));
+
+        testInstance.add(2);
+        assertThat(testInstance, contains(1, 2));
+
+        testInstance.add(3);
+        assertThat(testInstance, contains(2, 3));
+    }
+
+    @Test
+    public void testAddNullHandling() {
+        final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(1);
+        try {
+            testInstance.add(null);
+            fail("A NullPointerException is expected to be thrown.");
+        } catch (NullPointerException e) {
+            // NullPointerException is expected
+        }
+
+        assertThat(testInstance, iterableWithSize(0));
+    }
+
+    /**
+     * Tests that {@link BoundedFIFOQueue#size()} returns the number of 
elements currently stored in
+     * the queue with a {@code maxSize} of 0.
+     */
+    @Test
+    public void testSizeWithMaxSize0() {
+        final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(0);
+        assertThat(testInstance.size(), is(0));
+
+        testInstance.add(1);
+        assertThat(testInstance.size(), is(0));
+    }
+
+    /**
+     * Tests that {@link BoundedFIFOQueue#size()} returns the number of 
elements currently stored in
+     * the queue with a {@code maxSize} of 2.
+     */
+    @Test
+    public void testSizeWithMaxSize2() {
+        final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(2);
+        assertThat(testInstance.size(), is(0));
+
+        testInstance.add(5);
+        assertThat(testInstance.size(), is(1));
+
+        testInstance.add(6);
+        assertThat(testInstance.size(), is(2));
+
+        // adding a 3rd element won't increase the size anymore
+        testInstance.add(7);
+        assertThat(testInstance.size(), is(2));
+    }
+}

Reply via email to