Repository: flink
Updated Branches:
  refs/heads/master 486f7249c -> d5f2647af


[FLINK-5883] [core] Re-adding the Exception-thrown code for ListKeyGroupedIterator when the iterator is requested the second time

This closes #3392


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5f2647a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5f2647a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5f2647a

Branch: refs/heads/master
Commit: d5f2647afaf69a3d9a4f2b525b1e70e3d728d6a5
Parents: 3f700ca
Author: lincoln-lil <[email protected]>
Authored: Wed Feb 22 21:57:38 2017 +0800
Committer: Stephan Ewen <[email protected]>
Committed: Mon Mar 20 13:20:03 2017 +0100

----------------------------------------------------------------------
 .../common/operators/util/ListKeyGroupedIterator.java | 14 ++++++++++++--
 .../CoGroupConnectedComponentsSecondITCase.java | 6 ++++--
 2 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d5f2647a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
index ece41a5..df2f42e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.util;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.TraversableOnceException;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -78,6 +79,7 @@ public final class ListKeyGroupedIterator<E> {
                 this.comparator.setReference(this.lookahead);
                 this.valuesIterator.next = this.lookahead;
                 this.lookahead = null;
+                this.valuesIterator.iteratorAvailable = true;
                 return true;
             }
 
@@ -96,6 +98,7 @@ public final class ListKeyGroupedIterator<E> {
                         // the keys do not match, so we have a new group. store the current key
                         this.comparator.setReference(next);
                         this.valuesIterator.next = next;
+                        this.valuesIterator.iteratorAvailable = true;
                         return true;
                     }
                 }
@@ -160,7 +163,9 @@ public final class ListKeyGroupedIterator<E> {
     public final class ValuesIterator implements Iterator<E>, Iterable<E> {
 
         private E next;
-        
+
+        private boolean iteratorAvailable = true;
+
         private final TypeSerializer<E> serializer;
 
         private ValuesIterator(E first, TypeSerializer<E> serializer) {
@@ -191,7 +196,12 @@ public final class ListKeyGroupedIterator<E> {
 
         @Override
         public Iterator<E> iterator() {
-            return this;
+            if (iteratorAvailable) {
+                iteratorAvailable = false;
+                return this;
+            } else {
+                throw new TraversableOnceException();
+            }
         }
 
         public E getCurrent() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d5f2647a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
index cd6345b..7f5d194 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.test.iterative;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
@@ -115,11 +116,12 @@ public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase
 
         @Override
         public void coGroup(Iterable<Tuple2<Long, Long>> candidates, Iterable<Tuple2<Long, Long>> current, Collector<Tuple2<Long, Long>> out) {
-            if (!current.iterator().hasNext()) {
+            Iterator<Tuple2<Long, Long>> iterator = current.iterator();
+            if (!iterator.hasNext()) {
                 throw new RuntimeException("Error: Id not encountered before.");
             }
 
-            Tuple2<Long, Long> old = current.iterator().next();
+            Tuple2<Long, Long> old = iterator.next();
 
             long minimumComponentID = Long.MAX_VALUE;
 
