Repository: apex-malhar
Updated Branches:
  refs/heads/master cf60959a7 -> 8486493a0
APEXMALHAR-2394 adding check if already been rotated in AbstractFileOutputOperator
Adding test that empty windows do not cause new file creation

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/4587a55c
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/4587a55c
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/4587a55c

Branch: refs/heads/master
Commit: 4587a55c0bc7b178ea6fc13a49db4cd7b1ac1ebb
Parents: cf896b0
Author: Oliver W <[email protected]>
Authored: Tue Jan 24 12:20:09 2017 -0800
Committer: Oliver Winke <[email protected]>
Committed: Mon Feb 20 16:54:29 2017 -0800

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java | 24 +++++-----
 .../io/fs/AbstractFileOutputOperatorTest.java | 46 ++++++++++++++++++++
 2 files changed, 59 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4587a55c/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 85d5f70..263efea 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -879,17 +879,19 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
    */
   protected void rotate(String fileName) throws IllegalArgumentException, IOException, ExecutionException
   {
-    requestFinalize(fileName);
-    counts.remove(fileName);
-    streamsCache.invalidate(fileName);
-    MutableInt mi = openPart.get(fileName);
-    LOG.debug("Part file rotated {} : {}", fileName, mi.getValue());
-
-    //TODO: remove this as rotateHook is deprecated.
-    String partFileName = getPartFileName(fileName, mi.getValue());
-    rotateHook(partFileName);
-
-    getRotationState(fileName).rotated = true;
+    if (!this.getRotationState(fileName).rotated) {
+      requestFinalize(fileName);
+      counts.remove(fileName);
+      streamsCache.invalidate(fileName);
+      MutableInt mi = openPart.get(fileName);
+      LOG.debug("Part file rotated {} : {}", fileName, mi.getValue());
+
+      //TODO: remove this as rotateHook is deprecated.
+      String partFileName = getPartFileName(fileName, mi.getValue());
+      rotateHook(partFileName);
+
+      getRotationState(fileName).rotated = true;
+    }
   }
 
   private RotationState getRotationState(String fileName)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4587a55c/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index 8f0fbb0..38319e5 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -781,6 +781,52 @@ public class AbstractFileOutputOperatorTest
   }
 
   @Test
+  public void testSingleRollingFileEmptyWindowsWrite()
+  {
+    SingleHDFSExactlyOnceWriter writer = new SingleHDFSExactlyOnceWriter();
+
+    testSingleRollingFileEmptyWindowsWriteHelper(writer);
+
+    //Rolling file 0
+
+    String singleFileName = testMeta.getDir() + File.separator + SINGLE_FILE;
+
+    int numberOfFiles = new File(testMeta.getDir()).listFiles().length;
+
+    Assert.assertEquals("More than one File in Directory", 1, numberOfFiles);
+
+    String correctContents = "0\n" + "1\n" + "2\n";
+    checkOutput(0, singleFileName, correctContents);
+  }
+
+  private void testSingleRollingFileEmptyWindowsWriteHelper(SingleHDFSExactlyOnceWriter writer)
+  {
+    writer.setFilePath(testMeta.getDir());
+    writer.setMaxLength(4);
+    writer.setRotationWindows(1);
+    writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
+    writer.setup(testMeta.testOperatorContext);
+
+    writer.beginWindow(0);
+    writer.input.put(0);
+    writer.input.put(1);
+    writer.input.put(2);
+    writer.endWindow();
+
+    writer.beginWindow(1);
+    writer.endWindow();
+
+    writer.beginWindow(2);
+    writer.endWindow();
+
+    writer.beforeCheckpoint(2);
+    writer.checkpointed(2);
+    writer.committed(2);
+
+    writer.teardown();
+  }
+
+  @Test
   public void testSingleRollingFileFailedWrite()
   {
     SingleHDFSExactlyOnceWriter writer = new SingleHDFSExactlyOnceWriter();
