masteryhx commented on code in PR #23239:
URL: https://github.com/apache/flink/pull/23239#discussion_r1311067683


##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java:
##########
@@ -46,47 +55,59 @@ public final class MultiStateKeyIterator<K> implements CloseableIterator<K> {
 
     private final KeyedStateBackend<K> backend;
 
-    private final Iterator<K> internal;
+    private Iterator<? extends StateDescriptor<?, ?>> outerIter;
+    private Iterator<K> innerIter;
 
     private final CloseableRegistry registry;
 
     private K currentKey;
 
     public MultiStateKeyIterator(
             List<? extends StateDescriptor<?, ?>> descriptors, KeyedStateBackend<K> backend) {
+
+        outerIter = descriptors.iterator();
+        innerIter = null;
+
         this.descriptors = Preconditions.checkNotNull(descriptors);
         this.backend = Preconditions.checkNotNull(backend);
 
         this.registry = new CloseableRegistry();
-        this.internal =
-                descriptors.stream()
-                        .map(
-                                descriptor ->
-                                        backend.getKeys(
-                                                descriptor.getName(), VoidNamespace.INSTANCE))
-                        .peek(
-                                stream -> {
-                                    try {
-                                        registry.registerCloseable(stream::close);
-                                    } catch (IOException e) {
-                                        throw new RuntimeException(
-                                                "Failed to read keys from configured StateBackend",
-                                                e);
-                                    }
-                                })
-                        .flatMap(stream -> stream)
-                        .iterator();
     }
 
     @Override
     public boolean hasNext() {

Review Comment:
   How about:
   ```
   
       @Override
       public boolean hasNext() {
           while (innerIter == null || !innerIter.hasNext()) {
               if (!outerIter.hasNext()) {
                   return false;
               }
   
               StateDescriptor<?, ?> descriptor = outerIter.next();
               Stream<K> stream =
                       backend.getKeys(descriptor.getName(), VoidNamespace.INSTANCE);
               innerIter = stream.iterator();
               try {
                   registry.registerCloseable(stream::close);
               } catch (IOException e) {
                   throw new RuntimeException(
                           "Failed to read keys from configured StateBackend", e);
               }
           }
           return true;
       }
   ```
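
To illustrate the idea in isolation, here is a minimal standalone sketch of the same lazy outer/inner pattern as in the suggestion above. `LazyConcatIterator` and its `open` function are hypothetical names used only for illustration and are not part of Flink; `open` stands in for `backend.getKeys(...)`, and the `CloseableRegistry` handling is left out.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/**
 * Hypothetical helper (not part of Flink): concatenates the iterators of several
 * sources, opening each source only when the previous one is exhausted.
 */
final class LazyConcatIterator<S, T> implements Iterator<T> {

    private final Iterator<S> outer;
    private final Function<S, Iterator<T>> open;

    // Stays null until the first source has been opened.
    private Iterator<T> inner;

    LazyConcatIterator(List<S> sources, Function<S, Iterator<T>> open) {
        this.outer = sources.iterator();
        this.open = open;
    }

    @Override
    public boolean hasNext() {
        // Advance past exhausted (or empty) sources, opening one source at a time.
        while (inner == null || !inner.hasNext()) {
            if (!outer.hasNext()) {
                return false;
            }
            inner = open.apply(outer.next());
        }
        return true;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return inner.next();
    }
}
```

With this shape, a single `hasNext()` call opens only as many sources as it needs to find the next element, so nothing is buffered ahead of the caller.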



##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java:
##########
@@ -125,4 +226,117 @@ public void testIteratorRemovesFromAllDescriptors() throws Exception {
                             .count());
         }
     }
+
+    /** Test for lazy enumeration of inner iterators. */
+    @Test
+    public void testIteratorPullsSingleKeyFromAllDescriptors() throws AssertionError {

Review Comment:
   IIUC, you want this case to verify that the number of keys the iterator returns is correct?
   If so, the test should keep iterating until `hasNext()` returns false, right?
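
A rough sketch of what draining could look like, assuming a hypothetical helper `countRemaining` (the class and method names are illustrative only and not part of Flink or the existing test class):

```java
import java.util.Iterator;

/** Hypothetical test helper (not part of Flink). */
final class IteratorDrain {

    private IteratorDrain() {}

    /** Consumes the iterator until hasNext() returns false and returns the element count. */
    static <T> int countRemaining(Iterator<T> iterator) {
        int count = 0;
        while (iterator.hasNext()) {
            iterator.next();
            count++;
        }
        return count;
    }
}
```

The test could then compare the returned count against the number of keys it wrote into the backend.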



##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java:
##########
@@ -31,13 +31,22 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Stream;
 
 /**
  * An iterator for reading all keys in a state backend across multiple partitioned states.
  *
  * <p>To read unique keys across all partitioned states callers must invoke {@link
  * MultiStateKeyIterator#remove}.
  *
+ * <p>Note: This is a replacement of the original implementation which used streams with a known
+ * flaw in the {@link Stream#flatMap(java.util.function.Function)} implementation that lead to
+ * completely enumerating and buffering nested iterators event for a single call to {@link
+ * MultiStateKeyIterator#hasNext}.
+ *
+ * @see <a
+ *     href="https://bugs.openjdk.org/browse/JDK-8267359">https://bugs.openjdk.org/browse/JDK-8267359</a>

Review Comment:
   IMO, this comment explaining why the logic was changed is not necessary here; that background can be found in the Jira ticket.
   Alternatively, you could add a short note above `outerIter`, e.g.: "Avoid using Stream#flatMap due to xxx, see FLINK-26585 for more details".



##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java:
##########
@@ -46,47 +55,59 @@ public final class MultiStateKeyIterator<K> implements CloseableIterator<K> {
 
     private final KeyedStateBackend<K> backend;
 
-    private final Iterator<K> internal;
+    private Iterator<? extends StateDescriptor<?, ?>> outerIter;

Review Comment:
   Could this be `final`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

