masteryhx commented on code in PR #23239:
URL: https://github.com/apache/flink/pull/23239#discussion_r1305077917


##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java:
##########
@@ -31,65 +31,82 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Stream;
 
 /**
  * An iterator for reading all keys in a state backend across multiple 
partitioned states.
  *
  * <p>To read unique keys across all partitioned states callers must invoke 
{@link
  * MultiStateKeyIterator#remove}.
  *
+ * <p>Note: This is a replacement of the original implementation which used 
streams with a known
+ * flaw in the {@link Stream#flatMap(java.util.function.Function)} 
implementation that lead to
+ * completely enumerating and buffering nested iterators event for a single 
call to {@link
+ * MultiStateKeyIterator#hasNext}.
+ *
+ * @see <a
+ *     
href="https://bugs.openjdk.org/browse/JDK-8267359">https://bugs.openjdk.org/browse/JDK-8267359</a>
  * @param <K> Type of the key by which state is keyed.
  */
 @Internal
-public final class MultiStateKeyIterator<K> implements CloseableIterator<K> {
+public class MultiStateKeyIterator<K> implements CloseableIterator<K> {
     private final List<? extends StateDescriptor<?, ?>> descriptors;
 
     private final KeyedStateBackend<K> backend;
 
-    private final Iterator<K> internal;
+    private Iterator<? extends StateDescriptor<?, ?>> outerIter;
+    private Iterator<K> innerIter;
 
     private final CloseableRegistry registry;
 
     private K currentKey;
 
     public MultiStateKeyIterator(
             List<? extends StateDescriptor<?, ?>> descriptors, 
KeyedStateBackend<K> backend) {
+
+        outerIter = descriptors.iterator();
+        innerIter = null;
+
         this.descriptors = Preconditions.checkNotNull(descriptors);
         this.backend = Preconditions.checkNotNull(backend);
 
         this.registry = new CloseableRegistry();
-        this.internal =
-                descriptors.stream()
-                        .map(
-                                descriptor ->
-                                        backend.getKeys(
-                                                descriptor.getName(), 
VoidNamespace.INSTANCE))
-                        .peek(
-                                stream -> {
-                                    try {
-                                        
registry.registerCloseable(stream::close);
-                                    } catch (IOException e) {
-                                        throw new RuntimeException(
-                                                "Failed to read keys from 
configured StateBackend",
-                                                e);
-                                    }
-                                })
-                        .flatMap(stream -> stream)
-                        .iterator();
+    }
+
+    @Override
+    public void close() throws Exception {
+        registry.close();
     }
 
     @Override
     public boolean hasNext() {
-        return internal.hasNext();
+        if ((innerIter == null || !innerIter.hasNext()) && 
outerIter.hasNext()) {
+            StateDescriptor<?, ?> descriptor = outerIter.next();
+            Stream<K> stream = backend.getKeys(descriptor.getName(), 
VoidNamespace.INSTANCE);
+            innerIter = stream.iterator();
+            try {
+                registry.registerCloseable(stream::close);
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to read keys from 
configured StateBackend", e);
+            }
+        }
+        if (innerIter == null) {
+            return false;
+        }
+        return innerIter.hasNext();

Review Comment:
   IIUC, this is incorrect.
   After the first `if`, innerIter may be an empty iterator (due to missing or
empty states), but we cannot conclude that there is no next element: outerIter
may still have more elements.



##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java:
##########
@@ -31,65 +31,82 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Stream;
 
 /**
  * An iterator for reading all keys in a state backend across multiple 
partitioned states.
  *
  * <p>To read unique keys across all partitioned states callers must invoke 
{@link
  * MultiStateKeyIterator#remove}.
  *
+ * <p>Note: This is a replacement of the original implementation which used 
streams with a known
+ * flaw in the {@link Stream#flatMap(java.util.function.Function)} 
implementation that lead to
+ * completely enumerating and buffering nested iterators event for a single 
call to {@link
+ * MultiStateKeyIterator#hasNext}.
+ *
+ * @see <a
+ *     
href="https://bugs.openjdk.org/browse/JDK-8267359">https://bugs.openjdk.org/browse/JDK-8267359</a>
  * @param <K> Type of the key by which state is keyed.
  */
 @Internal
-public final class MultiStateKeyIterator<K> implements CloseableIterator<K> {

Review Comment:
   Why is this change needed?



##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java:
##########
@@ -125,4 +185,117 @@ public void testIteratorRemovesFromAllDescriptors() 
throws Exception {
                             .count());
         }
     }
+
+    /** Test for lazy enumeration of inner iterators. */
+    @Test
+    public void testIteratorPullsSingleKeyFromAllDescriptors() throws 
AssertionError {

Review Comment:
   I may have missed something.
   This unit test still seems incomplete, right?
   Could you explain what is being verified in L197 - L202?



##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java:
##########
@@ -111,9 +128,4 @@ public void remove() {
             }
         }
     }
-
-    @Override
-    public void close() throws Exception {

Review Comment:
   We could avoid moving code around.



##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java:
##########
@@ -31,65 +31,82 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Stream;
 
 /**
  * An iterator for reading all keys in a state backend across multiple 
partitioned states.
  *
  * <p>To read unique keys across all partitioned states callers must invoke 
{@link
  * MultiStateKeyIterator#remove}.
  *
+ * <p>Note: This is a replacement of the original implementation which used 
streams with a known
+ * flaw in the {@link Stream#flatMap(java.util.function.Function)} 
implementation that lead to
+ * completely enumerating and buffering nested iterators event for a single 
call to {@link
+ * MultiStateKeyIterator#hasNext}.
+ *
+ * @see <a
+ *     
href="https://bugs.openjdk.org/browse/JDK-8267359">https://bugs.openjdk.org/browse/JDK-8267359</a>
  * @param <K> Type of the key by which state is keyed.
  */
 @Internal
-public final class MultiStateKeyIterator<K> implements CloseableIterator<K> {
+public class MultiStateKeyIterator<K> implements CloseableIterator<K> {
     private final List<? extends StateDescriptor<?, ?>> descriptors;
 
     private final KeyedStateBackend<K> backend;
 
-    private final Iterator<K> internal;
+    private Iterator<? extends StateDescriptor<?, ?>> outerIter;
+    private Iterator<K> innerIter;
 
     private final CloseableRegistry registry;
 
     private K currentKey;
 
     public MultiStateKeyIterator(
             List<? extends StateDescriptor<?, ?>> descriptors, 
KeyedStateBackend<K> backend) {
+
+        outerIter = descriptors.iterator();
+        innerIter = null;
+
         this.descriptors = Preconditions.checkNotNull(descriptors);
         this.backend = Preconditions.checkNotNull(backend);
 
         this.registry = new CloseableRegistry();
-        this.internal =
-                descriptors.stream()
-                        .map(
-                                descriptor ->
-                                        backend.getKeys(
-                                                descriptor.getName(), 
VoidNamespace.INSTANCE))
-                        .peek(
-                                stream -> {
-                                    try {
-                                        
registry.registerCloseable(stream::close);
-                                    } catch (IOException e) {
-                                        throw new RuntimeException(
-                                                "Failed to read keys from 
configured StateBackend",
-                                                e);
-                                    }
-                                })
-                        .flatMap(stream -> stream)
-                        .iterator();
+    }
+
+    @Override
+    public void close() throws Exception {
+        registry.close();
     }
 
     @Override
     public boolean hasNext() {
-        return internal.hasNext();
+        if ((innerIter == null || !innerIter.hasNext()) && 
outerIter.hasNext()) {
+            StateDescriptor<?, ?> descriptor = outerIter.next();
+            Stream<K> stream = backend.getKeys(descriptor.getName(), 
VoidNamespace.INSTANCE);
+            innerIter = stream.iterator();
+            try {
+                registry.registerCloseable(stream::close);
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to read keys from 
configured StateBackend", e);
+            }
+        }
+        if (innerIter == null) {
+            return false;
+        }
+        return innerIter.hasNext();
     }
 
     @Override
     public K next() {
-        currentKey = internal.next();
-        return currentKey;
+        if (!this.hasNext()) {

Review Comment:
   It's not necessary due to the contract of `Iterator`.
   This adds unnecessary cost every time the iterator is used.



##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java:
##########
@@ -125,4 +185,117 @@ public void testIteratorRemovesFromAllDescriptors() 
throws Exception {
                             .count());
         }
     }
+
+    /** Test for lazy enumeration of inner iterators. */
+    @Test
+    public void testIteratorPullsSingleKeyFromAllDescriptors() throws 
AssertionError {
+        CountingManyKeysKeyedStateBackend keyedStateBackend =
+                createManyKeysKeyedStateBackend(100_000_000);
+        MultiStateKeyIterator<Integer> testedIterator =
+                new MultiStateKeyIterator<>(descriptors, keyedStateBackend);
+
+        boolean hasnext = testedIterator.hasNext();
+
+        Assert.assertEquals(
+                "Unexpected number of keys enumerated",
+                1,
+                keyedStateBackend.numberOfKeysEnumerated);
+    }
+
+    /**
+     * Mockup {@link AbstractKeyedStateBackend} that counts how many keys are 
enumerated.
+     *
+     * <p>Generates a configured nmber of integer keys, only method actually 
implemented is {@link
+     * CountingManyKeysKeyedStateBackend#getKeys(java.lang.String, 
java.lang.Object)}
+     */
+    static class CountingManyKeysKeyedStateBackend extends 
AbstractKeyedStateBackend<Integer> {

Review Comment:
   How about renaming it to `CountingKeyNumberKeyedStateBackend` or just
`CountingKeysKeyedStateBackend`?



##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java:
##########
@@ -31,65 +31,82 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Stream;
 
 /**
  * An iterator for reading all keys in a state backend across multiple 
partitioned states.
  *
  * <p>To read unique keys across all partitioned states callers must invoke 
{@link
  * MultiStateKeyIterator#remove}.
  *
+ * <p>Note: This is a replacement of the original implementation which used 
streams with a known
+ * flaw in the {@link Stream#flatMap(java.util.function.Function)} 
implementation that lead to
+ * completely enumerating and buffering nested iterators event for a single 
call to {@link
+ * MultiStateKeyIterator#hasNext}.
+ *
+ * @see <a
+ *     
href="https://bugs.openjdk.org/browse/JDK-8267359">https://bugs.openjdk.org/browse/JDK-8267359</a>
  * @param <K> Type of the key by which state is keyed.
  */
 @Internal
-public final class MultiStateKeyIterator<K> implements CloseableIterator<K> {
+public class MultiStateKeyIterator<K> implements CloseableIterator<K> {
     private final List<? extends StateDescriptor<?, ?>> descriptors;
 
     private final KeyedStateBackend<K> backend;
 
-    private final Iterator<K> internal;
+    private Iterator<? extends StateDescriptor<?, ?>> outerIter;
+    private Iterator<K> innerIter;
 
     private final CloseableRegistry registry;
 
     private K currentKey;
 
     public MultiStateKeyIterator(
             List<? extends StateDescriptor<?, ?>> descriptors, 
KeyedStateBackend<K> backend) {
+
+        outerIter = descriptors.iterator();
+        innerIter = null;
+
         this.descriptors = Preconditions.checkNotNull(descriptors);
         this.backend = Preconditions.checkNotNull(backend);
 
         this.registry = new CloseableRegistry();
-        this.internal =
-                descriptors.stream()
-                        .map(
-                                descriptor ->
-                                        backend.getKeys(
-                                                descriptor.getName(), 
VoidNamespace.INSTANCE))
-                        .peek(
-                                stream -> {
-                                    try {
-                                        
registry.registerCloseable(stream::close);
-                                    } catch (IOException e) {
-                                        throw new RuntimeException(
-                                                "Failed to read keys from 
configured StateBackend",
-                                                e);
-                                    }
-                                })
-                        .flatMap(stream -> stream)
-                        .iterator();
+    }
+
+    @Override
+    public void close() throws Exception {
+        registry.close();
     }
 
     @Override
     public boolean hasNext() {
-        return internal.hasNext();
+        if ((innerIter == null || !innerIter.hasNext()) && 
outerIter.hasNext()) {
+            StateDescriptor<?, ?> descriptor = outerIter.next();
+            Stream<K> stream = backend.getKeys(descriptor.getName(), 
VoidNamespace.INSTANCE);
+            innerIter = stream.iterator();
+            try {
+                registry.registerCloseable(stream::close);
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to read keys from 
configured StateBackend", e);
+            }
+        }
+        if (innerIter == null) {
+            return false;
+        }
+        return innerIter.hasNext();
     }
 
     @Override
     public K next() {
-        currentKey = internal.next();
-        return currentKey;
+        if (!this.hasNext()) {
+            throw new NoSuchElementException();
+        } else {
+            currentKey = this.innerIter.next();
+            return currentKey;
+        }
     }
 
-    /** Removes the current key from <b>ALL</b> known states in the state 
backend. */

Review Comment:
   Why remove this comment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to