APEXMALHAR-2004: Store the file's own modification time in the referenceTimes map instead of its parent directory's
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/327a3999 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/327a3999 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/327a3999 Branch: refs/heads/devel-3 Commit: 327a3999c6c443894bc1e085d91a1f030a848bdd Parents: d3a7063 Author: Tushar R. Gosavi <[email protected]> Authored: Sat Feb 27 18:34:38 2016 +0530 Committer: Tushar R. Gosavi <[email protected]> Committed: Fri Mar 11 14:38:34 2016 +0530 ---------------------------------------------------------------------- .../lib/io/fs/FileSplitterInput.java | 8 ++-- .../lib/io/fs/FileSplitterInputTest.java | 47 ++++++++++++++++++++ 2 files changed, 51 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/327a3999/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java index ab70047..234650d 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java @@ -434,17 +434,17 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper } protected ScannedFileInfo createScannedFileInfo(Path parentPath, FileStatus parentStatus, Path childPath, - @SuppressWarnings("UnusedParameters") FileStatus childStatus, Path rootPath) + FileStatus childStatus, Path rootPath) { ScannedFileInfo info; if (rootPath == null) { info = parentStatus.isDirectory() ? 
- new ScannedFileInfo(parentPath.toUri().getPath(), childPath.getName(), parentStatus.getModificationTime()) : - new ScannedFileInfo(null, childPath.toUri().getPath(), parentStatus.getModificationTime()); + new ScannedFileInfo(parentPath.toUri().getPath(), childPath.getName(), childStatus.getModificationTime()) : + new ScannedFileInfo(null, childPath.toUri().getPath(), childStatus.getModificationTime()); } else { URI relativeChildURI = rootPath.toUri().relativize(childPath.toUri()); info = new ScannedFileInfo(rootPath.toUri().getPath(), relativeChildURI.getPath(), - parentStatus.getModificationTime()); + childStatus.getModificationTime()); } return info; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/327a3999/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java index cd0de2d..c5d2ae7 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java @@ -20,6 +20,7 @@ package com.datatorrent.lib.io.fs; import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.FileWriter; import java.io.IOException; import java.util.HashSet; import java.util.Set; @@ -498,6 +499,52 @@ public class FileSplitterInputTest Assert.assertEquals("Recovered Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size()); } + @Test + public void testFileModificationTest() throws InterruptedException, IOException, TimeoutException + { + testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60 * 1000); + testFileMetadata(); + testMeta.fileMetadataSink.clear(); + testMeta.blockMetadataSink.clear(); + + Thread.sleep(1000); + //change a file , this will not change mtime of the file. 
+ File f12 = new File(testMeta.dataDirectory, "file11" + ".txt"); + HashSet<String> lines = Sets.newHashSet(); + for (int line = 0; line < 2; line++) { + lines.add("f13" + "l" + line); + } + /* Need to use FileWriter, FileUtils changes the directory timestamp when + file is changed. */ + FileWriter fout = new FileWriter(f12, true); + fout.write(StringUtils.join(lines, '\n').toCharArray()); + fout.close(); + testMeta.fileSplitterInput.getScanner().setTrigger(true); + + //window 2 + testMeta.fileSplitterInput.beginWindow(2); + testMeta.scanner.semaphore.acquire(); + testMeta.fileSplitterInput.emitTuples(); + testMeta.fileSplitterInput.endWindow(); + + Assert.assertEquals("window 2: files", 1, testMeta.fileMetadataSink.collectedTuples.size()); + Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size()); + + //window 3 + testMeta.fileMetadataSink.clear(); + testMeta.blockMetadataSink.clear(); + testMeta.scanner.setTrigger(true); + testMeta.scanner.semaphore.release(); + testMeta.fileSplitterInput.beginWindow(3); + Thread.sleep(1000); + testMeta.scanner.semaphore.acquire(); + testMeta.fileSplitterInput.emitTuples(); + testMeta.fileSplitterInput.endWindow(); + + Assert.assertEquals("window 2: files", 0, testMeta.fileMetadataSink.collectedTuples.size()); + Assert.assertEquals("window 2: blocks", 0, testMeta.blockMetadataSink.collectedTuples.size()); + + } private static class MockScanner extends FileSplitterInput.TimeBasedDirectoryScanner {
