Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master df4cc4d2a -> 78c5fad19


APEXMALHAR-2103 Fixed FileSplitterInput scanner re-emitting already-scanned files when a configured input path is a single file


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/78c5fad1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/78c5fad1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/78c5fad1

Branch: refs/heads/master
Commit: 78c5fad197f46016d6452abe5080561e450ad0d5
Parents: df4cc4d
Author: Chaitanya <[email protected]>
Authored: Thu Jun 2 10:00:14 2016 +0530
Committer: Chaitanya <[email protected]>
Committed: Thu Jun 2 12:28:02 2016 +0530

----------------------------------------------------------------------
 .../lib/io/fs/FileSplitterInput.java            | 17 ++++++-----
 .../lib/io/fs/FileSplitterInputTest.java        | 32 ++++++++++++++++++++
 2 files changed, 42 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/78c5fad1/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java 
b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index a58bee7..077a4ac 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -18,7 +18,6 @@
  */
 package com.datatorrent.lib.io.fs;
 
-import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
@@ -194,11 +193,12 @@ public class FileSplitterInput extends 
AbstractFileSplitter implements InputOper
   protected void updateReferenceTimes(ScannedFileInfo fileInfo)
   {
     Map<String, Long> referenceTimePerInputDir;
-    if ((referenceTimePerInputDir = 
referenceTimes.get(fileInfo.getDirectoryPath())) == null) {
+    String referenceKey = fileInfo.getDirectoryPath() == null ? 
fileInfo.getFilePath() : fileInfo.getDirectoryPath();
+    if ((referenceTimePerInputDir = referenceTimes.get(referenceKey)) == null) 
{
       referenceTimePerInputDir = Maps.newHashMap();
     }
     referenceTimePerInputDir.put(fileInfo.getFilePath(), 
fileInfo.modifiedTime);
-    referenceTimes.put(fileInfo.getDirectoryPath(), referenceTimePerInputDir);
+    referenceTimes.put(referenceKey, referenceTimePerInputDir);
   }
 
   @Override
@@ -375,11 +375,14 @@ public class FileSplitterInput extends 
AbstractFileSplitter implements InputOper
             lastScannedInfo = null;
             numDiscoveredPerIteration = 0;
             for (String afile : files) {
-              String filePath = new File(afile).getAbsolutePath();
+              Path filePath = new Path(afile);
               LOG.debug("Scan started for input {}", filePath);
-              Map<String, Long> lastModifiedTimesForInputDir;
-              lastModifiedTimesForInputDir = referenceTimes.get(filePath);
-              scan(new Path(afile), null, lastModifiedTimesForInputDir);
+              Map<String, Long> lastModifiedTimesForInputDir = null;
+              if (fs.exists(filePath)) {
+                FileStatus fileStatus = fs.getFileStatus(filePath);
+                lastModifiedTimesForInputDir = 
referenceTimes.get(fileStatus.getPath().toUri().getPath());
+              }
+              scan(filePath, null, lastModifiedTimesForInputDir);
             }
             scanIterationComplete();
           } else {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/78c5fad1/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java 
b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
index cea5109..febda3f 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -156,6 +156,38 @@ public class FileSplitterInputTest
   }
 
   @Test
+  public void testScannerFilterForDuplicates() throws InterruptedException
+  {
+    String filePath = testMeta.dataDirectory + Path.SEPARATOR + "file0.txt";
+    testMeta.scanner = new MockScanner();
+    testMeta.fileSplitterInput.setScanner(testMeta.scanner);
+    testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500);
+    
testMeta.fileSplitterInput.getScanner().setFilePatternRegularExp(".*[.]txt");
+    testMeta.fileSplitterInput.getScanner().setFiles(filePath);
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    testMeta.fileSplitterInput.beginWindow(1);
+    testMeta.scanner.semaphore.acquire();
+
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    testMeta.fileSplitterInput.beginWindow(2);
+    testMeta.scanner.semaphore.release();
+    testMeta.scanner.semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    Assert.assertEquals("File metadata", 1, 
testMeta.fileMetadataSink.collectedTuples.size());
+    for (Object fileMetadata : testMeta.fileMetadataSink.collectedTuples) {
+      FileSplitterInput.FileMetadata metadata = 
(FileSplitterInput.FileMetadata)fileMetadata;
+      Assert.assertTrue("path: " + metadata.getFilePath(), 
testMeta.filePaths.contains(metadata.getFilePath()));
+      Assert.assertNotNull("name: ", metadata.getFileName());
+    }
+
+    testMeta.fileMetadataSink.collectedTuples.clear();
+  }
+
+  @Test
   public void testBlockMetadataNoSplit() throws InterruptedException
   {
     testMeta.fileSplitterInput.beginWindow(1);

Reply via email to