Repository: flink Updated Branches: refs/heads/master ebfa0bdf5 -> 4500bfd77
[FLINK-8676] Ensure key stream is closed after backend#applyToAllKeys(). This closes #5513. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4500bfd7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4500bfd7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4500bfd7 Branch: refs/heads/master Commit: 4500bfd7729583c6a7cb5a6fb4f2d16ba8ba8cd4 Parents: ebfa0bd Author: sihuazhou <summerle...@163.com> Authored: Fri Feb 16 18:31:18 2018 +0100 Committer: kkloudas <kklou...@gmail.com> Committed: Fri Feb 16 19:22:12 2018 +0100 ---------------------------------------------------------------------- .../state/AbstractKeyedStateBackend.java | 36 ++++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4500bfd7/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index d159d46..4898292 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -52,6 +52,7 @@ import org.apache.flink.util.Preconditions; import java.io.Closeable; import java.io.IOException; import java.util.HashMap; +import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -289,24 +290,23 @@ public abstract class AbstractKeyedStateBackend<K> final StateDescriptor<S, T> stateDescriptor, final KeyedStateFunction<K, S> function) throws Exception { - try { - getKeys(stateDescriptor.getName(), namespace) - .forEach((K key) -> { - setCurrentKey(key); - try { - function.process( - key, - getPartitionedState( - namespace, - namespaceSerializer, - stateDescriptor) - ); - } catch (Throwable e) { - // we wrap the checked exception in an unchecked - // one and catch it (and re-throw it) later. - throw new RuntimeException(e); - } - }); + try (Stream<K> keyStream = getKeys(stateDescriptor.getName(), namespace)) { + keyStream.forEach((K key) -> { + setCurrentKey(key); + try { + function.process( + key, + getPartitionedState( + namespace, + namespaceSerializer, + stateDescriptor) + ); + } catch (Throwable e) { + // we wrap the checked exception in an unchecked + // one and catch it (and re-throw it) later. + throw new RuntimeException(e); + } + }); } catch (RuntimeException e) { throw e; }