Repository: flink Updated Branches: refs/heads/release-1.1 a79290c28 -> 871de0bf7
[FLINK-5107] Introduced limit for prior execution attempt history This closes #2837. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8989a9ff Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8989a9ff Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8989a9ff Branch: refs/heads/release-1.1 Commit: 8989a9ff509c39a520aacf5836c7d3d6ff22ad61 Parents: a79290c Author: Stefan Richter <[email protected]> Authored: Fri Nov 18 19:07:56 2016 +0100 Committer: Till Rohrmann <[email protected]> Committed: Wed Nov 23 00:29:20 2016 +0100 ---------------------------------------------------------------------- ...taskExecutionAttemptAccumulatorsHandler.java | 7 +- .../executiongraph/ExecutionJobVertex.java | 11 +- .../runtime/executiongraph/ExecutionVertex.java | 42 +++-- .../runtime/jobmanager/JobManagerOptions.java | 38 +++++ .../flink/runtime/util/EvictingBoundedList.java | 160 ++++++++++++++++++ .../executiongraph/AllVerticesIteratorTest.java | 9 +- .../runtime/util/EvictingBoundedListTest.java | 164 +++++++++++++++++++ 7 files changed, 416 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8989a9ff/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java index 14ccc0c..f661126 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java +++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; - import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; @@ -39,6 +38,12 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA @Override public String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception { + + // return empty string for pruned (== null) execution attempts + if (null == execAttempt) { + return ""; + } + final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified(); StringWriter writer = new StringWriter(); http://git-wip-us.apache.org/repos/asf/flink/blob/8989a9ff/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 65259ed..7af9868 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.io.StrictlyLocalAssignment; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; import 
org.apache.flink.core.io.InputSplitSource; @@ -37,6 +38,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobEdge; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.JobManagerOptions; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; @@ -150,9 +152,16 @@ public class ExecutionJobVertex implements Serializable { result.getResultType()); } + Configuration jobConfiguration = graph.getJobConfiguration(); + int maxPriorAttemptsHistoryLength = jobConfiguration != null ? + jobConfiguration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE) : + JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue(); + // create all task vertices for (int i = 0; i < numTaskVertices; i++) { - ExecutionVertex vertex = new ExecutionVertex(this, i, this.producedDataSets, timeout, createTimestamp); + ExecutionVertex vertex = new ExecutionVertex( + this, i, this.producedDataSets, timeout, createTimestamp, maxPriorAttemptsHistoryLength); + this.taskVertices[i] = vertex; } http://git-wip-us.apache.org/repos/asf/flink/blob/8989a9ff/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index b48fa27..6e76d8f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -36,12 +36,14 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobEdge; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.JobManagerOptions; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.EvictingBoundedList; import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; @@ -56,7 +58,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; import static org.apache.flink.runtime.execution.ExecutionState.CANCELED; import static org.apache.flink.runtime.execution.ExecutionState.FAILED; @@ -84,7 +85,7 @@ public class ExecutionVertex implements Serializable { private final int subTaskIndex; - private final List<Execution> priorExecutions; + private final EvictingBoundedList<Execution> priorExecutions; private final FiniteDuration timeout; @@ -103,7 +104,13 @@ public class ExecutionVertex implements Serializable { int subTaskIndex, IntermediateResult[] producedDataSets, FiniteDuration timeout) { - this(jobVertex, subTaskIndex, producedDataSets, timeout, System.currentTimeMillis()); + this( + jobVertex, + subTaskIndex, + producedDataSets, + timeout, + System.currentTimeMillis(), + JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue()); } public ExecutionVertex( @@ -111,7 +118,17 @@ public class ExecutionVertex implements Serializable { int subTaskIndex, IntermediateResult[] producedDataSets, FiniteDuration timeout, - long createTimestamp) { + int maxPriorExecutionHistoryLength) { + this(jobVertex, 
subTaskIndex, producedDataSets, timeout, System.currentTimeMillis(), maxPriorExecutionHistoryLength); + } + + public ExecutionVertex( + ExecutionJobVertex jobVertex, + int subTaskIndex, + IntermediateResult[] producedDataSets, + FiniteDuration timeout, + long createTimestamp, + int maxPriorExecutionHistoryLength) { this.jobVertex = jobVertex; this.subTaskIndex = subTaskIndex; @@ -126,7 +143,7 @@ public class ExecutionVertex implements Serializable { this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][]; - this.priorExecutions = new CopyOnWriteArrayList<Execution>(); + this.priorExecutions = new EvictingBoundedList<>(maxPriorExecutionHistoryLength); this.currentExecution = new Execution( getExecutionGraph().getFutureExecutionContext(), @@ -224,14 +241,21 @@ public class ExecutionVertex implements Serializable { } public Execution getPriorExecutionAttempt(int attemptNumber) { - if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) { - return priorExecutions.get(attemptNumber); - } - else { - throw new IllegalArgumentException("attempt does not exist"); + synchronized (priorExecutions) { + if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) { + return priorExecutions.get(attemptNumber); + } else { + throw new IllegalArgumentException("attempt does not exist"); + } } } + /** Returns a copy of the prior execution attempts, taken while synchronized on the list. */ + EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() { + synchronized (priorExecutions) { + return new EvictingBoundedList<>(priorExecutions); + } + } public ExecutionGraph getExecutionGraph() { return this.jobVertex.getGraph(); } http://git-wip-us.apache.org/repos/asf/flink/blob/8989a9ff/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java new file mode 100644
index 0000000..279a70e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; + +import static org.apache.flink.configuration.ConfigOptions.key; + +@PublicEvolving +public class JobManagerOptions { + + /** + * The maximum number of prior execution attempts kept in history. 
+ */ + public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE = + key("job-manager.max-attempts-history-size").defaultValue(16); + + private JobManagerOptions() { + throw new IllegalAccessError(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8989a9ff/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java new file mode 100644 index 0000000..f4c155a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * This class implements a list (array based) that is physically bounded in maximum size, but can virtually grow beyond + * the bounded size. 
When the list grows beyond the size bound, elements are dropped from the head of the list (FIFO + * order). If dropped elements are accessed, a default element is returned instead. + * <p> + * TODO this class could eventually implement the whole actual List interface. + * + * @param <T> type of the list elements + */ +public class EvictingBoundedList<T> implements Iterable<T>, Serializable { + + private static final long serialVersionUID = -1863961980953613146L; + + private final T defaultElement; + private final Object[] elements; + private int idx; + private int count; + private long modCount; + + public EvictingBoundedList(int sizeLimit) { + this(sizeLimit, null); + } + + public EvictingBoundedList(EvictingBoundedList<T> other) { + Preconditions.checkNotNull(other); + this.defaultElement = other.defaultElement; + this.elements = other.elements.clone(); + this.idx = other.idx; + this.count = other.count; + this.modCount = 0L; + } + + public EvictingBoundedList(int sizeLimit, T defaultElement) { + this.elements = new Object[sizeLimit]; + this.defaultElement = defaultElement; + this.idx = 0; + this.count = 0; + this.modCount = 0L; + } + + public int size() { + return count; + } + + public boolean isEmpty() { + return 0 == count; + } + + public boolean add(T t) { + elements[idx] = t; + idx = (idx + 1) % elements.length; + ++count; + ++modCount; + return true; + } + + public void clear() { + if (!isEmpty()) { + for (int i = 0; i < elements.length; ++i) { + elements[i] = null; + } + count = 0; + idx = 0; + ++modCount; + } + } + + public T get(int index) { + Preconditions.checkArgument(index >= 0 && index < count); + return isDroppedIndex(index) ? 
getDefaultElement() : accessInternal(index % elements.length); + } + + public int getSizeLimit() { + return elements.length; + } + + public T set(int index, T element) { + Preconditions.checkArgument(index >= 0 && index < count); + ++modCount; + if (isDroppedIndex(index)) { + return getDefaultElement(); + } else { + int idx = index % elements.length; + T old = accessInternal(idx); + elements[idx] = element; + return old; + } + } + + public T getDefaultElement() { + return defaultElement; + } + + private boolean isDroppedIndex(int idx) { + return idx < count - elements.length; + } + + @SuppressWarnings("unchecked") + private T accessInternal(int arrayIndex) { + return (T) elements[arrayIndex]; + } + + @Override + public Iterator<T> iterator() { + return new Iterator<T>() { + + int pos = 0; + final long oldModCount = modCount; + + @Override + public boolean hasNext() { + return pos < count; + } + + @Override + public T next() { + if (oldModCount != modCount) { + throw new ConcurrentModificationException(); + } + if (pos < count) { + return get(pos++); + } else { + throw new NoSuchElementException("Iterator exhausted."); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Read-only iterator"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8989a9ff/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java index a9a9d4b..9ace87d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java @@ -18,8 +18,7 @@ package 
org.apache.flink.runtime.executiongraph; -import java.util.Arrays; - +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; @@ -28,6 +27,8 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import java.util.Arrays; + public class AllVerticesIteratorTest { @Test @@ -50,8 +51,10 @@ public class AllVerticesIteratorTest { v4.setParallelism(2); ExecutionGraph eg = Mockito.mock(ExecutionGraph.class); + Configuration jobConf = new Configuration(); Mockito.when(eg.getFutureExecutionContext()).thenReturn(TestingUtils.directExecutionContext()); - + Mockito.when(eg.getJobConfiguration()).thenReturn(jobConf); + ExecutionJobVertex ejv1 = new ExecutionJobVertex(eg, v1, 1, AkkaUtils.getDefaultTimeout()); ExecutionJobVertex ejv2 = new ExecutionJobVertex(eg, v2, 1, http://git-wip-us.apache.org/repos/asf/flink/blob/8989a9ff/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java new file mode 100644 index 0000000..e0a1c70 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class EvictingBoundedListTest { + + @Test + public void testAddGet() { + int insertSize = 17; + int boundSize = 5; + Integer defaultElement = 4711; + + EvictingBoundedList<Integer> list = new EvictingBoundedList<>(boundSize, defaultElement); + assertTrue(list.isEmpty()); + + for (int i = 0; i < insertSize; ++i) { + list.add(i); + } + + assertEquals(17, list.size()); + + for (int i = 0; i < insertSize; ++i) { + int exp = i < (insertSize - boundSize) ? 
defaultElement : i; + int act = list.get(i); + assertEquals(exp, act); + } + } + + @Test + public void testSet() { + int insertSize = 17; + int boundSize = 5; + Integer defaultElement = 4711; + List<Integer> reference = new ArrayList<>(insertSize); + EvictingBoundedList<Integer> list = new EvictingBoundedList<>(boundSize, defaultElement); + for (int i = 0; i < insertSize; ++i) { + reference.add(i); + list.add(i); + } + + assertEquals(reference.size(), list.size()); + + list.set(0, 123); + list.set(insertSize - boundSize - 1, 123); + + list.set(insertSize - boundSize, 42); + reference.set(insertSize - boundSize, 42); + list.set(13, 43); + reference.set(13, 43); + list.set(16, 44); + reference.set(16, 44); + + try { + list.set(insertSize, 23); + fail("Illegal index in set not detected."); + } catch (IllegalArgumentException ignored) { + + } + + for (int i = 0; i < insertSize; ++i) { + int exp = i < (insertSize - boundSize) ? defaultElement : reference.get(i); + int act = list.get(i); + assertEquals(exp, act); + } + + assertEquals(reference.size(), list.size()); + } + + @Test + public void testClear() { + int insertSize = 17; + int boundSize = 5; + Integer defaultElement = 4711; + + EvictingBoundedList<Integer> list = new EvictingBoundedList<>(boundSize, defaultElement); + for (int i = 0; i < insertSize; ++i) { + list.add(i); + } + + list.clear(); + + assertEquals(0, list.size()); + assertTrue(list.isEmpty()); + + try { + list.get(0); + fail(); + } catch (IllegalArgumentException ignore) { + } + } + + @Test + public void testIterator() { + int insertSize = 17; + int boundSize = 5; + Integer defaultElement = 4711; + + EvictingBoundedList<Integer> list = new EvictingBoundedList<>(boundSize, defaultElement); + assertTrue(list.isEmpty()); + + for (int i = 0; i < insertSize; ++i) { + list.add(i); + } + + Iterator<Integer> iterator = list.iterator(); + + for (int i = 0; i < insertSize; ++i) { + assertTrue(iterator.hasNext()); + int exp = i < (insertSize - boundSize) ? 
defaultElement : i; + int act = iterator.next(); + assertEquals(exp, act); + } + + assertFalse(iterator.hasNext()); + + try { + iterator.next(); + fail("Next on exhausted iterator did not trigger exception."); + } catch (NoSuchElementException ignored) { + + } + + iterator = list.iterator(); + assertTrue(iterator.hasNext()); + iterator.next(); + list.add(123); + assertTrue(iterator.hasNext()); + try { + iterator.next(); + fail("Concurrent modification not detected."); + } catch (ConcurrentModificationException ignored) { + + } + } +}
