Repository: storm
Updated Branches:
  refs/heads/master aaebc3b23 -> 4c9a864f3


STORM-2679: Need to close resources and streams after use

* This closes #2262


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d364270b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d364270b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d364270b

Branch: refs/heads/master
Commit: d364270b933cf59e9eaa845cc37af26a4ea7f63f
Parents: aaebc3b
Author: vinodkc <vinod.kc...@gmail.com>
Authored: Sun Aug 6 14:55:10 2017 +0530
Committer: Jungtaek Lim <kabh...@gmail.com>
Committed: Wed Aug 9 13:37:40 2017 +0900

----------------------------------------------------------------------
 .../storm/cassandra/Murmur3StreamGrouping.java  |   2 +-
 .../apache/storm/jdbc/common/JdbcClient.java    | 124 ++++++++++---------
 .../apache/storm/flux/parser/FluxParser.java    |  11 +-
 .../jvm/org/apache/storm/StormSubmitter.java    |   2 +-
 .../org/apache/storm/security/auth/AutoSSL.java |   8 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |   2 +-
 .../apache/storm/blobstore/BlobStoreUtils.java  |   4 +-
 .../org/apache/storm/utils/ServerUtils.java     |   2 +-
 .../java/org/apache/storm/LocalStateTest.java   |   2 +-
 .../apache/storm/blobstore/BlobStoreTest.java   |   2 +-
 10 files changed, 82 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d364270b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
index 9fbed68..774955a 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
@@ -117,7 +117,7 @@ public class Murmur3StreamGrouping implements CustomStreamGrouping {
     @VisibleForTesting
     public static long hashes(List<Object> values) throws IOException {
         byte[] keyBytes;
-        try(ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream(bos)) {
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream(bos)) {
             for(Object key : values) {
                 byte[] arr = ((String)key).getBytes("UTF-8");
                 out.writeShort(arr.length);

http://git-wip-us.apache.org/repos/asf/storm/blob/d364270b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
index 228babe..00f8a6e 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
@@ -56,25 +56,26 @@ public class JdbcClient {
 
             LOG.debug("Executing query {}", query);
 
-            PreparedStatement preparedStatement = connection.prepareStatement(query);
-            if(queryTimeoutSecs > 0) {
-                preparedStatement.setQueryTimeout(queryTimeoutSecs);
-            }
+            try (PreparedStatement preparedStatement = connection.prepareStatement(query)) {
+                if (queryTimeoutSecs > 0) {
+                    preparedStatement.setQueryTimeout(queryTimeoutSecs);
+                }
 
-            for(List<Column> columnList : columnLists) {
-                setPreparedStatementParams(preparedStatement, columnList);
-                preparedStatement.addBatch();
-            }
+                for (List<Column> columnList : columnLists) {
+                    setPreparedStatementParams(preparedStatement, columnList);
+                    preparedStatement.addBatch();
+                }
 
-            int[] results = preparedStatement.executeBatch();
-            if(Arrays.asList(results).contains(Statement.EXECUTE_FAILED)) {
-                connection.rollback();
-                throw new RuntimeException("failed at least one sql statement in the batch, operation rolled back.");
-            } else {
-                try {
-                    connection.commit();
-                } catch (SQLException e) {
-                    throw new RuntimeException("Failed to commit insert query " + query, e);
+                int[] results = preparedStatement.executeBatch();
+                if (Arrays.asList(results).contains(Statement.EXECUTE_FAILED)) {
+                    connection.rollback();
+                    throw new RuntimeException("failed at least one sql statement in the batch, operation rolled back.");
+                } else {
+                    try {
+                        connection.commit();
+                    } catch (SQLException e) {
+                        throw new RuntimeException("Failed to commit insert query " + query, e);
+                    }
                 }
             }
         } catch (SQLException e) {
@@ -106,50 +107,52 @@ public class JdbcClient {
         Connection connection = null;
         try {
             connection = connectionProvider.getConnection();
-            PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery);
-            if(queryTimeoutSecs > 0) {
-                preparedStatement.setQueryTimeout(queryTimeoutSecs);
-            }
-            setPreparedStatementParams(preparedStatement, queryParams);
-            ResultSet resultSet = preparedStatement.executeQuery();
-            List<List<Column>> rows = Lists.newArrayList();
-            while(resultSet.next()){
-                ResultSetMetaData metaData = resultSet.getMetaData();
-                int columnCount = metaData.getColumnCount();
-                List<Column> row = Lists.newArrayList();
-                for(int i=1 ; i <= columnCount; i++) {
-                    String columnLabel = metaData.getColumnLabel(i);
-                    int columnType = metaData.getColumnType(i);
-                    Class columnJavaType = Util.getJavaType(columnType);
-                    if (columnJavaType.equals(String.class)) {
-                        row.add(new Column<String>(columnLabel, resultSet.getString(columnLabel), columnType));
-                    } else if (columnJavaType.equals(Integer.class)) {
-                        row.add(new Column<Integer>(columnLabel, resultSet.getInt(columnLabel), columnType));
-                    } else if (columnJavaType.equals(Double.class)) {
-                        row.add(new Column<Double>(columnLabel, resultSet.getDouble(columnLabel), columnType));
-                    } else if (columnJavaType.equals(Float.class)) {
-                        row.add(new Column<Float>(columnLabel, resultSet.getFloat(columnLabel), columnType));
-                    } else if (columnJavaType.equals(Short.class)) {
-                        row.add(new Column<Short>(columnLabel, resultSet.getShort(columnLabel), columnType));
-                    } else if (columnJavaType.equals(Boolean.class)) {
-                        row.add(new Column<Boolean>(columnLabel, resultSet.getBoolean(columnLabel), columnType));
-                    } else if (columnJavaType.equals(byte[].class)) {
-                        row.add(new Column<byte[]>(columnLabel, resultSet.getBytes(columnLabel), columnType));
-                    } else if (columnJavaType.equals(Long.class)) {
-                        row.add(new Column<Long>(columnLabel, resultSet.getLong(columnLabel), columnType));
-                    } else if (columnJavaType.equals(Date.class)) {
-                        row.add(new Column<Date>(columnLabel, resultSet.getDate(columnLabel), columnType));
-                    } else if (columnJavaType.equals(Time.class)) {
-                        row.add(new Column<Time>(columnLabel, resultSet.getTime(columnLabel), columnType));
-                    } else if (columnJavaType.equals(Timestamp.class)) {
-                        row.add(new Column<Timestamp>(columnLabel, resultSet.getTimestamp(columnLabel), columnType));
-                    } else {
-                        throw new RuntimeException("type =  " + columnType + " for column " + columnLabel + " not supported.");
+            try (PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery)) {
+                if (queryTimeoutSecs > 0) {
+                    preparedStatement.setQueryTimeout(queryTimeoutSecs);
+                }
+                setPreparedStatementParams(preparedStatement, queryParams);
+                try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                    List<List<Column>> rows = Lists.newArrayList();
+                    while (resultSet.next()) {
+                        ResultSetMetaData metaData = resultSet.getMetaData();
+                        int columnCount = metaData.getColumnCount();
+                        List<Column> row = Lists.newArrayList();
+                        for (int i = 1; i <= columnCount; i++) {
+                            String columnLabel = metaData.getColumnLabel(i);
+                            int columnType = metaData.getColumnType(i);
+                            Class columnJavaType = Util.getJavaType(columnType);
+                            if (columnJavaType.equals(String.class)) {
+                                row.add(new Column<String>(columnLabel, resultSet.getString(columnLabel), columnType));
+                            } else if (columnJavaType.equals(Integer.class)) {
+                                row.add(new Column<Integer>(columnLabel, resultSet.getInt(columnLabel), columnType));
+                            } else if (columnJavaType.equals(Double.class)) {
+                                row.add(new Column<Double>(columnLabel, resultSet.getDouble(columnLabel), columnType));
+                            } else if (columnJavaType.equals(Float.class)) {
+                                row.add(new Column<Float>(columnLabel, resultSet.getFloat(columnLabel), columnType));
+                            } else if (columnJavaType.equals(Short.class)) {
+                                row.add(new Column<Short>(columnLabel, resultSet.getShort(columnLabel), columnType));
+                            } else if (columnJavaType.equals(Boolean.class)) {
+                                row.add(new Column<Boolean>(columnLabel, resultSet.getBoolean(columnLabel), columnType));
+                            } else if (columnJavaType.equals(byte[].class)) {
+                                row.add(new Column<byte[]>(columnLabel, resultSet.getBytes(columnLabel), columnType));
+                            } else if (columnJavaType.equals(Long.class)) {
+                                row.add(new Column<Long>(columnLabel, resultSet.getLong(columnLabel), columnType));
+                            } else if (columnJavaType.equals(Date.class)) {
+                                row.add(new Column<Date>(columnLabel, resultSet.getDate(columnLabel), columnType));
+                            } else if (columnJavaType.equals(Time.class)) {
+                                row.add(new Column<Time>(columnLabel, resultSet.getTime(columnLabel), columnType));
+                            } else if (columnJavaType.equals(Timestamp.class)) {
+                                row.add(new Column<Timestamp>(columnLabel, resultSet.getTimestamp(columnLabel), columnType));
+                            } else {
+                                throw new RuntimeException("type =  " + columnType + " for column " + columnLabel + " not supported.");
+                            }
+                        }
+                        rows.add(row);
                     }
+                    return rows;
                 }
-                rows.add(row);
             }
-            return rows;
         } catch (SQLException e) {
             throw new RuntimeException("Failed to execute select query " + sqlQuery, e);
         } finally {
@@ -179,8 +182,9 @@ public class JdbcClient {
         Connection connection = null;
         try {
             connection = connectionProvider.getConnection();
-            Statement statement = connection.createStatement();
-            statement.execute(sql);
+            try (Statement statement = connection.createStatement()) {
+                statement.execute(sql);
+            }
         } catch (SQLException e) {
             throw new RuntimeException("Failed to execute SQL", e);
         } finally {

http://git-wip-us.apache.org/repos/asf/storm/blob/d364270b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
index 35904a2..1bb018a 100644
--- a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
@@ -96,11 +96,12 @@ public class FluxParser {
         // properties file substitution
         if(propsFile != null){
             LOG.info("Performing property substitution.");
-            InputStream propsIn = new FileInputStream(propsFile);
-            Properties props = new Properties();
-            props.load(propsIn);
-            for(Object key : props.keySet()){
-                str = str.replace("${" + key + "}", props.getProperty((String)key));
+            try (InputStream propsIn = new FileInputStream(propsFile)) {
+                Properties props = new Properties();
+                props.load(propsIn);
+                for (Object key : props.keySet()) {
+                    str = str.replace("${" + key + "}", props.getProperty((String) key));
+                }
             }
         } else {
             LOG.info("Not performing property substitution.");

http://git-wip-us.apache.org/repos/asf/storm/blob/d364270b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
index e73f60e..0ad5399 100644
--- a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
+++ b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
@@ -135,7 +135,7 @@ public class StormSubmitter {
                 LOG.info("Pushing Credentials to topology {} in local mode", name);
                 localNimbus.uploadNewCredentials(name, new Credentials(fullCreds));
             } else {
-                try(NimbusClient client = NimbusClient.getConfiguredClient(conf)) {
+                try (NimbusClient client = NimbusClient.getConfiguredClient(conf)) {
                     LOG.info("Uploading new credentials to {}", name);
                     client.getClient().uploadNewCredentials(name, new Credentials(fullCreds));
                 }

http://git-wip-us.apache.org/repos/asf/storm/blob/d364270b/storm-client/src/jvm/org/apache/storm/security/auth/AutoSSL.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/AutoSSL.java b/storm-client/src/jvm/org/apache/storm/security/auth/AutoSSL.java
index aa68153..21aebe5 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/AutoSSL.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/AutoSSL.java
@@ -99,8 +99,7 @@ public class AutoSSL implements IAutoCredentials {
     // Adds the serialized and base64 file to the credentials map as a string with the filename as
     // the key.
     public static void serializeSSLFile(String readFile, Map<String, String> credentials) {
-        try {
-            FileInputStream in = new FileInputStream(readFile);
+        try (FileInputStream in = new FileInputStream(readFile)) {
             LOG.debug("serializing ssl file: {}", readFile);
             ByteArrayOutputStream result = new ByteArrayOutputStream();
             byte[] buffer = new byte[4096];
@@ -133,8 +132,9 @@ public class AutoSSL implements IAutoCredentials {
             if (resultStr != null) {
                 byte[] decodedData = DatatypeConverter.parseBase64Binary(resultStr);
                 File f = new File(directory, credsKey);
-                FileOutputStream fout = new FileOutputStream(f);
-                fout.write(decodedData);
+                try (FileOutputStream fout = new FileOutputStream(f)) {
+                    fout.write(decodedData);
+                }
             }
         } catch (Exception e) {
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/storm/blob/d364270b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 28f7e10..51790c2 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -1208,7 +1208,7 @@ public class Utils {
      */
     public static int getAvailablePort(int preferredPort) {
         int localPort = -1;
-        try(ServerSocket socket = new ServerSocket(preferredPort)) {
+        try (ServerSocket socket = new ServerSocket(preferredPort)) {
             localPort = socket.getLocalPort();
         } catch(IOException exp) {
             if (preferredPort > 0) {

http://git-wip-us.apache.org/repos/asf/storm/blob/d364270b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
index 9be90bc..d6412a8 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -126,7 +126,7 @@ public class BlobStoreUtils {
                 break;
             }
             LOG.debug("Download blob key: {}, NimbusInfo {}", key, nimbusInfo);
-            try(NimbusClient client = new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null)) {
+            try (NimbusClient client = new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null)) {
                 rbm = client.getClient().getBlobMeta(key);
                 remoteBlobStore = new NimbusBlobStore();
                 remoteBlobStore.setClient(conf, client);
@@ -174,7 +174,7 @@ public class BlobStoreUtils {
             if (isSuccess) {
                 break;
             }
-            try(NimbusClient client = new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null)) {
+            try (NimbusClient client = new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null)) {
                 remoteBlobStore = new NimbusBlobStore();
                 remoteBlobStore.setClient(conf, client);
                 in = remoteBlobStore.getBlob(key);

http://git-wip-us.apache.org/repos/asf/storm/blob/d364270b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index 16b841b..3498aa8 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -310,7 +310,7 @@ public class ServerUtils {
     public static String writeScript(String dir, List<String> command,
                                      Map<String,String> environment) throws IOException {
         String path = scriptFilePath(dir);
-        try(BufferedWriter out = new BufferedWriter(new FileWriter(path))) {
+        try (BufferedWriter out = new BufferedWriter(new FileWriter(path))) {
             out.write("#!/bin/bash");
             out.newLine();
             if (environment != null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/d364270b/storm-server/src/test/java/org/apache/storm/LocalStateTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/LocalStateTest.java b/storm-server/src/test/java/org/apache/storm/LocalStateTest.java
index 2860482..a4625a8 100644
--- a/storm-server/src/test/java/org/apache/storm/LocalStateTest.java
+++ b/storm-server/src/test/java/org/apache/storm/LocalStateTest.java
@@ -36,7 +36,7 @@ public class LocalStateTest {
 
     @Test
     public void testLocalState() throws Exception{
-        try(TmpPath dir1_tmp = new TmpPath();  TmpPath dir2_tmp = new TmpPath() ) {
+        try (TmpPath dir1_tmp = new TmpPath();  TmpPath dir2_tmp = new TmpPath() ) {
             GlobalStreamId globalStreamId_a = new GlobalStreamId("a", "a");
             GlobalStreamId globalStreamId_b = new GlobalStreamId("b", "b");
             GlobalStreamId globalStreamId_c = new GlobalStreamId("c", "c");

http://git-wip-us.apache.org/repos/asf/storm/blob/d364270b/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreTest.java b/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreTest.java
index 5f93923..45b6468 100644
--- a/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreTest.java
+++ b/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreTest.java
@@ -178,7 +178,7 @@ public class BlobStoreTest {
     String key = "test";
     SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler
             .WORLD_EVERYTHING);
-    try(AtomicOutputStream out = store.createBlob(key, metadata, null)) {
+    try (AtomicOutputStream out = store.createBlob(key, metadata, null)) {
         out.write(1);
         File blobDir = store.getKeyDataDir(key);
         Files.createFile(blobDir.toPath().resolve("tempFile.tmp"));

Reply via email to