http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreTest.java 
b/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreTest.java
index 45b6468..57b0750 100644
--- a/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreTest.java
+++ b/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreTest.java
@@ -1,27 +1,22 @@
-
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.blobstore;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -29,9 +24,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-
 import javax.security.auth.Subject;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.storm.Config;
 import org.apache.storm.generated.AuthorizationException;
@@ -53,423 +46,422 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.spy;
 
-import java.nio.file.Files;
-
 public class BlobStoreTest {
-  private static final Logger LOG = 
LoggerFactory.getLogger(BlobStoreTest.class);
-  URI base;
-  File baseFile;
-  private static Map<String, Object> conf = new HashMap();
-  public static final int READ = 0x01;
-  public static final int WRITE = 0x02;
-  public static final int ADMIN = 0x04;
-
-  @Before
-  public void init() {
-    initializeConfigs();
-    baseFile = new File("target/blob-store-test-"+UUID.randomUUID());
-    base = baseFile.toURI();
-  }
-
-  @After
-  public void cleanup() throws IOException {
-    FileUtils.deleteDirectory(baseFile);
-  }
-
-  // Method which initializes nimbus admin
-  public static void initializeConfigs() {
-    conf.put(Config.NIMBUS_ADMINS,"admin");
-    conf.put(Config.NIMBUS_SUPERVISOR_USERS,"supervisor");
-  }
-
-  // Gets Nimbus Subject with NimbusPrincipal set on it
-  public static Subject getNimbusSubject() {
-    Subject nimbus = new Subject();
-    nimbus.getPrincipals().add(new NimbusPrincipal());
-    return nimbus;
-  }
-
-  // Overloading the assertStoreHasExactly method accomodate Subject in order 
to check for authorization
-  public static void assertStoreHasExactly(BlobStore store, Subject who, 
String ... keys)
-      throws IOException, KeyNotFoundException, AuthorizationException {
-    Set<String> expected = new HashSet<String>(Arrays.asList(keys));
-    Set<String> found = new HashSet<String>();
-    Iterator<String> c = store.listKeys();
-    while (c.hasNext()) {
-      String keyName = c.next();
-      found.add(keyName);
-    }
-    Set<String> extra = new HashSet<String>(found);
-    extra.removeAll(expected);
-    assertTrue("Found extra keys in the blob store "+extra, extra.isEmpty());
-    Set<String> missing = new HashSet<String>(expected);
-    missing.removeAll(found);
-    assertTrue("Found keys missing from the blob store "+missing, 
missing.isEmpty());
-  }
-
-  public static void assertStoreHasExactly(BlobStore store, String ... keys)
-      throws IOException, KeyNotFoundException, AuthorizationException {
-    assertStoreHasExactly(store, null, keys);
-  }
-
-  // Overloading the readInt method accomodate Subject in order to check for 
authorization (security turned on)
-  public static int readInt(BlobStore store, Subject who, String key)
-          throws IOException, KeyNotFoundException, AuthorizationException {
-    InputStream in = store.getBlob(key, who);
-    try {
-      return in.read();
-    } finally {
-      in.close();
+    public static final int READ = 0x01;
+    public static final int WRITE = 0x02;
+    public static final int ADMIN = 0x04;
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlobStoreTest.class);
+    private static Map<String, Object> conf = new HashMap();
+    URI base;
+    File baseFile;
+
+    // Method which initializes nimbus admin
+    public static void initializeConfigs() {
+        conf.put(Config.NIMBUS_ADMINS, "admin");
+        conf.put(Config.NIMBUS_SUPERVISOR_USERS, "supervisor");
     }
-  }
-
-  public static int readInt(BlobStore store, String key)
-          throws IOException, KeyNotFoundException, AuthorizationException {
-    return readInt(store, null, key);
-  }
-
-  public static void readAssertEquals(BlobStore store, String key, int value)
-          throws IOException, KeyNotFoundException, AuthorizationException {
-    assertEquals(value, readInt(store, key));
-  }
-
-  // Checks for assertion when we turn on security
-  public void readAssertEqualsWithAuth(BlobStore store, Subject who, String 
key, int value)
-          throws IOException, KeyNotFoundException, AuthorizationException {
-   assertEquals(value, readInt(store, who, key));
-  }
-
-  private LocalFsBlobStore initLocalFs() {
-    LocalFsBlobStore store = new LocalFsBlobStore();
-    // Spy object that tries to mock the real object store
-    LocalFsBlobStore spy = spy(store);
-    Mockito.doNothing().when(spy).checkForBlobUpdate("test");
-    Mockito.doNothing().when(spy).checkForBlobUpdate("other");
-    Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-subject-WE");
-    Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-subject-DEF");
-    Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-acls");
-    Map<String, Object> conf = Utils.readStormConfig();
-    conf.put(Config.STORM_LOCAL_DIR, baseFile.getAbsolutePath());
-    
conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN,"org.apache.storm.security.auth.DefaultPrincipalToLocal");
-    spy.prepare(conf, null, null);
-    return spy;
-  }
-
-  @Test
-  public void testLocalFsWithAuth() throws Exception {
-    testWithAuthentication(initLocalFs());
-  }
-
-  @Test
-  public void testBasicLocalFs() throws Exception {
-    testBasic(initLocalFs());
-  }
-
-  @Test
-  public void testMultipleLocalFs() throws Exception {
-    testMultiple(initLocalFs());
-  }
-
-  @Test
-  public void testDeleteAfterFailedCreate() throws Exception{
-    //Check that a blob can be deleted when a temporary file exists in the 
blob directory
-    LocalFsBlobStore store = initLocalFs();
-
-    String key = "test";
-    SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler
-            .WORLD_EVERYTHING);
-    try (AtomicOutputStream out = store.createBlob(key, metadata, null)) {
-        out.write(1);
-        File blobDir = store.getKeyDataDir(key);
-        Files.createFile(blobDir.toPath().resolve("tempFile.tmp"));
-    }
-    
-    store.deleteBlob("test",null);
-
-  }
-
-  public Subject getSubject(String name) {
-    Subject subject = new Subject();
-    SingleUserPrincipal user = new SingleUserPrincipal(name);
-    subject.getPrincipals().add(user);
-    return subject;
-  }
-
-  // Check for Blobstore with authentication
-  public void testWithAuthentication(BlobStore store) throws Exception {
-    //Test for Nimbus Admin
-    Subject admin = getSubject("admin");
-    assertStoreHasExactly(store);
-    SettableBlobMeta metadata = new 
SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) {
-      assertStoreHasExactly(store, "test");
-      out.write(1);
-    }
-    store.deleteBlob("test", admin);
-
-    //Test for Supervisor Admin
-    Subject supervisor = getSubject("supervisor");
-    assertStoreHasExactly(store);
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    try (AtomicOutputStream out = store.createBlob("test", metadata, 
supervisor)) {
-      assertStoreHasExactly(store, "test");
-      out.write(1);
-    }
-    store.deleteBlob("test", supervisor);
-
-    //Test for Nimbus itself as a user
-    Subject nimbus = getNimbusSubject();
-    assertStoreHasExactly(store);
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    try (AtomicOutputStream out = store.createBlob("test", metadata, nimbus)) {
-      assertStoreHasExactly(store, "test");
-      out.write(1);
-    }
-    store.deleteBlob("test", nimbus);
 
-    // Test with a dummy test_subject for cases where subject !=null (security 
turned on)
-    Subject who = getSubject("test_subject");
-    assertStoreHasExactly(store);
-
-    // Tests for case when subject != null (security turned on) and
-    // acls for the blob are set to WORLD_EVERYTHING
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
-    try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
-      out.write(1);
-    }
-    assertStoreHasExactly(store, "test");
-    // Testing whether acls are set to WORLD_EVERYTHING
-    assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-    readAssertEqualsWithAuth(store, who, "test", 1);
-
-    LOG.info("Deleting test");
-    store.deleteBlob("test", who);
-    assertStoreHasExactly(store);
-
-    // Tests for case when subject != null (security turned on) and
-    // acls are not set for the blob (DEFAULT)
-    LOG.info("Creating test again");
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
-      out.write(2);
+    // Gets Nimbus Subject with NimbusPrincipal set on it
+    public static Subject getNimbusSubject() {
+        Subject nimbus = new Subject();
+        nimbus.getPrincipals().add(new NimbusPrincipal());
+        return nimbus;
     }
-    assertStoreHasExactly(store, "test");
-    // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should 
not contain WORLD_EVERYTHING because
-    // the subject is neither null nor empty. The ACL should however contain 
USER_EVERYTHING as user needs to have
-    // complete access to the blob
-    assertTrue("ACL does not contain WORLD_EVERYTHING", 
!metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-    readAssertEqualsWithAuth(store, who, "test", 2);
-
-    LOG.info("Updating test");
-    try (AtomicOutputStream out = store.updateBlob("test", who)) {
-      out.write(3);
+
+    // Overloading the assertStoreHasExactly method to accommodate Subject in 
order to check for authorization
+    public static void assertStoreHasExactly(BlobStore store, Subject who, 
String... keys)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        Set<String> expected = new HashSet<String>(Arrays.asList(keys));
+        Set<String> found = new HashSet<String>();
+        Iterator<String> c = store.listKeys();
+        while (c.hasNext()) {
+            String keyName = c.next();
+            found.add(keyName);
+        }
+        Set<String> extra = new HashSet<String>(found);
+        extra.removeAll(expected);
+        assertTrue("Found extra keys in the blob store " + extra, 
extra.isEmpty());
+        Set<String> missing = new HashSet<String>(expected);
+        missing.removeAll(found);
+        assertTrue("Found keys missing from the blob store " + missing, 
missing.isEmpty());
     }
-    assertStoreHasExactly(store, "test");
-    readAssertEqualsWithAuth(store, who, "test", 3);
-
-    LOG.info("Updating test again");
-    try (AtomicOutputStream out = store.updateBlob("test", who)) {
-      out.write(4);
-      out.flush();
-      LOG.info("SLEEPING");
-      Thread.sleep(2);
-      assertStoreHasExactly(store, "test");
-      readAssertEqualsWithAuth(store, who, "test", 3);
+
+    public static void assertStoreHasExactly(BlobStore store, String... keys)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        assertStoreHasExactly(store, null, keys);
     }
 
-    // Test for subject with no principals and acls set to WORLD_EVERYTHING
-    who = new Subject();
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
-    LOG.info("Creating test");
-    try (AtomicOutputStream out = store.createBlob("test-empty-subject-WE", 
metadata, who)) {
-      out.write(2);
+    // Overloading the readInt method to accommodate Subject in order to check for 
authorization (security turned on)
+    public static int readInt(BlobStore store, Subject who, String key)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        InputStream in = store.getBlob(key, who);
+        try {
+            return in.read();
+        } finally {
+            in.close();
+        }
     }
-    assertStoreHasExactly(store, "test-empty-subject-WE", "test");
-    // Testing whether acls are set to WORLD_EVERYTHING
-    assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-    readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2);
-
-    // Test for subject with no principals and acls set to DEFAULT
-    who = new Subject();
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    LOG.info("Creating other");
-
-    try (AtomicOutputStream out = store.createBlob("test-empty-subject-DEF", 
metadata, who)) {
-      out.write(2);
+
+    public static int readInt(BlobStore store, String key)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        return readInt(store, null, key);
     }
-    assertStoreHasExactly(store, "test-empty-subject-DEF", "test", 
"test-empty-subject-WE");
-    // Testing whether acls are set to WORLD_EVERYTHING
-    assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-    readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2);
-
-    if (store instanceof LocalFsBlobStore) {
-      ((LocalFsBlobStore) store).fullCleanup(1);
-    } else {
-      fail("Error the blobstore is of unknowntype");
+
+    public static void readAssertEquals(BlobStore store, String key, int value)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        assertEquals(value, readInt(store, key));
     }
-  }
-
-  public void testBasic(BlobStore store) throws Exception {
-    assertStoreHasExactly(store);
-    LOG.info("Creating test");
-    // Tests for case when subject == null (security turned off) and
-    // acls for the blob are set to WORLD_EVERYTHING
-    SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler
-            .WORLD_EVERYTHING);
-    try (AtomicOutputStream out = store.createBlob("test", metadata, null)) {
-      out.write(1);
+
+    @Before
+    public void init() {
+        initializeConfigs();
+        baseFile = new File("target/blob-store-test-" + UUID.randomUUID());
+        base = baseFile.toURI();
     }
-    assertStoreHasExactly(store, "test");
-    // Testing whether acls are set to WORLD_EVERYTHING
-    assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-    readAssertEquals(store, "test", 1);
-
-    LOG.info("Deleting test");
-    store.deleteBlob("test", null);
-    assertStoreHasExactly(store);
-
-    // The following tests are run for both hdfs and local store to test the
-    // update blob interface
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
-    LOG.info("Creating test again");
-    try (AtomicOutputStream out = store.createBlob("test", metadata, null)) {
-      out.write(2);
+
+    @After
+    public void cleanup() throws IOException {
+        FileUtils.deleteDirectory(baseFile);
     }
-    assertStoreHasExactly(store, "test");
-    if (store instanceof LocalFsBlobStore) {
-      assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+
+    // Checks for assertion when we turn on security
+    public void readAssertEqualsWithAuth(BlobStore store, Subject who, String 
key, int value)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        assertEquals(value, readInt(store, who, key));
     }
-    readAssertEquals(store, "test", 2);
-    LOG.info("Updating test");
-    try (AtomicOutputStream out = store.updateBlob("test", null)) {
-      out.write(3);
+
+    private LocalFsBlobStore initLocalFs() {
+        LocalFsBlobStore store = new LocalFsBlobStore();
+        // Spy object that tries to mock the real object store
+        LocalFsBlobStore spy = spy(store);
+        Mockito.doNothing().when(spy).checkForBlobUpdate("test");
+        Mockito.doNothing().when(spy).checkForBlobUpdate("other");
+        
Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-subject-WE");
+        
Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-subject-DEF");
+        Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-acls");
+        Map<String, Object> conf = Utils.readStormConfig();
+        conf.put(Config.STORM_LOCAL_DIR, baseFile.getAbsolutePath());
+        conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, 
"org.apache.storm.security.auth.DefaultPrincipalToLocal");
+        spy.prepare(conf, null, null);
+        return spy;
     }
-    assertStoreHasExactly(store, "test");
-    readAssertEquals(store, "test", 3);
-
-    LOG.info("Updating test again");
-    try (AtomicOutputStream out = store.updateBlob("test", null)) {
-      out.write(4);
-      out.flush();
-      LOG.info("SLEEPING");
-      Thread.sleep(2);
+
+    @Test
+    public void testLocalFsWithAuth() throws Exception {
+        testWithAuthentication(initLocalFs());
     }
 
-    // Tests for case when subject == null (security turned off) and
-    // acls for the blob are set to DEFAULT (Empty ACL List) only for 
LocalFsBlobstore
-    if (store instanceof LocalFsBlobStore) {
-      metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-      LOG.info("Creating test for empty acls when security is off");
-      try (AtomicOutputStream out = store.createBlob("test-empty-acls", 
metadata, null)) {
-        LOG.info("metadata {}", metadata);
-        out.write(2);
-      }
-      assertStoreHasExactly(store, "test-empty-acls", "test");
-      // Testing whether acls are set to WORLD_EVERYTHING, Here we are testing 
only for LocalFsBlobstore
-      // as the HdfsBlobstore gets the subject information of the local system 
user and behaves as it is
-      // always authenticated.
-      assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.get_acl().toString().contains("OTHER"));
-
-      LOG.info("Deleting test-empty-acls");
-      store.deleteBlob("test-empty-acls", null);
+    @Test
+    public void testBasicLocalFs() throws Exception {
+        testBasic(initLocalFs());
     }
 
-    if (store instanceof LocalFsBlobStore) {
-      ((LocalFsBlobStore) store).fullCleanup(1);
-    } else {
-      fail("Error the blobstore is of unknowntype");
+    @Test
+    public void testMultipleLocalFs() throws Exception {
+        testMultiple(initLocalFs());
     }
-  }
 
+    @Test
+    public void testDeleteAfterFailedCreate() throws Exception {
+        //Check that a blob can be deleted when a temporary file exists in the 
blob directory
+        LocalFsBlobStore store = initLocalFs();
 
-  public void testMultiple(BlobStore store) throws Exception {
+        String key = "test";
+        SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler
+                                                             
.WORLD_EVERYTHING);
+        try (AtomicOutputStream out = store.createBlob(key, metadata, null)) {
+            out.write(1);
+            File blobDir = store.getKeyDataDir(key);
+            Files.createFile(blobDir.toPath().resolve("tempFile.tmp"));
+        }
 
-    assertStoreHasExactly(store);
-    LOG.info("Creating test");
-    try (AtomicOutputStream out = store.createBlob("test", new 
SettableBlobMeta(BlobStoreAclHandler
-        .WORLD_EVERYTHING), null)) {
-      out.write(1);
-    }
-    assertStoreHasExactly(store, "test");
-    readAssertEquals(store, "test", 1);
+        store.deleteBlob("test", null);
 
-    LOG.info("Creating other");
-    try (AtomicOutputStream out = store.createBlob("other", new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
-        null)) {
-      out.write(2);
     }
-    assertStoreHasExactly(store, "test", "other");
-    readAssertEquals(store, "test", 1);
-    readAssertEquals(store, "other", 2);
 
-    LOG.info("Updating other");
-    try (AtomicOutputStream out = store.updateBlob("other", null)) {
-      out.write(5);
+    public Subject getSubject(String name) {
+        Subject subject = new Subject();
+        SingleUserPrincipal user = new SingleUserPrincipal(name);
+        subject.getPrincipals().add(user);
+        return subject;
     }
-    assertStoreHasExactly(store, "test", "other");
-    readAssertEquals(store, "test", 1);
-    readAssertEquals(store, "other", 5);
-
-    LOG.info("Deleting test");
-    store.deleteBlob("test", null);
-    assertStoreHasExactly(store, "other");
-    readAssertEquals(store, "other", 5);
-
-    LOG.info("Creating test again");
-    try (AtomicOutputStream out = store.createBlob("test", new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
-        null)) {
-      out.write(2);
-    }
-    assertStoreHasExactly(store, "test", "other");
-    readAssertEquals(store, "test", 2);
-    readAssertEquals(store, "other", 5);
 
-    LOG.info("Updating test");
-    try (AtomicOutputStream out = store.updateBlob("test", null)) {
-      out.write(3);
+    // Check for Blobstore with authentication
+    public void testWithAuthentication(BlobStore store) throws Exception {
+        //Test for Nimbus Admin
+        Subject admin = getSubject("admin");
+        assertStoreHasExactly(store);
+        SettableBlobMeta metadata = new 
SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, 
admin)) {
+            assertStoreHasExactly(store, "test");
+            out.write(1);
+        }
+        store.deleteBlob("test", admin);
+
+        //Test for Supervisor Admin
+        Subject supervisor = getSubject("supervisor");
+        assertStoreHasExactly(store);
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, 
supervisor)) {
+            assertStoreHasExactly(store, "test");
+            out.write(1);
+        }
+        store.deleteBlob("test", supervisor);
+
+        //Test for Nimbus itself as a user
+        Subject nimbus = getNimbusSubject();
+        assertStoreHasExactly(store);
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, 
nimbus)) {
+            assertStoreHasExactly(store, "test");
+            out.write(1);
+        }
+        store.deleteBlob("test", nimbus);
+
+        // Test with a dummy test_subject for cases where subject != null 
(security turned on)
+        Subject who = getSubject("test_subject");
+        assertStoreHasExactly(store);
+
+        // Tests for case when subject != null (security turned on) and
+        // acls for the blob are set to WORLD_EVERYTHING
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, who)) 
{
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        // Testing whether acls are set to WORLD_EVERYTHING
+        assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        readAssertEqualsWithAuth(store, who, "test", 1);
+
+        LOG.info("Deleting test");
+        store.deleteBlob("test", who);
+        assertStoreHasExactly(store);
+
+        // Tests for case when subject != null (security turned on) and
+        // acls are not set for the blob (DEFAULT)
+        LOG.info("Creating test again");
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, who)) 
{
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test");
+        // Testing whether acls are set to WORLD_EVERYTHING. Here the acl 
should not contain WORLD_EVERYTHING because
+        // the subject is neither null nor empty. The ACL should however 
contain USER_EVERYTHING as user needs to have
+        // complete access to the blob
+        assertTrue("ACL does not contain WORLD_EVERYTHING", 
!metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        readAssertEqualsWithAuth(store, who, "test", 2);
+
+        LOG.info("Updating test");
+        try (AtomicOutputStream out = store.updateBlob("test", who)) {
+            out.write(3);
+        }
+        assertStoreHasExactly(store, "test");
+        readAssertEqualsWithAuth(store, who, "test", 3);
+
+        LOG.info("Updating test again");
+        try (AtomicOutputStream out = store.updateBlob("test", who)) {
+            out.write(4);
+            out.flush();
+            LOG.info("SLEEPING");
+            Thread.sleep(2);
+            assertStoreHasExactly(store, "test");
+            readAssertEqualsWithAuth(store, who, "test", 3);
+        }
+
+        // Test for subject with no principals and acls set to WORLD_EVERYTHING
+        who = new Subject();
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+        LOG.info("Creating test");
+        try (AtomicOutputStream out = 
store.createBlob("test-empty-subject-WE", metadata, who)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test-empty-subject-WE", "test");
+        // Testing whether acls are set to WORLD_EVERYTHING
+        assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2);
+
+        // Test for subject with no principals and acls set to DEFAULT
+        who = new Subject();
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        LOG.info("Creating other");
+
+        try (AtomicOutputStream out = 
store.createBlob("test-empty-subject-DEF", metadata, who)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test-empty-subject-DEF", "test", 
"test-empty-subject-WE");
+        // Testing whether acls are set to WORLD_EVERYTHING
+        assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2);
+
+        if (store instanceof LocalFsBlobStore) {
+            ((LocalFsBlobStore) store).fullCleanup(1);
+        } else {
+            fail("Error the blobstore is of unknowntype");
+        }
     }
-    assertStoreHasExactly(store, "test", "other");
-    readAssertEquals(store, "test", 3);
-    readAssertEquals(store, "other", 5);
-
-    LOG.info("Deleting other");
-    store.deleteBlob("other", null);
-    assertStoreHasExactly(store, "test");
-    readAssertEquals(store, "test", 3);
-
-    LOG.info("Updating test again");
-
-    // intended to not guarding with try-with-resource since otherwise test 
will fail
-    AtomicOutputStream out = store.updateBlob("test", null);
-    out.write(4);
-    out.flush();
-    LOG.info("SLEEPING");
-    Thread.sleep(2);
-
-    if (store instanceof LocalFsBlobStore) {
-      ((LocalFsBlobStore) store).fullCleanup(1);
-    } else {
-      fail("Error the blobstore is of unknowntype");
-    }    assertStoreHasExactly(store, "test");
-    readAssertEquals(store, "test", 3);
-    try {
-      out.close();
-    } catch (IOException e) {
-      // This is likely to happen when we try to commit something that
-      // was cleaned up.  This is expected and acceptable.
+
+    public void testBasic(BlobStore store) throws Exception {
+        assertStoreHasExactly(store);
+        LOG.info("Creating test");
+        // Tests for case when subject == null (security turned off) and
+        // acls for the blob are set to WORLD_EVERYTHING
+        SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler
+                                                             
.WORLD_EVERYTHING);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, 
null)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        // Testing whether acls are set to WORLD_EVERYTHING
+        assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        readAssertEquals(store, "test", 1);
+
+        LOG.info("Deleting test");
+        store.deleteBlob("test", null);
+        assertStoreHasExactly(store);
+
+        // The following tests are run for both hdfs and local store to test 
the
+        // update blob interface
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+        LOG.info("Creating test again");
+        try (AtomicOutputStream out = store.createBlob("test", metadata, 
null)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test");
+        if (store instanceof LocalFsBlobStore) {
+            assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        }
+        readAssertEquals(store, "test", 2);
+        LOG.info("Updating test");
+        try (AtomicOutputStream out = store.updateBlob("test", null)) {
+            out.write(3);
+        }
+        assertStoreHasExactly(store, "test");
+        readAssertEquals(store, "test", 3);
+
+        LOG.info("Updating test again");
+        try (AtomicOutputStream out = store.updateBlob("test", null)) {
+            out.write(4);
+            out.flush();
+            LOG.info("SLEEPING");
+            Thread.sleep(2);
+        }
+
+        // Tests for case when subject == null (security turned off) and
+        // acls for the blob are set to DEFAULT (Empty ACL List) only for 
LocalFsBlobstore
+        if (store instanceof LocalFsBlobStore) {
+            metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+            LOG.info("Creating test for empty acls when security is off");
+            try (AtomicOutputStream out = store.createBlob("test-empty-acls", 
metadata, null)) {
+                LOG.info("metadata {}", metadata);
+                out.write(2);
+            }
+            assertStoreHasExactly(store, "test-empty-acls", "test");
+            // Testing whether acls are set to WORLD_EVERYTHING. Here we are 
testing only for LocalFsBlobstore
+            // as the HdfsBlobstore gets the subject information of the local 
system user and behaves as if it is
+            // always authenticated.
+            assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.get_acl().toString().contains("OTHER"));
+
+            LOG.info("Deleting test-empty-acls");
+            store.deleteBlob("test-empty-acls", null);
+        }
+
+        if (store instanceof LocalFsBlobStore) {
+            ((LocalFsBlobStore) store).fullCleanup(1);
+        } else {
+            fail("Error the blobstore is of unknowntype");
+        }
     }
-  }
-
-  @Test
-  public void testGetFileLength()
-          throws AuthorizationException, KeyNotFoundException, 
KeyAlreadyExistsException, IOException {
-    LocalFsBlobStore store = initLocalFs();
-    try (AtomicOutputStream out = store.createBlob("test", new 
SettableBlobMeta(BlobStoreAclHandler
-        .WORLD_EVERYTHING), null)) {
-      out.write(1);
+
+
+    public void testMultiple(BlobStore store) throws Exception {
+
+        assertStoreHasExactly(store);
+        LOG.info("Creating test");
+        try (AtomicOutputStream out = store.createBlob("test", new 
SettableBlobMeta(BlobStoreAclHandler
+                                                                               
         .WORLD_EVERYTHING), null)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        readAssertEquals(store, "test", 1);
+
+        LOG.info("Creating other");
+        try (AtomicOutputStream out = store.createBlob("other", new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
+                                                       null)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test", "other");
+        readAssertEquals(store, "test", 1);
+        readAssertEquals(store, "other", 2);
+
+        LOG.info("Updating other");
+        try (AtomicOutputStream out = store.updateBlob("other", null)) {
+            out.write(5);
+        }
+        assertStoreHasExactly(store, "test", "other");
+        readAssertEquals(store, "test", 1);
+        readAssertEquals(store, "other", 5);
+
+        LOG.info("Deleting test");
+        store.deleteBlob("test", null);
+        assertStoreHasExactly(store, "other");
+        readAssertEquals(store, "other", 5);
+
+        LOG.info("Creating test again");
+        try (AtomicOutputStream out = store.createBlob("test", new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
+                                                       null)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test", "other");
+        readAssertEquals(store, "test", 2);
+        readAssertEquals(store, "other", 5);
+
+        LOG.info("Updating test");
+        try (AtomicOutputStream out = store.updateBlob("test", null)) {
+            out.write(3);
+        }
+        assertStoreHasExactly(store, "test", "other");
+        readAssertEquals(store, "test", 3);
+        readAssertEquals(store, "other", 5);
+
+        LOG.info("Deleting other");
+        store.deleteBlob("other", null);
+        assertStoreHasExactly(store, "test");
+        readAssertEquals(store, "test", 3);
+
+        LOG.info("Updating test again");
+
+        // intentionally not guarded with try-with-resources, since otherwise the 
test will fail
+        AtomicOutputStream out = store.updateBlob("test", null);
+        out.write(4);
+        out.flush();
+        LOG.info("SLEEPING");
+        Thread.sleep(2);
+
+        if (store instanceof LocalFsBlobStore) {
+            ((LocalFsBlobStore) store).fullCleanup(1);
+        } else {
+            fail("Error the blobstore is of unknowntype");
+        }
+        assertStoreHasExactly(store, "test");
+        readAssertEquals(store, "test", 3);
+        try {
+            out.close();
+        } catch (IOException e) {
+            // This is likely to happen when we try to commit something that
+            // was cleaned up.  This is expected and acceptable.
+        }
     }
-    try (InputStreamWithMeta blobInputStream = store.getBlob("test", null)) {
-      assertEquals(1, blobInputStream.getFileLength());
+
+    @Test
+    public void testGetFileLength()
+        throws AuthorizationException, KeyNotFoundException, 
KeyAlreadyExistsException, IOException {
+        LocalFsBlobStore store = initLocalFs();
+        try (AtomicOutputStream out = store.createBlob("test", new 
SettableBlobMeta(BlobStoreAclHandler
+                                                                               
         .WORLD_EVERYTHING), null)) {
+            out.write(1);
+        }
+        try (InputStreamWithMeta blobInputStream = store.getBlob("test", 
null)) {
+            assertEquals(1, blobInputStream.getFileLength());
+        }
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreUtilsTest.java 
b/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreUtilsTest.java
index e861db8..2c3a49a 100644
--- 
a/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreUtilsTest.java
+++ 
b/storm-server/src/test/java/org/apache/storm/blobstore/BlobStoreUtilsTest.java
@@ -1,22 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.blobstore;
 
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.junit.Test;
+
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
@@ -24,19 +24,13 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.List;
-import java.util.Map;
-
-import org.apache.storm.nimbus.NimbusInfo;
-import org.junit.Test;
-
 public class BlobStoreUtilsTest {
 
     private static final String KEY = "key";
     private static final String BLOBSTORE_KEY = "/blobstore/" + KEY;
 
     @SuppressWarnings("unchecked")
-    private Map<String, Object> conf = (Map<String, Object>)mock(Map.class);
+    private Map<String, Object> conf = (Map<String, Object>) mock(Map.class);
     private MockZookeeperClientBuilder zkClientBuilder = new 
MockZookeeperClientBuilder();
     private BlobStore blobStore = mock(BlobStore.class);
     private NimbusInfo nimbusDetails = mock(NimbusInfo.class);
@@ -75,7 +69,7 @@ public class BlobStoreUtilsTest {
     @Test
     public void testUpdateKeyForBlobStore_nodeWithNullChildren() {
         zkClientBuilder.withExists(BLOBSTORE_KEY, true);
-        zkClientBuilder.withGetChildren(BLOBSTORE_KEY, (List<String>)null);
+        zkClientBuilder.withGetChildren(BLOBSTORE_KEY, (List<String>) null);
         BlobStoreUtils.updateKeyForBlobStore(conf, blobStore, 
zkClientBuilder.build(), KEY, nimbusDetails);
 
         zkClientBuilder.verifyExists(true);

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/blobstore/BlobSynchronizerTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/blobstore/BlobSynchronizerTest.java
 
b/storm-server/src/test/java/org/apache/storm/blobstore/BlobSynchronizerTest.java
index ea12204..d988b48 100644
--- 
a/storm-server/src/test/java/org/apache/storm/blobstore/BlobSynchronizerTest.java
+++ 
b/storm-server/src/test/java/org/apache/storm/blobstore/BlobSynchronizerTest.java
@@ -1,40 +1,40 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.blobstore;
 
-import org.apache.storm.Config;
-import org.apache.storm.nimbus.NimbusInfo;
-import org.apache.storm.utils.Utils;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.TestingServer;
+import org.apache.storm.Config;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.Utils;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.util.*;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -43,93 +43,94 @@ import static org.junit.Assert.assertTrue;
  *  and BlobSynchronizer class methods
  */
 public class BlobSynchronizerTest {
-  private URI base;
-  private File baseFile;
-  private static Map<String, Object> conf = new HashMap();
-  private NIOServerCnxnFactory factory;
+    private static Map<String, Object> conf = new HashMap();
+    private URI base;
+    private File baseFile;
+    private NIOServerCnxnFactory factory;
 
-  @Before
-  public void init() throws Exception {
-    initializeConfigs();
-    baseFile = new File("target/blob-store-test-"+ UUID.randomUUID());
-    base = baseFile.toURI();
-  }
+    // Method which initializes nimbus admin
+    public static void initializeConfigs() {
+        conf.put(Config.NIMBUS_ADMINS, "admin");
+        conf.put(Config.NIMBUS_SUPERVISOR_USERS, "supervisor");
+    }
 
-  @After
-  public void cleanUp() throws IOException {
-    FileUtils.deleteDirectory(baseFile);
-    if (factory != null) {
-      factory.shutdown();
+    @Before
+    public void init() throws Exception {
+        initializeConfigs();
+        baseFile = new File("target/blob-store-test-" + UUID.randomUUID());
+        base = baseFile.toURI();
     }
-  }
 
-  // Method which initializes nimbus admin
-  public static void initializeConfigs() {
-    conf.put(Config.NIMBUS_ADMINS,"admin");
-    conf.put(Config.NIMBUS_SUPERVISOR_USERS,"supervisor");
-  }
+    @After
+    public void cleanUp() throws IOException {
+        FileUtils.deleteDirectory(baseFile);
+        if (factory != null) {
+            factory.shutdown();
+        }
+    }
 
-  private LocalFsBlobStore initLocalFs() {
-    LocalFsBlobStore store = new LocalFsBlobStore();
-    Map<String, Object> conf = Utils.readStormConfig();
-    conf.put(Config.STORM_LOCAL_DIR, baseFile.getAbsolutePath());
-    
conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN,"org.apache.storm.security.auth.DefaultPrincipalToLocal");
-    this.conf = conf;
-    store.prepare(conf, null, null);
-    return store;
-  }
+    private LocalFsBlobStore initLocalFs() {
+        LocalFsBlobStore store = new LocalFsBlobStore();
+        Map<String, Object> conf = Utils.readStormConfig();
+        conf.put(Config.STORM_LOCAL_DIR, baseFile.getAbsolutePath());
+        conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, 
"org.apache.storm.security.auth.DefaultPrincipalToLocal");
+        this.conf = conf;
+        store.prepare(conf, null, null);
+        return store;
+    }
 
-  @Test
-  public void testBlobSynchronizerForKeysToDownload() {
-    BlobStore store = initLocalFs();
-    BlobSynchronizer sync = new BlobSynchronizer(store, conf);
-    // test for keylist to download
-    Set<String> zkSet = new HashSet<String>();
-    zkSet.add("key1");
-    Set<String> blobStoreSet = new HashSet<String>();
-    blobStoreSet.add("key1");
-    Set<String> resultSet = sync.getKeySetToDownload(blobStoreSet, zkSet);
-    assertTrue("Not Empty", resultSet.isEmpty());
-    zkSet.add("key1");
-    blobStoreSet.add("key2");
-    resultSet =  sync.getKeySetToDownload(blobStoreSet, zkSet);
-    assertTrue("Not Empty", resultSet.isEmpty());
-    blobStoreSet.remove("key1");
-    blobStoreSet.remove("key2");
-    zkSet.add("key1");
-    resultSet =  sync.getKeySetToDownload(blobStoreSet, zkSet);
-    assertTrue("Unexpected keys to download", (resultSet.size() == 1) && 
(resultSet.contains("key1")));
-  }
+    @Test
+    public void testBlobSynchronizerForKeysToDownload() {
+        BlobStore store = initLocalFs();
+        BlobSynchronizer sync = new BlobSynchronizer(store, conf);
+        // test for keylist to download
+        Set<String> zkSet = new HashSet<String>();
+        zkSet.add("key1");
+        Set<String> blobStoreSet = new HashSet<String>();
+        blobStoreSet.add("key1");
+        Set<String> resultSet = sync.getKeySetToDownload(blobStoreSet, zkSet);
+        assertTrue("Not Empty", resultSet.isEmpty());
+        zkSet.add("key1");
+        blobStoreSet.add("key2");
+        resultSet = sync.getKeySetToDownload(blobStoreSet, zkSet);
+        assertTrue("Not Empty", resultSet.isEmpty());
+        blobStoreSet.remove("key1");
+        blobStoreSet.remove("key2");
+        zkSet.add("key1");
+        resultSet = sync.getKeySetToDownload(blobStoreSet, zkSet);
+        assertTrue("Unexpected keys to download", (resultSet.size() == 1) && 
(resultSet.contains("key1")));
+    }
 
-  @Test
-  public void testGetLatestSequenceNumber() throws Exception {
-    List<String> stateInfoList = new ArrayList<String>();
-    stateInfoList.add("nimbus1:8000-2");
-    stateInfoList.add("nimbus-1:8000-4");
-    assertTrue("Failed to get the latest version", 
BlobStoreUtils.getLatestSequenceNumber(stateInfoList)==4);
-  }
+    @Test
+    public void testGetLatestSequenceNumber() throws Exception {
+        List<String> stateInfoList = new ArrayList<String>();
+        stateInfoList.add("nimbus1:8000-2");
+        stateInfoList.add("nimbus-1:8000-4");
+        assertTrue("Failed to get the latest version", 
BlobStoreUtils.getLatestSequenceNumber(stateInfoList) == 4);
+    }
 
-  @Test
-  public void testNimbodesWithLatestVersionOfBlob() throws Exception {
-      try (TestingServer server = new TestingServer(); CuratorFramework 
zkClient = CuratorFrameworkFactory.newClient(server.getConnectString(), new 
ExponentialBackoffRetry(1000, 3))) {
-          zkClient.start();
-          // Creating nimbus hosts containing latest version of blob
-          
zkClient.create().creatingParentContainersIfNeeded().forPath("/blobstore/key1/nimbus1:7800-1");
-          
zkClient.create().creatingParentContainersIfNeeded().forPath("/blobstore/key1/nimbus2:7800-2");
-          Set<NimbusInfo> set = 
BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, "key1");
-          assertEquals("Failed to get the correct nimbus hosts with latest 
blob version", (set.iterator().next()).getHost(),"nimbus2");
-          
zkClient.delete().deletingChildrenIfNeeded().forPath("/blobstore/key1/nimbus1:7800-1");
-          
zkClient.delete().deletingChildrenIfNeeded().forPath("/blobstore/key1/nimbus2:7800-2");
-      }
-  }
+    @Test
+    public void testNimbodesWithLatestVersionOfBlob() throws Exception {
+        try (TestingServer server = new TestingServer(); CuratorFramework 
zkClient = CuratorFrameworkFactory
+            .newClient(server.getConnectString(), new 
ExponentialBackoffRetry(1000, 3))) {
+            zkClient.start();
+            // Creating nimbus hosts containing latest version of blob
+            
zkClient.create().creatingParentContainersIfNeeded().forPath("/blobstore/key1/nimbus1:7800-1");
+            
zkClient.create().creatingParentContainersIfNeeded().forPath("/blobstore/key1/nimbus2:7800-2");
+            Set<NimbusInfo> set = 
BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, "key1");
+            assertEquals("Failed to get the correct nimbus hosts with latest 
blob version", (set.iterator().next()).getHost(), "nimbus2");
+            
zkClient.delete().deletingChildrenIfNeeded().forPath("/blobstore/key1/nimbus1:7800-1");
+            
zkClient.delete().deletingChildrenIfNeeded().forPath("/blobstore/key1/nimbus2:7800-2");
+        }
+    }
 
-  @Test
-  public void testNormalizeVersionInfo () throws Exception {
-    BlobKeySequenceInfo info1 = 
BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo("nimbus1:7800-1");
-    assertTrue(info1.getNimbusHostPort().equals("nimbus1:7800"));
-    assertTrue(info1.getSequenceNumber().equals("1"));
-    BlobKeySequenceInfo info2 = 
BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo("nimbus-1:7800-1");
-    assertTrue(info2.getNimbusHostPort().equals("nimbus-1:7800"));
-    assertTrue(info2.getSequenceNumber().equals("1"));
-  }
+    @Test
+    public void testNormalizeVersionInfo() throws Exception {
+        BlobKeySequenceInfo info1 = 
BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo("nimbus1:7800-1");
+        assertTrue(info1.getNimbusHostPort().equals("nimbus1:7800"));
+        assertTrue(info1.getSequenceNumber().equals("1"));
+        BlobKeySequenceInfo info2 = 
BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo("nimbus-1:7800-1");
+        assertTrue(info2.getNimbusHostPort().equals("nimbus-1:7800"));
+        assertTrue(info2.getSequenceNumber().equals("1"));
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/blobstore/MockZookeeperClientBuilder.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/blobstore/MockZookeeperClientBuilder.java
 
b/storm-server/src/test/java/org/apache/storm/blobstore/MockZookeeperClientBuilder.java
index c3e2b84..2da3b2d 100644
--- 
a/storm-server/src/test/java/org/apache/storm/blobstore/MockZookeeperClientBuilder.java
+++ 
b/storm-server/src/test/java/org/apache/storm/blobstore/MockZookeeperClientBuilder.java
@@ -1,30 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.blobstore;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+package org.apache.storm.blobstore;
 
 import java.util.Arrays;
 import java.util.List;
-
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.ExistsBuilder;
 import org.apache.curator.framework.api.GetChildrenBuilder;
@@ -32,6 +21,11 @@ import org.apache.curator.framework.api.Pathable;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.data.Stat;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 public class MockZookeeperClientBuilder {
 
     private static final Logger LOG = 
Logger.getLogger(MockZookeeperClientBuilder.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java 
b/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java
index 73f5ff4..dee13dc 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java
@@ -15,9 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.daemon.drpc;
 
-import static org.junit.Assert.*;
+package org.apache.storm.daemon.drpc;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -27,9 +26,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-
 import javax.security.auth.Subject;
-
 import org.apache.storm.Config;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.DRPCExceptionType;
@@ -39,19 +36,20 @@ import 
org.apache.storm.security.auth.DefaultPrincipalToLocal;
 import org.apache.storm.security.auth.ReqContext;
 import org.apache.storm.security.auth.SingleUserPrincipal;
 import org.apache.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer;
-import org.apache.storm.security.auth.authorizer.DenyAuthorizer;
 import 
org.apache.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer.AclFunctionEntry;
+import org.apache.storm.security.auth.authorizer.DenyAuthorizer;
 import org.apache.storm.utils.Time;
 import org.junit.AfterClass;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class DRPCTest {
     private static final ExecutorService exec = 
Executors.newCachedThreadPool();
-    
-    public static interface ThrowStuff {
-        public void run() throws Exception;
-    }
-    
+
     private static void assertThrows(ThrowStuff t, Class<? extends Exception> 
expected) {
         try {
             t.run();
@@ -60,12 +58,12 @@ public class DRPCTest {
             assertTrue("Expected " + t + " to throw " + expected + " but threw 
" + e, expected.isInstance(e));
         }
     }
-    
+
     @AfterClass
     public static void close() {
         exec.shutdownNow();
     }
-    
+
     public static DRPCRequest getNextAvailableRequest(DRPC server, String 
func) throws Exception {
         DRPCRequest request = null;
         long timedout = System.currentTimeMillis() + 5_000;
@@ -79,7 +77,7 @@ public class DRPCTest {
         fail("Test timed out waiting for a request on " + func);
         return request;
     }
-    
+
     @Test
     public void testGoodBlocking() throws Exception {
         try (DRPC server = new DRPC(null, 100)) {
@@ -93,7 +91,7 @@ public class DRPCTest {
             assertEquals("tested", result);
         }
     }
-    
+
     @Test
     public void testFailedBlocking() throws Exception {
         try (DRPC server = new DRPC(null, 100)) {
@@ -110,11 +108,11 @@ public class DRPCTest {
                 Throwable t = e.getCause();
                 assertEquals(t.getClass(), DRPCExecutionException.class);
                 //Don't know a better way to validate that it failed.
-                assertEquals(DRPCExceptionType.FAILED_REQUEST, 
((DRPCExecutionException)t).get_type());
+                assertEquals(DRPCExceptionType.FAILED_REQUEST, 
((DRPCExecutionException) t).get_type());
             }
         }
     }
-    
+
     @Test
     public void testDequeueAfterTimeout() throws Exception {
         long timeout = 1000;
@@ -135,7 +133,7 @@ public class DRPCTest {
             assertEquals("", request.get_func_args());
         }
     }
-    
+
     @Test
     public void testDeny() throws Exception {
         try (DRPC server = new DRPC(new DenyAuthorizer(), 100)) {
@@ -143,7 +141,7 @@ public class DRPCTest {
             assertThrows(() -> server.fetchRequest("testing"), 
AuthorizationException.class);
         }
     }
-    
+
     @Test
     public void testStrict() throws Exception {
         ReqContext jt = new ReqContext(new Subject());
@@ -174,29 +172,29 @@ public class DRPCTest {
         DRPC.checkAuthorization(jt, auth, "fetchRequest", "jump");
         assertThrows(() -> DRPC.checkAuthorization(jc, auth, "fetchRequest", 
"jump"), AuthorizationException.class);
         assertThrows(() -> DRPC.checkAuthorization(other, auth, 
"fetchRequest", "jump"), AuthorizationException.class);
-        
+
         DRPC.checkAuthorization(jt, auth, "result", "jump");
         assertThrows(() -> DRPC.checkAuthorization(jc, auth, "result", 
"jump"), AuthorizationException.class);
         assertThrows(() -> DRPC.checkAuthorization(other, auth, "result", 
"jump"), AuthorizationException.class);
-        
+
         assertThrows(() -> DRPC.checkAuthorization(jt, auth, "execute", 
"jump"), AuthorizationException.class);
         DRPC.checkAuthorization(jc, auth, "execute", "jump");
         assertThrows(() -> DRPC.checkAuthorization(other, auth, "execute", 
"jump"), AuthorizationException.class);
-        
+
         //not_jump (closed in strict mode)
         assertThrows(() -> DRPC.checkAuthorization(jt, auth, "fetchRequest", 
"not_jump"), AuthorizationException.class);
         assertThrows(() -> DRPC.checkAuthorization(jc, auth, "fetchRequest", 
"not_jump"), AuthorizationException.class);
         assertThrows(() -> DRPC.checkAuthorization(other, auth, 
"fetchRequest", "not_jump"), AuthorizationException.class);
-        
+
         assertThrows(() -> DRPC.checkAuthorization(jt, auth, "result", 
"not_jump"), AuthorizationException.class);
         assertThrows(() -> DRPC.checkAuthorization(jc, auth, "result", 
"not_jump"), AuthorizationException.class);
         assertThrows(() -> DRPC.checkAuthorization(other, auth, "result", 
"not_jump"), AuthorizationException.class);
-        
+
         assertThrows(() -> DRPC.checkAuthorization(jt, auth, "execute", 
"not_jump"), AuthorizationException.class);
         assertThrows(() -> DRPC.checkAuthorization(jc, auth, "execute", 
"not_jump"), AuthorizationException.class);
         assertThrows(() -> DRPC.checkAuthorization(other, auth, "execute", 
"not_jump"), AuthorizationException.class);
     }
-    
+
     @Test
     public void testNotStrict() throws Exception {
         ReqContext jt = new ReqContext(new Subject());
@@ -227,26 +225,30 @@ public class DRPCTest {
         DRPC.checkAuthorization(jt, auth, "fetchRequest", "jump");
         assertThrows(() -> DRPC.checkAuthorization(jc, auth, "fetchRequest", 
"jump"), AuthorizationException.class);
         assertThrows(() -> DRPC.checkAuthorization(other, auth, 
"fetchRequest", "jump"), AuthorizationException.class);
-        
+
         DRPC.checkAuthorization(jt, auth, "result", "jump");
         assertThrows(() -> DRPC.checkAuthorization(jc, auth, "result", 
"jump"), AuthorizationException.class);
         assertThrows(() -> DRPC.checkAuthorization(other, auth, "result", 
"jump"), AuthorizationException.class);
-        
+
         assertThrows(() -> DRPC.checkAuthorization(jt, auth, "execute", 
"jump"), AuthorizationException.class);
         DRPC.checkAuthorization(jc, auth, "execute", "jump");
         assertThrows(() -> DRPC.checkAuthorization(other, auth, "execute", 
"jump"), AuthorizationException.class);
-        
+
         //not_jump (open in not strict mode)
         DRPC.checkAuthorization(jt, auth, "fetchRequest", "not_jump");
         DRPC.checkAuthorization(jc, auth, "fetchRequest", "not_jump");
         DRPC.checkAuthorization(other, auth, "fetchRequest", "not_jump");
-        
+
         DRPC.checkAuthorization(jt, auth, "result", "not_jump");
         DRPC.checkAuthorization(jc, auth, "result", "not_jump");
         DRPC.checkAuthorization(other, auth, "result", "not_jump");
-        
+
         DRPC.checkAuthorization(jt, auth, "execute", "not_jump");
         DRPC.checkAuthorization(jc, auth, "execute", "not_jump");
         DRPC.checkAuthorization(other, auth, "execute", "not_jump");
     }
+
+    public static interface ThrowStuff {
+        public void run() throws Exception;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java 
b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java
index 60ff2d7..47484ac 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.daemon.nimbus;
 
 import org.apache.storm.Config;
@@ -26,7 +27,7 @@ import org.apache.storm.testing.TestWordSpout;
 import org.apache.storm.topology.TopologyBuilder;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.fail;
 
 public class NimbusTest {
     @Test

Reply via email to