http://git-wip-us.apache.org/repos/asf/storm/blob/caca8292/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
index fcb7221..19ff38c 100644
--- 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
+++ 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -31,10 +32,7 @@ import org.apache.storm.security.auth.FixedGroupsMapping;
 import org.apache.storm.security.auth.NimbusPrincipal;
 import org.apache.storm.security.auth.SingleUserPrincipal;
 import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -58,504 +56,489 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
+import org.junit.ClassRule;
+
 public class BlobStoreTest {
-  private static final Logger LOG = 
LoggerFactory.getLogger(BlobStoreTest.class);
-  protected static MiniDFSCluster dfscluster = null;
-  protected static Configuration hadoopConf = null;
-  URI base;
-  File baseFile;
-  private static Map<String, Object> conf = new HashMap();
-  public static final int READ = 0x01;
-  public static final int WRITE = 0x02;
-  public static final int ADMIN = 0x04;
-
-  @Before
-  public void init() {
-    System.setProperty("test.build.data", "target/test/data");
-    initializeConfigs();
-    baseFile = new File("/tmp/blob-store-test-"+UUID.randomUUID());
-    base = baseFile.toURI();
-  }
-
-  @After
-  public void cleanup()
-          throws IOException {
-    FileUtils.deleteDirectory(baseFile);
-  }
-
-  @AfterClass
-  public static void cleanupAfterClass() throws IOException {
-    if (dfscluster != null) {
-      dfscluster.shutdown();
+
+    @ClassRule
+    public static final MiniDFSClusterRule DFS_CLUSTER_RULE = new 
MiniDFSClusterRule();
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlobStoreTest.class);
+    URI base;
+    File baseFile;
+    private static final Map<String, Object> CONF = new HashMap<>();
+    public static final int READ = 0x01;
+    public static final int WRITE = 0x02;
+    public static final int ADMIN = 0x04;
+
+    @Before
+    public void init() {
+        initializeConfigs();
+        baseFile = new File("/tmp/blob-store-test-" + UUID.randomUUID());
+        base = baseFile.toURI();
+    }
+
+    @After
+    public void cleanup()
+        throws IOException {
+        FileUtils.deleteDirectory(baseFile);
+    }
+
+    // Method which initializes nimbus admin
+    public static void initializeConfigs() {
+        CONF.put(Config.NIMBUS_ADMINS, "admin");
+        CONF.put(Config.NIMBUS_ADMINS_GROUPS, "adminsGroup");
+
+        // Construct a groups mapping for the FixedGroupsMapping class
+        Map<String, Set<String>> groupsMapping = new HashMap<>();
+        Set<String> groupSet = new HashSet<>();
+        groupSet.add("adminsGroup");
+        groupsMapping.put("adminsGroupsUser", groupSet);
+
+        // Now create a params map to put it in to our conf
+        Map<String, Object> paramMap = new HashMap<>();
+        paramMap.put(FixedGroupsMapping.STORM_FIXED_GROUP_MAPPING, 
groupsMapping);
+        CONF.put(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN, 
"org.apache.storm.security.auth.FixedGroupsMapping");
+        CONF.put(Config.STORM_GROUP_MAPPING_SERVICE_PARAMS, paramMap);
+        CONF.put(Config.NIMBUS_SUPERVISOR_USERS, "supervisor");
+    }
+
+    //Gets Nimbus Subject with NimbusPrincipal set on it
+    public static Subject getNimbusSubject() {
+        Subject nimbus = new Subject();
+        nimbus.getPrincipals().add(new NimbusPrincipal());
+        return nimbus;
+    }
+
+    // Overloading the assertStoreHasExactly method to accommodate Subject in 
order to check for authorization
+    public static void assertStoreHasExactly(BlobStore store, Subject who, 
String... keys)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        Set<String> expected = new HashSet<>(Arrays.asList(keys));
+        Set<String> found = new HashSet<>();
+        Iterator<String> c = store.listKeys();
+        while (c.hasNext()) {
+            String keyName = c.next();
+            found.add(keyName);
+        }
+        Set<String> extra = new HashSet<>(found);
+        extra.removeAll(expected);
+        assertTrue("Found extra keys in the blob store " + extra, 
extra.isEmpty());
+        Set<String> missing = new HashSet<>(expected);
+        missing.removeAll(found);
+        assertTrue("Found keys missing from the blob store " + missing, 
missing.isEmpty());
+    }
+
+    public static void assertStoreHasExactly(BlobStore store, String... keys)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        assertStoreHasExactly(store, null, keys);
+    }
+
+    // Overloading the readInt method to accommodate Subject in order to check for 
authorization (security turned on)
+    public static int readInt(BlobStore store, Subject who, String key) throws 
IOException, KeyNotFoundException, AuthorizationException {
+        try (InputStream in = store.getBlob(key, who)) {
+            return in.read();
+        }
+    }
+
+    public static int readInt(BlobStore store, String key)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        return readInt(store, null, key);
+    }
+
+    public static void readAssertEquals(BlobStore store, String key, int value)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        assertEquals(value, readInt(store, key));
     }
-  }
-
-  // Method which initializes nimbus admin
-  public static void initializeConfigs() {
-    conf.put(Config.NIMBUS_ADMINS,"admin");
-    conf.put(Config.NIMBUS_ADMINS_GROUPS,"adminsGroup");
-
-    // Construct a groups mapping for the FixedGroupsMapping class
-    Map<String, Set<String>> groupsMapping = new HashMap<String, 
Set<String>>();
-    Set<String> groupSet = new HashSet<String>();
-    groupSet.add("adminsGroup");
-    groupsMapping.put("adminsGroupsUser", groupSet);
-
-    // Now create a params map to put it in to our conf
-    Map<String, Object> paramMap = new HashMap<String, Object>();
-    paramMap.put(FixedGroupsMapping.STORM_FIXED_GROUP_MAPPING, groupsMapping);
-    conf.put(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN, 
"org.apache.storm.security.auth.FixedGroupsMapping");
-    conf.put(Config.STORM_GROUP_MAPPING_SERVICE_PARAMS, paramMap);
-    conf.put(Config.NIMBUS_SUPERVISOR_USERS,"supervisor");
-  }
-
-  //Gets Nimbus Subject with NimbusPrincipal set on it
-  public static Subject getNimbusSubject() {
-    Subject nimbus = new Subject();
-    nimbus.getPrincipals().add(new NimbusPrincipal());
-    return nimbus;
-  }
-
-  // Overloading the assertStoreHasExactly method accomodate Subject in order 
to check for authorization
-  public static void assertStoreHasExactly(BlobStore store, Subject who, 
String ... keys)
-          throws IOException, KeyNotFoundException, AuthorizationException {
-    Set<String> expected = new HashSet<String>(Arrays.asList(keys));
-    Set<String> found = new HashSet<String>();
-    Iterator<String> c = store.listKeys();
-    while (c.hasNext()) {
-      String keyName = c.next();
-      found.add(keyName);
+
+    // Checks for assertion when we turn on security
+    public void readAssertEqualsWithAuth(BlobStore store, Subject who, String 
key, int value)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        assertEquals(value, readInt(store, who, key));
     }
-    Set<String> extra = new HashSet<String>(found);
-    extra.removeAll(expected);
-    assertTrue("Found extra keys in the blob store "+extra, extra.isEmpty());
-    Set<String> missing = new HashSet<String>(expected);
-    missing.removeAll(found);
-    assertTrue("Found keys missing from the blob store "+missing, 
missing.isEmpty());
-  }
-
-  public static void assertStoreHasExactly(BlobStore store, String ... keys)
-          throws IOException, KeyNotFoundException, AuthorizationException {
-    assertStoreHasExactly(store, null, keys);
-  }
-
-  // Overloading the readInt method accomodate Subject in order to check for 
authorization (security turned on)
-  public static int readInt(BlobStore store, Subject who, String key) throws 
IOException, KeyNotFoundException, AuthorizationException {
-    InputStream in = store.getBlob(key, who);
-    try {
-      return in.read();
-    } finally {
-      in.close();
+
+    private AutoCloseableBlobStoreContainer initHdfs(String dirName)
+        throws Exception {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.BLOBSTORE_DIR, dirName);
+        conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, 
"org.apache.storm.security.auth.DefaultPrincipalToLocal");
+        conf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 3);
+        HdfsBlobStore store = new HdfsBlobStore();
+        store.prepareInternal(conf, null, 
DFS_CLUSTER_RULE.getDfscluster().getConfiguration(0));
+        return new AutoCloseableBlobStoreContainer(store);
     }
-  }
-
-  public static int readInt(BlobStore store, String key)
-          throws IOException, KeyNotFoundException, AuthorizationException {
-    return readInt(store, null, key);
-  }
-
-  public static void readAssertEquals(BlobStore store, String key, int value)
-          throws IOException, KeyNotFoundException, AuthorizationException {
-    assertEquals(value, readInt(store, key));
-  }
-
-  // Checks for assertion when we turn on security
-  public void readAssertEqualsWithAuth(BlobStore store, Subject who, String 
key, int value)
-          throws IOException, KeyNotFoundException, AuthorizationException {
-    assertEquals(value, readInt(store, who, key));
-  }
-
-  private HdfsBlobStore initHdfs(String dirName)
-          throws Exception {
-    if (hadoopConf == null) {
-      hadoopConf = new Configuration();
+
+    private static class AutoCloseableBlobStoreContainer implements 
AutoCloseable {
+
+        private final HdfsBlobStore blobStore;
+
+        public AutoCloseableBlobStoreContainer(HdfsBlobStore blobStore) {
+            this.blobStore = blobStore;
+        }
+
+        @Override
+        public void close() throws Exception {
+            this.blobStore.shutdown();
+        }
+
     }
-    try {
-      if (dfscluster == null) {
-        dfscluster = new 
MiniDFSCluster.Builder(hadoopConf).numDataNodes(3).build();
-        dfscluster.waitActive();
-      }
-    } catch (IOException e) {
-      LOG.error("error creating MiniDFSCluster");
+
+    @Test
+    public void testHdfsReplication()
+        throws Exception {
+        try (AutoCloseableBlobStoreContainer container = 
initHdfs("/storm/blobstoreReplication")) {
+            testReplication("/storm/blobstoreReplication/test", 
container.blobStore);
+        }
     }
-    Map<String, Object> conf = new HashMap();
-    conf.put(Config.BLOBSTORE_DIR, dirName);
-    
conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN,"org.apache.storm.security.auth.DefaultPrincipalToLocal");
-    conf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 3);
-    HdfsBlobStore store = new HdfsBlobStore();
-    store.prepareInternal(conf, null, dfscluster.getConfiguration(0));
-    return store;
-  }
-
-  @Test
-  public void testHdfsReplication()
-          throws Exception {
-    BlobStore store = initHdfs("/storm/blobstoreReplication");
-    testReplication("/storm/blobstoreReplication/test", store);
-  }
-
-  @Test
-  public void testBasicHdfs()
-          throws Exception {
-    testBasic(initHdfs("/storm/blobstore1"));
-  }
-
-  @Test
-  public void testMultipleHdfs()
-          throws Exception {
-    // use different blobstore dir so it doesn't conflict with other test
-    testMultiple(initHdfs("/storm/blobstore2"));
-  }
-
-  @Test
-  public void testHdfsWithAuth()
-          throws Exception {
-    // use different blobstore dir so it doesn't conflict with other tests
-    testWithAuthentication(initHdfs("/storm/blobstore3"));
-  }
-
-  // Test for replication.
-  public void testReplication(String path, BlobStore store)
-          throws Exception {
-    SettableBlobMeta metadata = new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
-    metadata.set_replication_factor(4);
-    AtomicOutputStream out = store.createBlob("test", metadata, null);
-    out.write(1);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", null), 4);
-    store.deleteBlob("test", null);
-
-    //Test for replication with NIMBUS as user
-    Subject admin = getSubject("admin");
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    metadata.set_replication_factor(4);
-    out = store.createBlob("test", metadata, admin);
-    out.write(1);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", admin), 4);
-    store.updateBlobReplication("test", 5, admin);
-    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", admin), 5);
-    store.deleteBlob("test", admin);
-
-    //Test for replication using SUPERVISOR access
-    Subject supervisor = getSubject("supervisor");
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    metadata.set_replication_factor(4);
-    out = store.createBlob("test", metadata, supervisor);
-    out.write(1);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", supervisor), 4);
-    store.updateBlobReplication("test", 5, supervisor);
-    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", supervisor), 5);
-    store.deleteBlob("test", supervisor);
-
-    Subject adminsGroupsUser = getSubject("adminsGroupsUser");
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    metadata.set_replication_factor(4);
-    out = store.createBlob("test", metadata, adminsGroupsUser);
-    out.write(1);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", adminsGroupsUser), 4);
-    store.updateBlobReplication("test", 5, adminsGroupsUser);
-    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", adminsGroupsUser), 5);
-    store.deleteBlob("test", adminsGroupsUser);
-
-    //Test for a user having read or write or admin access to read replication 
for a blob
-    String createSubject = "createSubject";
-    String writeSubject = "writeSubject";
-    String adminSubject = "adminSubject";
-    Subject who = getSubject(createSubject);
-    AccessControl writeAccess = new AccessControl(AccessControlType.USER, 
READ);
-    AccessControl adminAccess = new AccessControl(AccessControlType.USER, 
ADMIN);
-    writeAccess.set_name(writeSubject);
-    adminAccess.set_name(adminSubject);
-    List<AccessControl> acl = Arrays.asList(writeAccess, adminAccess);
-    metadata = new SettableBlobMeta(acl);
-    metadata.set_replication_factor(4);
-    out = store.createBlob("test", metadata, who);
-    out.write(1);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    who = getSubject(writeSubject);
-    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", who), 4);
-
-    //Test for a user having WRITE or ADMIN privileges to change replication 
of a blob
-    who = getSubject(adminSubject);
-    store.updateBlobReplication("test", 5, who);
-    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", who), 5);
-    store.deleteBlob("test", getSubject(createSubject));
-  }
-
-  public Subject getSubject(String name) {
-    Subject subject = new Subject();
-    SingleUserPrincipal user = new SingleUserPrincipal(name);
-    subject.getPrincipals().add(user);
-    return subject;
-  }
-
-  // Check for Blobstore with authentication
-  public void testWithAuthentication(BlobStore store)
-          throws Exception {
-    //Test for Nimbus Admin
-    Subject admin = getSubject("admin");
-    assertStoreHasExactly(store);
-    SettableBlobMeta metadata = new 
SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    AtomicOutputStream out = store.createBlob("test", metadata, admin);
-    assertStoreHasExactly(store, "test");
-    out.write(1);
-    out.close();
-    store.deleteBlob("test", admin);
-
-    //Test for Nimbus Groups Admin
-    Subject adminsGroupsUser = getSubject("adminsGroupsUser");
-    assertStoreHasExactly(store);
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    out = store.createBlob("test", metadata, adminsGroupsUser);
-    assertStoreHasExactly(store, "test");
-    out.write(1);
-    out.close();
-    store.deleteBlob("test", adminsGroupsUser);
-
-    //Test for Supervisor Admin
-    Subject supervisor = getSubject("supervisor");
-    assertStoreHasExactly(store);
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    out = store.createBlob("test", metadata, supervisor);
-    assertStoreHasExactly(store, "test");
-    out.write(1);
-    out.close();
-    store.deleteBlob("test", supervisor);
-
-    //Test for Nimbus itself as a user
-    Subject nimbus = getNimbusSubject();
-    assertStoreHasExactly(store);
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    out = store.createBlob("test", metadata, nimbus);
-    assertStoreHasExactly(store, "test");
-    out.write(1);
-    out.close();
-    store.deleteBlob("test", nimbus);
-
-    // Test with a dummy test_subject for cases where subject !=null (security 
turned on)
-    Subject who = getSubject("test_subject");
-    assertStoreHasExactly(store);
-
-    // Tests for case when subject != null (security turned on) and
-    // acls for the blob are set to WORLD_EVERYTHING
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
-    out = store.createBlob("test", metadata, who);
-    out.write(1);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    // Testing whether acls are set to WORLD_EVERYTHING
-    assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-    readAssertEqualsWithAuth(store, who, "test", 1);
-
-    LOG.info("Deleting test");
-    store.deleteBlob("test", who);
-    assertStoreHasExactly(store);
-
-    // Tests for case when subject != null (security turned on) and
-    // acls are not set for the blob (DEFAULT)
-    LOG.info("Creating test again");
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    out = store.createBlob("test", metadata, who);
-    out.write(2);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should 
not contain WORLD_EVERYTHING because
-    // the subject is neither null nor empty. The ACL should however contain 
USER_EVERYTHING as user needs to have
-    // complete access to the blob
-    assertTrue("ACL does not contain WORLD_EVERYTHING", 
!metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-    readAssertEqualsWithAuth(store, who, "test", 2);
-
-    LOG.info("Updating test");
-    out = store.updateBlob("test", who);
-    out.write(3);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    readAssertEqualsWithAuth(store, who, "test", 3);
-
-    LOG.info("Updating test again");
-    out = store.updateBlob("test", who);
-    out.write(4);
-    out.flush();
-    LOG.info("SLEEPING");
-    Thread.sleep(2);
-    assertStoreHasExactly(store, "test");
-    readAssertEqualsWithAuth(store, who, "test", 3);
-
-    //Test for subject with no principals and acls set to WORLD_EVERYTHING
-    who = new Subject();
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
-    LOG.info("Creating test");
-    out = store.createBlob("test-empty-subject-WE", metadata, who);
-    out.write(2);
-    out.close();
-    assertStoreHasExactly(store, "test-empty-subject-WE", "test");
-    // Testing whether acls are set to WORLD_EVERYTHING
-    assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-    readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2);
-
-    //Test for subject with no principals and acls set to DEFAULT
-    who = new Subject();
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    LOG.info("Creating other");
-    out = store.createBlob("test-empty-subject-DEF", metadata, who);
-    out.write(2);
-    out.close();
-    assertStoreHasExactly(store, "test-empty-subject-DEF", "test", 
"test-empty-subject-WE");
-    // Testing whether acls are set to WORLD_EVERYTHING
-    assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-    readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2);
-
-    if (store instanceof HdfsBlobStore) {
-      ((HdfsBlobStore) store).fullCleanup(1);
-    } else {
-      fail("Error the blobstore is of unknowntype");
+
+    @Test
+    public void testBasicHdfs()
+        throws Exception {
+        try (AutoCloseableBlobStoreContainer container = 
initHdfs("/storm/blobstore1")) {
+            testBasic(container.blobStore);
+        }
     }
-    try {
-      out.close();
-    } catch (IOException e) {
-      //This is likely to happen when we try to commit something that
-      // was cleaned up.  This is expected and acceptable.
+
+    @Test
+    public void testMultipleHdfs()
+        throws Exception {
+        // use different blobstore dir so it doesn't conflict with other test
+        try (AutoCloseableBlobStoreContainer container = 
initHdfs("/storm/blobstore2")) {
+            testMultiple(container.blobStore);
+        }
     }
-  }
-
-  public void testBasic(BlobStore store)
-          throws Exception {
-    assertStoreHasExactly(store);
-    LOG.info("Creating test");
-    // Tests for case when subject == null (security turned off) and
-    // acls for the blob are set to WORLD_EVERYTHING
-    SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler
-            .WORLD_EVERYTHING);
-    AtomicOutputStream out = store.createBlob("test", metadata, null);
-    out.write(1);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    // Testing whether acls are set to WORLD_EVERYTHING
-    assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-    readAssertEquals(store, "test", 1);
-
-    LOG.info("Deleting test");
-    store.deleteBlob("test", null);
-    assertStoreHasExactly(store);
-
-    // The following tests are run for both hdfs and local store to test the
-    // update blob interface
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
-    LOG.info("Creating test again");
-    out = store.createBlob("test", metadata, null);
-    out.write(2);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    readAssertEquals(store, "test", 2);
-    LOG.info("Updating test");
-    out = store.updateBlob("test", null);
-    out.write(3);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    readAssertEquals(store, "test", 3);
-
-    LOG.info("Updating test again");
-    out = store.updateBlob("test", null);
-    out.write(4);
-    out.flush();
-    LOG.info("SLEEPING");
-    Thread.sleep(2);
-
-    if (store instanceof HdfsBlobStore) {
-      ((HdfsBlobStore) store).fullCleanup(1);
-    } else {
-      fail("Error the blobstore is of unknowntype");
+
+    @Test
+    public void testHdfsWithAuth()
+        throws Exception {
+        // use different blobstore dir so it doesn't conflict with other tests
+        try (AutoCloseableBlobStoreContainer container = 
initHdfs("/storm/blobstore3")) {
+            testWithAuthentication(container.blobStore);
+        }
     }
-    try {
-      out.close();
-    } catch (IOException e) {
-      //This is likely to happen when we try to commit something that
-      // was cleaned up.  This is expected and acceptable.
+
+    // Test for replication.
+    public void testReplication(String path, BlobStore store)
+        throws Exception {
+        SettableBlobMeta metadata = new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+        metadata.set_replication_factor(4);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, 
null)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", null), 4);
+        store.deleteBlob("test", null);
+
+        //Test for replication with NIMBUS as user
+        Subject admin = getSubject("admin");
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        metadata.set_replication_factor(4);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, 
admin)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", admin), 4);
+        store.updateBlobReplication("test", 5, admin);
+        assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", admin), 5);
+        store.deleteBlob("test", admin);
+
+        //Test for replication using SUPERVISOR access
+        Subject supervisor = getSubject("supervisor");
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        metadata.set_replication_factor(4);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, 
supervisor)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", supervisor), 4);
+        store.updateBlobReplication("test", 5, supervisor);
+        assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", supervisor), 5);
+        store.deleteBlob("test", supervisor);
+
+        Subject adminsGroupsUser = getSubject("adminsGroupsUser");
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        metadata.set_replication_factor(4);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, 
adminsGroupsUser)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", adminsGroupsUser), 4);
+        store.updateBlobReplication("test", 5, adminsGroupsUser);
+        assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", adminsGroupsUser), 5);
+        store.deleteBlob("test", adminsGroupsUser);
+
+        //Test for a user having read or write or admin access to read 
replication for a blob
+        String createSubject = "createSubject";
+        String writeSubject = "writeSubject";
+        String adminSubject = "adminSubject";
+        Subject who = getSubject(createSubject);
+        AccessControl writeAccess = new AccessControl(AccessControlType.USER, 
READ);
+        AccessControl adminAccess = new AccessControl(AccessControlType.USER, 
ADMIN);
+        writeAccess.set_name(writeSubject);
+        adminAccess.set_name(adminSubject);
+        List<AccessControl> acl = Arrays.asList(writeAccess, adminAccess);
+        metadata = new SettableBlobMeta(acl);
+        metadata.set_replication_factor(4);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, who)) 
{
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        who = getSubject(writeSubject);
+        assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", who), 4);
+
+        //Test for a user having WRITE or ADMIN privileges to change 
replication of a blob
+        who = getSubject(adminSubject);
+        store.updateBlobReplication("test", 5, who);
+        assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", who), 5);
+        store.deleteBlob("test", getSubject(createSubject));
     }
-  }
-
-
-  public void testMultiple(BlobStore store)
-          throws Exception {
-    assertStoreHasExactly(store);
-    LOG.info("Creating test");
-    AtomicOutputStream out = store.createBlob("test", new 
SettableBlobMeta(BlobStoreAclHandler
-            .WORLD_EVERYTHING), null);
-    out.write(1);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    readAssertEquals(store, "test", 1);
-
-    LOG.info("Creating other");
-    out = store.createBlob("other", new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
-            null);
-    out.write(2);
-    out.close();
-    assertStoreHasExactly(store, "test", "other");
-    readAssertEquals(store, "test", 1);
-    readAssertEquals(store, "other", 2);
-
-    LOG.info("Updating other");
-    out = store.updateBlob("other", null);
-    out.write(5);
-    out.close();
-    assertStoreHasExactly(store, "test", "other");
-    readAssertEquals(store, "test", 1);
-    readAssertEquals(store, "other", 5);
-
-    LOG.info("Deleting test");
-    store.deleteBlob("test", null);
-    assertStoreHasExactly(store, "other");
-    readAssertEquals(store, "other", 5);
-
-    LOG.info("Creating test again");
-    out = store.createBlob("test", new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
-            null);
-    out.write(2);
-    out.close();
-    assertStoreHasExactly(store, "test", "other");
-    readAssertEquals(store, "test", 2);
-    readAssertEquals(store, "other", 5);
-
-    LOG.info("Updating test");
-    out = store.updateBlob("test", null);
-    out.write(3);
-    out.close();
-    assertStoreHasExactly(store, "test", "other");
-    readAssertEquals(store, "test", 3);
-    readAssertEquals(store, "other", 5);
-
-    LOG.info("Deleting other");
-    store.deleteBlob("other", null);
-    assertStoreHasExactly(store, "test");
-    readAssertEquals(store, "test", 3);
-
-    LOG.info("Updating test again");
-    out = store.updateBlob("test", null);
-    out.write(4);
-    out.flush();
-    LOG.info("SLEEPING");
-    Thread.sleep(2);
-
-    if (store instanceof HdfsBlobStore) {
-      ((HdfsBlobStore) store).fullCleanup(1);
-    } else {
-      fail("Error the blobstore is of unknowntype");
-    }    assertStoreHasExactly(store, "test");
-    readAssertEquals(store, "test", 3);
-    try {
-      out.close();
-    } catch (IOException e) {
-      //This is likely to happen when we try to commit something that
-      // was cleaned up.  This is expected and acceptable.
+
+    public Subject getSubject(String name) {
+        Subject subject = new Subject();
+        SingleUserPrincipal user = new SingleUserPrincipal(name);
+        subject.getPrincipals().add(user);
+        return subject;
+    }
+
+    // Check for Blobstore with authentication
+    public void testWithAuthentication(BlobStore store)
+        throws Exception {
+        //Test for Nimbus Admin
+        Subject admin = getSubject("admin");
+        assertStoreHasExactly(store);
+        SettableBlobMeta metadata = new 
SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, 
admin)) {
+            assertStoreHasExactly(store, "test");
+            out.write(1);
+        }
+        store.deleteBlob("test", admin);
+
+        //Test for Nimbus Groups Admin
+        Subject adminsGroupsUser = getSubject("adminsGroupsUser");
+        assertStoreHasExactly(store);
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, 
adminsGroupsUser)) {
+            assertStoreHasExactly(store, "test");
+            out.write(1);
+        }
+        store.deleteBlob("test", adminsGroupsUser);
+
+        //Test for Supervisor Admin
+        Subject supervisor = getSubject("supervisor");
+        assertStoreHasExactly(store);
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, 
supervisor)) {
+            assertStoreHasExactly(store, "test");
+            out.write(1);
+        }
+        store.deleteBlob("test", supervisor);
+
+        //Test for Nimbus itself as a user
+        Subject nimbus = getNimbusSubject();
+        assertStoreHasExactly(store);
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, 
nimbus)) {
+            assertStoreHasExactly(store, "test");
+            out.write(1);
+        }
+        store.deleteBlob("test", nimbus);
+
+        // Test with a dummy test_subject for cases where subject !=null 
(security turned on)
+        Subject who = getSubject("test_subject");
+        assertStoreHasExactly(store);
+
+        // Tests for case when subject != null (security turned on) and
+        // acls for the blob are set to WORLD_EVERYTHING
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, who)) 
{
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        // Testing whether acls are set to WORLD_EVERYTHING
+        assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        readAssertEqualsWithAuth(store, who, "test", 1);
+
+        LOG.info("Deleting test");
+        store.deleteBlob("test", who);
+        assertStoreHasExactly(store);
+
+        // Tests for case when subject != null (security turned on) and
+        // acls are not set for the blob (DEFAULT)
+        LOG.info("Creating test again");
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, who)) 
{
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test");
+        // Testing whether acls are set to WORLD_EVERYTHING. Here the acl 
should not contain WORLD_EVERYTHING because
+        // the subject is neither null nor empty. The ACL should however 
contain USER_EVERYTHING as user needs to have
+        // complete access to the blob
+        assertTrue("ACL does not contain WORLD_EVERYTHING", 
!metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        readAssertEqualsWithAuth(store, who, "test", 2);
+
+        LOG.info("Updating test");
+        try (AtomicOutputStream out = store.updateBlob("test", who)) {
+            out.write(3);
+        }
+        assertStoreHasExactly(store, "test");
+        readAssertEqualsWithAuth(store, who, "test", 3);
+
+        LOG.info("Updating test again");
+        try (AtomicOutputStream out = store.updateBlob("test", who)) {
+            out.write(4);
+        }
+        LOG.info("SLEEPING");
+        Thread.sleep(2);
+        assertStoreHasExactly(store, "test");
+        readAssertEqualsWithAuth(store, who, "test", 3);
+
+        //Test for subject with no principals and acls set to WORLD_EVERYTHING
+        who = new Subject();
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+        LOG.info("Creating test");
+        try (AtomicOutputStream out = 
store.createBlob("test-empty-subject-WE", metadata, who)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test-empty-subject-WE", "test");
+        // Testing whether acls are set to WORLD_EVERYTHING
+        assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2);
+
+        //Test for subject with no principals and acls set to DEFAULT
+        who = new Subject();
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        LOG.info("Creating other");
+        try (AtomicOutputStream out = 
store.createBlob("test-empty-subject-DEF", metadata, who)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test-empty-subject-DEF", "test", 
"test-empty-subject-WE");
+        // Testing whether acls are set to WORLD_EVERYTHING
+        assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2);
+
+        if (store instanceof HdfsBlobStore) {
+            ((HdfsBlobStore) store).fullCleanup(1);
+        } else {
+            fail("Error the blobstore is of unknowntype");
+        }
+    }
+
+    public void testBasic(BlobStore store)
+        throws Exception {
+        assertStoreHasExactly(store);
+        LOG.info("Creating test");
+        // Tests for case when subject == null (security turned off) and
+        // acls for the blob are set to WORLD_EVERYTHING
+        SettableBlobMeta metadata = new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, 
null)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        // Testing whether acls are set to WORLD_EVERYTHING
+        assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        readAssertEquals(store, "test", 1);
+
+        LOG.info("Deleting test");
+        store.deleteBlob("test", null);
+        assertStoreHasExactly(store);
+
+        // The following tests are run for both hdfs and local store to test 
the
+        // update blob interface
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+        LOG.info("Creating test again");
+        try (AtomicOutputStream out = store.createBlob("test", metadata, 
null)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test");
+        readAssertEquals(store, "test", 2);
+        LOG.info("Updating test");
+        try (AtomicOutputStream out = store.updateBlob("test", null)) {
+            out.write(3);
+        }
+        assertStoreHasExactly(store, "test");
+        readAssertEquals(store, "test", 3);
+
+        LOG.info("Updating test again");
+        try (AtomicOutputStream out = store.updateBlob("test", null)) {
+            out.write(4);
+        }
+        LOG.info("SLEEPING");
+        Thread.sleep(2);
+
+        if (store instanceof HdfsBlobStore) {
+            ((HdfsBlobStore) store).fullCleanup(1);
+        } else {
+            fail("Error the blobstore is of unknowntype");
+        }
+    }
+
+    public void testMultiple(BlobStore store)
+        throws Exception {
+        assertStoreHasExactly(store);
+        LOG.info("Creating test");
+        try (AtomicOutputStream out = store.createBlob("test", new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), null)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        readAssertEquals(store, "test", 1);
+
+        LOG.info("Creating other");
+        try (AtomicOutputStream out = store.createBlob("other", new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
+            null)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test", "other");
+        readAssertEquals(store, "test", 1);
+        readAssertEquals(store, "other", 2);
+
+        LOG.info("Updating other");
+        try (AtomicOutputStream out = store.updateBlob("other", null)) {
+            out.write(5);
+        }
+        assertStoreHasExactly(store, "test", "other");
+        readAssertEquals(store, "test", 1);
+        readAssertEquals(store, "other", 5);
+
+        LOG.info("Deleting test");
+        store.deleteBlob("test", null);
+        assertStoreHasExactly(store, "other");
+        readAssertEquals(store, "other", 5);
+
+        LOG.info("Creating test again");
+        try (AtomicOutputStream out = store.createBlob("test", new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
+            null)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test", "other");
+        readAssertEquals(store, "test", 2);
+        readAssertEquals(store, "other", 5);
+
+        LOG.info("Updating test");
+        try (AtomicOutputStream out = store.updateBlob("test", null)) {
+            out.write(3);
+        }
+        assertStoreHasExactly(store, "test", "other");
+        readAssertEquals(store, "test", 3);
+        readAssertEquals(store, "other", 5);
+
+        LOG.info("Deleting other");
+        store.deleteBlob("other", null);
+        assertStoreHasExactly(store, "test");
+        readAssertEquals(store, "test", 3);
+
+        LOG.info("Updating test again");
+        try (AtomicOutputStream out = store.updateBlob("test", null)) {
+            out.write(4);
+        }
+        LOG.info("SLEEPING");
+        Thread.sleep(2);
+
+        if (store instanceof HdfsBlobStore) {
+            ((HdfsBlobStore) store).fullCleanup(1);
+        } else {
+            fail("Error the blobstore is of unknowntype");
+        }
+        assertStoreHasExactly(store, "test");
+        readAssertEquals(store, "test", 3);
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/caca8292/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
index e34ec4a..3628c79 100644
--- 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
+++ 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -23,9 +24,6 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,194 +37,190 @@ import java.util.Map;
 
 import static org.junit.Assert.*;
 
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
+import org.junit.ClassRule;
+
/**
 * Tests for {@code HdfsBlobStoreImpl} against a shared MiniDFSCluster provided by the
 * class-level {@code MiniDFSClusterRule}.
 *
 * <p>Thread-safety: tests share one DFS cluster, so keys written by one test must be
 * cleaned up before the next (see the caution comment below).
 */
public class HdfsBlobStoreImplTest {

    // One MiniDFSCluster for the whole class; started/stopped by the JUnit ClassRule.
    @ClassRule
    public static final MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule();

    private static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreImplTest.class);

    // key dir needs to be number 0 to number of buckets, choose one so we know where to look
    private static String KEYDIR = "0";
    private Path blobDir = new Path("/storm/blobstore1");
    private Path fullKeyDir = new Path(blobDir, KEYDIR);
    private String BLOBSTORE_DATA = "data";

    /**
     * Test subclass that pins the key directory to {@link #KEYDIR} so tests know where
     * blobs land, and makes the store AutoCloseable so try-with-resources shuts it down.
     */
    public class TestHdfsBlobStoreImpl extends HdfsBlobStoreImpl implements AutoCloseable {

        public TestHdfsBlobStoreImpl(Path path, Map<String, Object> conf) throws IOException {
            super(path, conf);
        }

        public TestHdfsBlobStoreImpl(Path path, Map<String, Object> conf,
            Configuration hconf) throws IOException {
            super(path, conf, hconf);
        }

        // Always resolve keys under the fixed test bucket directory.
        protected Path getKeyDir(String key) {
            return new Path(new Path(blobDir, KEYDIR), key);
        }

        @Override
        public void close() throws Exception {
            this.shutdown();
        }
    }

    // Be careful about adding additional tests as the dfscluster will be shared
    /**
     * End-to-end exercise of write/commit/read/list/delete for one key, then for two keys,
     * verifying directory/file permissions along the way.
     */
    @Test
    public void testMultiple() throws Exception {
        String testString = "testingblob";
        String validKey = "validkeyBasic";

        //Will be closed automatically when shutting down the DFS cluster
        FileSystem fs = DFS_CLUSTER_RULE.getDfscluster().getFileSystem();
        Map<String, Object> conf = new HashMap<>();

        try (TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, DFS_CLUSTER_RULE.getHadoopConf())) {
            // should have created blobDir
            assertTrue("BlobStore dir wasn't created", fs.exists(blobDir));
            assertEquals("BlobStore dir was created with wrong permissions",
                HdfsBlobStoreImpl.BLOBSTORE_DIR_PERMISSION, fs.getFileStatus(blobDir).getPermission());

            // test exist with non-existent key
            assertFalse("file exists but shouldn't", hbs.exists("bogus"));

            // test write
            BlobStoreFile pfile = hbs.write(validKey, false);
            // Adding metadata to avoid null pointer exception
            SettableBlobMeta meta = new SettableBlobMeta();
            meta.set_replication_factor(1);
            pfile.setMetadata(meta);
            try (OutputStream ios = pfile.getOutputStream()) {
                ios.write(testString.getBytes(StandardCharsets.UTF_8));
            }

            // test commit creates properly
            assertTrue("BlobStore key dir wasn't created", fs.exists(fullKeyDir));
            pfile.commit();
            Path dataFile = new Path(new Path(fullKeyDir, validKey), BLOBSTORE_DATA);
            assertTrue("blob data not committed", fs.exists(dataFile));
            assertEquals("BlobStore dir was created with wrong permissions",
                HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile).getPermission());
            assertTrue("key doesn't exist but should", hbs.exists(validKey));

            // test read
            BlobStoreFile readpFile = hbs.read(validKey);
            try (InputStream inStream = readpFile.getInputStream()) {
                String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8);
                assertEquals("string read from blob doesn't match", testString, readString);
            }

            // test listkeys
            Iterator<String> keys = hbs.listKeys();
            assertTrue("blob has one key", keys.hasNext());
            assertEquals("one key in blobstore", validKey, keys.next());

            // delete
            hbs.deleteKey(validKey);
            assertFalse("key not deleted", fs.exists(dataFile));
            assertFalse("key not deleted", hbs.exists(validKey));

            // Now do multiple
            String testString2 = "testingblob2";
            String validKey2 = "validkey2";

            // test write
            pfile = hbs.write(validKey, false);
            pfile.setMetadata(meta);
            try (OutputStream ios = pfile.getOutputStream()) {
                ios.write(testString.getBytes(StandardCharsets.UTF_8));
            }

            // test commit creates properly
            assertTrue("BlobStore key dir wasn't created", fs.exists(fullKeyDir));
            pfile.commit();
            assertTrue("blob data not committed", fs.exists(dataFile));
            assertEquals("BlobStore dir was created with wrong permissions",
                HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile).getPermission());
            assertTrue("key doesn't exist but should", hbs.exists(validKey));

            // test write again
            pfile = hbs.write(validKey2, false);
            pfile.setMetadata(meta);
            try (OutputStream ios2 = pfile.getOutputStream()) {
                ios2.write(testString2.getBytes(StandardCharsets.UTF_8));
            }

            // test commit second creates properly
            pfile.commit();
            Path dataFile2 = new Path(new Path(fullKeyDir, validKey2), BLOBSTORE_DATA);
            assertTrue("blob data not committed", fs.exists(dataFile2));
            assertEquals("BlobStore dir was created with wrong permissions",
                HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile2).getPermission());
            assertTrue("key doesn't exist but should", hbs.exists(validKey2));

            // test listkeys — order is unspecified, so count and flag each expected key
            keys = hbs.listKeys();
            int total = 0;
            boolean key1Found = false;
            boolean key2Found = false;
            while (keys.hasNext()) {
                total++;
                String key = keys.next();
                if (key.equals(validKey)) {
                    key1Found = true;
                } else if (key.equals(validKey2)) {
                    key2Found = true;
                } else {
                    fail("Found key that wasn't expected: " + key);
                }
            }
            assertEquals("number of keys is wrong", 2, total);
            assertTrue("blobstore missing key1", key1Found);
            assertTrue("blobstore missing key2", key2Found);

            // test read
            readpFile = hbs.read(validKey);
            try (InputStream inStream = readpFile.getInputStream()) {
                String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8);
                assertEquals("string read from blob doesn't match", testString, readString);
            }

            // test read
            readpFile = hbs.read(validKey2);
            try (InputStream inStream = readpFile.getInputStream()) {
                String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8);
                assertEquals("string read from blob doesn't match", testString2, readString);
            }

            // Clean up both keys so the shared cluster is left empty for other tests.
            hbs.deleteKey(validKey);
            assertFalse("key not deleted", hbs.exists(validKey));
            hbs.deleteKey(validKey2);
            assertFalse("key not deleted", hbs.exists(validKey2));
        }
    }

    /**
     * Verifies that {@code BlobStoreFile.getFileLength()} reports the number of bytes
     * written to an (uncommitted) blob.
     */
    @Test
    public void testGetFileLength() throws Exception {
        Map<String, Object> conf = new HashMap<>();
        String validKey = "validkeyBasic";
        String testString = "testingblob";
        try (TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, DFS_CLUSTER_RULE.getHadoopConf())) {
            BlobStoreFile pfile = hbs.write(validKey, false);
            // Adding metadata to avoid null pointer exception
            SettableBlobMeta meta = new SettableBlobMeta();
            meta.set_replication_factor(1);
            pfile.setMetadata(meta);
            try (OutputStream ios = pfile.getOutputStream()) {
                ios.write(testString.getBytes(StandardCharsets.UTF_8));
            }
            assertEquals(testString.getBytes(StandardCharsets.UTF_8).length, pfile.getFileLength());
        }
    }
}

http://git-wip-us.apache.org/repos/asf/storm/blob/caca8292/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
index cd828da..0f6bd28 100644
--- 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
+++ 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -40,49 +41,67 @@ import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
 import org.junit.Before;
 import org.junit.After;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.Assert;
 
 import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.HashMap;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
 
+@RunWith(MockitoJUnitRunner.class)
 public class AvroGenericRecordBoltTest {
 
-    private String hdfsURI;
+    @Rule
+    public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(() -> {
+        Configuration conf = new Configuration();
+        conf.set("fs.trash.interval", "10");
+        conf.setBoolean("dfs.permissions", true);
+        File baseDir = new File("./target/hdfs/").getAbsoluteFile();
+        FileUtil.fullyDelete(baseDir);
+        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+        return conf;
+    });
+    @Mock
+    private OutputCollector collector;
+    @Mock
+    private TopologyContext topologyContext;
+
     private DistributedFileSystem fs;
-    private MiniDFSCluster hdfsCluster;
+    private String hdfsURI;
     private static final String testRoot = "/unittest";
-    private static final Schema schema1;
-    private static final Schema schema2;
-    private static final Tuple tuple1;
-    private static final Tuple tuple2;
-    private static final String schemaV1 = "{\"type\":\"record\"," +
-            "\"name\":\"myrecord\"," +
-            "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," +
-            "{ \"name\":\"int1\", \"type\":\"int\" }]}";
-
-    private static final String schemaV2 = "{\"type\":\"record\"," +
-            "\"name\":\"myrecord\"," +
-            "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," +
-            "{ \"name\":\"bar\", \"type\":\"string\", \"default\":\"baz\" }," +
-            "{ \"name\":\"int1\", \"type\":\"int\" }]}";
-
-    static {
-
+    private static Schema schema1;
+    private static Schema schema2;
+    private static Tuple tuple1;
+    private static Tuple tuple2;
+    private static final String schemaV1 = "{\"type\":\"record\","
+        + "\"name\":\"myrecord\","
+        + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"},"
+        + "{ \"name\":\"int1\", \"type\":\"int\" }]}";
+
+    private static final String schemaV2 = "{\"type\":\"record\","
+        + "\"name\":\"myrecord\","
+        + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"},"
+        + "{ \"name\":\"bar\", \"type\":\"string\", \"default\":\"baz\" },"
+        + "{ \"name\":\"int1\", \"type\":\"int\" }]}";
+
+    @BeforeClass
+    public static void setupClass() {
         Schema.Parser parser = new Schema.Parser();
         schema1 = parser.parse(schemaV1);
 
@@ -100,33 +119,19 @@ public class AvroGenericRecordBoltTest {
         tuple2 = generateTestTuple(builder2.build());
     }
 
-    @Mock private OutputCollector collector;
-    @Mock private TopologyContext topologyContext;
-
    @Before
    public void setup() throws Exception {
        // The MiniDFSCluster is provisioned per test by the dfsClusterRule @Rule; here we
        // only grab its FileSystem handle and derive the hdfs:// URI the bolts write to.
        fs = dfsClusterRule.getDfscluster().getFileSystem();
        hdfsURI = fs.getUri() + "/";
    }
 
    @After
    public void shutDown() throws IOException {
        // Only close the FileSystem handle; the cluster itself is torn down by dfsClusterRule.
        fs.close();
    }
 
-    @Test public void multipleTuplesOneFile() throws IOException
-    {
+    @Test
+    public void multipleTuplesOneFile() throws IOException {
         AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1f, schemaV1);
 
         bolt.prepare(new Config(), topologyContext, collector);
@@ -139,8 +144,8 @@ public class AvroGenericRecordBoltTest {
         verifyAllAvroFiles(testRoot);
     }
 
-    @Test public void multipleTuplesMutliplesFiles() throws IOException
-    {
+    @Test
+    public void multipleTuplesMutliplesFiles() throws IOException {
         AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, .0001f, 
schemaV1);
 
         bolt.prepare(new Config(), topologyContext, collector);
@@ -153,8 +158,8 @@ public class AvroGenericRecordBoltTest {
         verifyAllAvroFiles(testRoot);
     }
 
-    @Test public void forwardSchemaChangeWorks() throws IOException
-    {
+    @Test
+    public void forwardSchemaChangeWorks() throws IOException {
         AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1000f, schemaV1);
 
         bolt.prepare(new Config(), topologyContext, collector);
@@ -167,8 +172,8 @@ public class AvroGenericRecordBoltTest {
         verifyAllAvroFiles(testRoot);
     }
 
-    @Test public void backwardSchemaChangeWorks() throws IOException
-    {
+    @Test
+    public void backwardSchemaChangeWorks() throws IOException {
         AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1000f, schemaV2);
 
         bolt.prepare(new Config(), topologyContext, collector);
@@ -180,8 +185,8 @@ public class AvroGenericRecordBoltTest {
         verifyAllAvroFiles(testRoot);
     }
 
-    @Test public void schemaThrashing() throws IOException
-    {
+    @Test
+    public void schemaThrashing() throws IOException {
         AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1000f, schemaV2);
 
         bolt.prepare(new Config(), topologyContext, collector);
@@ -206,19 +211,19 @@ public class AvroGenericRecordBoltTest {
         FileNameFormat fieldsFileNameFormat = new 
DefaultFileNameFormat().withPath(testRoot);
 
         FileRotationPolicy rotationPolicy =
-                new FileSizeRotationPolicy(rotationSizeMB, 
FileSizeRotationPolicy.Units.MB);
+            new FileSizeRotationPolicy(rotationSizeMB, 
FileSizeRotationPolicy.Units.MB);
 
         return new AvroGenericRecordBolt()
-                .withFsUrl(nameNodeAddr)
-                .withFileNameFormat(fieldsFileNameFormat)
-                .withRotationPolicy(rotationPolicy)
-                .withSyncPolicy(fieldsSyncPolicy);
+            .withFsUrl(nameNodeAddr)
+            .withFileNameFormat(fieldsFileNameFormat)
+            .withRotationPolicy(rotationPolicy)
+            .withSyncPolicy(fieldsSyncPolicy);
     }
 
     private static Tuple generateTestTuple(GenericRecord record) {
         TopologyBuilder builder = new TopologyBuilder();
         GeneralTopologyContext topologyContext = new 
GeneralTopologyContext(builder.createTopology(),
-                new Config(), new HashMap(), new HashMap(), new HashMap(), "") 
{
+            new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
             @Override
             public Fields getComponentOutputFields(String componentId, String 
streamId) {
                 return new Fields("record");
@@ -250,24 +255,23 @@ public class AvroGenericRecordBoltTest {
         return nonZero;
     }
 
-    private void fileIsGoodAvro (Path path) throws IOException {
+    private void fileIsGoodAvro(Path path) throws IOException {
         DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
-        FSDataInputStream in = fs.open(path, 0);
-        FileOutputStream out = new FileOutputStream("target/FOO.avro");
-
-        byte[] buffer = new byte[100];
-        int bytesRead;
-        while ((bytesRead = in.read(buffer)) > 0) {
-            out.write(buffer, 0, bytesRead);
+        try (FSDataInputStream in = fs.open(path, 0); FileOutputStream out = 
new FileOutputStream("target/FOO.avro")) {
+            byte[] buffer = new byte[100];
+            int bytesRead;
+            while ((bytesRead = in.read(buffer)) > 0) {
+                out.write(buffer, 0, bytesRead);
+            }
         }
-        out.close();
 
         java.io.File file = new File("target/FOO.avro");
 
-        DataFileReader<GenericRecord> dataFileReader = new 
DataFileReader<>(file, datumReader);
-        GenericRecord user = null;
-        while (dataFileReader.hasNext()) {
-            user = dataFileReader.next(user);
+        try (DataFileReader<GenericRecord> dataFileReader = new 
DataFileReader<>(file, datumReader)) {
+            GenericRecord user = null;
+            while (dataFileReader.hasNext()) {
+                user = dataFileReader.next(user);
+            }
         }
 
         file.delete();

http://git-wip-us.apache.org/repos/asf/storm/blob/caca8292/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
index e8f0702..de7316f 100644
--- 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
+++ 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
@@ -44,7 +44,7 @@ import org.junit.Assert;
 
 import org.junit.rules.ExpectedException;
 import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
+
 import static org.mockito.Mockito.*;
 
 import org.apache.hadoop.conf.Configuration;
@@ -59,12 +59,27 @@ import 
org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
+import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
 
+@RunWith(MockitoJUnitRunner.class)
 public class TestHdfsBolt {
 
+    @Rule
+    public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(() -> {
+        Configuration conf = new Configuration();
+        conf.set("fs.trash.interval", "10");
+        conf.setBoolean("dfs.permissions", true);
+        File baseDir = new File("./target/hdfs/").getAbsoluteFile();
+        FileUtil.fullyDelete(baseDir);
+        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+        return conf;
+    });
+    
     private String hdfsURI;
     private DistributedFileSystem fs;
-    private MiniDFSCluster hdfsCluster;
     private static final String testRoot = "/unittest";
     Tuple tuple1 = generateTestTuple(1, "First Tuple", "SFO", "CA");
     Tuple tuple2 = generateTestTuple(1, "Second Tuple", "SJO", "CA");
@@ -75,24 +90,13 @@ public class TestHdfsBolt {
 
     @Before
     public void setup() throws Exception {
-        MockitoAnnotations.initMocks(this);
-        Configuration conf = new Configuration();
-        conf.set("fs.trash.interval", "10");
-        conf.setBoolean("dfs.permissions", true);
-        File baseDir = new File("./target/hdfs/").getAbsoluteFile();
-        FileUtil.fullyDelete(baseDir);
-        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
-
-        MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
-        hdfsCluster = builder.build();
-        fs = hdfsCluster.getFileSystem();
-        hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
+        fs = dfsClusterRule.getDfscluster().getFileSystem();
+        hdfsURI = "hdfs://localhost:" + 
dfsClusterRule.getDfscluster().getNameNodePort() + "/";
     }
 
     @After
     public void shutDown() throws IOException {
         fs.close();
-        hdfsCluster.shutdown();
     }
 
     @Test
@@ -241,7 +245,7 @@ public class TestHdfsBolt {
     private Tuple generateTestTuple(Object id, Object msg,Object city,Object 
state) {
         TopologyBuilder builder = new TopologyBuilder();
         GeneralTopologyContext topologyContext = new 
GeneralTopologyContext(builder.createTopology(),
-                new Config(), new HashMap(), new HashMap(), new HashMap(), "") 
{
+                new Config(), new HashMap<>(), new HashMap<>(), new 
HashMap<>(), "") {
             @Override
             public Fields getComponentOutputFields(String componentId, String 
streamId) {
                 return new Fields("id", "msg","city","state");
@@ -250,23 +254,17 @@ public class TestHdfsBolt {
         return new TupleImpl(topologyContext, new Values(id, msg,city,state), 
1, "");
     }
 
-    private void printFiles(String path) throws IOException {
-        Path p = new Path(path);
-        FileStatus[] fileStatuses = fs.listStatus(p);
-        for (FileStatus file : fileStatuses) {
-            System.out.println("@@@ " + file.getPath() + " [" + file.getLen() 
+ "]");
-        }
-    }
-
     // Generally used to compare how files were actually written and compare 
to expectations based on total
     // amount of data written and rotation policies
     private int countNonZeroLengthFiles(String path) throws IOException {
         Path p = new Path(path);
         int nonZero = 0;
 
-        for (FileStatus file : fs.listStatus(p))
-            if (file.getLen() > 0)
+        for (FileStatus file : fs.listStatus(p)) {
+            if (file.getLen() > 0) {
                 nonZero++;
+            }
+        }
 
         return nonZero;
     }
@@ -275,9 +273,11 @@ public class TestHdfsBolt {
         Path p = new Path(path);
         int zeroLength = 0;
 
-        for (FileStatus file : fs.listStatus(p))
-            if (file.getLen() == 0)
+        for (FileStatus file : fs.listStatus(p)) {
+            if (file.getLen() == 0) {
                 zeroLength++;
+            }
+        }
 
         return zeroLength;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/caca8292/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
index 9913d9d..f297ba9 100644
--- 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
+++ 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
@@ -17,8 +17,10 @@
  */
 package org.apache.storm.hdfs.bolt;
 
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
 import org.apache.storm.Config;
-import org.apache.storm.Constants;
 import org.apache.storm.task.GeneralTopologyContext;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -39,13 +41,10 @@ import org.junit.*;
 
 import org.junit.rules.ExpectedException;
 import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import static org.mockito.Mockito.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -56,14 +55,28 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
+import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
 
+@RunWith(MockitoJUnitRunner.class)
 public class TestSequenceFileBolt {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TestSequenceFileBolt.class);
+    
+    @Rule
+    public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(() -> {
+       Configuration conf = new Configuration();
+        conf.set("fs.trash.interval", "10");
+        conf.setBoolean("dfs.permissions", true);
+        File baseDir = new File("./target/hdfs/").getAbsoluteFile();
+        FileUtil.fullyDelete(baseDir);
+        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+        return conf;
+    });
 
     private String hdfsURI;
     private DistributedFileSystem fs;
-    private MiniDFSCluster hdfsCluster;
     private static final String testRoot = "/unittest";
     Tuple tuple1 = generateTestTuple(1l, "first tuple");
     Tuple tuple2 = generateTestTuple(2l, "second tuple");
@@ -74,24 +87,13 @@ public class TestSequenceFileBolt {
 
     @Before
     public void setup() throws Exception {
-        MockitoAnnotations.initMocks(this);
-        Configuration conf = new Configuration();
-        conf.set("fs.trash.interval", "10");
-        conf.setBoolean("dfs.permissions", true);
-        File baseDir = new File("./target/hdfs/").getAbsoluteFile();
-        FileUtil.fullyDelete(baseDir);
-        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
-
-        MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
-        hdfsCluster = builder.build();
-        fs = hdfsCluster.getFileSystem();
-        hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
+        fs = dfsClusterRule.getDfscluster().getFileSystem();
+        hdfsURI = "hdfs://localhost:" + 
dfsClusterRule.getDfscluster().getNameNodePort() + "/";
     }
 
     @After
     public void shutDown() throws IOException {
         fs.close();
-        hdfsCluster.shutdown();
     }
 
     @Test
@@ -159,7 +161,7 @@ public class TestSequenceFileBolt {
     private Tuple generateTestTuple(Long key, String value) {
         TopologyBuilder builder = new TopologyBuilder();
         GeneralTopologyContext topologyContext = new 
GeneralTopologyContext(builder.createTopology(),
-                new Config(), new HashMap(), new HashMap(), new HashMap(), "") 
{
+                new Config(), new HashMap<>(), new HashMap<>(), new 
HashMap<>(), "") {
             @Override
             public Fields getComponentOutputFields(String componentId, String 
streamId) {
                 return new Fields("key", "value");

http://git-wip-us.apache.org/repos/asf/storm/blob/caca8292/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
index b96f1ff..4016c55 100644
--- 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
+++ 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -9,180 +10,170 @@
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-
 package org.apache.storm.hdfs.spout;
 
-
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
+import org.junit.Rule;
 
 public class TestDirLock {
 
-  static MiniDFSCluster.Builder builder;
-  static MiniDFSCluster hdfsCluster;
-  static FileSystem fs;
-  static String hdfsURI;
-  static HdfsConfiguration conf = new  HdfsConfiguration();
-  static final int LOCK_EXPIRY_SEC = 1;
-  private Path locksDir = new Path("/tmp/lockdir");
-
-  @BeforeClass
-  public static void setupClass() throws IOException {
-    conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,"5000");
-    builder = new MiniDFSCluster.Builder(new Configuration());
-    hdfsCluster = builder.build();
-    fs  = hdfsCluster.getFileSystem();
-    hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
-  }
-
-  @AfterClass
-  public static void teardownClass() throws IOException {
-    fs.close();
-    hdfsCluster.shutdown();
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    assert fs.mkdirs(locksDir) ;
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    fs.delete(locksDir, true);
-  }
-
-
-  @Test
-  public void testBasicLocking() throws Exception {
-    // 1 grab lock
-    DirLock lock = DirLock.tryLock(fs, locksDir);
-    Assert.assertTrue(fs.exists(lock.getLockFile()));
-
-    // 2 try to grab another lock while dir is locked
-    DirLock lock2 = DirLock.tryLock(fs, locksDir); // should fail
-    Assert.assertNull(lock2);
-
-    // 3 let go first lock
-    lock.release();
-    Assert.assertFalse(fs.exists(lock.getLockFile()));
-
-    // 4 try locking again
-    lock2  = DirLock.tryLock(fs, locksDir);
-    Assert.assertTrue(fs.exists(lock2.getLockFile()));
-    lock2.release();
-    Assert.assertFalse(fs.exists(lock.getLockFile()));
-    lock2.release();  // should be throw
-  }
-
-
-  @Test
-  public void testConcurrentLocking() throws Exception {
-    DirLockingThread[] thds = startThreads(100, locksDir);
-    for (DirLockingThread thd : thds) {
-      thd.join();
-      if( !thd.cleanExit ) {
-        System.err.println(thd.getName() + " did not exit cleanly");
-      }
-      Assert.assertTrue(thd.cleanExit);
-    }
+    @Rule
+    public MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule();
 
-    Path lockFile = new Path(locksDir + Path.SEPARATOR + 
DirLock.DIR_LOCK_FILE);
-    Assert.assertFalse(fs.exists(lockFile));
-  }
+    private static final int LOCK_EXPIRY_SEC = 1;
 
-  private DirLockingThread[] startThreads(int thdCount, Path dir)
-          throws IOException {
-    DirLockingThread[] result = new DirLockingThread[thdCount];
-    for (int i = 0; i < thdCount; i++) {
-      result[i] = new DirLockingThread(i, fs, dir);
+    private FileSystem fs;
+    private HdfsConfiguration conf = new HdfsConfiguration();
+    private final Path locksDir = new Path("/tmp/lockdir");
+
+    @Before
+    public void setUp() throws IOException {
+        conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, "5000");
+        fs = DFS_CLUSTER_RULE.getDfscluster().getFileSystem();
+        assert fs.mkdirs(locksDir);
+    }
+
+    @After
+    public void teardownClass() throws IOException {
+        fs.delete(locksDir, true);
+        fs.close();
     }
 
-    for (DirLockingThread thd : result) {
-      thd.start();
+    @Test
+    public void testBasicLocking() throws Exception {
+        // 1 grab lock
+        DirLock lock = DirLock.tryLock(fs, locksDir);
+        Assert.assertTrue(fs.exists(lock.getLockFile()));
+
+        // 2 try to grab another lock while dir is locked
+        DirLock lock2 = DirLock.tryLock(fs, locksDir); // should fail
+        Assert.assertNull(lock2);
+
+        // 3 let go first lock
+        lock.release();
+        Assert.assertFalse(fs.exists(lock.getLockFile()));
+
+        // 4 try locking again
+        lock2 = DirLock.tryLock(fs, locksDir);
+        Assert.assertTrue(fs.exists(lock2.getLockFile()));
+        lock2.release();
+        Assert.assertFalse(fs.exists(lock.getLockFile()));
+        lock2.release();  // should be throw
     }
-    return result;
-  }
 
-  @Test
-  public void testLockRecovery() throws Exception {
-    DirLock lock1 = DirLock.tryLock(fs, locksDir);   // should pass
-    Assert.assertNotNull(lock1);
+    @Test
+    public void testConcurrentLocking() throws Exception {
+        DirLockingThread[] threads = null;
+        try {
+            threads = startThreads(100, locksDir);
+            for (DirLockingThread thd : threads) {
+                thd.join(30_000);
+                Assert.assertTrue(thd.getName() + " did not exit cleanly", 
thd.cleanExit);
+            }
 
-    DirLock lock2 = DirLock.takeOwnershipIfStale(fs, locksDir, 
LOCK_EXPIRY_SEC); // should fail
-    Assert.assertNull(lock2);
+            Path lockFile = new Path(locksDir + Path.SEPARATOR + 
DirLock.DIR_LOCK_FILE);
+            Assert.assertFalse(fs.exists(lockFile));
+        } finally {
+            if (threads != null) {
+                for (DirLockingThread thread : threads) {
+                    thread.interrupt();
+                    thread.join(30_000);
+                    if(thread.isAlive()) {
+                        throw new RuntimeException("Failed to stop threads 
within 30 seconds, threads may leak into other tests");
+                    }
+                }
+            }
+        }
+    }
 
-    Thread.sleep(LOCK_EXPIRY_SEC*1000 + 500); // wait for lock to expire
-    Assert.assertTrue(fs.exists(lock1.getLockFile()));
+    private DirLockingThread[] startThreads(int thdCount, Path dir)
+        throws IOException {
+        DirLockingThread[] result = new DirLockingThread[thdCount];
+        for (int i = 0; i < thdCount; i++) {
+            result[i] = new DirLockingThread(i, fs, dir);
+        }
+
+        for (DirLockingThread thd : result) {
+            thd.start();
+        }
+        return result;
+    }
 
-    DirLock lock3 = DirLock.takeOwnershipIfStale(fs, locksDir, 
LOCK_EXPIRY_SEC); // should pass now
-    Assert.assertNotNull(lock3);
-    Assert.assertTrue(fs.exists(lock3.getLockFile()));
-    lock3.release();
-    Assert.assertFalse(fs.exists(lock3.getLockFile()));
-    lock1.release(); // should not throw
-  }
+    @Test
+    public void testLockRecovery() throws Exception {
+        DirLock lock1 = DirLock.tryLock(fs, locksDir);   // should pass
+        Assert.assertNotNull(lock1);
 
-  class DirLockingThread extends Thread {
+        DirLock lock2 = DirLock.takeOwnershipIfStale(fs, locksDir, 
LOCK_EXPIRY_SEC); // should fail
+        Assert.assertNull(lock2);
 
-    private int thdNum;
-    private final FileSystem fs;
-    private final Path dir;
-    public boolean cleanExit = false;
+        Thread.sleep(LOCK_EXPIRY_SEC * 1000 + 500); // wait for lock to expire
+        Assert.assertTrue(fs.exists(lock1.getLockFile()));
 
-    public DirLockingThread(int thdNum,FileSystem fs, Path dir)
-            throws IOException {
-      this.thdNum = thdNum;
-      this.fs = fs;
-      this.dir = dir;
+        DirLock lock3 = DirLock.takeOwnershipIfStale(fs, locksDir, 
LOCK_EXPIRY_SEC); // should pass now
+        Assert.assertNotNull(lock3);
+        Assert.assertTrue(fs.exists(lock3.getLockFile()));
+        lock3.release();
+        Assert.assertFalse(fs.exists(lock3.getLockFile()));
+        lock1.release(); // should not throw
     }
 
-    @Override
-    public void run() {
-      Thread.currentThread().setName("DirLockingThread-" + thdNum);
-      DirLock lock = null;
-      try {
-        do {
-          System.err.println("Trying lock " + getName());
-          lock = DirLock.tryLock(fs, dir);
-          System.err.println("Acquired lock " + getName());
-          if(lock==null) {
-            System.out.println("Retrying lock - " + getName());
-          }
-        } while (lock==null);
-        cleanExit= true;
-      } catch (Exception e) {
-        e.printStackTrace();
-      }
-      finally {
-          try {
-            if(lock!=null) {
-              lock.release();
-              System.err.println("Released lock " + getName());
+    class DirLockingThread extends Thread {
+
+        private int thdNum;
+        private final FileSystem fs;
+        private final Path dir;
+        public boolean cleanExit = false;
+
+        public DirLockingThread(int thdNum, FileSystem fs, Path dir)
+            throws IOException {
+            this.thdNum = thdNum;
+            this.fs = fs;
+            this.dir = dir;
+        }
+
+        @Override
+        public void run() {
+            Thread.currentThread().setName("DirLockingThread-" + thdNum);
+            DirLock lock = null;
+            try {
+                do {
+                    System.err.println("Trying lock " + getName());
+                    lock = DirLock.tryLock(fs, dir);
+                    System.err.println("Acquired lock " + getName());
+                    if (lock == null) {
+                        System.out.println("Retrying lock - " + getName());
+                    }
+                } while (lock == null && 
!Thread.currentThread().isInterrupted());
+                cleanExit = true;
+            } catch (IOException e) {
+                e.printStackTrace();
+            } finally {
+                try {
+                    if (lock != null) {
+                        lock.release();
+                        System.err.println("Released lock " + getName());
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace(System.err);
+                }
             }
-          } catch (IOException e) {
-            e.printStackTrace(System.err);
-          }
-      }
-      System.err.println("Thread exiting " + getName());
-    } // run()
-
-  } // class DirLockingThread
+            System.err.println("Thread exiting " + getName());
+        } // run()
+
+    } // class DirLockingThread
 }

Reply via email to