sjwiesman commented on a change in pull request #13445:
URL: https://github.com/apache/flink/pull/13445#discussion_r493628555
##########
File path:
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java
##########
@@ -37,23 +40,33 @@
* @param <K> Type of the key by which state is keyed.
*/
@Internal
-public final class MultiStateKeyIterator<K> implements Iterator<K> {
+public final class MultiStateKeyIterator<K> implements CloseableIterator<K> {
private final List<? extends StateDescriptor<?, ?>> descriptors;
private final KeyedStateBackend<K> backend;
private final Iterator<K> internal;
+ private final CloseableRegistry registry;
+
private K currentKey;
public MultiStateKeyIterator(List<? extends StateDescriptor<?, ?>>
descriptors, KeyedStateBackend<K> backend) {
this.descriptors = Preconditions.checkNotNull(descriptors);
-
this.backend = Preconditions.checkNotNull(backend);
+ this.registry = new CloseableRegistry();
this.internal = descriptors
.stream()
- .flatMap(descriptor ->
backend.getKeys(descriptor.getName(), VoidNamespace.INSTANCE))
+ .map(descriptor ->
backend.getKeys(descriptor.getName(), VoidNamespace.INSTANCE))
+ .peek(stream -> {
+ try {
+
registry.registerCloseable(stream::close);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to
read keys from configured StateBackend", e);
Review comment:
In this case I have to wrap it in an unchecked exception because
`Stream#peek` does not allow for checked exceptions. Otherwise I would agree.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]