This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8dc40ab70c9 [FLINK-32688][runtime] Removes deprecated JobExceptionsInfo. (#25248)
8dc40ab70c9 is described below
commit 8dc40ab70c991d67949d3498b26dd6661e816312
Author: Matthias Pohl <[email protected]>
AuthorDate: Thu Sep 12 19:24:02 2024 +0200
[FLINK-32688][runtime] Removes deprecated JobExceptionsInfo. (#25248)
---
.../src/test/resources/rest_api_v1.snapshot | 36 ----
.../rest/handler/job/JobExceptionsHandler.java | 47 -----
.../runtime/rest/messages/JobExceptionsInfo.java | 230 ---------------------
.../messages/JobExceptionsInfoWithHistory.java | 32 +--
.../rest/handler/job/JobExceptionsHandlerTest.java | 72 ++++---
.../JobExceptionsInfoWithHistoryNoRootTest.java | 22 --
.../messages/JobExceptionsInfoWithHistoryTest.java | 22 --
7 files changed, 40 insertions(+), 421 deletions(-)
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 6fcee2ff9c1..995662900eb 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -1961,42 +1961,6 @@
"type" : "object",
"id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory",
"properties" : {
- "root-exception" : {
- "type" : "string"
- },
- "timestamp" : {
- "type" : "integer"
- },
- "all-exceptions" : {
- "type" : "array",
- "items" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfo:ExecutionExceptionInfo",
- "properties" : {
- "exception" : {
- "type" : "string"
- },
- "task" : {
- "type" : "string"
- },
- "location" : {
- "type" : "string"
- },
- "endpoint" : {
- "type" : "string"
- },
- "timestamp" : {
- "type" : "integer"
- },
- "taskManagerId" : {
- "type" : "string"
- }
- }
- }
- },
- "truncated" : {
- "type" : "boolean"
- },
"exceptionHistory" : {
"type" : "object",
"id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:JobExceptionHistory",
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 6d5f49d55b4..c8580fa753b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -20,15 +20,9 @@ package org.apache.flink.runtime.rest.handler.job;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecution;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.JobExceptionsInfo;
import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -56,7 +50,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -130,47 +123,7 @@ public class JobExceptionsHandler
ExecutionGraphInfo executionGraphInfo,
int exceptionToReportMaxSize,
List<FailureLabelFilterParameter.FailureLabel> failureLabelFilter)
{
- final ArchivedExecutionGraph executionGraph =
- executionGraphInfo.getArchivedExecutionGraph();
- if (executionGraph.getFailureInfo() == null) {
- return new JobExceptionsInfoWithHistory(
- createJobExceptionHistory(
- executionGraphInfo.getExceptionHistory(),
- exceptionToReportMaxSize,
- failureLabelFilter));
- }
-
- List<JobExceptionsInfo.ExecutionExceptionInfo> taskExceptionList = new
ArrayList<>();
- boolean truncated = false;
- for (AccessExecutionVertex task :
executionGraph.getAllExecutionVertices()) {
- for (AccessExecution execution : task.getCurrentExecutions()) {
- Optional<ErrorInfo> failure = execution.getFailureInfo();
- if (failure.isPresent()) {
- if (taskExceptionList.size() >= exceptionToReportMaxSize) {
- truncated = true;
- break;
- }
-
- TaskManagerLocation location =
execution.getAssignedResourceLocation();
- String locationString = toString(location);
- long timestamp =
execution.getStateTimestamp(ExecutionState.FAILED);
- taskExceptionList.add(
- new JobExceptionsInfo.ExecutionExceptionInfo(
- failure.get().getExceptionAsString(),
- task.getTaskNameWithSubtaskIndex(),
- locationString,
- timestamp == 0 ? -1 : timestamp,
- toTaskManagerId(location)));
- }
- }
- }
-
- final ErrorInfo rootCause = executionGraph.getFailureInfo();
return new JobExceptionsInfoWithHistory(
- rootCause.getExceptionAsString(),
- rootCause.getTimestamp(),
- taskExceptionList,
- truncated,
createJobExceptionHistory(
executionGraphInfo.getExceptionHistory(),
exceptionToReportMaxSize,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java
deleted file mode 100644
index 0acc64d4b47..00000000000
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.util.Preconditions;
-
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-import java.util.Objects;
-import java.util.StringJoiner;
-
-/**
- * {@code JobExceptionInfo} holds the information for single failure which
caused a (maybe partial)
- * job restart.
- */
-public class JobExceptionsInfo {
-
- public static final String FIELD_NAME_ROOT_EXCEPTION = "root-exception";
- public static final String FIELD_NAME_TIMESTAMP = "timestamp";
- public static final String FIELD_NAME_ALL_EXCEPTIONS = "all-exceptions";
- public static final String FIELD_NAME_TRUNCATED = "truncated";
-
- /**
- * @deprecated Use {@link
JobExceptionsInfoWithHistory#getExceptionHistory()}'s entries instead.
- */
- @Deprecated
- @JsonProperty(FIELD_NAME_ROOT_EXCEPTION)
- private final String rootException;
-
- /**
- * @deprecated Use {@link
JobExceptionsInfoWithHistory#getExceptionHistory()}'s entries instead.
- */
- @Deprecated
- @JsonProperty(FIELD_NAME_TIMESTAMP)
- private final Long rootTimestamp;
-
- /**
- * @deprecated Use {@link
JobExceptionsInfoWithHistory#getExceptionHistory()}'s entries instead.
- */
- @Deprecated
- @JsonProperty(FIELD_NAME_ALL_EXCEPTIONS)
- private final List<ExecutionExceptionInfo> allExceptions;
-
- /**
- * @deprecated Use {@link
JobExceptionsInfoWithHistory#getExceptionHistory()}'s entries instead.
- */
- @Deprecated
- @JsonProperty(FIELD_NAME_TRUNCATED)
- private final boolean truncated;
-
- @JsonCreator
- public JobExceptionsInfo(
- @JsonProperty(FIELD_NAME_ROOT_EXCEPTION) String rootException,
- @JsonProperty(FIELD_NAME_TIMESTAMP) Long rootTimestamp,
- @JsonProperty(FIELD_NAME_ALL_EXCEPTIONS)
List<ExecutionExceptionInfo> allExceptions,
- @JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated) {
- this.rootException = rootException;
- this.rootTimestamp = rootTimestamp;
- this.allExceptions = Preconditions.checkNotNull(allExceptions);
- this.truncated = truncated;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- JobExceptionsInfo that = (JobExceptionsInfo) o;
- return truncated == that.truncated
- && Objects.equals(rootException, that.rootException)
- && Objects.equals(rootTimestamp, that.rootTimestamp)
- && Objects.equals(allExceptions, that.allExceptions);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(rootException, rootTimestamp, allExceptions,
truncated);
- }
-
- @Override
- public String toString() {
- return new StringJoiner(", ", JobExceptionsInfo.class.getSimpleName()
+ "[", "]")
- .add("rootException='" + rootException + "'")
- .add("rootTimestamp=" + rootTimestamp)
- .add("allExceptions=" + allExceptions)
- .add("truncated=" + truncated)
- .toString();
- }
-
- @JsonIgnore
- public String getRootException() {
- return rootException;
- }
-
- @JsonIgnore
- public Long getRootTimestamp() {
- return rootTimestamp;
- }
-
- @JsonIgnore
- public List<ExecutionExceptionInfo> getAllExceptions() {
- return allExceptions;
- }
-
- @JsonIgnore
- public boolean isTruncated() {
- return truncated;
- }
-
- //
---------------------------------------------------------------------------------
- // Static helper classes
- //
---------------------------------------------------------------------------------
-
- /**
- * Nested class to encapsulate the task execution exception.
- *
- * @deprecated {@code ExecutionExceptionInfo} will be replaced by {@link
- * JobExceptionsInfoWithHistory.ExceptionInfo} as part of the effort
of deprecating {@link
- * JobExceptionsInfo#allExceptions}.
- */
- @Deprecated
- public static final class ExecutionExceptionInfo {
- public static final String FIELD_NAME_EXCEPTION = "exception";
- public static final String FIELD_NAME_TASK = "task";
- @Deprecated public static final String FIELD_NAME_LOCATION =
"location";
- public static final String FIELD_NAME_ENDPOINT = "endpoint";
- public static final String FIELD_NAME_TIMESTAMP = "timestamp";
- public static final String FIELD_NAME_TASK_MANAGER_ID =
"taskManagerId";
-
- @JsonProperty(FIELD_NAME_EXCEPTION)
- private final String exception;
-
- @JsonProperty(FIELD_NAME_TASK)
- private final String task;
-
- @JsonProperty(FIELD_NAME_LOCATION)
- private final String location;
-
- @JsonProperty(FIELD_NAME_ENDPOINT)
- private final String endpoint;
-
- @JsonProperty(FIELD_NAME_TIMESTAMP)
- private final long timestamp;
-
- @JsonProperty(FIELD_NAME_TASK_MANAGER_ID)
- private final String taskManagerId;
-
- public ExecutionExceptionInfo(
- String exception,
- String task,
- String endpoint,
- long timestamp,
- String taskManagerId) {
- this(exception, task, endpoint, endpoint, timestamp,
taskManagerId);
- }
-
- @JsonCreator
- public ExecutionExceptionInfo(
- @JsonProperty(FIELD_NAME_EXCEPTION) String exception,
- @JsonProperty(FIELD_NAME_TASK) String task,
- @JsonProperty(FIELD_NAME_LOCATION) String location,
- @JsonProperty(FIELD_NAME_ENDPOINT) String endpoint,
- @JsonProperty(FIELD_NAME_TIMESTAMP) long timestamp,
- @JsonProperty(FIELD_NAME_TASK_MANAGER_ID) String
taskManagerId) {
- this.exception = Preconditions.checkNotNull(exception);
- this.task = Preconditions.checkNotNull(task);
- this.location = Preconditions.checkNotNull(location);
- this.endpoint = Preconditions.checkNotNull(endpoint);
- this.timestamp = timestamp;
- this.taskManagerId = taskManagerId;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- JobExceptionsInfo.ExecutionExceptionInfo that =
- (JobExceptionsInfo.ExecutionExceptionInfo) o;
- return timestamp == that.timestamp
- && Objects.equals(exception, that.exception)
- && Objects.equals(task, that.task)
- && Objects.equals(location, that.location)
- && Objects.equals(endpoint, that.endpoint)
- && Objects.equals(taskManagerId, that.taskManagerId);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(timestamp, exception, task, location,
endpoint, taskManagerId);
- }
-
- @Override
- public String toString() {
- return new StringJoiner(", ",
ExecutionExceptionInfo.class.getSimpleName() + "[", "]")
- .add("exception='" + exception + "'")
- .add("task='" + task + "'")
- .add("location='" + location + "'")
- .add("endpoint='" + endpoint + "'")
- .add("timestamp=" + timestamp)
- .add("taskManagerId=" + taskManagerId)
- .toString();
- }
- }
-}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
index cb15825cc6b..fbf17723f6e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
@@ -38,10 +38,10 @@ import static
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * {@code JobExceptionsInfoWithHistory} extends {@link JobExceptionsInfo}
providing a history of
- * previously caused failures. It's the response type of the {@link
JobExceptionsHandler}.
+ * {@code JobExceptionsInfoWithHistory} providing a history of previously
caused failures. It's the
+ * response type of the {@link JobExceptionsHandler}.
*/
-public class JobExceptionsInfoWithHistory extends JobExceptionsInfo implements
ResponseBody {
+public class JobExceptionsInfoWithHistory implements ResponseBody {
public static final String FIELD_NAME_EXCEPTION_HISTORY =
"exceptionHistory";
@@ -50,19 +50,10 @@ public class JobExceptionsInfoWithHistory extends
JobExceptionsInfo implements R
@JsonCreator
public JobExceptionsInfoWithHistory(
- @JsonProperty(FIELD_NAME_ROOT_EXCEPTION) String rootException,
- @JsonProperty(FIELD_NAME_TIMESTAMP) Long rootTimestamp,
- @JsonProperty(FIELD_NAME_ALL_EXCEPTIONS)
List<ExecutionExceptionInfo> allExceptions,
- @JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated,
@JsonProperty(FIELD_NAME_EXCEPTION_HISTORY) JobExceptionHistory
exceptionHistory) {
- super(rootException, rootTimestamp, allExceptions, truncated);
this.exceptionHistory = exceptionHistory;
}
- public JobExceptionsInfoWithHistory(JobExceptionHistory exceptionHistory) {
- this(null, null, Collections.emptyList(), false, exceptionHistory);
- }
-
@JsonIgnore
public JobExceptionHistory getExceptionHistory() {
return exceptionHistory;
@@ -79,30 +70,17 @@ public class JobExceptionsInfoWithHistory extends
JobExceptionsInfo implements R
return false;
}
JobExceptionsInfoWithHistory that = (JobExceptionsInfoWithHistory) o;
- return this.isTruncated() == that.isTruncated()
- && Objects.equals(this.getRootException(),
that.getRootException())
- && Objects.equals(this.getRootTimestamp(),
that.getRootTimestamp())
- && Objects.equals(this.getAllExceptions(),
that.getAllExceptions())
- && Objects.equals(exceptionHistory, that.exceptionHistory);
+ return Objects.equals(exceptionHistory, that.exceptionHistory);
}
@Override
public int hashCode() {
- return Objects.hash(
- isTruncated(),
- getRootException(),
- getRootTimestamp(),
- getAllExceptions(),
- exceptionHistory);
+ return Objects.hash(exceptionHistory);
}
@Override
public String toString() {
return new StringJoiner(", ",
JobExceptionsInfoWithHistory.class.getSimpleName() + "[", "]")
- .add("rootException='" + getRootException() + "'")
- .add("rootTimestamp=" + getRootTimestamp())
- .add("allExceptions=" + getAllExceptions())
- .add("truncated=" + isTruncated())
.add("exceptionHistory=" + exceptionHistory)
.toString();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index c40bdb2c8e8..55c6d2e84ee 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -37,7 +37,6 @@ import
org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
import
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
-import org.apache.flink.runtime.rest.messages.JobExceptionsInfo;
import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
import
org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.ExceptionInfo;
import
org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.RootExceptionInfo;
@@ -101,10 +100,6 @@ class JobExceptionsHandlerTest {
final JobExceptionsInfoWithHistory response =
testInstance.handleRequest(request, executionGraphInfo);
- assertThat(response.getRootException()).isNull();
- assertThat(response.getRootTimestamp()).isNull();
- assertThat(response.isTruncated()).isFalse();
- assertThat(response.getAllExceptions()).isEmpty();
assertThat(response.getExceptionHistory().getEntries()).isEmpty();
}
@@ -121,12 +116,6 @@ class JobExceptionsHandlerTest {
final JobExceptionsInfoWithHistory response =
testInstance.handleRequest(request, executionGraphInfo);
- assertThat(response.getRootException())
- .isEqualTo(ExceptionUtils.stringifyException(rootCause));
- assertThat(response.getRootTimestamp()).isEqualTo(rootCauseTimestamp);
- assertThat(response.isTruncated()).isFalse();
- assertThat(response.getAllExceptions()).isEmpty();
-
assertThat(response.getExceptionHistory().getEntries())
.satisfies(
matching(
@@ -148,9 +137,6 @@ class JobExceptionsHandlerTest {
final JobExceptionsInfoWithHistory response =
testInstance.handleRequest(request, executionGraphInfo);
- assertThat(response.getRootException()).isNull();
- assertThat(response.getRootTimestamp()).isNull();
-
assertThat(response.getExceptionHistory().getEntries())
.satisfies(
matching(
@@ -172,9 +158,6 @@ class JobExceptionsHandlerTest {
final JobExceptionsInfoWithHistory response =
testInstance.handleRequest(request, executionGraphInfo);
- assertThat(response.getRootException()).isNull();
- assertThat(response.getRootTimestamp()).isNull();
-
assertThat(response.getExceptionHistory().getEntries()).isEmpty();
}
@@ -451,30 +434,47 @@ class JobExceptionsHandlerTest {
throws HandlerRequestException {
final HandlerRequest<EmptyRequestBody> handlerRequest =
createRequest(graph.getJobId(), numExpectedException);
- final JobExceptionsInfo jobExceptionsInfo =
+ final JobExceptionsInfoWithHistory jobExceptionsInfo =
jobExceptionsHandler.handleRequest(handlerRequest, graph);
final int numReportedException = Math.min(maxNumExceptions,
numExpectedException);
-
assertThat(numReportedException).isEqualTo(jobExceptionsInfo.getAllExceptions().size());
+ assertThat(numReportedException)
+ .isEqualTo(jobExceptionsInfo.getExceptionHistory().getEntries().size());
}
private static ExecutionGraphInfo createAccessExecutionGraph(int numTasks)
{
+ final Throwable failureCause = new RuntimeException("root cause");
+ final long failureTimestamp = System.currentTimeMillis();
+ final List<RootExceptionHistoryEntry> exceptionHistory = new
ArrayList<>();
+ exceptionHistory.add(
+ new RootExceptionHistoryEntry(
+ failureCause,
+ failureTimestamp,
+ FailureEnricherUtils.EMPTY_FAILURE_LABELS,
+ "test task #1",
+ new LocalTaskManagerLocation(),
+ Collections.emptySet()));
+
Map<JobVertexID, ArchivedExecutionJobVertex> tasks = new HashMap<>();
- for (int i = 0; i < numTasks; i++) {
+ for (int i = 1; i < numTasks; i++) {
final JobVertexID jobVertexId = new JobVertexID();
- tasks.put(jobVertexId,
createArchivedExecutionJobVertex(jobVertexId));
+ final String taskName = "task name #" + i;
+ final Exception otherCause = new Exception("Expected exception #"
+ i);
+ final long otherFailureTimestamp = System.currentTimeMillis();
+ tasks.put(
+ jobVertexId,
+ createArchivedExecutionJobVertex(
+ jobVertexId, taskName, otherCause,
otherFailureTimestamp));
+
+ exceptionHistory.add(
+ new RootExceptionHistoryEntry(
+ otherCause,
+ otherFailureTimestamp,
+ FailureEnricherUtils.EMPTY_FAILURE_LABELS,
+ taskName,
+ new LocalTaskManagerLocation(),
+ Collections.emptyList()));
}
- final Throwable failureCause = new RuntimeException("root cause");
- final long failureTimestamp = System.currentTimeMillis();
- final List<RootExceptionHistoryEntry> exceptionHistory =
- Collections.singletonList(
- new RootExceptionHistoryEntry(
- failureCause,
- failureTimestamp,
- FailureEnricherUtils.EMPTY_FAILURE_LABELS,
- "test task #1",
- new LocalTaskManagerLocation(),
- Collections.emptySet()));
return new ExecutionGraphInfo(
new ArchivedExecutionGraphBuilder()
.setFailureCause(new ErrorInfo(failureCause,
failureTimestamp))
@@ -484,7 +484,7 @@ class JobExceptionsHandlerTest {
}
private static ArchivedExecutionJobVertex createArchivedExecutionJobVertex(
- JobVertexID jobVertexID) {
+ JobVertexID jobVertexID, String taskName, Exception cause, long
failureTimestamp) {
final StringifiedAccumulatorResult[] emptyAccumulators =
new StringifiedAccumulatorResult[0];
final long[] timestamps = new long[ExecutionState.values().length];
@@ -500,15 +500,13 @@ class JobExceptionsHandlerTest {
new ArchivedExecutionVertex[] {
new ArchivedExecutionVertex(
subtaskIndex,
- "test task",
+ taskName,
new ArchivedExecution(
new StringifiedAccumulatorResult[0],
null,
createExecutionAttemptId(jobVertexID,
subtaskIndex, attempt),
expectedState,
- new ErrorInfo(
- new RuntimeException("error"),
- System.currentTimeMillis()),
+ new ErrorInfo(cause, failureTimestamp),
assignedResourceLocation,
allocationID,
timestamps,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryNoRootTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryNoRootTest.java
index 3bb7d1be0b9..ac83d0be123 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryNoRootTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryNoRootTest.java
@@ -22,10 +22,8 @@ import
org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtensi
import org.junit.jupiter.api.extension.ExtendWith;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
/**
* Tests that the {@link JobExceptionsInfoWithHistory} with no root exception
can be marshalled and
@@ -41,27 +39,7 @@ class JobExceptionsInfoWithHistoryNoRootTest
@Override
protected JobExceptionsInfoWithHistory getTestResponseInstance() throws
Exception {
- List<JobExceptionsInfo.ExecutionExceptionInfo>
executionTaskExceptionInfoList =
- new ArrayList<>();
- executionTaskExceptionInfoList.add(
- new JobExceptionsInfo.ExecutionExceptionInfo(
- "exception1",
- "task1",
- "location1",
- System.currentTimeMillis(),
- "taskManagerId1"));
- executionTaskExceptionInfoList.add(
- new JobExceptionsInfo.ExecutionExceptionInfo(
- "exception2",
- "task2",
- "location2",
- System.currentTimeMillis(),
- "taskManagerId2"));
return new JobExceptionsInfoWithHistory(
- null,
- null,
- executionTaskExceptionInfoList,
- false,
new JobExceptionsInfoWithHistory.JobExceptionHistory(
Arrays.asList(
new
JobExceptionsInfoWithHistory.RootExceptionInfo(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryTest.java
index 04e4eaeb3fa..20098d97622 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryTest.java
@@ -27,9 +27,7 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@@ -44,27 +42,7 @@ class JobExceptionsInfoWithHistoryTest
@Override
protected JobExceptionsInfoWithHistory getTestResponseInstance() throws
Exception {
- List<JobExceptionsInfo.ExecutionExceptionInfo>
executionTaskExceptionInfoList =
- new ArrayList<>();
- executionTaskExceptionInfoList.add(
- new JobExceptionsInfo.ExecutionExceptionInfo(
- "exception1",
- "task1",
- "location1",
- System.currentTimeMillis(),
- "taskManagerId1"));
- executionTaskExceptionInfoList.add(
- new JobExceptionsInfo.ExecutionExceptionInfo(
- "exception2",
- "task2",
- "location2",
- System.currentTimeMillis(),
- "taskManagerId2"));
return new JobExceptionsInfoWithHistory(
- "root exception",
- System.currentTimeMillis(),
- executionTaskExceptionInfoList,
- false,
new JobExceptionsInfoWithHistory.JobExceptionHistory(
Collections.emptyList(), false));
}