Repository: flink
Updated Branches:
  refs/heads/release-1.1 a79290c28 -> 871de0bf7


[FLINK-5107] Introduced limit for prior execution attempt history

This closes #2837.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8989a9ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8989a9ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8989a9ff

Branch: refs/heads/release-1.1
Commit: 8989a9ff509c39a520aacf5836c7d3d6ff22ad61
Parents: a79290c
Author: Stefan Richter <[email protected]>
Authored: Fri Nov 18 19:07:56 2016 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Wed Nov 23 00:29:20 2016 +0100

----------------------------------------------------------------------
 ...taskExecutionAttemptAccumulatorsHandler.java |   7 +-
 .../executiongraph/ExecutionJobVertex.java      |  11 +-
 .../runtime/executiongraph/ExecutionVertex.java |  42 +++--
 .../runtime/jobmanager/JobManagerOptions.java   |  38 +++++
 .../flink/runtime/util/EvictingBoundedList.java | 160 ++++++++++++++++++
 .../executiongraph/AllVerticesIteratorTest.java |   9 +-
 .../runtime/util/EvictingBoundedListTest.java   | 164 +++++++++++++++++++
 7 files changed, 416 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8989a9ff/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index 14ccc0c..f661126 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -39,6 +38,12 @@ public class SubtaskExecutionAttemptAccumulatorsHandler 
extends AbstractSubtaskA
 
        @Override
        public String handleRequest(Execution execAttempt, Map<String, String> 
params) throws Exception {
+
+               // return empty string for pruned (== null) execution attempts
+               if (null == execAttempt) {
+                       return "";
+               }
+
                final StringifiedAccumulatorResult[] accs = 
execAttempt.getUserAccumulatorsStringified();
                
                StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/8989a9ff/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 65259ed..7af9868 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.io.StrictlyLocalAssignment;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
@@ -37,6 +38,7 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
@@ -150,9 +152,16 @@ public class ExecutionJobVertex implements Serializable {
                                        result.getResultType());
                }
 
+               Configuration jobConfiguration = graph.getJobConfiguration();
+               int maxPriorAttemptsHistoryLength = jobConfiguration != null ?
+                               
jobConfiguration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE) :
+                               
JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue();
+
                // create all task vertices
                for (int i = 0; i < numTaskVertices; i++) {
-                       ExecutionVertex vertex = new ExecutionVertex(this, i, 
this.producedDataSets, timeout, createTimestamp);
+                       ExecutionVertex vertex = new ExecutionVertex(
+                                       this, i, this.producedDataSets, 
timeout, createTimestamp, maxPriorAttemptsHistoryLength);
+
                        this.taskVertices[i] = vertex;
                }
                

http://git-wip-us.apache.org/repos/asf/flink/blob/8989a9ff/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index b48fa27..6e76d8f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -36,12 +36,14 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.EvictingBoundedList;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 
@@ -56,7 +58,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
 import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
@@ -84,7 +85,7 @@ public class ExecutionVertex implements Serializable {
 
        private final int subTaskIndex;
 
-       private final List<Execution> priorExecutions;
+       private final EvictingBoundedList<Execution> priorExecutions;
 
        private final FiniteDuration timeout;
 
@@ -103,7 +104,13 @@ public class ExecutionVertex implements Serializable {
                        int subTaskIndex,
                        IntermediateResult[] producedDataSets,
                        FiniteDuration timeout) {
-               this(jobVertex, subTaskIndex, producedDataSets, timeout, 
System.currentTimeMillis());
+               this(
+                               jobVertex,
+                               subTaskIndex,
+                               producedDataSets,
+                               timeout,
+                               System.currentTimeMillis(),
+                               
JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue());
        }
 
        public ExecutionVertex(
@@ -111,7 +118,17 @@ public class ExecutionVertex implements Serializable {
                        int subTaskIndex,
                        IntermediateResult[] producedDataSets,
                        FiniteDuration timeout,
-                       long createTimestamp) {
+                       int maxPriorExecutionHistoryLength) {
+               this(jobVertex, subTaskIndex, producedDataSets, timeout, 
System.currentTimeMillis(), maxPriorExecutionHistoryLength);
+       }
+
+       public ExecutionVertex(
+                       ExecutionJobVertex jobVertex,
+                       int subTaskIndex,
+                       IntermediateResult[] producedDataSets,
+                       Time timeout,
+                       long createTimestamp,
+                       int maxPriorExecutionHistoryLength) {
                this.jobVertex = jobVertex;
                this.subTaskIndex = subTaskIndex;
 
@@ -126,7 +143,7 @@ public class ExecutionVertex implements Serializable {
 
                this.inputEdges = new 
ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];
 
-               this.priorExecutions = new CopyOnWriteArrayList<Execution>();
+               this.priorExecutions = new 
EvictingBoundedList<>(maxPriorExecutionHistoryLength);
 
                this.currentExecution = new Execution(
                        getExecutionGraph().getFutureExecutionContext(),
@@ -224,14 +241,19 @@ public class ExecutionVertex implements Serializable {
        }
        
        public Execution getPriorExecutionAttempt(int attemptNumber) {
-               if (attemptNumber >= 0 && attemptNumber < 
priorExecutions.size()) {
-                       return priorExecutions.get(attemptNumber);
-               }
-               else {
-                       throw new IllegalArgumentException("attempt does not 
exist");
+               synchronized (priorExecutions) {
+                       if (attemptNumber >= 0 && attemptNumber < 
priorExecutions.size()) {
+                               return priorExecutions.get(attemptNumber);
+                       } else {
+                               throw new IllegalArgumentException("attempt 
does not exist");
+                       }
                }
        }
        
+       EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() {
+               synchronized (priorExecutions) {
+                       return new EvictingBoundedList<>(priorExecutions);
+               }
        public ExecutionGraph getExecutionGraph() {
                return this.jobVertex.getGraph();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8989a9ff/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
new file mode 100644
index 0000000..279a70e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+@PublicEvolving
+public class JobManagerOptions {
+
+       /**
+        * The maximum number of prior execution attempts kept in history.
+        */
+       public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE =
+                       
key("job-manager.max-attempts-history-size").defaultValue(16);
+
+       private JobManagerOptions() {
+               throw new IllegalAccessError();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8989a9ff/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
new file mode 100644
index 0000000..f4c155a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * This class implements a list (array based) that is physically bounded in maximum size, but can virtually grow beyond
+ * the bounded size. When the list grows beyond the size bound, elements are dropped from the head of the list (FIFO
+ * order). If dropped elements are accessed, a default element is returned instead.
+ * <p>
+ * TODO this class could eventually implement the whole actual List interface.
+ *
+ * @param <T> type of the list elements
+ */
+public class EvictingBoundedList<T> implements Iterable<T>, Serializable {
+
+       private static final long serialVersionUID = -1863961980953613146L;
+
+       private final T defaultElement;
+       private final Object[] elements;
+       private int idx;
+       private int count;
+       private long modCount;
+
+       public EvictingBoundedList(int sizeLimit) {
+               this(sizeLimit, null);
+       }
+
+       public EvictingBoundedList(EvictingBoundedList<T> other) {
+               Preconditions.checkNotNull(other);
+               this.defaultElement = other.defaultElement;
+               this.elements = other.elements.clone();
+               this.idx = other.idx;
+               this.count = other.count;
+               this.modCount = 0L;
+       }
+
+       public EvictingBoundedList(int sizeLimit, T defaultElement) {
+               this.elements = new Object[sizeLimit];
+               this.defaultElement = defaultElement;
+               this.idx = 0;
+               this.count = 0;
+               this.modCount = 0L;
+       }
+
+       public int size() {
+               return count;
+       }
+
+       public boolean isEmpty() {
+               return 0 == count;
+       }
+
+       public boolean add(T t) {
+               elements[idx] = t;
+               idx = (idx + 1) % elements.length;
+               ++count;
+               ++modCount;
+               return true;
+       }
+
+       public void clear() {
+               if (!isEmpty()) {
+                       for (int i = 0; i < elements.length; ++i) {
+                               elements[i] = null;
+                       }
+                       count = 0;
+                       idx = 0;
+                       ++modCount;
+               }
+       }
+
+       public T get(int index) {
+               Preconditions.checkArgument(index >= 0 && index < count);
+               return isDroppedIndex(index) ? getDefaultElement() : 
accessInternal(index % elements.length);
+       }
+
+       public int getSizeLimit() {
+               return elements.length;
+       }
+
+       public T set(int index, T element) {
+               Preconditions.checkArgument(index >= 0 && index < count);
+               ++modCount;
+               if (isDroppedIndex(index)) {
+                       return getDefaultElement();
+               } else {
+                       int idx = index % elements.length;
+                       T old = accessInternal(idx);
+                       elements[idx] = element;
+                       return old;
+               }
+       }
+
+       public T getDefaultElement() {
+               return defaultElement;
+       }
+
+       private boolean isDroppedIndex(int idx) {
+               return idx < count - elements.length;
+       }
+
+       @SuppressWarnings("unchecked")
+       private T accessInternal(int arrayIndex) {
+               return (T) elements[arrayIndex];
+       }
+
+       @Override
+       public Iterator<T> iterator() {
+               return new Iterator<T>() {
+
+                       int pos = 0;
+                       final long oldModCount = modCount;
+
+                       @Override
+                       public boolean hasNext() {
+                               return pos < count;
+                       }
+
+                       @Override
+                       public T next() {
+                               if (oldModCount != modCount) {
+                                       throw new 
ConcurrentModificationException();
+                               }
+                               if (pos < count) {
+                                       return get(pos++);
+                               } else {
+                                       throw new 
NoSuchElementException("Iterator exhausted.");
+                               }
+                       }
+
+                       @Override
+                       public void remove() {
+                               throw new 
UnsupportedOperationException("Read-only iterator");
+                       }
+               };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8989a9ff/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
index a9a9d4b..9ace87d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import java.util.Arrays;
-
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -28,6 +27,8 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.Arrays;
+
 public class AllVerticesIteratorTest {
 
        @Test
@@ -50,8 +51,10 @@ public class AllVerticesIteratorTest {
                        v4.setParallelism(2);
                        
                        ExecutionGraph eg = Mockito.mock(ExecutionGraph.class);
+                       Configuration jobConf = new Configuration();
                        
Mockito.when(eg.getFutureExecutionContext()).thenReturn(TestingUtils.directExecutionContext());
-                                       
+                       
Mockito.when(eg.getJobConfiguration()).thenReturn(jobConf);
+
                        ExecutionJobVertex ejv1 = new ExecutionJobVertex(eg, 
v1, 1,
                                        AkkaUtils.getDefaultTimeout());
                        ExecutionJobVertex ejv2 = new ExecutionJobVertex(eg, 
v2, 1,

http://git-wip-us.apache.org/repos/asf/flink/blob/8989a9ff/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
new file mode 100644
index 0000000..e0a1c70
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class EvictingBoundedListTest {
+
+       @Test
+       public void testAddGet() {
+               int insertSize = 17;
+               int boundSize = 5;
+               Integer defaultElement = 4711;
+
+               EvictingBoundedList<Integer> list = new 
EvictingBoundedList<>(boundSize, defaultElement);
+               assertTrue(list.isEmpty());
+
+               for (int i = 0; i < insertSize; ++i) {
+                       list.add(i);
+               }
+
+               assertEquals(17, list.size());
+
+               for (int i = 0; i < insertSize; ++i) {
+                       int exp = i < (insertSize - boundSize) ? defaultElement 
: i;
+                       int act = list.get(i);
+                       assertEquals(exp, act);
+               }
+       }
+
+       @Test
+       public void testSet() {
+               int insertSize = 17;
+               int boundSize = 5;
+               Integer defaultElement = 4711;
+               List<Integer> reference = new ArrayList<>(insertSize);
+               EvictingBoundedList<Integer> list = new 
EvictingBoundedList<>(boundSize, defaultElement);
+               for (int i = 0; i < insertSize; ++i) {
+                       reference.add(i);
+                       list.add(i);
+               }
+
+               assertEquals(reference.size(), list.size());
+
+               list.set(0, 123);
+               list.set(insertSize - boundSize - 1, 123);
+
+               list.set(insertSize - boundSize, 42);
+               reference.set(insertSize - boundSize, 42);
+               list.set(13, 43);
+               reference.set(13, 43);
+               list.set(16, 44);
+               reference.set(16, 44);
+
+               try {
+                       list.set(insertSize, 23);
+                       fail("Illegal index in set not detected.");
+               } catch (IllegalArgumentException ignored) {
+
+               }
+
+               for (int i = 0; i < insertSize; ++i) {
+                       int exp = i < (insertSize - boundSize) ? defaultElement 
: reference.get(i);
+                       int act = list.get(i);
+                       assertEquals(exp, act);
+               }
+
+               assertEquals(reference.size(), list.size());
+       }
+
+       @Test
+       public void testClear() {
+               int insertSize = 17;
+               int boundSize = 5;
+               Integer defaultElement = 4711;
+
+               EvictingBoundedList<Integer> list = new 
EvictingBoundedList<>(boundSize, defaultElement);
+               for (int i = 0; i < insertSize; ++i) {
+                       list.add(i);
+               }
+
+               list.clear();
+
+               assertEquals(0, list.size());
+               assertTrue(list.isEmpty());
+
+               try {
+                       list.get(0);
+                       fail();
+               } catch (IllegalArgumentException ignore) {
+               }
+       }
+
+       @Test
+       public void testIterator() {
+               int insertSize = 17;
+               int boundSize = 5;
+               Integer defaultElement = 4711;
+
+               EvictingBoundedList<Integer> list = new 
EvictingBoundedList<>(boundSize, defaultElement);
+               assertTrue(list.isEmpty());
+
+               for (int i = 0; i < insertSize; ++i) {
+                       list.add(i);
+               }
+
+               Iterator<Integer> iterator = list.iterator();
+
+               for (int i = 0; i < insertSize; ++i) {
+                       assertTrue(iterator.hasNext());
+                       int exp = i < (insertSize - boundSize) ? defaultElement 
: i;
+                       int act = iterator.next();
+                       assertEquals(exp, act);
+               }
+
+               assertFalse(iterator.hasNext());
+
+               try {
+                       iterator.next();
+                       fail("Next on exhausted iterator did not trigger 
exception.");
+               } catch (NoSuchElementException ignored) {
+
+               }
+
+               iterator = list.iterator();
+               assertTrue(iterator.hasNext());
+               iterator.next();
+               list.add(123);
+               assertTrue(iterator.hasNext());
+               try {
+                       iterator.next();
+                       fail("Concurrent modification not detected.");
+               } catch (ConcurrentModificationException ignored) {
+
+               }
+       }
+}

Reply via email to