This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dc40ab70c9 [FLINK-32688][runtime] Removes deprecated JobExceptionsInfo. (#25248)
8dc40ab70c9 is described below

commit 8dc40ab70c991d67949d3498b26dd6661e816312
Author: Matthias Pohl <[email protected]>
AuthorDate: Thu Sep 12 19:24:02 2024 +0200

    [FLINK-32688][runtime] Removes deprecated JobExceptionsInfo. (#25248)
---
 .../src/test/resources/rest_api_v1.snapshot        |  36 ----
 .../rest/handler/job/JobExceptionsHandler.java     |  47 -----
 .../runtime/rest/messages/JobExceptionsInfo.java   | 230 ---------------------
 .../messages/JobExceptionsInfoWithHistory.java     |  32 +--
 .../rest/handler/job/JobExceptionsHandlerTest.java |  72 ++++---
 .../JobExceptionsInfoWithHistoryNoRootTest.java    |  22 --
 .../messages/JobExceptionsInfoWithHistoryTest.java |  22 --
 7 files changed, 40 insertions(+), 421 deletions(-)

diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 6fcee2ff9c1..995662900eb 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -1961,42 +1961,6 @@
       "type" : "object",
       "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory",
       "properties" : {
-        "root-exception" : {
-          "type" : "string"
-        },
-        "timestamp" : {
-          "type" : "integer"
-        },
-        "all-exceptions" : {
-          "type" : "array",
-          "items" : {
-            "type" : "object",
-            "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfo:ExecutionExceptionInfo",
-            "properties" : {
-              "exception" : {
-                "type" : "string"
-              },
-              "task" : {
-                "type" : "string"
-              },
-              "location" : {
-                "type" : "string"
-              },
-              "endpoint" : {
-                "type" : "string"
-              },
-              "timestamp" : {
-                "type" : "integer"
-              },
-              "taskManagerId" : {
-                "type" : "string"
-              }
-            }
-          }
-        },
-        "truncated" : {
-          "type" : "boolean"
-        },
         "exceptionHistory" : {
           "type" : "object",
           "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:JobExceptionHistory",
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 6d5f49d55b4..c8580fa753b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -20,15 +20,9 @@ package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecution;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.JobExceptionsInfo;
 import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -56,7 +50,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -130,47 +123,7 @@ public class JobExceptionsHandler
             ExecutionGraphInfo executionGraphInfo,
             int exceptionToReportMaxSize,
             List<FailureLabelFilterParameter.FailureLabel> failureLabelFilter) 
{
-        final ArchivedExecutionGraph executionGraph =
-                executionGraphInfo.getArchivedExecutionGraph();
-        if (executionGraph.getFailureInfo() == null) {
-            return new JobExceptionsInfoWithHistory(
-                    createJobExceptionHistory(
-                            executionGraphInfo.getExceptionHistory(),
-                            exceptionToReportMaxSize,
-                            failureLabelFilter));
-        }
-
-        List<JobExceptionsInfo.ExecutionExceptionInfo> taskExceptionList = new 
ArrayList<>();
-        boolean truncated = false;
-        for (AccessExecutionVertex task : 
executionGraph.getAllExecutionVertices()) {
-            for (AccessExecution execution : task.getCurrentExecutions()) {
-                Optional<ErrorInfo> failure = execution.getFailureInfo();
-                if (failure.isPresent()) {
-                    if (taskExceptionList.size() >= exceptionToReportMaxSize) {
-                        truncated = true;
-                        break;
-                    }
-
-                    TaskManagerLocation location = 
execution.getAssignedResourceLocation();
-                    String locationString = toString(location);
-                    long timestamp = 
execution.getStateTimestamp(ExecutionState.FAILED);
-                    taskExceptionList.add(
-                            new JobExceptionsInfo.ExecutionExceptionInfo(
-                                    failure.get().getExceptionAsString(),
-                                    task.getTaskNameWithSubtaskIndex(),
-                                    locationString,
-                                    timestamp == 0 ? -1 : timestamp,
-                                    toTaskManagerId(location)));
-                }
-            }
-        }
-
-        final ErrorInfo rootCause = executionGraph.getFailureInfo();
         return new JobExceptionsInfoWithHistory(
-                rootCause.getExceptionAsString(),
-                rootCause.getTimestamp(),
-                taskExceptionList,
-                truncated,
                 createJobExceptionHistory(
                         executionGraphInfo.getExceptionHistory(),
                         exceptionToReportMaxSize,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java
deleted file mode 100644
index 0acc64d4b47..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.util.Preconditions;
-
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-import java.util.Objects;
-import java.util.StringJoiner;
-
-/**
- * {@code JobExceptionInfo} holds the information for single failure which 
caused a (maybe partial)
- * job restart.
- */
-public class JobExceptionsInfo {
-
-    public static final String FIELD_NAME_ROOT_EXCEPTION = "root-exception";
-    public static final String FIELD_NAME_TIMESTAMP = "timestamp";
-    public static final String FIELD_NAME_ALL_EXCEPTIONS = "all-exceptions";
-    public static final String FIELD_NAME_TRUNCATED = "truncated";
-
-    /**
-     * @deprecated Use {@link 
JobExceptionsInfoWithHistory#getExceptionHistory()}'s entries instead.
-     */
-    @Deprecated
-    @JsonProperty(FIELD_NAME_ROOT_EXCEPTION)
-    private final String rootException;
-
-    /**
-     * @deprecated Use {@link 
JobExceptionsInfoWithHistory#getExceptionHistory()}'s entries instead.
-     */
-    @Deprecated
-    @JsonProperty(FIELD_NAME_TIMESTAMP)
-    private final Long rootTimestamp;
-
-    /**
-     * @deprecated Use {@link 
JobExceptionsInfoWithHistory#getExceptionHistory()}'s entries instead.
-     */
-    @Deprecated
-    @JsonProperty(FIELD_NAME_ALL_EXCEPTIONS)
-    private final List<ExecutionExceptionInfo> allExceptions;
-
-    /**
-     * @deprecated Use {@link 
JobExceptionsInfoWithHistory#getExceptionHistory()}'s entries instead.
-     */
-    @Deprecated
-    @JsonProperty(FIELD_NAME_TRUNCATED)
-    private final boolean truncated;
-
-    @JsonCreator
-    public JobExceptionsInfo(
-            @JsonProperty(FIELD_NAME_ROOT_EXCEPTION) String rootException,
-            @JsonProperty(FIELD_NAME_TIMESTAMP) Long rootTimestamp,
-            @JsonProperty(FIELD_NAME_ALL_EXCEPTIONS) 
List<ExecutionExceptionInfo> allExceptions,
-            @JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated) {
-        this.rootException = rootException;
-        this.rootTimestamp = rootTimestamp;
-        this.allExceptions = Preconditions.checkNotNull(allExceptions);
-        this.truncated = truncated;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        JobExceptionsInfo that = (JobExceptionsInfo) o;
-        return truncated == that.truncated
-                && Objects.equals(rootException, that.rootException)
-                && Objects.equals(rootTimestamp, that.rootTimestamp)
-                && Objects.equals(allExceptions, that.allExceptions);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(rootException, rootTimestamp, allExceptions, 
truncated);
-    }
-
-    @Override
-    public String toString() {
-        return new StringJoiner(", ", JobExceptionsInfo.class.getSimpleName() 
+ "[", "]")
-                .add("rootException='" + rootException + "'")
-                .add("rootTimestamp=" + rootTimestamp)
-                .add("allExceptions=" + allExceptions)
-                .add("truncated=" + truncated)
-                .toString();
-    }
-
-    @JsonIgnore
-    public String getRootException() {
-        return rootException;
-    }
-
-    @JsonIgnore
-    public Long getRootTimestamp() {
-        return rootTimestamp;
-    }
-
-    @JsonIgnore
-    public List<ExecutionExceptionInfo> getAllExceptions() {
-        return allExceptions;
-    }
-
-    @JsonIgnore
-    public boolean isTruncated() {
-        return truncated;
-    }
-
-    // 
---------------------------------------------------------------------------------
-    // Static helper classes
-    // 
---------------------------------------------------------------------------------
-
-    /**
-     * Nested class to encapsulate the task execution exception.
-     *
-     * @deprecated {@code ExecutionExceptionInfo} will be replaced by {@link
-     *     JobExceptionsInfoWithHistory.ExceptionInfo} as part of the effort 
of deprecating {@link
-     *     JobExceptionsInfo#allExceptions}.
-     */
-    @Deprecated
-    public static final class ExecutionExceptionInfo {
-        public static final String FIELD_NAME_EXCEPTION = "exception";
-        public static final String FIELD_NAME_TASK = "task";
-        @Deprecated public static final String FIELD_NAME_LOCATION = 
"location";
-        public static final String FIELD_NAME_ENDPOINT = "endpoint";
-        public static final String FIELD_NAME_TIMESTAMP = "timestamp";
-        public static final String FIELD_NAME_TASK_MANAGER_ID = 
"taskManagerId";
-
-        @JsonProperty(FIELD_NAME_EXCEPTION)
-        private final String exception;
-
-        @JsonProperty(FIELD_NAME_TASK)
-        private final String task;
-
-        @JsonProperty(FIELD_NAME_LOCATION)
-        private final String location;
-
-        @JsonProperty(FIELD_NAME_ENDPOINT)
-        private final String endpoint;
-
-        @JsonProperty(FIELD_NAME_TIMESTAMP)
-        private final long timestamp;
-
-        @JsonProperty(FIELD_NAME_TASK_MANAGER_ID)
-        private final String taskManagerId;
-
-        public ExecutionExceptionInfo(
-                String exception,
-                String task,
-                String endpoint,
-                long timestamp,
-                String taskManagerId) {
-            this(exception, task, endpoint, endpoint, timestamp, 
taskManagerId);
-        }
-
-        @JsonCreator
-        public ExecutionExceptionInfo(
-                @JsonProperty(FIELD_NAME_EXCEPTION) String exception,
-                @JsonProperty(FIELD_NAME_TASK) String task,
-                @JsonProperty(FIELD_NAME_LOCATION) String location,
-                @JsonProperty(FIELD_NAME_ENDPOINT) String endpoint,
-                @JsonProperty(FIELD_NAME_TIMESTAMP) long timestamp,
-                @JsonProperty(FIELD_NAME_TASK_MANAGER_ID) String 
taskManagerId) {
-            this.exception = Preconditions.checkNotNull(exception);
-            this.task = Preconditions.checkNotNull(task);
-            this.location = Preconditions.checkNotNull(location);
-            this.endpoint = Preconditions.checkNotNull(endpoint);
-            this.timestamp = timestamp;
-            this.taskManagerId = taskManagerId;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            JobExceptionsInfo.ExecutionExceptionInfo that =
-                    (JobExceptionsInfo.ExecutionExceptionInfo) o;
-            return timestamp == that.timestamp
-                    && Objects.equals(exception, that.exception)
-                    && Objects.equals(task, that.task)
-                    && Objects.equals(location, that.location)
-                    && Objects.equals(endpoint, that.endpoint)
-                    && Objects.equals(taskManagerId, that.taskManagerId);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(timestamp, exception, task, location, 
endpoint, taskManagerId);
-        }
-
-        @Override
-        public String toString() {
-            return new StringJoiner(", ", 
ExecutionExceptionInfo.class.getSimpleName() + "[", "]")
-                    .add("exception='" + exception + "'")
-                    .add("task='" + task + "'")
-                    .add("location='" + location + "'")
-                    .add("endpoint='" + endpoint + "'")
-                    .add("timestamp=" + timestamp)
-                    .add("taskManagerId=" + taskManagerId)
-                    .toString();
-        }
-    }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
index cb15825cc6b..fbf17723f6e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
@@ -38,10 +38,10 @@ import static 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * {@code JobExceptionsInfoWithHistory} extends {@link JobExceptionsInfo} 
providing a history of
- * previously caused failures. It's the response type of the {@link 
JobExceptionsHandler}.
+ * {@code JobExceptionsInfoWithHistory} providing a history of previously 
caused failures. It's the
+ * response type of the {@link JobExceptionsHandler}.
  */
-public class JobExceptionsInfoWithHistory extends JobExceptionsInfo implements 
ResponseBody {
+public class JobExceptionsInfoWithHistory implements ResponseBody {
 
     public static final String FIELD_NAME_EXCEPTION_HISTORY = 
"exceptionHistory";
 
@@ -50,19 +50,10 @@ public class JobExceptionsInfoWithHistory extends 
JobExceptionsInfo implements R
 
     @JsonCreator
     public JobExceptionsInfoWithHistory(
-            @JsonProperty(FIELD_NAME_ROOT_EXCEPTION) String rootException,
-            @JsonProperty(FIELD_NAME_TIMESTAMP) Long rootTimestamp,
-            @JsonProperty(FIELD_NAME_ALL_EXCEPTIONS) 
List<ExecutionExceptionInfo> allExceptions,
-            @JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated,
             @JsonProperty(FIELD_NAME_EXCEPTION_HISTORY) JobExceptionHistory 
exceptionHistory) {
-        super(rootException, rootTimestamp, allExceptions, truncated);
         this.exceptionHistory = exceptionHistory;
     }
 
-    public JobExceptionsInfoWithHistory(JobExceptionHistory exceptionHistory) {
-        this(null, null, Collections.emptyList(), false, exceptionHistory);
-    }
-
     @JsonIgnore
     public JobExceptionHistory getExceptionHistory() {
         return exceptionHistory;
@@ -79,30 +70,17 @@ public class JobExceptionsInfoWithHistory extends 
JobExceptionsInfo implements R
             return false;
         }
         JobExceptionsInfoWithHistory that = (JobExceptionsInfoWithHistory) o;
-        return this.isTruncated() == that.isTruncated()
-                && Objects.equals(this.getRootException(), 
that.getRootException())
-                && Objects.equals(this.getRootTimestamp(), 
that.getRootTimestamp())
-                && Objects.equals(this.getAllExceptions(), 
that.getAllExceptions())
-                && Objects.equals(exceptionHistory, that.exceptionHistory);
+        return Objects.equals(exceptionHistory, that.exceptionHistory);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(
-                isTruncated(),
-                getRootException(),
-                getRootTimestamp(),
-                getAllExceptions(),
-                exceptionHistory);
+        return Objects.hash(exceptionHistory);
     }
 
     @Override
     public String toString() {
         return new StringJoiner(", ", 
JobExceptionsInfoWithHistory.class.getSimpleName() + "[", "]")
-                .add("rootException='" + getRootException() + "'")
-                .add("rootTimestamp=" + getRootTimestamp())
-                .add("allExceptions=" + getAllExceptions())
-                .add("truncated=" + isTruncated())
                 .add("exceptionHistory=" + exceptionHistory)
                 .toString();
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index c40bdb2c8e8..55c6d2e84ee 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -37,7 +37,6 @@ import 
org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
-import org.apache.flink.runtime.rest.messages.JobExceptionsInfo;
 import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
 import 
org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.ExceptionInfo;
 import 
org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.RootExceptionInfo;
@@ -101,10 +100,6 @@ class JobExceptionsHandlerTest {
         final JobExceptionsInfoWithHistory response =
                 testInstance.handleRequest(request, executionGraphInfo);
 
-        assertThat(response.getRootException()).isNull();
-        assertThat(response.getRootTimestamp()).isNull();
-        assertThat(response.isTruncated()).isFalse();
-        assertThat(response.getAllExceptions()).isEmpty();
         assertThat(response.getExceptionHistory().getEntries()).isEmpty();
     }
 
@@ -121,12 +116,6 @@ class JobExceptionsHandlerTest {
         final JobExceptionsInfoWithHistory response =
                 testInstance.handleRequest(request, executionGraphInfo);
 
-        assertThat(response.getRootException())
-                .isEqualTo(ExceptionUtils.stringifyException(rootCause));
-        assertThat(response.getRootTimestamp()).isEqualTo(rootCauseTimestamp);
-        assertThat(response.isTruncated()).isFalse();
-        assertThat(response.getAllExceptions()).isEmpty();
-
         assertThat(response.getExceptionHistory().getEntries())
                 .satisfies(
                         matching(
@@ -148,9 +137,6 @@ class JobExceptionsHandlerTest {
         final JobExceptionsInfoWithHistory response =
                 testInstance.handleRequest(request, executionGraphInfo);
 
-        assertThat(response.getRootException()).isNull();
-        assertThat(response.getRootTimestamp()).isNull();
-
         assertThat(response.getExceptionHistory().getEntries())
                 .satisfies(
                         matching(
@@ -172,9 +158,6 @@ class JobExceptionsHandlerTest {
         final JobExceptionsInfoWithHistory response =
                 testInstance.handleRequest(request, executionGraphInfo);
 
-        assertThat(response.getRootException()).isNull();
-        assertThat(response.getRootTimestamp()).isNull();
-
         assertThat(response.getExceptionHistory().getEntries()).isEmpty();
     }
 
@@ -451,30 +434,47 @@ class JobExceptionsHandlerTest {
             throws HandlerRequestException {
         final HandlerRequest<EmptyRequestBody> handlerRequest =
                 createRequest(graph.getJobId(), numExpectedException);
-        final JobExceptionsInfo jobExceptionsInfo =
+        final JobExceptionsInfoWithHistory jobExceptionsInfo =
                 jobExceptionsHandler.handleRequest(handlerRequest, graph);
         final int numReportedException = Math.min(maxNumExceptions, 
numExpectedException);
-        
assertThat(numReportedException).isEqualTo(jobExceptionsInfo.getAllExceptions().size());
+        assertThat(numReportedException)
+                
.isEqualTo(jobExceptionsInfo.getExceptionHistory().getEntries().size());
     }
 
     private static ExecutionGraphInfo createAccessExecutionGraph(int numTasks) 
{
+        final Throwable failureCause = new RuntimeException("root cause");
+        final long failureTimestamp = System.currentTimeMillis();
+        final List<RootExceptionHistoryEntry> exceptionHistory = new 
ArrayList<>();
+        exceptionHistory.add(
+                new RootExceptionHistoryEntry(
+                        failureCause,
+                        failureTimestamp,
+                        FailureEnricherUtils.EMPTY_FAILURE_LABELS,
+                        "test task #1",
+                        new LocalTaskManagerLocation(),
+                        Collections.emptySet()));
+
         Map<JobVertexID, ArchivedExecutionJobVertex> tasks = new HashMap<>();
-        for (int i = 0; i < numTasks; i++) {
+        for (int i = 1; i < numTasks; i++) {
             final JobVertexID jobVertexId = new JobVertexID();
-            tasks.put(jobVertexId, 
createArchivedExecutionJobVertex(jobVertexId));
+            final String taskName = "task name #" + i;
+            final Exception otherCause = new Exception("Expected exception #" 
+ i);
+            final long otherFailureTimestamp = System.currentTimeMillis();
+            tasks.put(
+                    jobVertexId,
+                    createArchivedExecutionJobVertex(
+                            jobVertexId, taskName, otherCause, 
otherFailureTimestamp));
+
+            exceptionHistory.add(
+                    new RootExceptionHistoryEntry(
+                            otherCause,
+                            otherFailureTimestamp,
+                            FailureEnricherUtils.EMPTY_FAILURE_LABELS,
+                            taskName,
+                            new LocalTaskManagerLocation(),
+                            Collections.emptyList()));
         }
 
-        final Throwable failureCause = new RuntimeException("root cause");
-        final long failureTimestamp = System.currentTimeMillis();
-        final List<RootExceptionHistoryEntry> exceptionHistory =
-                Collections.singletonList(
-                        new RootExceptionHistoryEntry(
-                                failureCause,
-                                failureTimestamp,
-                                FailureEnricherUtils.EMPTY_FAILURE_LABELS,
-                                "test task #1",
-                                new LocalTaskManagerLocation(),
-                                Collections.emptySet()));
         return new ExecutionGraphInfo(
                 new ArchivedExecutionGraphBuilder()
                         .setFailureCause(new ErrorInfo(failureCause, 
failureTimestamp))
@@ -484,7 +484,7 @@ class JobExceptionsHandlerTest {
     }
 
     private static ArchivedExecutionJobVertex createArchivedExecutionJobVertex(
-            JobVertexID jobVertexID) {
+            JobVertexID jobVertexID, String taskName, Exception cause, long 
failureTimestamp) {
         final StringifiedAccumulatorResult[] emptyAccumulators =
                 new StringifiedAccumulatorResult[0];
         final long[] timestamps = new long[ExecutionState.values().length];
@@ -500,15 +500,13 @@ class JobExceptionsHandlerTest {
                 new ArchivedExecutionVertex[] {
                     new ArchivedExecutionVertex(
                             subtaskIndex,
-                            "test task",
+                            taskName,
                             new ArchivedExecution(
                                     new StringifiedAccumulatorResult[0],
                                     null,
                                     createExecutionAttemptId(jobVertexID, 
subtaskIndex, attempt),
                                     expectedState,
-                                    new ErrorInfo(
-                                            new RuntimeException("error"),
-                                            System.currentTimeMillis()),
+                                    new ErrorInfo(cause, failureTimestamp),
                                     assignedResourceLocation,
                                     allocationID,
                                     timestamps,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryNoRootTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryNoRootTest.java
index 3bb7d1be0b9..ac83d0be123 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryNoRootTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryNoRootTest.java
@@ -22,10 +22,8 @@ import 
org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtensi
 
 import org.junit.jupiter.api.extension.ExtendWith;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
 
 /**
  * Tests that the {@link JobExceptionsInfoWithHistory} with no root exception 
can be marshalled and
@@ -41,27 +39,7 @@ class JobExceptionsInfoWithHistoryNoRootTest
 
     @Override
     protected JobExceptionsInfoWithHistory getTestResponseInstance() throws 
Exception {
-        List<JobExceptionsInfo.ExecutionExceptionInfo> 
executionTaskExceptionInfoList =
-                new ArrayList<>();
-        executionTaskExceptionInfoList.add(
-                new JobExceptionsInfo.ExecutionExceptionInfo(
-                        "exception1",
-                        "task1",
-                        "location1",
-                        System.currentTimeMillis(),
-                        "taskManagerId1"));
-        executionTaskExceptionInfoList.add(
-                new JobExceptionsInfo.ExecutionExceptionInfo(
-                        "exception2",
-                        "task2",
-                        "location2",
-                        System.currentTimeMillis(),
-                        "taskManagerId2"));
         return new JobExceptionsInfoWithHistory(
-                null,
-                null,
-                executionTaskExceptionInfoList,
-                false,
                 new JobExceptionsInfoWithHistory.JobExceptionHistory(
                         Arrays.asList(
                                 new 
JobExceptionsInfoWithHistory.RootExceptionInfo(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryTest.java
index 04e4eaeb3fa..20098d97622 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryTest.java
@@ -27,9 +27,7 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -44,27 +42,7 @@ class JobExceptionsInfoWithHistoryTest
 
     @Override
     protected JobExceptionsInfoWithHistory getTestResponseInstance() throws 
Exception {
-        List<JobExceptionsInfo.ExecutionExceptionInfo> 
executionTaskExceptionInfoList =
-                new ArrayList<>();
-        executionTaskExceptionInfoList.add(
-                new JobExceptionsInfo.ExecutionExceptionInfo(
-                        "exception1",
-                        "task1",
-                        "location1",
-                        System.currentTimeMillis(),
-                        "taskManagerId1"));
-        executionTaskExceptionInfoList.add(
-                new JobExceptionsInfo.ExecutionExceptionInfo(
-                        "exception2",
-                        "task2",
-                        "location2",
-                        System.currentTimeMillis(),
-                        "taskManagerId2"));
         return new JobExceptionsInfoWithHistory(
-                "root exception",
-                System.currentTimeMillis(),
-                executionTaskExceptionInfoList,
-                false,
                 new JobExceptionsInfoWithHistory.JobExceptionHistory(
                         Collections.emptyList(), false));
     }

Reply via email to