Repository: flink
Updated Branches:
  refs/heads/master 486f7249c -> d5f2647af


[FLINK-5883] [core] Re-adding the Exception-thrown code for 
ListKeyGroupedIterator when the iterator is requested the second time

This closes #3392


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5f2647a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5f2647a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5f2647a

Branch: refs/heads/master
Commit: d5f2647afaf69a3d9a4f2b525b1e70e3d728d6a5
Parents: 3f700ca
Author: lincoln-lil <[email protected]>
Authored: Wed Feb 22 21:57:38 2017 +0800
Committer: Stephan Ewen <[email protected]>
Committed: Mon Mar 20 13:20:03 2017 +0100

----------------------------------------------------------------------
 .../common/operators/util/ListKeyGroupedIterator.java | 14 ++++++++++++--
 .../CoGroupConnectedComponentsSecondITCase.java       |  6 ++++--
 2 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d5f2647a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
index ece41a5..df2f42e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.util;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.TraversableOnceException;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -78,6 +79,7 @@ public final class ListKeyGroupedIterator<E> {
                        this.comparator.setReference(this.lookahead);
                        this.valuesIterator.next = this.lookahead;
                        this.lookahead = null;
+                       this.valuesIterator.iteratorAvailable = true;
                        return true;
                }
 
@@ -96,6 +98,7 @@ public final class ListKeyGroupedIterator<E> {
                                                // the keys do not match, so we 
have a new group. store the current key
                                                
this.comparator.setReference(next);
                                                this.valuesIterator.next = next;
+                                               
this.valuesIterator.iteratorAvailable = true;
                                                return true;
                                        }
                                }
@@ -160,7 +163,9 @@ public final class ListKeyGroupedIterator<E> {
        public final class ValuesIterator implements Iterator<E>, Iterable<E> {
 
                private E next;
-               
+
+               private boolean iteratorAvailable = true;
+
                private final TypeSerializer<E> serializer;
 
                private ValuesIterator(E first, TypeSerializer<E> serializer) {
@@ -191,7 +196,12 @@ public final class ListKeyGroupedIterator<E> {
 
                @Override
                public Iterator<E> iterator() {
-                       return this;
+                       if (iteratorAvailable) {
+                               iteratorAvailable = false;
+                               return this;
+                       } else {
+                               throw new TraversableOnceException();
+                       }
                }
 
                public E getCurrent() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d5f2647a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
index cd6345b..7f5d194 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.test.iterative;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
@@ -115,11 +116,12 @@ public class CoGroupConnectedComponentsSecondITCase 
extends JavaProgramTestBase
                
                @Override
                public void coGroup(Iterable<Tuple2<Long, Long>> candidates, 
Iterable<Tuple2<Long, Long>> current, Collector<Tuple2<Long, Long>> out) {
-                       if (!current.iterator().hasNext()) {
+                       Iterator<Tuple2<Long, Long>> iterator = 
current.iterator();
+                       if (!iterator.hasNext()) {
                                throw new RuntimeException("Error: Id not 
encountered before.");
                        }
                        
-                       Tuple2<Long, Long> old = current.iterator().next();
+                       Tuple2<Long, Long> old = iterator.next();
                        
                        long minimumComponentID = Long.MAX_VALUE;
 

Reply via email to