StephanEwen commented on a change in pull request #13445:
URL: https://github.com/apache/flink/pull/13445#discussion_r493432824



##########
File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java
##########
@@ -37,23 +40,33 @@
  * @param <K> Type of the key by which state is keyed.
  */
 @Internal
-public final class MultiStateKeyIterator<K> implements Iterator<K> {
+public final class MultiStateKeyIterator<K> implements CloseableIterator<K> {
        private final List<? extends StateDescriptor<?, ?>> descriptors;
 
        private final KeyedStateBackend<K> backend;
 
        private final Iterator<K> internal;
 
+       private final CloseableRegistry registry;
+
        private K currentKey;
 
        public MultiStateKeyIterator(List<? extends StateDescriptor<?, ?>> descriptors, KeyedStateBackend<K> backend) {
                this.descriptors = Preconditions.checkNotNull(descriptors);
-
                this.backend = Preconditions.checkNotNull(backend);
 
+               this.registry = new CloseableRegistry();
                this.internal = descriptors
                        .stream()
-                       .flatMap(descriptor -> backend.getKeys(descriptor.getName(), VoidNamespace.INSTANCE))
+                       .map(descriptor -> backend.getKeys(descriptor.getName(), VoidNamespace.INSTANCE))
+                       .peek(stream -> {
+                               try {
+                                       registry.registerCloseable(stream::close);
+                               } catch (IOException e) {
+                                       throw new RuntimeException("Failed to read keys from configured StateBackend", e);

Review comment:
       It may be worth double-checking whether declaring the `IOException` in
the `throws` clause would be better than wrapping the checked exception in a
`RuntimeException`. That depends on the exception philosophy of the rest of
the module (purely unchecked, or checked where applicable).



