[FLINK-2322] [streaming] Close file streams to release resources early.

This closes #928


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0693c92b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0693c92b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0693c92b

Branch: refs/heads/master
Commit: 0693c92bdda655e1fbce232038909a7c2a385a22
Parents: fab61a1
Author: tedyu <[email protected]>
Authored: Tue Jul 21 11:13:17 2015 -0700
Committer: Stephan Ewen <[email protected]>
Committed: Mon Aug 3 18:48:07 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/sca/UdfAnalyzerUtils.java     | 19 ++++++++++++++++---
 .../flink/api/java/utils/ParameterTool.java      |  4 +++-
 2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0693c92b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
index df1e421..dbfd29e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
@@ -32,6 +32,7 @@ import org.objectweb.asm.tree.analysis.BasicValue;
 import org.objectweb.asm.tree.analysis.Value;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -114,12 +115,14 @@ public final class UdfAnalyzerUtils {
         */
        @SuppressWarnings("unchecked")
        public static Object[] findMethodNode(String internalClassName, String name, String desc) {
+               InputStream stream = null;
                try {
                        // iterate through hierarchy and search for method node /
                        // class that really implements the method
                        while (internalClassName != null) {
-                               ClassReader cr = new ClassReader(Thread.currentThread().getContextClassLoader()
-                                               .getResourceAsStream(internalClassName.replace('.', '/') + ".class"));
+                               stream = Thread.currentThread().getContextClassLoader()
+                                               .getResourceAsStream(internalClassName.replace('.', '/') + ".class");
+                               ClassReader cr = new ClassReader(stream);
                                final ClassNode cn = new ClassNode();
                                cr.accept(cn, 0);
                                for (MethodNode mn : (List<MethodNode>) cn.methods) {
@@ -129,9 +132,19 @@ public final class UdfAnalyzerUtils {
                                }
                                internalClassName = cr.getSuperName();
                        }
-               } catch (IOException e) {
+               }
+               catch (IOException e) {
                        throw new IllegalStateException("Method '" + name + "' could not be found", e);
                }
+               finally {
+                       if (stream != null) {
+                               try {
+                                       stream.close();
+                               } catch (IOException e) { 
+                                       // best effort cleanup
+                               }
+                       }
+               }
                throw new IllegalStateException("Method '" + name + "' could not be found");
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0693c92b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index 317dce4..b60a559 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -130,7 +130,9 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
                        throw new FileNotFoundException("Properties file "+path+" does not exist");
                }
                Properties props = new Properties();
-               props.load(new FileInputStream(propertiesFile));
+               FileInputStream fis = new FileInputStream(propertiesFile);
+               props.load(fis);
+               fis.close();
                return fromMap((Map)props);
        }
 

Reply via email to