discoveredFiles can become empty between a scan and the completion of that scan iteration
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/f99696af Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/f99696af Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/f99696af Branch: refs/heads/devel-3 Commit: f99696aff2e46cd50d3060480d4d3018ac65e3e1 Parents: ea202da Author: Chandni Singh <[email protected]> Authored: Tue Sep 22 23:01:36 2015 -0700 Committer: Chandni Singh <[email protected]> Committed: Wed Sep 23 17:48:15 2015 -0700 ---------------------------------------------------------------------- .../lib/io/fs/FileSplitterInput.java | 43 ++++++++++++-------- .../lib/io/fs/FileSplitterInputTest.java | 8 ++-- 2 files changed, 30 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f99696af/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java index a381be5..2560191 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java @@ -176,9 +176,12 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper protected boolean processFileInfo(FileInfo fileInfo) { ScannedFileInfo scannedFileInfo = (ScannedFileInfo)fileInfo; + if (scannedFileInfo == TimeBasedDirectoryScanner.DELIMITER) { + return false; + } currentWindowRecoveryState.add(scannedFileInfo); updateReferenceTimes(scannedFileInfo); - return super.processFileInfo(fileInfo) && !scannedFileInfo.lastFileOfScan; + return super.processFileInfo(fileInfo); } protected void 
updateReferenceTimes(ScannedFileInfo fileInfo) @@ -255,6 +258,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper public static class TimeBasedDirectoryScanner implements Runnable, Component<Context.OperatorContext> { private static long DEF_SCAN_INTERVAL_MILLIS = 5000; + private static ScannedFileInfo DELIMITER = new ScannedFileInfo(); private boolean recursive; @@ -281,6 +285,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper protected transient Map<String, Long> referenceTimes; private transient ScannedFileInfo lastScannedInfo; + private transient int numDiscoveredPerIteration; public TimeBasedDirectoryScanner() { @@ -339,10 +344,12 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper if ((trigger || (System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis)) && (lastScannedInfo == null || referenceTimes.get(lastScannedInfo.getFilePath()) != null)) { trigger = false; + lastScannedInfo = null; + numDiscoveredPerIteration = 0; for (String afile : files) { scan(new Path(afile), null); } - scanComplete(); + scanIterationComplete(); } else { Thread.sleep(sleepMillis); } @@ -358,13 +365,11 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper /** * Operations that need to be done once a scan is complete. 
*/ - protected void scanComplete() + protected void scanIterationComplete() { - LOG.debug("scan complete {}", lastScanMillis); - ScannedFileInfo fileInfo = discoveredFiles.peekLast(); - if (fileInfo != null) { - fileInfo.lastFileOfScan = true; - lastScannedInfo = fileInfo; + LOG.debug("scan complete {} {}", lastScanMillis, numDiscoveredPerIteration); + if (numDiscoveredPerIteration > 0) { + discoveredFiles.add(DELIMITER); } lastScanMillis = System.currentTimeMillis(); } @@ -398,7 +403,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper } if (acceptFile(childPathStr)) { LOG.debug("found {}", childPathStr); - discoveredFiles.add(info); + processDiscoveredFile(info); } else { // don't look at it again ignoredFiles.add(childPathStr); @@ -411,6 +416,13 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper } } + protected void processDiscoveredFile(ScannedFileInfo info) + { + numDiscoveredPerIteration++; + lastScannedInfo = info; + discoveredFiles.add(info); + } + protected ScannedFileInfo createScannedFileInfo(Path parentPath, FileStatus parentStatus, Path childPath, @SuppressWarnings("UnusedParameters") FileStatus childStatus, Path rootPath) { ScannedFileInfo info; @@ -462,6 +474,11 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper return discoveredFiles.poll(); } + protected int getNumDiscoveredPerIteration() + { + return numDiscoveredPerIteration; + } + /** * Gets the regular expression for file names to split. 
* @@ -570,9 +587,8 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper public static class ScannedFileInfo extends AbstractFileSplitter.FileInfo { protected final long modifiedTime; - private transient boolean lastFileOfScan; - private ScannedFileInfo() + protected ScannedFileInfo() { super(); modifiedTime = -1; @@ -583,11 +599,6 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper super(directoryPath, relativeFilePath); this.modifiedTime = modifiedTime; } - - protected boolean isLastFileOfScan() - { - return lastFileOfScan; - } } private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInput.class); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f99696af/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java index 8dfea7a..1e2a25c 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java @@ -471,14 +471,12 @@ public class FileSplitterInputTest } @Override - protected void scanComplete() + protected void scanIterationComplete() { - super.scanComplete(); - if (discoveredFiles.size() > 0 && discoveredFiles.getLast().isLastFileOfScan()) { + if (getNumDiscoveredPerIteration() > 0) { semaphore.release(); - LOG.debug("discovered {}", discoveredFiles.size()); } - + super.scanIterationComplete(); } }
