http://git-wip-us.apache.org/repos/asf/storm/blob/83adfd9d/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
index 6e03bcf..f9374fe 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -33,10 +34,7 @@ import org.apache.storm.security.auth.NimbusPrincipal;
 import org.apache.storm.security.auth.SingleUserPrincipal;
 import org.apache.storm.utils.Utils;
 import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -63,469 +61,454 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.*;
 
+import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
+import org.junit.ClassRule;
+
 public class BlobStoreTest {
-  private static final Logger LOG = LoggerFactory.getLogger(BlobStoreTest.class);
-  protected static MiniDFSCluster dfscluster = null;
-  protected static Configuration hadoopConf = null;
-  URI base;
-  File baseFile;
-  private static Map conf = new HashMap();
-  public static final int READ = 0x01;
-  public static final int WRITE = 0x02;
-  public static final int ADMIN = 0x04;
-
-  @Before
-  public void init() {
-    System.setProperty("test.build.data", "target/test/data");
-    initializeConfigs();
-    baseFile = new File("/tmp/blob-store-test-"+UUID.randomUUID());
-    base = baseFile.toURI();
-  }
-
-  @After
-  public void cleanup()
-          throws IOException {
-    FileUtils.deleteDirectory(baseFile);
-  }
-
-  @AfterClass
-  public static void cleanupAfterClass() throws IOException {
-    if (dfscluster != null) {
-      dfscluster.shutdown();
+
+    @ClassRule
+    public static final MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule();
+
+    private static final Logger LOG = LoggerFactory.getLogger(BlobStoreTest.class);
+    URI base;
+    File baseFile;
+    private static final Map CONF = new HashMap();
+    public static final int READ = 0x01;
+    public static final int WRITE = 0x02;
+    public static final int ADMIN = 0x04;
+
+    @Before
+    public void init() {
+        initializeConfigs();
+        baseFile = new File("/tmp/blob-store-test-" + UUID.randomUUID());
+        base = baseFile.toURI();
+    }
+
+    @After
+    public void cleanup()
+        throws IOException {
+        FileUtils.deleteDirectory(baseFile);
+    }
+
+    // Method which initializes nimbus admin
+    public static void initializeConfigs() {
+        CONF.put(Config.NIMBUS_ADMINS, "admin");
+        CONF.put(Config.NIMBUS_SUPERVISOR_USERS, "supervisor");
+    }
+
+    //Gets Nimbus Subject with NimbusPrincipal set on it
+    public static Subject getNimbusSubject() {
+        Subject nimbus = new Subject();
+        nimbus.getPrincipals().add(new NimbusPrincipal());
+        return nimbus;
+    }
+
+    // Overloading the assertStoreHasExactly method to accommodate a Subject in order to check for authorization
+    public static void assertStoreHasExactly(BlobStore store, Subject who, String... keys)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        Set<String> expected = new HashSet<>(Arrays.asList(keys));
+        Set<String> found = new HashSet<>();
+        Iterator<String> c = store.listKeys();
+        while (c.hasNext()) {
+            String keyName = c.next();
+            found.add(keyName);
+        }
+        Set<String> extra = new HashSet<>(found);
+        extra.removeAll(expected);
+        assertTrue("Found extra keys in the blob store " + extra, 
extra.isEmpty());
+        Set<String> missing = new HashSet<>(expected);
+        missing.removeAll(found);
+        assertTrue("Found keys missing from the blob store " + missing, 
missing.isEmpty());
+    }
+
+    public static void assertStoreHasExactly(BlobStore store, String... keys)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        assertStoreHasExactly(store, null, keys);
+    }
+
+    // Overloading the readInt method to accommodate a Subject in order to check for authorization (security turned on)
+    public static int readInt(BlobStore store, Subject who, String key) throws IOException, KeyNotFoundException, AuthorizationException {
+        try (InputStream in = store.getBlob(key, who)) {
+            return in.read();
+        }
+    }
+
+    public static int readInt(BlobStore store, String key)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        return readInt(store, null, key);
+    }
+
+    public static void readAssertEquals(BlobStore store, String key, int value)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        assertEquals(value, readInt(store, key));
     }
-  }
-
-  // Method which initializes nimbus admin
-  public static void initializeConfigs() {
-    conf.put(Config.NIMBUS_ADMINS,"admin");
-    conf.put(Config.NIMBUS_SUPERVISOR_USERS,"supervisor");
-  }
-
-  //Gets Nimbus Subject with NimbusPrincipal set on it
-  public static Subject getNimbusSubject() {
-    Subject nimbus = new Subject();
-    nimbus.getPrincipals().add(new NimbusPrincipal());
-    return nimbus;
-  }
-
-  // Overloading the assertStoreHasExactly method accomodate Subject in order to check for authorization
-  public static void assertStoreHasExactly(BlobStore store, Subject who, String ... keys)
-          throws IOException, KeyNotFoundException, AuthorizationException {
-    Set<String> expected = new HashSet<String>(Arrays.asList(keys));
-    Set<String> found = new HashSet<String>();
-    Iterator<String> c = store.listKeys();
-    while (c.hasNext()) {
-      String keyName = c.next();
-      found.add(keyName);
+
+    // Checks for assertion when we turn on security
+    public void readAssertEqualsWithAuth(BlobStore store, Subject who, String key, int value)
+        throws IOException, KeyNotFoundException, AuthorizationException {
+        assertEquals(value, readInt(store, who, key));
     }
-    Set<String> extra = new HashSet<String>(found);
-    extra.removeAll(expected);
-    assertTrue("Found extra keys in the blob store "+extra, extra.isEmpty());
-    Set<String> missing = new HashSet<String>(expected);
-    missing.removeAll(found);
-    assertTrue("Found keys missing from the blob store "+missing, 
missing.isEmpty());
-  }
-
-  public static void assertStoreHasExactly(BlobStore store, String ... keys)
-          throws IOException, KeyNotFoundException, AuthorizationException {
-    assertStoreHasExactly(store, null, keys);
-  }
-
-  // Overloading the readInt method accomodate Subject in order to check for authorization (security turned on)
-  public static int readInt(BlobStore store, Subject who, String key) throws IOException, KeyNotFoundException, AuthorizationException {
-    InputStream in = store.getBlob(key, who);
-    try {
-      return in.read();
-    } finally {
-      in.close();
+
+    private AutoCloseableBlobStoreContainer initHdfs(String dirName)
+        throws Exception {
+        Map conf = new HashMap();
+        conf.put(Config.BLOBSTORE_DIR, dirName);
+        conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal");
+        conf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 3);
+        HdfsBlobStore store = new HdfsBlobStore();
+        store.prepareInternal(conf, null, DFS_CLUSTER_RULE.getDfscluster().getConfiguration(0));
+        return new AutoCloseableBlobStoreContainer(store);
     }
-  }
-
-  public static int readInt(BlobStore store, String key)
-          throws IOException, KeyNotFoundException, AuthorizationException {
-    return readInt(store, null, key);
-  }
-
-  public static void readAssertEquals(BlobStore store, String key, int value)
-          throws IOException, KeyNotFoundException, AuthorizationException {
-    assertEquals(value, readInt(store, key));
-  }
-
-  // Checks for assertion when we turn on security
-  public void readAssertEqualsWithAuth(BlobStore store, Subject who, String key, int value)
-          throws IOException, KeyNotFoundException, AuthorizationException {
-    assertEquals(value, readInt(store, who, key));
-  }
-
-  private HdfsBlobStore initHdfs(String dirName)
-          throws Exception {
-    if (hadoopConf == null) {
-      hadoopConf = new Configuration();
+
+    private static class AutoCloseableBlobStoreContainer implements AutoCloseable {
+
+        private final HdfsBlobStore blobStore;
+
+        public AutoCloseableBlobStoreContainer(HdfsBlobStore blobStore) {
+            this.blobStore = blobStore;
+        }
+
+        @Override
+        public void close() throws Exception {
+            this.blobStore.shutdown();
+        }
+
     }
-    try {
-      if (dfscluster == null) {
-        dfscluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(3).build();
-        dfscluster.waitActive();
-      }
-    } catch (IOException e) {
-      LOG.error("error creating MiniDFSCluster");
+
+    @Test
+    public void testHdfsReplication()
+        throws Exception {
+        try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstoreReplication")) {
+            testReplication("/storm/blobstoreReplication/test", container.blobStore);
+        }
     }
-    Map conf = new HashMap();
-    conf.put(Config.BLOBSTORE_DIR, dirName);
-    conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN,"org.apache.storm.security.auth.DefaultPrincipalToLocal");
-    conf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 3);
-    HdfsBlobStore store = new HdfsBlobStore();
-    store.prepareInternal(conf, null, dfscluster.getConfiguration(0));
-    return store;
-  }
-
-  @Test
-  public void testHdfsReplication()
-          throws Exception {
-    BlobStore store = initHdfs("/storm/blobstoreReplication");
-    testReplication("/storm/blobstoreReplication/test", store);
-  }
-
-  @Test
-  public void testBasicHdfs()
-          throws Exception {
-    testBasic(initHdfs("/storm/blobstore1"));
-  }
-
-  @Test
-  public void testMultipleHdfs()
-          throws Exception {
-    // use different blobstore dir so it doesn't conflict with other test
-    testMultiple(initHdfs("/storm/blobstore2"));
-  }
-
-  @Test
-  public void testHdfsWithAuth()
-          throws Exception {
-    // use different blobstore dir so it doesn't conflict with other tests
-    testWithAuthentication(initHdfs("/storm/blobstore3"));
-  }
-
-  // Test for replication.
-  public void testReplication(String path, BlobStore store)
-          throws Exception {
-    SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
-    metadata.set_replication_factor(4);
-    AtomicOutputStream out = store.createBlob("test", metadata, null);
-    out.write(1);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", null), 4);
-    store.deleteBlob("test", null);
-
-    //Test for replication with NIMBUS as user
-    Subject admin = getSubject("admin");
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    metadata.set_replication_factor(4);
-    out = store.createBlob("test", metadata, admin);
-    out.write(1);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", admin), 4);
-    store.updateBlobReplication("test", 5, admin);
-    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", admin), 5);
-    store.deleteBlob("test", admin);
-
-    //Test for replication using SUPERVISOR access
-    Subject supervisor = getSubject("supervisor");
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    metadata.set_replication_factor(4);
-    out = store.createBlob("test", metadata, supervisor);
-    out.write(1);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", supervisor), 4);
-    store.updateBlobReplication("test", 5, supervisor);
-    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", supervisor), 5);
-    store.deleteBlob("test", supervisor);
-
-    //Test for a user having read or write or admin access to read replication for a blob
-    String createSubject = "createSubject";
-    String writeSubject = "writeSubject";
-    String adminSubject = "adminSubject";
-    Subject who = getSubject(createSubject);
-    AccessControl writeAccess = new AccessControl(AccessControlType.USER, READ);
-    AccessControl adminAccess = new AccessControl(AccessControlType.USER, ADMIN);
-    writeAccess.set_name(writeSubject);
-    adminAccess.set_name(adminSubject);
-    List<AccessControl> acl = Arrays.asList(writeAccess, adminAccess);
-    metadata = new SettableBlobMeta(acl);
-    metadata.set_replication_factor(4);
-    out = store.createBlob("test", metadata, who);
-    out.write(1);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    who = getSubject(writeSubject);
-    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", who), 4);
-
-    //Test for a user having WRITE or ADMIN privileges to change replication 
of a blob
-    who = getSubject(adminSubject);
-    store.updateBlobReplication("test", 5, who);
-    assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", who), 5);
-    store.deleteBlob("test", getSubject(createSubject));
-  }
-
-  public Subject getSubject(String name) {
-    Subject subject = new Subject();
-    SingleUserPrincipal user = new SingleUserPrincipal(name);
-    subject.getPrincipals().add(user);
-    return subject;
-  }
-
-  // Check for Blobstore with authentication
-  public void testWithAuthentication(BlobStore store)
-          throws Exception {
-    //Test for Nimbus Admin
-    Subject admin = getSubject("admin");
-    assertStoreHasExactly(store);
-    SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    AtomicOutputStream out = store.createBlob("test", metadata, admin);
-    assertStoreHasExactly(store, "test");
-    out.write(1);
-    out.close();
-    store.deleteBlob("test", admin);
-
-    //Test for Supervisor Admin
-    Subject supervisor = getSubject("supervisor");
-    assertStoreHasExactly(store);
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    out = store.createBlob("test", metadata, supervisor);
-    assertStoreHasExactly(store, "test");
-    out.write(1);
-    out.close();
-    store.deleteBlob("test", supervisor);
-
-    //Test for Nimbus itself as a user
-    Subject nimbus = getNimbusSubject();
-    assertStoreHasExactly(store);
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    out = store.createBlob("test", metadata, nimbus);
-    assertStoreHasExactly(store, "test");
-    out.write(1);
-    out.close();
-    store.deleteBlob("test", nimbus);
-
-    // Test with a dummy test_subject for cases where subject !=null (security turned on)
-    Subject who = getSubject("test_subject");
-    assertStoreHasExactly(store);
-
-    // Tests for case when subject != null (security turned on) and
-    // acls for the blob are set to WORLD_EVERYTHING
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
-    out = store.createBlob("test", metadata, who);
-    out.write(1);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    // Testing whether acls are set to WORLD_EVERYTHING
-    assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-    readAssertEqualsWithAuth(store, who, "test", 1);
-
-    LOG.info("Deleting test");
-    store.deleteBlob("test", who);
-    assertStoreHasExactly(store);
-
-    // Tests for case when subject != null (security turned on) and
-    // acls are not set for the blob (DEFAULT)
-    LOG.info("Creating test again");
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    out = store.createBlob("test", metadata, who);
-    out.write(2);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should not contain WORLD_EVERYTHING because
-    // the subject is neither null nor empty. The ACL should however contain USER_EVERYTHING as user needs to have
-    // complete access to the blob
-    assertTrue("ACL does not contain WORLD_EVERYTHING", !metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-    readAssertEqualsWithAuth(store, who, "test", 2);
-
-    LOG.info("Updating test");
-    out = store.updateBlob("test", who);
-    out.write(3);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    readAssertEqualsWithAuth(store, who, "test", 3);
-
-    LOG.info("Updating test again");
-    out = store.updateBlob("test", who);
-    out.write(4);
-    out.flush();
-    LOG.info("SLEEPING");
-    Thread.sleep(2);
-    assertStoreHasExactly(store, "test");
-    readAssertEqualsWithAuth(store, who, "test", 3);
-
-    //Test for subject with no principals and acls set to WORLD_EVERYTHING
-    who = new Subject();
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
-    LOG.info("Creating test");
-    out = store.createBlob("test-empty-subject-WE", metadata, who);
-    out.write(2);
-    out.close();
-    assertStoreHasExactly(store, "test-empty-subject-WE", "test");
-    // Testing whether acls are set to WORLD_EVERYTHING
-    assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-    readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2);
-
-    //Test for subject with no principals and acls set to DEFAULT
-    who = new Subject();
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-    LOG.info("Creating other");
-    out = store.createBlob("test-empty-subject-DEF", metadata, who);
-    out.write(2);
-    out.close();
-    assertStoreHasExactly(store, "test-empty-subject-DEF", "test", 
"test-empty-subject-WE");
-    // Testing whether acls are set to WORLD_EVERYTHING
-    assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-    readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2);
-
-    if (store instanceof HdfsBlobStore) {
-      ((HdfsBlobStore) store).fullCleanup(1);
-    } else {
-      fail("Error the blobstore is of unknowntype");
+
+    @Test
+    public void testBasicHdfs()
+        throws Exception {
+        try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore1")) {
+            testBasic(container.blobStore);
+        }
     }
-    try {
-      out.close();
-    } catch (IOException e) {
-      //This is likely to happen when we try to commit something that
-      // was cleaned up.  This is expected and acceptable.
+
+    @Test
+    public void testMultipleHdfs()
+        throws Exception {
+        // use different blobstore dir so it doesn't conflict with other test
+        try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore2")) {
+            testMultiple(container.blobStore);
+        }
     }
-  }
-
-  public void testBasic(BlobStore store)
-          throws Exception {
-    assertStoreHasExactly(store);
-    LOG.info("Creating test");
-    // Tests for case when subject == null (security turned off) and
-    // acls for the blob are set to WORLD_EVERYTHING
-    SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler
-            .WORLD_EVERYTHING);
-    AtomicOutputStream out = store.createBlob("test", metadata, null);
-    out.write(1);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    // Testing whether acls are set to WORLD_EVERYTHING
-    assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-    readAssertEquals(store, "test", 1);
-
-    LOG.info("Deleting test");
-    store.deleteBlob("test", null);
-    assertStoreHasExactly(store);
-
-    // The following tests are run for both hdfs and local store to test the
-    // update blob interface
-    metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
-    LOG.info("Creating test again");
-    out = store.createBlob("test", metadata, null);
-    out.write(2);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    readAssertEquals(store, "test", 2);
-    LOG.info("Updating test");
-    out = store.updateBlob("test", null);
-    out.write(3);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    readAssertEquals(store, "test", 3);
-
-    LOG.info("Updating test again");
-    out = store.updateBlob("test", null);
-    out.write(4);
-    out.flush();
-    LOG.info("SLEEPING");
-    Thread.sleep(2);
-
-    if (store instanceof HdfsBlobStore) {
-      ((HdfsBlobStore) store).fullCleanup(1);
-    } else {
-      fail("Error the blobstore is of unknowntype");
+
+    @Test
+    public void testHdfsWithAuth()
+        throws Exception {
+        // use different blobstore dir so it doesn't conflict with other tests
+        try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore3")) {
+            testWithAuthentication(container.blobStore);
+        }
     }
-    try {
-      out.close();
-    } catch (IOException e) {
-      //This is likely to happen when we try to commit something that
-      // was cleaned up.  This is expected and acceptable.
+
+    // Test for replication.
+    public void testReplication(String path, BlobStore store)
+        throws Exception {
+        SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+        metadata.set_replication_factor(4);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, null)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        assertEquals("Blobstore replication not matching", store.getBlobReplication("test", null), 4);
+        store.deleteBlob("test", null);
+
+        //Test for replication with NIMBUS as user
+        Subject admin = getSubject("admin");
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        metadata.set_replication_factor(4);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, 
admin)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", admin), 4);
+        store.updateBlobReplication("test", 5, admin);
+        assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", admin), 5);
+        store.deleteBlob("test", admin);
+
+        //Test for replication using SUPERVISOR access
+        Subject supervisor = getSubject("supervisor");
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        metadata.set_replication_factor(4);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, 
supervisor)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", supervisor), 4);
+        store.updateBlobReplication("test", 5, supervisor);
+        assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", supervisor), 5);
+        store.deleteBlob("test", supervisor);
+
+        //Test for a user having read or write or admin access to read replication for a blob
+        String createSubject = "createSubject";
+        String writeSubject = "writeSubject";
+        String adminSubject = "adminSubject";
+        Subject who = getSubject(createSubject);
+        AccessControl writeAccess = new AccessControl(AccessControlType.USER, READ);
+        AccessControl adminAccess = new AccessControl(AccessControlType.USER, ADMIN);
+        writeAccess.set_name(writeSubject);
+        adminAccess.set_name(adminSubject);
+        List<AccessControl> acl = Arrays.asList(writeAccess, adminAccess);
+        metadata = new SettableBlobMeta(acl);
+        metadata.set_replication_factor(4);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, who)) 
{
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        who = getSubject(writeSubject);
+        assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", who), 4);
+
+        //Test for a user having WRITE or ADMIN privileges to change 
replication of a blob
+        who = getSubject(adminSubject);
+        store.updateBlobReplication("test", 5, who);
+        assertEquals("Blobstore replication not matching", 
store.getBlobReplication("test", who), 5);
+        store.deleteBlob("test", getSubject(createSubject));
     }
-  }
-
-
-  public void testMultiple(BlobStore store)
-          throws Exception {
-    assertStoreHasExactly(store);
-    LOG.info("Creating test");
-    AtomicOutputStream out = store.createBlob("test", new 
SettableBlobMeta(BlobStoreAclHandler
-            .WORLD_EVERYTHING), null);
-    out.write(1);
-    out.close();
-    assertStoreHasExactly(store, "test");
-    readAssertEquals(store, "test", 1);
-
-    LOG.info("Creating other");
-    out = store.createBlob("other", new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
-            null);
-    out.write(2);
-    out.close();
-    assertStoreHasExactly(store, "test", "other");
-    readAssertEquals(store, "test", 1);
-    readAssertEquals(store, "other", 2);
-
-    LOG.info("Updating other");
-    out = store.updateBlob("other", null);
-    out.write(5);
-    out.close();
-    assertStoreHasExactly(store, "test", "other");
-    readAssertEquals(store, "test", 1);
-    readAssertEquals(store, "other", 5);
-
-    LOG.info("Deleting test");
-    store.deleteBlob("test", null);
-    assertStoreHasExactly(store, "other");
-    readAssertEquals(store, "other", 5);
-
-    LOG.info("Creating test again");
-    out = store.createBlob("test", new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
-            null);
-    out.write(2);
-    out.close();
-    assertStoreHasExactly(store, "test", "other");
-    readAssertEquals(store, "test", 2);
-    readAssertEquals(store, "other", 5);
-
-    LOG.info("Updating test");
-    out = store.updateBlob("test", null);
-    out.write(3);
-    out.close();
-    assertStoreHasExactly(store, "test", "other");
-    readAssertEquals(store, "test", 3);
-    readAssertEquals(store, "other", 5);
-
-    LOG.info("Deleting other");
-    store.deleteBlob("other", null);
-    assertStoreHasExactly(store, "test");
-    readAssertEquals(store, "test", 3);
-
-    LOG.info("Updating test again");
-    out = store.updateBlob("test", null);
-    out.write(4);
-    out.flush();
-    LOG.info("SLEEPING");
-    Thread.sleep(2);
-
-    if (store instanceof HdfsBlobStore) {
-      ((HdfsBlobStore) store).fullCleanup(1);
-    } else {
-      fail("Error the blobstore is of unknowntype");
-    }    assertStoreHasExactly(store, "test");
-    readAssertEquals(store, "test", 3);
-    try {
-      out.close();
-    } catch (IOException e) {
-      //This is likely to happen when we try to commit something that
-      // was cleaned up.  This is expected and acceptable.
+
+    public Subject getSubject(String name) {
+        Subject subject = new Subject();
+        SingleUserPrincipal user = new SingleUserPrincipal(name);
+        subject.getPrincipals().add(user);
+        return subject;
+    }
+
+    // Check for Blobstore with authentication
+    public void testWithAuthentication(BlobStore store)
+        throws Exception {
+        //Test for Nimbus Admin
+        Subject admin = getSubject("admin");
+        assertStoreHasExactly(store);
+        SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) {
+            assertStoreHasExactly(store, "test");
+            out.write(1);
+        }
+        store.deleteBlob("test", admin);
+
+        //Test for Supervisor Admin
+        Subject supervisor = getSubject("supervisor");
+        assertStoreHasExactly(store);
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, 
supervisor)) {
+            assertStoreHasExactly(store, "test");
+            out.write(1);
+        }
+        store.deleteBlob("test", supervisor);
+
+        //Test for Nimbus itself as a user
+        Subject nimbus = getNimbusSubject();
+        assertStoreHasExactly(store);
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, 
nimbus)) {
+            assertStoreHasExactly(store, "test");
+            out.write(1);
+        }
+        store.deleteBlob("test", nimbus);
+
+        // Test with a dummy test_subject for cases where subject !=null (security turned on)
+        Subject who = getSubject("test_subject");
+        assertStoreHasExactly(store);
+
+        // Tests for case when subject != null (security turned on) and
+        // acls for the blob are set to WORLD_EVERYTHING
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, who)) 
{
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        // Testing whether acls are set to WORLD_EVERYTHING
+        assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        readAssertEqualsWithAuth(store, who, "test", 1);
+
+        LOG.info("Deleting test");
+        store.deleteBlob("test", who);
+        assertStoreHasExactly(store);
+
+        // Tests for case when subject != null (security turned on) and
+        // acls are not set for the blob (DEFAULT)
+        LOG.info("Creating test again");
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, who)) 
{
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test");
+        // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should not contain WORLD_EVERYTHING because
+        // the subject is neither null nor empty. The ACL should however contain USER_EVERYTHING as user needs to have
+        // complete access to the blob
+        assertTrue("ACL does not contain WORLD_EVERYTHING", !metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        readAssertEqualsWithAuth(store, who, "test", 2);
+
+        LOG.info("Updating test");
+        try (AtomicOutputStream out = store.updateBlob("test", who)) {
+            out.write(3);
+        }
+        assertStoreHasExactly(store, "test");
+        readAssertEqualsWithAuth(store, who, "test", 3);
+
+        LOG.info("Updating test again");
+        try (AtomicOutputStream out = store.updateBlob("test", who)) {
+            out.write(4);
+        }
+        LOG.info("SLEEPING");
+        Thread.sleep(2);
+        assertStoreHasExactly(store, "test");
+        readAssertEqualsWithAuth(store, who, "test", 3);
+
+        //Test for subject with no principals and acls set to WORLD_EVERYTHING
+        who = new Subject();
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+        LOG.info("Creating test");
+        try (AtomicOutputStream out = store.createBlob("test-empty-subject-WE", metadata, who)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test-empty-subject-WE", "test");
+        // Testing whether acls are set to WORLD_EVERYTHING
+        assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2);
+
+        //Test for subject with no principals and acls set to DEFAULT
+        who = new Subject();
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+        LOG.info("Creating other");
+        try (AtomicOutputStream out = store.createBlob("test-empty-subject-DEF", metadata, who)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test-empty-subject-DEF", "test", 
"test-empty-subject-WE");
+        // Testing whether acls are set to WORLD_EVERYTHING
+        assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2);
+
+        if (store instanceof HdfsBlobStore) {
+            ((HdfsBlobStore) store).fullCleanup(1);
+        } else {
+            fail("Error the blobstore is of unknowntype");
+        }
+    }
+
+    public void testBasic(BlobStore store)
+        throws Exception {
+        assertStoreHasExactly(store);
+        LOG.info("Creating test");
+        // Tests for case when subject == null (security turned off) and
+        // acls for the blob are set to WORLD_EVERYTHING
+        SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+        try (AtomicOutputStream out = store.createBlob("test", metadata, null)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        // Testing whether acls are set to WORLD_EVERYTHING
+        assertTrue("ACL does not contain WORLD_EVERYTHING", 
metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+        readAssertEquals(store, "test", 1);
+
+        LOG.info("Deleting test");
+        store.deleteBlob("test", null);
+        assertStoreHasExactly(store);
+
+        // The following tests are run for both hdfs and local store to test the
+        // update blob interface
+        metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+        LOG.info("Creating test again");
+        try (AtomicOutputStream out = store.createBlob("test", metadata, 
null)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test");
+        readAssertEquals(store, "test", 2);
+        LOG.info("Updating test");
+        try (AtomicOutputStream out = store.updateBlob("test", null)) {
+            out.write(3);
+        }
+        assertStoreHasExactly(store, "test");
+        readAssertEquals(store, "test", 3);
+
+        LOG.info("Updating test again");
+        try (AtomicOutputStream out = store.updateBlob("test", null)) {
+            out.write(4);
+        }
+        LOG.info("SLEEPING");
+        Thread.sleep(2);
+
+        if (store instanceof HdfsBlobStore) {
+            ((HdfsBlobStore) store).fullCleanup(1);
+        } else {
+            fail("Error the blobstore is of unknowntype");
+        }
+    }
+
+    public void testMultiple(BlobStore store)
+        throws Exception {
+        assertStoreHasExactly(store);
+        LOG.info("Creating test");
+        try (AtomicOutputStream out = store.createBlob("test", new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), null)) {
+            out.write(1);
+        }
+        assertStoreHasExactly(store, "test");
+        readAssertEquals(store, "test", 1);
+
+        LOG.info("Creating other");
+        try (AtomicOutputStream out = store.createBlob("other", new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
+            null)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test", "other");
+        readAssertEquals(store, "test", 1);
+        readAssertEquals(store, "other", 2);
+
+        LOG.info("Updating other");
+        try (AtomicOutputStream out = store.updateBlob("other", null)) {
+            out.write(5);
+        }
+        assertStoreHasExactly(store, "test", "other");
+        readAssertEquals(store, "test", 1);
+        readAssertEquals(store, "other", 5);
+
+        LOG.info("Deleting test");
+        store.deleteBlob("test", null);
+        assertStoreHasExactly(store, "other");
+        readAssertEquals(store, "other", 5);
+
+        LOG.info("Creating test again");
+        try (AtomicOutputStream out = store.createBlob("test", new 
SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
+            null)) {
+            out.write(2);
+        }
+        assertStoreHasExactly(store, "test", "other");
+        readAssertEquals(store, "test", 2);
+        readAssertEquals(store, "other", 5);
+
+        LOG.info("Updating test");
+        try (AtomicOutputStream out = store.updateBlob("test", null)) {
+            out.write(3);
+        }
+        assertStoreHasExactly(store, "test", "other");
+        readAssertEquals(store, "test", 3);
+        readAssertEquals(store, "other", 5);
+
+        LOG.info("Deleting other");
+        store.deleteBlob("other", null);
+        assertStoreHasExactly(store, "test");
+        readAssertEquals(store, "test", 3);
+
+        LOG.info("Updating test again");
+        try (AtomicOutputStream out = store.updateBlob("test", null)) {
+            out.write(4);
+        }
+        LOG.info("SLEEPING");
+        Thread.sleep(2);
+
+        if (store instanceof HdfsBlobStore) {
+            ((HdfsBlobStore) store).fullCleanup(1);
+        } else {
+            fail("Error the blobstore is of unknowntype");
+        }
+        assertStoreHasExactly(store, "test");
+        readAssertEquals(store, "test", 3);
     }
-  }
 }

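Note: both test classes in this patch now depend on org.apache.storm.hdfs.testing.MiniDFSClusterRule, which is referenced but not included in this diff. A minimal sketch of what such a rule might look like, assuming it extends JUnit 4's ExternalResource and owns the MiniDFSCluster lifecycle (the nested Java7Supplier interface, the numDataNodes(3) default, and the before()/after() bodies below are inferred from how the rule is used in these tests, not taken from the commit):

package org.apache.storm.hdfs.testing;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.rules.ExternalResource;

public class MiniDFSClusterRule extends ExternalResource {

    // Java-7-friendly stand-in for java.util.function.Supplier.
    public interface Java7Supplier<T> {
        T get();
    }

    private final Java7Supplier<Configuration> hadoopConfSupplier;
    private Configuration hadoopConf;
    private MiniDFSCluster dfscluster;

    public MiniDFSClusterRule() {
        // Default: a vanilla Hadoop Configuration.
        this(new Java7Supplier<Configuration>() {
            @Override
            public Configuration get() {
                return new Configuration();
            }
        });
    }

    public MiniDFSClusterRule(Java7Supplier<Configuration> hadoopConfSupplier) {
        this.hadoopConfSupplier = hadoopConfSupplier;
    }

    public MiniDFSCluster getDfscluster() {
        return dfscluster;
    }

    public Configuration getHadoopConf() {
        return hadoopConf;
    }

    @Override
    protected void before() throws Throwable {
        // Replaces the System.setProperty/build/waitActive boilerplate the
        // tests used to carry themselves.
        System.setProperty("test.build.data", "target/test/data");
        hadoopConf = hadoopConfSupplier.get();
        dfscluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(3).build();
        dfscluster.waitActive();
    }

    @Override
    protected void after() {
        if (dfscluster != null) {
            dfscluster.shutdown();
        }
    }
}

Used as a @ClassRule, before() and after() bracket the whole test class, so every test in BlobStoreTest and HdfsBlobStoreImplTest shares one cluster instead of each class managing its own static MiniDFSCluster by hand.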
http://git-wip-us.apache.org/repos/asf/storm/blob/83adfd9d/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
index c49c44b..fd80631 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -23,9 +24,6 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,194 +37,190 @@ import java.util.Map;
 
 import static org.junit.Assert.*;
 
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
+import org.junit.ClassRule;
+
 public class HdfsBlobStoreImplTest {
+
+    @ClassRule
+    public static final MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule();
+
     private static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreImplTest.class);
 
-    protected static Configuration hadoopConf;
-    protected static MiniDFSCluster dfscluster;
     // key dir needs to be number 0 to number of buckets, choose one so we know where to look
     private static String KEYDIR = "0";
     private Path blobDir = new Path("/storm/blobstore1");
     private Path fullKeyDir = new Path(blobDir, KEYDIR);
     private String BLOBSTORE_DATA = "data";
 
-    public class TestHdfsBlobStoreImpl extends HdfsBlobStoreImpl {
+    public class TestHdfsBlobStoreImpl extends HdfsBlobStoreImpl implements AutoCloseable {
 
         public TestHdfsBlobStoreImpl(Path path, Map<String, Object> conf) throws IOException {
             super(path, conf);
         }
 
         public TestHdfsBlobStoreImpl(Path path, Map<String, Object> conf,
-                                     Configuration hconf) throws IOException {
+            Configuration hconf) throws IOException {
             super(path, conf, hconf);
         }
 
         protected Path getKeyDir(String key) {
             return new Path(new Path(blobDir, KEYDIR), key);
         }
-    }
-
-    @BeforeClass
-    public static void init() {
-        System.setProperty("test.build.data", "target/test/data");
-        if (hadoopConf == null) {
-            hadoopConf = new Configuration();
-        }
-        try {
-            if (dfscluster == null) {
-                dfscluster = new MiniDFSCluster.Builder(hadoopConf).build();
-                dfscluster.waitActive();
-            }
-        } catch (IOException e) {
-            LOG.error("error creating MiniDFSCluster");
-        }
-    }
 
-    @AfterClass
-    public static void cleanup() throws IOException {
-        if (dfscluster != null) {
-            dfscluster.shutdown();
+        @Override
+        public void close() throws Exception {
+            this.shutdown();
         }
     }
 
     // Be careful about adding additional tests as the dfscluster will be shared
-
     @Test
     public void testMultiple() throws Exception {
         String testString = "testingblob";
         String validKey = "validkeyBasic";
 
-        FileSystem fs = dfscluster.getFileSystem();
+        //Will be closed automatically when shutting down the DFS cluster
+        FileSystem fs = DFS_CLUSTER_RULE.getDfscluster().getFileSystem();
         Map conf = new HashMap();
 
-        TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, hadoopConf);
-        // should have created blobDir
-        assertTrue("BlobStore dir wasn't created", fs.exists(blobDir));
-        assertEquals("BlobStore dir was created with wrong permissions",
+        try (TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, DFS_CLUSTER_RULE.getHadoopConf())) {
+            // should have created blobDir
+            assertTrue("BlobStore dir wasn't created", fs.exists(blobDir));
+            assertEquals("BlobStore dir was created with wrong permissions",
                HdfsBlobStoreImpl.BLOBSTORE_DIR_PERMISSION, fs.getFileStatus(blobDir).getPermission());
 
-        // test exist with non-existent key
-        assertFalse("file exists but shouldn't", hbs.exists("bogus"));
-
-        // test write
-        BlobStoreFile pfile = hbs.write(validKey, false);
-        // Adding metadata to avoid null pointer exception
-        SettableBlobMeta meta = new SettableBlobMeta();
-        meta.set_replication_factor(1);
-        pfile.setMetadata(meta);
-        OutputStream ios = pfile.getOutputStream();
-        ios.write(testString.getBytes(Charset.forName("UTF-8")));
-        ios.close();
-
-        // test commit creates properly
-        assertTrue("BlobStore key dir wasn't created", fs.exists(fullKeyDir));
-        pfile.commit();
-        Path dataFile = new Path(new Path(fullKeyDir, validKey), BLOBSTORE_DATA);
-        assertTrue("blob data not committed", fs.exists(dataFile));
-        assertEquals("BlobStore dir was created with wrong permissions",
+            // test exist with non-existent key
+            assertFalse("file exists but shouldn't", hbs.exists("bogus"));
+
+            // test write
+            BlobStoreFile pfile = hbs.write(validKey, false);
+            // Adding metadata to avoid null pointer exception
+            SettableBlobMeta meta = new SettableBlobMeta();
+            meta.set_replication_factor(1);
+            pfile.setMetadata(meta);
+            try (OutputStream ios = pfile.getOutputStream()) {
+                ios.write(testString.getBytes(StandardCharsets.UTF_8));
+            }
+
+            // test commit creates properly
+            assertTrue("BlobStore key dir wasn't created", 
fs.exists(fullKeyDir));
+            pfile.commit();
+            Path dataFile = new Path(new Path(fullKeyDir, validKey), 
BLOBSTORE_DATA);
+            assertTrue("blob data not committed", fs.exists(dataFile));
+            assertEquals("BlobStore dir was created with wrong permissions",
                 HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, 
fs.getFileStatus(dataFile).getPermission());
-        assertTrue("key doesn't exist but should", hbs.exists(validKey));
-
-        // test read
-        BlobStoreFile readpFile = hbs.read(validKey);
-        String readString = IOUtils.toString(readpFile.getInputStream(), "UTF-8");
-        assertEquals("string read from blob doesn't match", testString, readString);
-
-        // test listkeys
-        Iterator<String> keys = hbs.listKeys();
-        assertTrue("blob has one key", keys.hasNext());
-        assertEquals("one key in blobstore", validKey, keys.next());
-
-        // delete
-        hbs.deleteKey(validKey);
-        assertFalse("key not deleted", fs.exists(dataFile));
-        assertFalse("key not deleted", hbs.exists(validKey));
-
-        // Now do multiple
-        String testString2 = "testingblob2";
-        String validKey2= "validkey2";
-
-        // test write
-        pfile = hbs.write(validKey, false);
-        pfile.setMetadata(meta);
-        ios = pfile.getOutputStream();
-        ios.write(testString.getBytes(Charset.forName("UTF-8")));
-        ios.close();
-
-        // test commit creates properly
-        assertTrue("BlobStore key dir wasn't created", fs.exists(fullKeyDir));
-        pfile.commit();
-        assertTrue("blob data not committed", fs.exists(dataFile));
-        assertEquals("BlobStore dir was created with wrong permissions",
+            assertTrue("key doesn't exist but should", hbs.exists(validKey));
+
+            // test read
+            BlobStoreFile readpFile = hbs.read(validKey);
+            try (InputStream inStream = readpFile.getInputStream()) {
+                String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8);
+                assertEquals("string read from blob doesn't match", testString, readString);
+            }
+
+            // test listkeys
+            Iterator<String> keys = hbs.listKeys();
+            assertTrue("blob has one key", keys.hasNext());
+            assertEquals("one key in blobstore", validKey, keys.next());
+
+            // delete
+            hbs.deleteKey(validKey);
+            assertFalse("key not deleted", fs.exists(dataFile));
+            assertFalse("key not deleted", hbs.exists(validKey));
+
+            // Now do multiple
+            String testString2 = "testingblob2";
+            String validKey2 = "validkey2";
+
+            // test write
+            pfile = hbs.write(validKey, false);
+            pfile.setMetadata(meta);
+            try (OutputStream ios = pfile.getOutputStream()) {
+                ios.write(testString.getBytes(StandardCharsets.UTF_8));
+            }
+
+            // test commit creates properly
+            assertTrue("BlobStore key dir wasn't created", 
fs.exists(fullKeyDir));
+            pfile.commit();
+            assertTrue("blob data not committed", fs.exists(dataFile));
+            assertEquals("BlobStore dir was created with wrong permissions",
                HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile).getPermission());
-        assertTrue("key doesn't exist but should", hbs.exists(validKey));
-
-        // test write again
-        pfile = hbs.write(validKey2, false);
-        pfile.setMetadata(meta);
-        OutputStream ios2 = pfile.getOutputStream();
-        ios2.write(testString2.getBytes(Charset.forName("UTF-8")));
-        ios2.close();
-
-        // test commit second creates properly
-        pfile.commit();
-        Path dataFile2 = new Path(new Path(fullKeyDir, validKey2), BLOBSTORE_DATA);
-        assertTrue("blob data not committed", fs.exists(dataFile2));
-        assertEquals("BlobStore dir was created with wrong permissions",
+            assertTrue("key doesn't exist but should", hbs.exists(validKey));
+
+            // test write again
+            pfile = hbs.write(validKey2, false);
+            pfile.setMetadata(meta);
+            try (OutputStream ios2 = pfile.getOutputStream()) {
+                ios2.write(testString2.getBytes(StandardCharsets.UTF_8));
+            }
+
+            // test commit second creates properly
+            pfile.commit();
+            Path dataFile2 = new Path(new Path(fullKeyDir, validKey2), BLOBSTORE_DATA);
+            assertTrue("blob data not committed", fs.exists(dataFile2));
+            assertEquals("BlobStore dir was created with wrong permissions",
                HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile2).getPermission());
-        assertTrue("key doesn't exist but should", hbs.exists(validKey2));
-
-        // test listkeys
-        keys = hbs.listKeys();
-        int total = 0;
-        boolean key1Found = false;
-        boolean key2Found = false;
-        while(keys.hasNext()) {
-            total++;
-            String key = keys.next();
-            if (key.equals(validKey)) {
-                key1Found = true;
-            } else if (key.equals(validKey2)) {
-                key2Found = true;
-            } else {
-                fail("Found key that wasn't expected: " + key);
+            assertTrue("key doesn't exist but should", hbs.exists(validKey2));
+
+            // test listkeys
+            keys = hbs.listKeys();
+            int total = 0;
+            boolean key1Found = false;
+            boolean key2Found = false;
+            while (keys.hasNext()) {
+                total++;
+                String key = keys.next();
+                if (key.equals(validKey)) {
+                    key1Found = true;
+                } else if (key.equals(validKey2)) {
+                    key2Found = true;
+                } else {
+                    fail("Found key that wasn't expected: " + key);
+                }
             }
+            assertEquals("number of keys is wrong", 2, total);
+            assertTrue("blobstore missing key1", key1Found);
+            assertTrue("blobstore missing key2", key2Found);
+
+            // test read
+            readpFile = hbs.read(validKey);
+            try (InputStream inStream = readpFile.getInputStream()) {
+                String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8);
+                assertEquals("string read from blob doesn't match", testString, readString);
+            }
+
+            // test read
+            readpFile = hbs.read(validKey2);
+            try (InputStream inStream = readpFile.getInputStream()) {
+                String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8);
+                assertEquals("string read from blob doesn't match", testString2, readString);
+            }
+
+            hbs.deleteKey(validKey);
+            assertFalse("key not deleted", hbs.exists(validKey));
+            hbs.deleteKey(validKey2);
+            assertFalse("key not deleted", hbs.exists(validKey2));
         }
-        assertEquals("number of keys is wrong", 2, total);
-        assertTrue("blobstore missing key1", key1Found);
-        assertTrue("blobstore missing key2", key2Found);
-
-        // test read
-        readpFile = hbs.read(validKey);
-        readString = IOUtils.toString(readpFile.getInputStream(), "UTF-8");
-        assertEquals("string read from blob doesn't match", testString, 
readString);
-
-        // test read
-        readpFile = hbs.read(validKey2);
-        readString = IOUtils.toString(readpFile.getInputStream(), "UTF-8");
-        assertEquals("string read from blob doesn't match", testString2, 
readString);
-
-        hbs.deleteKey(validKey);
-        assertFalse("key not deleted", hbs.exists(validKey));
-        hbs.deleteKey(validKey2);
-        assertFalse("key not deleted", hbs.exists(validKey2));
     }
 
     @Test
-    public void testGetFileLength() throws IOException {
-        FileSystem fs = dfscluster.getFileSystem();
-        Map conf = new HashMap();
+    public void testGetFileLength() throws Exception {
+        Map<String, Object> conf = new HashMap<>();
         String validKey = "validkeyBasic";
         String testString = "testingblob";
-        TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, hadoopConf);
-        BlobStoreFile pfile = hbs.write(validKey, false);
-        // Adding metadata to avoid null pointer exception
-        SettableBlobMeta meta = new SettableBlobMeta();
-        meta.set_replication_factor(1);
-        pfile.setMetadata(meta);
-        OutputStream ios = pfile.getOutputStream();
-        ios.write(testString.getBytes(Charset.forName("UTF-8")));
-        ios.close();
-        assertEquals(testString.getBytes(Charset.forName("UTF-8")).length, 
pfile.getFileLength());
+        try (TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, 
conf, DFS_CLUSTER_RULE.getHadoopConf())) {
+            BlobStoreFile pfile = hbs.write(validKey, false);
+            // Adding metadata to avoid null pointer exception
+            SettableBlobMeta meta = new SettableBlobMeta();
+            meta.set_replication_factor(1);
+            pfile.setMetadata(meta);
+            try (OutputStream ios = pfile.getOutputStream()) {
+                ios.write(testString.getBytes(StandardCharsets.UTF_8));
+            }
+            assertEquals(testString.getBytes(StandardCharsets.UTF_8).length, pfile.getFileLength());
+        }
     }
 }

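The next file takes the per-test variant of the same idea: instead of a static @ClassRule shared by the whole class, AvroGenericRecordBoltTest declares an instance @Rule built from a Java7Supplier<Configuration>, so every test method gets a freshly started cluster with a customized Hadoop Configuration. A condensed usage sketch, assuming the rule sketched above (the class and test names here are illustrative, not part of the commit):

import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
import org.apache.storm.hdfs.testing.MiniDFSClusterRule.Java7Supplier;
import org.junit.Rule;
import org.junit.Test;

public class PerTestClusterExample {

    // An instance @Rule restarts the cluster around every test method,
    // using the Configuration the supplier builds.
    @Rule
    public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(new Java7Supplier<Configuration>() {
        @Override
        public Configuration get() {
            Configuration conf = new Configuration();
            conf.setBoolean("dfs.permissions", true); // tweak before startup
            return conf;
        }
    });

    @Test
    public void clusterIsFreshForEachTest() throws Exception {
        assertTrue(dfsClusterRule.getDfscluster().isClusterUp());
    }
}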
http://git-wip-us.apache.org/repos/asf/storm/blob/83adfd9d/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
index cd828da..0b08e1b 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -40,49 +41,71 @@ import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
 import org.junit.Before;
 import org.junit.After;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.Assert;
 
 import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.HashMap;
-
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
+import org.apache.storm.hdfs.testing.MiniDFSClusterRule.Java7Supplier;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
 public class AvroGenericRecordBoltTest {
 
-    private String hdfsURI;
+    @Rule
+    public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(new Java7Supplier<Configuration>() {
+        @Override
+        public Configuration get() {
+            Configuration conf = new Configuration();
+            conf.set("fs.trash.interval", "10");
+            conf.setBoolean("dfs.permissions", true);
+            File baseDir = new File("./target/hdfs/").getAbsoluteFile();
+            FileUtil.fullyDelete(baseDir);
+            conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+            return conf;
+        }
+    });
+    @Mock
+    private OutputCollector collector;
+    @Mock
+    private TopologyContext topologyContext;
+
     private DistributedFileSystem fs;
-    private MiniDFSCluster hdfsCluster;
+    private String hdfsURI;
     private static final String testRoot = "/unittest";
-    private static final Schema schema1;
-    private static final Schema schema2;
-    private static final Tuple tuple1;
-    private static final Tuple tuple2;
-    private static final String schemaV1 = "{\"type\":\"record\"," +
-            "\"name\":\"myrecord\"," +
-            "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," +
-            "{ \"name\":\"int1\", \"type\":\"int\" }]}";
-
-    private static final String schemaV2 = "{\"type\":\"record\"," +
-            "\"name\":\"myrecord\"," +
-            "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," +
-            "{ \"name\":\"bar\", \"type\":\"string\", \"default\":\"baz\" }," +
-            "{ \"name\":\"int1\", \"type\":\"int\" }]}";
-
-    static {
-
+    private static Schema schema1;
+    private static Schema schema2;
+    private static Tuple tuple1;
+    private static Tuple tuple2;
+    private static final String schemaV1 = "{\"type\":\"record\","
+        + "\"name\":\"myrecord\","
+        + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"},"
+        + "{ \"name\":\"int1\", \"type\":\"int\" }]}";
+
+    private static final String schemaV2 = "{\"type\":\"record\","
+        + "\"name\":\"myrecord\","
+        + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"},"
+        + "{ \"name\":\"bar\", \"type\":\"string\", \"default\":\"baz\" },"
+        + "{ \"name\":\"int1\", \"type\":\"int\" }]}";
+
+    @BeforeClass
+    public static void setupClass() {
         Schema.Parser parser = new Schema.Parser();
         schema1 = parser.parse(schemaV1);
 
@@ -100,33 +123,19 @@ public class AvroGenericRecordBoltTest {
         tuple2 = generateTestTuple(builder2.build());
     }
 
-    @Mock private OutputCollector collector;
-    @Mock private TopologyContext topologyContext;
-
     @Before
     public void setup() throws Exception {
-        MockitoAnnotations.initMocks(this);
-        Configuration conf = new Configuration();
-        conf.set("fs.trash.interval", "10");
-        conf.setBoolean("dfs.permissions", true);
-        File baseDir = new File("./target/hdfs/").getAbsoluteFile();
-        FileUtil.fullyDelete(baseDir);
-        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
-
-        MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
-        hdfsCluster = builder.build();
-        fs = hdfsCluster.getFileSystem();
+        fs = dfsClusterRule.getDfscluster().getFileSystem();
         hdfsURI = fs.getUri() + "/";
     }
 
     @After
     public void shutDown() throws IOException {
         fs.close();
-        hdfsCluster.shutdown();
     }
 
-    @Test public void multipleTuplesOneFile() throws IOException
-    {
+    @Test
+    public void multipleTuplesOneFile() throws IOException {
         AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1f, schemaV1);
 
         bolt.prepare(new Config(), topologyContext, collector);
@@ -139,8 +148,8 @@ public class AvroGenericRecordBoltTest {
         verifyAllAvroFiles(testRoot);
     }
 
-    @Test public void multipleTuplesMutliplesFiles() throws IOException
-    {
+    @Test
+    public void multipleTuplesMutliplesFiles() throws IOException {
         AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, .0001f, schemaV1);
 
         bolt.prepare(new Config(), topologyContext, collector);
@@ -153,8 +162,8 @@ public class AvroGenericRecordBoltTest {
         verifyAllAvroFiles(testRoot);
     }
 
-    @Test public void forwardSchemaChangeWorks() throws IOException
-    {
+    @Test
+    public void forwardSchemaChangeWorks() throws IOException {
         AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1000f, schemaV1);
 
         bolt.prepare(new Config(), topologyContext, collector);
@@ -167,8 +176,8 @@ public class AvroGenericRecordBoltTest {
         verifyAllAvroFiles(testRoot);
     }
 
-    @Test public void backwardSchemaChangeWorks() throws IOException
-    {
+    @Test
+    public void backwardSchemaChangeWorks() throws IOException {
         AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1000f, schemaV2);
 
         bolt.prepare(new Config(), topologyContext, collector);
@@ -180,8 +189,8 @@ public class AvroGenericRecordBoltTest {
         verifyAllAvroFiles(testRoot);
     }
 
-    @Test public void schemaThrashing() throws IOException
-    {
+    @Test
+    public void schemaThrashing() throws IOException {
         AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1000f, schemaV2);
 
         bolt.prepare(new Config(), topologyContext, collector);
@@ -206,19 +215,19 @@ public class AvroGenericRecordBoltTest {
         FileNameFormat fieldsFileNameFormat = new DefaultFileNameFormat().withPath(testRoot);
 
         FileRotationPolicy rotationPolicy =
-                new FileSizeRotationPolicy(rotationSizeMB, FileSizeRotationPolicy.Units.MB);
+            new FileSizeRotationPolicy(rotationSizeMB, FileSizeRotationPolicy.Units.MB);
 
         return new AvroGenericRecordBolt()
-                .withFsUrl(nameNodeAddr)
-                .withFileNameFormat(fieldsFileNameFormat)
-                .withRotationPolicy(rotationPolicy)
-                .withSyncPolicy(fieldsSyncPolicy);
+            .withFsUrl(nameNodeAddr)
+            .withFileNameFormat(fieldsFileNameFormat)
+            .withRotationPolicy(rotationPolicy)
+            .withSyncPolicy(fieldsSyncPolicy);
     }
 
     private static Tuple generateTestTuple(GenericRecord record) {
         TopologyBuilder builder = new TopologyBuilder();
         GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
-                new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+            new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
             @Override
             public Fields getComponentOutputFields(String componentId, String streamId) {
                 return new Fields("record");
@@ -250,24 +259,23 @@ public class AvroGenericRecordBoltTest {
         return nonZero;
     }
 
-    private void fileIsGoodAvro (Path path) throws IOException {
+    private void fileIsGoodAvro(Path path) throws IOException {
         DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
-        FSDataInputStream in = fs.open(path, 0);
-        FileOutputStream out = new FileOutputStream("target/FOO.avro");
-
-        byte[] buffer = new byte[100];
-        int bytesRead;
-        while ((bytesRead = in.read(buffer)) > 0) {
-            out.write(buffer, 0, bytesRead);
+        try (FSDataInputStream in = fs.open(path, 0); FileOutputStream out = new FileOutputStream("target/FOO.avro")) {
+            byte[] buffer = new byte[100];
+            int bytesRead;
+            while ((bytesRead = in.read(buffer)) > 0) {
+                out.write(buffer, 0, bytesRead);
+            }
         }
-        out.close();
 
         java.io.File file = new File("target/FOO.avro");
 
-        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file, datumReader);
-        GenericRecord user = null;
-        while (dataFileReader.hasNext()) {
-            user = dataFileReader.next(user);
+        try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file, datumReader)) {
+            GenericRecord user = null;
+            while (dataFileReader.hasNext()) {
+                user = dataFileReader.next(user);
+            }
         }
 
         file.delete();
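
Unlike BlobStoreTest's static @ClassRule, this test installs the rule as an instance @Rule, so a fresh MiniDFSCluster is built before and shut down after every test method; the @Before/@After pair now only opens and closes the FileSystem handle. A brief illustration of the two lifecycles, where ExampleTest and its members are hypothetical and only the MiniDFSClusterRule calls come from this commit:

import org.apache.hadoop.fs.FileSystem;
import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

public class ExampleTest {

    // Started once before the first test, shut down after the last one
    @ClassRule
    public static final MiniDFSClusterRule SHARED_CLUSTER = new MiniDFSClusterRule();

    // Started and shut down around every individual test method
    @Rule
    public MiniDFSClusterRule perTestCluster = new MiniDFSClusterRule();

    @Test
    public void writesToBothClusters() throws Exception {
        FileSystem shared = SHARED_CLUSTER.getDfscluster().getFileSystem();
        FileSystem fresh = perTestCluster.getDfscluster().getFileSystem();
        // shared survives across tests; fresh is rebuilt for each one
    }
}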

http://git-wip-us.apache.org/repos/asf/storm/blob/83adfd9d/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
index e8f0702..9386c9f 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -44,7 +45,7 @@ import org.junit.Assert;
 
 import org.junit.rules.ExpectedException;
 import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
+
 import static org.mockito.Mockito.*;
 
 import org.apache.hadoop.conf.Configuration;
@@ -59,40 +60,50 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
+import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
 
+@RunWith(MockitoJUnitRunner.class)
 public class TestHdfsBolt {
 
+    @Rule
+    public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(new MiniDFSClusterRule.Java7Supplier<Configuration>() {
+        @Override
+        public Configuration get() {
+            Configuration conf = new Configuration();
+            conf.set("fs.trash.interval", "10");
+            conf.setBoolean("dfs.permissions", true);
+            File baseDir = new File("./target/hdfs/").getAbsoluteFile();
+            FileUtil.fullyDelete(baseDir);
+            conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+            return conf;
+        }
+    });
+
     private String hdfsURI;
     private DistributedFileSystem fs;
-    private MiniDFSCluster hdfsCluster;
     private static final String testRoot = "/unittest";
     Tuple tuple1 = generateTestTuple(1, "First Tuple", "SFO", "CA");
     Tuple tuple2 = generateTestTuple(1, "Second Tuple", "SJO", "CA");
 
-    @Mock private OutputCollector collector;
-    @Mock private TopologyContext topologyContext;
-    @Rule public ExpectedException thrown = ExpectedException.none();
+    @Mock
+    private OutputCollector collector;
+    @Mock
+    private TopologyContext topologyContext;
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
 
     @Before
     public void setup() throws Exception {
-        MockitoAnnotations.initMocks(this);
-        Configuration conf = new Configuration();
-        conf.set("fs.trash.interval", "10");
-        conf.setBoolean("dfs.permissions", true);
-        File baseDir = new File("./target/hdfs/").getAbsoluteFile();
-        FileUtil.fullyDelete(baseDir);
-        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
-
-        MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
-        hdfsCluster = builder.build();
-        fs = hdfsCluster.getFileSystem();
-        hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
+        fs = dfsClusterRule.getDfscluster().getFileSystem();
+        hdfsURI = "hdfs://localhost:" + 
dfsClusterRule.getDfscluster().getNameNodePort() + "/";
     }
 
     @After
     public void shutDown() throws IOException {
         fs.close();
-        hdfsCluster.shutdown();
     }
 
     @Test
@@ -134,8 +145,7 @@ public class TestHdfsBolt {
     }
 
     @Test
-    public void testTwoTuplesOneFile() throws IOException
-    {
+    public void testTwoTuplesOneFile() throws IOException {
         HdfsBolt bolt = makeHdfsBolt(hdfsURI, 2, 10000f);
         bolt.prepare(new Config(), topologyContext, collector);
         bolt.execute(tuple1);
@@ -150,8 +160,7 @@ public class TestHdfsBolt {
     }
 
     @Test
-    public void testFailedSync() throws IOException
-    {
+    public void testFailedSync() throws IOException {
         HdfsBolt bolt = makeHdfsBolt(hdfsURI, 2, 10000f);
         bolt.prepare(new Config(), topologyContext, collector);
         bolt.execute(tuple1);
@@ -167,27 +176,23 @@ public class TestHdfsBolt {
     // One tuple and one rotation should yield one file with data
     // The failed executions should not cause rotations and any new files
     @Test
-    public void testFailureFilecount() throws IOException, InterruptedException
-    {
+    public void testFailureFilecount() throws IOException, InterruptedException {
         HdfsBolt bolt = makeHdfsBolt(hdfsURI, 1, .000001f);
         bolt.prepare(new Config(), topologyContext, collector);
 
         bolt.execute(tuple1);
         fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-        try
-        {
+        try {
             bolt.execute(tuple2);
         } catch (RuntimeException e) {
             //
         }
-        try
-        {
+        try {
             bolt.execute(tuple2);
         } catch (RuntimeException e) {
             //
         }
-        try
-        {
+        try {
             bolt.execute(tuple2);
         } catch (RuntimeException e) {
             //
@@ -198,8 +203,7 @@ public class TestHdfsBolt {
     }
 
     @Test
-    public void testTickTuples() throws IOException
-    {
+    public void testTickTuples() throws IOException {
         HdfsBolt bolt = makeHdfsBolt(hdfsURI, 10, 10000f);
         bolt.prepare(new Config(), topologyContext, collector);
 
@@ -226,36 +230,28 @@ public class TestHdfsBolt {
         SyncPolicy fieldsSyncPolicy = new CountSyncPolicy(countSync);
 
         FileRotationPolicy fieldsRotationPolicy =
-                new FileSizeRotationPolicy(rotationSizeMB, FileSizeRotationPolicy.Units.MB);
+            new FileSizeRotationPolicy(rotationSizeMB, FileSizeRotationPolicy.Units.MB);
 
         FileNameFormat fieldsFileNameFormat = new DefaultFileNameFormat().withPath(testRoot);
 
         return new HdfsBolt()
-                .withFsUrl(nameNodeAddr)
-                .withFileNameFormat(fieldsFileNameFormat)
-                .withRecordFormat(fieldsFormat)
-                .withRotationPolicy(fieldsRotationPolicy)
-                .withSyncPolicy(fieldsSyncPolicy);
+            .withFsUrl(nameNodeAddr)
+            .withFileNameFormat(fieldsFileNameFormat)
+            .withRecordFormat(fieldsFormat)
+            .withRotationPolicy(fieldsRotationPolicy)
+            .withSyncPolicy(fieldsSyncPolicy);
     }
 
-    private Tuple generateTestTuple(Object id, Object msg,Object city,Object state) {
+    private Tuple generateTestTuple(Object id, Object msg, Object city, Object state) {
         TopologyBuilder builder = new TopologyBuilder();
         GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
-                new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+            new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
             @Override
             public Fields getComponentOutputFields(String componentId, String streamId) {
-                return new Fields("id", "msg","city","state");
+                return new Fields("id", "msg", "city", "state");
             }
         };
-        return new TupleImpl(topologyContext, new Values(id, msg,city,state), 1, "");
-    }
-
-    private void printFiles(String path) throws IOException {
-        Path p = new Path(path);
-        FileStatus[] fileStatuses = fs.listStatus(p);
-        for (FileStatus file : fileStatuses) {
-            System.out.println("@@@ " + file.getPath() + " [" + file.getLen() + "]");
-        }
+        return new TupleImpl(topologyContext, new Values(id, msg, city, state), 1, "");
     }
 
     // Generally used to compare how files were actually written and compare to expectations based on total
@@ -264,9 +260,11 @@ public class TestHdfsBolt {
         Path p = new Path(path);
         int nonZero = 0;
 
-        for (FileStatus file : fs.listStatus(p))
-            if (file.getLen() > 0)
+        for (FileStatus file : fs.listStatus(p)) {
+            if (file.getLen() > 0) {
                 nonZero++;
+            }
+        }
 
         return nonZero;
     }
@@ -275,9 +273,11 @@ public class TestHdfsBolt {
         Path p = new Path(path);
         int zeroLength = 0;
 
-        for (FileStatus file : fs.listStatus(p))
-            if (file.getLen() == 0)
+        for (FileStatus file : fs.listStatus(p)) {
+            if (file.getLen() == 0) {
                 zeroLength++;
+            }
+        }
 
         return zeroLength;
     }
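
These tests also swap the manual MockitoAnnotations.initMocks(this) call in setup() for @RunWith(MockitoJUnitRunner.class), which initializes every @Mock field before each test runs. A minimal sketch of the pattern, using the same org.mockito.runners import the diff adds; RunnerSketchTest and its test body are illustrative only:

import static org.mockito.Mockito.verify;

import org.apache.storm.task.OutputCollector;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class RunnerSketchTest {

    @Mock
    private OutputCollector collector; // injected by the runner, no initMocks needed

    @Test
    public void interactionIsVisibleToVerify() {
        RuntimeException boom = new RuntimeException("boom");
        collector.reportError(boom);   // stand-in interaction with the mock
        verify(collector).reportError(boom);
    }
}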

http://git-wip-us.apache.org/repos/asf/storm/blob/83adfd9d/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
index 9913d9d..7fee578 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -17,8 +18,10 @@
  */
 package org.apache.storm.hdfs.bolt;
 
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
 import org.apache.storm.Config;
-import org.apache.storm.Constants;
 import org.apache.storm.task.GeneralTopologyContext;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -39,13 +42,10 @@ import org.junit.*;
 
 import org.junit.rules.ExpectedException;
 import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import static org.mockito.Mockito.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -56,42 +56,51 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
+import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
 
+@RunWith(MockitoJUnitRunner.class)
 public class TestSequenceFileBolt {
 
     private static final Logger LOG = LoggerFactory.getLogger(TestSequenceFileBolt.class);
 
+    @Rule
+    public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(new MiniDFSClusterRule.Java7Supplier<Configuration>() {
+        @Override
+        public Configuration get() {
+            Configuration conf = new Configuration();
+            conf.set("fs.trash.interval", "10");
+            conf.setBoolean("dfs.permissions", true);
+            File baseDir = new File("./target/hdfs/").getAbsoluteFile();
+            FileUtil.fullyDelete(baseDir);
+            conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+            return conf;
+        }
+    });
+
     private String hdfsURI;
     private DistributedFileSystem fs;
-    private MiniDFSCluster hdfsCluster;
     private static final String testRoot = "/unittest";
     Tuple tuple1 = generateTestTuple(1l, "first tuple");
     Tuple tuple2 = generateTestTuple(2l, "second tuple");
 
-    @Mock private OutputCollector collector;
-    @Mock private TopologyContext topologyContext;
-    @Rule public ExpectedException thrown = ExpectedException.none();
+    @Mock
+    private OutputCollector collector;
+    @Mock
+    private TopologyContext topologyContext;
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
 
     @Before
     public void setup() throws Exception {
-        MockitoAnnotations.initMocks(this);
-        Configuration conf = new Configuration();
-        conf.set("fs.trash.interval", "10");
-        conf.setBoolean("dfs.permissions", true);
-        File baseDir = new File("./target/hdfs/").getAbsoluteFile();
-        FileUtil.fullyDelete(baseDir);
-        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
-
-        MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
-        hdfsCluster = builder.build();
-        fs = hdfsCluster.getFileSystem();
-        hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
+        fs = dfsClusterRule.getDfscluster().getFileSystem();
+        hdfsURI = "hdfs://localhost:" + 
dfsClusterRule.getDfscluster().getNameNodePort() + "/";
     }
 
     @After
     public void shutDown() throws IOException {
         fs.close();
-        hdfsCluster.shutdown();
     }
 
     @Test
@@ -109,8 +118,7 @@ public class TestSequenceFileBolt {
     }
 
     @Test
-    public void testTwoTuplesOneFile() throws IOException
-    {
+    public void testTwoTuplesOneFile() throws IOException {
         SequenceFileBolt bolt = makeSeqBolt(hdfsURI, 2, 10000f);
         bolt.prepare(new Config(), topologyContext, collector);
         bolt.execute(tuple1);
@@ -125,8 +133,7 @@ public class TestSequenceFileBolt {
     }
 
     @Test
-    public void testFailedSync() throws IOException
-    {
+    public void testFailedSync() throws IOException {
         SequenceFileBolt bolt = makeSeqBolt(hdfsURI, 2, 10000f);
         bolt.prepare(new Config(), topologyContext, collector);
         bolt.execute(tuple1);
@@ -142,24 +149,24 @@ public class TestSequenceFileBolt {
         SyncPolicy fieldsSyncPolicy = new CountSyncPolicy(countSync);
 
         FileRotationPolicy fieldsRotationPolicy =
-                new FileSizeRotationPolicy(rotationSizeMB, FileSizeRotationPolicy.Units.MB);
+            new FileSizeRotationPolicy(rotationSizeMB, FileSizeRotationPolicy.Units.MB);
 
         FileNameFormat fieldsFileNameFormat = new DefaultFileNameFormat().withPath(testRoot);
 
         SequenceFormat seqFormat = new DefaultSequenceFormat("key", "value");
 
         return new SequenceFileBolt()
-                .withFsUrl(nameNodeAddr)
-                .withFileNameFormat(fieldsFileNameFormat)
-                .withRotationPolicy(fieldsRotationPolicy)
-                .withSequenceFormat(seqFormat)
-                .withSyncPolicy(fieldsSyncPolicy);
+            .withFsUrl(nameNodeAddr)
+            .withFileNameFormat(fieldsFileNameFormat)
+            .withRotationPolicy(fieldsRotationPolicy)
+            .withSequenceFormat(seqFormat)
+            .withSyncPolicy(fieldsSyncPolicy);
     }
 
     private Tuple generateTestTuple(Long key, String value) {
         TopologyBuilder builder = new TopologyBuilder();
         GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
-                new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+            new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
             @Override
             public Fields getComponentOutputFields(String componentId, String streamId) {
                 return new Fields("key", "value");

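The testFailedSync and testFailureFilecount cases above force write failures by putting the name node into safe mode rather than by stubbing HDFS. The idiom in isolation, as a rough sketch; SafeModeSketch and the path are made up, while the setSafeMode/SafeModeAction calls match what the tests use:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;

public class SafeModeSketch {

    public static void demonstrateWriteFailure(DistributedFileSystem fs) throws Exception {
        // While the cluster is in safe mode, HDFS rejects all mutations, so any
        // create/append/hsync issued by the bolt under test throws an IOException
        fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
        try {
            fs.create(new Path("/unittest/should-fail")).close(); // expected to throw
        } catch (java.io.IOException expected) {
            // the failure the tests provoke and then swallow per execute() call
        } finally {
            // Leave safe mode so later writes can succeed again
            fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
        }
    }
}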