[hotfix] [jobmanager] Reduce complexity when archiving ExecutionVertex This fixes the inefficiency where the archiving operation iterated over the entire evicted history of prior execution attempts when converting them to archived executions.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4820b413 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4820b413 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4820b413 Branch: refs/heads/master Commit: 4820b413a55a3bbb1853251ecdb94b4c70dc5e2b Parents: 10e4e32 Author: Stephan Ewen <[email protected]> Authored: Tue Jan 31 19:52:57 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Fri Feb 3 10:28:23 2017 +0100 ---------------------------------------------------------------------- .../executiongraph/ArchivedExecutionVertex.java | 22 ++++- .../flink/runtime/util/EvictingBoundedList.java | 73 ++++++++++++++- .../runtime/util/EvictingBoundedListTest.java | 97 +++++++++++++++++++- 3 files changed, 182 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4820b413/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java index 56fc7a6..5053cae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java @@ -26,6 +26,7 @@ import java.io.Serializable; public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializable { private static final long serialVersionUID = -6708241535015028576L; + private final int subTaskIndex; private final EvictingBoundedList<ArchivedExecution> priorExecutions; @@ -35,13 +36,11 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa private 
final ArchivedExecution currentExecution; // this field must never be null + // ------------------------------------------------------------------------ + public ArchivedExecutionVertex(ExecutionVertex vertex) { this.subTaskIndex = vertex.getParallelSubtaskIndex(); - EvictingBoundedList<Execution> copyOfPriorExecutionsList = vertex.getCopyOfPriorExecutionsList(); - priorExecutions = new EvictingBoundedList<>(copyOfPriorExecutionsList.getSizeLimit()); - for (Execution priorExecution : copyOfPriorExecutionsList) { - priorExecutions.add(priorExecution != null ? priorExecution.archive() : null); - } + this.priorExecutions = vertex.getCopyOfPriorExecutionsList().map(ARCHIVER); this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex(); this.currentExecution = vertex.getCurrentExecutionAttempt().archive(); } @@ -93,4 +92,17 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa throw new IllegalArgumentException("attempt does not exist"); } } + + // ------------------------------------------------------------------------ + // utilities + // ------------------------------------------------------------------------ + + private static final EvictingBoundedList.Function<Execution, ArchivedExecution> ARCHIVER = + new EvictingBoundedList.Function<Execution, ArchivedExecution>() { + + @Override + public ArchivedExecution apply(Execution value) { + return value.archive(); + } + }; } http://git-wip-us.apache.org/repos/asf/flink/blob/4820b413/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java index f4c155a..2c5b6a9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java @@ -29,8 +29,9 @@ import java.util.NoSuchElementException; * This class implements a list (array based) that is physically bounded in maximum size, but can virtually grow beyond * the bounded size. When the list grows beyond the size bound, elements are dropped from the head of the list (FIFO * order). If dropped elements are accessed, a default element is returned instead. - * <p> - * TODO this class could eventually implement the whole actual List interface. + * + * <p>The list by itself is serializable, but a full list can only be serialized if the values + * are also serializable. * * @param <T> type of the list elements */ @@ -38,12 +39,25 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable { private static final long serialVersionUID = -1863961980953613146L; + @SuppressWarnings("NonSerializableFieldInSerializableClass") + /** the default element returned for positions that were evicted */ private final T defaultElement; + + @SuppressWarnings("NonSerializableFieldInSerializableClass") + /** the array (viewed as a circular buffer) that holds the latest (= non-evicted) elements */ private final Object[] elements; + + /** The next index to put an element in the array */ private int idx; + + /** The current number of (virtual) elements in the list */ private int count; + + /** Modification count for fail-fast iterators */ private long modCount; + // ------------------------------------------------------------------------ + public EvictingBoundedList(int sizeLimit) { this(sizeLimit, null); } @@ -65,6 +79,8 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable { this.modCount = 0L; } + // ------------------------------------------------------------------------ + public int size() { return count; } @@ -93,8 +109,11 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable { } public T get(int index) { - 
Preconditions.checkArgument(index >= 0 && index < count); - return isDroppedIndex(index) ? getDefaultElement() : accessInternal(index % elements.length); + if (index >= 0 && index < count) { + return isDroppedIndex(index) ? getDefaultElement() : accessInternal(index % elements.length); + } else { + throw new IndexOutOfBoundsException(String.valueOf(index)); + } } public int getSizeLimit() { @@ -157,4 +176,50 @@ public class EvictingBoundedList<T> implements Iterable<T>, Serializable { } }; } + + /** + * Creates a new list that replaces its elements with transformed elements. + * The list retains the same size and position-to-element mapping. + * + * <p>Note that null values are automatically mapped to null values. + * + * @param transform The function used to transform each element + * @param <R> The type of the elements in the result list. + * + * @return The list with the mapped elements + */ + public <R> EvictingBoundedList<R> map(Function<T, R> transform) { + // map the default element + final R newDefault = defaultElement == null ? null : transform.apply(defaultElement); + + // copy the list with the new default + final EvictingBoundedList<R> result = new EvictingBoundedList<>(elements.length, newDefault); + result.count = count; + result.idx = idx; + + // map all the entries in the list + final int numElements = Math.min(elements.length, count); + for (int i = 0; i < numElements; i++) { + result.elements[i] = transform.apply(accessInternal(i)); + } + + return result; + } + + // ------------------------------------------------------------------------ + + /** + * A simple unary function that can be used to transform elements via the + * {@link EvictingBoundedList#map(Function)} method. + */ + public interface Function<I, O> { + + /** + * Transforms the value. 
+ * + * @param value The value to transform + * @return The transformed value + */ + O apply(I value); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/4820b413/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java index e0a1c70..7109dac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java @@ -28,9 +28,12 @@ import java.util.NoSuchElementException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.apache.flink.util.Preconditions.checkNotNull; + public class EvictingBoundedListTest { @Test @@ -114,7 +117,7 @@ public class EvictingBoundedListTest { try { list.get(0); fail(); - } catch (IllegalArgumentException ignore) { + } catch (IndexOutOfBoundsException ignore) { } } @@ -161,4 +164,96 @@ public class EvictingBoundedListTest { } } + + @Test + public void testMapWithHalfFullList() { + final Object[] originals = { new Object(), new Object(), new Object() }; + final Object defaultValue = new Object(); + + final EvictingBoundedList<Object> original = new EvictingBoundedList<>(5, defaultValue); + for (Object o : originals) { + original.add(o); + } + + final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper()); + + assertEquals(original.size(), transformed.size()); + assertEquals(original.getSizeLimit(), transformed.getSizeLimit()); + assertEquals(defaultValue, transformed.getDefaultElement().original); + + int i = 0; + for 
(TransformedObject to : transformed) { + assertEquals(originals[i++], to.original); + } + + try { + transformed.get(originals.length); + fail("should have failed with an exception"); + } catch (IndexOutOfBoundsException e) { + // expected + } + } + + @Test + public void testMapWithEvictedElements() { + final Object[] originals = { new Object(), new Object(), new Object(), new Object(), new Object() }; + final Object defaultValue = new Object(); + + final EvictingBoundedList<Object> original = new EvictingBoundedList<>(2, defaultValue); + for (Object o : originals) { + original.add(o); + } + + final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper()); + + assertEquals(originals.length, transformed.size()); + assertEquals(original.size(), transformed.size()); + assertEquals(original.getSizeLimit(), transformed.getSizeLimit()); + assertEquals(defaultValue, transformed.getDefaultElement().original); + + for (int i = 0; i < originals.length; i++) { + if (i < originals.length - transformed.getSizeLimit()) { + assertEquals(transformed.getDefaultElement(), transformed.get(i)); + } else { + assertEquals(originals[i], transformed.get(i).original); + } + } + + try { + transformed.get(originals.length); + fail("should have failed with an exception"); + } catch (IndexOutOfBoundsException e) { + // expected + } + } + + @Test + public void testMapWithNullDefault() { + final EvictingBoundedList<Object> original = new EvictingBoundedList<>(5, null); + final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper()); + + assertEquals(original.size(), transformed.size()); + assertNull(transformed.getDefaultElement()); + } + + // ------------------------------------------------------------------------ + + private static final class TransformedObject { + + final Object original; + + TransformedObject(Object original) { + this.original = checkNotNull(original); + } + } + + // 
------------------------------------------------------------------------ + + private static final class Mapper implements EvictingBoundedList.Function<Object, TransformedObject> { + + @Override + public TransformedObject apply(Object value) { + return new TransformedObject(value); + } + } }
