http://git-wip-us.apache.org/repos/asf/storm/blob/caca8292/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
index 725fa11..df131a4 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -9,388 +10,380 @@
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 package org.apache.storm.hdfs.spout;
 
-
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.storm.hdfs.common.HdfsUtils;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-
 import java.io.BufferedReader;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
+import org.junit.Rule;
 
 public class TestFileLock {
 
-  static MiniDFSCluster.Builder builder;
-  static MiniDFSCluster hdfsCluster;
-  static FileSystem fs;
-  static String hdfsURI;
-  static HdfsConfiguration conf = new  HdfsConfiguration();
-
-  private Path filesDir = new Path("/tmp/filesdir");
-  private Path locksDir = new Path("/tmp/locskdir");
-
-  @BeforeClass
-  public static void setupClass() throws IOException {
-    conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,"5000");
-    builder = new MiniDFSCluster.Builder(new Configuration());
-    hdfsCluster = builder.build();
-    fs  = hdfsCluster.getFileSystem();
-    hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
-  }
-
-  @AfterClass
-  public static void teardownClass() throws IOException {
-    fs.close();
-    hdfsCluster.shutdown();
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    assert fs.mkdirs(filesDir) ;
-    assert fs.mkdirs(locksDir) ;
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    fs.delete(filesDir, true);
-    fs.delete(locksDir, true);
-  }
-
-  @Test
-  public void testBasicLocking() throws Exception {
-  // create empty files in filesDir
-    Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
-    Path file2 = new Path(filesDir + Path.SEPARATOR + "file2");
-    fs.create(file1).close();
-    fs.create(file2).close(); // create empty file
-
-    // acquire lock on file1 and verify if worked
-    FileLock lock1a = FileLock.tryLock(fs, file1, locksDir, "spout1");
-    Assert.assertNotNull(lock1a);
-    Assert.assertTrue(fs.exists(lock1a.getLockFile()));
-    Assert.assertEquals(lock1a.getLockFile().getParent(), locksDir); // verify 
lock file location
-    Assert.assertEquals(lock1a.getLockFile().getName(), file1.getName()); // 
verify lock filename
-
-    // acquire another lock on file1 and verify it failed
-    FileLock lock1b = FileLock.tryLock(fs, file1, locksDir, "spout1");
-    Assert.assertNull(lock1b);
-
-    // release lock on file1 and check
-    lock1a.release();
-    Assert.assertFalse(fs.exists(lock1a.getLockFile()));
-
-    // Retry locking and verify
-    FileLock lock1c = FileLock.tryLock(fs, file1, locksDir, "spout1");
-    Assert.assertNotNull(lock1c);
-    Assert.assertTrue(fs.exists(lock1c.getLockFile()));
-    Assert.assertEquals(lock1c.getLockFile().getParent(), locksDir); // verify 
lock file location
-    Assert.assertEquals(lock1c.getLockFile().getName(), file1.getName()); // 
verify lock filename
-
-    // try locking another file2 at the same time
-    FileLock lock2a = FileLock.tryLock(fs, file2, locksDir, "spout1");
-    Assert.assertNotNull(lock2a);
-    Assert.assertTrue(fs.exists(lock2a.getLockFile()));
-    Assert.assertEquals(lock2a.getLockFile().getParent(), locksDir); // verify 
lock file location
-    Assert.assertEquals(lock2a.getLockFile().getName(), file2.getName()); // 
verify lock filename
-
-    // release both locks
-    lock2a.release();
-    Assert.assertFalse(fs.exists(lock2a.getLockFile()));
-    lock1c.release();
-    Assert.assertFalse(fs.exists(lock1c.getLockFile()));
-  }
-
-  @Test
-  public void testHeartbeat() throws Exception {
-    Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
-    fs.create(file1).close();
-
-    // acquire lock on file1
-    FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1");
-    Assert.assertNotNull(lock1);
-    Assert.assertTrue(fs.exists(lock1.getLockFile()));
-
-    ArrayList<String> lines = readTextFile(lock1.getLockFile());
-    Assert.assertEquals("heartbeats appear to be missing", 1, lines.size());
-
-    // hearbeat upon it
-    lock1.heartbeat("1");
-    lock1.heartbeat("2");
-    lock1.heartbeat("3");
-
-    lines = readTextFile(lock1.getLockFile());
-    Assert.assertEquals("heartbeats appear to be missing", 4, lines.size());
-
-    lock1.heartbeat("4");
-    lock1.heartbeat("5");
-    lock1.heartbeat("6");
-
-    lines = readTextFile(lock1.getLockFile());
-    Assert.assertEquals("heartbeats appear to be missing", 7,  lines.size());
-
-    lock1.release();
-    lines = readTextFile(lock1.getLockFile());
-    Assert.assertNull(lines);
-    Assert.assertFalse(fs.exists(lock1.getLockFile()));
-  }
-
-  @Test
-  public void testConcurrentLocking() throws IOException, InterruptedException 
{
-    Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
-    fs.create(file1).close();
-
-    FileLockingThread[] thds = startThreads(100, file1, locksDir);
-    for (FileLockingThread thd : thds) {
-      thd.join();
-      if( !thd.cleanExit) {
-        System.err.println(thd.getName() + " did not exit cleanly");
-      }
-      Assert.assertTrue(thd.cleanExit);
+    @Rule
+    public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule();
+
+    private FileSystem fs;
+    private HdfsConfiguration conf = new HdfsConfiguration();
+
+    private final Path filesDir = new Path("/tmp/filesdir");
+    private final Path locksDir = new Path("/tmp/locksdir");
+
+    /**
+     * Runs before each test: fetches the file system from the cluster supplied by {@code dfsClusterRule}
+     * and creates the directories used for the test files and the lock files.
+     *
+     * @throws IOException if the file system cannot be obtained or a directory cannot be created
+     */
+    @Before
+    public void setup() throws IOException {
+        // NOTE(review): conf is configured here but not passed to anything visible in this class - confirm it is needed.
+        conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, "5000");
+        fs = dfsClusterRule.getDfscluster().getFileSystem();
+        // JUnit assertions instead of the `assert` keyword: with JVM assertions disabled the keyword form
+        // would not evaluate mkdirs() at all, leaving the directories uncreated.
+        Assert.assertTrue("failed to create " + filesDir, fs.mkdirs(filesDir));
+        Assert.assertTrue("failed to create " + locksDir, fs.mkdirs(locksDir));
     }
 
-    Path lockFile = new Path(locksDir + Path.SEPARATOR + file1.getName());
-    Assert.assertFalse(fs.exists(lockFile));
-  }
+    /**
+     * Runs after each test: recursively deletes the files and locks directories, then closes the file system.
+     *
+     * @throws IOException if deleting a directory or closing the file system fails
+     */
+    @After
+    public void teardown() throws IOException {
+        if (fs == null) {
+            return; // setup() failed before the file system was obtained; nothing to clean up
+        }
+        try {
+            fs.delete(filesDir, true);
+            fs.delete(locksDir, true);
+        } finally {
+            // close even when a delete throws, so the handle is not left open for the next test
+            fs.close();
+        }
+    }
 
-  private FileLockingThread[] startThreads(int thdCount, Path fileToLock, Path 
locksDir)
-          throws IOException {
-    FileLockingThread[] result = new FileLockingThread[thdCount];
-    for (int i = 0; i < thdCount; i++) {
-      result[i] = new FileLockingThread(i, fs, fileToLock, locksDir, "spout" + 
Integer.toString(i));
+    /**
+     * Walks through the basic lock life cycle on two files: acquire, fail to acquire a second time,
+     * release, re-acquire, hold locks on two different files at once, and release both.
+     * After every successful acquisition the lock file must exist in {@code locksDir} and carry the
+     * name of the locked file; after every release it must be gone.
+     */
+    @Test
+    public void testBasicLocking() throws Exception {
+        // create empty files in filesDir
+        Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
+        Path file2 = new Path(filesDir + Path.SEPARATOR + "file2");
+        fs.create(file1).close();
+        fs.create(file2).close(); // create empty file
+
+        // acquire lock on file1 and verify if worked
+        // (assertEquals takes the expected value first, so failure messages read correctly)
+        FileLock lock1a = FileLock.tryLock(fs, file1, locksDir, "spout1");
+        Assert.assertNotNull(lock1a);
+        Assert.assertTrue(fs.exists(lock1a.getLockFile()));
+        Assert.assertEquals(locksDir, lock1a.getLockFile().getParent()); // verify lock file location
+        Assert.assertEquals(file1.getName(), lock1a.getLockFile().getName()); // verify lock filename
+
+        // acquire another lock on file1 and verify it failed
+        FileLock lock1b = FileLock.tryLock(fs, file1, locksDir, "spout1");
+        Assert.assertNull(lock1b);
+
+        // release lock on file1 and check
+        lock1a.release();
+        Assert.assertFalse(fs.exists(lock1a.getLockFile()));
+
+        // Retry locking and verify
+        FileLock lock1c = FileLock.tryLock(fs, file1, locksDir, "spout1");
+        Assert.assertNotNull(lock1c);
+        Assert.assertTrue(fs.exists(lock1c.getLockFile()));
+        Assert.assertEquals(locksDir, lock1c.getLockFile().getParent()); // verify lock file location
+        Assert.assertEquals(file1.getName(), lock1c.getLockFile().getName()); // verify lock filename
+
+        // try locking another file2 at the same time
+        FileLock lock2a = FileLock.tryLock(fs, file2, locksDir, "spout1");
+        Assert.assertNotNull(lock2a);
+        Assert.assertTrue(fs.exists(lock2a.getLockFile()));
+        Assert.assertEquals(locksDir, lock2a.getLockFile().getParent()); // verify lock file location
+        Assert.assertEquals(file2.getName(), lock2a.getLockFile().getName()); // verify lock filename
+
+        // release both locks
+        lock2a.release();
+        Assert.assertFalse(fs.exists(lock2a.getLockFile()));
+        lock1c.release();
+        Assert.assertFalse(fs.exists(lock1c.getLockFile()));
     }
 
-    for (FileLockingThread thd : result) {
-      thd.start();
+    /**
+     * Checks that a freshly acquired lock file holds one line, that each heartbeat appends one more line,
+     * and that the lock file can no longer be read or found once the lock is released.
+     */
+    @Test
+    public void testHeartbeat() throws Exception {
+        final String missingHeartbeats = "heartbeats appear to be missing";
+        final Path target = new Path(filesDir + Path.SEPARATOR + "file1");
+        fs.create(target).close();
+
+        // take the lock; its lock file has to exist afterwards
+        final FileLock lock = FileLock.tryLock(fs, target, locksDir, "spout1");
+        Assert.assertNotNull(lock);
+        Assert.assertTrue(fs.exists(lock.getLockFile()));
+
+        // the new lock file already contains a single line
+        ArrayList<String> contents = readTextFile(lock.getLockFile());
+        Assert.assertEquals(missingHeartbeats, 1, contents.size());
+
+        // heartbeats "1".."3" bring the file to four lines
+        for (int offset = 1; offset <= 3; offset++) {
+            lock.heartbeat(Integer.toString(offset));
+        }
+        contents = readTextFile(lock.getLockFile());
+        Assert.assertEquals(missingHeartbeats, 4, contents.size());
+
+        // heartbeats "4".."6" bring the file to seven lines
+        for (int offset = 4; offset <= 6; offset++) {
+            lock.heartbeat(Integer.toString(offset));
+        }
+        contents = readTextFile(lock.getLockFile());
+        Assert.assertEquals(missingHeartbeats, 7, contents.size());
+
+        // releasing removes the lock file, so reading it yields null
+        lock.release();
+        contents = readTextFile(lock.getLockFile());
+        Assert.assertNull(contents);
+        Assert.assertFalse(fs.exists(lock.getLockFile()));
     }
-    return result;
-  }
-
-
-  @Test
-  public void testStaleLockDetection_SingleLock() throws Exception {
-    final int LOCK_EXPIRY_SEC = 1;
-    final int WAIT_MSEC = 1500;
-    Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
-    fs.create(file1).close();
-    FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1");
-    try {
-      // acquire lock on file1
-      Assert.assertNotNull(lock1);
-      Assert.assertTrue(fs.exists(lock1.getLockFile()));
-      Thread.sleep(WAIT_MSEC);   // wait for lock to expire
-      HdfsUtils.Pair<Path, FileLock.LogEntry> expired = 
FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
-      Assert.assertNotNull(expired);
-
-      // heartbeat, ensure its no longer stale and read back the heartbeat data
-      lock1.heartbeat("1");
-      expired = FileLock.locateOldestExpiredLock(fs, locksDir, 1);
-      Assert.assertNull(expired);
-
-      FileLock.LogEntry lastEntry = lock1.getLastLogEntry();
-      Assert.assertNotNull(lastEntry);
-      Assert.assertEquals("1", lastEntry.fileOffset);
-
-      // wait and check for expiry again
-      Thread.sleep(WAIT_MSEC);
-      expired = FileLock.locateOldestExpiredLock(fs, locksDir, 
LOCK_EXPIRY_SEC);
-      Assert.assertNotNull(expired);
-    } finally {
-      lock1.release();
-      fs.delete(file1, false);
+
+    /**
+     * Starts 100 threads that compete for the lock on a single file, waits up to 30 seconds for each,
+     * and verifies that every thread exited cleanly and that no lock file is left behind.
+     * Whatever the outcome, all threads are interrupted and joined so none outlives the test.
+     */
+    @Test
+    public void testConcurrentLocking() throws IOException, InterruptedException {
+        Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
+        fs.create(file1).close();
+
+        FileLockingThread[] threads = null;
+        try {
+            threads = startThreads(100, file1, locksDir);
+            for (FileLockingThread thd : threads) {
+                thd.join(30_000);
+                Assert.assertTrue(thd.getName() + " did not exit cleanly", thd.cleanExit);
+            }
+
+            Path lockFile = new Path(locksDir + Path.SEPARATOR + file1.getName());
+            Assert.assertFalse(fs.exists(lockFile));
+        } finally {
+            if (threads != null) {
+                // Interrupt every thread before waiting on any of them: the threads then wind down in
+                // parallel, and none is left un-interrupted if the check below throws part-way through.
+                for (FileLockingThread thread : threads) {
+                    thread.interrupt();
+                }
+                for (FileLockingThread thread : threads) {
+                    thread.join(30_000);
+                    if (thread.isAlive()) {
+                        throw new RuntimeException("Failed to stop threads within 30 seconds, threads may leak into other tests");
+                    }
+                }
+            }
+        }
     }
-  }
-
-  @Test
-  public void testStaleLockDetection_MultipleLocks() throws Exception {
-    final int LOCK_EXPIRY_SEC = 1;
-    final int WAIT_MSEC = 1500;
-    Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
-    Path file2 = new Path(filesDir + Path.SEPARATOR + "file2");
-    Path file3 = new Path(filesDir + Path.SEPARATOR + "file3");
-
-    fs.create(file1).close();
-    fs.create(file2).close();
-    fs.create(file3).close();
-
-    // 1) acquire locks on file1,file2,file3
-    FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1");
-    FileLock lock2 = FileLock.tryLock(fs, file2, locksDir, "spout2");
-    FileLock lock3 = FileLock.tryLock(fs, file3, locksDir, "spout3");
-    Assert.assertNotNull(lock1);
-    Assert.assertNotNull(lock2);
-    Assert.assertNotNull(lock3);
-
-    try {
-      HdfsUtils.Pair<Path, FileLock.LogEntry> expired = 
FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
-      Assert.assertNull(expired);
-
-      // 2) wait for all 3 locks to expire then heart beat on 2 locks and 
verify stale lock
-      Thread.sleep(WAIT_MSEC);
-      lock1.heartbeat("1");
-      lock2.heartbeat("1");
-
-      expired = FileLock.locateOldestExpiredLock(fs, locksDir, 
LOCK_EXPIRY_SEC);
-      Assert.assertNotNull(expired);
-      Assert.assertEquals("spout3", expired.getValue().componentID);
-    } finally {
-      lock1.release();
-      lock2.release();
-      lock3.release();
-      fs.delete(file1, false);
-      fs.delete(file2, false);
-      fs.delete(file3, false);
+
+    /**
+     * Builds {@code thdCount} locking threads that all target {@code fileToLock}, then starts them.
+     * Thread number {@code i} uses the spout id {@code "spout" + i}.
+     *
+     * @param thdCount   number of threads to create
+     * @param fileToLock file every thread tries to lock
+     * @param locksDir   directory in which the lock file is kept
+     * @return the started threads, in creation order
+     * @throws IOException declared by the {@code FileLockingThread} constructor
+     */
+    private FileLockingThread[] startThreads(int thdCount, Path fileToLock, Path locksDir)
+        throws IOException {
+        final FileLockingThread[] workers = new FileLockingThread[thdCount];
+        // all threads are constructed before any of them is started
+        int index = 0;
+        while (index < workers.length) {
+            workers[index] = new FileLockingThread(index, fs, fileToLock, locksDir, "spout" + index);
+            index++;
+        }
+
+        for (int n = 0; n < workers.length; n++) {
+            workers[n].start();
+        }
+        return workers;
     }
-  }
-
-  @Test
-  public void testLockRecovery() throws Exception {
-    final int LOCK_EXPIRY_SEC = 1;
-    final int WAIT_MSEC = LOCK_EXPIRY_SEC*1000 + 500;
-    Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
-    Path file2 = new Path(filesDir + Path.SEPARATOR + "file2");
-    Path file3 = new Path(filesDir + Path.SEPARATOR + "file3");
-
-    fs.create(file1).close();
-    fs.create(file2).close();
-    fs.create(file3).close();
-
-    // 1) acquire locks on file1,file2,file3
-    FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1");
-    FileLock lock2 = FileLock.tryLock(fs, file2, locksDir, "spout2");
-    FileLock lock3 = FileLock.tryLock(fs, file3, locksDir, "spout3");
-    Assert.assertNotNull(lock1);
-    Assert.assertNotNull(lock2);
-    Assert.assertNotNull(lock3);
-
-    try {
-      HdfsUtils.Pair<Path, FileLock.LogEntry> expired = 
FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
-      Assert.assertNull(expired);
-
-      // 1) Simulate lock file lease expiring and getting closed by HDFS
-      closeUnderlyingLockFile(lock3);
-
-      // 2) wait for all 3 locks to expire then heart beat on 2 locks
-      Thread.sleep(WAIT_MSEC*2); // wait for locks to expire
-      lock1.heartbeat("1");
-      lock2.heartbeat("1");
-
-      // 3) Take ownership of stale lock
-      FileLock lock3b = FileLock.acquireOldestExpiredLock(fs, locksDir, 
LOCK_EXPIRY_SEC, "spout1");
-      Assert.assertNotNull(lock3b);
-      Assert.assertEquals("Expected lock3 file", 
Path.getPathWithoutSchemeAndAuthority(lock3b.getLockFile()), 
lock3.getLockFile());
-    } finally {
-      lock1.release();
-      lock2.release();
-      lock3.release();
-      fs.delete(file1, false);
-      fs.delete(file2, false);
-      try {
-        fs.delete(file3, false);
-      } catch (Exception e) {
-        e.printStackTrace();
-      }
+
+    /**
+     * Verifies expiry detection for a single lock: the lock is reported as expired after being idle for
+     * longer than the expiry period, is no longer reported right after a heartbeat, and is reported
+     * again after a second idle period. Also checks that the heartbeat data can be read back.
+     */
+    @Test
+    public void testStaleLockDetection_SingleLock() throws Exception {
+        final int LOCK_EXPIRY_SEC = 1;
+        final int WAIT_MSEC = 1500; // longer than LOCK_EXPIRY_SEC so the lock has expired after the sleep
+        Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
+        fs.create(file1).close();
+        FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1");
+        try {
+            // acquire lock on file1
+            Assert.assertNotNull(lock1);
+            Assert.assertTrue(fs.exists(lock1.getLockFile()));
+            Thread.sleep(WAIT_MSEC);   // wait for lock to expire
+            HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
+            Assert.assertNotNull(expired);
+
+            // heartbeat, ensure its no longer stale and read back the heartbeat data
+            lock1.heartbeat("1");
+            expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
+            Assert.assertNull(expired);
+
+            FileLock.LogEntry lastEntry = lock1.getLastLogEntry();
+            Assert.assertNotNull(lastEntry);
+            Assert.assertEquals("1", lastEntry.fileOffset);
+
+            // wait and check for expiry again
+            Thread.sleep(WAIT_MSEC);
+            expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
+            Assert.assertNotNull(expired);
+        } finally {
+            // lock1 is null when tryLock failed; guard so the assertion failure above is not
+            // replaced by a NullPointerException from the cleanup
+            if (lock1 != null) {
+                lock1.release();
+            }
+            fs.delete(file1, false);
+        }
     }
-  }
-
-  public static void closeUnderlyingLockFile(FileLock lock) throws 
ReflectiveOperationException {
-    Method m = FileLock.class.getDeclaredMethod("forceCloseLockFile");
-    m.setAccessible(true);
-    m.invoke(lock);
-  }
-
-  /** return null if file not found */
-  private ArrayList<String> readTextFile(Path file) throws IOException {
-    FSDataInputStream os = null;
-    try {
-      os = fs.open(file);
-      if (os == null) {
-        return null;
-      }
-      BufferedReader reader = new BufferedReader(new InputStreamReader(os));
-      ArrayList<String> lines = new ArrayList<>();
-      for (String line = reader.readLine(); line != null; line = 
reader.readLine()) {
-        lines.add(line);
-      }
-      return lines;
-    } catch( FileNotFoundException e) {
-      return null;
-    } finally {
-      if(os!=null) {
-        os.close();
-      }
+
+    /**
+     * Verifies expiry detection with three locks: nothing is reported as expired right after the locks
+     * are taken; after an idle period followed by heartbeats on lock1 and lock2 only, the expired lock
+     * that is located must belong to component "spout3" (the lock that received no heartbeat).
+     */
+    @Test
+    public void testStaleLockDetection_MultipleLocks() throws Exception {
+        final int LOCK_EXPIRY_SEC = 1;
+        final int WAIT_MSEC = 1500; // sleep time, longer than the 1 second expiry period
+        Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
+        Path file2 = new Path(filesDir + Path.SEPARATOR + "file2");
+        Path file3 = new Path(filesDir + Path.SEPARATOR + "file3");
+
+        fs.create(file1).close();
+        fs.create(file2).close();
+        fs.create(file3).close();
+
+        // 1) acquire locks on file1,file2,file3
+        FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1");
+        FileLock lock2 = FileLock.tryLock(fs, file2, locksDir, "spout2");
+        FileLock lock3 = FileLock.tryLock(fs, file3, locksDir, "spout3");
+        Assert.assertNotNull(lock1);
+        Assert.assertNotNull(lock2);
+        Assert.assertNotNull(lock3);
+
+        try {
+            // the locks were just taken, so none may be reported as expired yet
+            HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
+            Assert.assertNull(expired);
+
+            // 2) wait for all 3 locks to expire then heart beat on 2 locks and verify stale lock
+            Thread.sleep(WAIT_MSEC);
+            lock1.heartbeat("1");
+            lock2.heartbeat("1");
+
+            // only lock3 received no heartbeat, so it is the one expected to be reported
+            expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
+            Assert.assertNotNull(expired);
+            Assert.assertEquals("spout3", expired.getValue().componentID);
+        } finally {
+            // release the locks and remove the test files regardless of the outcome
+            lock1.release();
+            lock2.release();
+            lock3.release();
+            fs.delete(file1, false);
+            fs.delete(file2, false);
+            fs.delete(file3, false);
+        }
     }
-  }
 
-  class FileLockingThread extends Thread {
+    /**
+     * Verifies that an expired lock can be taken over: three locks are acquired, the lock file behind
+     * lock3 is force-closed, all locks are left idle past the expiry period, lock1 and lock2 receive a
+     * heartbeat, and {@code FileLock.acquireOldestExpiredLock} must then return a lock on lock3's file.
+     */
+    @Test
+    public void testLockRecovery() throws Exception {
+        final int LOCK_EXPIRY_SEC = 1;
+        final int WAIT_MSEC = LOCK_EXPIRY_SEC * 1000 + 500;
+        Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
+        Path file2 = new Path(filesDir + Path.SEPARATOR + "file2");
+        Path file3 = new Path(filesDir + Path.SEPARATOR + "file3");
+
+        fs.create(file1).close();
+        fs.create(file2).close();
+        fs.create(file3).close();
+
+        // 1) acquire locks on file1,file2,file3
+        FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1");
+        FileLock lock2 = FileLock.tryLock(fs, file2, locksDir, "spout2");
+        FileLock lock3 = FileLock.tryLock(fs, file3, locksDir, "spout3");
+        Assert.assertNotNull(lock1);
+        Assert.assertNotNull(lock2);
+        Assert.assertNotNull(lock3);
 
-    private int thdNum;
-    private final FileSystem fs;
-    public boolean cleanExit = false;
-    private Path fileToLock;
-    private Path locksDir;
-    private String spoutId;
+        try {
+            // the locks were just taken, so none may be reported as expired yet
+            HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
+            Assert.assertNull(expired);
+
+            // 2) Simulate lock file lease expiring and getting closed by HDFS
+            closeUnderlyingLockFile(lock3);
+
+            // 3) wait for all 3 locks to expire then heart beat on 2 locks
+            Thread.sleep(WAIT_MSEC * 2); // wait for locks to expire
+            lock1.heartbeat("1");
+            lock2.heartbeat("1");
+
+            // 4) Take ownership of stale lock
+            FileLock lock3b = FileLock.acquireOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC, "spout1");
+            Assert.assertNotNull(lock3b);
+            // expected value first: lock3's file is the reference, the recovered lock's path is under test
+            Assert.assertEquals("Expected lock3 file", lock3.getLockFile(), Path.getPathWithoutSchemeAndAuthority(lock3b.getLockFile()));
+        } finally {
+            lock1.release();
+            lock2.release();
+            lock3.release();
+            fs.delete(file1, false);
+            fs.delete(file2, false);
+            try {
+                fs.delete(file3, false);
+            } catch (Exception e) {
+                // best effort only: a failure to delete file3 is printed but does not fail the test
+                e.printStackTrace();
+            }
+        }
+    }
 
-    public FileLockingThread(int thdNum, FileSystem fs, Path fileToLock, Path 
locksDir, String spoutId)
-            throws IOException {
-      this.thdNum = thdNum;
-      this.fs = fs;
-      this.fileToLock = fileToLock;
-      this.locksDir = locksDir;
-      this.spoutId = spoutId;
+    /**
+     * Invokes the non-public, no-argument {@code forceCloseLockFile} method of {@link FileLock} on the
+     * given lock via reflection.
+     *
+     * @param lock the lock whose {@code forceCloseLockFile} method is invoked
+     * @throws ReflectiveOperationException if the method cannot be found, accessed or invoked
+     */
+    public static void closeUnderlyingLockFile(FileLock lock) throws ReflectiveOperationException {
+        final Method forceClose = FileLock.class.getDeclaredMethod("forceCloseLockFile");
+        // the method is not public, so access checks have to be lifted before the call
+        forceClose.setAccessible(true);
+        forceClose.invoke(lock);
     }
 
-    @Override
-    public void run() {
-      Thread.currentThread().setName("FileLockingThread-" + thdNum);
-      FileLock lock = null;
-      try {
-        do {
-          System.err.println("Trying lock - " + getName());
-          lock = FileLock.tryLock(fs, this.fileToLock, this.locksDir, spoutId);
-          System.err.println("Acquired lock - " + getName());
-          if(lock==null) {
-            System.out.println("Retrying lock - " + getName());
-          }
-        } while (lock==null);
-        cleanExit= true;
-      } catch (Exception e) {
-        e.printStackTrace();
-      }
-      finally {
+    /**
+     * Reads a text file from the test file system into a list of lines.
+     *
+     * @param file path of the file to read
+     * @return the lines of the file in order, or {@code null} if the file is not found
+     * @throws IOException if opening, reading or closing the file fails for any other reason
+     */
+    private ArrayList<String> readTextFile(Path file) throws IOException {
+        FSDataInputStream os = null;
         try {
-          if(lock!=null) {
-            lock.release();
-            System.err.println("Released lock - " + getName());
-          }
-        } catch (IOException e) {
-          e.printStackTrace(System.err);
+            os = fs.open(file);
+            if (os == null) {
+                return null;
+            }
+            // Decode with an explicit charset so the result does not depend on the platform default.
+            // NOTE(review): assumes lock files are UTF-8/ASCII text - confirm against FileLock's writer.
+            BufferedReader reader = new BufferedReader(
+                new InputStreamReader(os, java.nio.charset.StandardCharsets.UTF_8));
+            ArrayList<String> lines = new ArrayList<>();
+            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+                lines.add(line);
+            }
+            return lines;
+        } catch (FileNotFoundException e) {
+            return null;
+        } finally {
+            // closing the underlying stream is sufficient; the reader only wraps it
+            if (os != null) {
+                os.close();
+            }
+        }
+    }
+
+    /**
+     * Thread that calls {@code FileLock.tryLock} on one file until it obtains the lock or is interrupted,
+     * and releases the lock again before exiting. {@code cleanExit} is true only if the lock was obtained.
+     */
+    class FileLockingThread extends Thread {
+
+        private int thdNum;
+        private final FileSystem fs;
+        // set by run(): true once the lock has been obtained without an exception
+        public boolean cleanExit = false;
+        private Path fileToLock;
+        private Path locksDir;
+        private String spoutId;
+
+        /**
+         * @param thdNum     number used to build the thread name
+         * @param fs         file system on which the lock is taken
+         * @param fileToLock file to lock
+         * @param locksDir   directory in which the lock file is kept
+         * @param spoutId    spout id passed to {@code FileLock.tryLock}
+         */
+        public FileLockingThread(int thdNum, FileSystem fs, Path fileToLock, Path locksDir, String spoutId)
+            throws IOException {
+            this.thdNum = thdNum;
+            this.fs = fs;
+            this.fileToLock = fileToLock;
+            this.locksDir = locksDir;
+            this.spoutId = spoutId;
         }
-      }
-      System.err.println("Thread exiting - " + getName());
-    } // run()
 
-  } // class FileLockingThread
+        /**
+         * Retries {@code FileLock.tryLock} until a lock is returned or the thread is interrupted, then
+         * releases any lock that was obtained. Exceptions are printed rather than propagated.
+         */
+        @Override
+        public void run() {
+            Thread.currentThread().setName("FileLockingThread-" + thdNum);
+            FileLock lock = null;
+            try {
+                do {
+                    System.err.println("Trying lock - " + getName());
+                    lock = FileLock.tryLock(fs, this.fileToLock, this.locksDir, spoutId);
+                    // report "Acquired" only when tryLock actually returned a lock
+                    if (lock == null) {
+                        System.out.println("Retrying lock - " + getName());
+                    } else {
+                        System.err.println("Acquired lock - " + getName());
+                    }
+                } while (lock == null && !Thread.currentThread().isInterrupted());
+                // the loop also ends on interruption; that is not a clean exit because no lock was obtained
+                cleanExit = lock != null;
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                try {
+                    if (lock != null) {
+                        lock.release();
+                        System.err.println("Released lock - " + getName());
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace(System.err);
+                }
+            }
+            System.err.println("Thread exiting - " + getName());
+        } // run()
+
+    } // class FileLockingThread
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/caca8292/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java
index 6628cc9..8395d27 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -9,196 +10,193 @@
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-
 package org.apache.storm.hdfs.spout;
 
-import org.apache.hadoop.conf.Configuration;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.notNull;
+
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
+import org.junit.Rule;
 
 public class TestHdfsSemantics {
 
-  static MiniDFSCluster.Builder builder;
-  static MiniDFSCluster hdfsCluster;
-  static FileSystem fs;
-  static String hdfsURI;
-  static HdfsConfiguration conf = new  HdfsConfiguration();
-
-  private Path dir = new Path("/tmp/filesdir");
-
-  @BeforeClass
-  public static void setupClass() throws IOException {
-    conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,"5000");
-    builder = new MiniDFSCluster.Builder(new Configuration());
-    hdfsCluster = builder.build();
-    fs  = hdfsCluster.getFileSystem();
-    hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
-  }
-
-  @AfterClass
-  public static void teardownClass() throws IOException {
-    fs.close();
-    hdfsCluster.shutdown();
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    assert fs.mkdirs(dir) ;
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    fs.delete(dir, true);
-  }
-
-
-  @Test
-  public void testDeleteSemantics() throws Exception {
-    Path file = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1");
-//    try {
-    // 1) Delete absent file - should return false
-    Assert.assertFalse(fs.exists(file));
-    try {
-      Assert.assertFalse(fs.delete(file, false));
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
+    @Rule
+    public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule();
 
-    // 2) deleting open file - should return true
-    fs.create(file, false);
-    Assert.assertTrue(fs.delete(file, false));
-
-    // 3) deleting closed file  - should return true
-    FSDataOutputStream os = fs.create(file, false);
-    os.close();
-    Assert.assertTrue(fs.exists(file));
-    Assert.assertTrue(fs.delete(file, false));
-    Assert.assertFalse(fs.exists(file));
-  }
-
-  @Test
-  public void testConcurrentDeletion() throws Exception {
-    Path file = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1");
-    fs.create(file).close();
-    // 1 concurrent deletion - only one thread should succeed
-    FileDeletionThread[] thds = startThreads(10, file);
-    int successCount=0;
-    for (FileDeletionThread thd : thds) {
-      thd.join();
-      if( thd.succeeded)
-        successCount++;
-      if(thd.exception!=null)
-        Assert.assertNotNull(thd.exception);
-    }
-    System.err.println(successCount);
-    Assert.assertEquals(1, successCount);
-
-  }
-
-  @Test
-  public void testAppendSemantics() throws Exception {
-    //1 try to append to an open file
-    Path file1 = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1");
-    FSDataOutputStream os1 = fs.create(file1, false);
-    try {
-      fs.append(file1); // should fail
-      Assert.assertTrue("Append did not throw an exception", false);
-    } catch (RemoteException e) {
-      // expecting AlreadyBeingCreatedException inside RemoteException
-      Assert.assertEquals(AlreadyBeingCreatedException.class, 
e.unwrapRemoteException().getClass());
+    private FileSystem fs;
+    private final HdfsConfiguration conf = new HdfsConfiguration();
+
+    private final Path dir = new Path("/tmp/filesdir");
+
+    /**
+     * Obtains the file system from the mini DFS cluster rule and creates the working directory
+     * used by every test.
+     *
+     * @throws IOException if the file system cannot be obtained or the directory cannot be created
+     */
+    @Before
+    public void setup() throws IOException {
+        conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, "5000");
+        fs = dfsClusterRule.getDfscluster().getFileSystem();
+        // Use a JUnit assertion rather than the Java assert statement: the mkdirs call must run
+        // (and be checked) even when the JVM is started without -ea.
+        Assert.assertTrue("Failed to create " + dir, fs.mkdirs(dir));
     }
 
-    //2 try to append to a closed file
-    os1.close();
-    FSDataOutputStream os2 = fs.append(file1); // should pass
-    os2.close();
-  }
-
-  @Test
-  public void testDoubleCreateSemantics() throws Exception {
-    //1 create an already existing open file w/o override flag
-    Path file1 = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1");
-    FSDataOutputStream os1 = fs.create(file1, false);
-    try {
-      fs.create(file1, false); // should fail
-      Assert.assertTrue("Create did not throw an exception", false);
-    } catch (RemoteException e) {
-      Assert.assertEquals(AlreadyBeingCreatedException.class, 
e.unwrapRemoteException().getClass());
+    /**
+     * Removes the working directory and closes the file system handle.
+     *
+     * @throws IOException if deleting the directory or closing the file system fails
+     */
+    @After
+    public void teardown() throws IOException {
+        // setup() may have failed before fs was assigned; nothing to clean up in that case
+        if (fs == null) {
+            return;
+        }
+        try {
+            fs.delete(dir, true);
+        } finally {
+            // close the handle even when the delete fails
+            fs.close();
+        }
     }
-    //2 close file and retry creation
-    os1.close();
-    try {
-      fs.create(file1, false);  // should still fail
-    } catch (FileAlreadyExistsException e) {
-      // expecting this exception
+
+    /**
+     * Checks the return value of a non-recursive delete for an absent file, a file whose
+     * output stream is still open, and a file that has been closed.
+     */
+    @Test
+    public void testDeleteSemantics() throws Exception {
+        Path file = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1");
+        // 1) Delete absent file - should return false
+        Assert.assertFalse(fs.exists(file));
+        // An IOException here is a test failure, so let it propagate instead of swallowing it
+        Assert.assertFalse(fs.delete(file, false));
+
+        // 2) deleting open file - should return true
+        // The stream returned by create() is not closed, so the file is still open when deleted
+        fs.create(file, false);
+        Assert.assertTrue(fs.delete(file, false));
+
+        // 3) deleting closed file  - should return true
+        FSDataOutputStream os = fs.create(file, false);
+        os.close();
+        Assert.assertTrue(fs.exists(file));
+        Assert.assertTrue(fs.delete(file, false));
+        Assert.assertFalse(fs.exists(file));
     }
 
-    //3 delete file and retry creation
-    fs.delete(file1, false);
-    FSDataOutputStream os2 = fs.create(file1, false);  // should pass
-    Assert.assertNotNull(os2);
-    os2.close();
-  }
+    /**
+     * Starts ten threads that all delete the same file and checks that exactly one of them
+     * reports success. Threads are interrupted and joined afterwards so none outlives the test.
+     */
+    @Test
+    public void testConcurrentDeletion() throws Exception {
+        Path file = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1");
+        fs.create(file).close();
+        // 1 concurrent deletion - only one thread should succeed
+        FileDeletionThread[] threads = null;
+        try {
+            threads = startThreads(10, file);
+            int successCount = 0;
+            for (FileDeletionThread thd : threads) {
+                thd.join(30_000);
+                if (thd.succeeded) {
+                    successCount++;
+                }
+                if (thd.exception != null) {
+                    // An exception in a losing thread does not fail the test by itself, but
+                    // surface it so an unexpected success count can be diagnosed.
+                    System.err.println("Deletion thread failed: " + thd.exception);
+                }
+            }
+            System.err.println(successCount);
+            Assert.assertEquals(1, successCount);
+        } finally {
+            if (threads != null) {
+                for (FileDeletionThread thread : threads) {
+                    thread.interrupt();
+                    thread.join(30_000);
+                    if (thread.isAlive()) {
+                        throw new RuntimeException("Failed to stop threads within 30 seconds, threads may leak into other tests");
+                    }
+                }
+            }
+        }
+    }
 
+    /**
+     * Checks that appending to a file that is still open for writing is rejected with an
+     * AlreadyBeingCreatedException, and that appending succeeds once the file has been closed.
+     */
+    @Test
+    public void testAppendSemantics() throws Exception {
+        //1 try to append to an open file
+        Path file1 = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1");
+        try (FSDataOutputStream os1 = fs.create(file1, false)) {
+            fs.append(file1); // should fail
+            fail("Append did not throw an exception");
+        } catch (RemoteException e) {
+            // expecting AlreadyBeingCreatedException inside RemoteException
+            Assert.assertEquals(AlreadyBeingCreatedException.class, e.unwrapRemoteException().getClass());
+        }
+
+        //2 try to append to a closed file
+        try (FSDataOutputStream os2 = fs.append(file1)) {
+            // Plain JUnit null check; Mockito's notNull() is an argument matcher for
+            // stubbing/verification and cannot be passed to assertThat as a Hamcrest matcher.
+            Assert.assertNotNull(os2);
+        }
+    }
 
-  private FileDeletionThread[] startThreads(int thdCount, Path file)
-          throws IOException {
-    FileDeletionThread[] result = new FileDeletionThread[thdCount];
-    for (int i = 0; i < thdCount; i++) {
-      result[i] = new FileDeletionThread(i, fs, file);
+    /**
+     * Exercises create() without the overwrite flag on the same path three times: while the
+     * first stream is still open, after it has been closed, and after the file was deleted.
+     */
+    @Test
+    public void testDoubleCreateSemantics() throws Exception {
+        //1 create an already existing open file w/o override flag
+        Path file1 = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1");
+        try (FSDataOutputStream os1 = fs.create(file1, false)) {
+            fs.create(file1, false); // should fail
+            fail("Create did not throw an exception");
+        } catch (RemoteException e) {
+            // the RemoteException is expected to wrap an AlreadyBeingCreatedException
+            Assert.assertEquals(AlreadyBeingCreatedException.class, 
e.unwrapRemoteException().getClass());
+        }
+        //2 close file and retry creation
+        // os1 was closed by the try-with-resources above before this point is reached
+        try {
+            fs.create(file1, false);  // should still fail
+            fail("Create did not throw an exception");
+        } catch (FileAlreadyExistsException e) {
+            // expecting this exception
+        }
+
+        //3 delete file and retry creation
+        fs.delete(file1, false);
+        try (FSDataOutputStream os2 = fs.create(file1, false)) {
+            Assert.assertNotNull(os2);
+        }
     }
 
-    for (FileDeletionThread thd : result) {
-      thd.start();
+    /**
+     * Builds {@code thdCount} deletion threads targeting {@code file} and starts them.
+     * All threads are constructed before the first one is started.
+     *
+     * @param thdCount number of threads to create
+     * @param file path each thread will try to delete
+     * @return the started threads, in creation order
+     */
+    private FileDeletionThread[] startThreads(int thdCount, Path file)
+        throws IOException {
+        FileDeletionThread[] workers = new FileDeletionThread[thdCount];
+        for (int idx = 0; idx < workers.length; idx++) {
+            workers[idx] = new FileDeletionThread(idx, fs, file);
+        }
+
+        for (int idx = 0; idx < workers.length; idx++) {
+            workers[idx].start();
+        }
+        return workers;
     }
-    return result;
-  }
 
-  private static class FileDeletionThread extends Thread {
+    private static class FileDeletionThread extends Thread {
 
-    private final int thdNum;
-    private final FileSystem fs;
-    private final Path file;
-    public boolean succeeded;
-    public Exception exception = null;
+        private final int thdNum;
+        private final FileSystem fs;
+        private final Path file;
+        public boolean succeeded;
+        public Exception exception = null;
 
-    public FileDeletionThread(int thdNum, FileSystem fs, Path file)
+        public FileDeletionThread(int thdNum, FileSystem fs, Path file)
             throws IOException {
-      this.thdNum = thdNum;
-      this.fs = fs;
-      this.file = file;
-    }
-
-    @Override
-    public void run() {
-      Thread.currentThread().setName("FileDeletionThread-" + thdNum);
-      try {
-        succeeded = fs.delete(file, false);
-      } catch (Exception e) {
-        exception = e;
-      }
-    } // run()
-
-  } // class FileLockingThread
+            this.thdNum = thdNum;
+            this.fs = fs;
+            this.file = file;
+        }
+
+        /**
+         * Names the running thread after its number, attempts a non-recursive delete of
+         * {@code file}, and records either the delete result or the exception it raised.
+         */
+        @Override
+        public void run() {
+            final String threadName = "FileDeletionThread-" + thdNum;
+            Thread.currentThread().setName(threadName);
+            try {
+                final boolean deleted = fs.delete(file, false);
+                succeeded = deleted;
+            } catch (Exception e) {
+                exception = e;
+            }
+        } // run()
+
+    } // class FileDeletionThread
 }

Reply via email to