je-ik commented on code in PR #30185:
URL: https://github.com/apache/beam/pull/30185#discussion_r1477891610


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupNonMergingWindowsFunctions.java:
##########
@@ -159,91 +160,100 @@ private static <K, V> Partitioner getPartitioner(
    * @param <K> type of key iterator emits
    * @param <V> type of value iterator emits
    */
-  static class GroupByKeyIterator<K, V, W extends BoundedWindow>
+  abstract static class GroupByKeyIterator<K, V>
       implements Iterator<WindowedValue<KV<K, Iterable<V>>>> {
 
     private final PeekingIterator<Tuple2<ByteArray, byte[]>> inner;
-    private final Coder<K> keyCoder;
-    private final WindowingStrategy<?, W> windowingStrategy;
-    private final FullWindowedValueCoder<KV<K, V>> windowedValueCoder;
+    final Coder<K> keyCoder;
+    private ByteArray previousKey = null;
 
-    private boolean hasNext = true;
-    private ByteArray currentKey = null;
-
-    GroupByKeyIterator(
-        Iterator<Tuple2<ByteArray, byte[]>> inner,
-        Coder<K> keyCoder,
-        WindowingStrategy<?, W> windowingStrategy,
-        WindowedValue.FullWindowedValueCoder<KV<K, V>> windowedValueCoder)
-        throws Coder.NonDeterministicException {
+    GroupByKeyIterator(Iterator<Tuple2<ByteArray, byte[]>> inner, Coder<K> 
keyCoder) {
 
       this.inner = Iterators.peekingIterator(inner);
       this.keyCoder = keyCoder;
-      this.windowingStrategy = windowingStrategy;
-      this.windowedValueCoder = windowedValueCoder;
     }
 
     @Override
     public boolean hasNext() {
-      return hasNext;
+      return inner.hasNext();
     }
 
     @Override
     public WindowedValue<KV<K, Iterable<V>>> next() {
-      while (inner.hasNext()) {
-        final ByteArray nextKey = inner.peek()._1;
-        if (nextKey.equals(currentKey)) {
-          // we still did not see all values for a given key
+      while (hasNext()) {
+
+        final ByteArray currentKey = inner.peek()._1;
+
+        if (currentKey.equals(previousKey)) {
+          // inner iterator did not consume all values for a given key, we 
need to skip ahead until
+          // we find value for the next key
           inner.next();
           continue;
         }
-        currentKey = nextKey;
+        previousKey = currentKey;
+
         final WindowedValue<KV<K, V>> decodedItem = decodeItem(inner.peek());
         return decodedItem.withValue(
-            KV.of(decodedItem.getValue().getKey(), new ValueIterator(inner, 
currentKey)));
+            KV.of(
+                decodedItem.getValue().getKey(),
+                new Iterable<V>() {
+                  boolean consumed = false;
+
+                  @Override
+                  public Iterator<V> iterator() {
+                    if (consumed) {
+                      throw new IllegalStateException(
+                          "ValueIterator can't be iterated more than once 
otherwise there could be data lost");
+                    }
+                    consumed = true;
+                    return new AbstractIterator<V>() {
+
+                      @Override
+                      public V computeNext() {
+                        if (inner.hasNext() && 
inner.peek()._1.equals(currentKey)) {
+                          return decodeValue(inner.next()._2);
+                        }
+                        return endOfData();
+                      }
+                    };
+                  }
+                }));
       }
-      hasNext = false;
-      return null;
+      throw new NoSuchElementException();
     }
 
-    class ValueIterator implements Iterable<V> {
+    abstract V decodeValue(byte[] windowedValueBytes);
 
-      boolean consumed = false;
-      private final PeekingIterator<Tuple2<ByteArray, byte[]>> inner;
-      private final ByteArray currentKey;
+    abstract WindowedValue<KV<K, V>> decodeItem(Tuple2<ByteArray, byte[]> 
item);
+  }
 
-      ValueIterator(PeekingIterator<Tuple2<ByteArray, byte[]>> inner, 
ByteArray currentKey) {
-        this.inner = inner;
-        this.currentKey = currentKey;
-      }
+  /**
+   * From Iterator<K, V> transform to <K, Iterator<V>> where V (value) 
contains W (bounded window).
+   */
+  static class WindowedGroupByKeyIterator<K, V, W extends BoundedWindow>
+      extends GroupByKeyIterator<K, V> {
+    private final WindowingStrategy<?, W> windowingStrategy;
+    private final FullWindowedValueCoder<KV<K, V>> windowedValueCoder;
 
-      @Override
-      public Iterator<V> iterator() {
-        if (consumed) {
-          throw new IllegalStateException(
-              "ValueIterator can't be iterated more than once,"
-                  + "otherwise there could be data lost");
-        }
-        consumed = true;
-        return new AbstractIterator<V>() {
-          @Override
-          protected V computeNext() {
-            if (inner.hasNext() && currentKey.equals(inner.peek()._1)) {
-              return decodeValue(inner.next()._2);
-            }
-            return endOfData();
-          }
-        };
-      }
+    WindowedGroupByKeyIterator(
+        Iterator<Tuple2<ByteArray, byte[]>> inner,
+        Coder<K> keyCoder,
+        WindowingStrategy<?, W> windowingStrategy,
+        FullWindowedValueCoder<KV<K, V>> windowedValueCoder) {
+      super(inner, keyCoder);
+      this.windowingStrategy = windowingStrategy;
+      this.windowedValueCoder = windowedValueCoder;
     }
 
-    private V decodeValue(byte[] windowedValueBytes) {
+    @Override
+    V decodeValue(byte[] windowedValueBytes) {
       final WindowedValue<KV<K, V>> windowedValue =
           CoderHelpers.fromByteArray(windowedValueBytes, windowedValueCoder);
       return windowedValue.getValue().getValue();
     }
 
-    private WindowedValue<KV<K, V>> decodeItem(Tuple2<ByteArray, byte[]> item) 
{
+    @Override
+    WindowedValue<KV<K, V>> decodeItem(Tuple2<ByteArray, byte[]> item) {

Review Comment:
   Maybe rename this method to `decodeWindowedValue`?



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupNonMergingWindowsFunctions.java:
##########
@@ -159,91 +160,100 @@ private static <K, V> Partitioner getPartitioner(
    * @param <K> type of key iterator emits
    * @param <V> type of value iterator emits
    */
-  static class GroupByKeyIterator<K, V, W extends BoundedWindow>
+  abstract static class GroupByKeyIterator<K, V>
       implements Iterator<WindowedValue<KV<K, Iterable<V>>>> {
 
     private final PeekingIterator<Tuple2<ByteArray, byte[]>> inner;
-    private final Coder<K> keyCoder;
-    private final WindowingStrategy<?, W> windowingStrategy;
-    private final FullWindowedValueCoder<KV<K, V>> windowedValueCoder;
+    final Coder<K> keyCoder;
+    private ByteArray previousKey = null;
 
-    private boolean hasNext = true;
-    private ByteArray currentKey = null;
-
-    GroupByKeyIterator(
-        Iterator<Tuple2<ByteArray, byte[]>> inner,
-        Coder<K> keyCoder,
-        WindowingStrategy<?, W> windowingStrategy,
-        WindowedValue.FullWindowedValueCoder<KV<K, V>> windowedValueCoder)
-        throws Coder.NonDeterministicException {
+    GroupByKeyIterator(Iterator<Tuple2<ByteArray, byte[]>> inner, Coder<K> 
keyCoder) {
 
       this.inner = Iterators.peekingIterator(inner);
       this.keyCoder = keyCoder;
-      this.windowingStrategy = windowingStrategy;
-      this.windowedValueCoder = windowedValueCoder;
     }
 
     @Override
     public boolean hasNext() {
-      return hasNext;
+      return inner.hasNext();
     }
 
     @Override
     public WindowedValue<KV<K, Iterable<V>>> next() {
-      while (inner.hasNext()) {
-        final ByteArray nextKey = inner.peek()._1;
-        if (nextKey.equals(currentKey)) {
-          // we still did not see all values for a given key
+      while (hasNext()) {
+
+        final ByteArray currentKey = inner.peek()._1;
+
+        if (currentKey.equals(previousKey)) {
+          // inner iterator did not consume all values for a given key, we 
need to skip ahead until
+          // we find value for the next key

Review Comment:
   We should not silently drop data here. We can:
    - either throw an exception, as this indicates an error in the processing logic, or
    - return an Iterator with the same key, which would still cause an error in the 
runner logic, but at least would not lose data and would be consistent with 
the Iterator contract
    
    I'm slightly in favor of the first option, so that we fail fast.



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupNonMergingWindowsFunctions.java:
##########
@@ -159,91 +160,100 @@ private static <K, V> Partitioner getPartitioner(
    * @param <K> type of key iterator emits
    * @param <V> type of value iterator emits
    */
-  static class GroupByKeyIterator<K, V, W extends BoundedWindow>
+  abstract static class GroupByKeyIterator<K, V>
       implements Iterator<WindowedValue<KV<K, Iterable<V>>>> {
 
     private final PeekingIterator<Tuple2<ByteArray, byte[]>> inner;
-    private final Coder<K> keyCoder;
-    private final WindowingStrategy<?, W> windowingStrategy;
-    private final FullWindowedValueCoder<KV<K, V>> windowedValueCoder;
+    final Coder<K> keyCoder;
+    private ByteArray previousKey = null;
 
-    private boolean hasNext = true;
-    private ByteArray currentKey = null;
-
-    GroupByKeyIterator(
-        Iterator<Tuple2<ByteArray, byte[]>> inner,
-        Coder<K> keyCoder,
-        WindowingStrategy<?, W> windowingStrategy,
-        WindowedValue.FullWindowedValueCoder<KV<K, V>> windowedValueCoder)
-        throws Coder.NonDeterministicException {
+    GroupByKeyIterator(Iterator<Tuple2<ByteArray, byte[]>> inner, Coder<K> 
keyCoder) {
 
       this.inner = Iterators.peekingIterator(inner);
       this.keyCoder = keyCoder;
-      this.windowingStrategy = windowingStrategy;
-      this.windowedValueCoder = windowedValueCoder;
     }
 
     @Override
     public boolean hasNext() {
-      return hasNext;
+      return inner.hasNext();
     }
 
     @Override
     public WindowedValue<KV<K, Iterable<V>>> next() {
-      while (inner.hasNext()) {
-        final ByteArray nextKey = inner.peek()._1;
-        if (nextKey.equals(currentKey)) {
-          // we still did not see all values for a given key
+      while (hasNext()) {
+
+        final ByteArray currentKey = inner.peek()._1;
+
+        if (currentKey.equals(previousKey)) {
+          // inner iterator did not consume all values for a given key, we 
need to skip ahead until
+          // we find value for the next key
           inner.next();
           continue;
         }
-        currentKey = nextKey;
+        previousKey = currentKey;
+
         final WindowedValue<KV<K, V>> decodedItem = decodeItem(inner.peek());
         return decodedItem.withValue(
-            KV.of(decodedItem.getValue().getKey(), new ValueIterator(inner, 
currentKey)));
+            KV.of(
+                decodedItem.getValue().getKey(),
+                new Iterable<V>() {
+                  boolean consumed = false;
+
+                  @Override
+                  public Iterator<V> iterator() {
+                    if (consumed) {
+                      throw new IllegalStateException(
+                          "ValueIterator can't be iterated more than once 
otherwise there could be data lost");
+                    }
+                    consumed = true;
+                    return new AbstractIterator<V>() {
+
+                      @Override
+                      public V computeNext() {
+                        if (inner.hasNext() && 
inner.peek()._1.equals(currentKey)) {

Review Comment:
   This implies that the key must use a deterministic Coder; otherwise this might not 
work. Is there any check that the Coder is deterministic before resorting to 
this optimization?



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupNonMergingWindowsFunctions.java:
##########
@@ -159,91 +160,100 @@ private static <K, V> Partitioner getPartitioner(
    * @param <K> type of key iterator emits
    * @param <V> type of value iterator emits
    */
-  static class GroupByKeyIterator<K, V, W extends BoundedWindow>
+  abstract static class GroupByKeyIterator<K, V>
       implements Iterator<WindowedValue<KV<K, Iterable<V>>>> {
 
     private final PeekingIterator<Tuple2<ByteArray, byte[]>> inner;
-    private final Coder<K> keyCoder;
-    private final WindowingStrategy<?, W> windowingStrategy;
-    private final FullWindowedValueCoder<KV<K, V>> windowedValueCoder;
+    final Coder<K> keyCoder;
+    private ByteArray previousKey = null;
 
-    private boolean hasNext = true;
-    private ByteArray currentKey = null;
-
-    GroupByKeyIterator(
-        Iterator<Tuple2<ByteArray, byte[]>> inner,
-        Coder<K> keyCoder,
-        WindowingStrategy<?, W> windowingStrategy,
-        WindowedValue.FullWindowedValueCoder<KV<K, V>> windowedValueCoder)
-        throws Coder.NonDeterministicException {
+    GroupByKeyIterator(Iterator<Tuple2<ByteArray, byte[]>> inner, Coder<K> 
keyCoder) {
 
       this.inner = Iterators.peekingIterator(inner);
       this.keyCoder = keyCoder;
-      this.windowingStrategy = windowingStrategy;
-      this.windowedValueCoder = windowedValueCoder;
     }
 
     @Override
     public boolean hasNext() {
-      return hasNext;
+      return inner.hasNext();
     }
 
     @Override
     public WindowedValue<KV<K, Iterable<V>>> next() {
-      while (inner.hasNext()) {
-        final ByteArray nextKey = inner.peek()._1;
-        if (nextKey.equals(currentKey)) {
-          // we still did not see all values for a given key
+      while (hasNext()) {
+
+        final ByteArray currentKey = inner.peek()._1;
+
+        if (currentKey.equals(previousKey)) {
+          // inner iterator did not consume all values for a given key, we 
need to skip ahead until
+          // we find value for the next key
           inner.next();
           continue;
         }
-        currentKey = nextKey;
+        previousKey = currentKey;
+
         final WindowedValue<KV<K, V>> decodedItem = decodeItem(inner.peek());
         return decodedItem.withValue(
-            KV.of(decodedItem.getValue().getKey(), new ValueIterator(inner, 
currentKey)));
+            KV.of(
+                decodedItem.getValue().getKey(),
+                new Iterable<V>() {
+                  boolean consumed = false;
+
+                  @Override
+                  public Iterator<V> iterator() {
+                    if (consumed) {
+                      throw new IllegalStateException(
+                          "ValueIterator can't be iterated more than once 
otherwise there could be data lost");
+                    }
+                    consumed = true;
+                    return new AbstractIterator<V>() {
+
+                      @Override
+                      public V computeNext() {
+                        if (inner.hasNext() && 
inner.peek()._1.equals(currentKey)) {
+                          return decodeValue(inner.next()._2);
+                        }
+                        return endOfData();

Review Comment:
   +1



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupNonMergingWindowsFunctions.java:
##########
@@ -159,91 +160,100 @@ private static <K, V> Partitioner getPartitioner(
    * @param <K> type of key iterator emits
    * @param <V> type of value iterator emits
    */
-  static class GroupByKeyIterator<K, V, W extends BoundedWindow>
+  abstract static class GroupByKeyIterator<K, V>
       implements Iterator<WindowedValue<KV<K, Iterable<V>>>> {

Review Comment:
   Don't we want to use AbstractIterator here? It might make the logic a 
little simpler.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to