Repository: apex-malhar
Updated Branches:
  refs/heads/master 791383bd1 -> 53cb1836f


APEXMALHAR-1852 fix the recovery of partial file test


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9febcc0c
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9febcc0c
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9febcc0c

Branch: refs/heads/master
Commit: 9febcc0ca1f0e18820ed3029048c3219bf5e5a3e
Parents: 9f04db7
Author: Chandni Singh <[email protected]>
Authored: Fri Oct 7 11:56:05 2016 -0700
Committer: Chandni Singh <[email protected]>
Committed: Sun Oct 9 20:52:57 2016 -0700

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileSplitter.java         |   1 -
 .../lib/io/fs/FileSplitterInput.java            |  81 ++++------
 .../apex/malhar/lib/wal/FileSystemWAL.java      |   2 +-
 .../lib/io/fs/FileSplitterBaseTest.java         |   2 +-
 .../lib/io/fs/FileSplitterInputTest.java        | 158 ++++++++++---------
 .../datatorrent/lib/io/fs/FileSplitterTest.java |  27 ----
 6 files changed, 122 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9febcc0c/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
index 38c8e96..c002c18 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
@@ -192,7 +192,6 @@ public abstract class AbstractFileSplitter extends BaseOperator
   {
     LOG.debug("file {}", fileInfo.getFilePath());
     FileMetadata fileMetadata = createFileMetadata(fileInfo);
-    LOG.debug("fileMetadata {}", fileMetadata);
     Path path = new Path(fileInfo.getFilePath());
 
     fileMetadata.setFileName(path.getName());

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9febcc0c/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index 3763ef0..2d290cd 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -18,7 +18,6 @@
  */
 package com.datatorrent.lib.io.fs;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collections;
@@ -27,6 +26,7 @@ import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -52,9 +52,9 @@ import org.apache.hadoop.fs.Path;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.datatorrent.api.Component;
 import com.datatorrent.api.Context;
@@ -62,7 +62,6 @@ import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
-import com.datatorrent.netlet.util.DTThrowable;
 
 /**
  * Input operator that scans a directory for files and splits a file into blocks.<br/>
@@ -90,8 +89,6 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
 
   private Map<String, Map<String, Long>> referenceTimes;
 
-  private transient long sleepMillis;
-
   public FileSplitterInput()
   {
     super();
@@ -104,9 +101,8 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
   {
     currentWindowRecoveryState = Lists.newLinkedList();
     if (referenceTimes == null) {
-      referenceTimes = Maps.newHashMap();
+      referenceTimes = new ConcurrentHashMap<>();
     }
-    sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
     scanner.setup(context);
     windowDataManager.setup(context);
     super.setup(context);
@@ -166,14 +162,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
 
     Throwable throwable;
     if ((throwable = scanner.atomicThrowable.get()) != null) {
-      DTThrowable.rethrow(throwable);
-    }
-    if (blockMetadataIterator == null && scanner.discoveredFiles.isEmpty()) {
-      try {
-        Thread.sleep(sleepMillis);
-      } catch (InterruptedException e) {
-        throw new RuntimeException("waiting for work", e);
-      }
+      Throwables.propagate(throwable);
     }
     process();
   }
@@ -198,7 +187,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
     Map<String, Long> referenceTimePerInputDir;
     String referenceKey = fileInfo.getDirectoryPath() == null ? fileInfo.getFilePath() : fileInfo.getDirectoryPath();
     if ((referenceTimePerInputDir = referenceTimes.get(referenceKey)) == null) {
-      referenceTimePerInputDir = Maps.newHashMap();
+      referenceTimePerInputDir = new ConcurrentHashMap<>();
     }
     referenceTimePerInputDir.put(fileInfo.getFilePath(), fileInfo.modifiedTime);
     referenceTimes.put(referenceKey, referenceTimePerInputDir);
@@ -389,7 +378,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
         LOG.error("service", throwable);
         running = false;
         atomicThrowable.set(throwable);
-        DTThrowable.rethrow(throwable);
+        Throwables.propagate(throwable);
       }
     }
 
@@ -415,46 +404,45 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
       lastScanMillis = System.currentTimeMillis();
     }
 
-    protected void scan(@NotNull Path filePath, Path rootPath)
+    /**
+     * This is not used anywhere and should be removed. however, currently it breaks backward compatibility, so
+     * just deprecating it.
+     */
+    @Deprecated
+    protected void scan(@NotNull Path filePath, Path rootPath) throws IOException
     {
       Map<String, Long> lastModifiedTimesForInputDir;
       lastModifiedTimesForInputDir = referenceTimes.get(filePath.toUri().getPath());
       scan(filePath, rootPath, lastModifiedTimesForInputDir);
     }
 
-    private void scan(Path filePath, Path rootPath, Map<String, Long> lastModifiedTimesForInputDir)
+    private void scan(Path filePath, Path rootPath, Map<String, Long> lastModifiedTimesForInputDir) throws IOException
     {
-      try {
-        FileStatus parentStatus = fs.getFileStatus(filePath);
-        String parentPathStr = filePath.toUri().getPath();
+      FileStatus parentStatus = fs.getFileStatus(filePath);
+      String parentPathStr = filePath.toUri().getPath();
 
-        LOG.debug("scan {}", parentPathStr);
+      LOG.debug("scan {}", parentPathStr);
 
-        FileStatus[] childStatuses = fs.listStatus(filePath);
+      FileStatus[] childStatuses = fs.listStatus(filePath);
 
-        if (childStatuses.length == 0 && rootPath == null && (lastModifiedTimesForInputDir == null || lastModifiedTimesForInputDir.get(parentPathStr) == null)) { // empty input directory copy as is
-          ScannedFileInfo info = new ScannedFileInfo(null, filePath.toString(), parentStatus.getModificationTime());
-          processDiscoveredFile(info);
-        }
-
-        for (FileStatus childStatus : childStatuses) {
-          Path childPath = childStatus.getPath();
-          String childPathStr = childPath.toUri().getPath();
+      if (childStatuses.length == 0 && rootPath == null && (lastModifiedTimesForInputDir == null || lastModifiedTimesForInputDir.get(parentPathStr) == null)) { // empty input directory copy as is
+        ScannedFileInfo info = new ScannedFileInfo(null, filePath.toString(), parentStatus.getModificationTime());
+        processDiscoveredFile(info);
+      }
 
-          if (childStatus.isDirectory() && isRecursive()) {
-            addToDiscoveredFiles(rootPath, parentStatus, childStatus, lastModifiedTimesForInputDir);
-            scan(childPath, rootPath == null ? parentStatus.getPath() : rootPath, lastModifiedTimesForInputDir);
-          } else if (acceptFile(childPathStr)) {
-            addToDiscoveredFiles(rootPath, parentStatus, childStatus, lastModifiedTimesForInputDir);
-          } else {
-            // don't look at it again
-            ignoredFiles.add(childPathStr);
-          }
+      for (FileStatus childStatus : childStatuses) {
+        Path childPath = childStatus.getPath();
+        String childPathStr = childPath.toUri().getPath();
+
+        if (childStatus.isDirectory() && isRecursive()) {
+          addToDiscoveredFiles(rootPath, parentStatus, childStatus, lastModifiedTimesForInputDir);
+          scan(childPath, rootPath == null ? parentStatus.getPath() : rootPath, lastModifiedTimesForInputDir);
+        } else if (acceptFile(childPathStr)) {
+          addToDiscoveredFiles(rootPath, parentStatus, childStatus, lastModifiedTimesForInputDir);
+        } else {
+          // don't look at it again
+          ignoredFiles.add(childPathStr);
         }
-      } catch (FileNotFoundException fnf) {
-        LOG.warn("Failed to list directory {}", filePath, fnf);
-      } catch (IOException e) {
-        throw new RuntimeException("listing files", e);
       }
     }
 
@@ -480,8 +468,6 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
 
       ScannedFileInfo info = createScannedFileInfo(parentStatus.getPath(), parentStatus, childPath, childStatus,
           rootPath);
-
-      LOG.debug("Processing file: " + info.getFilePath());
       processDiscoveredFile(info);
     }
 
@@ -490,6 +476,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
       numDiscoveredPerIteration++;
       lastScannedInfo = info;
       discoveredFiles.add(info);
+      LOG.debug("discovered {} {}", info.getFilePath(), info.modifiedTime);
     }
 
     protected ScannedFileInfo createScannedFileInfo(Path parentPath, FileStatus parentStatus, Path childPath,

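The FileSplitterInput changes above swap DTThrowable.rethrow for Guava's Throwables.propagate and drop the idle sleep from emitTuples(), so a failure on the scanner thread surfaces on the operator thread the next time emitTuples() runs. A minimal sketch of that hand-off pattern follows; the class and method names are illustrative only and are not part of the operator's API:

    import java.util.concurrent.atomic.AtomicReference;
    import com.google.common.base.Throwables;

    class ScannerErrorHandoff
    {
      // the scanner thread records the first failure it hits
      private final AtomicReference<Throwable> atomicThrowable = new AtomicReference<>();

      void onScannerFailure(Throwable t)
      {
        atomicThrowable.compareAndSet(null, t);
      }

      // the operator thread rethrows the recorded failure on its next emit cycle
      void emitTuples()
      {
        Throwable throwable = atomicThrowable.get();
        if (throwable != null) {
          // wraps checked throwables in RuntimeException and rethrows
          throw Throwables.propagate(throwable);
        }
        // ... emit discovered files and blocks ...
      }
    }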
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9febcc0c/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java
index 49f61a4..1ae039b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java
@@ -402,8 +402,8 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
         isOpenPathTmp = false;
       }
 
-      LOG.debug("filePath to read {} and pointer {}", pathToReadFrom, walPointer);
       if (fileSystemWAL.fileContext.util().exists(pathToReadFrom)) {
+        LOG.debug("filePath to read {} and pointer {}", pathToReadFrom, walPointer);
         DataInputStream stream = fileSystemWAL.fileContext.open(pathToReadFrom);
         if (walPointer.offset > 0) {
           stream.skip(walPointer.offset);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9febcc0c/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
index 862e589..58cf0a5 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
@@ -184,7 +184,7 @@ public class FileSplitterBaseTest
 
     Assert.assertEquals("Blocks", 10, baseTestMeta.blockMetadataSink.collectedTuples.size());
 
-    for (int window = 2; window < 8; window++) {
+    for (int window = 2; window <= 8; window++) {
       baseTestMeta.fileSplitter.beginWindow(window);
       baseTestMeta.fileSplitter.handleIdleTime();
       baseTestMeta.fileSplitter.endWindow();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9febcc0c/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
index acbf93b..df45778 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -64,7 +64,8 @@ public class FileSplitterInputTest
     for (int file = 0; file < 12; file++) {
       HashSet<String> lines = Sets.newHashSet();
       for (int line = 0; line < 2; line++) {
-        lines.add("f" + file + "l" + line);
+        //padding 0 to file number so every file has 6 blocks.
+        lines.add("f" + String.format("%02d", file) + "l" + line);
       }
       allLines.addAll(lines);
       File created = new File(dataDirectory, "file" + file + ".txt");
@@ -101,7 +102,7 @@ public class FileSplitterInputTest
       fileSplitterInput = new FileSplitterInput();
       fileSplitterInput.setBlocksThreshold(100);
       scanner = new MockScanner();
-      scanner.setScanIntervalMillis(500);
+      scanner.setScanIntervalMillis(100);
       scanner.setFilePatternRegularExp(".*[.]txt");
       scanner.setFiles(dataDirectory);
       fileSplitterInput.setScanner(scanner);
@@ -142,10 +143,10 @@ public class FileSplitterInputTest
   @Rule
   public TestMeta testMeta = new TestMeta();
 
-  private void window1TestHelper() throws InterruptedException
+  private void validateFileMetadataInWindow1() throws InterruptedException
   {
     testMeta.fileSplitterInput.beginWindow(1);
-    testMeta.scanner.semaphore.acquire();
+    ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire(12);
 
     testMeta.fileSplitterInput.emitTuples();
     testMeta.fileSplitterInput.endWindow();
@@ -165,7 +166,7 @@ public class FileSplitterInputTest
   public void testFileMetadata() throws InterruptedException
   {
     testMeta.fileSplitterInput.setup(testMeta.context);
-    window1TestHelper();
+    validateFileMetadataInWindow1();
     testMeta.fileSplitterInput.teardown();
   }
 
@@ -187,8 +188,6 @@ public class FileSplitterInputTest
     testMeta.fileSplitterInput.endWindow();
 
     testMeta.fileSplitterInput.beginWindow(2);
-    testMeta.scanner.semaphore.release();
-    testMeta.scanner.semaphore.acquire();
     testMeta.fileSplitterInput.emitTuples();
     testMeta.fileSplitterInput.endWindow();
 
@@ -208,7 +207,7 @@ public class FileSplitterInputTest
   {
     testMeta.fileSplitterInput.setup(testMeta.context);
     testMeta.fileSplitterInput.beginWindow(1);
-    testMeta.scanner.semaphore.acquire();
+    testMeta.scanner.semaphore.acquire(12);
 
     testMeta.fileSplitterInput.emitTuples();
     Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size());
@@ -226,7 +225,7 @@ public class FileSplitterInputTest
 
     testMeta.fileSplitterInput.setup(testMeta.context);
     testMeta.fileSplitterInput.beginWindow(1);
-    testMeta.scanner.semaphore.acquire();
+    testMeta.scanner.semaphore.acquire(12);
 
     testMeta.fileSplitterInput.emitTuples();
     Assert.assertEquals("Files", 12, testMeta.fileMetadataSink.collectedTuples.size());
@@ -249,7 +248,7 @@ public class FileSplitterInputTest
 
     testMeta.fileSplitterInput.setup(testMeta.context);
     //will emit window 1 from data directory
-    window1TestHelper();
+    validateFileMetadataInWindow1();
     testMeta.fileMetadataSink.clear();
     testMeta.blockMetadataSink.clear();
     testMeta.fileSplitterInput.teardown();
@@ -271,7 +270,7 @@ public class FileSplitterInputTest
   public void testTimeScan() throws InterruptedException, IOException, TimeoutException
   {
     testMeta.fileSplitterInput.setup(testMeta.context);
-    window1TestHelper();
+    validateFileMetadataInWindow1();
     testMeta.fileMetadataSink.clear();
     testMeta.blockMetadataSink.clear();
 
@@ -290,7 +289,8 @@ public class FileSplitterInputTest
     testMeta.fileSplitterInput.emitTuples();
     testMeta.fileSplitterInput.endWindow();
 
-    Assert.assertEquals("window 2: files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("window 2: files " + testMeta.fileMetadataSink.collectedTuples, 1,
+        testMeta.fileMetadataSink.collectedTuples.size());
     Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size());
     testMeta.fileSplitterInput.teardown();
   }
@@ -301,7 +301,7 @@ public class FileSplitterInputTest
     testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60 * 1000);
 
     testMeta.fileSplitterInput.setup(testMeta.context);
-    window1TestHelper();
+    validateFileMetadataInWindow1();
     testMeta.fileMetadataSink.clear();
     testMeta.blockMetadataSink.clear();
 
@@ -326,29 +326,34 @@ public class FileSplitterInputTest
     testMeta.fileSplitterInput.teardown();
   }
 
-  private void blocksTestHelper() throws InterruptedException
+
+  private int getTotalNumOfBlocks(int numFiles, long blockLength)
+  {
+    int noOfBlocks = 0;
+    for (int i = 0; i < numFiles; i++) {
+      File testFile = new File(testMeta.dataDirectory, "file" + i + ".txt");
+      noOfBlocks += (int)Math.ceil(testFile.length() / (blockLength * 1.0));
+    }
+    return noOfBlocks;
+  }
+
+  private void validateBlocks(long targetWindow, long blockLength) throws InterruptedException
   {
     testMeta.fileSplitterInput.beginWindow(1);
-    testMeta.scanner.semaphore.acquire();
+    ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire(12);
     testMeta.fileSplitterInput.emitTuples();
     testMeta.fileSplitterInput.endWindow();
 
     Assert.assertEquals("Blocks", 10, testMeta.blockMetadataSink.collectedTuples.size());
 
-    for (int window = 2; window < 8; window++) {
+    for (int window = 2; window <= targetWindow; window++) {
       testMeta.fileSplitterInput.beginWindow(window);
       testMeta.fileSplitterInput.emitTuples();
       testMeta.fileSplitterInput.endWindow();
     }
 
-    int noOfBlocks = 0;
-    for (int i = 0; i < 12; i++) {
-      File testFile = new File(testMeta.dataDirectory, "file" + i + ".txt");
-      noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0));
-    }
-
     Assert.assertEquals("Files", 12, testMeta.fileMetadataSink.collectedTuples.size());
-    Assert.assertEquals("Blocks", noOfBlocks, testMeta.blockMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Blocks", getTotalNumOfBlocks(12, blockLength), testMeta.blockMetadataSink.collectedTuples.size());
   }
 
   @Test
@@ -357,13 +362,13 @@ public class FileSplitterInputTest
     testMeta.fileSplitterInput.setBlockSize(2L);
     testMeta.fileSplitterInput.setBlocksThreshold(10);
     testMeta.fileSplitterInput.setup(testMeta.context);
-    blocksTestHelper();
+    validateBlocks(8, 2);
     testMeta.fileSplitterInput.teardown();
   }
 
-  private void recoveryTestHelper() throws InterruptedException
+  private void validateRecovery(long targetWindow, long blockLength) throws InterruptedException
   {
-    blocksTestHelper();
+    validateBlocks(targetWindow, blockLength);
     testMeta.fileMetadataSink.clear();
     testMeta.blockMetadataSink.clear();
     testMeta.fileSplitterInput.teardown();
@@ -372,11 +377,12 @@ public class FileSplitterInputTest
     testMeta.resetSinks();
 
     testMeta.fileSplitterInput.setup(testMeta.context);
-    for (int i = 1; i < 8; i++) {
+    for (int i = 1; i <= targetWindow; i++) {
       testMeta.fileSplitterInput.beginWindow(i);
     }
     Assert.assertEquals("Files", 12, testMeta.fileMetadataSink.collectedTuples.size());
-    Assert.assertEquals("Blocks", 62, testMeta.blockMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Blocks", getTotalNumOfBlocks(12, blockLength),
+        testMeta.blockMetadataSink.collectedTuples.size());
   }
 
   @Test
@@ -386,7 +392,7 @@ public class FileSplitterInputTest
     testMeta.updateConfig(fsWindowDataManager, 500, 2L, 10);
 
     testMeta.fileSplitterInput.setup(testMeta.context);
-    recoveryTestHelper();
+    validateRecovery(8, 2);
     testMeta.fileSplitterInput.teardown();
   }
 
@@ -397,7 +403,7 @@ public class FileSplitterInputTest
     testMeta.updateConfig(fsWindowDataManager, 500, 2L, 10);
     testMeta.fileSplitterInput.setup(testMeta.context);
 
-    recoveryTestHelper();
+    validateRecovery(8, 2);
 
     Thread.sleep(1000);
     HashSet<String> lines = Sets.newHashSet();
@@ -411,12 +417,12 @@ public class FileSplitterInputTest
     testMeta.fileMetadataSink.clear();
     testMeta.blockMetadataSink.clear();
 
-    testMeta.fileSplitterInput.beginWindow(8);
+    testMeta.fileSplitterInput.beginWindow(9);
     ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
     testMeta.fileSplitterInput.emitTuples();
     testMeta.fileSplitterInput.endWindow();
-
-    Assert.assertEquals("Files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Files " + testMeta.fileMetadataSink.collectedTuples, 1,
+        testMeta.fileMetadataSink.collectedTuples.size());
     Assert.assertEquals("Blocks", 6, testMeta.blockMetadataSink.collectedTuples.size());
     testMeta.fileSplitterInput.teardown();
   }
@@ -433,13 +439,14 @@ public class FileSplitterInputTest
 
     testMeta.fileSplitterInput.beginWindow(1);
 
-    testMeta.scanner.semaphore.acquire();
+    testMeta.scanner.semaphore.acquire(12);
     testMeta.fileSplitterInput.emitTuples();
     testMeta.fileSplitterInput.endWindow();
 
-    //file0.txt has just 5 blocks. Since blocks threshold is 2, only 2 are emitted.
+    //fileX.txt has just 6 blocks. Since blocks threshold is 2, only 2 are emitted.
     Assert.assertEquals("Files", 1, testMeta.fileMetadataSink.collectedTuples.size());
     Assert.assertEquals("Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
+    AbstractFileSplitter.FileMetadata fileX = testMeta.fileMetadataSink.collectedTuples.get(0);
 
     testMeta.fileMetadataSink.clear();
     testMeta.blockMetadataSink.clear();
@@ -453,46 +460,57 @@ public class FileSplitterInputTest
     testMeta.fileSplitterInput.setup(testMeta.context);
     testMeta.fileSplitterInput.beginWindow(1);
 
+    //fileX is recovered and first two blocks are repeated.
     Assert.assertEquals("Recovered Files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    AbstractFileSplitter.FileMetadata fileXRecovered = testMeta.fileMetadataSink.collectedTuples.get(0);
+    Assert.assertEquals("recovered file-metadata", fileX.getFileName(), fileXRecovered.getFileName());
+
     Assert.assertEquals("Recovered Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
+    testMeta.fileSplitterInput.endWindow();
+
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
 
     testMeta.fileSplitterInput.beginWindow(2);
-    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.emitTuples(); //next 2 blocks of fileX
     testMeta.fileSplitterInput.endWindow();
 
-    Assert.assertEquals("Blocks", 4, testMeta.blockMetadataSink.collectedTuples.size());
+    testMeta.fileSplitterInput.beginWindow(3);
+    testMeta.fileSplitterInput.emitTuples(); //next 2 blocks of fileX
+    testMeta.fileSplitterInput.endWindow();
 
-    String file1 = testMeta.fileMetadataSink.collectedTuples.get(0).getFileName();
+    //Next 2 blocks of fileX
+    Assert.assertEquals("File", 0, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Blocks", 4, testMeta.blockMetadataSink.collectedTuples.size());
 
     testMeta.fileMetadataSink.clear();
     testMeta.blockMetadataSink.clear();
 
-    testMeta.fileSplitterInput.beginWindow(3);
-    ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
+    testMeta.fileSplitterInput.beginWindow(4);
+    ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire(11);
     testMeta.fileSplitterInput.emitTuples();
     testMeta.fileSplitterInput.endWindow();
 
+    //2 blocks of a different file
     Assert.assertEquals("New file", 1, testMeta.fileMetadataSink.collectedTuples.size());
     Assert.assertEquals("Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
 
-    String file2 = testMeta.fileMetadataSink.collectedTuples.get(0).getFileName();
+    AbstractFileSplitter.FileMetadata fileY = testMeta.fileMetadataSink.collectedTuples.get(0);
 
-    Assert.assertTrue("Block file name 0",
-        testMeta.blockMetadataSink.collectedTuples.get(0).getFilePath().endsWith(file1));
-    Assert.assertTrue("Block file name 1",
-        testMeta.blockMetadataSink.collectedTuples.get(1).getFilePath().endsWith(file2));
-    testMeta.fileSplitterInput.teardown();
+    for (BlockMetadata.FileBlockMetadata blockMetadata : testMeta.blockMetadataSink.collectedTuples) {
+      Assert.assertTrue("Block file name", blockMetadata.getFilePath().endsWith(fileY.getFileName()));
+      testMeta.fileSplitterInput.teardown();
+    }
   }
 
   @Test
   public void testRecursive() throws InterruptedException, IOException
   {
     testMeta.fileSplitterInput.setup(testMeta.context);
-    window1TestHelper();
+    validateFileMetadataInWindow1();
     testMeta.fileMetadataSink.clear();
     testMeta.blockMetadataSink.clear();
 
-    Thread.sleep(1000);
     //added a new relativeFilePath
     File f13 = new File(testMeta.dataDirectory + "/child", "file13" + ".txt");
     HashSet<String> lines = Sets.newHashSet();
@@ -500,14 +518,15 @@ public class FileSplitterInputTest
       lines.add("f13" + "l" + line);
     }
     FileUtils.write(f13, StringUtils.join(lines, '\n'));
-
     //window 2
     testMeta.fileSplitterInput.beginWindow(2);
-    testMeta.scanner.semaphore.acquire();
+    testMeta.scanner.semaphore.acquire(2);
     testMeta.fileSplitterInput.emitTuples();
     testMeta.fileSplitterInput.endWindow();
 
-    Assert.assertEquals("window 2: files", 2, testMeta.fileMetadataSink.collectedTuples.size());
+    //one for the folder "child" and one for "file13"
+    Assert.assertEquals("window 2: files " + testMeta.fileMetadataSink.collectedTuples, 2,
+        testMeta.fileMetadataSink.collectedTuples.size());
     Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size());
     testMeta.fileSplitterInput.teardown();
   }
@@ -539,7 +558,7 @@ public class FileSplitterInputTest
     testMeta.fileSplitterInput.setup(testMeta.context);
     testMeta.fileSplitterInput.beginWindow(1);
 
-    ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
+    testMeta.scanner.semaphore.acquire(12);
     testMeta.fileSplitterInput.emitTuples();
     testMeta.fileSplitterInput.endWindow();
 
@@ -570,25 +589,29 @@ public class FileSplitterInputTest
   @Test
   public void testFileModificationTest() throws InterruptedException, IOException, TimeoutException
   {
+    File file11 = new File(testMeta.dataDirectory, "file11.txt");
+    long lastModifiedTime = file11.lastModified();
+    LOG.debug("file 11 modified time {} ", lastModifiedTime);
     testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60 * 1000);
     testMeta.fileSplitterInput.setup(testMeta.context);
-    window1TestHelper();
+    validateFileMetadataInWindow1();
 
     testMeta.fileMetadataSink.clear();
     testMeta.blockMetadataSink.clear();
 
     Thread.sleep(1000);
-    //change a file , this will not change mtime of the file.
-    File f12 = new File(testMeta.dataDirectory, "file11" + ".txt");
+    //create more lines to append to the file
     HashSet<String> lines = Sets.newHashSet();
     for (int line = 0; line < 2; line++) {
-      lines.add("f13" + "l" + line);
+      lines.add("f11" + "l" + line);
     }
     /* Need to use FileWriter, FileUtils changes the directory timestamp when
        file is changed. */
-    FileWriter fout = new FileWriter(f12, true);
+    FileWriter fout = new FileWriter(file11, true);
     fout.write(StringUtils.join(lines, '\n').toCharArray());
     fout.close();
+
+    LOG.debug("file 11 modified time after append {} ", file11.lastModified());
     testMeta.fileSplitterInput.getScanner().setTrigger(true);
 
     //window 2
@@ -604,10 +627,7 @@ public class FileSplitterInputTest
     testMeta.fileMetadataSink.clear();
     testMeta.blockMetadataSink.clear();
     testMeta.scanner.setTrigger(true);
-    testMeta.scanner.semaphore.release();
     testMeta.fileSplitterInput.beginWindow(3);
-    Thread.sleep(1000);
-    testMeta.scanner.semaphore.acquire();
     testMeta.fileSplitterInput.emitTuples();
     testMeta.fileSplitterInput.endWindow();
 
@@ -640,7 +660,7 @@ public class FileSplitterInputTest
     testMeta.fileSplitterInput.setup(testMeta.context);
 
     testMeta.fileSplitterInput.beginWindow(1);
-    ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
+    ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire(15);
 
     testMeta.fileSplitterInput.emitTuples();
     testMeta.fileSplitterInput.endWindow();
@@ -677,22 +697,16 @@ public class FileSplitterInputTest
 
   private static class MockScanner extends FileSplitterInput.TimeBasedDirectoryScanner
   {
-    transient Semaphore semaphore;
 
-    private MockScanner()
-    {
-      super();
-      this.semaphore = new Semaphore(0);
-    }
+    private final transient Semaphore semaphore = new Semaphore(0);
 
     @Override
-    protected void scanIterationComplete()
+    protected void processDiscoveredFile(FileSplitterInput.ScannedFileInfo info)
     {
-      if (getNumDiscoveredPerIteration() > 0) {
-        semaphore.release();
-      }
-      super.scanIterationComplete();
+      super.processDiscoveredFile(info);
+      semaphore.release();
     }
+
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInputTest.class);

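The MockScanner above now releases its semaphore once per discovered file (from processDiscoveredFile) instead of once per completed scan iteration, which is why the tests call acquire(12), acquire(11), and so on to block until an exact number of files has been found. A small sketch of that synchronization idea, with illustrative names that are not part of the test code:

    import java.util.concurrent.Semaphore;

    class DiscoveryLatch
    {
      private final Semaphore discovered = new Semaphore(0);

      // called by the scanner thread for every file it finds
      void onFileDiscovered()
      {
        discovered.release();
      }

      // called by the test thread; returns only after expectedFiles discoveries
      void awaitDiscoveries(int expectedFiles) throws InterruptedException
      {
        discovered.acquire(expectedFiles);
      }
    }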
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9febcc0c/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java
index 24ab938..63a726d 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java
@@ -401,33 +401,6 @@ public class FileSplitterTest
   }
 
   @Test
-  public void testRecursive() throws InterruptedException, IOException
-  {
-    testMeta.fileSplitter.scanner.regex = null;
-    testFileMetadata();
-    testMeta.fileMetadataSink.clear();
-    testMeta.blockMetadataSink.clear();
-
-    Thread.sleep(1000);
-    //added a new relativeFilePath
-    File f13 = new File(testMeta.dataDirectory + "/child", "file13" + ".txt");
-    HashSet<String> lines = Sets.newHashSet();
-    for (int line = 0; line < 2; line++) {
-      lines.add("f13" + "l" + line);
-    }
-    FileUtils.write(f13, StringUtils.join(lines, '\n'));
-
-    //window 2
-    testMeta.fileSplitter.beginWindow(2);
-    testMeta.exchanger.exchange(null);
-    testMeta.fileSplitter.emitTuples();
-    testMeta.fileSplitter.endWindow();
-
-    Assert.assertEquals("window 2: files", 2, testMeta.fileMetadataSink.collectedTuples.size());
-    Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size());
-  }
-
-  @Test
   public void testSingleFile() throws InterruptedException, IOException
   {
     testMeta.fileSplitter.teardown();
