Repository: storm Updated Branches: refs/heads/master a726589d8 -> ab66003c1
STORM-1602 Blobstore UTs are failed on Windows * ensures InputStream / OutputStream objects are closed after use * clojure: with-open * java: try-with-resources * skip checking symbolic links in LocalizerTest when on Windows * Windows does not seem to handle symbolic links in compressed files properly Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8dd66bfb Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8dd66bfb Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8dd66bfb Branch: refs/heads/master Commit: 8dd66bfb378b7b103694b7d968ad21483f3a3b80 Parents: 50701df Author: Jungtaek Lim <[email protected]> Authored: Thu Mar 17 15:53:15 2016 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Thu Mar 17 17:28:24 2016 +0900 ---------------------------------------------------------------------- .../clj/org/apache/storm/daemon/supervisor.clj | 6 +- .../org/apache/storm/blobstore/BlobStore.java | 5 + .../apache/storm/blobstore/BlobStoreTest.java | 171 +++++++++---------- .../apache/storm/localizer/LocalizerTest.java | 7 +- 4 files changed, 98 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/8dd66bfb/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj index fd8f6c9..695e7eb 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj @@ -1292,8 +1292,10 @@ blob-store (Utils/getNimbusBlobStore conf master-code-dir nil)] (try (FileUtils/forceMkdir (File. tmproot)) - (.readBlobTo blob-store (ConfigUtils/masterStormCodeKey storm-id) (FileOutputStream. 
(ConfigUtils/supervisorStormCodePath tmproot)) nil) - (.readBlobTo blob-store (ConfigUtils/masterStormConfKey storm-id) (FileOutputStream. (ConfigUtils/supervisorStormConfPath tmproot)) nil) + (with-open [fos-storm-code (FileOutputStream. (ConfigUtils/supervisorStormCodePath tmproot)) + fos-storm-conf (FileOutputStream. (ConfigUtils/supervisorStormConfPath tmproot))] + (.readBlobTo blob-store (ConfigUtils/masterStormCodeKey storm-id) fos-storm-code nil) + (.readBlobTo blob-store (ConfigUtils/masterStormConfKey storm-id) fos-storm-conf nil)) (finally (.shutdown blob-store))) (FileUtils/moveDirectory (File. tmproot) (File. stormroot)) http://git-wip-us.apache.org/repos/asf/storm/blob/8dd66bfb/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java index 09093a2..14879b4 100644 --- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java +++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java @@ -396,6 +396,11 @@ public abstract class BlobStore implements Shutdownable { public long getFileLength() throws IOException { return part.getFileLength(); } + + @Override + public void close() throws IOException { + in.close(); + } } /** http://git-wip-us.apache.org/repos/asf/storm/blob/8dd66bfb/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java b/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java index 8445e6a..151b5c6 100644 --- a/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java +++ b/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java @@ -182,30 +182,30 @@ public class BlobStoreTest { Subject admin = getSubject("admin"); 
assertStoreHasExactly(store); SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - AtomicOutputStream out = store.createBlob("test", metadata, admin); - assertStoreHasExactly(store, "test"); - out.write(1); - out.close(); + try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) { + assertStoreHasExactly(store, "test"); + out.write(1); + } store.deleteBlob("test", admin); //Test for Supervisor Admin Subject supervisor = getSubject("supervisor"); assertStoreHasExactly(store); metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - out = store.createBlob("test", metadata, supervisor); - assertStoreHasExactly(store, "test"); - out.write(1); - out.close(); + try (AtomicOutputStream out = store.createBlob("test", metadata, supervisor)) { + assertStoreHasExactly(store, "test"); + out.write(1); + } store.deleteBlob("test", supervisor); //Test for Nimbus itself as a user Subject nimbus = getNimbusSubject(); assertStoreHasExactly(store); metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - out = store.createBlob("test", metadata, nimbus); - assertStoreHasExactly(store, "test"); - out.write(1); - out.close(); + try (AtomicOutputStream out = store.createBlob("test", metadata, nimbus)) { + assertStoreHasExactly(store, "test"); + out.write(1); + } store.deleteBlob("test", nimbus); // Test with a dummy test_subject for cases where subject !=null (security turned on) @@ -215,9 +215,9 @@ public class BlobStoreTest { // Tests for case when subject != null (security turned on) and // acls for the blob are set to WORLD_EVERYTHING metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); - out = store.createBlob("test", metadata, who); - out.write(1); - out.close(); + try (AtomicOutputStream out = store.createBlob("test", metadata, who)) { + out.write(1); + } assertStoreHasExactly(store, "test"); // Testing whether acls are set to WORLD_EVERYTHING assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)")); @@ -231,9 +231,9 @@ public class BlobStoreTest { // acls are not set for the blob (DEFAULT) LOG.info("Creating test again"); metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - out = store.createBlob("test", metadata, who); - out.write(2); - out.close(); + try (AtomicOutputStream out = store.createBlob("test", metadata, who)) { + out.write(2); + } assertStoreHasExactly(store, "test"); // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should not contain WORLD_EVERYTHING because // the subject is neither null nor empty. The ACL should however contain USER_EVERYTHING as user needs to have @@ -242,28 +242,29 @@ public class BlobStoreTest { readAssertEqualsWithAuth(store, who, "test", 2); LOG.info("Updating test"); - out = store.updateBlob("test", who); - out.write(3); - out.close(); + try (AtomicOutputStream out = store.updateBlob("test", who)) { + out.write(3); + } assertStoreHasExactly(store, "test"); readAssertEqualsWithAuth(store, who, "test", 3); LOG.info("Updating test again"); - out = store.updateBlob("test", who); - out.write(4); - out.flush(); - LOG.info("SLEEPING"); - Thread.sleep(2); - assertStoreHasExactly(store, "test"); - readAssertEqualsWithAuth(store, who, "test", 3); + try (AtomicOutputStream out = store.updateBlob("test", who)) { + out.write(4); + out.flush(); + LOG.info("SLEEPING"); + Thread.sleep(2); + assertStoreHasExactly(store, "test"); + readAssertEqualsWithAuth(store, who, "test", 3); + } // Test for subject with no principals and acls set to WORLD_EVERYTHING who = new Subject(); metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); LOG.info("Creating test"); - out = store.createBlob("test-empty-subject-WE", metadata, who); - out.write(2); - out.close(); + try (AtomicOutputStream out = store.createBlob("test-empty-subject-WE", metadata, who)) { + out.write(2); + } assertStoreHasExactly(store, "test-empty-subject-WE", "test"); // 
Testing whether acls are set to WORLD_EVERYTHING assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); @@ -273,9 +274,10 @@ public class BlobStoreTest { who = new Subject(); metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); LOG.info("Creating other"); - out = store.createBlob("test-empty-subject-DEF", metadata, who); - out.write(2); - out.close(); + + try (AtomicOutputStream out = store.createBlob("test-empty-subject-DEF", metadata, who)) { + out.write(2); + } assertStoreHasExactly(store, "test-empty-subject-DEF", "test", "test-empty-subject-WE"); // Testing whether acls are set to WORLD_EVERYTHING assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); @@ -286,12 +288,6 @@ public class BlobStoreTest { } else { fail("Error the blobstore is of unknowntype"); } - try { - out.close(); - } catch (IOException e) { - // This is likely to happen when we try to commit something that - // was cleaned up. This is expected and acceptable. 
- } } public void testBasic(BlobStore store) throws Exception { @@ -301,9 +297,9 @@ public class BlobStoreTest { // acls for the blob are set to WORLD_EVERYTHING SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler .WORLD_EVERYTHING); - AtomicOutputStream out = store.createBlob("test", metadata, null); - out.write(1); - out.close(); + try (AtomicOutputStream out = store.createBlob("test", metadata, null)) { + out.write(1); + } assertStoreHasExactly(store, "test"); // Testing whether acls are set to WORLD_EVERYTHING assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); @@ -317,37 +313,38 @@ public class BlobStoreTest { // update blob interface metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); LOG.info("Creating test again"); - out = store.createBlob("test", metadata, null); - out.write(2); - out.close(); + try (AtomicOutputStream out = store.createBlob("test", metadata, null)) { + out.write(2); + } assertStoreHasExactly(store, "test"); if (store instanceof LocalFsBlobStore) { assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); } readAssertEquals(store, "test", 2); LOG.info("Updating test"); - out = store.updateBlob("test", null); - out.write(3); - out.close(); + try (AtomicOutputStream out = store.updateBlob("test", null)) { + out.write(3); + } assertStoreHasExactly(store, "test"); readAssertEquals(store, "test", 3); LOG.info("Updating test again"); - out = store.updateBlob("test", null); - out.write(4); - out.flush(); - LOG.info("SLEEPING"); - Thread.sleep(2); + try (AtomicOutputStream out = store.updateBlob("test", null)) { + out.write(4); + out.flush(); + LOG.info("SLEEPING"); + Thread.sleep(2); + } // Tests for case when subject == null (security turned off) and // acls for the blob are set to DEFAULT (Empty ACL List) only for LocalFsBlobstore if (store instanceof LocalFsBlobStore) { 
metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); LOG.info("Creating test for empty acls when security is off"); - out = store.createBlob("test-empty-acls", metadata, null); - LOG.info("metadata {}", metadata); - out.write(2); - out.close(); + try (AtomicOutputStream out = store.createBlob("test-empty-acls", metadata, null)) { + LOG.info("metadata {}", metadata); + out.write(2); + } assertStoreHasExactly(store, "test-empty-acls", "test"); // Testing whether acls are set to WORLD_EVERYTHING, Here we are testing only for LocalFsBlobstore // as the HdfsBlobstore gets the subject information of the local system user and behaves as it is @@ -363,12 +360,6 @@ public class BlobStoreTest { } else { fail("Error the blobstore is of unknowntype"); } - try { - out.close(); - } catch (IOException e) { - // This is likely to happen when we try to commit something that - // was cleaned up. This is expected and acceptable. - } } @@ -376,26 +367,26 @@ public class BlobStoreTest { assertStoreHasExactly(store); LOG.info("Creating test"); - AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler - .WORLD_EVERYTHING), null); - out.write(1); - out.close(); + try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler + .WORLD_EVERYTHING), null)) { + out.write(1); + } assertStoreHasExactly(store, "test"); readAssertEquals(store, "test", 1); LOG.info("Creating other"); - out = store.createBlob("other", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), - null); - out.write(2); - out.close(); + try (AtomicOutputStream out = store.createBlob("other", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), + null)) { + out.write(2); + } assertStoreHasExactly(store, "test", "other"); readAssertEquals(store, "test", 1); readAssertEquals(store, "other", 2); LOG.info("Updating other"); - out = store.updateBlob("other", null); - out.write(5); - out.close(); + try (AtomicOutputStream out = 
store.updateBlob("other", null)) { + out.write(5); + } assertStoreHasExactly(store, "test", "other"); readAssertEquals(store, "test", 1); readAssertEquals(store, "other", 5); @@ -406,18 +397,18 @@ public class BlobStoreTest { readAssertEquals(store, "other", 5); LOG.info("Creating test again"); - out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), - null); - out.write(2); - out.close(); + try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), + null)) { + out.write(2); + } assertStoreHasExactly(store, "test", "other"); readAssertEquals(store, "test", 2); readAssertEquals(store, "other", 5); LOG.info("Updating test"); - out = store.updateBlob("test", null); - out.write(3); - out.close(); + try (AtomicOutputStream out = store.updateBlob("test", null)) { + out.write(3); + } assertStoreHasExactly(store, "test", "other"); readAssertEquals(store, "test", 3); readAssertEquals(store, "other", 5); @@ -428,7 +419,9 @@ public class BlobStoreTest { readAssertEquals(store, "test", 3); LOG.info("Updating test again"); - out = store.updateBlob("test", null); + + // intended to not guarding with try-with-resource since otherwise test will fail + AtomicOutputStream out = store.updateBlob("test", null); out.write(4); out.flush(); LOG.info("SLEEPING"); @@ -452,10 +445,12 @@ public class BlobStoreTest { public void testGetFileLength() throws AuthorizationException, KeyNotFoundException, KeyAlreadyExistsException, IOException { LocalFsBlobStore store = initLocalFs(); - AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler - .WORLD_EVERYTHING), null); - out.write(1); - out.close(); - assertEquals(1, store.getBlob("test", null).getFileLength()); + try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler + .WORLD_EVERYTHING), null)) { + out.write(1); + } + try (InputStreamWithMeta blobInputStream = 
store.getBlob("test", null)) { + assertEquals(1, blobInputStream.getFileLength()); + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/8dd66bfb/storm-core/test/jvm/org/apache/storm/localizer/LocalizerTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/localizer/LocalizerTest.java b/storm-core/test/jvm/org/apache/storm/localizer/LocalizerTest.java index 096c4b0..613e165 100644 --- a/storm-core/test/jvm/org/apache/storm/localizer/LocalizerTest.java +++ b/storm-core/test/jvm/org/apache/storm/localizer/LocalizerTest.java @@ -110,7 +110,7 @@ public class LocalizerTest { @Before public void setUp() throws Exception { - baseDir = new File("/tmp/blob-store-localizer-test-"+ UUID.randomUUID()); + baseDir = new File(System.getProperty("java.io.tmpdir") + "/blob-store-localizer-test-"+ UUID.randomUUID()); if (!baseDir.mkdir()) { throw new IOException("failed to create base directory"); } @@ -259,6 +259,11 @@ public class LocalizerTest { // archive passed in must contain symlink named tmptestsymlink if not a zip file public void testArchives(String archivePath, boolean supportSymlinks, int size) throws Exception { + if (Utils.isOnWindows()) { + // Windows should set this to false cause symlink in compressed file doesn't work properly. + supportSymlinks = false; + } + Map conf = new HashMap(); // set clean time really high so doesn't kick in conf.put(Config.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
