[FLINK-2322] [streaming] Close file streams to release resources early. This closes #928
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0693c92b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0693c92b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0693c92b

Branch: refs/heads/master
Commit: 0693c92bdda655e1fbce232038909a7c2a385a22
Parents: fab61a1
Author: tedyu <[email protected]>
Authored: Tue Jul 21 11:13:17 2015 -0700
Committer: Stephan Ewen <[email protected]>
Committed: Mon Aug 3 18:48:07 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/sca/UdfAnalyzerUtils.java | 19 ++++++++++++++++---
 .../flink/api/java/utils/ParameterTool.java | 4 +++-
 2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0693c92b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
index df1e421..dbfd29e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
@@ -32,6 +32,7 @@ import org.objectweb.asm.tree.analysis.BasicValue;
 import org.objectweb.asm.tree.analysis.Value;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -114,12 +115,14 @@ public final class UdfAnalyzerUtils {
 	 */
 	@SuppressWarnings("unchecked")
 	public static Object[] findMethodNode(String internalClassName, String name, String desc) {
+		InputStream stream = null;
 		try {
 			// iterate through hierarchy and search for method node /
 			// class that really implements the method
 			while (internalClassName != null) {
-				ClassReader cr = new ClassReader(Thread.currentThread().getContextClassLoader()
-						.getResourceAsStream(internalClassName.replace('.', '/') + ".class"));
+				stream = Thread.currentThread().getContextClassLoader()
+						.getResourceAsStream(internalClassName.replace('.', '/') + ".class");
+				ClassReader cr = new ClassReader(stream);
 				final ClassNode cn = new ClassNode();
 				cr.accept(cn, 0);
 				for (MethodNode mn : (List<MethodNode>) cn.methods) {
@@ -129,9 +132,19 @@ public final class UdfAnalyzerUtils {
 					}
 				internalClassName = cr.getSuperName();
 			}
-		} catch (IOException e) {
+		}
+		catch (IOException e) {
 			throw new IllegalStateException("Method '" + name + "' could not be found", e);
 		}
+		finally {
+			if (stream != null) {
+				try {
+					stream.close();
+				} catch (IOException e) {
+					// best effort cleanup
+				}
+			}
+		}
 		throw new IllegalStateException("Method '" + name + "' could not be found");
 	}
 
http://git-wip-us.apache.org/repos/asf/flink/blob/0693c92b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index 317dce4..b60a559 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -130,7 +130,9 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
 			throw new FileNotFoundException("Properties file "+path+" does not exist");
 		}
 		Properties props = new Properties();
-		props.load(new FileInputStream(propertiesFile));
+		FileInputStream fis = new FileInputStream(propertiesFile);
+		props.load(fis);
+		fis.close();
 		return fromMap((Map)props);
 	}
