Repository: apex-malhar
Updated Branches:
  refs/heads/master cf60959a7 -> 8486493a0


APEXMALHAR-2394 adding check if already been rotated in 
AbstractFileOutputOperator

Adding test that empty windows do not cause new file creation


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/4587a55c
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/4587a55c
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/4587a55c

Branch: refs/heads/master
Commit: 4587a55c0bc7b178ea6fc13a49db4cd7b1ac1ebb
Parents: cf896b0
Author: Oliver W <[email protected]>
Authored: Tue Jan 24 12:20:09 2017 -0800
Committer: Oliver Winke <[email protected]>
Committed: Mon Feb 20 16:54:29 2017 -0800

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java   | 24 +++++-----
 .../io/fs/AbstractFileOutputOperatorTest.java   | 46 ++++++++++++++++++++
 2 files changed, 59 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4587a55c/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
 
b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 85d5f70..263efea 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -879,17 +879,19 @@ public abstract class AbstractFileOutputOperator<INPUT> 
extends BaseOperator imp
    */
   protected void rotate(String fileName) throws IllegalArgumentException, 
IOException, ExecutionException
   {
-    requestFinalize(fileName);
-    counts.remove(fileName);
-    streamsCache.invalidate(fileName);
-    MutableInt mi = openPart.get(fileName);
-    LOG.debug("Part file rotated {} : {}", fileName, mi.getValue());
-
-    //TODO: remove this as rotateHook is deprecated.
-    String partFileName = getPartFileName(fileName, mi.getValue());
-    rotateHook(partFileName);
-
-    getRotationState(fileName).rotated = true;
+    if (!this.getRotationState(fileName).rotated) {
+      requestFinalize(fileName);
+      counts.remove(fileName);
+      streamsCache.invalidate(fileName);
+      MutableInt mi = openPart.get(fileName);
+      LOG.debug("Part file rotated {} : {}", fileName, mi.getValue());
+
+      //TODO: remove this as rotateHook is deprecated.
+      String partFileName = getPartFileName(fileName, mi.getValue());
+      rotateHook(partFileName);
+
+      getRotationState(fileName).rotated = true;
+    }
   }
 
   private RotationState getRotationState(String fileName)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4587a55c/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
 
b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index 8f0fbb0..38319e5 100644
--- 
a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ 
b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -781,6 +781,52 @@ public class AbstractFileOutputOperatorTest
   }
 
   @Test
+  public void testSingleRollingFileEmptyWindowsWrite()
+  {
+    SingleHDFSExactlyOnceWriter writer = new SingleHDFSExactlyOnceWriter();
+
+    testSingleRollingFileEmptyWindowsWriteHelper(writer);
+
+    //Rolling file 0
+
+    String singleFileName = testMeta.getDir() + File.separator + SINGLE_FILE;
+
+    int numberOfFiles = new File(testMeta.getDir()).listFiles().length;
+
+    Assert.assertEquals("More than one File in Directory", 1, numberOfFiles);
+
+    String correctContents = "0\n" + "1\n" + "2\n";
+    checkOutput(0, singleFileName, correctContents);
+  }
+
+  private void 
testSingleRollingFileEmptyWindowsWriteHelper(SingleHDFSExactlyOnceWriter writer)
+  {
+    writer.setFilePath(testMeta.getDir());
+    writer.setMaxLength(4);
+    writer.setRotationWindows(1);
+    writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
+    writer.setup(testMeta.testOperatorContext);
+
+    writer.beginWindow(0);
+    writer.input.put(0);
+    writer.input.put(1);
+    writer.input.put(2);
+    writer.endWindow();
+
+    writer.beginWindow(1);
+    writer.endWindow();
+
+    writer.beginWindow(2);
+    writer.endWindow();
+
+    writer.beforeCheckpoint(2);
+    writer.checkpointed(2);
+    writer.committed(2);
+
+    writer.teardown();
+  }
+
+  @Test
   public void testSingleRollingFileFailedWrite()
   {
     SingleHDFSExactlyOnceWriter writer = new SingleHDFSExactlyOnceWriter();

Reply via email to