http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreTest.java b/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreTest.java index 45b6468..57b0750 100644 --- a/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreTest.java +++ b/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreTest.java @@ -1,27 +1,22 @@ - /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.blobstore; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.nio.file.Files; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -29,9 +24,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.UUID; - import javax.security.auth.Subject; - import org.apache.commons.io.FileUtils; import org.apache.storm.Config; import org.apache.storm.generated.AuthorizationException; @@ -53,423 +46,422 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.spy; -import java.nio.file.Files; - public class BlobStoreTest { - private static final Logger LOG = LoggerFactory.getLogger(BlobStoreTest.class); - URI base; - File baseFile; - private static Map<String, Object> conf = new HashMap(); - public static final int READ = 0x01; - public static final int WRITE = 0x02; - public static final int ADMIN = 0x04; - - @Before - public void init() { - initializeConfigs(); - baseFile = new File("target/blob-store-test-"+UUID.randomUUID()); - base = baseFile.toURI(); - } - - @After - public void cleanup() throws IOException { - FileUtils.deleteDirectory(baseFile); - } - - // Method which initializes nimbus admin - public static void initializeConfigs() { - conf.put(Config.NIMBUS_ADMINS,"admin"); - conf.put(Config.NIMBUS_SUPERVISOR_USERS,"supervisor"); - } - - // Gets Nimbus Subject with NimbusPrincipal set on it - public static Subject getNimbusSubject() { - Subject nimbus = new Subject(); - nimbus.getPrincipals().add(new NimbusPrincipal()); - return nimbus; - } - - // Overloading the assertStoreHasExactly method accomodate Subject in order to check for authorization - public static void assertStoreHasExactly(BlobStore store, Subject who, String ... keys) - throws IOException, KeyNotFoundException, AuthorizationException { - Set<String> expected = new HashSet<String>(Arrays.asList(keys)); - Set<String> found = new HashSet<String>(); - Iterator<String> c = store.listKeys(); - while (c.hasNext()) { - String keyName = c.next(); - found.add(keyName); - } - Set<String> extra = new HashSet<String>(found); - extra.removeAll(expected); - assertTrue("Found extra keys in the blob store "+extra, extra.isEmpty()); - Set<String> missing = new HashSet<String>(expected); - missing.removeAll(found); - assertTrue("Found keys missing from the blob store "+missing, missing.isEmpty()); - } - - public static void assertStoreHasExactly(BlobStore store, String ... keys) - throws IOException, KeyNotFoundException, AuthorizationException { - assertStoreHasExactly(store, null, keys); - } - - // Overloading the readInt method accomodate Subject in order to check for authorization (security turned on) - public static int readInt(BlobStore store, Subject who, String key) - throws IOException, KeyNotFoundException, AuthorizationException { - InputStream in = store.getBlob(key, who); - try { - return in.read(); - } finally { - in.close(); + public static final int READ = 0x01; + public static final int WRITE = 0x02; + public static final int ADMIN = 0x04; + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreTest.class); + private static Map<String, Object> conf = new HashMap(); + URI base; + File baseFile; + + // Method which initializes nimbus admin + public static void initializeConfigs() { + conf.put(Config.NIMBUS_ADMINS, "admin"); + conf.put(Config.NIMBUS_SUPERVISOR_USERS, "supervisor"); } - } - - public static int readInt(BlobStore store, String key) - throws IOException, KeyNotFoundException, AuthorizationException { - return readInt(store, null, key); - } - - public static void readAssertEquals(BlobStore store, String key, int value) - throws IOException, KeyNotFoundException, AuthorizationException { - assertEquals(value, readInt(store, key)); - } - - // Checks for assertion when we turn on security - public void readAssertEqualsWithAuth(BlobStore store, Subject who, String key, int value) - throws IOException, KeyNotFoundException, AuthorizationException { - assertEquals(value, readInt(store, who, key)); - } - - private LocalFsBlobStore initLocalFs() { - LocalFsBlobStore store = new LocalFsBlobStore(); - // Spy object that tries to mock the real object store - LocalFsBlobStore spy = spy(store); - Mockito.doNothing().when(spy).checkForBlobUpdate("test"); - Mockito.doNothing().when(spy).checkForBlobUpdate("other"); - Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-subject-WE"); - Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-subject-DEF"); - Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-acls"); - Map<String, Object> conf = Utils.readStormConfig(); - conf.put(Config.STORM_LOCAL_DIR, baseFile.getAbsolutePath()); - conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN,"org.apache.storm.security.auth.DefaultPrincipalToLocal"); - spy.prepare(conf, null, null); - return spy; - } - - @Test - public void testLocalFsWithAuth() throws Exception { - testWithAuthentication(initLocalFs()); - } - - @Test - public void testBasicLocalFs() throws Exception { - testBasic(initLocalFs()); - } - - @Test - public void testMultipleLocalFs() throws Exception { - testMultiple(initLocalFs()); - } - - @Test - public void testDeleteAfterFailedCreate() throws Exception{ - //Check that a blob can be deleted when a temporary file exists in the blob directory - LocalFsBlobStore store = initLocalFs(); - - String key = "test"; - SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler - .WORLD_EVERYTHING); - try (AtomicOutputStream out = store.createBlob(key, metadata, null)) { - out.write(1); - File blobDir = store.getKeyDataDir(key); - Files.createFile(blobDir.toPath().resolve("tempFile.tmp")); - } - - store.deleteBlob("test",null); - - } - - public Subject getSubject(String name) { - Subject subject = new Subject(); - SingleUserPrincipal user = new SingleUserPrincipal(name); - subject.getPrincipals().add(user); - return subject; - } - - // Check for Blobstore with authentication - public void testWithAuthentication(BlobStore store) throws Exception { - //Test for Nimbus Admin - Subject admin = getSubject("admin"); - assertStoreHasExactly(store); - SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) { - assertStoreHasExactly(store, "test"); - out.write(1); - } - store.deleteBlob("test", admin); - - //Test for Supervisor Admin - Subject supervisor = getSubject("supervisor"); - assertStoreHasExactly(store); - metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - try (AtomicOutputStream out = store.createBlob("test", metadata, supervisor)) { - assertStoreHasExactly(store, "test"); - out.write(1); - } - store.deleteBlob("test", supervisor); - - //Test for Nimbus itself as a user - Subject nimbus = getNimbusSubject(); - assertStoreHasExactly(store); - metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - try (AtomicOutputStream out = store.createBlob("test", metadata, nimbus)) { - assertStoreHasExactly(store, "test"); - out.write(1); - } - store.deleteBlob("test", nimbus); - // Test with a dummy test_subject for cases where subject !=null (security turned on) - Subject who = getSubject("test_subject"); - assertStoreHasExactly(store); - - // Tests for case when subject != null (security turned on) and - // acls for the blob are set to WORLD_EVERYTHING - metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); - try (AtomicOutputStream out = store.createBlob("test", metadata, who)) { - out.write(1); - } - assertStoreHasExactly(store, "test"); - // Testing whether acls are set to WORLD_EVERYTHING - assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); - readAssertEqualsWithAuth(store, who, "test", 1); - - LOG.info("Deleting test"); - store.deleteBlob("test", who); - assertStoreHasExactly(store); - - // Tests for case when subject != null (security turned on) and - // acls are not set for the blob (DEFAULT) - LOG.info("Creating test again"); - metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - try (AtomicOutputStream out = store.createBlob("test", metadata, who)) { - out.write(2); + // Gets Nimbus Subject with NimbusPrincipal set on it + public static Subject getNimbusSubject() { + Subject nimbus = new Subject(); + nimbus.getPrincipals().add(new NimbusPrincipal()); + return nimbus; } - assertStoreHasExactly(store, "test"); - // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should not contain WORLD_EVERYTHING because - // the subject is neither null nor empty. The ACL should however contain USER_EVERYTHING as user needs to have - // complete access to the blob - assertTrue("ACL does not contain WORLD_EVERYTHING", !metadata.toString().contains("AccessControl(type:OTHER, access:7)")); - readAssertEqualsWithAuth(store, who, "test", 2); - - LOG.info("Updating test"); - try (AtomicOutputStream out = store.updateBlob("test", who)) { - out.write(3); + + // Overloading the assertStoreHasExactly method accomodate Subject in order to check for authorization + public static void assertStoreHasExactly(BlobStore store, Subject who, String... keys) + throws IOException, KeyNotFoundException, AuthorizationException { + Set<String> expected = new HashSet<String>(Arrays.asList(keys)); + Set<String> found = new HashSet<String>(); + Iterator<String> c = store.listKeys(); + while (c.hasNext()) { + String keyName = c.next(); + found.add(keyName); + } + Set<String> extra = new HashSet<String>(found); + extra.removeAll(expected); + assertTrue("Found extra keys in the blob store " + extra, extra.isEmpty()); + Set<String> missing = new HashSet<String>(expected); + missing.removeAll(found); + assertTrue("Found keys missing from the blob store " + missing, missing.isEmpty()); } - assertStoreHasExactly(store, "test"); - readAssertEqualsWithAuth(store, who, "test", 3); - - LOG.info("Updating test again"); - try (AtomicOutputStream out = store.updateBlob("test", who)) { - out.write(4); - out.flush(); - LOG.info("SLEEPING"); - Thread.sleep(2); - assertStoreHasExactly(store, "test"); - readAssertEqualsWithAuth(store, who, "test", 3); + + public static void assertStoreHasExactly(BlobStore store, String... keys) + throws IOException, KeyNotFoundException, AuthorizationException { + assertStoreHasExactly(store, null, keys); } - // Test for subject with no principals and acls set to WORLD_EVERYTHING - who = new Subject(); - metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); - LOG.info("Creating test"); - try (AtomicOutputStream out = store.createBlob("test-empty-subject-WE", metadata, who)) { - out.write(2); + // Overloading the readInt method accomodate Subject in order to check for authorization (security turned on) + public static int readInt(BlobStore store, Subject who, String key) + throws IOException, KeyNotFoundException, AuthorizationException { + InputStream in = store.getBlob(key, who); + try { + return in.read(); + } finally { + in.close(); + } } - assertStoreHasExactly(store, "test-empty-subject-WE", "test"); - // Testing whether acls are set to WORLD_EVERYTHING - assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); - readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2); - - // Test for subject with no principals and acls set to DEFAULT - who = new Subject(); - metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - LOG.info("Creating other"); - - try (AtomicOutputStream out = store.createBlob("test-empty-subject-DEF", metadata, who)) { - out.write(2); + + public static int readInt(BlobStore store, String key) + throws IOException, KeyNotFoundException, AuthorizationException { + return readInt(store, null, key); } - assertStoreHasExactly(store, "test-empty-subject-DEF", "test", "test-empty-subject-WE"); - // Testing whether acls are set to WORLD_EVERYTHING - assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); - readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2); - - if (store instanceof LocalFsBlobStore) { - ((LocalFsBlobStore) store).fullCleanup(1); - } else { - fail("Error the blobstore is of unknowntype"); + + public static void readAssertEquals(BlobStore store, String key, int value) + throws IOException, KeyNotFoundException, AuthorizationException { + assertEquals(value, readInt(store, key)); } - } - - public void testBasic(BlobStore store) throws Exception { - assertStoreHasExactly(store); - LOG.info("Creating test"); - // Tests for case when subject == null (security turned off) and - // acls for the blob are set to WORLD_EVERYTHING - SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler - .WORLD_EVERYTHING); - try (AtomicOutputStream out = store.createBlob("test", metadata, null)) { - out.write(1); + + @Before + public void init() { + initializeConfigs(); + baseFile = new File("target/blob-store-test-" + UUID.randomUUID()); + base = baseFile.toURI(); } - assertStoreHasExactly(store, "test"); - // Testing whether acls are set to WORLD_EVERYTHING - assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); - readAssertEquals(store, "test", 1); - - LOG.info("Deleting test"); - store.deleteBlob("test", null); - assertStoreHasExactly(store); - - // The following tests are run for both hdfs and local store to test the - // update blob interface - metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); - LOG.info("Creating test again"); - try (AtomicOutputStream out = store.createBlob("test", metadata, null)) { - out.write(2); + + @After + public void cleanup() throws IOException { + FileUtils.deleteDirectory(baseFile); } - assertStoreHasExactly(store, "test"); - if (store instanceof LocalFsBlobStore) { - assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + + // Checks for assertion when we turn on security + public void readAssertEqualsWithAuth(BlobStore store, Subject who, String key, int value) + throws IOException, KeyNotFoundException, AuthorizationException { + assertEquals(value, readInt(store, who, key)); } - readAssertEquals(store, "test", 2); - LOG.info("Updating test"); - try (AtomicOutputStream out = store.updateBlob("test", null)) { - out.write(3); + + private LocalFsBlobStore initLocalFs() { + LocalFsBlobStore store = new LocalFsBlobStore(); + // Spy object that tries to mock the real object store + LocalFsBlobStore spy = spy(store); + Mockito.doNothing().when(spy).checkForBlobUpdate("test"); + Mockito.doNothing().when(spy).checkForBlobUpdate("other"); + Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-subject-WE"); + Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-subject-DEF"); + Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-acls"); + Map<String, Object> conf = Utils.readStormConfig(); + conf.put(Config.STORM_LOCAL_DIR, baseFile.getAbsolutePath()); + conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal"); + spy.prepare(conf, null, null); + return spy; } - assertStoreHasExactly(store, "test"); - readAssertEquals(store, "test", 3); - - LOG.info("Updating test again"); - try (AtomicOutputStream out = store.updateBlob("test", null)) { - out.write(4); - out.flush(); - LOG.info("SLEEPING"); - Thread.sleep(2); + + @Test + public void testLocalFsWithAuth() throws Exception { + testWithAuthentication(initLocalFs()); } - // Tests for case when subject == null (security turned off) and - // acls for the blob are set to DEFAULT (Empty ACL List) only for LocalFsBlobstore - if (store instanceof LocalFsBlobStore) { - metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - LOG.info("Creating test for empty acls when security is off"); - try (AtomicOutputStream out = store.createBlob("test-empty-acls", metadata, null)) { - LOG.info("metadata {}", metadata); - out.write(2); - } - assertStoreHasExactly(store, "test-empty-acls", "test"); - // Testing whether acls are set to WORLD_EVERYTHING, Here we are testing only for LocalFsBlobstore - // as the HdfsBlobstore gets the subject information of the local system user and behaves as it is - // always authenticated. - assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.get_acl().toString().contains("OTHER")); - - LOG.info("Deleting test-empty-acls"); - store.deleteBlob("test-empty-acls", null); + @Test + public void testBasicLocalFs() throws Exception { + testBasic(initLocalFs()); } - if (store instanceof LocalFsBlobStore) { - ((LocalFsBlobStore) store).fullCleanup(1); - } else { - fail("Error the blobstore is of unknowntype"); + @Test + public void testMultipleLocalFs() throws Exception { + testMultiple(initLocalFs()); } - } + @Test + public void testDeleteAfterFailedCreate() throws Exception { + //Check that a blob can be deleted when a temporary file exists in the blob directory + LocalFsBlobStore store = initLocalFs(); - public void testMultiple(BlobStore store) throws Exception { + String key = "test"; + SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler + .WORLD_EVERYTHING); + try (AtomicOutputStream out = store.createBlob(key, metadata, null)) { + out.write(1); + File blobDir = store.getKeyDataDir(key); + Files.createFile(blobDir.toPath().resolve("tempFile.tmp")); + } - assertStoreHasExactly(store); - LOG.info("Creating test"); - try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler - .WORLD_EVERYTHING), null)) { - out.write(1); - } - assertStoreHasExactly(store, "test"); - readAssertEquals(store, "test", 1); + store.deleteBlob("test", null); - LOG.info("Creating other"); - try (AtomicOutputStream out = store.createBlob("other", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), - null)) { - out.write(2); } - assertStoreHasExactly(store, "test", "other"); - readAssertEquals(store, "test", 1); - readAssertEquals(store, "other", 2); - LOG.info("Updating other"); - try (AtomicOutputStream out = store.updateBlob("other", null)) { - out.write(5); + public Subject getSubject(String name) { + Subject subject = new Subject(); + SingleUserPrincipal user = new SingleUserPrincipal(name); + subject.getPrincipals().add(user); + return subject; } - assertStoreHasExactly(store, "test", "other"); - readAssertEquals(store, "test", 1); - readAssertEquals(store, "other", 5); - - LOG.info("Deleting test"); - store.deleteBlob("test", null); - assertStoreHasExactly(store, "other"); - readAssertEquals(store, "other", 5); - - LOG.info("Creating test again"); - try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), - null)) { - out.write(2); - } - assertStoreHasExactly(store, "test", "other"); - readAssertEquals(store, "test", 2); - readAssertEquals(store, "other", 5); - LOG.info("Updating test"); - try (AtomicOutputStream out = store.updateBlob("test", null)) { - out.write(3); + // Check for Blobstore with authentication + public void testWithAuthentication(BlobStore store) throws Exception { + //Test for Nimbus Admin + Subject admin = getSubject("admin"); + assertStoreHasExactly(store); + SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) { + assertStoreHasExactly(store, "test"); + out.write(1); + } + store.deleteBlob("test", admin); + + //Test for Supervisor Admin + Subject supervisor = getSubject("supervisor"); + assertStoreHasExactly(store); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + try (AtomicOutputStream out = store.createBlob("test", metadata, supervisor)) { + assertStoreHasExactly(store, "test"); + out.write(1); + } + store.deleteBlob("test", supervisor); + + //Test for Nimbus itself as a user + Subject nimbus = getNimbusSubject(); + assertStoreHasExactly(store); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + try (AtomicOutputStream out = store.createBlob("test", metadata, nimbus)) { + assertStoreHasExactly(store, "test"); + out.write(1); + } + store.deleteBlob("test", nimbus); + + // Test with a dummy test_subject for cases where subject !=null (security turned on) + Subject who = getSubject("test_subject"); + assertStoreHasExactly(store); + + // Tests for case when subject != null (security turned on) and + // acls for the blob are set to WORLD_EVERYTHING + metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); + try (AtomicOutputStream out = store.createBlob("test", metadata, who)) { + out.write(1); + } + assertStoreHasExactly(store, "test"); + // Testing whether acls are set to WORLD_EVERYTHING + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + readAssertEqualsWithAuth(store, who, "test", 1); + + LOG.info("Deleting test"); + store.deleteBlob("test", who); + assertStoreHasExactly(store); + + // Tests for case when subject != null (security turned on) and + // acls are not set for the blob (DEFAULT) + LOG.info("Creating test again"); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + try (AtomicOutputStream out = store.createBlob("test", metadata, who)) { + out.write(2); + } + assertStoreHasExactly(store, "test"); + // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should not contain WORLD_EVERYTHING because + // the subject is neither null nor empty. The ACL should however contain USER_EVERYTHING as user needs to have + // complete access to the blob + assertTrue("ACL does not contain WORLD_EVERYTHING", !metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + readAssertEqualsWithAuth(store, who, "test", 2); + + LOG.info("Updating test"); + try (AtomicOutputStream out = store.updateBlob("test", who)) { + out.write(3); + } + assertStoreHasExactly(store, "test"); + readAssertEqualsWithAuth(store, who, "test", 3); + + LOG.info("Updating test again"); + try (AtomicOutputStream out = store.updateBlob("test", who)) { + out.write(4); + out.flush(); + LOG.info("SLEEPING"); + Thread.sleep(2); + assertStoreHasExactly(store, "test"); + readAssertEqualsWithAuth(store, who, "test", 3); + } + + // Test for subject with no principals and acls set to WORLD_EVERYTHING + who = new Subject(); + metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); + LOG.info("Creating test"); + try (AtomicOutputStream out = store.createBlob("test-empty-subject-WE", metadata, who)) { + out.write(2); + } + assertStoreHasExactly(store, "test-empty-subject-WE", "test"); + // Testing whether acls are set to WORLD_EVERYTHING + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2); + + // Test for subject with no principals and acls set to DEFAULT + who = new Subject(); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + LOG.info("Creating other"); + + try (AtomicOutputStream out = store.createBlob("test-empty-subject-DEF", metadata, who)) { + out.write(2); + } + assertStoreHasExactly(store, "test-empty-subject-DEF", "test", "test-empty-subject-WE"); + // Testing whether acls are set to WORLD_EVERYTHING + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2); + + if (store instanceof LocalFsBlobStore) { + ((LocalFsBlobStore) store).fullCleanup(1); + } else { + fail("Error the blobstore is of unknowntype"); + } } - assertStoreHasExactly(store, "test", "other"); - readAssertEquals(store, "test", 3); - readAssertEquals(store, "other", 5); - - LOG.info("Deleting other"); - store.deleteBlob("other", null); - assertStoreHasExactly(store, "test"); - readAssertEquals(store, "test", 3); - - LOG.info("Updating test again"); - - // intended to not guarding with try-with-resource since otherwise test will fail - AtomicOutputStream out = store.updateBlob("test", null); - out.write(4); - out.flush(); - LOG.info("SLEEPING"); - Thread.sleep(2); - - if (store instanceof LocalFsBlobStore) { - ((LocalFsBlobStore) store).fullCleanup(1); - } else { - fail("Error the blobstore is of unknowntype"); - } assertStoreHasExactly(store, "test"); - readAssertEquals(store, "test", 3); - try { - out.close(); - } catch (IOException e) { - // This is likely to happen when we try to commit something that - // was cleaned up. This is expected and acceptable. + + public void testBasic(BlobStore store) throws Exception { + assertStoreHasExactly(store); + LOG.info("Creating test"); + // Tests for case when subject == null (security turned off) and + // acls for the blob are set to WORLD_EVERYTHING + SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler + .WORLD_EVERYTHING); + try (AtomicOutputStream out = store.createBlob("test", metadata, null)) { + out.write(1); + } + assertStoreHasExactly(store, "test"); + // Testing whether acls are set to WORLD_EVERYTHING + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + readAssertEquals(store, "test", 1); + + LOG.info("Deleting test"); + store.deleteBlob("test", null); + assertStoreHasExactly(store); + + // The following tests are run for both hdfs and local store to test the + // update blob interface + metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); + LOG.info("Creating test again"); + try (AtomicOutputStream out = store.createBlob("test", metadata, null)) { + out.write(2); + } + assertStoreHasExactly(store, "test"); + if (store instanceof LocalFsBlobStore) { + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + } + readAssertEquals(store, "test", 2); + LOG.info("Updating test"); + try (AtomicOutputStream out = store.updateBlob("test", null)) { + out.write(3); + } + assertStoreHasExactly(store, "test"); + readAssertEquals(store, "test", 3); + + LOG.info("Updating test again"); + try (AtomicOutputStream out = store.updateBlob("test", null)) { + out.write(4); + out.flush(); + LOG.info("SLEEPING"); + Thread.sleep(2); + } + + // Tests for case when subject == null (security turned off) and + // acls for the blob are set to DEFAULT (Empty ACL List) only for LocalFsBlobstore + if (store instanceof LocalFsBlobStore) { + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + LOG.info("Creating test for empty acls when security is off"); + try (AtomicOutputStream out = store.createBlob("test-empty-acls", metadata, null)) { + LOG.info("metadata {}", metadata); + out.write(2); + } + assertStoreHasExactly(store, "test-empty-acls", "test"); + // Testing whether acls are set to WORLD_EVERYTHING, Here we are testing only for LocalFsBlobstore + // as the HdfsBlobstore gets the subject information of the local system user and behaves as it is + // always authenticated. + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.get_acl().toString().contains("OTHER")); + + LOG.info("Deleting test-empty-acls"); + store.deleteBlob("test-empty-acls", null); + } + + if (store instanceof LocalFsBlobStore) { + ((LocalFsBlobStore) store).fullCleanup(1); + } else { + fail("Error the blobstore is of unknowntype"); + } } - } - - @Test - public void testGetFileLength() - throws AuthorizationException, KeyNotFoundException, KeyAlreadyExistsException, IOException { - LocalFsBlobStore store = initLocalFs(); - try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler - .WORLD_EVERYTHING), null)) { - out.write(1); + + + public void testMultiple(BlobStore store) throws Exception { + + assertStoreHasExactly(store); + LOG.info("Creating test"); + try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler + .WORLD_EVERYTHING), null)) { + out.write(1); + } + assertStoreHasExactly(store, "test"); + readAssertEquals(store, "test", 1); + + LOG.info("Creating other"); + try (AtomicOutputStream out = store.createBlob("other", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), + null)) { + out.write(2); + } + assertStoreHasExactly(store, "test", "other"); + readAssertEquals(store, "test", 1); + readAssertEquals(store, "other", 2); + + LOG.info("Updating other"); + try (AtomicOutputStream out = store.updateBlob("other", null)) { + out.write(5); + } + assertStoreHasExactly(store, "test", "other"); + readAssertEquals(store, "test", 1); + readAssertEquals(store, "other", 5); + + LOG.info("Deleting test"); + store.deleteBlob("test", null); + assertStoreHasExactly(store, "other"); + readAssertEquals(store, "other", 5); + + LOG.info("Creating test again"); + try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), + null)) { + out.write(2); + } + assertStoreHasExactly(store, "test", "other"); + readAssertEquals(store, "test", 2); + readAssertEquals(store, "other", 5); + + LOG.info("Updating test"); + try (AtomicOutputStream out = store.updateBlob("test", null)) { + out.write(3); + } + assertStoreHasExactly(store, "test", "other"); + readAssertEquals(store, "test", 3); + readAssertEquals(store, "other", 5); + + LOG.info("Deleting other"); + store.deleteBlob("other", null); + assertStoreHasExactly(store, "test"); + readAssertEquals(store, "test", 3); + + LOG.info("Updating test again"); + + // intended to not guarding with try-with-resource since otherwise test will fail + AtomicOutputStream out = store.updateBlob("test", null); + out.write(4); + out.flush(); + LOG.info("SLEEPING"); + Thread.sleep(2); + + if (store instanceof LocalFsBlobStore) { + ((LocalFsBlobStore) store).fullCleanup(1); + } else { + fail("Error the blobstore is of unknowntype"); + } + assertStoreHasExactly(store, "test"); + readAssertEquals(store, "test", 3); + try { + out.close(); + } catch (IOException e) { + // This is likely to happen when we try to commit something that + // was cleaned up. This is expected and acceptable. + } } - try (InputStreamWithMeta blobInputStream = store.getBlob("test", null)) { - assertEquals(1, blobInputStream.getFileLength()); + + @Test + public void testGetFileLength() + throws AuthorizationException, KeyNotFoundException, KeyAlreadyExistsException, IOException { + LocalFsBlobStore store = initLocalFs(); + try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler + .WORLD_EVERYTHING), null)) { + out.write(1); + } + try (InputStreamWithMeta blobInputStream = store.getBlob("test", null)) { + assertEquals(1, blobInputStream.getFileLength()); + } } - } }
http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreUtilsTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreUtilsTest.java b/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreUtilsTest.java index e861db8..2c3a49a 100644 --- a/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreUtilsTest.java +++ b/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreUtilsTest.java @@ -1,22 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.blobstore; +import java.util.List; +import java.util.Map; +import org.apache.storm.nimbus.NimbusInfo; +import org.junit.Test; + import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; @@ -24,19 +24,13 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.List; -import java.util.Map; - -import org.apache.storm.nimbus.NimbusInfo; -import org.junit.Test; - public class BlobStoreUtilsTest { private static final String KEY = "key"; private static final String BLOBSTORE_KEY = "/blobstore/" + KEY; @SuppressWarnings("unchecked") - private Map<String, Object> conf = (Map<String, Object>)mock(Map.class); + private Map<String, Object> conf = (Map<String, Object>) mock(Map.class); private MockZookeeperClientBuilder zkClientBuilder = new MockZookeeperClientBuilder(); private BlobStore blobStore = mock(BlobStore.class); private NimbusInfo nimbusDetails = mock(NimbusInfo.class); @@ -75,7 +69,7 @@ public class BlobStoreUtilsTest { @Test public void testUpdateKeyForBlobStore_nodeWithNullChildren() { zkClientBuilder.withExists(BLOBSTORE_KEY, true); - zkClientBuilder.withGetChildren(BLOBSTORE_KEY, (List<String>)null); + zkClientBuilder.withGetChildren(BLOBSTORE_KEY, (List<String>) null); BlobStoreUtils.updateKeyForBlobStore(conf, blobStore, zkClientBuilder.build(), KEY, nimbusDetails); zkClientBuilder.verifyExists(true); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/blobstore/BlobSynchronizerTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/blobstore/BlobSynchronizerTest.java b/storm-server/src/test/java/org/apache/storm/blobstore/BlobSynchronizerTest.java index ea12204..d988b48 100644 --- a/storm-server/src/test/java/org/apache/storm/blobstore/BlobSynchronizerTest.java +++ b/storm-server/src/test/java/org/apache/storm/blobstore/BlobSynchronizerTest.java @@ -1,40 +1,40 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.blobstore; -import org.apache.storm.Config; -import org.apache.storm.nimbus.NimbusInfo; -import org.apache.storm.utils.Utils; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import org.apache.commons.io.FileUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingServer; +import org.apache.storm.Config; +import org.apache.storm.nimbus.NimbusInfo; +import org.apache.storm.utils.Utils; import org.apache.zookeeper.server.NIOServerCnxnFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.util.*; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -43,93 +43,94 @@ import static org.junit.Assert.assertTrue; * and BlobSynchronizer class methods */ public class BlobSynchronizerTest { - private URI base; - private File baseFile; - private static Map<String, Object> conf = new HashMap(); - private NIOServerCnxnFactory factory; + private static Map<String, Object> conf = new HashMap(); + private URI base; + private File baseFile; + private NIOServerCnxnFactory factory; - @Before - public void init() throws Exception { - initializeConfigs(); - baseFile = new File("target/blob-store-test-"+ UUID.randomUUID()); - base = baseFile.toURI(); - } + // Method which initializes nimbus admin + public static void initializeConfigs() { + conf.put(Config.NIMBUS_ADMINS, "admin"); + conf.put(Config.NIMBUS_SUPERVISOR_USERS, "supervisor"); + } - @After - public void cleanUp() throws IOException { - FileUtils.deleteDirectory(baseFile); - if (factory != null) { - factory.shutdown(); + @Before + public void init() throws Exception { + initializeConfigs(); + baseFile = new File("target/blob-store-test-" + UUID.randomUUID()); + base = baseFile.toURI(); } - } - // Method which initializes nimbus admin - public static void initializeConfigs() { - conf.put(Config.NIMBUS_ADMINS,"admin"); - conf.put(Config.NIMBUS_SUPERVISOR_USERS,"supervisor"); - } + @After + public void cleanUp() throws IOException { + FileUtils.deleteDirectory(baseFile); + if (factory != null) { + factory.shutdown(); + } + } - private LocalFsBlobStore initLocalFs() { - LocalFsBlobStore store = new LocalFsBlobStore(); - Map<String, Object> conf = Utils.readStormConfig(); - conf.put(Config.STORM_LOCAL_DIR, baseFile.getAbsolutePath()); - conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN,"org.apache.storm.security.auth.DefaultPrincipalToLocal"); - this.conf = conf; - store.prepare(conf, null, null); - return store; - } + private LocalFsBlobStore initLocalFs() { + LocalFsBlobStore store = new LocalFsBlobStore(); + Map<String, Object> conf = Utils.readStormConfig(); + conf.put(Config.STORM_LOCAL_DIR, baseFile.getAbsolutePath()); + conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal"); + this.conf = conf; + store.prepare(conf, null, null); + return store; + } - @Test - public void testBlobSynchronizerForKeysToDownload() { - BlobStore store = initLocalFs(); - BlobSynchronizer sync = new BlobSynchronizer(store, conf); - // test for keylist to download - Set<String> zkSet = new HashSet<String>(); - zkSet.add("key1"); - Set<String> blobStoreSet = new HashSet<String>(); - blobStoreSet.add("key1"); - Set<String> resultSet = sync.getKeySetToDownload(blobStoreSet, zkSet); - assertTrue("Not Empty", resultSet.isEmpty()); - zkSet.add("key1"); - blobStoreSet.add("key2"); - resultSet = sync.getKeySetToDownload(blobStoreSet, zkSet); - assertTrue("Not Empty", resultSet.isEmpty()); - blobStoreSet.remove("key1"); - blobStoreSet.remove("key2"); - zkSet.add("key1"); - resultSet = sync.getKeySetToDownload(blobStoreSet, zkSet); - assertTrue("Unexpected keys to download", (resultSet.size() == 1) && (resultSet.contains("key1"))); - } + @Test + public void testBlobSynchronizerForKeysToDownload() { + BlobStore store = initLocalFs(); + BlobSynchronizer sync = new BlobSynchronizer(store, conf); + // test for keylist to download + Set<String> zkSet = new HashSet<String>(); + zkSet.add("key1"); + Set<String> blobStoreSet = new HashSet<String>(); + blobStoreSet.add("key1"); + Set<String> resultSet = sync.getKeySetToDownload(blobStoreSet, zkSet); + assertTrue("Not Empty", resultSet.isEmpty()); + zkSet.add("key1"); + blobStoreSet.add("key2"); + resultSet = sync.getKeySetToDownload(blobStoreSet, zkSet); + assertTrue("Not Empty", resultSet.isEmpty()); + blobStoreSet.remove("key1"); + blobStoreSet.remove("key2"); + zkSet.add("key1"); + resultSet = sync.getKeySetToDownload(blobStoreSet, zkSet); + assertTrue("Unexpected keys to download", (resultSet.size() == 1) && (resultSet.contains("key1"))); + } - @Test - public void testGetLatestSequenceNumber() throws Exception { - List<String> stateInfoList = new ArrayList<String>(); - stateInfoList.add("nimbus1:8000-2"); - stateInfoList.add("nimbus-1:8000-4"); - assertTrue("Failed to get the latest version", BlobStoreUtils.getLatestSequenceNumber(stateInfoList)==4); - } + @Test + public void testGetLatestSequenceNumber() throws Exception { + List<String> stateInfoList = new ArrayList<String>(); + stateInfoList.add("nimbus1:8000-2"); + stateInfoList.add("nimbus-1:8000-4"); + assertTrue("Failed to get the latest version", BlobStoreUtils.getLatestSequenceNumber(stateInfoList) == 4); + } - @Test - public void testNimbodesWithLatestVersionOfBlob() throws Exception { - try (TestingServer server = new TestingServer(); CuratorFramework zkClient = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3))) { - zkClient.start(); - // Creating nimbus hosts containing latest version of blob - zkClient.create().creatingParentContainersIfNeeded().forPath("/blobstore/key1/nimbus1:7800-1"); - zkClient.create().creatingParentContainersIfNeeded().forPath("/blobstore/key1/nimbus2:7800-2"); - Set<NimbusInfo> set = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, "key1"); - assertEquals("Failed to get the correct nimbus hosts with latest blob version", (set.iterator().next()).getHost(),"nimbus2"); - zkClient.delete().deletingChildrenIfNeeded().forPath("/blobstore/key1/nimbus1:7800-1"); - zkClient.delete().deletingChildrenIfNeeded().forPath("/blobstore/key1/nimbus2:7800-2"); - } - } + @Test + public void testNimbodesWithLatestVersionOfBlob() throws Exception { + try (TestingServer server = new TestingServer(); CuratorFramework zkClient = CuratorFrameworkFactory + .newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3))) { + zkClient.start(); + // Creating nimbus hosts containing latest version of blob + zkClient.create().creatingParentContainersIfNeeded().forPath("/blobstore/key1/nimbus1:7800-1"); + zkClient.create().creatingParentContainersIfNeeded().forPath("/blobstore/key1/nimbus2:7800-2"); + Set<NimbusInfo> set = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, "key1"); + assertEquals("Failed to get the correct nimbus hosts with latest blob version", (set.iterator().next()).getHost(), "nimbus2"); + zkClient.delete().deletingChildrenIfNeeded().forPath("/blobstore/key1/nimbus1:7800-1"); + zkClient.delete().deletingChildrenIfNeeded().forPath("/blobstore/key1/nimbus2:7800-2"); + } + } - @Test - public void testNormalizeVersionInfo () throws Exception { - BlobKeySequenceInfo info1 = BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo("nimbus1:7800-1"); - assertTrue(info1.getNimbusHostPort().equals("nimbus1:7800")); - assertTrue(info1.getSequenceNumber().equals("1")); - BlobKeySequenceInfo info2 = BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo("nimbus-1:7800-1"); - assertTrue(info2.getNimbusHostPort().equals("nimbus-1:7800")); - assertTrue(info2.getSequenceNumber().equals("1")); - } + @Test + public void testNormalizeVersionInfo() throws Exception { + BlobKeySequenceInfo info1 = BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo("nimbus1:7800-1"); + assertTrue(info1.getNimbusHostPort().equals("nimbus1:7800")); + assertTrue(info1.getSequenceNumber().equals("1")); + BlobKeySequenceInfo info2 = BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo("nimbus-1:7800-1"); + assertTrue(info2.getNimbusHostPort().equals("nimbus-1:7800")); + assertTrue(info2.getSequenceNumber().equals("1")); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/blobstore/MockZookeeperClientBuilder.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/blobstore/MockZookeeperClientBuilder.java b/storm-server/src/test/java/org/apache/storm/blobstore/MockZookeeperClientBuilder.java index c3e2b84..2da3b2d 100644 --- a/storm-server/src/test/java/org/apache/storm/blobstore/MockZookeeperClientBuilder.java +++ b/storm-server/src/test/java/org/apache/storm/blobstore/MockZookeeperClientBuilder.java @@ -1,30 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.blobstore; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +package org.apache.storm.blobstore; import java.util.Arrays; import java.util.List; - import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.ExistsBuilder; import org.apache.curator.framework.api.GetChildrenBuilder; @@ -32,6 +21,11 @@ import org.apache.curator.framework.api.Pathable; import org.apache.log4j.Logger; import org.apache.zookeeper.data.Stat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + public class MockZookeeperClientBuilder { private static final Logger LOG = Logger.getLogger(MockZookeeperClientBuilder.class); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java b/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java index 73f5ff4..dee13dc 100644 --- a/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java +++ b/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java @@ -15,9 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.daemon.drpc; -import static org.junit.Assert.*; +package org.apache.storm.daemon.drpc; import java.util.Arrays; import java.util.HashMap; @@ -27,9 +26,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; - import javax.security.auth.Subject; - import org.apache.storm.Config; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.DRPCExceptionType; @@ -39,19 +36,20 @@ import org.apache.storm.security.auth.DefaultPrincipalToLocal; import org.apache.storm.security.auth.ReqContext; import org.apache.storm.security.auth.SingleUserPrincipal; import org.apache.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer; -import org.apache.storm.security.auth.authorizer.DenyAuthorizer; import org.apache.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer.AclFunctionEntry; +import org.apache.storm.security.auth.authorizer.DenyAuthorizer; import org.apache.storm.utils.Time; import org.junit.AfterClass; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class DRPCTest { private static final ExecutorService exec = Executors.newCachedThreadPool(); - - public static interface ThrowStuff { - public void run() throws Exception; - } - + private static void assertThrows(ThrowStuff t, Class<? extends Exception> expected) { try { t.run(); @@ -60,12 +58,12 @@ public class DRPCTest { assertTrue("Expected " + t + " to throw " + expected + " but threw " + e, expected.isInstance(e)); } } - + @AfterClass public static void close() { exec.shutdownNow(); } - + public static DRPCRequest getNextAvailableRequest(DRPC server, String func) throws Exception { DRPCRequest request = null; long timedout = System.currentTimeMillis() + 5_000; @@ -79,7 +77,7 @@ public class DRPCTest { fail("Test timed out waiting for a request on " + func); return request; } - + @Test public void testGoodBlocking() throws Exception { try (DRPC server = new DRPC(null, 100)) { @@ -93,7 +91,7 @@ public class DRPCTest { assertEquals("tested", result); } } - + @Test public void testFailedBlocking() throws Exception { try (DRPC server = new DRPC(null, 100)) { @@ -110,11 +108,11 @@ public class DRPCTest { Throwable t = e.getCause(); assertEquals(t.getClass(), DRPCExecutionException.class); //Don't know a better way to validate that it failed. - assertEquals(DRPCExceptionType.FAILED_REQUEST, ((DRPCExecutionException)t).get_type()); + assertEquals(DRPCExceptionType.FAILED_REQUEST, ((DRPCExecutionException) t).get_type()); } } } - + @Test public void testDequeueAfterTimeout() throws Exception { long timeout = 1000; @@ -135,7 +133,7 @@ public class DRPCTest { assertEquals("", request.get_func_args()); } } - + @Test public void testDeny() throws Exception { try (DRPC server = new DRPC(new DenyAuthorizer(), 100)) { @@ -143,7 +141,7 @@ public class DRPCTest { assertThrows(() -> server.fetchRequest("testing"), AuthorizationException.class); } } - + @Test public void testStrict() throws Exception { ReqContext jt = new ReqContext(new Subject()); @@ -174,29 +172,29 @@ public class DRPCTest { DRPC.checkAuthorization(jt, auth, "fetchRequest", "jump"); assertThrows(() -> DRPC.checkAuthorization(jc, auth, "fetchRequest", "jump"), AuthorizationException.class); assertThrows(() -> DRPC.checkAuthorization(other, auth, "fetchRequest", "jump"), AuthorizationException.class); - + DRPC.checkAuthorization(jt, auth, "result", "jump"); assertThrows(() -> DRPC.checkAuthorization(jc, auth, "result", "jump"), AuthorizationException.class); assertThrows(() -> DRPC.checkAuthorization(other, auth, "result", "jump"), AuthorizationException.class); - + assertThrows(() -> DRPC.checkAuthorization(jt, auth, "execute", "jump"), AuthorizationException.class); DRPC.checkAuthorization(jc, auth, "execute", "jump"); assertThrows(() -> DRPC.checkAuthorization(other, auth, "execute", "jump"), AuthorizationException.class); - + //not_jump (closed in strict mode) assertThrows(() -> DRPC.checkAuthorization(jt, auth, "fetchRequest", "not_jump"), AuthorizationException.class); assertThrows(() -> DRPC.checkAuthorization(jc, auth, "fetchRequest", "not_jump"), AuthorizationException.class); assertThrows(() -> DRPC.checkAuthorization(other, auth, "fetchRequest", "not_jump"), AuthorizationException.class); - + assertThrows(() -> DRPC.checkAuthorization(jt, auth, "result", "not_jump"), AuthorizationException.class); assertThrows(() -> DRPC.checkAuthorization(jc, auth, "result", "not_jump"), AuthorizationException.class); assertThrows(() -> DRPC.checkAuthorization(other, auth, "result", "not_jump"), AuthorizationException.class); - + assertThrows(() -> DRPC.checkAuthorization(jt, auth, "execute", "not_jump"), AuthorizationException.class); assertThrows(() -> DRPC.checkAuthorization(jc, auth, "execute", "not_jump"), AuthorizationException.class); assertThrows(() -> DRPC.checkAuthorization(other, auth, "execute", "not_jump"), AuthorizationException.class); } - + @Test public void testNotStrict() throws Exception { ReqContext jt = new ReqContext(new Subject()); @@ -227,26 +225,30 @@ public class DRPCTest { DRPC.checkAuthorization(jt, auth, "fetchRequest", "jump"); assertThrows(() -> DRPC.checkAuthorization(jc, auth, "fetchRequest", "jump"), AuthorizationException.class); assertThrows(() -> DRPC.checkAuthorization(other, auth, "fetchRequest", "jump"), AuthorizationException.class); - + DRPC.checkAuthorization(jt, auth, "result", "jump"); assertThrows(() -> DRPC.checkAuthorization(jc, auth, "result", "jump"), AuthorizationException.class); assertThrows(() -> DRPC.checkAuthorization(other, auth, "result", "jump"), AuthorizationException.class); - + assertThrows(() -> DRPC.checkAuthorization(jt, auth, "execute", "jump"), AuthorizationException.class); DRPC.checkAuthorization(jc, auth, "execute", "jump"); assertThrows(() -> DRPC.checkAuthorization(other, auth, "execute", "jump"), AuthorizationException.class); - + //not_jump (open in not strict mode) DRPC.checkAuthorization(jt, auth, "fetchRequest", "not_jump"); DRPC.checkAuthorization(jc, auth, "fetchRequest", "not_jump"); DRPC.checkAuthorization(other, auth, "fetchRequest", "not_jump"); - + DRPC.checkAuthorization(jt, auth, "result", "not_jump"); DRPC.checkAuthorization(jc, auth, "result", "not_jump"); DRPC.checkAuthorization(other, auth, "result", "not_jump"); - + DRPC.checkAuthorization(jt, auth, "execute", "not_jump"); DRPC.checkAuthorization(jc, auth, "execute", "not_jump"); DRPC.checkAuthorization(other, auth, "execute", "not_jump"); } + + public static interface ThrowStuff { + public void run() throws Exception; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java index 60ff2d7..47484ac 100644 --- a/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java +++ b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.daemon.nimbus; import org.apache.storm.Config; @@ -26,7 +27,7 @@ import org.apache.storm.testing.TestWordSpout; import org.apache.storm.topology.TopologyBuilder; import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.fail; public class NimbusTest { @Test
