Repository: apex-malhar Updated Branches: refs/heads/master c128091c7 -> 3a3629841
APEXMALHAR-2447 Added a notification in AbstractFileInputOperator when a directory is scanned for the first time. Implementations can use this information in different ways, such as notifying downstream operators that a new directory has been scanned. Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/3a362984 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/3a362984 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/3a362984 Branch: refs/heads/master Commit: 3a362984143dde0a89fb9538a704ed1f444f0204 Parents: c128091 Author: Lakshmi Prasanna Velineni <[email protected]> Authored: Thu Mar 16 08:27:39 2017 -0700 Committer: Lakshmi Prasanna Velineni <[email protected]> Committed: Sun May 14 21:41:06 2017 -0700 ---------------------------------------------------------------------- .../lib/io/fs/AbstractFileInputOperator.java | 40 +++++++++-- .../io/fs/AbstractFileInputOperatorTest.java | 75 ++++++++++++++++++-- 2 files changed, 105 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3a362984/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java index bc5ebf1..5c34546 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.Serializable; + import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -710,6 +711,23 @@ public abstract class 
AbstractFileInputOperator<T> implements InputOperator, Par } /** + * Notifies that the directory is being scanned.<br> + * Override this method to custom handling. Will be called once + */ + protected void visitDirectory(Path filePath) + { + } + + private void checkVisitedDirectory(Path path) + { + String pathString = path.toString(); + if (!processedFiles.contains(pathString)) { + visitDirectory(path); + processedFiles.add(pathString); + } + } + + /** * Scans the directory for new files. */ protected void scanDirectory() @@ -718,10 +736,19 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par Set<Path> newPaths = scanner.scan(fs, filePath, processedFiles); for (Path newPath : newPaths) { - String newPathString = newPath.toString(); - pendingFiles.add(newPathString); - processedFiles.add(newPathString); - localProcessedFileCount.increment(); + try { + FileStatus fileStatus = fs.getFileStatus(newPath); + if (fileStatus.isDirectory()) { + checkVisitedDirectory(newPath); + } else { + String newPathString = newPath.toString(); + pendingFiles.add(newPathString); + processedFiles.add(newPathString); + localProcessedFileCount.increment(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } } lastScanMillis = System.currentTimeMillis(); @@ -1059,6 +1086,11 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par LinkedHashSet<Path> pathSet = Sets.newLinkedHashSet(); try { LOG.debug("Scanning {} with pattern {}", filePath, this.filePatternRegexp); + if (!consumedFiles.contains(filePath.toString())) { + if (fs.isDirectory(filePath)) { + pathSet.add(filePath); + } + } FileStatus[] files = fs.listStatus(filePath); for (FileStatus status : files) { Path path = status.getPath(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3a362984/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java ---------------------------------------------------------------------- diff --git 
a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java index b9cdd67..8acd16a 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java @@ -20,7 +20,6 @@ package com.datatorrent.lib.io.fs; import java.io.ByteArrayOutputStream; import java.io.File; - import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -59,7 +58,6 @@ import com.datatorrent.api.Context; import com.datatorrent.api.DefaultPartition; import com.datatorrent.api.Partitioner.Partition; import com.datatorrent.api.StatsListener; - import com.datatorrent.lib.io.fs.AbstractFileInputOperator.DirectoryScanner; import com.datatorrent.lib.partitioner.StatelessPartitionerTest.PartitioningContextImpl; import com.datatorrent.lib.testbench.CollectorTestSink; @@ -152,6 +150,71 @@ public class AbstractFileInputOperatorTest } + public static class LineOperator extends LineByLineFileInputOperator + { + Set<String> dirPaths = Sets.newHashSet(); + + @Override + protected void visitDirectory(Path filePath) + { + dirPaths.add(Path.getPathWithoutSchemeAndAuthority(filePath).toString()); + } + } + + @Test + public void testEmptyDirectory() throws Exception + { + FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true); + Set<String> dPaths = Sets.newHashSet(); + dPaths.add(new File(testMeta.dir).getCanonicalPath()); + + String subdir01 = "/a"; + dPaths.add(new File(testMeta.dir + subdir01).getCanonicalPath()); + FileUtils.forceMkdir((new File(testMeta.dir + subdir01))); + + String subdir02 = "/b"; + dPaths.add(new File(testMeta.dir + subdir02).getCanonicalPath()); + FileUtils.forceMkdir(new File(testMeta.dir + subdir02)); + + String subdir03 = subdir02 + "/c"; + dPaths.add(new File(testMeta.dir + 
subdir03).getCanonicalPath()); + FileUtils.forceMkdir(new File(testMeta.dir + subdir03)); + + String subdir04 = "/d"; + List<String> allLines = Lists.newArrayList(); + HashSet<String> lines = Sets.newHashSet(); + for (int line = 0; line < 5; line++) { + lines.add("f0" + "l" + line); + } + allLines.addAll(lines); + File testFile = new File(testMeta.dir + subdir04, "file0"); + dPaths.add(new File(testMeta.dir + subdir04).getCanonicalPath()); + FileUtils.write(testFile, StringUtils.join(lines, '\n')); + + LineOperator oper = new LineOperator(); + oper.setDirectory(new File(testMeta.dir).getAbsolutePath()); + oper.setScanIntervalMillis(0); + + CollectorTestSink<String> queryResults = new CollectorTestSink<String>(); + @SuppressWarnings({"unchecked", "rawtypes"}) + CollectorTestSink<Object> sink = (CollectorTestSink)queryResults; + oper.output.setSink(sink); + + int wid = 0; + + // Read all records to populate processedList in operator. + oper.setup(testMeta.context); + for (int i = 0; i < 3; i++) { + oper.beginWindow(wid); + oper.emitTuples(); + oper.endWindow(); + wid++; + } + + Assert.assertEquals("Size", 5, oper.dirPaths.size()); + Assert.assertTrue("Checking Sets", dPaths.equals(oper.dirPaths)); + } + @Test public void testScannerPartitioning() throws Exception { @@ -169,10 +232,10 @@ public class AbstractFileInputOperatorTest Set<Path> allFiles = Sets.newHashSet(); for (DirectoryScanner partition : partitions) { Set<Path> files = partition.scan(fs, path, Sets.<String>newHashSet()); - Assert.assertEquals("", 2, files.size()); + Assert.assertEquals("", 3, files.size()); allFiles.addAll(files); } - Assert.assertEquals("Found all files " + allFiles, 4, allFiles.size()); + Assert.assertEquals("Found all files " + allFiles, 5, allFiles.size()); } @@ -201,7 +264,7 @@ public class AbstractFileInputOperatorTest Assert.assertNotSame(oper.getScanner(), p.getPartitionedInstance().getScanner()); Set<String> consumed = Sets.newHashSet(); LinkedHashSet<Path> files = 
p.getPartitionedInstance().getScanner().scan(FileSystem.getLocal(new Configuration(false)), path, consumed); - Assert.assertEquals("partition " + files, 2, files.size()); + Assert.assertEquals("partition " + files, 3, files.size()); } } @@ -1109,7 +1172,7 @@ public class AbstractFileInputOperatorTest Assert.assertNotSame(oper.getScanner(), p.getPartitionedInstance().getScanner()); Set<String> consumed = Sets.newHashSet(); LinkedHashSet<Path> files = p.getPartitionedInstance().getScanner().scan(FileSystem.getLocal(new Configuration(false)), path, consumed); - Assert.assertEquals("partition " + files, 5, files.size()); + Assert.assertEquals("partition " + files, 6, files.size()); } }
