Repository: flink
Updated Branches:
  refs/heads/master ebfa0bdf5 -> 4500bfd77


[FLINK-8676] Ensure key stream is closed after backend#applyToAllKeys().

This closes #5513.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4500bfd7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4500bfd7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4500bfd7

Branch: refs/heads/master
Commit: 4500bfd7729583c6a7cb5a6fb4f2d16ba8ba8cd4
Parents: ebfa0bd
Author: sihuazhou <summerle...@163.com>
Authored: Fri Feb 16 18:31:18 2018 +0100
Committer: kkloudas <kklou...@gmail.com>
Committed: Fri Feb 16 19:22:12 2018 +0100

----------------------------------------------------------------------
 .../state/AbstractKeyedStateBackend.java        | 36 ++++++++++----------
 1 file changed, 18 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4500bfd7/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index d159d46..4898292 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -52,6 +52,7 @@ import org.apache.flink.util.Preconditions;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -289,24 +290,23 @@ public abstract class AbstractKeyedStateBackend<K>
                        final StateDescriptor<S, T> stateDescriptor,
                        final KeyedStateFunction<K, S> function) throws 
Exception {
 
-               try {
-                       getKeys(stateDescriptor.getName(), namespace)
-                                       .forEach((K key) -> {
-                                               setCurrentKey(key);
-                                               try {
-                                                       function.process(
-                                                                       key,
-                                                                       
getPartitionedState(
-                                                                               
        namespace,
-                                                                               
        namespaceSerializer,
-                                                                               
        stateDescriptor)
-                                                       );
-                                               } catch (Throwable e) {
-                                                       // we wrap the checked 
exception in an unchecked
-                                                       // one and catch it 
(and re-throw it) later.
-                                                       throw new 
RuntimeException(e);
-                                               }
-                                       });
+               try (Stream<K> keyStream = getKeys(stateDescriptor.getName(), 
namespace)) {
+                       keyStream.forEach((K key) -> {
+                               setCurrentKey(key);
+                               try {
+                                       function.process(
+                                               key,
+                                               getPartitionedState(
+                                                       namespace,
+                                                       namespaceSerializer,
+                                                       stateDescriptor)
+                                       );
+                               } catch (Throwable e) {
+                                       // we wrap the checked exception in an 
unchecked
+                                       // one and catch it (and re-throw it) 
later.
+                                       throw new RuntimeException(e);
+                               }
+                       });
                } catch (RuntimeException e) {
                        throw e;
                }

Reply via email to