This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e56d433218dc0e3e6a6bdc4834b680a6b2fa56bf Author: Yi Zhang <[email protected]> AuthorDate: Sat Feb 21 11:29:35 2026 +0800 [FLINK-38977][runtime] Expose exceptions for applications --- .../application/PackagedProgramApplication.java | 38 +++- .../PackagedProgramApplicationTest.java | 122 ++++++---- .../webmonitor/history/HistoryServerTest.java | 3 +- .../src/test/resources/rest_api_v1.snapshot | 52 ++++- .../runtime/application/AbstractApplication.java | 13 ++ .../ApplicationExceptionHistoryEntry.java | 52 +++++ .../runtime/application/ArchivedApplication.java | 19 +- .../flink/runtime/dispatcher/Dispatcher.java | 27 ++- .../application/ApplicationExceptionsHandler.java | 93 ++++++++ .../ApplicationExceptionsInfoWithHistory.java | 246 +++++++++++++++++++++ .../application/ApplicationExceptionsHeaders.java | 83 +++++++ .../ApplicationExceptionsMessageParameters.java | 40 ++++ .../runtime/webmonitor/WebMonitorEndpoint.java | 13 ++ .../application/AbstractApplicationTest.java | 34 +++ .../ArchivedApplicationStoreTestUtils.java | 3 +- .../application/ApplicationDetailsHandlerTest.java | 3 +- .../ApplicationExceptionsHandlerTest.java | 181 +++++++++++++++ .../ApplicationsOverviewHandlerTest.java | 3 +- .../ApplicationExceptionsInfoWithHistoryTest.java | 73 ++++++ 19 files changed, 1030 insertions(+), 68 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java index a06d1a13854..7352ec85a1e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java @@ -221,13 +221,21 @@ public class PackagedProgramApplication extends AbstractApplication { dispatcherGateway, ApplicationStatus.SUCCEEDED); } - final Optional<JobStatus> maybeJobStatus = - extractJobStatus(t); - if (maybeJobStatus.isPresent()) { + final Optional<UnsuccessfulExecutionException> + maybeJobFailure = extractJobFailure(t); + // An UnsuccessfulExecutionException indicates the job + // terminated in CANCELED or FAILED state, since we already + // waited for a globally terminal state + // (FINISHED/CANCELED/FAILED) and FINISHED jobs do not throw + // this exception + if (maybeJobFailure.isPresent()) { // the exception is caused by job execution results + UnsuccessfulExecutionException jobFailure = + maybeJobFailure.get(); + JobStatus jobStatus = + jobFailure.getStatus().orElseThrow(); ApplicationState applicationState = - ApplicationState.fromJobStatus( - maybeJobStatus.get()); + ApplicationState.fromJobStatus(jobStatus); LOG.info("Application {}: ", applicationState, t); if (applicationState == ApplicationState.CANCELED) { transitionToCanceling(); @@ -238,6 +246,9 @@ public class PackagedProgramApplication extends AbstractApplication { errorHandler); } else { + addExceptionHistoryEntry( + jobFailure.getCause(), + jobFailure.getJobID()); transitionToFailing(); return finishAsFailed( dispatcherGateway, @@ -351,7 +362,12 @@ public class PackagedProgramApplication extends AbstractApplication { dispatcherGateway, scheduledExecutor, mainThreadExecutor, errorHandler); } - LOG.warn("Application failed unexpectedly: ", t); + final Optional<ApplicationExecutionException> maybeApplicationFailure = + extractApplicationFailure(t); + final Throwable cause = + maybeApplicationFailure.isPresent() ? maybeApplicationFailure.get() : t; + LOG.warn("Application failed unexpectedly: ", cause); + addExceptionHistoryEntry(cause, null); transitionToFailing(); return finishAsFailed( dispatcherGateway, scheduledExecutor, mainThreadExecutor, errorHandler); @@ -526,10 +542,12 @@ public class PackagedProgramApplication extends AbstractApplication { : CompletableFuture.completedFuture(Acknowledge.get()); } - private Optional<JobStatus> extractJobStatus(Throwable t) { - final Optional<UnsuccessfulExecutionException> maybeException = - ExceptionUtils.findThrowable(t, UnsuccessfulExecutionException.class); - return maybeException.flatMap(UnsuccessfulExecutionException::getStatus); + private Optional<UnsuccessfulExecutionException> extractJobFailure(Throwable t) { + return ExceptionUtils.findThrowable(t, UnsuccessfulExecutionException.class); + } + + private Optional<ApplicationExecutionException> extractApplicationFailure(Throwable t) { + return ExceptionUtils.findThrowable(t, ApplicationExecutionException.class); } /** diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java index 2ae9882b475..e6e26eacdb6 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationTest.java @@ -55,8 +55,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; -import javax.annotation.Nullable; - import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedDeque; @@ -487,6 +485,9 @@ public class PackagedProgramApplicationTest { application.cancel(); assertApplicationCanceled(application); + + // verify exception history is empty when application is canceled + assertThat(application.getExceptionHistory()).isEmpty(); } @Test @@ -580,6 +581,9 @@ public class PackagedProgramApplicationTest { assertThat(applicationExecutionFuture.isDone()).isTrue(); assertThat(canceledJobIds).containsExactlyInAnyOrderElementsOf(submittedJobIds); assertApplicationCanceled(application); + + // verify exception history is empty when application is canceled + assertThat(application.getExceptionHistory()).isEmpty(); } @Test @@ -831,44 +835,6 @@ public class PackagedProgramApplicationTest { .isEqualTo(ApplicationStatus.CANCELED); } - @Test - void testErrorHandlerIsNotCalledWhenApplicationStatusIsUnknown() throws Exception { - final AtomicBoolean shutdownCalled = new AtomicBoolean(false); - final TestingDispatcherGateway.Builder dispatcherBuilder = - canceledJobGatewayBuilder() - .setRequestJobResultFunction( - jobID -> - CompletableFuture.completedFuture( - createUnknownJobResult(jobID))) - .setClusterShutdownFunction( - status -> { - shutdownCalled.set(true); - return CompletableFuture.completedFuture(Acknowledge.get()); - }); - - final TestingDispatcherGateway dispatcherGateway = dispatcherBuilder.build(); - final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>(); - final PackagedProgramApplication application = - createAndExecuteApplication( - 3, - dispatcherGateway, - scheduledExecutor, - errorHandlerFuture::completeExceptionally); - - // check that application completes exceptionally - assertException( - application.getApplicationCompletionFuture(), UnsuccessfulExecutionException.class); - - application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); - assertApplicationFailed(application); - - // we do not call the error handler - assertThat(errorHandlerFuture.isDone()).isFalse(); - - // verify that we shut down the cluster - assertThat(shutdownCalled.get()).isTrue(); - } - @Test void testErrorHandlerIsNotCalled() throws Exception { final AtomicBoolean shutdownCalled = new AtomicBoolean(false); @@ -1122,6 +1088,70 @@ public class PackagedProgramApplicationTest { assertApplicationFailed(application); } + @Test + void testExceptionHistoryWhenJobFails() throws Exception { + final ConcurrentLinkedDeque<JobID> submittedJobIds = new ConcurrentLinkedDeque<>(); + + final TestingDispatcherGateway.Builder dispatcherBuilder = + failedJobGatewayBuilder() + .setSubmitFunction( + jobGraph -> { + submittedJobIds.add(jobGraph.getJobID()); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); + + final PackagedProgramApplication application = + createAndExecuteApplication(2, dispatcherBuilder.build()); + + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationFailed(application); + + // verify exception history contains the job failure + assertThat(application.getExceptionHistory()).hasSize(1); + assertThat(application.getExceptionHistory().get(0).getJobId().isPresent()).isTrue(); + assertThat(application.getExceptionHistory().get(0).getJobId().get()) + .isEqualTo(submittedJobIds.peek()); + assertThat(application.getExceptionHistory().get(0).getExceptionAsString()) + .contains("Job execution failed"); + } + + @Test + void testExceptionHistoryWhenApplicationFails() throws Exception { + final TestingDispatcherGateway.Builder dispatcherBuilder = + TestingDispatcherGateway.newBuilder() + .setSubmitFunction( + jobGraph -> + FutureUtils.completedExceptionally( + new FlinkRuntimeException( + "Application execution failed"))); + + final PackagedProgramApplication application = + createAndExecuteApplication(1, dispatcherBuilder.build()); + + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationFailed(application); + + // verify exception history contains the application failure (without jobId) + assertThat(application.getExceptionHistory()).hasSize(1); + assertThat(application.getExceptionHistory().get(0).getJobId().isPresent()).isFalse(); + assertThat(application.getExceptionHistory().get(0).getExceptionAsString()) + .contains("Application execution failed"); + } + + @Test + void testExceptionHistoryEmptyWhenJobIsCanceled() throws Exception { + final TestingDispatcherGateway.Builder dispatcherBuilder = canceledJobGatewayBuilder(); + + PackagedProgramApplication application = + createAndExecuteApplication(3, dispatcherBuilder.build()); + + application.getFinishApplicationFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertApplicationCanceled(application); + + // verify exception history is empty when job is canceled + assertThat(application.getExceptionHistory()).isEmpty(); + } + private TestingDispatcherGateway.Builder finishedJobGatewayBuilder() { return dispatcherGatewayBuilder(JobStatus.FINISHED); } @@ -1268,20 +1298,16 @@ public class PackagedProgramApplicationTest { return createJobResult(jobId, JobStatus.CANCELED); } - private static JobResult createUnknownJobResult(final JobID jobId) { - return createJobResult(jobId, null); - } - - private static JobResult createJobResult( - final JobID jobID, @Nullable final JobStatus jobStatus) { + private static JobResult createJobResult(final JobID jobID, final JobStatus jobStatus) { JobResult.Builder builder = new JobResult.Builder().jobId(jobID).netRuntime(2L).jobStatus(jobStatus); if (jobStatus == JobStatus.CANCELED) { builder.serializedThrowable( new SerializedThrowable(new JobCancellationException(jobID, "Hello", null))); - } else if (jobStatus == JobStatus.FAILED || jobStatus == null) { + } else if (jobStatus == JobStatus.FAILED) { builder.serializedThrowable( - new SerializedThrowable(new JobExecutionException(jobID, "bla bla bla"))); + new SerializedThrowable( + new JobExecutionException(jobID, "Job execution failed"))); } return builder.build(); } diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java index 91c213e6d97..e5045af1468 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java @@ -944,7 +944,8 @@ class HistoryServerTest { "test-application", ApplicationState.FINISHED, new long[ApplicationState.values().length], - jobs); + jobs, + Collections.emptyList()); } private ExecutionGraphInfo createExecutionGraphInfo() { diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index 7cebf7c7f9e..6a84ac6d888 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -201,6 +201,56 @@ } } } + }, { + "url" : "/applications/:applicationid/exceptions", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "applicationid" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ApplicationExceptionsInfoWithHistory", + "properties" : { + "exceptionHistory" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ApplicationExceptionsInfoWithHistory:ApplicationExceptionHistory", + "properties" : { + "entries" : { + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ApplicationExceptionsInfoWithHistory:ExceptionInfo", + "properties" : { + "exceptionName" : { + "type" : "string" + }, + "stacktrace" : { + "type" : "string" + }, + "timestamp" : { + "type" : "integer" + }, + "jobId" : { + "type" : "any" + } + } + } + } + } + } + } + } }, { "url" : "/cluster", "method" : "DELETE", @@ -4732,4 +4782,4 @@ } } } ] -} \ No newline at end of file +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java b/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java index aef3f17b893..d910c2c8a2c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java @@ -30,6 +30,8 @@ import org.apache.flink.util.concurrent.ScheduledExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; @@ -65,6 +67,8 @@ public abstract class AbstractApplication implements Serializable { private final Set<JobID> jobs = new HashSet<>(); + private final List<ApplicationExceptionHistoryEntry> exceptionHistory = new ArrayList<>(); + /** * List of registered application status listeners that will be notified via {@link * ApplicationStatusListener#notifyApplicationStatusChange} when the application state changes. @@ -127,6 +131,15 @@ public abstract class AbstractApplication implements Serializable { return Collections.unmodifiableSet(jobs); } + public List<ApplicationExceptionHistoryEntry> getExceptionHistory() { + return Collections.unmodifiableList(exceptionHistory); + } + + public void addExceptionHistoryEntry(Throwable throwable, @Nullable JobID jobId) { + exceptionHistory.add( + new ApplicationExceptionHistoryEntry(throwable, System.currentTimeMillis(), jobId)); + } + /** * Adds a job ID to the jobs set. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/application/ApplicationExceptionHistoryEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/application/ApplicationExceptionHistoryEntry.java new file mode 100644 index 00000000000..5a34b9ed5be --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/application/ApplicationExceptionHistoryEntry.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.application; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.executiongraph.ErrorInfo; + +import javax.annotation.Nullable; + +import java.util.Optional; + +/** + * {@code ExceptionHistoryEntry} collects information about a single failure that triggered the + * application's failure. + */ +public class ApplicationExceptionHistoryEntry extends ErrorInfo { + + private static final long serialVersionUID = -3855285510064263702L; + + /** + * The ID of the job that caused the failure. + * + * <p>This field is null if the failure was not caused by a job. + */ + @Nullable private final JobID jobId; + + public ApplicationExceptionHistoryEntry( + Throwable cause, long timestamp, @Nullable JobID jobId) { + super(cause, timestamp); + this.jobId = jobId; + } + + public Optional<JobID> getJobId() { + return Optional.ofNullable(jobId); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/application/ArchivedApplication.java b/flink-runtime/src/main/java/org/apache/flink/runtime/application/ArchivedApplication.java index 771366bc22a..bde97fe721f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/application/ArchivedApplication.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/application/ArchivedApplication.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import java.io.Serializable; import java.util.Arrays; +import java.util.Collection; import java.util.Map; /** Read-only information about an {@link AbstractApplication}. */ @@ -42,17 +43,21 @@ public class ArchivedApplication implements Serializable { private final Map<JobID, ExecutionGraphInfo> jobs; + private final Collection<ApplicationExceptionHistoryEntry> exceptionHistory; + public ArchivedApplication( ApplicationID applicationId, String applicationName, ApplicationState applicationState, long[] statusTimestamps, - Map<JobID, ExecutionGraphInfo> jobs) { + Map<JobID, ExecutionGraphInfo> jobs, + Collection<ApplicationExceptionHistoryEntry> exceptionHistory) { this.applicationId = applicationId; this.applicationName = applicationName; this.applicationState = applicationState; this.statusTimestamps = statusTimestamps; this.jobs = jobs; + this.exceptionHistory = exceptionHistory; } public ApplicationID getApplicationId() { @@ -75,6 +80,10 @@ public class ArchivedApplication implements Serializable { return jobs; } + public Collection<ApplicationExceptionHistoryEntry> getExceptionHistory() { + return exceptionHistory; + } + @Override public String toString() { return "ArchivedApplication{" @@ -89,6 +98,8 @@ public class ArchivedApplication implements Serializable { + Arrays.toString(statusTimestamps) + ", jobs=" + jobs + + ", exceptionHistory=" + + exceptionHistory + '}'; } @@ -105,7 +116,8 @@ public class ArchivedApplication implements Serializable { && applicationName.equals(that.applicationName) && applicationState == that.applicationState && Arrays.equals(statusTimestamps, that.statusTimestamps) - && jobs.equals(that.jobs); + && jobs.equals(that.jobs) + && exceptionHistory.equals(that.exceptionHistory); } @Override @@ -114,6 +126,7 @@ public class ArchivedApplication implements Serializable { + 31 * applicationName.hashCode() + 31 * applicationState.hashCode() + 31 * Arrays.hashCode(statusTimestamps) - + 31 * jobs.hashCode(); + + 31 * jobs.hashCode() + + 31 * exceptionHistory.hashCode(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 4d79622e6b7..22a3ee2c444 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -59,6 +59,7 @@ import org.apache.flink.runtime.entrypoint.ClusterEntryPointExceptionUtils; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ErrorInfo; import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -723,13 +724,34 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> executionGraphInfo.getJobId()); } + // record job exception for SingleJobApplication + if (application instanceof SingleJobApplication) { + jobs.values() + .forEach( + executionGraphInfo -> { + ErrorInfo errorInfo = + executionGraphInfo + .getArchivedExecutionGraph() + .getFailureInfo(); + if (errorInfo != null) { + application + .addExceptionHistoryEntry( + errorInfo + .getException(), + executionGraphInfo + .getJobId()); + } + }); + } + ArchivedApplication archivedApplication = new ArchivedApplication( application.getApplicationId(), application.getName(), application.getApplicationStatus(), stateTimestamps, - jobs); + jobs, + application.getExceptionHistory()); applications.remove(applicationId); writeToArchivedApplicationStore(archivedApplication); @@ -1262,7 +1284,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> ExecutionGraphInfo ::getJobId, executionGraphInfo -> - executionGraphInfo))))); + executionGraphInfo)), + application.getExceptionHistory()))); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/application/ApplicationExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/application/ApplicationExceptionsHandler.java new file mode 100644 index 00000000000..6dc04cd6d88 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/application/ApplicationExceptionsHandler.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.application; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.runtime.application.ArchivedApplication; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.messages.ApplicationExceptionsInfoWithHistory; +import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter; +import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.history.ApplicationJsonArchivist; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** Handler returning the exception history for the specified application. */ +public class ApplicationExceptionsHandler + extends AbstractRestHandler< + RestfulGateway, + EmptyRequestBody, + ApplicationExceptionsInfoWithHistory, + ApplicationMessageParameters> + implements ApplicationJsonArchivist { + + public ApplicationExceptionsHandler( + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Duration timeout, + Map<String, String> responseHeaders, + MessageHeaders< + EmptyRequestBody, + ApplicationExceptionsInfoWithHistory, + ApplicationMessageParameters> + messageHeaders) { + super(leaderRetriever, timeout, responseHeaders, messageHeaders); + } + + @Override + public CompletableFuture<ApplicationExceptionsInfoWithHistory> handleRequest( + @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway) { + ApplicationID applicationId = request.getPathParameter(ApplicationIDPathParameter.class); + + return gateway.requestApplication(applicationId, timeout) + .thenApply( + archivedApplication -> + ApplicationExceptionsInfoWithHistory + .fromApplicationExceptionHistory( + archivedApplication.getExceptionHistory())); + } + + @Override + public Collection<ArchivedJson> archiveApplicationWithPath( + ArchivedApplication archivedApplication) throws IOException { + String path = + getMessageHeaders() + .getTargetRestEndpointURL() + .replace( + ':' + ApplicationIDPathParameter.KEY, + archivedApplication.getApplicationId().toHexString()); + return Collections.singleton( + new ArchivedJson( + path, + ApplicationExceptionsInfoWithHistory.fromApplicationExceptionHistory( + archivedApplication.getExceptionHistory()))); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ApplicationExceptionsInfoWithHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ApplicationExceptionsInfoWithHistory.java new file mode 100644 index 00000000000..15be60cf38c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ApplicationExceptionsInfoWithHistory.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.application.ApplicationExceptionHistoryEntry; +import org.apache.flink.runtime.rest.handler.application.ApplicationExceptionsHandler; +import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer; +import org.apache.flink.runtime.rest.messages.json.JobIDSerializer; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.StringJoiner; +import java.util.stream.Collectors; + +import static org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code ApplicationExceptionsInfoWithHistory} providing a history of previously caused failures. + * It's the response type of the {@link ApplicationExceptionsHandler}. + */ +public class ApplicationExceptionsInfoWithHistory implements ResponseBody { + + public static final String FIELD_NAME_EXCEPTION_HISTORY = "exceptionHistory"; + + @JsonProperty(FIELD_NAME_EXCEPTION_HISTORY) + private final ApplicationExceptionHistory exceptionHistory; + + @JsonCreator + public ApplicationExceptionsInfoWithHistory( + @JsonProperty(FIELD_NAME_EXCEPTION_HISTORY) + ApplicationExceptionHistory exceptionHistory) { + this.exceptionHistory = exceptionHistory; + } + + @JsonIgnore + public ApplicationExceptionHistory getExceptionHistory() { + return exceptionHistory; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ApplicationExceptionsInfoWithHistory that = (ApplicationExceptionsInfoWithHistory) o; + return Objects.equals(exceptionHistory, that.exceptionHistory); + } + + @Override + public int hashCode() { + return Objects.hash(exceptionHistory); + } + + @Override + public String toString() { + return new StringJoiner( + ", ", ApplicationExceptionsInfoWithHistory.class.getSimpleName() + "[", "]") + .add("exceptionHistory=" + exceptionHistory) + .toString(); + } + + public static ApplicationExceptionsInfoWithHistory fromApplicationExceptionHistory( + Collection<ApplicationExceptionHistoryEntry> exceptions) { + return new ApplicationExceptionsInfoWithHistory( + new ApplicationExceptionHistory( + exceptions.stream() + .map( + exception -> + new ExceptionInfo( + exception + .getException() + .getOriginalErrorClassName(), + exception.getExceptionAsString(), + exception.getTimestamp(), + exception.getJobId().orElse(null))) + .collect(Collectors.toList()))); + } + + public static final class ApplicationExceptionHistory { + + public static final String FIELD_NAME_ENTRIES = "entries"; + + @JsonProperty(FIELD_NAME_ENTRIES) + private final List<ExceptionInfo> entries; + + @JsonCreator + public ApplicationExceptionHistory( + @JsonProperty(FIELD_NAME_ENTRIES) List<ExceptionInfo> entries) { + this.entries = entries; + } + + @JsonIgnore + public List<ExceptionInfo> getEntries() { + return entries; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ApplicationExceptionHistory that = (ApplicationExceptionHistory) o; + return Objects.equals(entries, that.entries); + } + + @Override + public int hashCode() { + return Objects.hash(entries); + } + + @Override + public String toString() { + return new StringJoiner( + ", ", ApplicationExceptionHistory.class.getSimpleName() + "[", "]") + .add("entries=" + entries) + .toString(); + } + } + + public static class ExceptionInfo { + + public static final String FIELD_NAME_EXCEPTION_NAME = "exceptionName"; + public static final String FIELD_NAME_EXCEPTION_STACKTRACE = "stacktrace"; + public static final String FIELD_NAME_EXCEPTION_TIMESTAMP = "timestamp"; + public static final String FIELD_NAME_JOB_ID = "jobId"; + + @JsonProperty(FIELD_NAME_EXCEPTION_NAME) + private final String exceptionName; + + @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE) + private final String stacktrace; + + @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP) + private final long timestamp; + + @JsonInclude(NON_NULL) + @JsonProperty(FIELD_NAME_JOB_ID) + @JsonSerialize(using = JobIDSerializer.class) + @Nullable + private final JobID jobId; + + public ExceptionInfo(String exceptionName, String stacktrace, long timestamp) { + this(exceptionName, stacktrace, timestamp, null); + } + + @JsonCreator + public ExceptionInfo( + @JsonProperty(FIELD_NAME_EXCEPTION_NAME) String exceptionName, + @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE) String stacktrace, + @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP) long timestamp, + @JsonProperty(FIELD_NAME_JOB_ID) + @Nullable + @JsonDeserialize(using = JobIDDeserializer.class) + JobID jobId) { + this.exceptionName = checkNotNull(exceptionName); + this.stacktrace = checkNotNull(stacktrace); + this.timestamp = timestamp; + this.jobId = jobId; + } + + @JsonIgnore + public String getExceptionName() { + return exceptionName; + } + + @JsonIgnore + public String getStacktrace() { + return stacktrace; + } + + @JsonIgnore + public long getTimestamp() { + return timestamp; + } + + @JsonIgnore + @Nullable + public JobID getJobId() { + return jobId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ExceptionInfo that = (ExceptionInfo) o; + return exceptionName.equals(that.exceptionName) + && stacktrace.equals(that.stacktrace) + && Objects.equals(timestamp, that.timestamp) + && Objects.equals(jobId, that.jobId); + } + + @Override + public int hashCode() { + return Objects.hash(exceptionName, stacktrace, timestamp, jobId); + } + + @Override + public String toString() { + return new StringJoiner(", ", ExceptionInfo.class.getSimpleName() + "[", "]") + .add("exceptionName='" + exceptionName + "'") + .add("stacktrace='" + stacktrace + "'") + .add("timestamp=" + timestamp) + .add("jobId=" + jobId) + .toString(); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsHeaders.java new file mode 100644 index 00000000000..22a0ad90e03 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsHeaders.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.application; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.application.ApplicationExceptionsHandler; +import org.apache.flink.runtime.rest.messages.ApplicationExceptionsInfoWithHistory; +import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter; +import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** Message headers for the {@link ApplicationExceptionsHandler}. */ +public class ApplicationExceptionsHeaders + implements RuntimeMessageHeaders< + EmptyRequestBody, + ApplicationExceptionsInfoWithHistory, + ApplicationMessageParameters> { + + private static final ApplicationExceptionsHeaders INSTANCE = new ApplicationExceptionsHeaders(); + + public static final String URL = + "/applications/:" + ApplicationIDPathParameter.KEY + "/exceptions"; + + private ApplicationExceptionsHeaders() {} + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public Class<ApplicationExceptionsInfoWithHistory> getResponseClass() { + return ApplicationExceptionsInfoWithHistory.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public ApplicationMessageParameters getUnresolvedMessageParameters() { + return new ApplicationMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static ApplicationExceptionsHeaders getInstance() { + return INSTANCE; + } + + @Override + public String getDescription() { + return "Returns exception history for an application."; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParameters.java new file mode 100644 index 00000000000..cee07b8b209 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParameters.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.application; + +import org.apache.flink.runtime.rest.handler.application.ApplicationExceptionsHandler; +import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; +import org.apache.flink.runtime.rest.messages.job.UpperLimitExceptionParameter; + +import java.util.Collection; +import java.util.List; + +/** {@link MessageParameters} for {@link ApplicationExceptionsHandler}. */ +public class ApplicationExceptionsMessageParameters extends ApplicationMessageParameters { + + private final UpperLimitExceptionParameter upperLimitExceptionParameter = + new UpperLimitExceptionParameter(); + + @Override + public Collection<MessageQueryParameter<?>> getQueryParameters() { + return List.of(upperLimitExceptionParameter); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 2ce3206d69a..6c4fd91b3db 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.runtime.rest.handler.application.ApplicationCancellationHandler; import org.apache.flink.runtime.rest.handler.application.ApplicationDetailsHandler; +import org.apache.flink.runtime.rest.handler.application.ApplicationExceptionsHandler; import org.apache.flink.runtime.rest.handler.application.ApplicationsOverviewHandler; import org.apache.flink.runtime.rest.handler.application.JobManagerApplicationConfigurationHandler; import org.apache.flink.runtime.rest.handler.cluster.ClusterConfigHandler; @@ -138,6 +139,7 @@ import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; import org.apache.flink.runtime.rest.messages.YarnCancelJobTerminationHeaders; import org.apache.flink.runtime.rest.messages.YarnStopJobTerminationHeaders; import org.apache.flink.runtime.rest.messages.application.ApplicationDetailsHeaders; +import org.apache.flink.runtime.rest.messages.application.ApplicationExceptionsHeaders; import org.apache.flink.runtime.rest.messages.application.JobManagerApplicationConfigurationHeaders; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders; @@ -516,6 +518,13 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp JobManagerApplicationConfigurationHeaders.getInstance(), clusterConfiguration); + ApplicationExceptionsHandler applicationExceptionsHandler = + new ApplicationExceptionsHandler( + leaderRetriever, + timeout, + responseHeaders, + ApplicationExceptionsHeaders.getInstance()); + JobAccumulatorsHandler jobAccumulatorsHandler = new JobAccumulatorsHandler( leaderRetriever, @@ -832,6 +841,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp Tuple2.of( jobManagerApplicationConfigurationHandler.getMessageHeaders(), jobManagerApplicationConfigurationHandler)); + handlers.add( + Tuple2.of( + applicationExceptionsHandler.getMessageHeaders(), + applicationExceptionsHandler)); handlers.add(Tuple2.of(jobAccumulatorsHandler.getMessageHeaders(), jobAccumulatorsHandler)); handlers.add(Tuple2.of(taskManagersHandler.getMessageHeaders(), taskManagersHandler)); handlers.add( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java index a68b5453a3c..f0090507b3f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java @@ -229,6 +229,40 @@ public class AbstractApplicationTest { listener.getTargetStates()); } + @Test + void testInitialExceptionHistory() { + AbstractApplication application = new MockApplication(new ApplicationID()); + assertTrue(application.getExceptionHistory().isEmpty()); + } + + @Test + void testAddExceptionHistoryEntryWithoutJobId() { + AbstractApplication application = new MockApplication(new ApplicationID()); + Throwable exception = new RuntimeException("Test exception"); + + application.addExceptionHistoryEntry(exception, null); + + assertEquals(1, application.getExceptionHistory().size()); + ApplicationExceptionHistoryEntry entry = application.getExceptionHistory().get(0); + assertTrue(entry.getExceptionAsString().contains("Test exception")); + assertFalse(entry.getJobId().isPresent()); + } + + @Test + void testAddExceptionHistoryEntryWithJobId() { + AbstractApplication application = new MockApplication(new ApplicationID()); + JobID jobId = JobID.generate(); + Throwable exception = new RuntimeException("Test exception with job"); + + application.addExceptionHistoryEntry(exception, jobId); + + assertEquals(1, application.getExceptionHistory().size()); + ApplicationExceptionHistoryEntry entry = application.getExceptionHistory().get(0); + assertTrue(entry.getExceptionAsString().contains("Test exception with job")); + assertTrue(entry.getJobId().isPresent()); + assertEquals(jobId, entry.getJobId().get()); + } + private static class MockApplication extends AbstractApplication { public MockApplication(ApplicationID applicationId) { super(applicationId); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java index 70d508c2242..a2721626f4c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java @@ -130,7 +130,8 @@ public class ArchivedApplicationStoreTestUtils { "test-application-" + i, state, new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L}, - jobs)); + jobs, + Collections.emptyList())); } return archivedApplications; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationDetailsHandlerTest.java index 055cc412c02..1929e677895 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationDetailsHandlerTest.java @@ -80,7 +80,8 @@ class ApplicationDetailsHandlerTest { "Test Application", ApplicationState.FINISHED, new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L}, - Collections.emptyMap()); + Collections.emptyMap(), + Collections.emptyList()); handlerRequest = createRequest(archivedApplication.getApplicationId()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationExceptionsHandlerTest.java new file mode 100644 index 00000000000..27b0e42eab9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationExceptionsHandlerTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.application; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.ApplicationState; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.application.ApplicationExceptionHistoryEntry; +import org.apache.flink.runtime.application.ArchivedApplication; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.HandlerRequestException; +import org.apache.flink.runtime.rest.messages.ApplicationExceptionsInfoWithHistory; +import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter; +import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.application.ApplicationExceptionsHeaders; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.testutils.TestingUtils; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for the {@link ApplicationExceptionsHandler}. */ +class ApplicationExceptionsHandlerTest { + + private ApplicationExceptionsHandler handler; + private HandlerRequest<EmptyRequestBody> handlerRequest; + private ArchivedApplication archivedApplication; + private TestingRestfulGateway testingRestfulGateway; + + private static HandlerRequest<EmptyRequestBody> createRequest(ApplicationID applicationId) + throws HandlerRequestException { + Map<String, String> pathParameters = new HashMap<>(); + pathParameters.put(ApplicationIDPathParameter.KEY, applicationId.toString()); + return HandlerRequest.resolveParametersAndCreate( + EmptyRequestBody.getInstance(), + new ApplicationMessageParameters(), + pathParameters, + Collections.emptyMap(), + Collections.emptyList()); + } + + @BeforeEach + void setUp() throws HandlerRequestException { + GatewayRetriever<RestfulGateway> leaderRetriever = + () -> CompletableFuture.completedFuture(null); + handler = + new ApplicationExceptionsHandler( + leaderRetriever, + TestingUtils.TIMEOUT, + Collections.emptyMap(), + ApplicationExceptionsHeaders.getInstance()); + + archivedApplication = + new ArchivedApplication( + ApplicationID.generate(), + "Test Application", + ApplicationState.FINISHED, + new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L}, + Collections.emptyMap(), + Collections.emptyList()); + + handlerRequest = createRequest(archivedApplication.getApplicationId()); + + testingRestfulGateway = + new TestingRestfulGateway.Builder() + .setRequestApplicationFunction( + applicationId -> + CompletableFuture.completedFuture(archivedApplication)) + .build(); + } + + @Test + void testNoException() throws Exception { + ApplicationExceptionsInfoWithHistory response = + handler.handleRequest(handlerRequest, testingRestfulGateway).get(); + + assertThat(response.getExceptionHistory().getEntries()).isEmpty(); + } + + @Test + void testExceptionWithJobId() throws Exception { + final RuntimeException rootCause = new RuntimeException("exception #0"); + final long rootCauseTimestamp = System.currentTimeMillis(); + final JobID jobId = new JobID(); + + final List<ApplicationExceptionHistoryEntry> exceptionHistory = new ArrayList<>(); + exceptionHistory.add( + new ApplicationExceptionHistoryEntry(rootCause, rootCauseTimestamp, jobId)); + + final ArchivedApplication applicationWithException = + new ArchivedApplication( + archivedApplication.getApplicationId(), + archivedApplication.getApplicationName(), + ApplicationState.FAILED, + new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L}, + Collections.emptyMap(), + exceptionHistory); + + testingRestfulGateway = + new TestingRestfulGateway.Builder() + .setRequestApplicationFunction( + applicationId -> + CompletableFuture.completedFuture(applicationWithException)) + .build(); + + final ApplicationExceptionsInfoWithHistory response = + handler.handleRequest(handlerRequest, testingRestfulGateway).get(); + + assertThat(response.getExceptionHistory().getEntries()).hasSize(1); + final ApplicationExceptionsInfoWithHistory.ExceptionInfo exceptionInfo = + response.getExceptionHistory().getEntries().get(0); + assertThat(exceptionInfo.getExceptionName()).isEqualTo(rootCause.getClass().getName()); + assertThat(exceptionInfo.getTimestamp()).isEqualTo(rootCauseTimestamp); + assertThat(exceptionInfo.getJobId()).isNotNull(); + assertThat(exceptionInfo.getJobId()).isEqualTo(jobId); + } + + @Test + void testExceptionWithoutJobId() throws Exception { + final RuntimeException rootCause = new RuntimeException("exception #0"); + final long rootCauseTimestamp = System.currentTimeMillis(); + + final List<ApplicationExceptionHistoryEntry> exceptionHistory = new ArrayList<>(); + exceptionHistory.add( + new ApplicationExceptionHistoryEntry(rootCause, rootCauseTimestamp, null)); + + final ArchivedApplication applicationWithException = + new ArchivedApplication( + archivedApplication.getApplicationId(), + archivedApplication.getApplicationName(), + ApplicationState.FAILED, + new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L}, + Collections.emptyMap(), + exceptionHistory); + + testingRestfulGateway = + new TestingRestfulGateway.Builder() + .setRequestApplicationFunction( + applicationId -> + CompletableFuture.completedFuture(applicationWithException)) + .build(); + + final ApplicationExceptionsInfoWithHistory response = + handler.handleRequest(handlerRequest, testingRestfulGateway).get(); + + assertThat(response.getExceptionHistory().getEntries()).hasSize(1); + final ApplicationExceptionsInfoWithHistory.ExceptionInfo exceptionInfo = + response.getExceptionHistory().getEntries().get(0); + assertThat(exceptionInfo.getExceptionName()).isEqualTo(rootCause.getClass().getName()); + assertThat(exceptionInfo.getTimestamp()).isEqualTo(rootCauseTimestamp); + assertThat(exceptionInfo.getJobId()).isNull(); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationsOverviewHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationsOverviewHandlerTest.java index be699f88425..9b08453e798 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationsOverviewHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationsOverviewHandlerTest.java @@ -77,7 +77,8 @@ class ApplicationsOverviewHandlerTest { "Test Application", ApplicationState.FINISHED, new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L}, - Collections.emptyMap()); + Collections.emptyMap(), + Collections.emptyList()); testingRestfulGateway = new TestingRestfulGateway.Builder() diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/ApplicationExceptionsInfoWithHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/ApplicationExceptionsInfoWithHistoryTest.java new file mode 100644 index 00000000000..7073d7d5822 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/ApplicationExceptionsInfoWithHistoryTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests that the {@link ApplicationExceptionsInfoWithHistory} can be marshalled and unmarshalled. + */ +@ExtendWith(NoOpTestExtension.class) +class ApplicationExceptionsInfoWithHistoryTest + extends RestResponseMarshallingTestBase<ApplicationExceptionsInfoWithHistory> { + + @Override + protected Class<ApplicationExceptionsInfoWithHistory> getTestResponseClass() { + return ApplicationExceptionsInfoWithHistory.class; + } + + @Override + protected ApplicationExceptionsInfoWithHistory getTestResponseInstance() throws Exception { + return new ApplicationExceptionsInfoWithHistory( + new ApplicationExceptionsInfoWithHistory.ApplicationExceptionHistory( + Arrays.asList( + new ApplicationExceptionsInfoWithHistory.ExceptionInfo( + "exception #0", "stacktrace #0", 0L, new JobID()), + new ApplicationExceptionsInfoWithHistory.ExceptionInfo( + "exception #1", "stacktrace #1", 1L, null)))); + } + + /** + * {@code jobId} should not be exposed if not set. + * + * @throws JsonProcessingException is not expected to be thrown + */ + @Test + void testNullFieldsNotSet() throws JsonProcessingException { + ObjectMapper objMapper = RestMapperUtils.getStrictObjectMapper(); + String json = + objMapper.writeValueAsString( + new ApplicationExceptionsInfoWithHistory.ExceptionInfo( + "exception name", "stacktrace", 0L)); + + assertThat(json).doesNotContain("jobId"); + } +}
