Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 d910102d7 -> 59f21fb5d
MLHR-1841 #comment rotating all files irrespective of the state of stream

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b8a10bc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b8a10bc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b8a10bc5

Branch: refs/heads/devel-3
Commit: b8a10bc5381bd8a004a2af4f5c0812e379c82d44
Parents: d910102
Author: Chandni Singh <[email protected]>
Authored: Tue Sep 8 14:21:04 2015 -0700
Committer: Chandni Singh <[email protected]>
Committed: Tue Sep 8 14:52:34 2015 -0700

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java   |  4 +--
 .../io/fs/AbstractFileOutputOperatorTest.java   | 38 ++++++++++++++++++++
 2 files changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b8a10bc5/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index aef0739..8339cc1 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -868,9 +868,9 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
       if (++rotationCount == rotationWindows) {
         rotationCount = 0;
         // Rotate the files
-        Iterator<String> iterator = streamsCache.asMap().keySet().iterator();
+        Iterator<Map.Entry<String, MutableInt>> iterator = openPart.entrySet().iterator();
         while (iterator.hasNext()) {
-          String filename = iterator.next();
+          String filename = iterator.next().getKey();
           // Rotate the file if the following conditions are met
           // 1. The file is not already rotated during this period for other reasons such as max length is reached
           //    or rotate was explicitly called externally

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b8a10bc5/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index b900af2..dd6bb1d 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -1809,6 +1809,44 @@ public class AbstractFileOutputOperatorTest
   }
 
   @Test
+  public void testPeriodicRotationWithEviction() throws InterruptedException
+  {
+    EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
+    File dir = new File(testMeta.getDir());
+    writer.setFilePath(testMeta.getDir());
+    writer.setRotationWindows(30);
+    writer.setAlwaysWriteToTmp(true);
+    writer.setExpireStreamAfterAccessMillis(1L);
+    writer.setup(testMeta.testOperatorContext);
+
+    // Check that rotation for even.txt.0 happens
+    for (int i = 0; i < 30; ++i) {
+      writer.beginWindow(i);
+      if (i == 0) {
+        writer.input.put(i);
+      }
+      Thread.sleep(100L);
+      writer.endWindow();
+    }
+    writer.committed(29);
+    Set<String> fileNames = new TreeSet<>();
+    fileNames.add(EVEN_FILE + ".0");
+    Collection<File> files = FileUtils.listFiles(dir, null, false);
+    Assert.assertEquals("Number of part files", 1, files.size());
+    Assert.assertEquals("Part file names", fileNames, getFileNames(files));
+
+    // Check that rotation doesn't happen for files that don't have data during the rotation period
+    for (int i = 30; i < 120; ++i) {
+      writer.beginWindow(i);
+      writer.endWindow();
+    }
+    writer.committed(119);
+    files = FileUtils.listFiles(dir, null, false);
+    Assert.assertEquals("Number of part files", 1, files.size());
+    Assert.assertEquals("Part file names", fileNames, getFileNames(files));
+  }
+
+  @Test
   public void testCompression() throws IOException
   {
     EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
