Repository: storm
Updated Branches:
  refs/heads/1.1.x-branch ed005ab5f -> a0308efd6
http://git-wip-us.apache.org/repos/asf/storm/blob/a0308efd/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java index f60cbf3..057787d 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -15,14 +16,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.storm.hdfs.spout; import org.apache.storm.Config; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.storm.hdfs.common.HdfsUtils; @@ -57,703 +56,707 @@ import java.util.List; import java.util.Map; import org.apache.storm.hdfs.common.HdfsUtils.Pair; - +import org.apache.storm.hdfs.testing.MiniDFSClusterRule; +import org.junit.ClassRule; public class TestHdfsSpout { - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - public File baseFolder; - private Path source; - private Path archive; - private Path badfiles; - - - public TestHdfsSpout() { - } - - static MiniDFSCluster.Builder builder; - static MiniDFSCluster hdfsCluster; - static DistributedFileSystem fs; - static String hdfsURI; - static Configuration conf = new Configuration(); - - @BeforeClass - public static void setupClass() throws IOException { - builder = new MiniDFSCluster.Builder(new Configuration()); - hdfsCluster = builder.build(); - fs = hdfsCluster.getFileSystem(); - hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/"; - } - - @AfterClass - public static void teardownClass() throws IOException { - fs.close(); - hdfsCluster.shutdown(); - } - - - @Before - public void setup() throws Exception { - baseFolder = tempFolder.newFolder("hdfsspout"); - source = new Path(baseFolder.toString() + "/source"); - fs.mkdirs(source); - archive = new Path(baseFolder.toString() + "/archive"); - fs.mkdirs(archive); - badfiles = new Path(baseFolder.toString() + "/bad"); - fs.mkdirs(badfiles); - } - - @After - public void shutDown() throws IOException { - fs.delete(new Path(baseFolder.toString()), true); - } - - @Test - public void testSimpleText_noACK() throws IOException { - Path file1 = new Path(source.toString() + "/file1.txt"); - createTextFile(file1, 5); - - Path file2 = new Path(source.toString() + "/file2.txt"); - createTextFile(file2, 5); - - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); - spout.setCommitFrequencyCount(1); - spout.setCommitFrequencySec(1); - - Map conf = getCommonConfigs(); - openSpout(spout, 0, conf); - - runSpout(spout,"r11"); - - Path arc1 = new Path(archive.toString() + "/file1.txt"); - Path arc2 = new Path(archive.toString() + "/file2.txt"); - checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2); - } - - @Test - public void testSimpleText_ACK() throws IOException { - Path file1 = new Path(source.toString() + "/file1.txt"); 
- createTextFile(file1, 5); - - Path file2 = new Path(source.toString() + "/file2.txt"); - createTextFile(file2, 5); - - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); - spout.setCommitFrequencyCount(1); - spout.setCommitFrequencySec(1); - - Map conf = getCommonConfigs(); - conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing - openSpout(spout, 0, conf); - - // consume file 1 - runSpout(spout, "r6", "a0", "a1", "a2", "a3", "a4"); - Path arc1 = new Path(archive.toString() + "/file1.txt"); - checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1); - - // consume file 2 - runSpout(spout, "r6", "a5", "a6", "a7", "a8", "a9"); - Path arc2 = new Path(archive.toString() + "/file2.txt"); - checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2); - } - - @Test - public void testResumeAbandoned_Text_NoAck() throws Exception { - Path file1 = new Path(source.toString() + "/file1.txt"); - createTextFile(file1, 6); - - final Integer lockExpirySec = 1; - - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); - spout.setCommitFrequencyCount(1); - spout.setCommitFrequencySec(1000); // effectively disable commits based on time - spout.setLockTimeoutSec(lockExpirySec); - - - HdfsSpout spout2 = makeSpout(Configs.TEXT, TextFileReader.defaultFields); - spout2.setCommitFrequencyCount(1); - spout2.setCommitFrequencySec(1000); // effectively disable commits based on time - spout2.setLockTimeoutSec(lockExpirySec); - - Map conf = getCommonConfigs(); - openSpout(spout, 0, conf); - openSpout(spout2, 1, conf); - - // consume file 1 partially - List<String> res = runSpout(spout, "r2"); - Assert.assertEquals(2, res.size()); - - // abandon file - FileLock lock = getField(spout, "lock"); - TestFileLock.closeUnderlyingLockFile(lock); - Thread.sleep(lockExpirySec * 2 * 1000); - - // check lock file presence - Assert.assertTrue(fs.exists(lock.getLockFile())); - - // create another spout to take over processing and read a few lines - List<String> res2 = runSpout(spout2, "r3"); - Assert.assertEquals(3, res2.size()); - - // check lock file presence - Assert.assertTrue(fs.exists(lock.getLockFile())); - - // check lock file contents - List<String> contents = readTextFile(fs, lock.getLockFile().toString()); - Assert.assertFalse(contents.isEmpty()); - - // finish up reading the file - res2 = runSpout(spout2, "r2"); - Assert.assertEquals(4, res2.size()); - - // check lock file is gone - Assert.assertFalse(fs.exists(lock.getLockFile())); - FileReader rdr = getField(spout2, "reader"); - Assert.assertNull(rdr); - Assert.assertTrue(getBoolField(spout2, "fileReadCompletely")); + @ClassRule + public static MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule(); + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + public File baseFolder; - } - - @Test - public void testResumeAbandoned_Seq_NoAck() throws Exception { - Path file1 = new Path(source.toString() + "/file1.seq"); - createSeqFile(fs, file1, 6); - - final Integer lockExpirySec = 1; - - HdfsSpout spout = makeSpout(Configs.SEQ, SequenceFileReader.defaultFields); - spout.setCommitFrequencyCount(1); - spout.setCommitFrequencySec(1000); // effectively disable commits based on time - spout.setLockTimeoutSec(lockExpirySec); - - - HdfsSpout spout2 = makeSpout(Configs.SEQ, SequenceFileReader.defaultFields); - spout2.setCommitFrequencyCount(1); - spout2.setCommitFrequencySec(1000); // effectively disable commits based on time - spout2.setLockTimeoutSec(lockExpirySec); - - Map 
conf = getCommonConfigs(); - openSpout(spout, 0, conf); - openSpout(spout2, 1, conf); - - // consume file 1 partially - List<String> res = runSpout(spout, "r2"); - Assert.assertEquals(2, res.size()); - // abandon file - FileLock lock = getField(spout, "lock"); - TestFileLock.closeUnderlyingLockFile(lock); - Thread.sleep(lockExpirySec * 2 * 1000); - - // check lock file presence - Assert.assertTrue(fs.exists(lock.getLockFile())); - - // create another spout to take over processing and read a few lines - List<String> res2 = runSpout(spout2, "r3"); - Assert.assertEquals(3, res2.size()); - - // check lock file presence - Assert.assertTrue(fs.exists(lock.getLockFile())); - - // check lock file contents - List<String> contents = getTextFileContents(fs, lock.getLockFile()); - Assert.assertFalse(contents.isEmpty()); - - // finish up reading the file - res2 = runSpout(spout2, "r3"); - Assert.assertEquals(4, res2.size()); + private Path source; + private Path archive; + private Path badfiles; - // check lock file is gone - Assert.assertFalse(fs.exists(lock.getLockFile())); - FileReader rdr = getField(spout2, "reader"); - Assert.assertNull( rdr ); - Assert.assertTrue(getBoolField(spout2, "fileReadCompletely")); - } + private static DistributedFileSystem fs; + private static final Configuration conf = new Configuration(); - private void checkCollectorOutput_txt(MockCollector collector, Path... txtFiles) throws IOException { - ArrayList<String> expected = new ArrayList<>(); - for (Path txtFile : txtFiles) { - List<String> lines= getTextFileContents(fs, txtFile); - expected.addAll(lines); + @BeforeClass + public static void setupClass() throws IOException { + fs = DFS_CLUSTER_RULE.getDfscluster().getFileSystem(); } - List<String> actual = new ArrayList<>(); - for (Pair<HdfsSpout.MessageId, List<Object>> item : collector.items) { - actual.add(item.getValue().get(0).toString()); + @AfterClass + public static void teardownClass() throws IOException { + fs.close(); } - Assert.assertEquals(expected, actual); - } - private List<String> getTextFileContents(FileSystem fs, Path txtFile) throws IOException { - ArrayList<String> result = new ArrayList<>(); - FSDataInputStream istream = fs.open(txtFile); - InputStreamReader isreader = new InputStreamReader(istream,"UTF-8"); - BufferedReader reader = new BufferedReader(isreader); + @Before + public void setup() throws Exception { + baseFolder = tempFolder.newFolder("hdfsspout"); + source = new Path(baseFolder.toString() + "/source"); + fs.mkdirs(source); + archive = new Path(baseFolder.toString() + "/archive"); + fs.mkdirs(archive); + badfiles = new Path(baseFolder.toString() + "/bad"); + fs.mkdirs(badfiles); + } - for( String line = reader.readLine(); line!=null; line = reader.readLine() ) { - result.add(line); + @After + public void shutDown() throws IOException { + fs.delete(new Path(baseFolder.toString()), true); } - isreader.close(); - return result; - } + @Test + public void testSimpleText_noACK() throws Exception { + Path file1 = new Path(source.toString() + "/file1.txt"); + createTextFile(file1, 5); - private void checkCollectorOutput_seq(MockCollector collector, Path... seqFiles) throws IOException { - ArrayList<String> expected = new ArrayList<>(); - for (Path seqFile : seqFiles) { - List<String> lines= getSeqFileContents(fs, seqFile); - expected.addAll(lines); - } - Assert.assertTrue(expected.equals(collector.lines)); - } - - private List<String> getSeqFileContents(FileSystem fs, Path... 
seqFiles) throws IOException { - ArrayList<String> result = new ArrayList<>(); - - for (Path seqFile : seqFiles) { - Path file = new Path(fs.getUri().toString() + seqFile.toString()); - SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file)); - try { - Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); - Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf); - while (reader.next(key, value)) { - String keyValStr = Arrays.asList(key, value).toString(); - result.add(keyValStr); + Path file2 = new Path(source.toString() + "/file2.txt"); + createTextFile(file2, 5); + + try (AutoCloseableHdfsSpout closeableSpout = makeSpout(Configs.TEXT, TextFileReader.defaultFields)) { + HdfsSpout spout = closeableSpout.spout; + spout.setCommitFrequencyCount(1); + spout.setCommitFrequencySec(1); + + Map conf = getCommonConfigs(); + openSpout(spout, 0, conf); + + runSpout(spout, "r11"); + + Path arc1 = new Path(archive.toString() + "/file1.txt"); + Path arc2 = new Path(archive.toString() + "/file2.txt"); + checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2); } - } finally { - reader.close(); - } - }// for - return result; - } - - private List<String> listDir(Path p) throws IOException { - ArrayList<String> result = new ArrayList<>(); - RemoteIterator<LocatedFileStatus> fileNames = fs.listFiles(p, false); - while ( fileNames.hasNext() ) { - LocatedFileStatus fileStatus = fileNames.next(); - result.add(Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()); } - return result; - } + @Test + public void testSimpleText_ACK() throws Exception { + Path file1 = new Path(source.toString() + "/file1.txt"); + createTextFile(file1, 5); - @Test - public void testMultipleFileConsumption_Ack() throws Exception { - Path file1 = new Path(source.toString() + "/file1.txt"); - createTextFile(file1, 5); + Path file2 = new Path(source.toString() + "/file2.txt"); + createTextFile(file2, 5); + try (AutoCloseableHdfsSpout closeableSpout = makeSpout(Configs.TEXT, TextFileReader.defaultFields)) { + HdfsSpout spout = closeableSpout.spout; + spout.setCommitFrequencyCount(1); + spout.setCommitFrequencySec(1); - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); - spout.setCommitFrequencyCount(1); - spout.setCommitFrequencySec(1); + Map conf = getCommonConfigs(); + conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing + openSpout(spout, 0, conf); - Map conf = getCommonConfigs(); - conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing - openSpout(spout, 0, conf); + // consume file 1 + runSpout(spout, "r6", "a0", "a1", "a2", "a3", "a4"); + Path arc1 = new Path(archive.toString() + "/file1.txt"); + checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1); - // read few lines from file1 dont ack - runSpout(spout, "r3"); - FileReader reader = getField(spout, "reader"); - Assert.assertNotNull(reader); - Assert.assertEquals(false, getBoolField(spout, "fileReadCompletely")); + // consume file 2 + runSpout(spout, "r6", "a5", "a6", "a7", "a8", "a9"); + Path arc2 = new Path(archive.toString() + "/file2.txt"); + checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2); + } + } - // read remaining lines - runSpout(spout, "r3"); - reader = getField(spout, "reader"); - Assert.assertNotNull(reader); - Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely") ); + @Test + public void testResumeAbandoned_Text_NoAck() throws Exception { + 
Path file1 = new Path(source.toString() + "/file1.txt"); + createTextFile(file1, 6); + + final Integer lockExpirySec = 1; + + try (AutoCloseableHdfsSpout closeableSpout = makeSpout(Configs.TEXT, TextFileReader.defaultFields)) { + HdfsSpout spout = closeableSpout.spout; + spout.setCommitFrequencyCount(1); + spout.setCommitFrequencySec(1000); // effectively disable commits based on time + spout.setLockTimeoutSec(lockExpirySec); + + try (AutoCloseableHdfsSpout closeableSpout2 = makeSpout(Configs.TEXT, TextFileReader.defaultFields)) { + HdfsSpout spout2 = closeableSpout2.spout; + spout2.setCommitFrequencyCount(1); + spout2.setCommitFrequencySec(1000); // effectively disable commits based on time + spout2.setLockTimeoutSec(lockExpirySec); + + Map conf = getCommonConfigs(); + openSpout(spout, 0, conf); + openSpout(spout2, 1, conf); + + // consume file 1 partially + List<String> res = runSpout(spout, "r2"); + Assert.assertEquals(2, res.size()); + + // abandon file + FileLock lock = getField(spout, "lock"); + TestFileLock.closeUnderlyingLockFile(lock); + Thread.sleep(lockExpirySec * 2 * 1000); + + // check lock file presence + Assert.assertTrue(fs.exists(lock.getLockFile())); + + // create another spout to take over processing and read a few lines + List<String> res2 = runSpout(spout2, "r3"); + Assert.assertEquals(3, res2.size()); + + // check lock file presence + Assert.assertTrue(fs.exists(lock.getLockFile())); + + // check lock file contents + List<String> contents = readTextFile(fs, lock.getLockFile().toString()); + Assert.assertFalse(contents.isEmpty()); + + // finish up reading the file + res2 = runSpout(spout2, "r2"); + Assert.assertEquals(4, res2.size()); + + // check lock file is gone + Assert.assertFalse(fs.exists(lock.getLockFile())); + FileReader rdr = getField(spout2, "reader"); + Assert.assertNull(rdr); + Assert.assertTrue(getBoolField(spout2, "fileReadCompletely")); + } + } + } - // ack few - runSpout(spout, "a0", "a1", "a2"); - reader = getField(spout, "reader"); - Assert.assertNotNull(reader); - Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely")); + @Test + public void testResumeAbandoned_Seq_NoAck() throws Exception { + Path file1 = new Path(source.toString() + "/file1.seq"); + createSeqFile(fs, file1, 6); + + final Integer lockExpirySec = 1; + + try (AutoCloseableHdfsSpout closeableSpout = makeSpout(Configs.SEQ, SequenceFileReader.defaultFields)) { + HdfsSpout spout = closeableSpout.spout; + spout.setCommitFrequencyCount(1); + spout.setCommitFrequencySec(1000); // effectively disable commits based on time + spout.setLockTimeoutSec(lockExpirySec); + + try (AutoCloseableHdfsSpout closeableSpout2 = makeSpout(Configs.SEQ, SequenceFileReader.defaultFields)) { + HdfsSpout spout2 = closeableSpout2.spout; + spout2.setCommitFrequencyCount(1); + spout2.setCommitFrequencySec(1000); // effectively disable commits based on time + spout2.setLockTimeoutSec(lockExpirySec); + + Map conf = getCommonConfigs(); + openSpout(spout, 0, conf); + openSpout(spout2, 1, conf); + + // consume file 1 partially + List<String> res = runSpout(spout, "r2"); + Assert.assertEquals(2, res.size()); + // abandon file + FileLock lock = getField(spout, "lock"); + TestFileLock.closeUnderlyingLockFile(lock); + Thread.sleep(lockExpirySec * 2 * 1000); + + // check lock file presence + Assert.assertTrue(fs.exists(lock.getLockFile())); + + // create another spout to take over processing and read a few lines + List<String> res2 = runSpout(spout2, "r3"); + Assert.assertEquals(3, res2.size()); + + // check lock 
file presence + Assert.assertTrue(fs.exists(lock.getLockFile())); + + // check lock file contents + List<String> contents = getTextFileContents(fs, lock.getLockFile()); + Assert.assertFalse(contents.isEmpty()); + + // finish up reading the file + res2 = runSpout(spout2, "r3"); + Assert.assertEquals(4, res2.size()); + + // check lock file is gone + Assert.assertFalse(fs.exists(lock.getLockFile())); + FileReader rdr = getField(spout2, "reader"); + Assert.assertNull(rdr); + Assert.assertTrue(getBoolField(spout2, "fileReadCompletely")); + } + } + } - //ack rest - runSpout(spout, "a3", "a4"); - reader = getField(spout, "reader"); - Assert.assertNull(reader); - Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely")); + private void checkCollectorOutput_txt(MockCollector collector, Path... txtFiles) throws IOException { + ArrayList<String> expected = new ArrayList<>(); + for (Path txtFile : txtFiles) { + List<String> lines = getTextFileContents(fs, txtFile); + expected.addAll(lines); + } + List<String> actual = new ArrayList<>(); + for (Pair<HdfsSpout.MessageId, List<Object>> item : collector.items) { + actual.add(item.getValue().get(0).toString()); + } + Assert.assertEquals(expected, actual); + } - // go to next file - Path file2 = new Path(source.toString() + "/file2.txt"); - createTextFile(file2, 5); + private List<String> getTextFileContents(FileSystem fs, Path txtFile) throws IOException { + ArrayList<String> result = new ArrayList<>(); + FSDataInputStream istream = fs.open(txtFile); + InputStreamReader isreader = new InputStreamReader(istream, "UTF-8"); + BufferedReader reader = new BufferedReader(isreader); - // Read 1 line - runSpout(spout, "r1"); - Assert.assertNotNull(getField(spout, "reader")); - Assert.assertEquals(false, getBoolField(spout, "fileReadCompletely")); + for (String line = reader.readLine(); line != null; line = reader.readLine()) { + result.add(line); + } + isreader.close(); + return result; + } - // ack 1 tuple - runSpout(spout, "a5"); - Assert.assertNotNull(getField(spout, "reader")); - Assert.assertEquals(false, getBoolField(spout, "fileReadCompletely")); + private void checkCollectorOutput_seq(MockCollector collector, Path... seqFiles) throws IOException { + ArrayList<String> expected = new ArrayList<>(); + for (Path seqFile : seqFiles) { + List<String> lines = getSeqFileContents(fs, seqFile); + expected.addAll(lines); + } + Assert.assertTrue(expected.equals(collector.lines)); + } + private List<String> getSeqFileContents(FileSystem fs, Path... 
seqFiles) throws IOException { + ArrayList<String> result = new ArrayList<>(); + + for (Path seqFile : seqFiles) { + Path file = new Path(fs.getUri().toString() + seqFile.toString()); + SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file)); + try { + Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); + Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf); + while (reader.next(key, value)) { + String keyValStr = Arrays.asList(key, value).toString(); + result.add(keyValStr); + } + } finally { + reader.close(); + } + }// for + return result; + } - // read and ack remaining lines - runSpout(spout, "r5", "a6", "a7", "a8", "a9"); - Assert.assertNull(getField(spout, "reader")); - Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely")); - } + private List<String> listDir(Path p) throws IOException { + ArrayList<String> result = new ArrayList<>(); + RemoteIterator<LocatedFileStatus> fileNames = fs.listFiles(p, false); + while (fileNames.hasNext()) { + LocatedFileStatus fileStatus = fileNames.next(); + result.add(Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()); + } + return result; + } - private static <T> T getField(HdfsSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException { - Field readerFld = HdfsSpout.class.getDeclaredField(fieldName); - readerFld.setAccessible(true); - return (T) readerFld.get(spout); - } + @Test + public void testMultipleFileConsumption_Ack() throws Exception { + Path file1 = new Path(source.toString() + "/file1.txt"); + createTextFile(file1, 5); + + try (AutoCloseableHdfsSpout closeableSpout = makeSpout(Configs.TEXT, TextFileReader.defaultFields)) { + HdfsSpout spout = closeableSpout.spout; + spout.setCommitFrequencyCount(1); + spout.setCommitFrequencySec(1); + + Map conf = getCommonConfigs(); + conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing + openSpout(spout, 0, conf); + + // read few lines from file1 dont ack + runSpout(spout, "r3"); + FileReader reader = getField(spout, "reader"); + Assert.assertNotNull(reader); + Assert.assertEquals(false, getBoolField(spout, "fileReadCompletely")); + + // read remaining lines + runSpout(spout, "r3"); + reader = getField(spout, "reader"); + Assert.assertNotNull(reader); + Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely")); + + // ack few + runSpout(spout, "a0", "a1", "a2"); + reader = getField(spout, "reader"); + Assert.assertNotNull(reader); + Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely")); + + //ack rest + runSpout(spout, "a3", "a4"); + reader = getField(spout, "reader"); + Assert.assertNull(reader); + Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely")); + + // go to next file + Path file2 = new Path(source.toString() + "/file2.txt"); + createTextFile(file2, 5); + + // Read 1 line + runSpout(spout, "r1"); + Assert.assertNotNull(getField(spout, "reader")); + Assert.assertEquals(false, getBoolField(spout, "fileReadCompletely")); + + // ack 1 tuple + runSpout(spout, "a5"); + Assert.assertNotNull(getField(spout, "reader")); + Assert.assertEquals(false, getBoolField(spout, "fileReadCompletely")); + + // read and ack remaining lines + runSpout(spout, "r5", "a6", "a7", "a8", "a9"); + Assert.assertNull(getField(spout, "reader")); + Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely")); + } + } - private static boolean getBoolField(HdfsSpout spout, String fieldName) throws 
NoSuchFieldException, IllegalAccessException { - Field readerFld = HdfsSpout.class.getDeclaredField(fieldName); - readerFld.setAccessible(true); - return readerFld.getBoolean(spout); - } + private static <T> T getField(HdfsSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException { + Field readerFld = HdfsSpout.class.getDeclaredField(fieldName); + readerFld.setAccessible(true); + return (T) readerFld.get(spout); + } + private static boolean getBoolField(HdfsSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException { + Field readerFld = HdfsSpout.class.getDeclaredField(fieldName); + readerFld.setAccessible(true); + return readerFld.getBoolean(spout); + } - @Test - public void testSimpleSequenceFile() throws IOException { - //1) create a couple files to consume - source = new Path("/tmp/hdfsspout/source"); - fs.mkdirs(source); - archive = new Path("/tmp/hdfsspout/archive"); - fs.mkdirs(archive); + @Test + public void testSimpleSequenceFile() throws Exception { + //1) create a couple files to consume + source = new Path("/tmp/hdfsspout/source"); + fs.mkdirs(source); + archive = new Path("/tmp/hdfsspout/archive"); + fs.mkdirs(archive); - Path file1 = new Path(source + "/file1.seq"); - createSeqFile(fs, file1, 5); + Path file1 = new Path(source + "/file1.seq"); + createSeqFile(fs, file1, 5); - Path file2 = new Path(source + "/file2.seq"); - createSeqFile(fs, file2, 5); + Path file2 = new Path(source + "/file2.seq"); + createSeqFile(fs, file2, 5); + try (AutoCloseableHdfsSpout closeableSpout = makeSpout(Configs.SEQ, SequenceFileReader.defaultFields)) { + HdfsSpout spout = closeableSpout.spout; + Map conf = getCommonConfigs(); + openSpout(spout, 0, conf); - HdfsSpout spout = makeSpout(Configs.SEQ, SequenceFileReader.defaultFields); - Map conf = getCommonConfigs(); - openSpout(spout, 0, conf); + // consume both files + List<String> res = runSpout(spout, "r11"); + Assert.assertEquals(10, res.size()); - // consume both files - List<String> res = runSpout(spout, "r11"); - Assert.assertEquals(10, res.size()); + Assert.assertEquals(2, listDir(archive).size()); - Assert.assertEquals(2, listDir(archive).size()); + Path f1 = new Path(archive + "/file1.seq"); + Path f2 = new Path(archive + "/file2.seq"); + checkCollectorOutput_seq((MockCollector) spout.getCollector(), f1, f2); + } + } - Path f1 = new Path(archive + "/file1.seq"); - Path f2 = new Path(archive + "/file2.seq"); + @Test + public void testReadFailures() throws Exception { + // 1) create couple of input files to read + Path file1 = new Path(source.toString() + "/file1.txt"); + Path file2 = new Path(source.toString() + "/file2.txt"); + + createTextFile(file1, 6); + createTextFile(file2, 7); + Assert.assertEquals(2, listDir(source).size()); + + // 2) run spout + try (AutoCloseableHdfsSpout closeableSpout = makeSpout(MockTextFailingReader.class.getName(), MockTextFailingReader.defaultFields)) { + HdfsSpout spout = closeableSpout.spout; + Map conf = getCommonConfigs(); + openSpout(spout, 0, conf); + + List<String> res = runSpout(spout, "r11"); + String[] expected = new String[]{"[line 0]", "[line 1]", "[line 2]", "[line 0]", "[line 1]", "[line 2]"}; + Assert.assertArrayEquals(expected, res.toArray()); + + // 3) make sure 6 lines (3 from each file) were read in all + Assert.assertEquals(((MockCollector) spout.getCollector()).lines.size(), 6); + ArrayList<Path> badFiles = HdfsUtils.listFilesByModificationTime(fs, badfiles, 0); + Assert.assertEquals(badFiles.size(), 2); + } + } - 
checkCollectorOutput_seq((MockCollector) spout.getCollector(), f1, f2); - } + // check lock creation/deletion and contents + @Test + public void testLocking() throws Exception { + Path file1 = new Path(source.toString() + "/file1.txt"); + createTextFile(file1, 10); + + // 0) config spout to log progress in lock file for each tuple + try (AutoCloseableHdfsSpout closeableSpout = makeSpout(Configs.TEXT, TextFileReader.defaultFields)) { + HdfsSpout spout = closeableSpout.spout; + spout.setCommitFrequencyCount(1); + spout.setCommitFrequencySec(1000); // effectively disable commits based on time + + Map conf = getCommonConfigs(); + openSpout(spout, 0, conf); + + // 1) read initial lines in file, then check if lock exists + List<String> res = runSpout(spout, "r5"); + Assert.assertEquals(5, res.size()); + List<String> lockFiles = listDir(spout.getLockDirPath()); + Assert.assertEquals(1, lockFiles.size()); + + // 2) check log file content line count == tuples emitted + 1 + List<String> lines = readTextFile(fs, lockFiles.get(0)); + Assert.assertEquals(lines.size(), res.size() + 1); + + // 3) read remaining lines in file, then ensure lock is gone + runSpout(spout, "r6"); + lockFiles = listDir(spout.getLockDirPath()); + Assert.assertEquals(0, lockFiles.size()); + + // 4) --- Create another input file and reverify same behavior --- + Path file2 = new Path(source.toString() + "/file2.txt"); + createTextFile(file2, 10); + + // 5) read initial lines in file, then check if lock exists + res = runSpout(spout, "r5"); + Assert.assertEquals(15, res.size()); + lockFiles = listDir(spout.getLockDirPath()); + Assert.assertEquals(1, lockFiles.size()); + + // 6) check log file content line count == tuples emitted + 1 + lines = readTextFile(fs, lockFiles.get(0)); + Assert.assertEquals(6, lines.size()); + + // 7) read remaining lines in file, then ensure lock is gone + runSpout(spout, "r6"); + lockFiles = listDir(spout.getLockDirPath()); + Assert.assertEquals(0, lockFiles.size()); + } + } - @Test - public void testReadFailures() throws Exception { - // 1) create couple of input files to read - Path file1 = new Path(source.toString() + "/file1.txt"); - Path file2 = new Path(source.toString() + "/file2.txt"); - - createTextFile(file1, 6); - createTextFile(file2, 7); - Assert.assertEquals(2, listDir(source).size()); - - // 2) run spout - HdfsSpout spout = makeSpout(MockTextFailingReader.class.getName(), MockTextFailingReader.defaultFields); - Map conf = getCommonConfigs(); - openSpout(spout, 0, conf); + @Test + public void testLockLoggingFreqCount() throws Exception { + Path file1 = new Path(source.toString() + "/file1.txt"); + createTextFile(file1, 10); - List<String> res = runSpout(spout, "r11"); - String[] expected = new String[] {"[line 0]","[line 1]","[line 2]","[line 0]","[line 1]","[line 2]"}; - Assert.assertArrayEquals(expected, res.toArray()); + // 0) config spout to log progress in lock file for each tuple + try (AutoCloseableHdfsSpout closeableSpout = makeSpout(Configs.TEXT, TextFileReader.defaultFields)) { + HdfsSpout spout = closeableSpout.spout; + spout.setCommitFrequencyCount(2); // 1 lock log entry every 2 tuples + spout.setCommitFrequencySec(1000); // Effectively disable commits based on time - // 3) make sure 6 lines (3 from each file) were read in all - Assert.assertEquals(((MockCollector) spout.getCollector()).lines.size(), 6); - ArrayList<Path> badFiles = HdfsUtils.listFilesByModificationTime(fs, badfiles, 0); - Assert.assertEquals(badFiles.size(), 2); - } + Map conf = getCommonConfigs(); + 
openSpout(spout, 0, conf); - // check lock creation/deletion and contents - @Test - public void testLocking() throws Exception { - Path file1 = new Path(source.toString() + "/file1.txt"); - createTextFile(file1, 10); + // 1) read 5 lines in file, + runSpout(spout, "r5"); - // 0) config spout to log progress in lock file for each tuple + // 2) check log file contents + String lockFile = listDir(spout.getLockDirPath()).get(0); + List<String> lines = readTextFile(fs, lockFile); + Assert.assertEquals(lines.size(), 3); - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); - spout.setCommitFrequencyCount(1); - spout.setCommitFrequencySec(1000); // effectively disable commits based on time - - - Map conf = getCommonConfigs(); - openSpout(spout, 0, conf); - - // 1) read initial lines in file, then check if lock exists - List<String> res = runSpout(spout, "r5"); - Assert.assertEquals(5, res.size()); - List<String> lockFiles = listDir(spout.getLockDirPath()); - Assert.assertEquals(1, lockFiles.size()); - - // 2) check log file content line count == tuples emitted + 1 - List<String> lines = readTextFile(fs, lockFiles.get(0)); - Assert.assertEquals(lines.size(), res.size()+1); - - // 3) read remaining lines in file, then ensure lock is gone - runSpout(spout, "r6"); - lockFiles = listDir(spout.getLockDirPath()); - Assert.assertEquals(0, lockFiles.size()); - - - // 4) --- Create another input file and reverify same behavior --- - Path file2 = new Path(source.toString() + "/file2.txt"); - createTextFile(file2, 10); - - // 5) read initial lines in file, then check if lock exists - res = runSpout(spout, "r5"); - Assert.assertEquals(15, res.size()); - lockFiles = listDir(spout.getLockDirPath()); - Assert.assertEquals(1, lockFiles.size()); - - // 6) check log file content line count == tuples emitted + 1 - lines = readTextFile(fs, lockFiles.get(0)); - Assert.assertEquals(6, lines.size()); - - // 7) read remaining lines in file, then ensure lock is gone - runSpout(spout, "r6"); - lockFiles = listDir(spout.getLockDirPath()); - Assert.assertEquals(0, lockFiles.size()); - } - - @Test - public void testLockLoggingFreqCount() throws Exception { - Path file1 = new Path(source.toString() + "/file1.txt"); - createTextFile(file1, 10); - - // 0) config spout to log progress in lock file for each tuple - - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); - spout.setCommitFrequencyCount(2); // 1 lock log entry every 2 tuples - spout.setCommitFrequencySec(1000); // Effectively disable commits based on time - - Map conf = getCommonConfigs(); - openSpout(spout, 0, conf); - - // 1) read 5 lines in file, - runSpout(spout, "r5"); - - // 2) check log file contents - String lockFile = listDir(spout.getLockDirPath()).get(0); - List<String> lines = readTextFile(fs, lockFile); - Assert.assertEquals(lines.size(), 3); - - // 3) read 6th line and see if another log entry was made - runSpout(spout, "r1"); - lines = readTextFile(fs, lockFile); - Assert.assertEquals(lines.size(), 4); - } - - @Test - public void testLockLoggingFreqSec() throws Exception { - Path file1 = new Path(source.toString() + "/file1.txt"); - createTextFile(file1, 10); - - // 0) config spout to log progress in lock file for each tuple - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); - spout.setCommitFrequencyCount(0); // disable it - spout.setCommitFrequencySec(2); // log every 2 sec - - Map conf = getCommonConfigs(); - openSpout(spout, 0, conf); - - // 1) read 5 lines in file - runSpout(spout, 
"r5"); - - // 2) check log file contents - String lockFile = listDir(spout.getLockDirPath()).get(0); - List<String> lines = readTextFile(fs, lockFile); - Assert.assertEquals(lines.size(), 1); - Thread.sleep(3000); // allow freq_sec to expire - - // 3) read another line and see if another log entry was made - runSpout(spout, "r1"); - lines = readTextFile(fs, lockFile); - Assert.assertEquals(2, lines.size()); - } - - private static List<String> readTextFile(FileSystem fs, String f) throws IOException { - Path file = new Path(f); - FSDataInputStream x = fs.open(file); - BufferedReader reader = new BufferedReader(new InputStreamReader(x)); - String line = null; - ArrayList<String> result = new ArrayList<>(); - while( (line = reader.readLine()) !=null ) - result.add( line ); - return result; - } - - - private Map getCommonConfigs() { - Map conf = new HashMap(); - conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "0"); - return conf; - } - - private HdfsSpout makeSpout(String readerType, String[] outputFields) { - HdfsSpout spout = new HdfsSpout().withOutputFields(outputFields) - .setReaderType(readerType) - .setHdfsUri(hdfsCluster.getURI().toString()) - .setSourceDir(source.toString()) - .setArchiveDir(archive.toString()) - .setBadFilesDir(badfiles.toString()); - - return spout; - } - - private void openSpout(HdfsSpout spout, int spoutId, Map conf) { - MockCollector collector = new MockCollector(); - spout.open(conf, new MockTopologyContext(spoutId), collector); - } - - /** - * Execute a sequence of calls on HdfsSpout. - * - * @param cmds: set of commands to run, - * e.g. "r,r,r,r,a1,f2,...". The commands are: - * r[N] - receive() called N times - * aN - ack, item number: N - * fN - fail, item number: N - */ - - private List<String> runSpout(HdfsSpout spout, String... 
cmds) { - MockCollector collector = (MockCollector) spout.getCollector(); - for(String cmd : cmds) { - if(cmd.startsWith("r")) { - int count = 1; - if(cmd.length() > 1) { - count = Integer.parseInt(cmd.substring(1)); - } - for(int i=0; i<count; ++i) { - spout.nextTuple(); - } + // 3) read 6th line and see if another log entry was made + runSpout(spout, "r1"); + lines = readTextFile(fs, lockFile); + Assert.assertEquals(lines.size(), 4); } - else if(cmd.startsWith("a")) { - int n = Integer.parseInt(cmd.substring(1)); - Pair<HdfsSpout.MessageId, List<Object>> item = collector.items.get(n); - spout.ack(item.getKey()); + } + + @Test + public void testLockLoggingFreqSec() throws Exception { + Path file1 = new Path(source.toString() + "/file1.txt"); + createTextFile(file1, 10); + + // 0) config spout to log progress in lock file for each tuple + try (AutoCloseableHdfsSpout closeableSpout = makeSpout(Configs.TEXT, TextFileReader.defaultFields)) { + HdfsSpout spout = closeableSpout.spout; + spout.setCommitFrequencyCount(0); // disable it + spout.setCommitFrequencySec(2); // log every 2 sec + + Map conf = getCommonConfigs(); + openSpout(spout, 0, conf); + + // 1) read 5 lines in file + runSpout(spout, "r5"); + + // 2) check log file contents + String lockFile = listDir(spout.getLockDirPath()).get(0); + List<String> lines = readTextFile(fs, lockFile); + Assert.assertEquals(lines.size(), 1); + Thread.sleep(3000); // allow freq_sec to expire + + // 3) read another line and see if another log entry was made + runSpout(spout, "r1"); + lines = readTextFile(fs, lockFile); + Assert.assertEquals(2, lines.size()); } - else if(cmd.startsWith("f")) { - int n = Integer.parseInt(cmd.substring(1)); - Pair<HdfsSpout.MessageId, List<Object>> item = collector.items.get(n); - spout.fail(item.getKey()); + } + + private static List<String> readTextFile(FileSystem fs, String f) throws IOException { + Path file = new Path(f); + FSDataInputStream x = fs.open(file); + BufferedReader reader = new BufferedReader(new InputStreamReader(x)); + String line = null; + ArrayList<String> result = new ArrayList<>(); + while ((line = reader.readLine()) != null) { + result.add(line); } - } - return collector.lines; + return result; } - private void createTextFile(Path file, int lineCount) throws IOException { - FSDataOutputStream os = fs.create(file); - int size = 0; - for (int i = 0; i < lineCount; i++) { - os.writeBytes("line " + i + System.lineSeparator()); - String msg = "line " + i + System.lineSeparator(); - size += msg.getBytes().length; + private Map getCommonConfigs() { + Map conf = new HashMap(); + conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "0"); + return conf; } - os.close(); - } + private AutoCloseableHdfsSpout makeSpout(String readerType, String[] outputFields) { + HdfsSpout spout = new HdfsSpout().withOutputFields(outputFields) + .setReaderType(readerType) + .setHdfsUri(DFS_CLUSTER_RULE.getDfscluster().getURI().toString()) + .setSourceDir(source.toString()) + .setArchiveDir(archive.toString()) + .setBadFilesDir(badfiles.toString()); + return new AutoCloseableHdfsSpout(spout); + } - private static void createSeqFile(FileSystem fs, Path file, int rowCount) throws IOException { + private static class AutoCloseableHdfsSpout implements AutoCloseable { - Configuration conf = new Configuration(); - try { - if(fs.exists(file)) { - fs.delete(file, false); - } + private final HdfsSpout spout; - SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, file, IntWritable.class, Text.class ); - for (int i = 0; i < rowCount; i++) { - 
w.append(new IntWritable(i), new Text("line " + i)); - } - w.close(); - System.out.println("done"); - } catch (IOException e) { - e.printStackTrace(); + public AutoCloseableHdfsSpout(HdfsSpout spout) { + this.spout = spout; + } + @Override + public void close() throws Exception { + spout.close(); + } } - } - + private void openSpout(HdfsSpout spout, int spoutId, Map conf) { + MockCollector collector = new MockCollector(); + spout.open(conf, new MockTopologyContext(spoutId), collector); + } - static class MockCollector extends SpoutOutputCollector { - //comma separated offsets - public ArrayList<String> lines; - public ArrayList<Pair<HdfsSpout.MessageId, List<Object> > > items; + /** + * Execute a sequence of calls on HdfsSpout. + * + * @param cmds: set of commands to run, e.g. "r,r,r,r,a1,f2,...". The commands are: r[N] - receive() called N times aN - ack, item + * number: N fN - fail, item number: N + */ + private List<String> runSpout(HdfsSpout spout, String... cmds) { + MockCollector collector = (MockCollector) spout.getCollector(); + for (String cmd : cmds) { + if (cmd.startsWith("r")) { + int count = 1; + if (cmd.length() > 1) { + count = Integer.parseInt(cmd.substring(1)); + } + for (int i = 0; i < count; ++i) { + spout.nextTuple(); + } + } else if (cmd.startsWith("a")) { + int n = Integer.parseInt(cmd.substring(1)); + Pair<HdfsSpout.MessageId, List<Object>> item = collector.items.get(n); + spout.ack(item.getKey()); + } else if (cmd.startsWith("f")) { + int n = Integer.parseInt(cmd.substring(1)); + Pair<HdfsSpout.MessageId, List<Object>> item = collector.items.get(n); + spout.fail(item.getKey()); + } + } + return collector.lines; + } - public MockCollector() { - super(null); - lines = new ArrayList<>(); - items = new ArrayList<>(); + private void createTextFile(Path file, int lineCount) throws IOException { + FSDataOutputStream os = fs.create(file); + int size = 0; + for (int i = 0; i < lineCount; i++) { + os.writeBytes("line " + i + System.lineSeparator()); + String msg = "line " + i + System.lineSeparator(); + size += msg.getBytes().length; + } + os.close(); } + private static void createSeqFile(FileSystem fs, Path file, int rowCount) throws IOException { + Configuration conf = new Configuration(); + try { + if (fs.exists(file)) { + fs.delete(file, false); + } - @Override - public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) { - lines.add(tuple.toString()); - items.add(HdfsUtils.Pair.of(messageId, tuple)); - return null; - } + SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, file, IntWritable.class, Text.class); + for (int i = 0; i < rowCount; i++) { + w.append(new IntWritable(i), new Text("line " + i)); + } + w.close(); + System.out.println("done"); + } catch (IOException e) { + e.printStackTrace(); - @Override - public void emitDirect(int arg0, String arg1, List<Object> arg2, Object arg3) { - throw new UnsupportedOperationException("NOT Implemented"); + } } - @Override - public void reportError(Throwable arg0) { - throw new UnsupportedOperationException("NOT Implemented"); - } + static class MockCollector extends SpoutOutputCollector { + //comma separated offsets - @Override - public long getPendingCount() { - return 0; - } - } // class MockCollector + public ArrayList<String> lines; + public ArrayList<Pair<HdfsSpout.MessageId, List<Object>>> items; + + public MockCollector() { + super(null); + lines = new ArrayList<>(); + items = new ArrayList<>(); + } + @Override + public List<Integer> emit(String streamId, List<Object> tuple, Object 
messageId) { + lines.add(tuple.toString()); + items.add(HdfsUtils.Pair.of(messageId, tuple)); + return null; + } + @Override + public void emitDirect(int arg0, String arg1, List<Object> arg2, Object arg3) { + throw new UnsupportedOperationException("NOT Implemented"); + } - // Throws IOExceptions for 3rd & 4th call to next(), succeeds on 5th, thereafter - // throws ParseException. Effectively produces 3 lines (1,2 & 3) from each file read - static class MockTextFailingReader extends TextFileReader { - public static final String[] defaultFields = {"line"}; - int readAttempts = 0; + @Override + public void reportError(Throwable arg0) { + throw new UnsupportedOperationException("NOT Implemented"); + } - public MockTextFailingReader(FileSystem fs, Path file, Map conf) throws IOException { - super(fs, file, conf); - } + @Override + public long getPendingCount() { + return 0; + } + } // class MockCollector - @Override - public List<Object> next() throws IOException, ParseException { - readAttempts++; - if (readAttempts == 3 || readAttempts ==4) { - throw new IOException("mock test exception"); - } else if (readAttempts > 5 ) { - throw new ParseException("mock test exception", null); - } - return super.next(); - } - } + // Throws IOExceptions for 3rd & 4th call to next(), succeeds on 5th, thereafter + // throws ParseException. Effectively produces 3 lines (1,2 & 3) from each file read + static class MockTextFailingReader extends TextFileReader { - static class MockTopologyContext extends TopologyContext { - private final int componentId; + public static final String[] defaultFields = {"line"}; + int readAttempts = 0; - public MockTopologyContext(int componentId) { - // StormTopology topology, Map stormConf, Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks, Map<String, Map<String, Fields>> componentToStreamToFields, String stormId, String codeDir, String pidDir, Integer taskId, Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources, Map<String, Object> userResources, Map<String, Object> executorData, Map<Integer, Map<Integer, Map<String, IMetric>>> registeredMetrics, Atom openOrPrepareWasCalled - super(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); - this.componentId = componentId; - } + public MockTextFailingReader(FileSystem fs, Path file, Map conf) throws IOException { + super(fs, file, conf); + } - public String getThisComponentId() { - return Integer.toString( componentId ); + @Override + public List<Object> next() throws IOException, ParseException { + readAttempts++; + if (readAttempts == 3 || readAttempts == 4) { + throw new IOException("mock test exception"); + } else if (readAttempts > 5) { + throw new ParseException("mock test exception", null); + } + return super.next(); + } } - } + static class MockTopologyContext extends TopologyContext { + + private final int componentId; + + public MockTopologyContext(int componentId) { + // StormTopology topology, Map stormConf, Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks, Map<String, Map<String, Fields>> componentToStreamToFields, String stormId, String codeDir, String pidDir, Integer taskId, Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources, Map<String, Object> userResources, Map<String, Object> executorData, Map<Integer, Map<Integer, Map<String, IMetric>>> registeredMetrics, Atom openOrPrepareWasCalled + super(null, null, null, null, null, null, null, 
null, null, null, null, null, null, null, null, null); + this.componentId = componentId; + } + + public String getThisComponentId() { + return Integer.toString(componentId); + } + + } } http://git-wip-us.apache.org/repos/asf/storm/blob/a0308efd/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java index 0bb44af..48aa4b6 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -9,13 +10,10 @@ * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ - package org.apache.storm.hdfs.spout; import org.apache.hadoop.conf.Configuration; @@ -33,92 +31,89 @@ import java.io.IOException; public class TestProgressTracker { - private FileSystem fs; - private Configuration conf = new Configuration(); - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - public File baseFolder; - - @Before - public void setUp() throws Exception { - fs = FileSystem.getLocal(conf); - } - - @Test - public void testBasic() throws Exception { - ProgressTracker tracker = new ProgressTracker(); - baseFolder = tempFolder.newFolder("trackertest"); - - Path file = new Path( baseFolder.toString() + Path.SEPARATOR + "testHeadTrimming.txt" ); - createTextFile(file, 10); - - // create reader and do some checks - TextFileReader reader = new TextFileReader(fs, file, null); - FileOffset pos0 = tracker.getCommitPosition(); - Assert.assertNull(pos0); - - TextFileReader.Offset currOffset = reader.getFileOffset(); - Assert.assertNotNull(currOffset); - Assert.assertEquals(0, currOffset.charOffset); - - // read 1st line and ack - Assert.assertNotNull(reader.next()); - TextFileReader.Offset pos1 = reader.getFileOffset(); - tracker.recordAckedOffset(pos1); - - TextFileReader.Offset pos1b = (TextFileReader.Offset) tracker.getCommitPosition(); - Assert.assertEquals(pos1, pos1b); - - // read 2nd line and ACK - Assert.assertNotNull(reader.next()); - TextFileReader.Offset pos2 = reader.getFileOffset(); - tracker.recordAckedOffset(pos2); - - tracker.dumpState(System.err); - TextFileReader.Offset pos2b = (TextFileReader.Offset) tracker.getCommitPosition(); - Assert.assertEquals(pos2, pos2b); - - - // read lines 3..7, don't ACK .. 
commit pos should remain same - Assert.assertNotNull(reader.next());//3 - TextFileReader.Offset pos3 = reader.getFileOffset(); - Assert.assertNotNull(reader.next());//4 - TextFileReader.Offset pos4 = reader.getFileOffset(); - Assert.assertNotNull(reader.next());//5 - TextFileReader.Offset pos5 = reader.getFileOffset(); - Assert.assertNotNull(reader.next());//6 - TextFileReader.Offset pos6 = reader.getFileOffset(); - Assert.assertNotNull(reader.next());//7 - TextFileReader.Offset pos7 = reader.getFileOffset(); - - // now ack msg 5 and check - tracker.recordAckedOffset(pos5); - Assert.assertEquals(pos2, tracker.getCommitPosition()); // should remain unchanged @ 2 - tracker.recordAckedOffset(pos4); - Assert.assertEquals(pos2, tracker.getCommitPosition()); // should remain unchanged @ 2 - tracker.recordAckedOffset(pos3); - Assert.assertEquals(pos5, tracker.getCommitPosition()); // should be at 5 - - tracker.recordAckedOffset(pos6); - Assert.assertEquals(pos6, tracker.getCommitPosition()); // should be at 6 - tracker.recordAckedOffset(pos6); // double ack on same msg - Assert.assertEquals(pos6, tracker.getCommitPosition()); // should still be at 6 - - tracker.recordAckedOffset(pos7); - Assert.assertEquals(pos7, tracker.getCommitPosition()); // should be at 7 - - tracker.dumpState(System.err); - } - - - - private void createTextFile(Path file, int lineCount) throws IOException { - FSDataOutputStream os = fs.create(file); - for (int i = 0; i < lineCount; i++) { - os.writeBytes("line " + i + System.lineSeparator()); + private FileSystem fs; + private Configuration conf = new Configuration(); + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + public File baseFolder; + + @Before + public void setUp() throws Exception { + fs = FileSystem.getLocal(conf); + } + + @Test + public void testBasic() throws Exception { + ProgressTracker tracker = new ProgressTracker(); + baseFolder = tempFolder.newFolder("trackertest"); + + Path file = new Path(baseFolder.toString() + Path.SEPARATOR + "testHeadTrimming.txt"); + createTextFile(file, 10); + + // create reader and do some checks + TextFileReader reader = new TextFileReader(fs, file, null); + FileOffset pos0 = tracker.getCommitPosition(); + Assert.assertNull(pos0); + + TextFileReader.Offset currOffset = reader.getFileOffset(); + Assert.assertNotNull(currOffset); + Assert.assertEquals(0, currOffset.charOffset); + + // read 1st line and ack + Assert.assertNotNull(reader.next()); + TextFileReader.Offset pos1 = reader.getFileOffset(); + tracker.recordAckedOffset(pos1); + + TextFileReader.Offset pos1b = (TextFileReader.Offset) tracker.getCommitPosition(); + Assert.assertEquals(pos1, pos1b); + + // read 2nd line and ACK + Assert.assertNotNull(reader.next()); + TextFileReader.Offset pos2 = reader.getFileOffset(); + tracker.recordAckedOffset(pos2); + + tracker.dumpState(System.err); + TextFileReader.Offset pos2b = (TextFileReader.Offset) tracker.getCommitPosition(); + Assert.assertEquals(pos2, pos2b); + + // read lines 3..7, don't ACK .. 
commit pos should remain same + Assert.assertNotNull(reader.next());//3 + TextFileReader.Offset pos3 = reader.getFileOffset(); + Assert.assertNotNull(reader.next());//4 + TextFileReader.Offset pos4 = reader.getFileOffset(); + Assert.assertNotNull(reader.next());//5 + TextFileReader.Offset pos5 = reader.getFileOffset(); + Assert.assertNotNull(reader.next());//6 + TextFileReader.Offset pos6 = reader.getFileOffset(); + Assert.assertNotNull(reader.next());//7 + TextFileReader.Offset pos7 = reader.getFileOffset(); + + // now ack msg 5 and check + tracker.recordAckedOffset(pos5); + Assert.assertEquals(pos2, tracker.getCommitPosition()); // should remain unchanged @ 2 + tracker.recordAckedOffset(pos4); + Assert.assertEquals(pos2, tracker.getCommitPosition()); // should remain unchanged @ 2 + tracker.recordAckedOffset(pos3); + Assert.assertEquals(pos5, tracker.getCommitPosition()); // should be at 5 + + tracker.recordAckedOffset(pos6); + Assert.assertEquals(pos6, tracker.getCommitPosition()); // should be at 6 + tracker.recordAckedOffset(pos6); // double ack on same msg + Assert.assertEquals(pos6, tracker.getCommitPosition()); // should still be at 6 + + tracker.recordAckedOffset(pos7); + Assert.assertEquals(pos7, tracker.getCommitPosition()); // should be at 7 + + tracker.dumpState(System.err); + } + + private void createTextFile(Path file, int lineCount) throws IOException { + try (FSDataOutputStream os = fs.create(file)) { + for (int i = 0; i < lineCount; i++) { + os.writeBytes("line " + i + System.lineSeparator()); + } + } } - os.close(); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/a0308efd/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java new file mode 100644 index 0000000..d80796a --- /dev/null +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java @@ -0,0 +1,78 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.storm.hdfs.testing;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+public class MiniDFSClusterRule implements TestRule {
+
+    private static final String TEST_BUILD_DATA = "test.build.data";
+
+    private final Java7Supplier<Configuration> hadoopConfSupplier;
+    private Configuration hadoopConf;
+    private MiniDFSCluster dfscluster;
+
+    public static interface Java7Supplier<T> {
+        T get();
+    }
+
+    public MiniDFSClusterRule() {
+        this(new Java7Supplier<Configuration>() {
+            @Override
+            public Configuration get() {
+                return new Configuration();
+            }
+        });
+    }
+
+    public MiniDFSClusterRule(Java7Supplier<Configuration> hadoopConfSupplier) {
+        this.hadoopConfSupplier = hadoopConfSupplier;
+    }
+
+    public Configuration getHadoopConf() {
+        return hadoopConf;
+    }
+
+    public MiniDFSCluster getDfscluster() {
+        return dfscluster;
+    }
+
+    @Override
+    public Statement apply(final Statement base, Description description) {
+        return new Statement() {
+            @Override
+            public void evaluate() throws Throwable {
+                try {
+                    System.setProperty(TEST_BUILD_DATA, "target/test/data");
+                    hadoopConf = hadoopConfSupplier.get();
+                    dfscluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(3).build();
+                    dfscluster.waitActive();
+                    base.evaluate(); // run the wrapped test statement
+                } finally {
+                    if (dfscluster != null) {
+                        dfscluster.shutdown();
+                    }
+                    System.clearProperty(TEST_BUILD_DATA);
+                }
+            }
+        };
+    }
+
+}
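The TestHdfsSpout changes above show the consumption pattern for this new rule, but they are dense; pulled out into a minimal self-contained sketch it looks like the following (the test class and test method are hypothetical; the @ClassRule wiring, getDfscluster(), and getFileSystem() calls mirror the patch):

package org.apache.storm.hdfs.testing;

import java.io.IOException;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

public class ExampleDfsTest {

    // One shared 3-node MiniDFSCluster per test class: started before the
    // first test method runs, shut down after the last one finishes.
    @ClassRule
    public static final MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule();

    private static DistributedFileSystem fs;

    @BeforeClass
    public static void setupClass() throws IOException {
        fs = DFS_CLUSTER_RULE.getDfscluster().getFileSystem();
    }

    @Test
    public void canUseSharedCluster() throws Exception {
        // exercise fs against the shared mini-cluster here
    }
}

Tests that need non-default HDFS settings can pass a Java7Supplier<Configuration> to the rule's second constructor instead of relying on the default Configuration.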

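For reference, the command strings fed to runSpout in the tests above follow a small grammar; this standalone demo (a hypothetical class, not part of the patch) mirrors the dispatch loop in runSpout:

// "rN" - call spout.nextTuple() N times (a bare "r" means once)
// "aN" - ack the N-th tuple emitted so far (0-based index into collector.items)
// "fN" - fail the N-th emitted tuple
public class RunSpoutGrammarDemo {
    public static void main(String[] args) {
        // the sequence used by testSimpleText_ACK above
        String[] cmds = {"r6", "a0", "a1", "a2", "a3", "a4"};
        for (String cmd : cmds) {
            if (cmd.startsWith("r")) {
                int count = cmd.length() > 1 ? Integer.parseInt(cmd.substring(1)) : 1;
                System.out.println("nextTuple() called " + count + " times");
            } else if (cmd.startsWith("a")) {
                System.out.println("ack emitted tuple #" + Integer.parseInt(cmd.substring(1)));
            } else if (cmd.startsWith("f")) {
                System.out.println("fail emitted tuple #" + Integer.parseInt(cmd.substring(1)));
            }
        }
    }
}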