zentol commented on a change in pull request #15049:
URL: https://github.com/apache/flink/pull/15049#discussion_r592520040
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -764,9 +768,17 @@ public final void notifyPartitionDataAvailable(final
ResultPartitionID partition
/**
 * No-op by default. NOTE(review): presumably called from {@code notifyPartitionDataAvailable} so
 * that subclasses can react to a partition's data becoming available — confirm against callers.
 *
 * @param resultPartitionId the ID of the intermediate result partition
 */
protected void notifyPartitionDataAvailableInternal(
        IntermediateResultPartitionID resultPartitionId) {
    // intentionally empty
}
+ /**
+ * Returns a copy of the current history of task failures.
+ *
+ * @return a copy of the current history of task failures.
+ */
@VisibleForTesting
- protected List<ErrorInfo> getExceptionHistory() {
- return taskFailureHistory;
+ protected Iterable<ExceptionHistoryEntry> getExceptionHistory() {
+ final Collection<ExceptionHistoryEntry> copy = new ArrayList<>();
Review comment:
```suggestion
final Collection<ExceptionHistoryEntry> copy = new
ArrayList<>(exceptionHistory.size());
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntryTest.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/** {@code ExceptionHistoryEntryTest} tests the instantiation of {@link
ExceptionHistoryEntry}. */
+public class ExceptionHistoryEntryTest extends TestLogger {
+
+ @Test
+ public void testFromGlobalFailure() {
+ final Throwable failureCause = new RuntimeException("failure cause");
+ final long timestamp = System.currentTimeMillis();
+
+ final ExceptionHistoryEntry testInstance =
+ ExceptionHistoryEntry.fromGlobalFailure(failureCause,
timestamp);
+
+ assertThat(
+
testInstance.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+ is(failureCause));
+ assertThat(testInstance.getTimestamp(), is(timestamp));
+ assertThat(testInstance.getFailingTaskName(), is(nullValue()));
+ assertThat(testInstance.getAssignedResourceLocation(),
is(nullValue()));
+ }
+
+ @Test
+ public void testFromFailedExecution() {
+ final Throwable failureCause = new RuntimeException("Expected
failure");
+ final long failureTimestamp = System.currentTimeMillis();
+ final String taskNameWithSubTaskIndex = "task name";
+ final TaskManagerLocation taskManagerLocation = new
LocalTaskManagerLocation();
+
+ final ExceptionHistoryEntry testInstance =
+ ExceptionHistoryEntry.fromFailedExecution(
+ createExecutionWithFailure(
+ failureCause, failureTimestamp,
taskManagerLocation),
+ taskNameWithSubTaskIndex);
+
+ assertThat(
+
testInstance.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+ is(failureCause));
+ assertThat(testInstance.getTimestamp(), is(failureTimestamp));
+ assertThat(testInstance.getFailingTaskName(),
is(taskNameWithSubTaskIndex));
+ assertThat(testInstance.getAssignedResourceLocation(),
is(taskManagerLocation));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFromFailedExecutionWithoutFailure() {
+
ExceptionHistoryEntry.fromFailedExecution(createExecutionWithoutFailure(),
"task name");
+ }
+
+ /**
+ * Creates a {@link TestingExecution} that overwrites the relevant methods
to emulate a {@link
+ * AccessExecution} without a failure.
+ *
+ * @return The {@code TestingExecution} instance.
+ */
+ private static TestingExecution createExecutionWithoutFailure() {
+ return new TestingExecution(null, new LocalTaskManagerLocation());
Review comment:
we could inline this method
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
##########
@@ -154,4 +342,285 @@ private static ArchivedExecutionJobVertex
createArchivedExecutionJobVertex(
pathParameters,
queryParameters);
}
+
+ // -------- factory methods for instantiating new Matchers --------
+
+ private static Matcher<JobExceptionsInfoWithHistory>
historyContainsJobExceptionInfo(
+ Throwable expectedFailureCause,
+ long expectedFailureTimestamp,
+ String expectedTaskNameWithSubtaskId,
+ String expectedTaskManagerLocation) {
+ return new JobExceptionInfoMatcher(
+ expectedFailureCause,
+ expectedFailureTimestamp,
+ expectedTaskNameWithSubtaskId,
+ expectedTaskManagerLocation);
+ }
+
+ private static Matcher<JobExceptionsInfoWithHistory> containsGlobalFailure(
+ Throwable expectedFailureCause, long expectedFailureTimestamp) {
+ return historyContainsJobExceptionInfo(
+ expectedFailureCause,
+ expectedFailureTimestamp,
+ JobExceptionsHandler.FALLBACK_TASK_NAME,
+ JobExceptionsHandler.FALLBACK_LOCATION_NAME);
+ }
+
+ private static Matcher<JobExceptionsInfoWithHistory> containsRootCause(
+ Throwable expectedRootCause, long expectedRootCauseTimestamp) {
+ return new JobExceptionRootCauseMatcher(expectedRootCause,
expectedRootCauseTimestamp);
+ }
+
+ private static Matcher<JobExceptionsInfoWithHistory> hasHistoryEntryCount(
+ int expectedEntryCount) {
+ return new SingleValueMatcher<>(
+ o -> o.getExceptionHistory().getEntries().size(),
+ "history entry count",
+ expectedEntryCount);
+ }
+
+ private static Matcher<JobExceptionsInfoWithHistory> hasTruncatedHistory()
{
+ return new SingleValueMatcher<>(
+ o -> o.getExceptionHistory().isTruncated(), "history
truncated", true);
+ }
+
+ // -------- Matcher implementations used in this test class --------
+
+ /**
+ * Generic implementation checking a single value of the passed {@link
+ * JobExceptionsInfoWithHistory} instance.
+ *
+ * @param <T> The type of the value that's going to be checked
+ */
+ private static class SingleValueMatcher<T> extends
JobExceptionsHandlerMatcher {
+
+ private final Function<JobExceptionsInfoWithHistory, T> extractor;
+ private final T expected;
+
+ private SingleValueMatcher(
+ Function<JobExceptionsInfoWithHistory, T> extractor,
+ String propertyName,
+ T expected) {
+ super(propertyName, expected);
+ this.extractor = extractor;
+ this.expected = expected;
+ }
+
+ @Override
+ protected boolean matchesSafely(
+ JobExceptionsInfoWithHistory response, Description
description) {
+ final T actual = extractor.apply(response);
+ return matchesWithDescription(
+ response, description, r -> actual.equals(expected),
actual);
+ }
+ }
+
+ /** Checks whether root-exception, root-exception-name, and timestamp are
set properly. */
+ private static class JobExceptionRootCauseMatcher extends
JobExceptionsHandlerMatcher {
+
+ private final Throwable expectedFailureCause;
+ private final long expectedFailureTimestamp;
+
+ private JobExceptionRootCauseMatcher(
+ Throwable expectedFailureCause, long expectedFailureTimestamp)
{
+ super("root-exception", expectedFailureCause, "timestamp",
expectedFailureTimestamp);
+ this.expectedFailureCause =
deserializeSerializedThrowable(expectedFailureCause);
+ this.expectedFailureTimestamp = expectedFailureTimestamp;
+ }
+
+ @Override
+ protected boolean matchesSafely(
+ JobExceptionsInfoWithHistory response, Description
description) {
+ return matchesRootException(response, description)
+ && matchesRootExceptionTimestamp(response, description);
+ }
+
+ private boolean matchesRootExceptionTimestamp(
+ JobExceptionsInfoWithHistory response, Description
description) {
+ return matchesWithDescription(
+ response,
+ description,
+ info -> info.getRootTimestamp() ==
expectedFailureTimestamp,
+ expectedFailureTimestamp);
+ }
+
+ private boolean matchesRootException(
+ JobExceptionsInfoWithHistory response, Description
description) {
+ return matchesWithDescription(
+ response,
+ description,
+ info -> {
+ final String expectedExceptionStr =
+
ExceptionUtils.stringifyException(expectedFailureCause);
+ return
info.getRootException().equals(expectedExceptionStr)
+ && info.getRootExceptionName()
+
.equals(expectedFailureCause.getClass().getName());
+ },
+ expectedFailureCause);
+ }
+ }
+
+ /**
+ * Checks whether a {@link JobExceptionsInfo} entry with the given
attributes exists in the
+ * exception-history.entries field.
+ */
+ private static class JobExceptionInfoMatcher extends
JobExceptionsHandlerMatcher {
+
+ // the current implementation does not consider other exceptions
besides the root cause
+ private static final boolean DEFAULT_IS_TRUNCATED = false;
+
+ private final Throwable expectedFailureCause;
+ private final long expectedFailureTimestamp;
+ private final String expectedTaskNameWithSubtaskId;
+ private final String expectedTaskManagerLocation;
+
+ public JobExceptionInfoMatcher(
+ Throwable expectedFailureCause,
+ long expectedFailureTimestamp,
+ String expectedTaskNameWithSubtaskId,
+ String expectedTaskManagerLocation) {
+ super(
+ "failureCause",
+ expectedFailureCause,
+ "failureTimestamp",
+ expectedFailureTimestamp,
+ "taskName",
+ expectedTaskNameWithSubtaskId,
+ "taskManagerLocation",
+ expectedTaskManagerLocation,
+ "isTruncated",
+ DEFAULT_IS_TRUNCATED);
+ this.expectedFailureCause =
deserializeSerializedThrowable(expectedFailureCause);
+ this.expectedFailureTimestamp = expectedFailureTimestamp;
+ this.expectedTaskNameWithSubtaskId = expectedTaskNameWithSubtaskId;
+ this.expectedTaskManagerLocation = expectedTaskManagerLocation;
+ }
+
+ @Override
+ protected boolean matchesSafely(
+ JobExceptionsInfoWithHistory response, Description
description) {
+ return matchesWithDescription(
+ response,
+ description,
+ r ->
+ r.getExceptionHistory().getEntries().stream()
+ .anyMatch(
+ info ->
+
matchesAllExceptionsSize(info)
+ &&
matchesTruncated(info)
+ &&
matchesRootException(info)
+ &&
matchesRootExceptionTimestamp(info)
+ &&
matchesTaskManagerLocation(info)
+ &&
matchesTaskName(info)),
+ response.getExceptionHistory().getEntries());
+ }
+
+ private boolean matchesAllExceptionsSize(JobExceptionsInfo info) {
+ return info.getAllExceptions().size() == 1;
+ }
+
+ private boolean matchesTruncated(JobExceptionsInfo info) {
+ return info.isTruncated() == DEFAULT_IS_TRUNCATED;
+ }
+
+ private boolean matchesRootException(JobExceptionsInfo info) {
+ final String expectedExceptionStr =
+ ExceptionUtils.stringifyException(expectedFailureCause);
+ return info.getRootException().equals(expectedExceptionStr)
+ &&
info.getRootExceptionName().equals(expectedFailureCause.getClass().getName())
+ &&
getOnlyEntry(info).getException().equals(expectedExceptionStr);
+ }
+
+ private boolean matchesRootExceptionTimestamp(JobExceptionsInfo info) {
+ return info.getRootTimestamp() == expectedFailureTimestamp
+ && getOnlyEntry(info).getTimestamp() ==
expectedFailureTimestamp;
+ }
+
+ private boolean matchesTaskManagerLocation(JobExceptionsInfo info) {
+ return
getOnlyEntry(info).getLocation().equals(expectedTaskManagerLocation);
+ }
+
+ private boolean matchesTaskName(JobExceptionsInfo info) {
+ return
getOnlyEntry(info).getTask().equals(expectedTaskNameWithSubtaskId);
+ }
+
+ private JobExceptionsInfo.ExecutionExceptionInfo
getOnlyEntry(JobExceptionsInfo info) {
+ return info.getAllExceptions().get(0);
+ }
+ }
+
+ /**
+ * Generic parent class collecting common utility functionality for all
Matcher implementations
+ * used in this test.
+ */
+ private abstract static class JobExceptionsHandlerMatcher
+ extends TypeSafeDiagnosingMatcher<JobExceptionsInfoWithHistory> {
+
+ private final String genericExpectedMessage;
+
+ /**
+ * Initializes the expected values using the {@code keyValuePairs}
array.
+ *
+ * @param keyValuePairs The arrays is expected to have an even number
of elements. Odd
+ * elements are considered the attribute name whereas even
elements are the
+ * corresponding value.
+ */
+ protected JobExceptionsHandlerMatcher(Object... keyValuePairs) {
+ Preconditions.checkArgument(keyValuePairs.length % 2 == 0);
+
+ final StringBuilder stringBuilder = new StringBuilder();
+ for (int i = 0; i < keyValuePairs.length; i += 2) {
+ if (stringBuilder.length() > 0) {
+ stringBuilder.append(", ");
+ }
+
stringBuilder.append(keyValuePairs[i]).append("=").append(keyValuePairs[i + 1]);
+ }
+ this.genericExpectedMessage = stringBuilder.toString().trim();
+ }
+
+ @Override
Review comment:
why are we overriding this method here?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntry.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@code ExceptionHistoryEntry} collects information about a single task
failure that should be
Review comment:
misleading since it is not necessarily about a _task_ failure
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/util/BoundedFIFOQueueTest.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.hamcrest.collection.IsIterableWithSize.iterableWithSize;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/** {@code BoundedFIFOQueueTest} tests {@link BoundedFIFOQueue}. */
+public class BoundedFIFOQueueTest extends TestLogger {
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testConstructorFailing() {
+ new BoundedFIFOQueue<>(-1);
+ }
+
+ /** Tests the behavior of {@link BoundedFIFOQueue#iterator()} with a
{@code maxSize} of 0. */
Review comment:
this doesn't really add value
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.StringJoiner;
+
+/**
+ * {@code JobExceptionsInfoWithHistory} extends {@link JobExceptionsInfo}
providing a history of
+ * previously caused failures. It's the response type of the {@link
JobExceptionsHandler}.
+ */
+public class JobExceptionsInfoWithHistory extends JobExceptionsInfo implements
ResponseBody {
+
public static final String FIELD_NAME_EXCEPTION_HISTORY = "exception-history";

/** History of previously caught failures, serialized as {@value #FIELD_NAME_EXCEPTION_HISTORY}. */
@JsonProperty(FIELD_NAME_EXCEPTION_HISTORY)
private final JobExceptionHistory exceptionHistory;

/**
 * Creates a response instance; annotated with {@code @JsonCreator} so that it is used for JSON
 * deserialization.
 *
 * @param rootExceptionName name of the root exception
 * @param rootException the root exception
 * @param rootTimestamp timestamp of the root exception
 * @param allExceptions the collected execution exceptions
 * @param truncated whether {@code allExceptions} was truncated
 * @param exceptionHistory the history of previously caught failures
 */
@JsonCreator
public JobExceptionsInfoWithHistory(
        @JsonProperty(FIELD_NAME_ROOT_EXCEPTION_NAME) String rootExceptionName,
        @JsonProperty(FIELD_NAME_ROOT_EXCEPTION) String rootException,
        @JsonProperty(FIELD_NAME_TIMESTAMP) Long rootTimestamp,
        @JsonProperty(FIELD_NAME_ALL_EXCEPTIONS) List<ExecutionExceptionInfo> allExceptions,
        @JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated,
        @JsonProperty(FIELD_NAME_EXCEPTION_HISTORY) JobExceptionHistory exceptionHistory) {
    super(rootExceptionName, rootException, rootTimestamp, allExceptions, truncated);
    this.exceptionHistory = exceptionHistory;
}
+
/** Creates an instance without root cause, without exceptions and with an empty history. */
public JobExceptionsInfoWithHistory() {
    this(
            null, // rootExceptionName
            null, // rootException
            null, // rootTimestamp
            Collections.emptyList(), // allExceptions
            false, // truncated
            new JobExceptionHistory(Collections.emptyList(), false));
}
+
+ @JsonIgnore
+ public JobExceptionHistory getExceptionHistory() {
+ return exceptionHistory;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JobExceptionsInfoWithHistory that = (JobExceptionsInfoWithHistory) o;
+ return this.isTruncated() == that.isTruncated()
+ && Objects.equals(this.getRootExceptionName(),
that.getRootExceptionName())
+ && Objects.equals(this.getRootException(),
that.getRootException())
+ && Objects.equals(this.getRootTimestamp(),
that.getRootTimestamp())
+ && Objects.equals(this.getAllExceptions(),
that.getAllExceptions())
+ && Objects.equals(exceptionHistory, that.exceptionHistory);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ isTruncated(),
+ getRootExceptionName(),
+ getRootException(),
+ getRootTimestamp(),
+ getAllExceptions(),
+ exceptionHistory);
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ",
JobExceptionsInfoWithHistory.class.getSimpleName() + "[", "]")
+ .add("exceptionHistory=" + exceptionHistory)
+ .toString();
+ }
+
+ /** {@code JobExceptionHistory} collects all previously caught errors. */
+ public static final class JobExceptionHistory {
+
+ public static final String FIELD_NAME_ENTRIES = "entries";
+ public static final String FIELD_NAME_MAX_SIZE_REACHED =
"max-size-reached";
+ public static final String FIELD_NAME_TRUNCATED = "truncated";
+
+ @JsonProperty(FIELD_NAME_ENTRIES)
+ private final List<JobExceptionsInfo> entries;
+
+ @JsonProperty(FIELD_NAME_TRUNCATED)
+ private final boolean truncated;
+
+ @JsonCreator
+ public JobExceptionHistory(
+ @JsonProperty(FIELD_NAME_ENTRIES) List<JobExceptionsInfo>
entries,
+ @JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated) {
+ this.entries = entries;
+ this.truncated = truncated;
+ }
+
+ @JsonIgnore
+ public List<JobExceptionsInfo> getEntries() {
+ return entries;
+ }
+
+ @JsonIgnore
+ public boolean isTruncated() {
+ return truncated;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JobExceptionHistory that = (JobExceptionHistory) o;
+ return this.isTruncated() == that.isTruncated()
+ && Objects.equals(entries, that.entries);
+ }
+
+ @Override
+ public int hashCode() {
Review comment:
do we actually need this?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntry.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@code ExceptionHistoryEntry} collects information about a single task
failure that should be
+ * exposed through the exception history.
+ */
+public class ExceptionHistoryEntry extends ErrorInfo {
+
+ private static final long serialVersionUID = -3855285510064263701L;
+
+ @Nullable private final String failingTaskName;
+ @Nullable private final TaskManagerLocation assignedResourceLocation;
Review comment:
maybe consider immediately extracting all required information from the
TaskManagerLocation. There is no guarantee that a deserialized InetAddress (which
the TML contains) returns the same hostName/FQDN as the original.
(Even if it did, it's work we don't need to do: InetAddresses cache a bunch
of stuff in transient fields.)
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntryTest.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/** {@code ExceptionHistoryEntryTest} tests the instantiation of {@link
ExceptionHistoryEntry}. */
+public class ExceptionHistoryEntryTest extends TestLogger {
+
+ @Test
+ public void testFromGlobalFailure() {
+ final Throwable failureCause = new RuntimeException("failure cause");
+ final long timestamp = System.currentTimeMillis();
+
+ final ExceptionHistoryEntry testInstance =
+ ExceptionHistoryEntry.fromGlobalFailure(failureCause,
timestamp);
+
+ assertThat(
+
testInstance.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+ is(failureCause));
+ assertThat(testInstance.getTimestamp(), is(timestamp));
+ assertThat(testInstance.getFailingTaskName(), is(nullValue()));
+ assertThat(testInstance.getAssignedResourceLocation(),
is(nullValue()));
+ }
+
+ @Test
+ public void testFromFailedExecution() {
+ final Throwable failureCause = new RuntimeException("Expected
failure");
+ final long failureTimestamp = System.currentTimeMillis();
+ final String taskNameWithSubTaskIndex = "task name";
+ final TaskManagerLocation taskManagerLocation = new
LocalTaskManagerLocation();
+
+ final ExceptionHistoryEntry testInstance =
+ ExceptionHistoryEntry.fromFailedExecution(
+ createExecutionWithFailure(
+ failureCause, failureTimestamp,
taskManagerLocation),
+ taskNameWithSubTaskIndex);
+
+ assertThat(
+
testInstance.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+ is(failureCause));
+ assertThat(testInstance.getTimestamp(), is(failureTimestamp));
+ assertThat(testInstance.getFailingTaskName(),
is(taskNameWithSubTaskIndex));
+ assertThat(testInstance.getAssignedResourceLocation(),
is(taskManagerLocation));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFromFailedExecutionWithoutFailure() {
+
ExceptionHistoryEntry.fromFailedExecution(createExecutionWithoutFailure(),
"task name");
+ }
+
+ /**
+ * Creates a {@link TestingExecution} that overwrites the relevant methods
to emulate a {@link
+ * AccessExecution} without a failure.
+ *
+ * @return The {@code TestingExecution} instance.
+ */
+ private static TestingExecution createExecutionWithoutFailure() {
+ return new TestingExecution(null, new LocalTaskManagerLocation());
+ }
+
+ /**
+ * Creates a {@link TestingExecution} that overwrites the relevant methods
to emulate a {@link
+ * AccessExecution} with a failure.
+ *
+ * @return The {@code TestingExecution} instance.
+ * @param failureCause the actual failure cause
+ * @param failureTimestamp the time of the failure
+ * @param taskManagerLocation the {@link TaskManagerLocation} the {@code
Execution} is supposed
+ * to run on
+ */
+ private static TestingExecution createExecutionWithFailure(
+ Throwable failureCause,
+ long failureTimestamp,
+ TaskManagerLocation taskManagerLocation) {
+ return new TestingExecution(
Review comment:
same as above: we could inline this method as well
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
##########
@@ -37,68 +36,231 @@
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
import org.apache.flink.runtime.rest.messages.JobExceptionsInfo;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import
org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.UpperLimitExceptionParameter;
+import org.apache.flink.runtime.scheduler.ExceptionHistoryEntry;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.EvictingBoundedList;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import static
org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.taskManagerLocationToString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.collection.IsEmptyCollection.empty;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
/** Test for the {@link JobExceptionsHandler}. */
public class JobExceptionsHandlerTest extends TestLogger {
+ private final JobExceptionsHandler testInstance =
+ new JobExceptionsHandler(
+ CompletableFuture::new,
+ TestingUtils.TIMEOUT(),
+ Collections.emptyMap(),
+ JobExceptionsHeaders.getInstance(),
+ new DefaultExecutionGraphCache(TestingUtils.TIMEOUT(),
TestingUtils.TIMEOUT()),
+ TestingUtils.defaultExecutor());
+
+ @Test
+ public void testNoExceptions() throws HandlerRequestException {
+ final ExecutionGraphInfo executionGraphInfo =
+ new ExecutionGraphInfo(new
ArchivedExecutionGraphBuilder().build());
+
+ final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters>
request =
+ createRequest(executionGraphInfo.getJobId(), 10);
+ final JobExceptionsInfoWithHistory response =
+ testInstance.handleRequest(request, executionGraphInfo);
+
+ assertThat(response.getRootException(), is(nullValue()));
+ assertThat(response.getRootTimestamp(), is(nullValue()));
+ assertFalse(response.isTruncated());
+ assertThat(response.getAllExceptions(), empty());
+ assertThat(response, hasHistoryEntryCount(0));
+ }
+
+ @Test
+ public void testOnlyRootCause() throws HandlerRequestException {
+ final Throwable rootCause = new RuntimeException("root cause");
+ final long rootCauseTimestamp = System.currentTimeMillis();
+
+ final ExecutionGraphInfo executionGraphInfo =
+ createExecutionGraphInfo(
+ ExceptionHistoryEntry.fromGlobalFailure(rootCause,
rootCauseTimestamp));
+ final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters>
request =
+ createRequest(executionGraphInfo.getJobId(), 10);
+ final JobExceptionsInfoWithHistory response =
+ testInstance.handleRequest(request, executionGraphInfo);
+
+ assertThat(response, containsRootCause(rootCause, rootCauseTimestamp));
+ assertFalse(response.isTruncated());
+ assertThat(response.getAllExceptions(), empty());
+
+ assertThat(response, hasHistoryEntryCount(1));
+ assertThat(response, containsGlobalFailure(rootCause,
rootCauseTimestamp));
+ }
+
+ @Test
+ public void testWithExceptionHistory() throws HandlerRequestException {
+ final ExceptionHistoryEntry rootCause =
+ ExceptionHistoryEntry.fromGlobalFailure(
+ new RuntimeException("exception #0"),
System.currentTimeMillis());
+ final ExceptionHistoryEntry otherFailure =
+ new ExceptionHistoryEntry(
+ new RuntimeException("exception #1"),
+ System.currentTimeMillis(),
+ "task name",
+ new LocalTaskManagerLocation());
+
+ final ExecutionGraphInfo executionGraphInfo =
+ createExecutionGraphInfo(rootCause, otherFailure);
+ final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters>
request =
+ createRequest(executionGraphInfo.getJobId(), 10);
+ final JobExceptionsInfoWithHistory response =
+ testInstance.handleRequest(request, executionGraphInfo);
+
+ assertThat(response, containsRootCause(rootCause.getException(),
rootCause.getTimestamp()));
+ assertThat(
+ response,
+ containsGlobalFailure(rootCause.getException(),
rootCause.getTimestamp()));
+ assertThat(
+ response,
+ historyContainsJobExceptionInfo(
+ otherFailure.getException(),
+ otherFailure.getTimestamp(),
+ otherFailure.getFailingTaskName(),
+
taskManagerLocationToString(otherFailure.getAssignedResourceLocation())));
+ assertThat(response, hasHistoryEntryCount(2));
+ assertThat(response, not(hasTruncatedHistory()));
+ }
+
+ @Test
+ public void testWithExceptionHistoryWithTruncationThroughParameter()
+ throws HandlerRequestException {
+ final ExceptionHistoryEntry rootCause =
+ ExceptionHistoryEntry.fromGlobalFailure(
+ new RuntimeException("exception #0"),
System.currentTimeMillis());
+ final ExceptionHistoryEntry otherFailure =
+ new ExceptionHistoryEntry(
+ new RuntimeException("exception #1"),
+ System.currentTimeMillis(),
+ "task name",
+ new LocalTaskManagerLocation());
+
+ final ExecutionGraphInfo executionGraphInfo =
+ createExecutionGraphInfo(rootCause, otherFailure);
+ final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters>
request =
+ createRequest(executionGraphInfo.getJobId(), 1);
+ final JobExceptionsInfoWithHistory response =
+ testInstance.handleRequest(request, executionGraphInfo);
+
+ assertThat(response, containsRootCause(rootCause.getException(),
rootCause.getTimestamp()));
+ assertThat(
+ response,
+ containsGlobalFailure(rootCause.getException(),
rootCause.getTimestamp()));
+ assertThat(response, hasHistoryEntryCount(1));
+ assertThat(response, hasTruncatedHistory());
+ }
+
+ @Test
+ public void testTaskNameFallbackHandling() {
+ assertThat(
+ JobExceptionsHandler.taskNameToString(null),
+ is(JobExceptionsHandler.FALLBACK_TASK_NAME));
+ }
+
+ @Test
+ public void testTaskNameHandling() {
+ final String taskName = "task name";
+ assertThat(JobExceptionsHandler.taskNameToString(taskName),
is(taskName));
+ }
+
+ @Test
+ public void testTaskManagerLocationFallbackHandling() {
+ assertThat(
+ taskManagerLocationToString(null),
is(JobExceptionsHandler.FALLBACK_LOCATION_NAME));
+ }
+
+ @Test
+ public void testTaskManagerLocationHandling() {
+ final TaskManagerLocation taskManagerLocation = new
LocalTaskManagerLocation();
+ assertThat(
+ taskManagerLocationToString(taskManagerLocation),
+ is(
+ String.format(
+ "%s:%s",
+ taskManagerLocation.getFQDNHostname(),
+ taskManagerLocation.dataPort())));
+ }
+
@Test
public void testGetJobExceptionsInfo() throws HandlerRequestException {
- final JobExceptionsHandler jobExceptionsHandler =
- new JobExceptionsHandler(
- () -> null,
- TestingUtils.TIMEOUT(),
- Collections.emptyMap(),
- JobExceptionsHeaders.getInstance(),
- new DefaultExecutionGraphCache(
- TestingUtils.TIMEOUT(),
TestingUtils.TIMEOUT()),
- TestingUtils.defaultExecutor());
final int numExceptions = 20;
- final AccessExecutionGraph archivedExecutionGraph =
- createAccessExecutionGraph(numExceptions);
- checkExceptionLimit(jobExceptionsHandler, archivedExecutionGraph,
numExceptions, 10);
- checkExceptionLimit(
- jobExceptionsHandler, archivedExecutionGraph, numExceptions,
numExceptions);
- checkExceptionLimit(jobExceptionsHandler, archivedExecutionGraph,
numExceptions, 30);
+ final ExecutionGraphInfo archivedExecutionGraph =
createAccessExecutionGraph(numExceptions);
+ checkExceptionLimit(testInstance, archivedExecutionGraph,
numExceptions, 10);
+ checkExceptionLimit(testInstance, archivedExecutionGraph,
numExceptions, numExceptions);
+ checkExceptionLimit(testInstance, archivedExecutionGraph,
numExceptions, 30);
}
private static void checkExceptionLimit(
JobExceptionsHandler jobExceptionsHandler,
- AccessExecutionGraph graph,
+ ExecutionGraphInfo graph,
int maxNumExceptions,
int numExpectedException)
throws HandlerRequestException {
final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters>
handlerRequest =
- createRequest(graph.getJobID(), numExpectedException);
+ createRequest(graph.getJobId(), numExpectedException);
final JobExceptionsInfo jobExceptionsInfo =
jobExceptionsHandler.handleRequest(handlerRequest, graph);
- final int numReportedException =
- maxNumExceptions >= numExpectedException ?
numExpectedException : maxNumExceptions;
+ final int numReportedException = Math.min(maxNumExceptions,
numExpectedException);
assertEquals(jobExceptionsInfo.getAllExceptions().size(),
numReportedException);
}
- private static AccessExecutionGraph createAccessExecutionGraph(int
numTasks) {
+ private static ExecutionGraphInfo createAccessExecutionGraph(int numTasks)
{
Map<JobVertexID, ArchivedExecutionJobVertex> tasks = new HashMap<>();
for (int i = 0; i < numTasks; i++) {
final JobVertexID jobVertexId = new JobVertexID();
tasks.put(jobVertexId,
createArchivedExecutionJobVertex(jobVertexId));
}
- return new ArchivedExecutionGraphBuilder().setTasks(tasks).build();
+
+ final Throwable failureCause = new RuntimeException("root cause");
+ final long failureTimestamp = System.currentTimeMillis();
+ final List<ExceptionHistoryEntry> exceptionHistory = new ArrayList<>();
Review comment:
nit: this could use `Arrays.asList(...)` instead of creating an empty `ArrayList` and populating it afterwards.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
##########
@@ -132,7 +142,62 @@ private static JobExceptionsInfo createJobExceptionsInfo(
}
}
+ final ErrorInfo rootCause = executionGraph.getFailureInfo();
+ return new JobExceptionsInfoWithHistory(
+ rootCause.getException().getOriginalErrorClassName(),
+ rootCause.getExceptionAsString(),
+ rootCause.getTimestamp(),
+ taskExceptionList,
+ truncated,
+ createJobExceptionHistory(
+ executionGraphInfo.getExceptionHistory(),
exceptionToReportMaxSize));
+ }
+
+ static JobExceptionsInfoWithHistory.JobExceptionHistory
createJobExceptionHistory(
+ Iterable<ExceptionHistoryEntry> historyEntries, int limit) {
+ // we need to reverse the history to have a stable result when doing
paging on it
+ final List<ExceptionHistoryEntry> reversedHistoryEntries = new
ArrayList<>();
+ Iterables.addAll(reversedHistoryEntries, historyEntries);
+ Collections.reverse(reversedHistoryEntries);
+
+ List<JobExceptionsInfo> exceptionHistoryEntries =
+ reversedHistoryEntries.stream()
+ .limit(limit)
+ .map(JobExceptionsHandler::createJobExceptionInfo)
+ .collect(Collectors.toList());
+
+ return new JobExceptionsInfoWithHistory.JobExceptionHistory(
+ exceptionHistoryEntries,
+ exceptionHistoryEntries.size() <
reversedHistoryEntries.size());
+ }
+
+ private static JobExceptionsInfo
createJobExceptionInfo(ExceptionHistoryEntry historyEntry) {
return new JobExceptionsInfo(
- rootExceptionMessage, rootTimestamp, taskExceptionList,
truncated);
+ historyEntry.getException().getOriginalErrorClassName(),
+ historyEntry.getExceptionAsString(),
+ historyEntry.getTimestamp(),
+ Collections.singletonList(
Review comment:
Why is this a singleton list rather than an empty list when no task was
involved in the failure?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.StringJoiner;
+
+/**
+ * {@code JobExceptionsInfoWithHistory} extends {@link JobExceptionsInfo}
providing a history of
+ * previously caused failures. It's the response type of the {@link
JobExceptionsHandler}.
+ */
+public class JobExceptionsInfoWithHistory extends JobExceptionsInfo implements
ResponseBody {
+
+ public static final String FIELD_NAME_EXCEPTION_HISTORY =
"exception-history";
+
+ @JsonProperty(FIELD_NAME_EXCEPTION_HISTORY)
+ private final JobExceptionHistory exceptionHistory;
+
+ @JsonCreator
+ public JobExceptionsInfoWithHistory(
+ @JsonProperty(FIELD_NAME_ROOT_EXCEPTION_NAME) String
rootExceptionName,
+ @JsonProperty(FIELD_NAME_ROOT_EXCEPTION) String rootException,
+ @JsonProperty(FIELD_NAME_TIMESTAMP) Long rootTimestamp,
+ @JsonProperty(FIELD_NAME_ALL_EXCEPTIONS)
List<ExecutionExceptionInfo> allExceptions,
+ @JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated,
+ @JsonProperty(FIELD_NAME_EXCEPTION_HISTORY) JobExceptionHistory
exceptionHistory) {
+ super(rootExceptionName, rootException, rootTimestamp, allExceptions,
truncated);
+ this.exceptionHistory = exceptionHistory;
+ }
+
+ public JobExceptionsInfoWithHistory() {
+ this(
+ null,
+ null,
+ null,
+ Collections.emptyList(),
+ false,
+ new JobExceptionHistory(Collections.emptyList(), false));
+ }
+
+ @JsonIgnore
+ public JobExceptionHistory getExceptionHistory() {
+ return exceptionHistory;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JobExceptionsInfoWithHistory that = (JobExceptionsInfoWithHistory) o;
+ return this.isTruncated() == that.isTruncated()
+ && Objects.equals(this.getRootExceptionName(),
that.getRootExceptionName())
+ && Objects.equals(this.getRootException(),
that.getRootException())
+ && Objects.equals(this.getRootTimestamp(),
that.getRootTimestamp())
+ && Objects.equals(this.getAllExceptions(),
that.getAllExceptions())
+ && Objects.equals(exceptionHistory, that.exceptionHistory);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ isTruncated(),
+ getRootExceptionName(),
+ getRootException(),
+ getRootTimestamp(),
+ getAllExceptions(),
+ exceptionHistory);
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ",
JobExceptionsInfoWithHistory.class.getSimpleName() + "[", "]")
+ .add("exceptionHistory=" + exceptionHistory)
+ .toString();
+ }
+
+ /** {@code JobExceptionHistory} collects all previously caught errors. */
+ public static final class JobExceptionHistory {
+
+ public static final String FIELD_NAME_ENTRIES = "entries";
+ public static final String FIELD_NAME_MAX_SIZE_REACHED =
"max-size-reached";
Review comment:
This constant is unused. Also, please use camelCase for the JSON field names (e.g. `maxSizeReached` instead of `max-size-reached`).
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/util/BoundedFIFOQueue.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * {@code BoundedFIFOQueue} collects elements up to given amount. Reaching
this limit will result in
+ * removing the oldest element from this queue (First-In/First-Out; FIFO).
+ *
+ * @param <T> The type of elements collected.
+ */
+public class BoundedFIFOQueue<T> implements Iterable<T>, Serializable {
+
+ private static final long serialVersionUID = -890727339944580409L;
+
+ private final int maxSize;
+ private final Queue<T> elements;
+
+ /**
+ * Creates a {@code BoundedFIFOQueue} with the given maximum size.
+ *
+ * @param maxSize The maximum size of this queue. Exceeding this limit
would result in removing
+ * the oldest element (FIFO).
+ * @throws IllegalArgumentException If {@code maxSize} is less than 0.
+ */
+ public BoundedFIFOQueue(int maxSize) {
+ Preconditions.checkArgument(maxSize >= 0, "The maximum size should be
at least 0.");
+
+ this.maxSize = maxSize;
+ this.elements = new LinkedList<>();
+ }
+
+ /**
+ * Adds an element to the end of the queue. An element will be removed
from the head of the
+ * queue if the queue would exceed its maximum size by adding the new
element.
+ *
+ * @param element The element that should be added to the end of the queue.
+ * @throws NullPointerException If {@code null} is passed as an element.
+ */
+ public void add(T element) {
+ if (elements.add(Preconditions.checkNotNull(element)) &&
elements.size() > maxSize) {
Review comment:
nit: I'd move the `Preconditions.checkNotNull(element)` call out of the `if` condition and into its own statement.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -667,16 +672,15 @@ protected final void archiveFromFailureHandlingResult(
if (executionOptional.isPresent()) {
final Execution failedExecution = executionOptional.get();
- failedExecution
- .getFailureInfo()
- .ifPresent(
- failureInfo -> {
- taskFailureHistory.add(failureInfo);
- log.debug(
- "Archive local failure causing attempt
{} to fail: {}",
- failedExecution.getAttemptId(),
- failureInfo.getExceptionAsString());
- });
+ final ExceptionHistoryEntry exceptionHistoryEntry =
+ ExceptionHistoryEntry.fromFailedExecution(
+ executionOptional.get(),
+
executionOptional.get().getVertex().getTaskNameWithSubtaskIndex());
Review comment:
```suggestion
failedExecution,
failedExecution.getVertex().getTaskNameWithSubtaskIndex());
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org