[hotfix] [jobmanager] Reduce complexity when archiving ExecutionVertex

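This fixes an inefficiency where archiving an ExecutionVertex iterated over
the entire history of prior execution attempts, including entries that had
already been evicted, when converting them to archived executions. Only the
retained (non-evicted) entries are now transformed.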


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4820b413
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4820b413
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4820b413

Branch: refs/heads/master
Commit: 4820b413a55a3bbb1853251ecdb94b4c70dc5e2b
Parents: 10e4e32
Author: Stephan Ewen <[email protected]>
Authored: Tue Jan 31 19:52:57 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Fri Feb 3 10:28:23 2017 +0100

----------------------------------------------------------------------
 .../executiongraph/ArchivedExecutionVertex.java | 22 ++++-
 .../flink/runtime/util/EvictingBoundedList.java | 73 ++++++++++++++-
 .../runtime/util/EvictingBoundedListTest.java   | 97 +++++++++++++++++++-
 3 files changed, 182 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4820b413/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index 56fc7a6..5053cae 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
 public class ArchivedExecutionVertex implements AccessExecutionVertex, 
Serializable {
 
        private static final long serialVersionUID = -6708241535015028576L;
+
        private final int subTaskIndex;
 
        private final EvictingBoundedList<ArchivedExecution> priorExecutions;
@@ -35,13 +36,11 @@ public class ArchivedExecutionVertex implements 
AccessExecutionVertex, Serializa
 
        private final ArchivedExecution currentExecution;    // this field must 
never be null
 
+       // 
------------------------------------------------------------------------
+
        public ArchivedExecutionVertex(ExecutionVertex vertex) {
                this.subTaskIndex = vertex.getParallelSubtaskIndex();
-               EvictingBoundedList<Execution> copyOfPriorExecutionsList = 
vertex.getCopyOfPriorExecutionsList();
-               priorExecutions = new 
EvictingBoundedList<>(copyOfPriorExecutionsList.getSizeLimit());
-               for (Execution priorExecution : copyOfPriorExecutionsList) {
-                       priorExecutions.add(priorExecution != null ? 
priorExecution.archive() : null);
-               }
+               this.priorExecutions = 
vertex.getCopyOfPriorExecutionsList().map(ARCHIVER);
                this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
                this.currentExecution = 
vertex.getCurrentExecutionAttempt().archive();
        }
@@ -93,4 +92,17 @@ public class ArchivedExecutionVertex implements 
AccessExecutionVertex, Serializa
                        throw new IllegalArgumentException("attempt does not 
exist");
                }
        }
+
+       // 
------------------------------------------------------------------------
+       //  utilities
+       // 
------------------------------------------------------------------------
+
+       private static final EvictingBoundedList.Function<Execution, 
ArchivedExecution> ARCHIVER =
+                       new EvictingBoundedList.Function<Execution, 
ArchivedExecution>() {
+
+               @Override
+               public ArchivedExecution apply(Execution value) {
+                       return value.archive();
+               }
+       };
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4820b413/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
index f4c155a..2c5b6a9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
@@ -29,8 +29,9 @@ import java.util.NoSuchElementException;
  * This class implements a list (array based) that is physically bounded in 
maximum size, but can virtually grow beyond
  * the bounded size. When the list grows beyond the size bound, elements are 
dropped from the head of the list (FIFO
  * order). If dropped elements are accessed, a default element is returned 
instead.
- * <p>
- * TODO this class could eventually implement the whole actual List interface.
+ * 
+ * <p>The list by itself is serializable, but a full list can only be 
serialized if the values
+ * are also serializable.
  *
  * @param <T> type of the list elements
  */
@@ -38,12 +39,25 @@ public class EvictingBoundedList<T> implements Iterable<T>, 
Serializable {
 
        private static final long serialVersionUID = -1863961980953613146L;
 
+       @SuppressWarnings("NonSerializableFieldInSerializableClass")
+       /** the default element returned for positions that were evicted */
        private final T defaultElement;
+
+       @SuppressWarnings("NonSerializableFieldInSerializableClass")
+       /** the array (viewed as a circular buffer) that holds the latest (= 
non-evicted) elements */
        private final Object[] elements;
+
+       /** The next index to put an element in the array */
        private int idx;
+
+       /** The current number of (virtual) elements in the list */
        private int count;
+
+       /** Modification count for fail-fast iterators */
        private long modCount;
 
+       // 
------------------------------------------------------------------------
+
        public EvictingBoundedList(int sizeLimit) {
                this(sizeLimit, null);
        }
@@ -65,6 +79,8 @@ public class EvictingBoundedList<T> implements Iterable<T>, 
Serializable {
                this.modCount = 0L;
        }
 
+       // 
------------------------------------------------------------------------
+
        public int size() {
                return count;
        }
@@ -93,8 +109,11 @@ public class EvictingBoundedList<T> implements Iterable<T>, 
Serializable {
        }
 
        public T get(int index) {
-               Preconditions.checkArgument(index >= 0 && index < count);
-               return isDroppedIndex(index) ? getDefaultElement() : 
accessInternal(index % elements.length);
+               if (index >= 0 && index < count) {
+                       return isDroppedIndex(index) ? getDefaultElement() : 
accessInternal(index % elements.length);
+               } else {
+                       throw new 
IndexOutOfBoundsException(String.valueOf(index));
+               }
        }
 
        public int getSizeLimit() {
@@ -157,4 +176,50 @@ public class EvictingBoundedList<T> implements 
Iterable<T>, Serializable {
                        }
                };
        }
+
+       /**
+        * Creates a new list that replaces its elements with transformed 
elements.
+        * The list retains the same size and position-to-element mapping.
+        * 
+        * <p>Note that null values are automatically mapped to null values.
+        * 
+        * @param transform The function used to transform each element
+        * @param <R> The type of the elements in the result list.
+        * 
+        * @return The list with the mapped elements
+        */
+       public <R> EvictingBoundedList<R> map(Function<T, R> transform) {
+               // map the default element
+               final R newDefault = defaultElement == null ? null : 
transform.apply(defaultElement);
+
+               // copy the list with the new default
+               final EvictingBoundedList<R> result = new 
EvictingBoundedList<>(elements.length, newDefault);
+               result.count = count;
+               result.idx = idx;
+
+               // map all the entries in the list
+               final int numElements = Math.min(elements.length, count);
+               for (int i = 0; i < numElements; i++) {
+                       result.elements[i] = transform.apply(accessInternal(i));
+               }
+
+               return result;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A simple unary function that can be used to transform elements via 
the
+        * {@link EvictingBoundedList#map(Function)} method.
+        */
+       public interface Function<I, O> {
+
+               /**
+                * Transforms the value.
+                * 
+                * @param value The value to transform
+                * @return The transformed value
+                */
+               O apply(I value);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4820b413/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
index e0a1c70..7109dac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
@@ -28,9 +28,12 @@ import java.util.NoSuchElementException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 public class EvictingBoundedListTest {
 
        @Test
@@ -114,7 +117,7 @@ public class EvictingBoundedListTest {
                try {
                        list.get(0);
                        fail();
-               } catch (IllegalArgumentException ignore) {
+               } catch (IndexOutOfBoundsException ignore) {
                }
        }
 
@@ -161,4 +164,96 @@ public class EvictingBoundedListTest {
 
                }
        }
+
+       @Test
+       public void testMapWithHalfFullList() {
+               final Object[] originals = { new Object(), new Object(), new 
Object() };
+               final Object defaultValue = new Object();
+
+               final EvictingBoundedList<Object> original = new 
EvictingBoundedList<>(5, defaultValue);
+               for (Object o : originals) {
+                       original.add(o);
+               }
+
+               final EvictingBoundedList<TransformedObject> transformed = 
original.map(new Mapper());
+
+               assertEquals(original.size(), transformed.size());
+               assertEquals(original.getSizeLimit(), 
transformed.getSizeLimit());
+               assertEquals(defaultValue, 
transformed.getDefaultElement().original);
+
+               int i = 0;
+               for (TransformedObject to : transformed) {
+                       assertEquals(originals[i++], to.original); 
+               }
+
+               try {
+                       transformed.get(originals.length);
+                       fail("should have failed with an exception");
+               } catch (IndexOutOfBoundsException e) {
+                       // expected
+               }
+       }
+
+       @Test
+       public void testMapWithEvictedElements() {
+               final Object[] originals = { new Object(), new Object(), new 
Object(), new Object(), new Object() };
+               final Object defaultValue = new Object();
+
+               final EvictingBoundedList<Object> original = new 
EvictingBoundedList<>(2, defaultValue);
+               for (Object o : originals) {
+                       original.add(o);
+               }
+
+               final EvictingBoundedList<TransformedObject> transformed = 
original.map(new Mapper());
+
+               assertEquals(originals.length, transformed.size());
+               assertEquals(original.size(), transformed.size());
+               assertEquals(original.getSizeLimit(), 
transformed.getSizeLimit());
+               assertEquals(defaultValue, 
transformed.getDefaultElement().original);
+
+               for (int i = 0; i < originals.length; i++) {
+                       if (i < originals.length - transformed.getSizeLimit()) {
+                               assertEquals(transformed.getDefaultElement(), 
transformed.get(i));
+                       } else {
+                               assertEquals(originals[i], 
transformed.get(i).original);
+                       }
+               }
+
+               try {
+                       transformed.get(originals.length);
+                       fail("should have failed with an exception");
+               } catch (IndexOutOfBoundsException e) {
+                       // expected
+               }
+       }
+
+       @Test
+       public void testMapWithNullDefault() {
+               final EvictingBoundedList<Object> original = new 
EvictingBoundedList<>(5, null);
+               final EvictingBoundedList<TransformedObject> transformed = 
original.map(new Mapper());
+
+               assertEquals(original.size(), transformed.size());
+               assertNull(transformed.getDefaultElement());
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static final class TransformedObject {
+
+               final Object original;
+
+               TransformedObject(Object original) {
+                       this.original = checkNotNull(original);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static final class Mapper implements 
EvictingBoundedList.Function<Object, TransformedObject> {
+
+               @Override
+               public TransformedObject apply(Object value) {
+                       return new TransformedObject(value);
+               }
+       }
 }

Reply via email to