Repository: storm
Updated Branches:
  refs/heads/master a9c2f3adb -> 339b1e6f3

STORM-3302: Ensures we close sockets to HDFS

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/845787d6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/845787d6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/845787d6

Branch: refs/heads/master
Commit: 845787d671dcdf4475493ea5888f2021b1422ba9
Parents: 730c1a3
Author: Derek Dagit <der...@oath.com>
Authored: Wed Dec 12 10:43:28 2018 -0600
Committer: Derek Dagit <der...@oath.com>
Committed: Wed Dec 12 10:43:28 2018 -0600

----------------------------------------------------------------------
 .../org/apache/storm/blobstore/BlobStore.java   | 14 ++++++---
 .../storm/dependency/DependencyUploader.java    | 20 +++++++++---
 .../apache/storm/blobstore/BlobStoreUtils.java  | 33 ++++++++++++--------
 .../storm/nimbus/LeaderListenerCallback.java    |  3 +-
 4 files changed, 46 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/845787d6/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
index cb2928c..6cf9df9 100644
--- a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
@@ -289,12 +289,16 @@ public abstract class BlobStore implements Shutdownable, AutoCloseable {
                 out.write(buffer, 0, len);
             }
             out.close();
-        } catch (AuthorizationException | IOException | RuntimeException e) {
-            if (out != null) {
-                out.cancel();
-            }
+            out = null;
         } finally {
-            in.close();
+            try {
+                if (out != null) {
+                    out.cancel();
+                }
+                in.close();
+            } catch (IOException throwaway) {
+                // Ignored
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/845787d6/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java b/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
index 41c1d86..d8f8c5a 100644
--- a/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
+++ b/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
@@ -154,11 +154,23 @@ public class DependencyUploader {
             acls.add(new AccessControl(AccessControlType.OTHER,
                                        BlobStoreAclHandler.READ));
 
-            AtomicOutputStream blob = getBlobStore().createBlob(key, new SettableBlobMeta(acls));
-            Files.copy(dependency.toPath(), blob);
-            blob.close();
+            AtomicOutputStream blob = null;
+            try {
+                blob = getBlobStore().createBlob(key, new SettableBlobMeta(acls));
+                Files.copy(dependency.toPath(), blob);
+                blob.close();
+                blob = null;
 
-            uploadNew = true;
+                uploadNew = true;
+            } finally {
+                try {
+                    if (blob != null) {
+                        blob.cancel();
+                    }
+                } catch (IOException throwaway) {
+                    // Ignore.
+                }
+            }
         }
 
         return uploadNew;


http://git-wip-us.apache.org/repos/asf/storm/blob/845787d6/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
index b9f93db..00d833f 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -122,7 +122,6 @@ public class BlobStoreUtils {
         throws TTransportException {
         ReadableBlobMeta rbm;
         ClientBlobStore remoteBlobStore;
-        InputStreamWithMeta in;
         boolean isSuccess = false;
         LOG.debug("Download blob NimbusInfos {}", nimbusInfos);
         for (NimbusInfo nimbusInfo : nimbusInfos) {
@@ -134,8 +133,9 @@ public class BlobStoreUtils {
                 rbm = client.getClient().getBlobMeta(key);
                 remoteBlobStore = new NimbusBlobStore();
                 remoteBlobStore.setClient(conf, client);
-                in = remoteBlobStore.getBlob(key);
-                blobStore.createBlob(key, in, rbm.get_settable(), getNimbusSubject());
+                try (InputStreamWithMeta in = remoteBlobStore.getBlob(key)) {
+                    blobStore.createBlob(key, in, rbm.get_settable(), getNimbusSubject());
+                }
                 // if key already exists while creating the blob else update it
                 Iterator<String> keyIterator = blobStore.listKeys();
                 while (keyIterator.hasNext()) {
@@ -170,8 +170,7 @@ public class BlobStoreUtils {
     public static boolean downloadUpdatedBlob(Map<String, Object> conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos)
         throws TTransportException {
         ClientBlobStore remoteBlobStore;
-        InputStreamWithMeta in;
-        AtomicOutputStream out;
+        AtomicOutputStream out = null;
         boolean isSuccess = false;
         LOG.debug("Download blob NimbusInfos {}", nimbusInfos);
         for (NimbusInfo nimbusInfo : nimbusInfos) {
@@ -181,15 +180,15 @@ public class BlobStoreUtils {
             try (NimbusClient client = new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null)) {
                 remoteBlobStore = new NimbusBlobStore();
                 remoteBlobStore.setClient(conf, client);
-                in = remoteBlobStore.getBlob(key);
-                out = blobStore.updateBlob(key, getNimbusSubject());
-                byte[] buffer = new byte[2048];
-                int len = 0;
-                while ((len = in.read(buffer)) > 0) {
-                    out.write(buffer, 0, len);
-                }
-                if (out != null) {
+                try (InputStreamWithMeta in = remoteBlobStore.getBlob(key)) {
+                    out = blobStore.updateBlob(key, getNimbusSubject());
+                    byte[] buffer = new byte[2048];
+                    int len = 0;
+                    while ((len = in.read(buffer)) > 0) {
+                        out.write(buffer, 0, len);
+                    }
                     out.close();
+                    out = null;
                 }
                 isSuccess = true;
             } catch(FileNotFoundException fnf) {
@@ -204,6 +203,14 @@ public class BlobStoreUtils {
             } catch (Exception exp) {
                 // Logging an exception while client is connecting
                 LOG.error("Exception", exp);
+            } finally {
+                if (out != null) {
+                    try {
+                        out.cancel();
+                    } catch (IOException e) {
+                        // Ignore.
+                    }
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/845787d6/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
index edd7444..2e1a6ca 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
@@ -196,8 +196,7 @@ public class LeaderListenerCallback {
         Subject subject = ReqContext.context().subject();
 
         for (String activeTopologyCodeKey : activeTopologyCodeKeys) {
-            try {
-                InputStreamWithMeta blob = blobStore.getBlob(activeTopologyCodeKey, subject);
+            try (InputStreamWithMeta blob = blobStore.getBlob(activeTopologyCodeKey, subject)) {
                 byte[] blobContent = IOUtils.readFully(blob, new Long(blob.getFileLength()).intValue());
                 StormTopology stormCode = Utils.deserialize(blobContent, StormTopology.class);
                 if (stormCode.is_set_dependency_jars()) {