Repository: apex-malhar Updated Branches: refs/heads/master 791383bd1 -> 53cb1836f
APEXMALHAR-1852 fix the recovery of partial file test Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9febcc0c Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9febcc0c Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9febcc0c Branch: refs/heads/master Commit: 9febcc0ca1f0e18820ed3029048c3219bf5e5a3e Parents: 9f04db7 Author: Chandni Singh <[email protected]> Authored: Fri Oct 7 11:56:05 2016 -0700 Committer: Chandni Singh <[email protected]> Committed: Sun Oct 9 20:52:57 2016 -0700 ---------------------------------------------------------------------- .../lib/io/fs/AbstractFileSplitter.java | 1 - .../lib/io/fs/FileSplitterInput.java | 81 ++++------ .../apex/malhar/lib/wal/FileSystemWAL.java | 2 +- .../lib/io/fs/FileSplitterBaseTest.java | 2 +- .../lib/io/fs/FileSplitterInputTest.java | 158 ++++++++++--------- .../datatorrent/lib/io/fs/FileSplitterTest.java | 27 ---- 6 files changed, 122 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9febcc0c/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java index 38c8e96..c002c18 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java @@ -192,7 +192,6 @@ public abstract class AbstractFileSplitter extends BaseOperator { LOG.debug("file {}", fileInfo.getFilePath()); FileMetadata fileMetadata = createFileMetadata(fileInfo); - LOG.debug("fileMetadata {}", fileMetadata); Path path = new Path(fileInfo.getFilePath()); fileMetadata.setFileName(path.getName()); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9febcc0c/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java index 3763ef0..2d290cd 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java @@ -18,7 +18,6 @@ */ package com.datatorrent.lib.io.fs; -import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.util.Collections; @@ -27,6 +26,7 @@ import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; @@ -52,9 +52,9 @@ import org.apache.hadoop.fs.Path; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; +import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.datatorrent.api.Component; import com.datatorrent.api.Context; @@ -62,7 +62,6 @@ import com.datatorrent.api.InputOperator; import com.datatorrent.api.Operator; import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.api.annotation.Stateless; -import com.datatorrent.netlet.util.DTThrowable; /** * Input operator that scans a directory for files and splits a file into blocks.<br/> @@ -90,8 +89,6 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper private Map<String, Map<String, Long>> referenceTimes; - private transient long sleepMillis; - public FileSplitterInput() { super(); @@ -104,9 +101,8 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper { currentWindowRecoveryState = Lists.newLinkedList(); if (referenceTimes == null) { - referenceTimes = Maps.newHashMap(); + referenceTimes = new ConcurrentHashMap<>(); } - sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS); scanner.setup(context); windowDataManager.setup(context); super.setup(context); @@ -166,14 +162,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper Throwable throwable; if ((throwable = scanner.atomicThrowable.get()) != null) { - DTThrowable.rethrow(throwable); - } - if (blockMetadataIterator == null && scanner.discoveredFiles.isEmpty()) { - try { - Thread.sleep(sleepMillis); - } catch (InterruptedException e) { - throw new RuntimeException("waiting for work", e); - } + Throwables.propagate(throwable); } process(); } @@ -198,7 +187,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper Map<String, Long> referenceTimePerInputDir; String referenceKey = fileInfo.getDirectoryPath() == null ? fileInfo.getFilePath() : fileInfo.getDirectoryPath(); if ((referenceTimePerInputDir = referenceTimes.get(referenceKey)) == null) { - referenceTimePerInputDir = Maps.newHashMap(); + referenceTimePerInputDir = new ConcurrentHashMap<>(); } referenceTimePerInputDir.put(fileInfo.getFilePath(), fileInfo.modifiedTime); referenceTimes.put(referenceKey, referenceTimePerInputDir); @@ -389,7 +378,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper LOG.error("service", throwable); running = false; atomicThrowable.set(throwable); - DTThrowable.rethrow(throwable); + Throwables.propagate(throwable); } } @@ -415,46 +404,45 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper lastScanMillis = System.currentTimeMillis(); } - protected void scan(@NotNull Path filePath, Path rootPath) + /** + * This is not used anywhere and should be removed. however, currently it breaks backward compatibility, so + * just deprecating it. + */ + @Deprecated + protected void scan(@NotNull Path filePath, Path rootPath) throws IOException { Map<String, Long> lastModifiedTimesForInputDir; lastModifiedTimesForInputDir = referenceTimes.get(filePath.toUri().getPath()); scan(filePath, rootPath, lastModifiedTimesForInputDir); } - private void scan(Path filePath, Path rootPath, Map<String, Long> lastModifiedTimesForInputDir) + private void scan(Path filePath, Path rootPath, Map<String, Long> lastModifiedTimesForInputDir) throws IOException { - try { - FileStatus parentStatus = fs.getFileStatus(filePath); - String parentPathStr = filePath.toUri().getPath(); + FileStatus parentStatus = fs.getFileStatus(filePath); + String parentPathStr = filePath.toUri().getPath(); - LOG.debug("scan {}", parentPathStr); + LOG.debug("scan {}", parentPathStr); - FileStatus[] childStatuses = fs.listStatus(filePath); + FileStatus[] childStatuses = fs.listStatus(filePath); - if (childStatuses.length == 0 && rootPath == null && (lastModifiedTimesForInputDir == null || lastModifiedTimesForInputDir.get(parentPathStr) == null)) { // empty input directory copy as is - ScannedFileInfo info = new ScannedFileInfo(null, filePath.toString(), parentStatus.getModificationTime()); - processDiscoveredFile(info); - } - - for (FileStatus childStatus : childStatuses) { - Path childPath = childStatus.getPath(); - String childPathStr = childPath.toUri().getPath(); + if (childStatuses.length == 0 && rootPath == null && (lastModifiedTimesForInputDir == null || lastModifiedTimesForInputDir.get(parentPathStr) == null)) { // empty input directory copy as is + ScannedFileInfo info = new ScannedFileInfo(null, filePath.toString(), parentStatus.getModificationTime()); + processDiscoveredFile(info); + } - if (childStatus.isDirectory() && isRecursive()) { - addToDiscoveredFiles(rootPath, parentStatus, childStatus, lastModifiedTimesForInputDir); - scan(childPath, rootPath == null ? parentStatus.getPath() : rootPath, lastModifiedTimesForInputDir); - } else if (acceptFile(childPathStr)) { - addToDiscoveredFiles(rootPath, parentStatus, childStatus, lastModifiedTimesForInputDir); - } else { - // don't look at it again - ignoredFiles.add(childPathStr); - } + for (FileStatus childStatus : childStatuses) { + Path childPath = childStatus.getPath(); + String childPathStr = childPath.toUri().getPath(); + + if (childStatus.isDirectory() && isRecursive()) { + addToDiscoveredFiles(rootPath, parentStatus, childStatus, lastModifiedTimesForInputDir); + scan(childPath, rootPath == null ? parentStatus.getPath() : rootPath, lastModifiedTimesForInputDir); + } else if (acceptFile(childPathStr)) { + addToDiscoveredFiles(rootPath, parentStatus, childStatus, lastModifiedTimesForInputDir); + } else { + // don't look at it again + ignoredFiles.add(childPathStr); } - } catch (FileNotFoundException fnf) { - LOG.warn("Failed to list directory {}", filePath, fnf); - } catch (IOException e) { - throw new RuntimeException("listing files", e); } } @@ -480,8 +468,6 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper ScannedFileInfo info = createScannedFileInfo(parentStatus.getPath(), parentStatus, childPath, childStatus, rootPath); - - LOG.debug("Processing file: " + info.getFilePath()); processDiscoveredFile(info); } @@ -490,6 +476,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper numDiscoveredPerIteration++; lastScannedInfo = info; discoveredFiles.add(info); + LOG.debug("discovered {} {}", info.getFilePath(), info.modifiedTime); } protected ScannedFileInfo createScannedFileInfo(Path parentPath, FileStatus parentStatus, Path childPath, http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9febcc0c/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java index 49f61a4..1ae039b 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java @@ -402,8 +402,8 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil isOpenPathTmp = false; } - LOG.debug("filePath to read {} and pointer {}", pathToReadFrom, walPointer); if (fileSystemWAL.fileContext.util().exists(pathToReadFrom)) { + LOG.debug("filePath to read {} and pointer {}", pathToReadFrom, walPointer); DataInputStream stream = fileSystemWAL.fileContext.open(pathToReadFrom); if (walPointer.offset > 0) { stream.skip(walPointer.offset); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9febcc0c/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java index 862e589..58cf0a5 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java @@ -184,7 +184,7 @@ public class FileSplitterBaseTest Assert.assertEquals("Blocks", 10, baseTestMeta.blockMetadataSink.collectedTuples.size()); - for (int window = 2; window < 8; window++) { + for (int window = 2; window <= 8; window++) { baseTestMeta.fileSplitter.beginWindow(window); baseTestMeta.fileSplitter.handleIdleTime(); baseTestMeta.fileSplitter.endWindow(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9febcc0c/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java index acbf93b..df45778 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java @@ -64,7 +64,8 @@ public class FileSplitterInputTest for (int file = 0; file < 12; file++) { HashSet<String> lines = Sets.newHashSet(); for (int line = 0; line < 2; line++) { - lines.add("f" + file + "l" + line); + //padding 0 to file number so every file has 6 blocks. + lines.add("f" + String.format("%02d", file) + "l" + line); } allLines.addAll(lines); File created = new File(dataDirectory, "file" + file + ".txt"); @@ -101,7 +102,7 @@ public class FileSplitterInputTest fileSplitterInput = new FileSplitterInput(); fileSplitterInput.setBlocksThreshold(100); scanner = new MockScanner(); - scanner.setScanIntervalMillis(500); + scanner.setScanIntervalMillis(100); scanner.setFilePatternRegularExp(".*[.]txt"); scanner.setFiles(dataDirectory); fileSplitterInput.setScanner(scanner); @@ -142,10 +143,10 @@ public class FileSplitterInputTest @Rule public TestMeta testMeta = new TestMeta(); - private void window1TestHelper() throws InterruptedException + private void validateFileMetadataInWindow1() throws InterruptedException { testMeta.fileSplitterInput.beginWindow(1); - testMeta.scanner.semaphore.acquire(); + ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire(12); testMeta.fileSplitterInput.emitTuples(); testMeta.fileSplitterInput.endWindow(); @@ -165,7 +166,7 @@ public class FileSplitterInputTest public void testFileMetadata() throws InterruptedException { testMeta.fileSplitterInput.setup(testMeta.context); - window1TestHelper(); + validateFileMetadataInWindow1(); testMeta.fileSplitterInput.teardown(); } @@ -187,8 +188,6 @@ public class FileSplitterInputTest testMeta.fileSplitterInput.endWindow(); testMeta.fileSplitterInput.beginWindow(2); - testMeta.scanner.semaphore.release(); - testMeta.scanner.semaphore.acquire(); testMeta.fileSplitterInput.emitTuples(); testMeta.fileSplitterInput.endWindow(); @@ -208,7 +207,7 @@ public class FileSplitterInputTest { testMeta.fileSplitterInput.setup(testMeta.context); testMeta.fileSplitterInput.beginWindow(1); - testMeta.scanner.semaphore.acquire(); + testMeta.scanner.semaphore.acquire(12); testMeta.fileSplitterInput.emitTuples(); Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size()); @@ -226,7 +225,7 @@ public class FileSplitterInputTest testMeta.fileSplitterInput.setup(testMeta.context); testMeta.fileSplitterInput.beginWindow(1); - testMeta.scanner.semaphore.acquire(); + testMeta.scanner.semaphore.acquire(12); testMeta.fileSplitterInput.emitTuples(); Assert.assertEquals("Files", 12, testMeta.fileMetadataSink.collectedTuples.size()); @@ -249,7 +248,7 @@ public class FileSplitterInputTest testMeta.fileSplitterInput.setup(testMeta.context); //will emit window 1 from data directory - window1TestHelper(); + validateFileMetadataInWindow1(); testMeta.fileMetadataSink.clear(); testMeta.blockMetadataSink.clear(); testMeta.fileSplitterInput.teardown(); @@ -271,7 +270,7 @@ public class FileSplitterInputTest public void testTimeScan() throws InterruptedException, IOException, TimeoutException { testMeta.fileSplitterInput.setup(testMeta.context); - window1TestHelper(); + validateFileMetadataInWindow1(); testMeta.fileMetadataSink.clear(); testMeta.blockMetadataSink.clear(); @@ -290,7 +289,8 @@ public class FileSplitterInputTest testMeta.fileSplitterInput.emitTuples(); testMeta.fileSplitterInput.endWindow(); - Assert.assertEquals("window 2: files", 1, testMeta.fileMetadataSink.collectedTuples.size()); + Assert.assertEquals("window 2: files " + testMeta.fileMetadataSink.collectedTuples, 1, + testMeta.fileMetadataSink.collectedTuples.size()); Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size()); testMeta.fileSplitterInput.teardown(); } @@ -301,7 +301,7 @@ public class FileSplitterInputTest testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60 * 1000); testMeta.fileSplitterInput.setup(testMeta.context); - window1TestHelper(); + validateFileMetadataInWindow1(); testMeta.fileMetadataSink.clear(); testMeta.blockMetadataSink.clear(); @@ -326,29 +326,34 @@ public class FileSplitterInputTest testMeta.fileSplitterInput.teardown(); } - private void blocksTestHelper() throws InterruptedException + + private int getTotalNumOfBlocks(int numFiles, long blockLength) + { + int noOfBlocks = 0; + for (int i = 0; i < numFiles; i++) { + File testFile = new File(testMeta.dataDirectory, "file" + i + ".txt"); + noOfBlocks += (int)Math.ceil(testFile.length() / (blockLength * 1.0)); + } + return noOfBlocks; + } + + private void validateBlocks(long targetWindow, long blockLength) throws InterruptedException { testMeta.fileSplitterInput.beginWindow(1); - testMeta.scanner.semaphore.acquire(); + ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire(12); testMeta.fileSplitterInput.emitTuples(); testMeta.fileSplitterInput.endWindow(); Assert.assertEquals("Blocks", 10, testMeta.blockMetadataSink.collectedTuples.size()); - for (int window = 2; window < 8; window++) { + for (int window = 2; window <= targetWindow; window++) { testMeta.fileSplitterInput.beginWindow(window); testMeta.fileSplitterInput.emitTuples(); testMeta.fileSplitterInput.endWindow(); } - int noOfBlocks = 0; - for (int i = 0; i < 12; i++) { - File testFile = new File(testMeta.dataDirectory, "file" + i + ".txt"); - noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0)); - } - Assert.assertEquals("Files", 12, testMeta.fileMetadataSink.collectedTuples.size()); - Assert.assertEquals("Blocks", noOfBlocks, testMeta.blockMetadataSink.collectedTuples.size()); + Assert.assertEquals("Blocks", getTotalNumOfBlocks(12, blockLength), testMeta.blockMetadataSink.collectedTuples.size()); } @Test @@ -357,13 +362,13 @@ public class FileSplitterInputTest testMeta.fileSplitterInput.setBlockSize(2L); testMeta.fileSplitterInput.setBlocksThreshold(10); testMeta.fileSplitterInput.setup(testMeta.context); - blocksTestHelper(); + validateBlocks(8, 2); testMeta.fileSplitterInput.teardown(); } - private void recoveryTestHelper() throws InterruptedException + private void validateRecovery(long targetWindow, long blockLength) throws InterruptedException { - blocksTestHelper(); + validateBlocks(targetWindow, blockLength); testMeta.fileMetadataSink.clear(); testMeta.blockMetadataSink.clear(); testMeta.fileSplitterInput.teardown(); @@ -372,11 +377,12 @@ public class FileSplitterInputTest testMeta.resetSinks(); testMeta.fileSplitterInput.setup(testMeta.context); - for (int i = 1; i < 8; i++) { + for (int i = 1; i <= targetWindow; i++) { testMeta.fileSplitterInput.beginWindow(i); } Assert.assertEquals("Files", 12, testMeta.fileMetadataSink.collectedTuples.size()); - Assert.assertEquals("Blocks", 62, testMeta.blockMetadataSink.collectedTuples.size()); + Assert.assertEquals("Blocks", getTotalNumOfBlocks(12, blockLength), + testMeta.blockMetadataSink.collectedTuples.size()); } @Test @@ -386,7 +392,7 @@ public class FileSplitterInputTest testMeta.updateConfig(fsWindowDataManager, 500, 2L, 10); testMeta.fileSplitterInput.setup(testMeta.context); - recoveryTestHelper(); + validateRecovery(8, 2); testMeta.fileSplitterInput.teardown(); } @@ -397,7 +403,7 @@ public class FileSplitterInputTest testMeta.updateConfig(fsWindowDataManager, 500, 2L, 10); testMeta.fileSplitterInput.setup(testMeta.context); - recoveryTestHelper(); + validateRecovery(8, 2); Thread.sleep(1000); HashSet<String> lines = Sets.newHashSet(); @@ -411,12 +417,12 @@ public class FileSplitterInputTest testMeta.fileMetadataSink.clear(); testMeta.blockMetadataSink.clear(); - testMeta.fileSplitterInput.beginWindow(8); + testMeta.fileSplitterInput.beginWindow(9); ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire(); testMeta.fileSplitterInput.emitTuples(); testMeta.fileSplitterInput.endWindow(); - - Assert.assertEquals("Files", 1, testMeta.fileMetadataSink.collectedTuples.size()); + Assert.assertEquals("Files " + testMeta.fileMetadataSink.collectedTuples, 1, + testMeta.fileMetadataSink.collectedTuples.size()); Assert.assertEquals("Blocks", 6, testMeta.blockMetadataSink.collectedTuples.size()); testMeta.fileSplitterInput.teardown(); } @@ -433,13 +439,14 @@ public class FileSplitterInputTest testMeta.fileSplitterInput.beginWindow(1); - testMeta.scanner.semaphore.acquire(); + testMeta.scanner.semaphore.acquire(12); testMeta.fileSplitterInput.emitTuples(); testMeta.fileSplitterInput.endWindow(); - //file0.txt has just 5 blocks. Since blocks threshold is 2, only 2 are emitted. + //fileX.txt has just 6 blocks. Since blocks threshold is 2, only 2 are emitted. Assert.assertEquals("Files", 1, testMeta.fileMetadataSink.collectedTuples.size()); Assert.assertEquals("Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size()); + AbstractFileSplitter.FileMetadata fileX = testMeta.fileMetadataSink.collectedTuples.get(0); testMeta.fileMetadataSink.clear(); testMeta.blockMetadataSink.clear(); @@ -453,46 +460,57 @@ public class FileSplitterInputTest testMeta.fileSplitterInput.setup(testMeta.context); testMeta.fileSplitterInput.beginWindow(1); + //fileX is recovered and first two blocks are repeated. Assert.assertEquals("Recovered Files", 1, testMeta.fileMetadataSink.collectedTuples.size()); + AbstractFileSplitter.FileMetadata fileXRecovered = testMeta.fileMetadataSink.collectedTuples.get(0); + Assert.assertEquals("recovered file-metadata", fileX.getFileName(), fileXRecovered.getFileName()); + Assert.assertEquals("Recovered Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size()); + testMeta.fileSplitterInput.endWindow(); + + testMeta.fileMetadataSink.clear(); + testMeta.blockMetadataSink.clear(); testMeta.fileSplitterInput.beginWindow(2); - testMeta.fileSplitterInput.emitTuples(); + testMeta.fileSplitterInput.emitTuples(); //next 2 blocks of fileX testMeta.fileSplitterInput.endWindow(); - Assert.assertEquals("Blocks", 4, testMeta.blockMetadataSink.collectedTuples.size()); + testMeta.fileSplitterInput.beginWindow(3); + testMeta.fileSplitterInput.emitTuples(); //next 2 blocks of fileX + testMeta.fileSplitterInput.endWindow(); - String file1 = testMeta.fileMetadataSink.collectedTuples.get(0).getFileName(); + //Next 2 blocks of fileX + Assert.assertEquals("File", 0, testMeta.fileMetadataSink.collectedTuples.size()); + Assert.assertEquals("Blocks", 4, testMeta.blockMetadataSink.collectedTuples.size()); testMeta.fileMetadataSink.clear(); testMeta.blockMetadataSink.clear(); - testMeta.fileSplitterInput.beginWindow(3); - ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire(); + testMeta.fileSplitterInput.beginWindow(4); + ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire(11); testMeta.fileSplitterInput.emitTuples(); testMeta.fileSplitterInput.endWindow(); + //2 blocks of a different file Assert.assertEquals("New file", 1, testMeta.fileMetadataSink.collectedTuples.size()); Assert.assertEquals("Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size()); - String file2 = testMeta.fileMetadataSink.collectedTuples.get(0).getFileName(); + AbstractFileSplitter.FileMetadata fileY = testMeta.fileMetadataSink.collectedTuples.get(0); - Assert.assertTrue("Block file name 0", - testMeta.blockMetadataSink.collectedTuples.get(0).getFilePath().endsWith(file1)); - Assert.assertTrue("Block file name 1", - testMeta.blockMetadataSink.collectedTuples.get(1).getFilePath().endsWith(file2)); - testMeta.fileSplitterInput.teardown(); + for (BlockMetadata.FileBlockMetadata blockMetadata : testMeta.blockMetadataSink.collectedTuples) { + Assert.assertTrue("Block file name", blockMetadata.getFilePath().endsWith(fileY.getFileName())); + testMeta.fileSplitterInput.teardown(); + } } @Test public void testRecursive() throws InterruptedException, IOException { testMeta.fileSplitterInput.setup(testMeta.context); - window1TestHelper(); + validateFileMetadataInWindow1(); testMeta.fileMetadataSink.clear(); testMeta.blockMetadataSink.clear(); - Thread.sleep(1000); //added a new relativeFilePath File f13 = new File(testMeta.dataDirectory + "/child", "file13" + ".txt"); HashSet<String> lines = Sets.newHashSet(); @@ -500,14 +518,15 @@ public class FileSplitterInputTest lines.add("f13" + "l" + line); } FileUtils.write(f13, StringUtils.join(lines, '\n')); - //window 2 testMeta.fileSplitterInput.beginWindow(2); - testMeta.scanner.semaphore.acquire(); + testMeta.scanner.semaphore.acquire(2); testMeta.fileSplitterInput.emitTuples(); testMeta.fileSplitterInput.endWindow(); - Assert.assertEquals("window 2: files", 2, testMeta.fileMetadataSink.collectedTuples.size()); + //one for the folder "child" and one for "file13" + Assert.assertEquals("window 2: files " + testMeta.fileMetadataSink.collectedTuples, 2, + testMeta.fileMetadataSink.collectedTuples.size()); Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size()); testMeta.fileSplitterInput.teardown(); } @@ -539,7 +558,7 @@ public class FileSplitterInputTest testMeta.fileSplitterInput.setup(testMeta.context); testMeta.fileSplitterInput.beginWindow(1); - ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire(); + testMeta.scanner.semaphore.acquire(12); testMeta.fileSplitterInput.emitTuples(); testMeta.fileSplitterInput.endWindow(); @@ -570,25 +589,29 @@ public class FileSplitterInputTest @Test public void testFileModificationTest() throws InterruptedException, IOException, TimeoutException { + File file11 = new File(testMeta.dataDirectory, "file11.txt"); + long lastModifiedTime = file11.lastModified(); + LOG.debug("file 11 modified time {} ", lastModifiedTime); testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60 * 1000); testMeta.fileSplitterInput.setup(testMeta.context); - window1TestHelper(); + validateFileMetadataInWindow1(); testMeta.fileMetadataSink.clear(); testMeta.blockMetadataSink.clear(); Thread.sleep(1000); - //change a file , this will not change mtime of the file. - File f12 = new File(testMeta.dataDirectory, "file11" + ".txt"); + //create more lines to append to the file HashSet<String> lines = Sets.newHashSet(); for (int line = 0; line < 2; line++) { - lines.add("f13" + "l" + line); + lines.add("f11" + "l" + line); } /* Need to use FileWriter, FileUtils changes the directory timestamp when file is changed. */ - FileWriter fout = new FileWriter(f12, true); + FileWriter fout = new FileWriter(file11, true); fout.write(StringUtils.join(lines, '\n').toCharArray()); fout.close(); + + LOG.debug("file 11 modified time after append {} ", file11.lastModified()); testMeta.fileSplitterInput.getScanner().setTrigger(true); //window 2 @@ -604,10 +627,7 @@ public class FileSplitterInputTest testMeta.fileMetadataSink.clear(); testMeta.blockMetadataSink.clear(); testMeta.scanner.setTrigger(true); - testMeta.scanner.semaphore.release(); testMeta.fileSplitterInput.beginWindow(3); - Thread.sleep(1000); - testMeta.scanner.semaphore.acquire(); testMeta.fileSplitterInput.emitTuples(); testMeta.fileSplitterInput.endWindow(); @@ -640,7 +660,7 @@ public class FileSplitterInputTest testMeta.fileSplitterInput.setup(testMeta.context); testMeta.fileSplitterInput.beginWindow(1); - ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire(); + ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire(15); testMeta.fileSplitterInput.emitTuples(); testMeta.fileSplitterInput.endWindow(); @@ -677,22 +697,16 @@ public class FileSplitterInputTest private static class MockScanner extends FileSplitterInput.TimeBasedDirectoryScanner { - transient Semaphore semaphore; - private MockScanner() - { - super(); - this.semaphore = new Semaphore(0); - } + private final transient Semaphore semaphore = new Semaphore(0); @Override - protected void scanIterationComplete() + protected void processDiscoveredFile(FileSplitterInput.ScannedFileInfo info) { - if (getNumDiscoveredPerIteration() > 0) { - semaphore.release(); - } - super.scanIterationComplete(); + super.processDiscoveredFile(info); + semaphore.release(); } + } private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInputTest.class); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9febcc0c/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java index 24ab938..63a726d 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java @@ -401,33 +401,6 @@ public class FileSplitterTest } @Test - public void testRecursive() throws InterruptedException, IOException - { - testMeta.fileSplitter.scanner.regex = null; - testFileMetadata(); - testMeta.fileMetadataSink.clear(); - testMeta.blockMetadataSink.clear(); - - Thread.sleep(1000); - //added a new relativeFilePath - File f13 = new File(testMeta.dataDirectory + "/child", "file13" + ".txt"); - HashSet<String> lines = Sets.newHashSet(); - for (int line = 0; line < 2; line++) { - lines.add("f13" + "l" + line); - } - FileUtils.write(f13, StringUtils.join(lines, '\n')); - - //window 2 - testMeta.fileSplitter.beginWindow(2); - testMeta.exchanger.exchange(null); - testMeta.fileSplitter.emitTuples(); - testMeta.fileSplitter.endWindow(); - - Assert.assertEquals("window 2: files", 2, testMeta.fileMetadataSink.collectedTuples.size()); - Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size()); - } - - @Test public void testSingleFile() throws InterruptedException, IOException { testMeta.fileSplitter.teardown();
