http://git-wip-us.apache.org/repos/asf/storm/blob/a0308efd/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java index b96f1ff..0d1f146 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -9,180 +10,170 @@ * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ - package org.apache.storm.hdfs.spout; - -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; -import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; +import org.apache.storm.hdfs.testing.MiniDFSClusterRule; +import org.junit.Rule; public class TestDirLock { - static MiniDFSCluster.Builder builder; - static MiniDFSCluster hdfsCluster; - static FileSystem fs; - static String hdfsURI; - static HdfsConfiguration conf = new HdfsConfiguration(); - static final int LOCK_EXPIRY_SEC = 1; - private Path locksDir = new Path("/tmp/lockdir"); - - @BeforeClass - public static void setupClass() throws IOException { - conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,"5000"); - builder = new MiniDFSCluster.Builder(new Configuration()); - hdfsCluster = builder.build(); - fs = hdfsCluster.getFileSystem(); - hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/"; - } - - @AfterClass - public static void teardownClass() throws IOException { - fs.close(); - hdfsCluster.shutdown(); - } - - @Before - public void setUp() throws Exception { - assert fs.mkdirs(locksDir) ; - } - - @After - public void tearDown() throws Exception { - fs.delete(locksDir, true); - } - - - @Test - public void testBasicLocking() throws Exception { - // 1 grab lock - DirLock lock = DirLock.tryLock(fs, locksDir); - Assert.assertTrue(fs.exists(lock.getLockFile())); - - // 2 try to grab another lock while dir is locked - DirLock lock2 = DirLock.tryLock(fs, locksDir); // should fail - Assert.assertNull(lock2); - - // 3 let go first lock - lock.release(); - Assert.assertFalse(fs.exists(lock.getLockFile())); - - // 4 try locking again - lock2 = DirLock.tryLock(fs, locksDir); - Assert.assertTrue(fs.exists(lock2.getLockFile())); - lock2.release(); - Assert.assertFalse(fs.exists(lock.getLockFile())); - lock2.release(); // should be throw - } - - - @Test - public void testConcurrentLocking() throws Exception { - DirLockingThread[] thds = startThreads(100, locksDir); - for (DirLockingThread thd : thds) { - thd.join(); - if( !thd.cleanExit ) { - System.err.println(thd.getName() + " did not exit cleanly"); - } - Assert.assertTrue(thd.cleanExit); - } + @Rule + public MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule(); - Path lockFile = new Path(locksDir + Path.SEPARATOR + DirLock.DIR_LOCK_FILE); - Assert.assertFalse(fs.exists(lockFile)); - } + private static final int LOCK_EXPIRY_SEC = 1; - private DirLockingThread[] startThreads(int thdCount, Path dir) - throws IOException { - DirLockingThread[] result = new DirLockingThread[thdCount]; - for (int i = 0; i < thdCount; i++) { - result[i] = new DirLockingThread(i, fs, dir); + private FileSystem fs; + private HdfsConfiguration conf = new HdfsConfiguration(); + private final Path locksDir = new Path("/tmp/lockdir"); + + @Before + public void setUp() throws IOException { + conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, "5000"); + fs = DFS_CLUSTER_RULE.getDfscluster().getFileSystem(); + assert fs.mkdirs(locksDir); + } + + @After + public void teardownClass() throws IOException { + fs.delete(locksDir, true); + fs.close(); } - for (DirLockingThread thd : result) { - thd.start(); + @Test + public void testBasicLocking() throws Exception { + // 1 grab lock + DirLock lock = DirLock.tryLock(fs, locksDir); + Assert.assertTrue(fs.exists(lock.getLockFile())); + + // 2 try to grab another lock while dir is locked + DirLock lock2 = DirLock.tryLock(fs, locksDir); // should fail + Assert.assertNull(lock2); + + // 3 let go first lock + lock.release(); + Assert.assertFalse(fs.exists(lock.getLockFile())); + + // 4 try locking again + lock2 = DirLock.tryLock(fs, locksDir); + Assert.assertTrue(fs.exists(lock2.getLockFile())); + lock2.release(); + Assert.assertFalse(fs.exists(lock.getLockFile())); + lock2.release(); // should be throw } - return result; - } - @Test - public void testLockRecovery() throws Exception { - DirLock lock1 = DirLock.tryLock(fs, locksDir); // should pass - Assert.assertNotNull(lock1); + @Test + public void testConcurrentLocking() throws Exception { + DirLockingThread[] threads = null; + try { + threads = startThreads(100, locksDir); + for (DirLockingThread thd : threads) { + thd.join(30_000); + Assert.assertTrue(thd.getName() + " did not exit cleanly", thd.cleanExit); + } - DirLock lock2 = DirLock.takeOwnershipIfStale(fs, locksDir, LOCK_EXPIRY_SEC); // should fail - Assert.assertNull(lock2); + Path lockFile = new Path(locksDir + Path.SEPARATOR + DirLock.DIR_LOCK_FILE); + Assert.assertFalse(fs.exists(lockFile)); + } finally { + if (threads != null) { + for (DirLockingThread thread : threads) { + thread.interrupt(); + thread.join(30_000); + if (thread.isAlive()) { + throw new RuntimeException("Failed to stop threads within 30 seconds, threads may leak into other tests"); + } + } + } + } + } - Thread.sleep(LOCK_EXPIRY_SEC*1000 + 500); // wait for lock to expire - Assert.assertTrue(fs.exists(lock1.getLockFile())); + private DirLockingThread[] startThreads(int thdCount, Path dir) + throws IOException { + DirLockingThread[] result = new DirLockingThread[thdCount]; + for (int i = 0; i < thdCount; i++) { + result[i] = new DirLockingThread(i, fs, dir); + } + + for (DirLockingThread thd : result) { + thd.start(); + } + return result; + } - DirLock lock3 = DirLock.takeOwnershipIfStale(fs, locksDir, LOCK_EXPIRY_SEC); // should pass now - Assert.assertNotNull(lock3); - Assert.assertTrue(fs.exists(lock3.getLockFile())); - lock3.release(); - Assert.assertFalse(fs.exists(lock3.getLockFile())); - lock1.release(); // should not throw - } + @Test + public void testLockRecovery() throws Exception { + DirLock lock1 = DirLock.tryLock(fs, locksDir); // should pass + Assert.assertNotNull(lock1); - class DirLockingThread extends Thread { + DirLock lock2 = DirLock.takeOwnershipIfStale(fs, locksDir, LOCK_EXPIRY_SEC); // should fail + Assert.assertNull(lock2); - private int thdNum; - private final FileSystem fs; - private final Path dir; - public boolean cleanExit = false; + Thread.sleep(LOCK_EXPIRY_SEC * 1000 + 500); // wait for lock to expire + Assert.assertTrue(fs.exists(lock1.getLockFile())); - public DirLockingThread(int thdNum,FileSystem fs, Path dir) - throws IOException { - this.thdNum = thdNum; - this.fs = fs; - this.dir = dir; + DirLock lock3 = DirLock.takeOwnershipIfStale(fs, locksDir, LOCK_EXPIRY_SEC); // should pass now + Assert.assertNotNull(lock3); + Assert.assertTrue(fs.exists(lock3.getLockFile())); + lock3.release(); + Assert.assertFalse(fs.exists(lock3.getLockFile())); + lock1.release(); // should not throw } - @Override - public void run() { - Thread.currentThread().setName("DirLockingThread-" + thdNum); - DirLock lock = null; - try { - do { - System.err.println("Trying lock " + getName()); - lock = DirLock.tryLock(fs, dir); - System.err.println("Acquired lock " + getName()); - if(lock==null) { - System.out.println("Retrying lock - " + getName()); - } - } while (lock==null); - cleanExit= true; - } catch (Exception e) { - e.printStackTrace(); - } - finally { - try { - if(lock!=null) { - lock.release(); - System.err.println("Released lock " + getName()); + class DirLockingThread extends Thread { + + private int thdNum; + private final FileSystem fs; + private final Path dir; + public boolean cleanExit = false; + + public DirLockingThread(int thdNum, FileSystem fs, Path dir) + throws IOException { + this.thdNum = thdNum; + this.fs = fs; + this.dir = dir; + } + + @Override + public void run() { + Thread.currentThread().setName("DirLockingThread-" + thdNum); + DirLock lock = null; + try { + do { + System.err.println("Trying lock " + getName()); + lock = DirLock.tryLock(fs, dir); + System.err.println("Acquired lock " + getName()); + if (lock == null) { + System.out.println("Retrying lock - " + getName()); + } + } while (lock == null && !Thread.currentThread().isInterrupted()); + cleanExit = true; + } catch (IOException e) { + e.printStackTrace(); + } finally { + try { + if (lock != null) { + lock.release(); + System.err.println("Released lock " + getName()); + } + } catch (IOException e) { + e.printStackTrace(System.err); + } } - } catch (IOException e) { - e.printStackTrace(System.err); - } - } - System.err.println("Thread exiting " + getName()); - } // run() - - } // class DirLockingThread + System.err.println("Thread exiting " + getName()); + } // run() + + } // class DirLockingThread }
http://git-wip-us.apache.org/repos/asf/storm/blob/a0308efd/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java index 725fa11..df131a4 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -9,388 +10,380 @@ * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hdfs.spout; - -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.storm.hdfs.common.HdfsUtils; import org.junit.After; -import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; - import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; import java.lang.reflect.Method; import java.util.ArrayList; +import org.apache.storm.hdfs.testing.MiniDFSClusterRule; +import org.junit.Rule; public class TestFileLock { - static MiniDFSCluster.Builder builder; - static MiniDFSCluster hdfsCluster; - static FileSystem fs; - static String hdfsURI; - static HdfsConfiguration conf = new HdfsConfiguration(); - - private Path filesDir = new Path("/tmp/filesdir"); - private Path locksDir = new Path("/tmp/locskdir"); - - @BeforeClass - public static void setupClass() throws IOException { - conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,"5000"); - builder = new MiniDFSCluster.Builder(new Configuration()); - hdfsCluster = builder.build(); - fs = hdfsCluster.getFileSystem(); - hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/"; - } - - @AfterClass - public static void teardownClass() throws IOException { - fs.close(); - hdfsCluster.shutdown(); - } - - @Before - public void setUp() throws Exception { - assert fs.mkdirs(filesDir) ; - assert fs.mkdirs(locksDir) ; - } - - @After - public void tearDown() throws Exception { - fs.delete(filesDir, true); - fs.delete(locksDir, true); - } - - @Test - public void testBasicLocking() throws Exception { - // create empty files in filesDir - Path file1 = new Path(filesDir + Path.SEPARATOR + "file1"); - Path file2 = new Path(filesDir + Path.SEPARATOR + "file2"); - fs.create(file1).close(); - fs.create(file2).close(); // create empty file - - // acquire lock on file1 and verify if worked - FileLock lock1a = FileLock.tryLock(fs, file1, locksDir, "spout1"); - Assert.assertNotNull(lock1a); - Assert.assertTrue(fs.exists(lock1a.getLockFile())); - Assert.assertEquals(lock1a.getLockFile().getParent(), locksDir); // verify lock file location - Assert.assertEquals(lock1a.getLockFile().getName(), file1.getName()); // verify lock filename - - // acquire another lock on file1 and verify it failed - FileLock lock1b = FileLock.tryLock(fs, file1, locksDir, "spout1"); - Assert.assertNull(lock1b); - - // release lock on file1 and check - lock1a.release(); - Assert.assertFalse(fs.exists(lock1a.getLockFile())); - - // Retry locking and verify - FileLock lock1c = FileLock.tryLock(fs, file1, locksDir, "spout1"); - Assert.assertNotNull(lock1c); - Assert.assertTrue(fs.exists(lock1c.getLockFile())); - Assert.assertEquals(lock1c.getLockFile().getParent(), locksDir); // verify lock file location - Assert.assertEquals(lock1c.getLockFile().getName(), file1.getName()); // verify lock filename - - // try locking another file2 at the same time - FileLock lock2a = FileLock.tryLock(fs, file2, locksDir, "spout1"); - Assert.assertNotNull(lock2a); - Assert.assertTrue(fs.exists(lock2a.getLockFile())); - Assert.assertEquals(lock2a.getLockFile().getParent(), locksDir); // verify lock file location - Assert.assertEquals(lock2a.getLockFile().getName(), file2.getName()); // verify lock filename - - // release both locks - lock2a.release(); - Assert.assertFalse(fs.exists(lock2a.getLockFile())); - lock1c.release(); - Assert.assertFalse(fs.exists(lock1c.getLockFile())); - } - - @Test - public void testHeartbeat() throws Exception { - Path file1 = new Path(filesDir + Path.SEPARATOR + "file1"); - fs.create(file1).close(); - - // acquire lock on file1 - FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1"); - Assert.assertNotNull(lock1); - Assert.assertTrue(fs.exists(lock1.getLockFile())); - - ArrayList<String> lines = readTextFile(lock1.getLockFile()); - Assert.assertEquals("heartbeats appear to be missing", 1, lines.size()); - - // hearbeat upon it - lock1.heartbeat("1"); - lock1.heartbeat("2"); - lock1.heartbeat("3"); - - lines = readTextFile(lock1.getLockFile()); - Assert.assertEquals("heartbeats appear to be missing", 4, lines.size()); - - lock1.heartbeat("4"); - lock1.heartbeat("5"); - lock1.heartbeat("6"); - - lines = readTextFile(lock1.getLockFile()); - Assert.assertEquals("heartbeats appear to be missing", 7, lines.size()); - - lock1.release(); - lines = readTextFile(lock1.getLockFile()); - Assert.assertNull(lines); - Assert.assertFalse(fs.exists(lock1.getLockFile())); - } - - @Test - public void testConcurrentLocking() throws IOException, InterruptedException { - Path file1 = new Path(filesDir + Path.SEPARATOR + "file1"); - fs.create(file1).close(); - - FileLockingThread[] thds = startThreads(100, file1, locksDir); - for (FileLockingThread thd : thds) { - thd.join(); - if( !thd.cleanExit) { - System.err.println(thd.getName() + " did not exit cleanly"); - } - Assert.assertTrue(thd.cleanExit); + @Rule + public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(); + + private FileSystem fs; + private HdfsConfiguration conf = new HdfsConfiguration(); + + private final Path filesDir = new Path("/tmp/filesdir"); + private final Path locksDir = new Path("/tmp/locksdir"); + + @Before + public void setup() throws IOException { + conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, "5000"); + fs = dfsClusterRule.getDfscluster().getFileSystem(); + assert fs.mkdirs(filesDir); + assert fs.mkdirs(locksDir); } - Path lockFile = new Path(locksDir + Path.SEPARATOR + file1.getName()); - Assert.assertFalse(fs.exists(lockFile)); - } + @After + public void teardown() throws IOException { + fs.delete(filesDir, true); + fs.delete(locksDir, true); + fs.close(); + } - private FileLockingThread[] startThreads(int thdCount, Path fileToLock, Path locksDir) - throws IOException { - FileLockingThread[] result = new FileLockingThread[thdCount]; - for (int i = 0; i < thdCount; i++) { - result[i] = new FileLockingThread(i, fs, fileToLock, locksDir, "spout" + Integer.toString(i)); + @Test + public void testBasicLocking() throws Exception { + // create empty files in filesDir + Path file1 = new Path(filesDir + Path.SEPARATOR + "file1"); + Path file2 = new Path(filesDir + Path.SEPARATOR + "file2"); + fs.create(file1).close(); + fs.create(file2).close(); // create empty file + + // acquire lock on file1 and verify if worked + FileLock lock1a = FileLock.tryLock(fs, file1, locksDir, "spout1"); + Assert.assertNotNull(lock1a); + Assert.assertTrue(fs.exists(lock1a.getLockFile())); + Assert.assertEquals(lock1a.getLockFile().getParent(), locksDir); // verify lock file location + Assert.assertEquals(lock1a.getLockFile().getName(), file1.getName()); // verify lock filename + + // acquire another lock on file1 and verify it failed + FileLock lock1b = FileLock.tryLock(fs, file1, locksDir, "spout1"); + Assert.assertNull(lock1b); + + // release lock on file1 and check + lock1a.release(); + Assert.assertFalse(fs.exists(lock1a.getLockFile())); + + // Retry locking and verify + FileLock lock1c = FileLock.tryLock(fs, file1, locksDir, "spout1"); + Assert.assertNotNull(lock1c); + Assert.assertTrue(fs.exists(lock1c.getLockFile())); + Assert.assertEquals(lock1c.getLockFile().getParent(), locksDir); // verify lock file location + Assert.assertEquals(lock1c.getLockFile().getName(), file1.getName()); // verify lock filename + + // try locking another file2 at the same time + FileLock lock2a = FileLock.tryLock(fs, file2, locksDir, "spout1"); + Assert.assertNotNull(lock2a); + Assert.assertTrue(fs.exists(lock2a.getLockFile())); + Assert.assertEquals(lock2a.getLockFile().getParent(), locksDir); // verify lock file location + Assert.assertEquals(lock2a.getLockFile().getName(), file2.getName()); // verify lock filename + + // release both locks + lock2a.release(); + Assert.assertFalse(fs.exists(lock2a.getLockFile())); + lock1c.release(); + Assert.assertFalse(fs.exists(lock1c.getLockFile())); } - for (FileLockingThread thd : result) { - thd.start(); + @Test + public void testHeartbeat() throws Exception { + Path file1 = new Path(filesDir + Path.SEPARATOR + "file1"); + fs.create(file1).close(); + + // acquire lock on file1 + FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1"); + Assert.assertNotNull(lock1); + Assert.assertTrue(fs.exists(lock1.getLockFile())); + + ArrayList<String> lines = readTextFile(lock1.getLockFile()); + Assert.assertEquals("heartbeats appear to be missing", 1, lines.size()); + + // hearbeat upon it + lock1.heartbeat("1"); + lock1.heartbeat("2"); + lock1.heartbeat("3"); + + lines = readTextFile(lock1.getLockFile()); + Assert.assertEquals("heartbeats appear to be missing", 4, lines.size()); + + lock1.heartbeat("4"); + lock1.heartbeat("5"); + lock1.heartbeat("6"); + + lines = readTextFile(lock1.getLockFile()); + Assert.assertEquals("heartbeats appear to be missing", 7, lines.size()); + + lock1.release(); + lines = readTextFile(lock1.getLockFile()); + Assert.assertNull(lines); + Assert.assertFalse(fs.exists(lock1.getLockFile())); } - return result; - } - - - @Test - public void testStaleLockDetection_SingleLock() throws Exception { - final int LOCK_EXPIRY_SEC = 1; - final int WAIT_MSEC = 1500; - Path file1 = new Path(filesDir + Path.SEPARATOR + "file1"); - fs.create(file1).close(); - FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1"); - try { - // acquire lock on file1 - Assert.assertNotNull(lock1); - Assert.assertTrue(fs.exists(lock1.getLockFile())); - Thread.sleep(WAIT_MSEC); // wait for lock to expire - HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC); - Assert.assertNotNull(expired); - - // heartbeat, ensure its no longer stale and read back the heartbeat data - lock1.heartbeat("1"); - expired = FileLock.locateOldestExpiredLock(fs, locksDir, 1); - Assert.assertNull(expired); - - FileLock.LogEntry lastEntry = lock1.getLastLogEntry(); - Assert.assertNotNull(lastEntry); - Assert.assertEquals("1", lastEntry.fileOffset); - - // wait and check for expiry again - Thread.sleep(WAIT_MSEC); - expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC); - Assert.assertNotNull(expired); - } finally { - lock1.release(); - fs.delete(file1, false); + + @Test + public void testConcurrentLocking() throws IOException, InterruptedException { + Path file1 = new Path(filesDir + Path.SEPARATOR + "file1"); + fs.create(file1).close(); + + FileLockingThread[] threads = null; + try { + threads = startThreads(100, file1, locksDir); + for (FileLockingThread thd : threads) { + thd.join(30_000); + Assert.assertTrue(thd.getName() + " did not exit cleanly", thd.cleanExit); + } + + Path lockFile = new Path(locksDir + Path.SEPARATOR + file1.getName()); + Assert.assertFalse(fs.exists(lockFile)); + } finally { + if (threads != null) { + for (FileLockingThread thread : threads) { + thread.interrupt(); + thread.join(30_000); + if (thread.isAlive()) { + throw new RuntimeException("Failed to stop threads within 30 seconds, threads may leak into other tests"); + } + } + } + } } - } - - @Test - public void testStaleLockDetection_MultipleLocks() throws Exception { - final int LOCK_EXPIRY_SEC = 1; - final int WAIT_MSEC = 1500; - Path file1 = new Path(filesDir + Path.SEPARATOR + "file1"); - Path file2 = new Path(filesDir + Path.SEPARATOR + "file2"); - Path file3 = new Path(filesDir + Path.SEPARATOR + "file3"); - - fs.create(file1).close(); - fs.create(file2).close(); - fs.create(file3).close(); - - // 1) acquire locks on file1,file2,file3 - FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1"); - FileLock lock2 = FileLock.tryLock(fs, file2, locksDir, "spout2"); - FileLock lock3 = FileLock.tryLock(fs, file3, locksDir, "spout3"); - Assert.assertNotNull(lock1); - Assert.assertNotNull(lock2); - Assert.assertNotNull(lock3); - - try { - HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC); - Assert.assertNull(expired); - - // 2) wait for all 3 locks to expire then heart beat on 2 locks and verify stale lock - Thread.sleep(WAIT_MSEC); - lock1.heartbeat("1"); - lock2.heartbeat("1"); - - expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC); - Assert.assertNotNull(expired); - Assert.assertEquals("spout3", expired.getValue().componentID); - } finally { - lock1.release(); - lock2.release(); - lock3.release(); - fs.delete(file1, false); - fs.delete(file2, false); - fs.delete(file3, false); + + private FileLockingThread[] startThreads(int thdCount, Path fileToLock, Path locksDir) + throws IOException { + FileLockingThread[] result = new FileLockingThread[thdCount]; + for (int i = 0; i < thdCount; i++) { + result[i] = new FileLockingThread(i, fs, fileToLock, locksDir, "spout" + Integer.toString(i)); + } + + for (FileLockingThread thd : result) { + thd.start(); + } + return result; } - } - - @Test - public void testLockRecovery() throws Exception { - final int LOCK_EXPIRY_SEC = 1; - final int WAIT_MSEC = LOCK_EXPIRY_SEC*1000 + 500; - Path file1 = new Path(filesDir + Path.SEPARATOR + "file1"); - Path file2 = new Path(filesDir + Path.SEPARATOR + "file2"); - Path file3 = new Path(filesDir + Path.SEPARATOR + "file3"); - - fs.create(file1).close(); - fs.create(file2).close(); - fs.create(file3).close(); - - // 1) acquire locks on file1,file2,file3 - FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1"); - FileLock lock2 = FileLock.tryLock(fs, file2, locksDir, "spout2"); - FileLock lock3 = FileLock.tryLock(fs, file3, locksDir, "spout3"); - Assert.assertNotNull(lock1); - Assert.assertNotNull(lock2); - Assert.assertNotNull(lock3); - - try { - HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC); - Assert.assertNull(expired); - - // 1) Simulate lock file lease expiring and getting closed by HDFS - closeUnderlyingLockFile(lock3); - - // 2) wait for all 3 locks to expire then heart beat on 2 locks - Thread.sleep(WAIT_MSEC*2); // wait for locks to expire - lock1.heartbeat("1"); - lock2.heartbeat("1"); - - // 3) Take ownership of stale lock - FileLock lock3b = FileLock.acquireOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC, "spout1"); - Assert.assertNotNull(lock3b); - Assert.assertEquals("Expected lock3 file", Path.getPathWithoutSchemeAndAuthority(lock3b.getLockFile()), lock3.getLockFile()); - } finally { - lock1.release(); - lock2.release(); - lock3.release(); - fs.delete(file1, false); - fs.delete(file2, false); - try { - fs.delete(file3, false); - } catch (Exception e) { - e.printStackTrace(); - } + + @Test + public void testStaleLockDetection_SingleLock() throws Exception { + final int LOCK_EXPIRY_SEC = 1; + final int WAIT_MSEC = 1500; + Path file1 = new Path(filesDir + Path.SEPARATOR + "file1"); + fs.create(file1).close(); + FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1"); + try { + // acquire lock on file1 + Assert.assertNotNull(lock1); + Assert.assertTrue(fs.exists(lock1.getLockFile())); + Thread.sleep(WAIT_MSEC); // wait for lock to expire + HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC); + Assert.assertNotNull(expired); + + // heartbeat, ensure its no longer stale and read back the heartbeat data + lock1.heartbeat("1"); + expired = FileLock.locateOldestExpiredLock(fs, locksDir, 1); + Assert.assertNull(expired); + + FileLock.LogEntry lastEntry = lock1.getLastLogEntry(); + Assert.assertNotNull(lastEntry); + Assert.assertEquals("1", lastEntry.fileOffset); + + // wait and check for expiry again + Thread.sleep(WAIT_MSEC); + expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC); + Assert.assertNotNull(expired); + } finally { + lock1.release(); + fs.delete(file1, false); + } } - } - - public static void closeUnderlyingLockFile(FileLock lock) throws ReflectiveOperationException { - Method m = FileLock.class.getDeclaredMethod("forceCloseLockFile"); - m.setAccessible(true); - m.invoke(lock); - } - - /** return null if file not found */ - private ArrayList<String> readTextFile(Path file) throws IOException { - FSDataInputStream os = null; - try { - os = fs.open(file); - if (os == null) { - return null; - } - BufferedReader reader = new BufferedReader(new InputStreamReader(os)); - ArrayList<String> lines = new ArrayList<>(); - for (String line = reader.readLine(); line != null; line = reader.readLine()) { - lines.add(line); - } - return lines; - } catch( FileNotFoundException e) { - return null; - } finally { - if(os!=null) { - os.close(); - } + + @Test + public void testStaleLockDetection_MultipleLocks() throws Exception { + final int LOCK_EXPIRY_SEC = 1; + final int WAIT_MSEC = 1500; + Path file1 = new Path(filesDir + Path.SEPARATOR + "file1"); + Path file2 = new Path(filesDir + Path.SEPARATOR + "file2"); + Path file3 = new Path(filesDir + Path.SEPARATOR + "file3"); + + fs.create(file1).close(); + fs.create(file2).close(); + fs.create(file3).close(); + + // 1) acquire locks on file1,file2,file3 + FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1"); + FileLock lock2 = FileLock.tryLock(fs, file2, locksDir, "spout2"); + FileLock lock3 = FileLock.tryLock(fs, file3, locksDir, "spout3"); + Assert.assertNotNull(lock1); + Assert.assertNotNull(lock2); + Assert.assertNotNull(lock3); + + try { + HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC); + Assert.assertNull(expired); + + // 2) wait for all 3 locks to expire then heart beat on 2 locks and verify stale lock + Thread.sleep(WAIT_MSEC); + lock1.heartbeat("1"); + lock2.heartbeat("1"); + + expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC); + Assert.assertNotNull(expired); + Assert.assertEquals("spout3", expired.getValue().componentID); + } finally { + lock1.release(); + lock2.release(); + lock3.release(); + fs.delete(file1, false); + fs.delete(file2, false); + fs.delete(file3, false); + } } - } - class FileLockingThread extends Thread { + @Test + public void testLockRecovery() throws Exception { + final int LOCK_EXPIRY_SEC = 1; + final int WAIT_MSEC = LOCK_EXPIRY_SEC * 1000 + 500; + Path file1 = new Path(filesDir + Path.SEPARATOR + "file1"); + Path file2 = new Path(filesDir + Path.SEPARATOR + "file2"); + Path file3 = new Path(filesDir + Path.SEPARATOR + "file3"); + + fs.create(file1).close(); + fs.create(file2).close(); + fs.create(file3).close(); + + // 1) acquire locks on file1,file2,file3 + FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1"); + FileLock lock2 = FileLock.tryLock(fs, file2, locksDir, "spout2"); + FileLock lock3 = FileLock.tryLock(fs, file3, locksDir, "spout3"); + Assert.assertNotNull(lock1); + Assert.assertNotNull(lock2); + Assert.assertNotNull(lock3); - private int thdNum; - private final FileSystem fs; - public boolean cleanExit = false; - private Path fileToLock; - private Path locksDir; - private String spoutId; + try { + HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC); + Assert.assertNull(expired); + + // 1) Simulate lock file lease expiring and getting closed by HDFS + closeUnderlyingLockFile(lock3); + + // 2) wait for all 3 locks to expire then heart beat on 2 locks + Thread.sleep(WAIT_MSEC * 2); // wait for locks to expire + lock1.heartbeat("1"); + lock2.heartbeat("1"); + + // 3) Take ownership of stale lock + FileLock lock3b = FileLock.acquireOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC, "spout1"); + Assert.assertNotNull(lock3b); + Assert.assertEquals("Expected lock3 file", Path.getPathWithoutSchemeAndAuthority(lock3b.getLockFile()), lock3.getLockFile()); + } finally { + lock1.release(); + lock2.release(); + lock3.release(); + fs.delete(file1, false); + fs.delete(file2, false); + try { + fs.delete(file3, false); + } catch (Exception e) { + e.printStackTrace(); + } + } + } - public FileLockingThread(int thdNum, FileSystem fs, Path fileToLock, Path locksDir, String spoutId) - throws IOException { - this.thdNum = thdNum; - this.fs = fs; - this.fileToLock = fileToLock; - this.locksDir = locksDir; - this.spoutId = spoutId; + public static void closeUnderlyingLockFile(FileLock lock) throws ReflectiveOperationException { + Method m = FileLock.class.getDeclaredMethod("forceCloseLockFile"); + m.setAccessible(true); + m.invoke(lock); } - @Override - public void run() { - Thread.currentThread().setName("FileLockingThread-" + thdNum); - FileLock lock = null; - try { - do { - System.err.println("Trying lock - " + getName()); - lock = FileLock.tryLock(fs, this.fileToLock, this.locksDir, spoutId); - System.err.println("Acquired lock - " + getName()); - if(lock==null) { - System.out.println("Retrying lock - " + getName()); - } - } while (lock==null); - cleanExit= true; - } catch (Exception e) { - e.printStackTrace(); - } - finally { + /** + * return null if file not found + */ + private ArrayList<String> readTextFile(Path file) throws IOException { + FSDataInputStream os = null; try { - if(lock!=null) { - lock.release(); - System.err.println("Released lock - " + getName()); - } - } catch (IOException e) { - e.printStackTrace(System.err); + os = fs.open(file); + if (os == null) { + return null; + } + BufferedReader reader = new BufferedReader(new InputStreamReader(os)); + ArrayList<String> lines = new ArrayList<>(); + for (String line = reader.readLine(); line != null; line = reader.readLine()) { + lines.add(line); + } + return lines; + } catch (FileNotFoundException e) { + return null; + } finally { + if (os != null) { + os.close(); + } + } + } + + class FileLockingThread extends Thread { + + private int thdNum; + private final FileSystem fs; + public boolean cleanExit = false; + private Path fileToLock; + private Path locksDir; + private String spoutId; + + public FileLockingThread(int thdNum, FileSystem fs, Path fileToLock, Path locksDir, String spoutId) + throws IOException { + this.thdNum = thdNum; + this.fs = fs; + this.fileToLock = fileToLock; + this.locksDir = locksDir; + this.spoutId = spoutId; } - } - System.err.println("Thread exiting - " + getName()); - } // run() - } // class FileLockingThread + @Override + public void run() { + Thread.currentThread().setName("FileLockingThread-" + thdNum); + FileLock lock = null; + try { + do { + System.err.println("Trying lock - " + getName()); + lock = FileLock.tryLock(fs, this.fileToLock, this.locksDir, spoutId); + System.err.println("Acquired lock - " + getName()); + if (lock == null) { + System.out.println("Retrying lock - " + getName()); + } + } while (lock == null && !Thread.currentThread().isInterrupted()); + cleanExit = true; + } catch (Exception e) { + e.printStackTrace(); + } finally { + try { + if (lock != null) { + lock.release(); + System.err.println("Released lock - " + getName()); + } + } catch (IOException e) { + e.printStackTrace(System.err); + } + } + System.err.println("Thread exiting - " + getName()); + } // run() + + } // class FileLockingThread } http://git-wip-us.apache.org/repos/asf/storm/blob/a0308efd/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java index 6628cc9..33432d9 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -9,196 +10,192 @@ * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ - package org.apache.storm.hdfs.spout; -import org.apache.hadoop.conf.Configuration; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.ipc.RemoteException; import org.junit.After; -import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; +import org.apache.storm.hdfs.testing.MiniDFSClusterRule; +import org.junit.Rule; public class TestHdfsSemantics { - static MiniDFSCluster.Builder builder; - static MiniDFSCluster hdfsCluster; - static FileSystem fs; - static String hdfsURI; - static HdfsConfiguration conf = new HdfsConfiguration(); - - private Path dir = new Path("/tmp/filesdir"); - - @BeforeClass - public static void setupClass() throws IOException { - conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,"5000"); - builder = new MiniDFSCluster.Builder(new Configuration()); - hdfsCluster = builder.build(); - fs = hdfsCluster.getFileSystem(); - hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/"; - } - - @AfterClass - public static void teardownClass() throws IOException { - fs.close(); - hdfsCluster.shutdown(); - } - - @Before - public void setUp() throws Exception { - assert fs.mkdirs(dir) ; - } - - @After - public void tearDown() throws Exception { - fs.delete(dir, true); - } - - - @Test - public void testDeleteSemantics() throws Exception { - Path file = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1"); -// try { - // 1) Delete absent file - should return false - Assert.assertFalse(fs.exists(file)); - try { - Assert.assertFalse(fs.delete(file, false)); - } catch (IOException e) { - e.printStackTrace(); - } + @Rule + public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(); - // 2) deleting open file - should return true - fs.create(file, false); - Assert.assertTrue(fs.delete(file, false)); - - // 3) deleting closed file - should return true - FSDataOutputStream os = fs.create(file, false); - os.close(); - Assert.assertTrue(fs.exists(file)); - Assert.assertTrue(fs.delete(file, false)); - Assert.assertFalse(fs.exists(file)); - } - - @Test - public void testConcurrentDeletion() throws Exception { - Path file = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1"); - fs.create(file).close(); - // 1 concurrent deletion - only one thread should succeed - FileDeletionThread[] thds = startThreads(10, file); - int successCount=0; - for (FileDeletionThread thd : thds) { - thd.join(); - if( thd.succeeded) - successCount++; - if(thd.exception!=null) - Assert.assertNotNull(thd.exception); - } - System.err.println(successCount); - Assert.assertEquals(1, successCount); - - } - - @Test - public void testAppendSemantics() throws Exception { - //1 try to append to an open file - Path file1 = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1"); - FSDataOutputStream os1 = fs.create(file1, false); - try { - fs.append(file1); // should fail - Assert.assertTrue("Append did not throw an exception", false); - } catch (RemoteException e) { - // expecting AlreadyBeingCreatedException inside RemoteException - Assert.assertEquals(AlreadyBeingCreatedException.class, e.unwrapRemoteException().getClass()); + private FileSystem fs; + private final HdfsConfiguration conf = new HdfsConfiguration(); + + private final Path dir = new Path("/tmp/filesdir"); + + @Before + public void setup() throws IOException { + conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, "5000"); + fs = dfsClusterRule.getDfscluster().getFileSystem(); + assert fs.mkdirs(dir); } - //2 try to append to a closed file - os1.close(); - FSDataOutputStream os2 = fs.append(file1); // should pass - os2.close(); - } - - @Test - public void testDoubleCreateSemantics() throws Exception { - //1 create an already existing open file w/o override flag - Path file1 = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1"); - FSDataOutputStream os1 = fs.create(file1, false); - try { - fs.create(file1, false); // should fail - Assert.assertTrue("Create did not throw an exception", false); - } catch (RemoteException e) { - Assert.assertEquals(AlreadyBeingCreatedException.class, e.unwrapRemoteException().getClass()); + @After + public void teardown() throws IOException { + fs.delete(dir, true); + fs.close(); } - //2 close file and retry creation - os1.close(); - try { - fs.create(file1, false); // should still fail - } catch (FileAlreadyExistsException e) { - // expecting this exception + + @Test + public void testDeleteSemantics() throws Exception { + Path file = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1"); +// try { + // 1) Delete absent file - should return false + Assert.assertFalse(fs.exists(file)); + try { + Assert.assertFalse(fs.delete(file, false)); + } catch (IOException e) { + e.printStackTrace(); + } + + // 2) deleting open file - should return true + fs.create(file, false); + Assert.assertTrue(fs.delete(file, false)); + + // 3) deleting closed file - should return true + FSDataOutputStream os = fs.create(file, false); + os.close(); + Assert.assertTrue(fs.exists(file)); + Assert.assertTrue(fs.delete(file, false)); + Assert.assertFalse(fs.exists(file)); } - //3 delete file and retry creation - fs.delete(file1, false); - FSDataOutputStream os2 = fs.create(file1, false); // should pass - Assert.assertNotNull(os2); - os2.close(); - } + @Test + public void testConcurrentDeletion() throws Exception { + Path file = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1"); + fs.create(file).close(); + // 1 concurrent deletion - only one thread should succeed + FileDeletionThread[] threads = null; + try { + threads = startThreads(10, file); + int successCount = 0; + for (FileDeletionThread thd : threads) { + thd.join(30_000); + if (thd.succeeded) { + successCount++; + } + if (thd.exception != null) { + Assert.assertNotNull(thd.exception); + } + } + System.err.println(successCount); + Assert.assertEquals(1, successCount); + } finally { + if (threads != null) { + for (FileDeletionThread thread : threads) { + thread.interrupt(); + thread.join(30_000); + if (thread.isAlive()) { + throw new RuntimeException("Failed to stop threads within 30 seconds, threads may leak into other tests"); + } + } + } + } + } + @Test + public void testAppendSemantics() throws Exception { + //1 try to append to an open file + Path file1 = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1"); + try (FSDataOutputStream os1 = fs.create(file1, false)) { + fs.append(file1); // should fail + fail("Append did not throw an exception"); + } catch (RemoteException e) { + // expecting AlreadyBeingCreatedException inside RemoteException + Assert.assertEquals(AlreadyBeingCreatedException.class, e.unwrapRemoteException().getClass()); + } + + //2 try to append to a closed file + try (FSDataOutputStream os2 = fs.append(file1)) { + assertNotNull(os2); + } + } - private FileDeletionThread[] startThreads(int thdCount, Path file) - throws IOException { - FileDeletionThread[] result = new FileDeletionThread[thdCount]; - for (int i = 0; i < thdCount; i++) { - result[i] = new FileDeletionThread(i, fs, file); + @Test + public void testDoubleCreateSemantics() throws Exception { + //1 create an already existing open file w/o override flag + Path file1 = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1"); + try (FSDataOutputStream os1 = fs.create(file1, false)) { + fs.create(file1, false); // should fail + fail("Create did not throw an exception"); + } catch (RemoteException e) { + Assert.assertEquals(AlreadyBeingCreatedException.class, e.unwrapRemoteException().getClass()); + } + //2 close file and retry creation + try { + fs.create(file1, false); // should still fail + fail("Create did not throw an exception"); + } catch (FileAlreadyExistsException e) { + // expecting this exception + } + + //3 delete file and retry creation + fs.delete(file1, false); + try (FSDataOutputStream os2 = fs.create(file1, false)) { + Assert.assertNotNull(os2); + } } - for (FileDeletionThread thd : result) { - thd.start(); + private FileDeletionThread[] startThreads(int thdCount, Path file) + throws IOException { + FileDeletionThread[] result = new FileDeletionThread[thdCount]; + for (int i = 0; i < thdCount; i++) { + result[i] = new FileDeletionThread(i, fs, file); + } + + for (FileDeletionThread thd : result) { + thd.start(); + } + return result; } - return result; - } - private static class FileDeletionThread extends Thread { + private static class FileDeletionThread extends Thread { - private final int thdNum; - private final FileSystem fs; - private final Path file; - public boolean succeeded; - public Exception exception = null; + private final int thdNum; + private final FileSystem fs; + private final Path file; + public boolean succeeded; + public Exception exception = null; - public FileDeletionThread(int thdNum, FileSystem fs, Path file) + public FileDeletionThread(int thdNum, FileSystem fs, Path file) throws IOException { - this.thdNum = thdNum; - this.fs = fs; - this.file = file; - } - - @Override - public void run() { - Thread.currentThread().setName("FileDeletionThread-" + thdNum); - try { - succeeded = fs.delete(file, false); - } catch (Exception e) { - exception = e; - } - } // run() - - } // class FileLockingThread + this.thdNum = thdNum; + this.fs = fs; + this.file = file; + } + + @Override + public void run() { + Thread.currentThread().setName("FileDeletionThread-" + thdNum); + try { + succeeded = fs.delete(file, false); + } catch (Exception e) { + exception = e; + } + } // run() + + } // class FileLockingThread }
