Repository: apex-malhar Updated Branches: refs/heads/master d25dcf079 -> 70154f641
APEXMALHAR-2374 Calling scan recursively if the current path is a directory; test case for multi-level directories with recursive flag Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/70154f64 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/70154f64 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/70154f64 Branch: refs/heads/master Commit: 70154f641be38dd07512e7c45bb0e67254af6fb5 Parents: d25dcf0 Author: francisf <[email protected]> Authored: Wed Dec 21 17:08:24 2016 +0530 Committer: francisf <[email protected]> Committed: Tue Jan 3 14:09:13 2017 +0530 ---------------------------------------------------------------------- .../lib/io/fs/AbstractFileInputOperator.java | 28 +++++++++++++++++++- .../io/fs/AbstractFileInputOperatorTest.java | 23 ++++++++++++++-- 2 files changed, 48 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/70154f64/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java index 0f3cc48..be156d1 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java @@ -1006,6 +1006,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par private static final long serialVersionUID = 4535844463258899929L; private String filePatternRegexp; private transient Pattern regex = null; + private boolean recursive = true; private int partitionIndex; private int partitionCount; protected final transient HashSet<String> ignoredFiles = new 
HashSet<String>(); @@ -1057,7 +1058,12 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par continue; } - if (acceptFile(filePathStr)) { + if (status.isDirectory() ) { + if (isRecursive()) { + LinkedHashSet<Path> childPathSet = scan(fs, path, consumedFiles); + pathSet.addAll(childPathSet); + } + } else if (acceptFile(filePathStr)) { LOG.debug("Found {}", filePathStr); pathSet.add(path); } else { @@ -1143,6 +1149,26 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par { this.partitionCount = partitionCount; } + + /** + * True if recursive; false otherwise. + * + * @return true if recursive; false otherwise. + */ + public boolean isRecursive() + { + return recursive; + } + + /** + * Sets whether scan will be recursive. + * + * @param recursive true if recursive; false otherwise. + */ + public void setRecursive(boolean recursive) + { + this.recursive = recursive; + } } protected static class RecoveryEntry http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/70154f64/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java index e9346ec..1a97f5e 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java @@ -96,17 +96,30 @@ public class AbstractFileInputOperatorTest public TestMeta testMeta = new TestMeta(); @Test + public void testSinglePartitonRecursive() throws Exception + { + checkSubDir(true); + } + + @Test public void testSinglePartiton() throws Exception { + checkSubDir(false); + } + + private void checkSubDir(boolean recursive) throws Exception + { FileContext.getLocalFSFileContext().delete(new 
Path(new File(testMeta.dir).getAbsolutePath()), true); HashSet<String> allLines = Sets.newHashSet(); + String subdir = ""; for (int file = 0; file < 2; file++) { + subdir += String.format("/depth_%d", file); HashSet<String> lines = Sets.newHashSet(); for (int line = 0; line < 2; line++) { lines.add("f" + file + "l" + line); } allLines.addAll(lines); - FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n')); + FileUtils.write(new File(testMeta.dir + subdir, "file" + file), StringUtils.join(lines, '\n')); } LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); @@ -118,6 +131,7 @@ public class AbstractFileInputOperatorTest oper.setDirectory(testMeta.dir); oper.getScanner().setFilePatternRegexp(".*file[\\d]"); + oper.getScanner().setRecursive(recursive); oper.setup(testMeta.context); for (long wid = 0; wid < 3; wid++) { @@ -127,7 +141,12 @@ public class AbstractFileInputOperatorTest } oper.teardown(); - Assert.assertEquals("number tuples", 4, queryResults.collectedTuples.size()); + int expectedNumTuples = 4; + if (!recursive) { + allLines = new HashSet<String>(); + expectedNumTuples = 0; + } + Assert.assertEquals("number tuples", expectedNumTuples, queryResults.collectedTuples.size()); Assert.assertEquals("lines", allLines, new HashSet<String>(queryResults.collectedTuples)); }
