zentol commented on a change in pull request #15049:
URL: https://github.com/apache/flink/pull/15049#discussion_r590166713



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/BoundedFIFOQueue.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * {@code BoundedFIFOQueue} collects elements up to given amount. Reaching 
this limit will result in
+ * removing the oldest element from this queue (First-In/First-Out; FIFO).
+ *
+ * @param <T> The type of elements collected.
+ */
+public class BoundedFIFOQueue<T> implements Iterable<T>, Serializable {
+
+    private static final long serialVersionUID = -890727339944580409L;
+
+    private final int maxSize;
+    private final Queue<T> elements;
+
+    private int elementCount;
+
+    /**
+     * Creates an immutable copy of the passed {@code BoundedFIFOQueue}.
+     *
+     * @param boundedFIFOQueue The queue for which an immutable copy shall be 
created.
+     * @param <E> The type this queue collects.
+     * @return An immutable copy of the passed {@code BoundedFIFOQueue}.
+     */
+    public static <E> BoundedFIFOQueue<E> immutableCopy(BoundedFIFOQueue<E> 
boundedFIFOQueue) {
+        return new BoundedFIFOQueue<E>(
+                boundedFIFOQueue.maxSize,
+                boundedFIFOQueue.elements,
+                boundedFIFOQueue.elementCount) {
+            private static final long serialVersionUID = -2763611608109517436L;
+
+            @Override
+            public void add(E element) {
+                throw new UnsupportedOperationException(
+                        "This BoundedFIFOQueue is immutable. add is not 
supported.");
+            }
+        };
+    }
+
+    /**
+     * Creates a {@code BoundedFIFOQueue} with the given maximum size.
+     *
+     * @param maxSize The maximum size of this queue. Exceeding this limit 
would result in removing
+     *     the oldest element (FIFO).
+     * @throws IllegalArgumentException If {@code maxSize} is less than 0.
+     */
+    public BoundedFIFOQueue(int maxSize) {
+        this(maxSize, new LinkedList<>(), 0);
+    }
+
+    /**
+     * Creates full {@code BoundedFIFOQueue} from passed {@link Collection}.
+     *
+     * @param initialElements the initial elements the queue is filled with.
+     */
+    @VisibleForTesting
+    public BoundedFIFOQueue(Collection<T> initialElements) {
+        this(initialElements.size(), initialElements, initialElements.size());
+    }
+
+    private BoundedFIFOQueue(int maxSize, Iterable<T> initialElements, int 
initialElementCount) {
+        Preconditions.checkArgument(maxSize >= 0, "The maximum size should be 
at least 0.");
+
+        this.maxSize = maxSize;
+        this.elements = new LinkedList<>();
+        initialElements.forEach(elements::add);
+        this.elementCount = initialElementCount;
+    }
+
+    /**
+     * Adds an element to the end of the queue. An element will be removed 
from the head of the
+     * queue if the queue would exceed its maximum size by adding the new 
element.
+     *
+     * @param element The element that should be added to the end of the queue.
+     * @throws NullPointerException If {@code null} is passed as an element.
+     */
+    public void add(T element) {
+        elementCount++;

Review comment:
       This should only be done after the checkNotNull precondition has passed

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
##########
@@ -132,7 +143,61 @@ private static JobExceptionsInfo createJobExceptionsInfo(
             }
         }
 
+        final ErrorInfo rootCause = executionGraph.getFailureInfo();
+        return new JobExceptionsInfoWithHistory(
+                rootCause.getException().getOriginalErrorClassName(),
+                rootCause.getExceptionAsString(),
+                rootCause.getTimestamp(),
+                taskExceptionList,
+                truncated,
+                createJobExceptionHistory(
+                        executionGraphInfo.getExceptionHistory(), 
exceptionToReportMaxSize));
+    }
+
+    static JobExceptionsInfoWithHistory.JobExceptionHistory 
createJobExceptionHistory(
+            BoundedFIFOQueue<ExceptionHistoryEntry> historyEntries, int limit) 
{
+        // we need to reverse the history to have a stable result when doing 
paging on it
+        final List<ExceptionHistoryEntry> reversedHistoryEntries = new 
ArrayList<>();
+        Iterables.addAll(reversedHistoryEntries, historyEntries);
+        Collections.reverse(reversedHistoryEntries);
+
+        List<JobExceptionsInfo> exceptionHistoryEntries =
+                reversedHistoryEntries.stream()
+                        .limit(limit)
+                        .map(JobExceptionsHandler::createJobExceptionInfo)
+                        .collect(Collectors.toList());
+
+        return new JobExceptionsInfoWithHistory.JobExceptionHistory(
+                exceptionHistoryEntries,
+                historyEntries.getElementCount(),
+                exceptionHistoryEntries.size() < historyEntries.size());
+    }
+
+    private static JobExceptionsInfo 
createJobExceptionInfo(ExceptionHistoryEntry historyEntry) {
         return new JobExceptionsInfo(
-                rootExceptionMessage, rootTimestamp, taskExceptionList, 
truncated);
+                historyEntry.getException().getOriginalErrorClassName(),
+                historyEntry.getExceptionAsString(),
+                historyEntry.getTimestamp(),
+                Collections.singletonList(
+                        new JobExceptionsInfo.ExecutionExceptionInfo(
+                                historyEntry.getExceptionAsString(),
+                                
taskNameToString(historyEntry.getFailingTaskName()),
+                                taskManagerLocationToString(
+                                        
historyEntry.getAssignedResourceLocation()),
+                                historyEntry.getTimestamp())),
+                false);
+    }
+
+    static String taskNameToString(String taskName) {

Review comment:
       `@VisibleForTesting`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/BoundedFIFOQueue.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * {@code BoundedFIFOQueue} collects elements up to given amount. Reaching 
this limit will result in
+ * removing the oldest element from this queue (First-In/First-Out; FIFO).
+ *
+ * @param <T> The type of elements collected.
+ */
+public class BoundedFIFOQueue<T> implements Iterable<T>, Serializable {
+
+    private static final long serialVersionUID = -890727339944580409L;
+
+    private final int maxSize;
+    private final Queue<T> elements;
+
+    private int elementCount;
+
+    /**
+     * Creates an immutable copy of the passed {@code BoundedFIFOQueue}.
+     *
+     * @param boundedFIFOQueue The queue for which an immutable copy shall be 
created.
+     * @param <E> The type this queue collects.
+     * @return An immutable copy of the passed {@code BoundedFIFOQueue}.
+     */
+    public static <E> BoundedFIFOQueue<E> immutableCopy(BoundedFIFOQueue<E> 
boundedFIFOQueue) {
+        return new BoundedFIFOQueue<E>(
+                boundedFIFOQueue.maxSize,
+                boundedFIFOQueue.elements,
+                boundedFIFOQueue.elementCount) {
+            private static final long serialVersionUID = -2763611608109517436L;
+
+            @Override
+            public void add(E element) {

Review comment:
       This is pretty jank; if some consumer is not allowed to add elements, 
then they simply should not have access to this method (for example, by just 
returning an Iterable instead).

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/BoundedFIFOQueue.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * {@code BoundedFIFOQueue} collects elements up to given amount. Reaching 
this limit will result in
+ * removing the oldest element from this queue (First-In/First-Out; FIFO).
+ *
+ * @param <T> The type of elements collected.
+ */
+public class BoundedFIFOQueue<T> implements Iterable<T>, Serializable {
+
+    private static final long serialVersionUID = -890727339944580409L;
+
+    private final int maxSize;
+    private final Queue<T> elements;
+
+    private int elementCount;
+
+    /**
+     * Creates an immutable copy of the passed {@code BoundedFIFOQueue}.
+     *
+     * @param boundedFIFOQueue The queue for which an immutable copy shall be 
created.
+     * @param <E> The type this queue collects.
+     * @return An immutable copy of the passed {@code BoundedFIFOQueue}.
+     */
+    public static <E> BoundedFIFOQueue<E> immutableCopy(BoundedFIFOQueue<E> 
boundedFIFOQueue) {
+        return new BoundedFIFOQueue<E>(
+                boundedFIFOQueue.maxSize,
+                boundedFIFOQueue.elements,
+                boundedFIFOQueue.elementCount) {
+            private static final long serialVersionUID = -2763611608109517436L;
+
+            @Override
+            public void add(E element) {
+                throw new UnsupportedOperationException(
+                        "This BoundedFIFOQueue is immutable. add is not 
supported.");
+            }
+        };
+    }
+
+    /**
+     * Creates a {@code BoundedFIFOQueue} with the given maximum size.
+     *
+     * @param maxSize The maximum size of this queue. Exceeding this limit 
would result in removing
+     *     the oldest element (FIFO).
+     * @throws IllegalArgumentException If {@code maxSize} is less than 0.
+     */
+    public BoundedFIFOQueue(int maxSize) {
+        this(maxSize, new LinkedList<>(), 0);
+    }
+
+    /**
+     * Creates full {@code BoundedFIFOQueue} from passed {@link Collection}.
+     *
+     * @param initialElements the initial elements the queue is filled with.
+     */
+    @VisibleForTesting
+    public BoundedFIFOQueue(Collection<T> initialElements) {
+        this(initialElements.size(), initialElements, initialElements.size());
+    }
+
+    private BoundedFIFOQueue(int maxSize, Iterable<T> initialElements, int 
initialElementCount) {
+        Preconditions.checkArgument(maxSize >= 0, "The maximum size should be 
at least 0.");
+
+        this.maxSize = maxSize;
+        this.elements = new LinkedList<>();
+        initialElements.forEach(elements::add);
+        this.elementCount = initialElementCount;
+    }

Review comment:
       The initialElementCount argument seems unnecessary; `this.elementCount = 
this.elements.size();` would do the trick already and properly handle cases 
where the number of initial elements exceeds maxSize.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/util/BoundedFIFOQueueTest.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.hamcrest.collection.IsIterableWithSize.iterableWithSize;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/** {@code BoundedFIFOTest} tests {@link BoundedFIFOQueue}. */
+public class BoundedFIFOQueueTest extends TestLogger {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConstructorFailing() {
+        new BoundedFIFOQueue<>(-1);
+    }
+
+    @Test
+    public void testCollectionConstructor() {
+        final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(Arrays.asList(1, 2));
+        assertThat(testInstance, contains(1, 2));
+        assertThat(testInstance.size(), is(2));
+        assertThat(testInstance.getElementCount(), is(2));
+    }
+
+    @Test
+    public void testEmptyQueue() {
+        final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(0);
+        assertThat(testInstance, iterableWithSize(0));
+        testInstance.add(1);
+        assertThat(testInstance, iterableWithSize(0));
+    }
+
+    @Test
+    public void testQueue() {
+        final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(2);
+        assertThat(testInstance, iterableWithSize(0));
+
+        testInstance.add(1);
+        assertThat(testInstance, contains(1));
+
+        testInstance.add(2);
+        assertThat(testInstance, contains(1, 2));
+
+        testInstance.add(3);
+        assertThat(testInstance, contains(2, 3));
+    }
+
+    @Test
+    public void testElementCount() {
+        final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(0);
+        assertThat(testInstance.getElementCount(), is(0));
+
+        testInstance.add(1);
+        assertThat(testInstance.getElementCount(), is(1));
+    }
+
+    @Test
+    public void testSize() {
+        final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(0);
+        assertThat(testInstance.size(), is(0));
+
+        testInstance.add(1);
+        assertThat(testInstance.size(), is(0));

Review comment:
       This doesn't seem to actually be testing size(), since the queue is 
created with maxSize 0 and is therefore always empty.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
##########
@@ -132,7 +143,61 @@ private static JobExceptionsInfo createJobExceptionsInfo(
             }
         }
 
+        final ErrorInfo rootCause = executionGraph.getFailureInfo();
+        return new JobExceptionsInfoWithHistory(
+                rootCause.getException().getOriginalErrorClassName(),
+                rootCause.getExceptionAsString(),
+                rootCause.getTimestamp(),
+                taskExceptionList,
+                truncated,
+                createJobExceptionHistory(
+                        executionGraphInfo.getExceptionHistory(), 
exceptionToReportMaxSize));
+    }
+
+    static JobExceptionsInfoWithHistory.JobExceptionHistory 
createJobExceptionHistory(
+            BoundedFIFOQueue<ExceptionHistoryEntry> historyEntries, int limit) 
{
+        // we need to reverse the history to have a stable result when doing 
paging on it
+        final List<ExceptionHistoryEntry> reversedHistoryEntries = new 
ArrayList<>();
+        Iterables.addAll(reversedHistoryEntries, historyEntries);
+        Collections.reverse(reversedHistoryEntries);
+
+        List<JobExceptionsInfo> exceptionHistoryEntries =
+                reversedHistoryEntries.stream()
+                        .limit(limit)
+                        .map(JobExceptionsHandler::createJobExceptionInfo)
+                        .collect(Collectors.toList());
+
+        return new JobExceptionsInfoWithHistory.JobExceptionHistory(
+                exceptionHistoryEntries,
+                historyEntries.getElementCount(),
+                exceptionHistoryEntries.size() < historyEntries.size());
+    }
+
+    private static JobExceptionsInfo 
createJobExceptionInfo(ExceptionHistoryEntry historyEntry) {
         return new JobExceptionsInfo(
-                rootExceptionMessage, rootTimestamp, taskExceptionList, 
truncated);
+                historyEntry.getException().getOriginalErrorClassName(),
+                historyEntry.getExceptionAsString(),
+                historyEntry.getTimestamp(),
+                Collections.singletonList(
+                        new JobExceptionsInfo.ExecutionExceptionInfo(
+                                historyEntry.getExceptionAsString(),
+                                
taskNameToString(historyEntry.getFailingTaskName()),
+                                taskManagerLocationToString(
+                                        
historyEntry.getAssignedResourceLocation()),
+                                historyEntry.getTimestamp())),
+                false);
+    }
+
+    static String taskNameToString(String taskName) {
+        return taskName == null ? FALLBACK_TASK_NAME : taskName;
+    }
+
+    static String taskManagerLocationToString(TaskManagerLocation 
taskManagerLocation) {

Review comment:
       `@VisibleForTesting`?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStoreTest.java
##########
@@ -402,11 +405,36 @@ public boolean matches(Object o) {
                             
thatExecutionGraph.getCheckpointCoordinatorConfiguration())
                     && thisExecutionGraph.getAllVertices().size()
                             == thatExecutionGraph.getAllVertices().size()
-                    && Objects.equals(
+                    && equals(
                             expectedExecutionGraphInfo.getExceptionHistory(),
                             that.getExceptionHistory());
         }
 
+        private static boolean equals(
+                BoundedFIFOQueue<ExceptionHistoryEntry> left,
+                BoundedFIFOQueue<ExceptionHistoryEntry> right) {
+            if (left.getElementCount() != right.getElementCount() || 
left.size() != right.size()) {
+                return false;
+            }
+
+            final Iterator<ExceptionHistoryEntry> leftIt = left.iterator();
+            final Iterator<ExceptionHistoryEntry> rightIt = right.iterator();
+            while (true) {
+                if (leftIt.hasNext() ^ rightIt.hasNext()) {
+                    return false;
+                } else if (!leftIt.hasNext() && !rightIt.hasNext()) {
+                    return true;
+                }

Review comment:
       I would use `leftIt.hasNext() && rightIt.hasNext()` as the loop 
condition, and check leftIt.hasNext() ^ rightIt.hasNext() afterwards.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/util/BoundedFIFOQueueTest.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.hamcrest.collection.IsIterableWithSize.iterableWithSize;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/** {@code BoundedFIFOTest} tests {@link BoundedFIFOQueue}. */

Review comment:
       `BoundedFIFOTest` does not exist; the class is named `BoundedFIFOQueueTest`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
##########
@@ -46,20 +52,27 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /** Handler serving the job exceptions. */
 public class JobExceptionsHandler
-        extends AbstractAccessExecutionGraphHandler<
-                JobExceptionsInfo, JobExceptionsMessageParameters>
-        implements OnlyExecutionGraphJsonArchivist {
+        extends AbstractExecutionGraphHandler<
+                JobExceptionsInfoWithHistory, JobExceptionsMessageParameters>
+        implements JsonArchivist {
+
+    static final String FALLBACK_TASK_NAME = "(global failure)";
+    static final String FALLBACK_LOCATION_NAME = "(unassigned)";

Review comment:
       `@VisibleForTesting`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
##########
@@ -132,7 +143,61 @@ private static JobExceptionsInfo createJobExceptionsInfo(
             }
         }
 
+        final ErrorInfo rootCause = executionGraph.getFailureInfo();
+        return new JobExceptionsInfoWithHistory(
+                rootCause.getException().getOriginalErrorClassName(),
+                rootCause.getExceptionAsString(),
+                rootCause.getTimestamp(),
+                taskExceptionList,
+                truncated,
+                createJobExceptionHistory(
+                        executionGraphInfo.getExceptionHistory(), 
exceptionToReportMaxSize));
+    }
+
+    static JobExceptionsInfoWithHistory.JobExceptionHistory 
createJobExceptionHistory(
+            BoundedFIFOQueue<ExceptionHistoryEntry> historyEntries, int limit) 
{
+        // we need to reverse the history to have a stable result when doing 
paging on it
+        final List<ExceptionHistoryEntry> reversedHistoryEntries = new 
ArrayList<>();
+        Iterables.addAll(reversedHistoryEntries, historyEntries);
+        Collections.reverse(reversedHistoryEntries);
+
+        List<JobExceptionsInfo> exceptionHistoryEntries =
+                reversedHistoryEntries.stream()
+                        .limit(limit)
+                        .map(JobExceptionsHandler::createJobExceptionInfo)
+                        .collect(Collectors.toList());
+
+        return new JobExceptionsInfoWithHistory.JobExceptionHistory(
+                exceptionHistoryEntries,
+                historyEntries.getElementCount(),

Review comment:
       what purpose is the total exception count supposed to fulfill?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java
##########
@@ -70,14 +80,32 @@ public boolean equals(Object o) {
         }
         JobExceptionsInfo that = (JobExceptionsInfo) o;
         return truncated == that.truncated
+                && Objects.equals(rootExceptionName, that.rootExceptionName)
                 && Objects.equals(rootException, that.rootException)
                 && Objects.equals(rootTimestamp, that.rootTimestamp)
                 && Objects.equals(allExceptions, that.allExceptions);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(rootException, rootTimestamp, allExceptions, 
truncated);
+        return Objects.hash(
+                rootExceptionName, rootException, rootTimestamp, 
allExceptions, truncated);
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)

Review comment:
       why not just use the generated version from IntelliJ?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/util/BoundedFIFOQueueTest.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.hamcrest.collection.IsIterableWithSize.iterableWithSize;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/** {@code BoundedFIFOTest} tests {@link BoundedFIFOQueue}. */
+public class BoundedFIFOQueueTest extends TestLogger {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConstructorFailing() {
+        new BoundedFIFOQueue<>(-1);
+    }
+
+    @Test
+    public void testCollectionConstructor() {
+        final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(Arrays.asList(1, 2));
+        assertThat(testInstance, contains(1, 2));
+        assertThat(testInstance.size(), is(2));
+        assertThat(testInstance.getElementCount(), is(2));
+    }
+
+    @Test
+    public void testEmptyQueue() {
+        final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(0);
+        assertThat(testInstance, iterableWithSize(0));
+        testInstance.add(1);
+        assertThat(testInstance, iterableWithSize(0));

Review comment:
       This seems to be testing not the behavior of an empty queue, but the 
behavior of a queue with maxSize == 0.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntry.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@code ExceptionHistoryEntry} collects information about a single task 
failure that should be
+ * exposed through the exception history.
+ */
+public class ExceptionHistoryEntry extends ErrorInfo {
+
+    private static final long serialVersionUID = -3855285510064263701L;
+
+    @Nullable private final String failingTaskName;
+    @Nullable private final TaskManagerLocation assignedResourceLocation;
+
+    public static ExceptionHistoryEntry fromGlobalFailure(Throwable cause, 
long timestamp) {
+        return new ExceptionHistoryEntry(cause, timestamp, null, null);
+    }
+
+    public static ExceptionHistoryEntry fromFailedExecution(Execution 
execution) {

Review comment:
       You could make your life easier by working against the AccessExecution 
interface and separately passing in the task name.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
##########
@@ -37,68 +36,258 @@
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
 import org.apache.flink.runtime.rest.messages.JobExceptionsInfo;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import 
org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
 import org.apache.flink.runtime.rest.messages.job.UpperLimitExceptionParameter;
+import org.apache.flink.runtime.scheduler.ExceptionHistoryEntry;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.BoundedFIFOQueue;
 import org.apache.flink.runtime.util.EvictingBoundedList;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 
+import static 
org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.taskManagerLocationToString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.collection.IsEmptyCollection.empty;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 
 /** Test for the {@link JobExceptionsHandler}. */
 public class JobExceptionsHandlerTest extends TestLogger {
 
+    private final JobExceptionsHandler testInstance =
+            new JobExceptionsHandler(
+                    CompletableFuture::new,
+                    TestingUtils.TIMEOUT(),
+                    Collections.emptyMap(),
+                    JobExceptionsHeaders.getInstance(),
+                    new DefaultExecutionGraphCache(TestingUtils.TIMEOUT(), 
TestingUtils.TIMEOUT()),
+                    TestingUtils.defaultExecutor());
+
+    @Test
+    public void testNoExceptions() throws HandlerRequestException {
+        final ExecutionGraphInfo executionGraphInfo =
+                new ExecutionGraphInfo(new 
ArchivedExecutionGraphBuilder().build());
+
+        final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters> 
request =
+                createRequest(executionGraphInfo.getJobId(), 10);
+        final JobExceptionsInfoWithHistory response =
+                testInstance.handleRequest(request, executionGraphInfo);
+
+        assertThat(response.getRootException(), is(nullValue()));
+        assertThat(response.getRootTimestamp(), is(nullValue()));
+        assertFalse(response.isTruncated());
+        assertThat(response.getAllExceptions(), empty());
+        assertThat(response, hasHistoryEntryCount(0));
+    }
+
+    @Test
+    public void testOnlyRootCause() throws HandlerRequestException {
+        final Throwable rootCause = new RuntimeException("root cause");
+        final long rootCauseTimestamp = System.currentTimeMillis();
+
+        final ExecutionGraphInfo executionGraphInfo =
+                createExecutionGraphInfo(
+                        ExceptionHistoryEntry.fromGlobalFailure(rootCause, 
rootCauseTimestamp));
+        final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters> 
request =
+                createRequest(executionGraphInfo.getJobId(), 10);
+        final JobExceptionsInfoWithHistory response =
+                testInstance.handleRequest(request, executionGraphInfo);
+
+        assertThat(response, containsRootCause(rootCause, rootCauseTimestamp));
+        assertFalse(response.isTruncated());
+        assertThat(response.getAllExceptions(), empty());
+
+        assertThat(response, hasHistoryEntryCount(1));
+        assertThat(response, containsGlobalFailure(rootCause, 
rootCauseTimestamp));
+    }
+
+    @Test
+    public void testWithExceptionHistory() throws HandlerRequestException {
+        final ExceptionHistoryEntry rootCause =
+                ExceptionHistoryEntry.fromGlobalFailure(
+                        new RuntimeException("exception #0"), 
System.currentTimeMillis());
+        final ExceptionHistoryEntry otherFailure =
+                new ExceptionHistoryEntry(
+                        new RuntimeException("exception #1"),
+                        System.currentTimeMillis(),
+                        "task name",
+                        new LocalTaskManagerLocation());
+
+        final ExecutionGraphInfo executionGraphInfo =
+                createExecutionGraphInfo(rootCause, otherFailure);
+        final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters> 
request =
+                createRequest(executionGraphInfo.getJobId(), 10);
+        final JobExceptionsInfoWithHistory response =
+                testInstance.handleRequest(request, executionGraphInfo);
+
+        assertThat(response, containsRootCause(rootCause.getException(), 
rootCause.getTimestamp()));
+        assertThat(
+                response,
+                containsGlobalFailure(rootCause.getException(), 
rootCause.getTimestamp()));
+        assertThat(
+                response,
+                historyContainsJobExceptionInfo(
+                        otherFailure.getException(),
+                        otherFailure.getTimestamp(),
+                        otherFailure.getFailingTaskName(),
+                        
taskManagerLocationToString(otherFailure.getAssignedResourceLocation())));
+        assertThat(response, hasHistoryEntryCount(2));
+        assertThat(response, not(hasTruncatedHistory()));
+        assertThat(response, hasTotal(2));
+    }
+
+    @Test
+    public void testWithExceptionHistoryWithTruncationThroughParameter()
+            throws HandlerRequestException {
+        final ExceptionHistoryEntry rootCause =
+                ExceptionHistoryEntry.fromGlobalFailure(
+                        new RuntimeException("exception #0"), 
System.currentTimeMillis());
+        final ExceptionHistoryEntry otherFailure =
+                new ExceptionHistoryEntry(
+                        new RuntimeException("exception #1"),
+                        System.currentTimeMillis(),
+                        "task name",
+                        new LocalTaskManagerLocation());
+
+        final ExecutionGraphInfo executionGraphInfo =
+                createExecutionGraphInfo(rootCause, otherFailure);
+        final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters> 
request =
+                createRequest(executionGraphInfo.getJobId(), 1);
+        final JobExceptionsInfoWithHistory response =
+                testInstance.handleRequest(request, executionGraphInfo);
+
+        assertThat(response, containsRootCause(rootCause.getException(), 
rootCause.getTimestamp()));
+        assertThat(
+                response,
+                containsGlobalFailure(rootCause.getException(), 
rootCause.getTimestamp()));
+        assertThat(response, hasHistoryEntryCount(1));
+        assertThat(response, hasTruncatedHistory());
+        assertThat(response, hasTotal(2));
+    }
+
+    @Test
+    public void testWithExceptionHistoryWithInternalTruncation() throws 
HandlerRequestException {
+        final ExceptionHistoryEntry rootCause =
+                ExceptionHistoryEntry.fromGlobalFailure(
+                        new RuntimeException("exception #0"), 
System.currentTimeMillis());
+
+        final ExecutionGraphInfo executionGraphInfo = 
createExecutionGraphInfo(rootCause);
+        // adding the rootCause a second time here will result in the 
underlying BoundedFIFOQueue
+        // being truncated
+        executionGraphInfo.getExceptionHistory().add(rootCause);
+        final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters> 
request =
+                createRequest(executionGraphInfo.getJobId(), 10);
+        final JobExceptionsInfoWithHistory response =
+                testInstance.handleRequest(request, executionGraphInfo);
+
+        assertThat(response, containsRootCause(rootCause.getException(), 
rootCause.getTimestamp()));
+        assertThat(
+                response,
+                containsGlobalFailure(rootCause.getException(), 
rootCause.getTimestamp()));
+        assertThat(response, hasHistoryEntryCount(1));
+        assertThat(response, not(hasTruncatedHistory()));

Review comment:
       But it _is_ truncated, because 1 < 2 (one entry is returned although two were added)? This seems a bit unintuitive.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/util/BoundedFIFOQueueTest.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.hamcrest.collection.IsIterableWithSize.iterableWithSize;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/** {@code BoundedFIFOQueueTest} tests {@link BoundedFIFOQueue}. */
+public class BoundedFIFOQueueTest extends TestLogger {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConstructorFailing() {
+        new BoundedFIFOQueue<>(-1);
+    }
+
+    @Test
+    public void testCollectionConstructor() {
+        final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(Arrays.asList(1, 2));
+        assertThat(testInstance, contains(1, 2));
+        assertThat(testInstance.size(), is(2));
+        assertThat(testInstance.getElementCount(), is(2));
+    }
+
+    @Test
+    public void testEmptyQueue() {
+        final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(0);
+        assertThat(testInstance, iterableWithSize(0));
+        testInstance.add(1);
+        assertThat(testInstance, iterableWithSize(0));
+    }
+
+    @Test
+    public void testQueue() {
+        final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(2);
+        assertThat(testInstance, iterableWithSize(0));
+
+        testInstance.add(1);
+        assertThat(testInstance, contains(1));
+
+        testInstance.add(2);
+        assertThat(testInstance, contains(1, 2));
+
+        testInstance.add(3);
+        assertThat(testInstance, contains(2, 3));
+    }
+
+    @Test
+    public void testElementCount() {
+        final BoundedFIFOQueue<Integer> testInstance = new 
BoundedFIFOQueue<>(0);
+        assertThat(testInstance.getElementCount(), is(0));
+
+        testInstance.add(1);
+        assertThat(testInstance.getElementCount(), is(1));

Review comment:
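       This test contradicts testEmptyQueue: there, a queue with a maximum size of 0 is expected to stay empty after add(1), yet here the element count is expected to be 1 after the same call.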




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to