Repository: incubator-apex-malhar Updated Branches: refs/heads/master df4cc4d2a -> 78c5fad19
APEXMALHAR-2103 Fixed the scanner issue in FileSplitterInput Class Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/78c5fad1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/78c5fad1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/78c5fad1 Branch: refs/heads/master Commit: 78c5fad197f46016d6452abe5080561e450ad0d5 Parents: df4cc4d Author: Chaitanya <[email protected]> Authored: Thu Jun 2 10:00:14 2016 +0530 Committer: Chaitanya <[email protected]> Committed: Thu Jun 2 12:28:02 2016 +0530 ---------------------------------------------------------------------- .../lib/io/fs/FileSplitterInput.java | 17 ++++++----- .../lib/io/fs/FileSplitterInputTest.java | 32 ++++++++++++++++++++ 2 files changed, 42 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/78c5fad1/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java index a58bee7..077a4ac 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java @@ -18,7 +18,6 @@ */ package com.datatorrent.lib.io.fs; -import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; @@ -194,11 +193,12 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper protected void updateReferenceTimes(ScannedFileInfo fileInfo) { Map<String, Long> referenceTimePerInputDir; - if ((referenceTimePerInputDir = referenceTimes.get(fileInfo.getDirectoryPath())) == null) { + String referenceKey = fileInfo.getDirectoryPath() == null ? fileInfo.getFilePath() : fileInfo.getDirectoryPath(); + if ((referenceTimePerInputDir = referenceTimes.get(referenceKey)) == null) { referenceTimePerInputDir = Maps.newHashMap(); } referenceTimePerInputDir.put(fileInfo.getFilePath(), fileInfo.modifiedTime); - referenceTimes.put(fileInfo.getDirectoryPath(), referenceTimePerInputDir); + referenceTimes.put(referenceKey, referenceTimePerInputDir); } @Override @@ -375,11 +375,14 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper lastScannedInfo = null; numDiscoveredPerIteration = 0; for (String afile : files) { - String filePath = new File(afile).getAbsolutePath(); + Path filePath = new Path(afile); LOG.debug("Scan started for input {}", filePath); - Map<String, Long> lastModifiedTimesForInputDir; - lastModifiedTimesForInputDir = referenceTimes.get(filePath); - scan(new Path(afile), null, lastModifiedTimesForInputDir); + Map<String, Long> lastModifiedTimesForInputDir = null; + if (fs.exists(filePath)) { + FileStatus fileStatus = fs.getFileStatus(filePath); + lastModifiedTimesForInputDir = referenceTimes.get(fileStatus.getPath().toUri().getPath()); + } + scan(filePath, null, lastModifiedTimesForInputDir); } scanIterationComplete(); } else { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/78c5fad1/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java index cea5109..febda3f 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java @@ -156,6 +156,38 @@ public class FileSplitterInputTest } @Test + public void testScannerFilterForDuplicates() throws InterruptedException + { + String filePath = testMeta.dataDirectory + Path.SEPARATOR + "file0.txt"; + testMeta.scanner = new MockScanner(); + testMeta.fileSplitterInput.setScanner(testMeta.scanner); + testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500); + testMeta.fileSplitterInput.getScanner().setFilePatternRegularExp(".*[.]txt"); + testMeta.fileSplitterInput.getScanner().setFiles(filePath); + testMeta.fileSplitterInput.setup(testMeta.context); + testMeta.fileSplitterInput.beginWindow(1); + testMeta.scanner.semaphore.acquire(); + + testMeta.fileSplitterInput.emitTuples(); + testMeta.fileSplitterInput.endWindow(); + + testMeta.fileSplitterInput.beginWindow(2); + testMeta.scanner.semaphore.release(); + testMeta.scanner.semaphore.acquire(); + testMeta.fileSplitterInput.emitTuples(); + testMeta.fileSplitterInput.endWindow(); + + Assert.assertEquals("File metadata", 1, testMeta.fileMetadataSink.collectedTuples.size()); + for (Object fileMetadata : testMeta.fileMetadataSink.collectedTuples) { + FileSplitterInput.FileMetadata metadata = (FileSplitterInput.FileMetadata)fileMetadata; + Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath())); + Assert.assertNotNull("name: ", metadata.getFileName()); + } + + testMeta.fileMetadataSink.collectedTuples.clear(); + } + + @Test public void testBlockMetadataNoSplit() throws InterruptedException { testMeta.fileSplitterInput.beginWindow(1);
