This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e56d433218dc0e3e6a6bdc4834b680a6b2fa56bf
Author: Yi Zhang <[email protected]>
AuthorDate: Sat Feb 21 11:29:35 2026 +0800

    [FLINK-38977][runtime] Expose exceptions for applications
---
 .../application/PackagedProgramApplication.java    |  38 +++-
 .../PackagedProgramApplicationTest.java            | 122 ++++++----
 .../webmonitor/history/HistoryServerTest.java      |   3 +-
 .../src/test/resources/rest_api_v1.snapshot        |  52 ++++-
 .../runtime/application/AbstractApplication.java   |  13 ++
 .../ApplicationExceptionHistoryEntry.java          |  52 +++++
 .../runtime/application/ArchivedApplication.java   |  19 +-
 .../flink/runtime/dispatcher/Dispatcher.java       |  27 ++-
 .../application/ApplicationExceptionsHandler.java  |  93 ++++++++
 .../ApplicationExceptionsInfoWithHistory.java      | 246 +++++++++++++++++++++
 .../application/ApplicationExceptionsHeaders.java  |  83 +++++++
 .../ApplicationExceptionsMessageParameters.java    |  40 ++++
 .../runtime/webmonitor/WebMonitorEndpoint.java     |  13 ++
 .../application/AbstractApplicationTest.java       |  34 +++
 .../ArchivedApplicationStoreTestUtils.java         |   3 +-
 .../application/ApplicationDetailsHandlerTest.java |   3 +-
 .../ApplicationExceptionsHandlerTest.java          | 181 +++++++++++++++
 .../ApplicationsOverviewHandlerTest.java           |   3 +-
 .../ApplicationExceptionsInfoWithHistoryTest.java  |  73 ++++++
 19 files changed, 1030 insertions(+), 68 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
index a06d1a13854..7352ec85a1e 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
@@ -221,13 +221,21 @@ public class PackagedProgramApplication extends 
AbstractApplication {
                                                     dispatcherGateway, 
ApplicationStatus.SUCCEEDED);
                                         }
 
-                                        final Optional<JobStatus> 
maybeJobStatus =
-                                                extractJobStatus(t);
-                                        if (maybeJobStatus.isPresent()) {
+                                        final 
Optional<UnsuccessfulExecutionException>
+                                                maybeJobFailure = 
extractJobFailure(t);
+                                        // An UnsuccessfulExecutionException 
indicates the job
+                                        // terminated in CANCELED or FAILED 
state, since we already
+                                        // waited for a globally terminal state
+                                        // (FINISHED/CANCELED/FAILED) and 
FINISHED jobs do not throw
+                                        // this exception
+                                        if (maybeJobFailure.isPresent()) {
                                             // the exception is caused by job 
execution results
+                                            UnsuccessfulExecutionException 
jobFailure =
+                                                    maybeJobFailure.get();
+                                            JobStatus jobStatus =
+                                                    
jobFailure.getStatus().orElseThrow();
                                             ApplicationState applicationState =
-                                                    
ApplicationState.fromJobStatus(
-                                                            
maybeJobStatus.get());
+                                                    
ApplicationState.fromJobStatus(jobStatus);
                                             LOG.info("Application {}: ", 
applicationState, t);
                                             if (applicationState == 
ApplicationState.CANCELED) {
                                                 transitionToCanceling();
@@ -238,6 +246,9 @@ public class PackagedProgramApplication extends 
AbstractApplication {
                                                         errorHandler);
 
                                             } else {
+                                                addExceptionHistoryEntry(
+                                                        jobFailure.getCause(),
+                                                        jobFailure.getJobID());
                                                 transitionToFailing();
                                                 return finishAsFailed(
                                                         dispatcherGateway,
@@ -351,7 +362,12 @@ public class PackagedProgramApplication extends 
AbstractApplication {
                     dispatcherGateway, scheduledExecutor, mainThreadExecutor, 
errorHandler);
         }
 
-        LOG.warn("Application failed unexpectedly: ", t);
+        final Optional<ApplicationExecutionException> maybeApplicationFailure =
+                extractApplicationFailure(t);
+        final Throwable cause =
+                maybeApplicationFailure.isPresent() ? 
maybeApplicationFailure.get() : t;
+        LOG.warn("Application failed unexpectedly: ", cause);
+        addExceptionHistoryEntry(cause, null);
         transitionToFailing();
         return finishAsFailed(
                 dispatcherGateway, scheduledExecutor, mainThreadExecutor, 
errorHandler);
@@ -526,10 +542,12 @@ public class PackagedProgramApplication extends 
AbstractApplication {
                 : CompletableFuture.completedFuture(Acknowledge.get());
     }
 
-    private Optional<JobStatus> extractJobStatus(Throwable t) {
-        final Optional<UnsuccessfulExecutionException> maybeException =
-                ExceptionUtils.findThrowable(t, 
UnsuccessfulExecutionException.class);
-        return 
maybeException.flatMap(UnsuccessfulExecutionException::getStatus);
+    private Optional<UnsuccessfulExecutionException> 
extractJobFailure(Throwable t) {
+        return ExceptionUtils.findThrowable(t, 
UnsuccessfulExecutionException.class);
+    }
+
+    private Optional<ApplicationExecutionException> 
extractApplicationFailure(Throwable t) {
+        return ExceptionUtils.findThrowable(t, 
ApplicationExecutionException.class);
     }
 
     /**
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java
index 2ae9882b475..e6e26eacdb6 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java
@@ -55,8 +55,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
-import javax.annotation.Nullable;
-
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedDeque;
@@ -487,6 +485,9 @@ public class PackagedProgramApplicationTest {
         application.cancel();
 
         assertApplicationCanceled(application);
+
+        // verify exception history is empty when application is canceled
+        assertThat(application.getExceptionHistory()).isEmpty();
     }
 
     @Test
@@ -580,6 +581,9 @@ public class PackagedProgramApplicationTest {
         assertThat(applicationExecutionFuture.isDone()).isTrue();
         
assertThat(canceledJobIds).containsExactlyInAnyOrderElementsOf(submittedJobIds);
         assertApplicationCanceled(application);
+
+        // verify exception history is empty when application is canceled
+        assertThat(application.getExceptionHistory()).isEmpty();
     }
 
     @Test
@@ -831,44 +835,6 @@ public class PackagedProgramApplicationTest {
                 .isEqualTo(ApplicationStatus.CANCELED);
     }
 
-    @Test
-    void testErrorHandlerIsNotCalledWhenApplicationStatusIsUnknown() throws 
Exception {
-        final AtomicBoolean shutdownCalled = new AtomicBoolean(false);
-        final TestingDispatcherGateway.Builder dispatcherBuilder =
-                canceledJobGatewayBuilder()
-                        .setRequestJobResultFunction(
-                                jobID ->
-                                        CompletableFuture.completedFuture(
-                                                createUnknownJobResult(jobID)))
-                        .setClusterShutdownFunction(
-                                status -> {
-                                    shutdownCalled.set(true);
-                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
-                                });
-
-        final TestingDispatcherGateway dispatcherGateway = 
dispatcherBuilder.build();
-        final CompletableFuture<Void> errorHandlerFuture = new 
CompletableFuture<>();
-        final PackagedProgramApplication application =
-                createAndExecuteApplication(
-                        3,
-                        dispatcherGateway,
-                        scheduledExecutor,
-                        errorHandlerFuture::completeExceptionally);
-
-        // check that application completes exceptionally
-        assertException(
-                application.getApplicationCompletionFuture(), 
UnsuccessfulExecutionException.class);
-
-        application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
-        assertApplicationFailed(application);
-
-        // we do not call the error handler
-        assertThat(errorHandlerFuture.isDone()).isFalse();
-
-        // verify that we shut down the cluster
-        assertThat(shutdownCalled.get()).isTrue();
-    }
-
     @Test
     void testErrorHandlerIsNotCalled() throws Exception {
         final AtomicBoolean shutdownCalled = new AtomicBoolean(false);
@@ -1122,6 +1088,70 @@ public class PackagedProgramApplicationTest {
         assertApplicationFailed(application);
     }
 
+    @Test
+    void testExceptionHistoryWhenJobFails() throws Exception {
+        final ConcurrentLinkedDeque<JobID> submittedJobIds = new 
ConcurrentLinkedDeque<>();
+
+        final TestingDispatcherGateway.Builder dispatcherBuilder =
+                failedJobGatewayBuilder()
+                        .setSubmitFunction(
+                                jobGraph -> {
+                                    submittedJobIds.add(jobGraph.getJobID());
+                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
+                                });
+
+        final PackagedProgramApplication application =
+                createAndExecuteApplication(2, dispatcherBuilder.build());
+
+        application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+        assertApplicationFailed(application);
+
+        // verify exception history contains the job failure
+        assertThat(application.getExceptionHistory()).hasSize(1);
+        
assertThat(application.getExceptionHistory().get(0).getJobId().isPresent()).isTrue();
+        assertThat(application.getExceptionHistory().get(0).getJobId().get())
+                .isEqualTo(submittedJobIds.peek());
+        
assertThat(application.getExceptionHistory().get(0).getExceptionAsString())
+                .contains("Job execution failed");
+    }
+
+    @Test
+    void testExceptionHistoryWhenApplicationFails() throws Exception {
+        final TestingDispatcherGateway.Builder dispatcherBuilder =
+                TestingDispatcherGateway.newBuilder()
+                        .setSubmitFunction(
+                                jobGraph ->
+                                        FutureUtils.completedExceptionally(
+                                                new FlinkRuntimeException(
+                                                        "Application execution 
failed")));
+
+        final PackagedProgramApplication application =
+                createAndExecuteApplication(1, dispatcherBuilder.build());
+
+        application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+        assertApplicationFailed(application);
+
+        // verify exception history contains the application failure (without 
jobId)
+        assertThat(application.getExceptionHistory()).hasSize(1);
+        
assertThat(application.getExceptionHistory().get(0).getJobId().isPresent()).isFalse();
+        
assertThat(application.getExceptionHistory().get(0).getExceptionAsString())
+                .contains("Application execution failed");
+    }
+
+    @Test
+    void testExceptionHistoryEmptyWhenJobIsCanceled() throws Exception {
+        final TestingDispatcherGateway.Builder dispatcherBuilder = 
canceledJobGatewayBuilder();
+
+        PackagedProgramApplication application =
+                createAndExecuteApplication(3, dispatcherBuilder.build());
+
+        application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+        assertApplicationCanceled(application);
+
+        // verify exception history is empty when job is canceled
+        assertThat(application.getExceptionHistory()).isEmpty();
+    }
+
     private TestingDispatcherGateway.Builder finishedJobGatewayBuilder() {
         return dispatcherGatewayBuilder(JobStatus.FINISHED);
     }
@@ -1268,20 +1298,16 @@ public class PackagedProgramApplicationTest {
         return createJobResult(jobId, JobStatus.CANCELED);
     }
 
-    private static JobResult createUnknownJobResult(final JobID jobId) {
-        return createJobResult(jobId, null);
-    }
-
-    private static JobResult createJobResult(
-            final JobID jobID, @Nullable final JobStatus jobStatus) {
+    private static JobResult createJobResult(final JobID jobID, final 
JobStatus jobStatus) {
         JobResult.Builder builder =
                 new 
JobResult.Builder().jobId(jobID).netRuntime(2L).jobStatus(jobStatus);
         if (jobStatus == JobStatus.CANCELED) {
             builder.serializedThrowable(
                     new SerializedThrowable(new 
JobCancellationException(jobID, "Hello", null)));
-        } else if (jobStatus == JobStatus.FAILED || jobStatus == null) {
+        } else if (jobStatus == JobStatus.FAILED) {
             builder.serializedThrowable(
-                    new SerializedThrowable(new JobExecutionException(jobID, 
"bla bla bla")));
+                    new SerializedThrowable(
+                            new JobExecutionException(jobID, "Job execution 
failed")));
         }
         return builder.build();
     }
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 91c213e6d97..e5045af1468 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -944,7 +944,8 @@ class HistoryServerTest {
                 "test-application",
                 ApplicationState.FINISHED,
                 new long[ApplicationState.values().length],
-                jobs);
+                jobs,
+                Collections.emptyList());
     }
 
     private ExecutionGraphInfo createExecutionGraphInfo() {
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 7cebf7c7f9e..6a84ac6d888 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -201,6 +201,56 @@
         }
       }
     }
+  }, {
+    "url" : "/applications/:applicationid/exceptions",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "applicationid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ApplicationExceptionsInfoWithHistory",
+      "properties" : {
+        "exceptionHistory" : {
+          "type" : "object",
+          "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ApplicationExceptionsInfoWithHistory:ApplicationExceptionHistory",
+          "properties" : {
+            "entries" : {
+              "type" : "array",
+              "items" : {
+                "type" : "object",
+                "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ApplicationExceptionsInfoWithHistory:ExceptionInfo",
+                "properties" : {
+                  "exceptionName" : {
+                    "type" : "string"
+                  },
+                  "stacktrace" : {
+                    "type" : "string"
+                  },
+                  "timestamp" : {
+                    "type" : "integer"
+                  },
+                  "jobId" : {
+                    "type" : "any"
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    }
   }, {
     "url" : "/cluster",
     "method" : "DELETE",
@@ -4732,4 +4782,4 @@
       }
     }
   } ]
-}
\ No newline at end of file
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java
index aef3f17b893..d910c2c8a2c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java
@@ -30,6 +30,8 @@ import org.apache.flink.util.concurrent.ScheduledExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -65,6 +67,8 @@ public abstract class AbstractApplication implements 
Serializable {
 
     private final Set<JobID> jobs = new HashSet<>();
 
+    private final List<ApplicationExceptionHistoryEntry> exceptionHistory = 
new ArrayList<>();
+
     /**
      * List of registered application status listeners that will be notified 
via {@link
      * ApplicationStatusListener#notifyApplicationStatusChange} when the 
application state changes.
@@ -127,6 +131,15 @@ public abstract class AbstractApplication implements 
Serializable {
         return Collections.unmodifiableSet(jobs);
     }
 
+    public List<ApplicationExceptionHistoryEntry> getExceptionHistory() {
+        return Collections.unmodifiableList(exceptionHistory);
+    }
+
+    public void addExceptionHistoryEntry(Throwable throwable, @Nullable JobID 
jobId) {
+        exceptionHistory.add(
+                new ApplicationExceptionHistoryEntry(throwable, 
System.currentTimeMillis(), jobId));
+    }
+
     /**
      * Adds a job ID to the jobs set.
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/application/ApplicationExceptionHistoryEntry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/application/ApplicationExceptionHistoryEntry.java
new file mode 100644
index 00000000000..5a34b9ed5be
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/application/ApplicationExceptionHistoryEntry.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.application;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+/**
+ * {@code ExceptionHistoryEntry} collects information about a single failure 
that triggered the
+ * application's failure.
+ */
+public class ApplicationExceptionHistoryEntry extends ErrorInfo {
+
+    private static final long serialVersionUID = -3855285510064263702L;
+
+    /**
+     * The ID of the job that caused the failure.
+     *
+     * <p>This field is null if the failure was not caused by a job.
+     */
+    @Nullable private final JobID jobId;
+
+    public ApplicationExceptionHistoryEntry(
+            Throwable cause, long timestamp, @Nullable JobID jobId) {
+        super(cause, timestamp);
+        this.jobId = jobId;
+    }
+
+    public Optional<JobID> getJobId() {
+        return Optional.ofNullable(jobId);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/application/ArchivedApplication.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/application/ArchivedApplication.java
index 771366bc22a..bde97fe721f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/application/ArchivedApplication.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/application/ArchivedApplication.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Map;
 
 /** Read-only information about an {@link AbstractApplication}. */
@@ -42,17 +43,21 @@ public class ArchivedApplication implements Serializable {
 
     private final Map<JobID, ExecutionGraphInfo> jobs;
 
+    private final Collection<ApplicationExceptionHistoryEntry> 
exceptionHistory;
+
     public ArchivedApplication(
             ApplicationID applicationId,
             String applicationName,
             ApplicationState applicationState,
             long[] statusTimestamps,
-            Map<JobID, ExecutionGraphInfo> jobs) {
+            Map<JobID, ExecutionGraphInfo> jobs,
+            Collection<ApplicationExceptionHistoryEntry> exceptionHistory) {
         this.applicationId = applicationId;
         this.applicationName = applicationName;
         this.applicationState = applicationState;
         this.statusTimestamps = statusTimestamps;
         this.jobs = jobs;
+        this.exceptionHistory = exceptionHistory;
     }
 
     public ApplicationID getApplicationId() {
@@ -75,6 +80,10 @@ public class ArchivedApplication implements Serializable {
         return jobs;
     }
 
+    public Collection<ApplicationExceptionHistoryEntry> getExceptionHistory() {
+        return exceptionHistory;
+    }
+
     @Override
     public String toString() {
         return "ArchivedApplication{"
@@ -89,6 +98,8 @@ public class ArchivedApplication implements Serializable {
                 + Arrays.toString(statusTimestamps)
                 + ", jobs="
                 + jobs
+                + ", exceptionHistory="
+                + exceptionHistory
                 + '}';
     }
 
@@ -105,7 +116,8 @@ public class ArchivedApplication implements Serializable {
                 && applicationName.equals(that.applicationName)
                 && applicationState == that.applicationState
                 && Arrays.equals(statusTimestamps, that.statusTimestamps)
-                && jobs.equals(that.jobs);
+                && jobs.equals(that.jobs)
+                && exceptionHistory.equals(that.exceptionHistory);
     }
 
     @Override
@@ -114,6 +126,7 @@ public class ArchivedApplication implements Serializable {
                 + 31 * applicationName.hashCode()
                 + 31 * applicationState.hashCode()
                 + 31 * Arrays.hashCode(statusTimestamps)
-                + 31 * jobs.hashCode();
+                + 31 * jobs.hashCode()
+                + 31 * exceptionHistory.hashCode();
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 4d79622e6b7..22a3ee2c444 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -59,6 +59,7 @@ import 
org.apache.flink.runtime.entrypoint.ClusterEntryPointExceptionUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -723,13 +724,34 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
                                                     
executionGraphInfo.getJobId());
                                         }
 
+                                        // record job exception for 
SingleJobApplication
+                                        if (application instanceof 
SingleJobApplication) {
+                                            jobs.values()
+                                                    .forEach(
+                                                            executionGraphInfo 
-> {
+                                                                ErrorInfo 
errorInfo =
+                                                                        
executionGraphInfo
+                                                                               
 .getArchivedExecutionGraph()
+                                                                               
 .getFailureInfo();
+                                                                if (errorInfo 
!= null) {
+                                                                    application
+                                                                            
.addExceptionHistoryEntry(
+                                                                               
     errorInfo
+                                                                               
             .getException(),
+                                                                               
     executionGraphInfo
+                                                                               
             .getJobId());
+                                                                }
+                                                            });
+                                        }
+
                                         ArchivedApplication 
archivedApplication =
                                                 new ArchivedApplication(
                                                         
application.getApplicationId(),
                                                         application.getName(),
                                                         
application.getApplicationStatus(),
                                                         stateTimestamps,
-                                                        jobs);
+                                                        jobs,
+                                                        
application.getExceptionHistory());
 
                                         applications.remove(applicationId);
                                         
writeToArchivedApplicationStore(archivedApplication);
@@ -1262,7 +1284,8 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
                                                                         
ExecutionGraphInfo
                                                                                
 ::getJobId,
                                                                         
executionGraphInfo ->
-                                                                               
 executionGraphInfo)))));
+                                                                               
 executionGraphInfo)),
+                                                
application.getExceptionHistory())));
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/application/ApplicationExceptionsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/application/ApplicationExceptionsHandler.java
new file mode 100644
index 00000000000..6dc04cd6d88
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/application/ApplicationExceptionsHandler.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.application;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.runtime.application.ArchivedApplication;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import 
org.apache.flink.runtime.rest.messages.ApplicationExceptionsInfoWithHistory;
+import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
+import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ApplicationJsonArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Handler returning the exception history for the specified application. */
+public class ApplicationExceptionsHandler
+        extends AbstractRestHandler<
+                RestfulGateway,
+                EmptyRequestBody,
+                ApplicationExceptionsInfoWithHistory,
+                ApplicationMessageParameters>
+        implements ApplicationJsonArchivist {
+
+    public ApplicationExceptionsHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Duration timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<
+                            EmptyRequestBody,
+                            ApplicationExceptionsInfoWithHistory,
+                            ApplicationMessageParameters>
+                    messageHeaders) {
+        super(leaderRetriever, timeout, responseHeaders, messageHeaders);
+    }
+
+    @Override
+    public CompletableFuture<ApplicationExceptionsInfoWithHistory> 
handleRequest(
+            @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull 
RestfulGateway gateway) {
+        ApplicationID applicationId = 
request.getPathParameter(ApplicationIDPathParameter.class);
+
+        return gateway.requestApplication(applicationId, timeout)
+                .thenApply(
+                        archivedApplication ->
+                                ApplicationExceptionsInfoWithHistory
+                                        .fromApplicationExceptionHistory(
+                                                
archivedApplication.getExceptionHistory()));
+    }
+
+    @Override
+    public Collection<ArchivedJson> archiveApplicationWithPath(
+            ArchivedApplication archivedApplication) throws IOException {
+        String path =
+                getMessageHeaders()
+                        .getTargetRestEndpointURL()
+                        .replace(
+                                ':' + ApplicationIDPathParameter.KEY,
+                                
archivedApplication.getApplicationId().toHexString());
+        return Collections.singleton(
+                new ArchivedJson(
+                        path,
+                        
ApplicationExceptionsInfoWithHistory.fromApplicationExceptionHistory(
+                                archivedApplication.getExceptionHistory())));
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ApplicationExceptionsInfoWithHistory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ApplicationExceptionsInfoWithHistory.java
new file mode 100644
index 00000000000..15be60cf38c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ApplicationExceptionsInfoWithHistory.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.application.ApplicationExceptionHistoryEntry;
+import 
org.apache.flink.runtime.rest.handler.application.ApplicationExceptionsHandler;
+import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.StringJoiner;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code ApplicationExceptionsInfoWithHistory} providing a history of 
previously caused failures.
+ * It's the response type of the {@link ApplicationExceptionsHandler}.
+ */
+public class ApplicationExceptionsInfoWithHistory implements ResponseBody {
+
+    public static final String FIELD_NAME_EXCEPTION_HISTORY = 
"exceptionHistory";
+
+    @JsonProperty(FIELD_NAME_EXCEPTION_HISTORY)
+    private final ApplicationExceptionHistory exceptionHistory;
+
+    @JsonCreator
+    public ApplicationExceptionsInfoWithHistory(
+            @JsonProperty(FIELD_NAME_EXCEPTION_HISTORY)
+                    ApplicationExceptionHistory exceptionHistory) {
+        this.exceptionHistory = exceptionHistory;
+    }
+
+    @JsonIgnore
+    public ApplicationExceptionHistory getExceptionHistory() {
+        return exceptionHistory;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ApplicationExceptionsInfoWithHistory that = 
(ApplicationExceptionsInfoWithHistory) o;
+        return Objects.equals(exceptionHistory, that.exceptionHistory);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(exceptionHistory);
+    }
+
+    @Override
+    public String toString() {
+        return new StringJoiner(
+                        ", ", 
ApplicationExceptionsInfoWithHistory.class.getSimpleName() + "[", "]")
+                .add("exceptionHistory=" + exceptionHistory)
+                .toString();
+    }
+
+    public static ApplicationExceptionsInfoWithHistory 
fromApplicationExceptionHistory(
+            Collection<ApplicationExceptionHistoryEntry> exceptions) {
+        return new ApplicationExceptionsInfoWithHistory(
+                new ApplicationExceptionHistory(
+                        exceptions.stream()
+                                .map(
+                                        exception ->
+                                                new ExceptionInfo(
+                                                        exception
+                                                                .getException()
+                                                                
.getOriginalErrorClassName(),
+                                                        
exception.getExceptionAsString(),
+                                                        
exception.getTimestamp(),
+                                                        
exception.getJobId().orElse(null)))
+                                .collect(Collectors.toList())));
+    }
+
+    public static final class ApplicationExceptionHistory {
+
+        public static final String FIELD_NAME_ENTRIES = "entries";
+
+        @JsonProperty(FIELD_NAME_ENTRIES)
+        private final List<ExceptionInfo> entries;
+
+        @JsonCreator
+        public ApplicationExceptionHistory(
+                @JsonProperty(FIELD_NAME_ENTRIES) List<ExceptionInfo> entries) 
{
+            this.entries = entries;
+        }
+
+        @JsonIgnore
+        public List<ExceptionInfo> getEntries() {
+            return entries;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ApplicationExceptionHistory that = (ApplicationExceptionHistory) o;
+            return Objects.equals(entries, that.entries);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(entries);
+        }
+
+        @Override
+        public String toString() {
+            return new StringJoiner(
+                            ", ", 
ApplicationExceptionHistory.class.getSimpleName() + "[", "]")
+                    .add("entries=" + entries)
+                    .toString();
+        }
+    }
+
+    public static class ExceptionInfo {
+
+        public static final String FIELD_NAME_EXCEPTION_NAME = "exceptionName";
+        public static final String FIELD_NAME_EXCEPTION_STACKTRACE = 
"stacktrace";
+        public static final String FIELD_NAME_EXCEPTION_TIMESTAMP = 
"timestamp";
+        public static final String FIELD_NAME_JOB_ID = "jobId";
+
+        @JsonProperty(FIELD_NAME_EXCEPTION_NAME)
+        private final String exceptionName;
+
+        @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE)
+        private final String stacktrace;
+
+        @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP)
+        private final long timestamp;
+
+        @JsonInclude(NON_NULL)
+        @JsonProperty(FIELD_NAME_JOB_ID)
+        @JsonSerialize(using = JobIDSerializer.class)
+        @Nullable
+        private final JobID jobId;
+
+        public ExceptionInfo(String exceptionName, String stacktrace, long 
timestamp) {
+            this(exceptionName, stacktrace, timestamp, null);
+        }
+
+        @JsonCreator
+        public ExceptionInfo(
+                @JsonProperty(FIELD_NAME_EXCEPTION_NAME) String exceptionName,
+                @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE) String 
stacktrace,
+                @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP) long timestamp,
+                @JsonProperty(FIELD_NAME_JOB_ID)
+                        @Nullable
+                        @JsonDeserialize(using = JobIDDeserializer.class)
+                        JobID jobId) {
+            this.exceptionName = checkNotNull(exceptionName);
+            this.stacktrace = checkNotNull(stacktrace);
+            this.timestamp = timestamp;
+            this.jobId = jobId;
+        }
+
+        @JsonIgnore
+        public String getExceptionName() {
+            return exceptionName;
+        }
+
+        @JsonIgnore
+        public String getStacktrace() {
+            return stacktrace;
+        }
+
+        @JsonIgnore
+        public long getTimestamp() {
+            return timestamp;
+        }
+
+        @JsonIgnore
+        @Nullable
+        public JobID getJobId() {
+            return jobId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ExceptionInfo that = (ExceptionInfo) o;
+            return exceptionName.equals(that.exceptionName)
+                    && stacktrace.equals(that.stacktrace)
+                    && Objects.equals(timestamp, that.timestamp)
+                    && Objects.equals(jobId, that.jobId);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(exceptionName, stacktrace, timestamp, jobId);
+        }
+
+        @Override
+        public String toString() {
+            return new StringJoiner(", ", ExceptionInfo.class.getSimpleName() 
+ "[", "]")
+                    .add("exceptionName='" + exceptionName + "'")
+                    .add("stacktrace='" + stacktrace + "'")
+                    .add("timestamp=" + timestamp)
+                    .add("jobId=" + jobId)
+                    .toString();
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsHeaders.java
new file mode 100644
index 00000000000..22a0ad90e03
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsHeaders.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.application;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import 
org.apache.flink.runtime.rest.handler.application.ApplicationExceptionsHandler;
+import 
org.apache.flink.runtime.rest.messages.ApplicationExceptionsInfoWithHistory;
+import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
+import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/** Message headers for the {@link ApplicationExceptionsHandler}. */
+public class ApplicationExceptionsHeaders
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody,
+                ApplicationExceptionsInfoWithHistory,
+                ApplicationMessageParameters> {
+
+    private static final ApplicationExceptionsHeaders INSTANCE = new 
ApplicationExceptionsHeaders();
+
+    public static final String URL =
+            "/applications/:" + ApplicationIDPathParameter.KEY + "/exceptions";
+
+    private ApplicationExceptionsHeaders() {}
+
+    @Override
+    public Class<EmptyRequestBody> getRequestClass() {
+        return EmptyRequestBody.class;
+    }
+
+    @Override
+    public Class<ApplicationExceptionsInfoWithHistory> getResponseClass() {
+        return ApplicationExceptionsInfoWithHistory.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.OK;
+    }
+
+    @Override
+    public ApplicationMessageParameters getUnresolvedMessageParameters() {
+        return new ApplicationMessageParameters();
+    }
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.GET;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return URL;
+    }
+
+    public static ApplicationExceptionsHeaders getInstance() {
+        return INSTANCE;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Returns exception history for an application.";
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParameters.java
new file mode 100644
index 00000000000..cee07b8b209
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParameters.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.application;
+
+import 
org.apache.flink.runtime.rest.handler.application.ApplicationExceptionsHandler;
+import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.UpperLimitExceptionParameter;
+
+import java.util.Collection;
+import java.util.List;
+
+/** {@link MessageParameters} for {@link ApplicationExceptionsHandler}. */
+public class ApplicationExceptionsMessageParameters extends 
ApplicationMessageParameters {
+
+    private final UpperLimitExceptionParameter upperLimitExceptionParameter =
+            new UpperLimitExceptionParameter();
+
+    @Override
+    public Collection<MessageQueryParameter<?>> getQueryParameters() {
+        return List.of(upperLimitExceptionParameter);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 2ce3206d69a..6c4fd91b3db 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import 
org.apache.flink.runtime.rest.handler.application.ApplicationCancellationHandler;
 import 
org.apache.flink.runtime.rest.handler.application.ApplicationDetailsHandler;
+import 
org.apache.flink.runtime.rest.handler.application.ApplicationExceptionsHandler;
 import 
org.apache.flink.runtime.rest.handler.application.ApplicationsOverviewHandler;
 import 
org.apache.flink.runtime.rest.handler.application.JobManagerApplicationConfigurationHandler;
 import org.apache.flink.runtime.rest.handler.cluster.ClusterConfigHandler;
@@ -138,6 +139,7 @@ import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
 import org.apache.flink.runtime.rest.messages.YarnCancelJobTerminationHeaders;
 import org.apache.flink.runtime.rest.messages.YarnStopJobTerminationHeaders;
 import 
org.apache.flink.runtime.rest.messages.application.ApplicationDetailsHeaders;
+import 
org.apache.flink.runtime.rest.messages.application.ApplicationExceptionsHeaders;
 import 
org.apache.flink.runtime.rest.messages.application.JobManagerApplicationConfigurationHeaders;
 import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
 import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
@@ -516,6 +518,13 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                         
JobManagerApplicationConfigurationHeaders.getInstance(),
                         clusterConfiguration);
 
+        ApplicationExceptionsHandler applicationExceptionsHandler =
+                new ApplicationExceptionsHandler(
+                        leaderRetriever,
+                        timeout,
+                        responseHeaders,
+                        ApplicationExceptionsHeaders.getInstance());
+
         JobAccumulatorsHandler jobAccumulatorsHandler =
                 new JobAccumulatorsHandler(
                         leaderRetriever,
@@ -832,6 +841,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                 Tuple2.of(
                         
jobManagerApplicationConfigurationHandler.getMessageHeaders(),
                         jobManagerApplicationConfigurationHandler));
+        handlers.add(
+                Tuple2.of(
+                        applicationExceptionsHandler.getMessageHeaders(),
+                        applicationExceptionsHandler));
         handlers.add(Tuple2.of(jobAccumulatorsHandler.getMessageHeaders(), 
jobAccumulatorsHandler));
         handlers.add(Tuple2.of(taskManagersHandler.getMessageHeaders(), 
taskManagersHandler));
         handlers.add(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java
index a68b5453a3c..f0090507b3f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java
@@ -229,6 +229,40 @@ public class AbstractApplicationTest {
                 listener.getTargetStates());
     }
 
+    @Test
+    void testInitialExceptionHistory() {
+        AbstractApplication application = new MockApplication(new 
ApplicationID());
+        assertTrue(application.getExceptionHistory().isEmpty());
+    }
+
+    @Test
+    void testAddExceptionHistoryEntryWithoutJobId() {
+        AbstractApplication application = new MockApplication(new 
ApplicationID());
+        Throwable exception = new RuntimeException("Test exception");
+
+        application.addExceptionHistoryEntry(exception, null);
+
+        assertEquals(1, application.getExceptionHistory().size());
+        ApplicationExceptionHistoryEntry entry = 
application.getExceptionHistory().get(0);
+        assertTrue(entry.getExceptionAsString().contains("Test exception"));
+        assertFalse(entry.getJobId().isPresent());
+    }
+
+    @Test
+    void testAddExceptionHistoryEntryWithJobId() {
+        AbstractApplication application = new MockApplication(new 
ApplicationID());
+        JobID jobId = JobID.generate();
+        Throwable exception = new RuntimeException("Test exception with job");
+
+        application.addExceptionHistoryEntry(exception, jobId);
+
+        assertEquals(1, application.getExceptionHistory().size());
+        ApplicationExceptionHistoryEntry entry = 
application.getExceptionHistory().get(0);
+        assertTrue(entry.getExceptionAsString().contains("Test exception with 
job"));
+        assertTrue(entry.getJobId().isPresent());
+        assertEquals(jobId, entry.getJobId().get());
+    }
+
     private static class MockApplication extends AbstractApplication {
         public MockApplication(ApplicationID applicationId) {
             super(applicationId);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java
index 70d508c2242..a2721626f4c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java
@@ -130,7 +130,8 @@ public class ArchivedApplicationStoreTestUtils {
                             "test-application-" + i,
                             state,
                             new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L},
-                            jobs));
+                            jobs,
+                            Collections.emptyList()));
         }
 
         return archivedApplications;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationDetailsHandlerTest.java
index 055cc412c02..1929e677895 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationDetailsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationDetailsHandlerTest.java
@@ -80,7 +80,8 @@ class ApplicationDetailsHandlerTest {
                         "Test Application",
                         ApplicationState.FINISHED,
                         new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L},
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        Collections.emptyList());
 
         handlerRequest = createRequest(archivedApplication.getApplicationId());
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationExceptionsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationExceptionsHandlerTest.java
new file mode 100644
index 00000000000..27b0e42eab9
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationExceptionsHandlerTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.application;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.ApplicationState;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.application.ApplicationExceptionHistoryEntry;
+import org.apache.flink.runtime.application.ArchivedApplication;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import 
org.apache.flink.runtime.rest.messages.ApplicationExceptionsInfoWithHistory;
+import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
+import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.application.ApplicationExceptionsHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.testutils.TestingUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for the {@link ApplicationExceptionsHandler}. */
+class ApplicationExceptionsHandlerTest {
+
+    private ApplicationExceptionsHandler handler;
+    private HandlerRequest<EmptyRequestBody> handlerRequest;
+    private ArchivedApplication archivedApplication;
+    private TestingRestfulGateway testingRestfulGateway;
+
+    private static HandlerRequest<EmptyRequestBody> 
createRequest(ApplicationID applicationId)
+            throws HandlerRequestException {
+        Map<String, String> pathParameters = new HashMap<>();
+        pathParameters.put(ApplicationIDPathParameter.KEY, 
applicationId.toString());
+        return HandlerRequest.resolveParametersAndCreate(
+                EmptyRequestBody.getInstance(),
+                new ApplicationMessageParameters(),
+                pathParameters,
+                Collections.emptyMap(),
+                Collections.emptyList());
+    }
+
+    @BeforeEach
+    void setUp() throws HandlerRequestException {
+        GatewayRetriever<RestfulGateway> leaderRetriever =
+                () -> CompletableFuture.completedFuture(null);
+        handler =
+                new ApplicationExceptionsHandler(
+                        leaderRetriever,
+                        TestingUtils.TIMEOUT,
+                        Collections.emptyMap(),
+                        ApplicationExceptionsHeaders.getInstance());
+
+        archivedApplication =
+                new ArchivedApplication(
+                        ApplicationID.generate(),
+                        "Test Application",
+                        ApplicationState.FINISHED,
+                        new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L},
+                        Collections.emptyMap(),
+                        Collections.emptyList());
+
+        handlerRequest = createRequest(archivedApplication.getApplicationId());
+
+        testingRestfulGateway =
+                new TestingRestfulGateway.Builder()
+                        .setRequestApplicationFunction(
+                                applicationId ->
+                                        
CompletableFuture.completedFuture(archivedApplication))
+                        .build();
+    }
+
+    @Test
+    void testNoException() throws Exception {
+        ApplicationExceptionsInfoWithHistory response =
+                handler.handleRequest(handlerRequest, 
testingRestfulGateway).get();
+
+        assertThat(response.getExceptionHistory().getEntries()).isEmpty();
+    }
+
+    @Test
+    void testExceptionWithJobId() throws Exception {
+        final RuntimeException rootCause = new RuntimeException("exception 
#0");
+        final long rootCauseTimestamp = System.currentTimeMillis();
+        final JobID jobId = new JobID();
+
+        final List<ApplicationExceptionHistoryEntry> exceptionHistory = new 
ArrayList<>();
+        exceptionHistory.add(
+                new ApplicationExceptionHistoryEntry(rootCause, 
rootCauseTimestamp, jobId));
+
+        final ArchivedApplication applicationWithException =
+                new ArchivedApplication(
+                        archivedApplication.getApplicationId(),
+                        archivedApplication.getApplicationName(),
+                        ApplicationState.FAILED,
+                        new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L},
+                        Collections.emptyMap(),
+                        exceptionHistory);
+
+        testingRestfulGateway =
+                new TestingRestfulGateway.Builder()
+                        .setRequestApplicationFunction(
+                                applicationId ->
+                                        
CompletableFuture.completedFuture(applicationWithException))
+                        .build();
+
+        final ApplicationExceptionsInfoWithHistory response =
+                handler.handleRequest(handlerRequest, 
testingRestfulGateway).get();
+
+        assertThat(response.getExceptionHistory().getEntries()).hasSize(1);
+        final ApplicationExceptionsInfoWithHistory.ExceptionInfo exceptionInfo 
=
+                response.getExceptionHistory().getEntries().get(0);
+        
assertThat(exceptionInfo.getExceptionName()).isEqualTo(rootCause.getClass().getName());
+        assertThat(exceptionInfo.getTimestamp()).isEqualTo(rootCauseTimestamp);
+        assertThat(exceptionInfo.getJobId()).isNotNull();
+        assertThat(exceptionInfo.getJobId()).isEqualTo(jobId);
+    }
+
+    @Test
+    void testExceptionWithoutJobId() throws Exception {
+        final RuntimeException rootCause = new RuntimeException("exception 
#0");
+        final long rootCauseTimestamp = System.currentTimeMillis();
+
+        final List<ApplicationExceptionHistoryEntry> exceptionHistory = new 
ArrayList<>();
+        exceptionHistory.add(
+                new ApplicationExceptionHistoryEntry(rootCause, 
rootCauseTimestamp, null));
+
+        final ArchivedApplication applicationWithException =
+                new ArchivedApplication(
+                        archivedApplication.getApplicationId(),
+                        archivedApplication.getApplicationName(),
+                        ApplicationState.FAILED,
+                        new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L},
+                        Collections.emptyMap(),
+                        exceptionHistory);
+
+        testingRestfulGateway =
+                new TestingRestfulGateway.Builder()
+                        .setRequestApplicationFunction(
+                                applicationId ->
+                                        
CompletableFuture.completedFuture(applicationWithException))
+                        .build();
+
+        final ApplicationExceptionsInfoWithHistory response =
+                handler.handleRequest(handlerRequest, 
testingRestfulGateway).get();
+
+        assertThat(response.getExceptionHistory().getEntries()).hasSize(1);
+        final ApplicationExceptionsInfoWithHistory.ExceptionInfo exceptionInfo 
=
+                response.getExceptionHistory().getEntries().get(0);
+        
assertThat(exceptionInfo.getExceptionName()).isEqualTo(rootCause.getClass().getName());
+        assertThat(exceptionInfo.getTimestamp()).isEqualTo(rootCauseTimestamp);
+        assertThat(exceptionInfo.getJobId()).isNull();
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationsOverviewHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationsOverviewHandlerTest.java
index be699f88425..9b08453e798 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationsOverviewHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationsOverviewHandlerTest.java
@@ -77,7 +77,8 @@ class ApplicationsOverviewHandlerTest {
                         "Test Application",
                         ApplicationState.FINISHED,
                         new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L},
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        Collections.emptyList());
 
         testingRestfulGateway =
                 new TestingRestfulGateway.Builder()
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/ApplicationExceptionsInfoWithHistoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/ApplicationExceptionsInfoWithHistoryTest.java
new file mode 100644
index 00000000000..7073d7d5822
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/ApplicationExceptionsInfoWithHistoryTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests that the {@link ApplicationExceptionsInfoWithHistory} can be 
marshalled and unmarshalled.
+ */
+@ExtendWith(NoOpTestExtension.class)
+class ApplicationExceptionsInfoWithHistoryTest
+        extends 
RestResponseMarshallingTestBase<ApplicationExceptionsInfoWithHistory> {
+
+    @Override
+    protected Class<ApplicationExceptionsInfoWithHistory> 
getTestResponseClass() {
+        return ApplicationExceptionsInfoWithHistory.class;
+    }
+
+    @Override
+    protected ApplicationExceptionsInfoWithHistory getTestResponseInstance() 
throws Exception {
+        return new ApplicationExceptionsInfoWithHistory(
+                new 
ApplicationExceptionsInfoWithHistory.ApplicationExceptionHistory(
+                        Arrays.asList(
+                                new 
ApplicationExceptionsInfoWithHistory.ExceptionInfo(
+                                        "exception #0", "stacktrace #0", 0L, 
new JobID()),
+                                new 
ApplicationExceptionsInfoWithHistory.ExceptionInfo(
+                                        "exception #1", "stacktrace #1", 1L, 
null))));
+    }
+
+    /**
+     * {@code jobId} should not be exposed if not set.
+     *
+     * @throws JsonProcessingException is not expected to be thrown
+     */
+    @Test
+    void testNullFieldsNotSet() throws JsonProcessingException {
+        ObjectMapper objMapper = RestMapperUtils.getStrictObjectMapper();
+        String json =
+                objMapper.writeValueAsString(
+                        new ApplicationExceptionsInfoWithHistory.ExceptionInfo(
+                                "exception name", "stacktrace", 0L));
+
+        assertThat(json).doesNotContain("jobId");
+    }
+}

Reply via email to