soin08 commented on code in PR #28142:
URL: https://github.com/apache/flink/pull/28142#discussion_r3272791705


##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java:
##########
@@ -229,6 +289,48 @@ public OUT nextRecord(OUT reuse) throws IOException {
         return out.next();
     }
 
+    private static <K, N> CloseableIterator<Tuple2<K, N>> applyKeyFilter(
+            CloseableIterator<Tuple2<K, N>> source, SavepointKeyFilter filter) {
+        return new CloseableIterator<Tuple2<K, N>>() {
+            @Nullable private Tuple2<K, N> pending = null;
+
+            @Override
+            public boolean hasNext() {
+                while (pending == null && source.hasNext()) {
+                    Tuple2<K, N> candidate = source.next();
+                    if (filter.test(candidate.f0)) {
+                        pending = candidate;
+                    }
+                }
+                return pending != null;
+            }
+
+            @Override
+            public Tuple2<K, N> next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+                Tuple2<K, N> result = pending;
+                pending = null;
+                return result;
+            }
+
+            @Override
+            public void close() throws Exception {
+                source.close();
+            }
+        };
+    }

Review Comment:
   Since we are generalizing this iterator, would it make sense to move it into 
   the `flink-core` module, under the `org.apache.flink.util` package?
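
   For illustration, here is a rough sketch of what a generalized version might look like if it took a plain `java.util.function.Predicate` instead of `SavepointKeyFilter`. The class name `FilteringIteratorSketch`, the `filter` and `fromList` helpers, and the local `CloseableIterator` interface are all hypothetical stand-ins so the example compiles without Flink on the classpath; the real utility would presumably build on `org.apache.flink.util.CloseableIterator`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

public class FilteringIteratorSketch {

    /** Local stand-in (assumption) for Flink's CloseableIterator, to keep the sketch self-contained. */
    interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {}

    /** Hypothetical generic helper: wraps a source and only yields elements matching the predicate. */
    static <T> CloseableIterator<T> filter(
            CloseableIterator<T> source, Predicate<? super T> predicate) {
        return new CloseableIterator<T>() {
            private T pending;
            // Explicit flag instead of a null sentinel, so null elements can pass the filter.
            private boolean hasPending;

            @Override
            public boolean hasNext() {
                // Advance the source until a matching element is buffered or it is exhausted.
                while (!hasPending && source.hasNext()) {
                    T candidate = source.next();
                    if (predicate.test(candidate)) {
                        pending = candidate;
                        hasPending = true;
                    }
                }
                return hasPending;
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                T result = pending;
                pending = null;
                hasPending = false;
                return result;
            }

            @Override
            public void close() throws Exception {
                // Closing the wrapper releases the underlying source.
                source.close();
            }
        };
    }

    /** Test helper (hypothetical): adapts a list to the stand-in interface. */
    static <T> CloseableIterator<T> fromList(List<T> items) {
        Iterator<T> it = items.iterator();
        return new CloseableIterator<T>() {
            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public T next() {
                return it.next();
            }

            @Override
            public void close() {}
        };
    }

    public static void main(String[] args) throws Exception {
        try (CloseableIterator<Integer> evens =
                filter(fromList(Arrays.asList(1, 2, 3, 4, 5, 6)), n -> n % 2 == 0)) {
            List<Integer> out = new ArrayList<>();
            while (evens.hasNext()) {
                out.add(evens.next());
            }
            System.out.println(out); // prints [2, 4, 6]
        }
    }
}
```

   With something along these lines, `applyKeyFilter` could shrink to a one-line call that passes `t -> filter.test(t.f0)` as the predicate. The explicit `hasPending` flag is a deliberate difference from the version in the diff: a shared utility cannot assume that its elements are never null.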


