Repository: apex-malhar
Updated Branches:
  refs/heads/master 2cf8bade8 -> 0f8442472


APEXMALHAR-2017 Fixed failing tests.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0f844247
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0f844247
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0f844247

Branch: refs/heads/master
Commit: 0f84424720268e1eebb06992bcda00d2e7d591d4
Parents: 28a7e34
Author: Lakshmi Prasanna Velineni <[email protected]>
Authored: Mon Oct 3 09:01:30 2016 -0700
Committer: Lakshmi Prasanna Velineni <[email protected]>
Committed: Wed Oct 19 22:28:34 2016 -0700

----------------------------------------------------------------------
 .../datatorrent/contrib/hive/HiveMockTest.java  | 16 ++++++++++++-
 .../lib/io/fs/AbstractFileInputOperator.java    |  3 +--
 .../lib/io/fs/AbstractFileOutputOperator.java   |  6 +++--
 .../lib/io/fs/AbstractReconciler.java           |  2 +-
 .../io/fs/AbstractFileOutputOperatorTest.java   | 25 ++++++++++----------
 .../AbstractWindowFileOutputOperatorTest.java   |  5 ++--
 .../lib/io/fs/FSInputModuleAppTest.java         |  1 +
 .../lib/fs/GenericFileOutputOperatorTest.java   |  6 +++++
 8 files changed, 43 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java
----------------------------------------------------------------------
diff --git a/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java 
b/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java
index 3c64bdf..4ec92c9 100755
--- a/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java
+++ b/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java
@@ -50,7 +50,6 @@ import 
com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator.FilePartitio
 import com.datatorrent.contrib.hive.FSPojoToHiveOperator.FIELD_TYPE;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 
-
 public class HiveMockTest extends HiveTestService
 {
   public static final String APP_ID = "HiveOperatorTest";
@@ -250,6 +249,11 @@ public class HiveMockTest extends HiveTestService
       }
 
       fsRolling.endWindow();
+
+      if (wid == 6) {
+        fsRolling.beforeCheckpoint(wid);
+        fsRolling.checkpointed(wid);
+      }
     }
 
     fsRolling.teardown();
@@ -353,6 +357,11 @@ public class HiveMockTest extends HiveTestService
       }
 
       fsRolling.endWindow();
+
+      if (wid == 6) {
+        fsRolling.beforeCheckpoint(wid);
+        fsRolling.checkpointed(wid);
+      }
     }
 
     fsRolling.teardown();
@@ -521,6 +530,11 @@ public class HiveMockTest extends HiveTestService
 
       fsRolling.endWindow();
 
+      if ((wid == 6) || (wid == 9)) {
+        fsRolling.beforeCheckpoint(wid);
+        fsRolling.checkpointed(wid);
+      }
+
       if (wid == 9) {
         Kryo kryo = new Kryo();
         FieldSerializer<HiveOperator> f1 = 
(FieldSerializer<HiveOperator>)kryo.getSerializer(HiveOperator.class);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
 
b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index 14cabfc..d4ee03b 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -37,7 +37,6 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import javax.validation.OverridesAttribute;
 import javax.validation.constraints.NotNull;
 
 import org.slf4j.Logger;
@@ -99,7 +98,7 @@ import com.datatorrent.lib.util.KryoCloneUtils;
  */
 @org.apache.hadoop.classification.InterfaceStability.Evolving
 public abstract class AbstractFileInputOperator<T> implements InputOperator, 
Partitioner<AbstractFileInputOperator<T>>, StatsListener,
-  Operator.CheckpointListener, Operator.CheckpointNotificationListener
+    Operator.CheckpointListener, Operator.CheckpointNotificationListener
 {
   private static final Logger LOG = 
LoggerFactory.getLogger(AbstractFileInputOperator.class);
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
 
b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index f703a19..27a56cd 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -262,7 +262,7 @@ public abstract class AbstractFileOutputOperator<INPUT> 
extends BaseOperator imp
   private Long expireStreamAfterAccessMillis;
   private final Set<String> filesWithOpenStreams;
 
-  private boolean initializeContext;
+  private transient boolean initializeContext;
 
   /**
    * This input port receives incoming tuples.
@@ -968,6 +968,9 @@ public abstract class AbstractFileOutputOperator<INPUT> 
extends BaseOperator imp
   @Override
   public void beginWindow(long windowId)
   {
+    // All the filter state needs to be flushed to the disk. Not all filters 
allow a flush option, so the filters have
+    // to be closed and reopened. If no filter being is being used then it is 
a essentially a noop as the underlying
+    // FSDataOutputStream is not being closed in this operation.
     if (initializeContext) {
       try {
         Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
@@ -1240,7 +1243,6 @@ public abstract class AbstractFileOutputOperator<INPUT> 
extends BaseOperator imp
         long start = System.currentTimeMillis();
         streamContext.finalizeContext();
         totalWritingTime += System.currentTimeMillis() - start;
-        //streamContext.resetFilter();
         // Re-initialize context when next window starts after checkpoint
         initializeContext = true;
       }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java 
b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
index c12becd..0d67c31 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
@@ -31,8 +31,8 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.Operator.IdleTimeHandler;
 import com.datatorrent.api.Operator.CheckpointNotificationListener;
+import com.datatorrent.api.Operator.IdleTimeHandler;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.common.util.NameableThreadFactory;
 import com.datatorrent.netlet.util.DTThrowable;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
 
b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index 5b58121..8f0fbb0 100644
--- 
a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ 
b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -66,6 +66,7 @@ import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.LocalMode;
 import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.testbench.RandomWordGenerator;
 import com.datatorrent.lib.util.TestUtils;
@@ -245,7 +246,7 @@ public class AbstractFileOutputOperatorTest
    */
   public static AbstractFileOutputOperator 
checkpoint(AbstractFileOutputOperator writer, long windowId)
   {
-    if (windowId >= 0) {
+    if (windowId >= Stateless.WINDOW_ID) {
       writer.beforeCheckpoint(windowId);
     }
     Kryo kryo = new Kryo();
@@ -421,7 +422,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(1);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 
Stateless.WINDOW_ID);
 
     writer.beginWindow(1);
     writer.input.put(2);
@@ -677,7 +678,7 @@ public class AbstractFileOutputOperatorTest
     writer.requestFinalize(EVEN_FILE);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 
Stateless.WINDOW_ID);
 
     writer.beginWindow(1);
     writer.input.put(4);
@@ -821,7 +822,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(2);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 
Stateless.WINDOW_ID);
 
     writer.beginWindow(1);
     writer.input.put(3);
@@ -868,8 +869,8 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(4);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
-    AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer, -1);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 
Stateless.WINDOW_ID);
+    AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer, 
Stateless.WINDOW_ID);
 
     LOG.debug("Checkpoint endOffsets={}", checkPointWriter.endOffsets);
 
@@ -1134,7 +1135,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(1);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 
Stateless.WINDOW_ID);
 
     writer.beginWindow(1);
     writer.input.put(2);
@@ -1231,7 +1232,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(1);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 
Stateless.WINDOW_ID);
 
     writer.beginWindow(1);
     writer.input.put(2);
@@ -1278,7 +1279,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.process(1);
     writer.endWindow();
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 
Stateless.WINDOW_ID);
 
     writer.beginWindow(1);
     writer.input.process(2);
@@ -1369,7 +1370,7 @@ public class AbstractFileOutputOperatorTest
     writer.input.put(3);
     writer.input.put(4);
 
-    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, -1);
+    AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 
Stateless.WINDOW_ID);
 
     writer.input.put(3);
     writer.input.put(4);
@@ -1766,13 +1767,11 @@ public class AbstractFileOutputOperatorTest
       for (int j = 0; j < 1000; ++j) {
         writer.input.put(i);
       }
-      //writer.endWindow();
+      writer.endWindow();
       if ((i % 2) == 1) {
         writer.beforeCheckpoint(i);
         evenOffsets.add(evenCounterContext.getCounter());
         oddOffsets.add(oddCounterContext.getCounter());
-        //evenOffsets.add(evenFile.length());
-        //oddOffsets.add(oddFile.length());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
 
b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
index bcb1bc3..daebecb 100644
--- 
a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
+++ 
b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java
@@ -22,6 +22,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.Description;
 
+import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher;
 import com.datatorrent.lib.util.TestUtils.TestInfo;
@@ -70,7 +71,7 @@ public class AbstractWindowFileOutputOperatorTest
     oper.input.process("window 0");
     oper.endWindow();
 
-    AbstractFileOutputOperator checkPoint = 
AbstractFileOutputOperatorTest.checkpoint(oper, 0);
+    AbstractFileOutputOperator checkPoint = 
AbstractFileOutputOperatorTest.checkpoint(oper, Stateless.WINDOW_ID);
 
     oper.beginWindow(1);
     oper.input.process("window 1");
@@ -110,7 +111,7 @@ public class AbstractWindowFileOutputOperatorTest
     oper.beginWindow(1);
     oper.input.process("1");
 
-    AbstractFileOutputOperator checkPoint = 
AbstractFileOutputOperatorTest.checkpoint(oper, -1);
+    AbstractFileOutputOperator checkPoint = 
AbstractFileOutputOperatorTest.checkpoint(oper, Stateless.WINDOW_ID);
 
     oper.input.process("1");
     oper.teardown();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java 
b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
index 4213a00..4042d3c 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
@@ -99,6 +99,7 @@ public class FSInputModuleAppTest
     conf.set("dt.operator.hdfsInputModule.prop.blockSize", "10");
     conf.set("dt.operator.hdfsInputModule.prop.blocksThreshold", "4");
     conf.set("dt.operator.hdfsInputModule.prop.scanIntervalMillis", "10000");
+    conf.set("dt.attr.CHECKPOINT_WINDOW_COUNT","10");
 
     LocalMode lma = LocalMode.newInstance();
     lma.prepareDAG(app, conf);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java
index 6082f57..8b8ed01 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java
@@ -62,6 +62,7 @@ public class GenericFileOutputOperatorTest extends 
AbstractFileOutputOperatorTes
       }
       writer.endWindow();
     }
+    checkpoint(writer, 10);
     writer.committed(10);
 
     for (int i = 13; i <= 26; i++) {
@@ -71,7 +72,10 @@ public class GenericFileOutputOperatorTest extends 
AbstractFileOutputOperatorTes
       }
       writer.endWindow();
     }
+    checkpoint(writer, 20);
     writer.committed(20);
+
+    checkpoint(writer, 26);
     writer.committed(26);
 
     String[] expected = {"0a\n0b\n1a\n1b\n6a\n6b\n7a\n7b\n", 
"13a\n13b\n14a\n14b\n18a\n18b\n19a\n19b\n",
@@ -108,8 +112,10 @@ public class GenericFileOutputOperatorTest extends 
AbstractFileOutputOperatorTes
       }
       writer.endWindow();
       if (i % 10 == 0) {
+        checkpoint(writer, 10);
         writer.committed(10);
       }
+      checkpoint(writer, 24);
     }
     writer.committed(tuples.length);
 

Reply via email to