Repository: incubator-apex-core
Updated Branches:
  refs/heads/master 061176656 -> 47d66b000


APEXCORE-415 #resolve #comment Fixed double checkpointing bug in InputNode.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/f8b4d499
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/f8b4d499
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/f8b4d499

Branch: refs/heads/master
Commit: f8b4d499c61de2d4acd7710e63f52d11020eda2c
Parents: 9e17211
Author: Timothy Farkas <[email protected]>
Authored: Fri Apr 1 16:18:41 2016 -0700
Committer: Timothy Farkas <[email protected]>
Committed: Mon Apr 4 16:49:02 2016 -0700

----------------------------------------------------------------------
 .../com/datatorrent/stram/engine/InputNode.java | 18 ++--
 .../stram/engine/GenericNodeTest.java           | 75 +++--------------
 .../datatorrent/stram/engine/InputNodeTest.java | 69 ++++++++++++++++
 .../com/datatorrent/stram/engine/NodeTest.java  | 87 ++++++++++++++++++++
 4 files changed, 180 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f8b4d499/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java b/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
index f28841c..09eca59 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
@@ -28,6 +28,8 @@ import com.datatorrent.api.Operator.IdleTimeHandler;
 import com.datatorrent.api.Operator.ProcessingMode;
 import com.datatorrent.api.Operator.ShutdownException;
 import com.datatorrent.api.Sink;
+import com.datatorrent.api.annotation.Stateless;
+
 import com.datatorrent.netlet.util.DTThrowable;
 
 import 
com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
@@ -43,6 +45,7 @@ public class InputNode extends Node<InputOperator>
 {
   private final ArrayList<SweepableReservoir> deferredInputConnections = new 
ArrayList<SweepableReservoir>();
   protected SweepableReservoir controlTuples;
+  long lastCheckpointWindowId = Stateless.WINDOW_ID;
 
   public InputNode(InputOperator operator, OperatorContext context)
   {
@@ -145,10 +148,12 @@ public class InputNode extends Node<InputOperator>
                 checkpointWindowCount = 0;
                 if (doCheckpoint) {
                   checkpoint(currentWindowId);
+                  lastCheckpointWindowId = currentWindowId;
                   doCheckpoint = false;
                 }
                 else if (PROCESSING_MODE == ProcessingMode.EXACTLY_ONCE) {
                   checkpoint(currentWindowId);
+                  lastCheckpointWindowId = currentWindowId;
                 }
               }
 
@@ -162,11 +167,13 @@ public class InputNode extends Node<InputOperator>
 
             case CHECKPOINT:
               dagCheckpointOffsetCount = 0;
-              if (checkpointWindowCount == 0 && PROCESSING_MODE != 
ProcessingMode.EXACTLY_ONCE) {
-                checkpoint(currentWindowId);
-              }
-              else {
-                doCheckpoint = true;
+              if (lastCheckpointWindowId < currentWindowId) {
+                if (checkpointWindowCount == 0 && PROCESSING_MODE != 
ProcessingMode.EXACTLY_ONCE) {
+                  checkpoint(currentWindowId);
+                  lastCheckpointWindowId = currentWindowId;
+                } else {
+                  doCheckpoint = true;
+                }
               }
               for (int i = sinks.length; i-- > 0;) {
                 sinks[i].put(t);
@@ -234,6 +241,7 @@ public class InputNode extends Node<InputOperator>
         checkpointWindowCount = 0;
         if (doCheckpoint || PROCESSING_MODE == ProcessingMode.EXACTLY_ONCE) {
           checkpoint(currentWindowId);
+          lastCheckpointWindowId = currentWindowId;
         }
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f8b4d499/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index 5828844..034851b 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -44,7 +44,7 @@ import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.Operator;
-import com.datatorrent.api.Operator.CheckpointListener;
+import com.datatorrent.api.Operator.CheckpointNotificationListener;
 import com.datatorrent.api.Operator.ProcessingMode;
 import com.datatorrent.api.Sink;
 import com.datatorrent.api.Stats.OperatorStats;
@@ -52,9 +52,9 @@ import 
com.datatorrent.api.annotation.InputPortFieldAnnotation;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.bufferserver.packet.MessageType;
 import com.datatorrent.common.util.AsyncFSStorageAgent;
+import com.datatorrent.common.util.ScheduledExecutorService;
 import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
 import com.datatorrent.stram.api.Checkpoint;
-import com.datatorrent.common.util.ScheduledExecutorService;
 import com.datatorrent.stram.tuple.EndStreamTuple;
 import com.datatorrent.stram.tuple.EndWindowTuple;
 import com.datatorrent.stram.tuple.Tuple;
@@ -229,7 +229,7 @@ public class GenericNodeTest
     }
   }
 
-  public static class GenericCheckpointOperator extends GenericOperator 
implements CheckpointListener
+  public static class GenericCheckpointOperator extends GenericOperator 
implements CheckpointNotificationListener
   {
     public Set<Long> checkpointedWindows = Sets.newHashSet();
     public volatile boolean checkpointTwice = false;
@@ -262,6 +262,11 @@ public class GenericNodeTest
     public void committed(long windowId)
     {
     }
+
+    @Override
+    public void beforeCheckpoint(long windowId)
+    {
+    }
   }
 
   @Test
@@ -476,77 +481,19 @@ public class GenericNodeTest
   @Test
   public void testDoubleCheckpointAtleastOnce() throws Exception
   {
-    testDoubleCheckpointHandling(ProcessingMode.AT_LEAST_ONCE);
+    NodeTest.testDoubleCheckpointHandling(ProcessingMode.AT_LEAST_ONCE, true, 
testMeta.getDir());
   }
 
   @Test
   public void testDoubleCheckpointAtMostOnce() throws Exception
   {
-    testDoubleCheckpointHandling(ProcessingMode.AT_MOST_ONCE);
+    NodeTest.testDoubleCheckpointHandling(ProcessingMode.AT_MOST_ONCE, true, 
testMeta.getDir());
   }
 
   @Test
   public void testDoubleCheckpointExactlyOnce() throws Exception
   {
-    testDoubleCheckpointHandling(ProcessingMode.EXACTLY_ONCE);
-  }
-
-  @SuppressWarnings("SleepWhileInLoop")
-  private void testDoubleCheckpointHandling(ProcessingMode processingMode) 
throws Exception
-  {
-    WindowGenerator windowGenerator = new WindowGenerator(new 
ScheduledThreadPoolExecutor(1, "WindowGenerator"), 1024);
-    windowGenerator.setResetWindow(0L);
-    windowGenerator.setFirstWindow(0L);
-    windowGenerator.setWindowWidth(100);
-    windowGenerator.setCheckpointCount(1, 0);
-
-    GenericCheckpointOperator gco = new GenericCheckpointOperator();
-    DefaultAttributeMap dam = new DefaultAttributeMap();
-    dam.put(OperatorContext.APPLICATION_WINDOW_COUNT, 2);
-    dam.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 2);
-    dam.put(OperatorContext.PROCESSING_MODE, processingMode);
-
-    final GenericNode in = new GenericNode(gco, new 
com.datatorrent.stram.engine.OperatorContext(0, dam, null));
-    in.setId(1);
-
-    TestSink testSink = new TestSink();
-
-    in.connectInputPort("ip1", 
windowGenerator.acquireReservoir(String.valueOf(in.id), 1024));
-    in.connectOutputPort("output", testSink);
-    in.firstWindowMillis = 0;
-    in.windowWidthMillis = 100;
-
-    windowGenerator.activate(null);
-
-    final AtomicBoolean ab = new AtomicBoolean(false);
-    Thread t = new Thread()
-    {
-      @Override
-      public void run()
-      {
-        ab.set(true);
-        in.activate();
-        in.run();
-        in.deactivate();
-      }
-    };
-
-    t.start();
-
-    long startTime = System.currentTimeMillis();
-    long endTime = 0;
-
-    while (gco.numWindows < 3 && ((endTime = System.currentTimeMillis()) - 
startTime) < 5000) {
-      Thread.sleep(50);
-    }
-
-    in.shutdown();
-    t.join();
-
-    windowGenerator.deactivate();
-
-    Assert.assertFalse(gco.checkpointTwice);
-    Assert.assertTrue("Timed out", (endTime - startTime) < 5000);
+    NodeTest.testDoubleCheckpointHandling(ProcessingMode.EXACTLY_ONCE, true, 
testMeta.getDir());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f8b4d499/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
index f723c2b..94b7675 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
@@ -18,26 +18,36 @@
  */
 package com.datatorrent.stram.engine;
 
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
+
 import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Operator.IdleTimeHandler;
+import com.datatorrent.api.Operator.ProcessingMode;
 import com.datatorrent.api.Sink;
 import com.datatorrent.bufferserver.packet.MessageType;
+import com.datatorrent.stram.engine.GenericNodeTest.FSTestWatcher;
+import com.datatorrent.stram.engine.GenericNodeTest.GenericCheckpointOperator;
 import com.datatorrent.stram.tuple.EndWindowTuple;
 import com.datatorrent.stram.tuple.ResetWindowTuple;
 import com.datatorrent.stram.tuple.Tuple;
 
 public class InputNodeTest
 {
+  @Rule
+  public FSTestWatcher testMeta = new FSTestWatcher();
+
   @Test
   public void testEmitTuplesOutsideStreamingWindow() throws Exception
   {
@@ -111,6 +121,24 @@ public class InputNodeTest
     }
   }
 
+  @Test
+  public void testDoubleCheckpointAtleastOnce() throws Exception
+  {
+    NodeTest.testDoubleCheckpointHandling(ProcessingMode.AT_LEAST_ONCE, false, 
testMeta.getDir());
+  }
+
+  @Test
+  public void testDoubleCheckpointAtMostOnce() throws Exception
+  {
+    NodeTest.testDoubleCheckpointHandling(ProcessingMode.AT_MOST_ONCE, false, 
testMeta.getDir());
+  }
+
+  @Test
+  public void testDoubleCheckpointExactlyOnce() throws Exception
+  {
+    NodeTest.testDoubleCheckpointHandling(ProcessingMode.EXACTLY_ONCE, false, 
testMeta.getDir());
+  }
+
   public static class TestWindowGenerator implements SweepableReservoir
   {
     private final long baseSeconds = (System.currentTimeMillis() / 1000L) << 
32;
@@ -208,6 +236,47 @@ public class InputNodeTest
     private static final Logger LOG = 
LoggerFactory.getLogger(TestWindowGenerator.class);
   }
 
+
+  public static class InputCheckpointOperator extends 
GenericCheckpointOperator implements InputOperator
+  {
+    public Set<Long> checkpointedWindows = Sets.newHashSet();
+    public volatile boolean checkpointTwice = false;
+    public volatile int numWindows = 0;
+
+    public InputCheckpointOperator()
+    {
+    }
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+      super.beginWindow(windowId);
+    }
+
+    @Override
+    public void endWindow()
+    {
+      super.endWindow();
+    }
+
+    @Override
+    public void checkpointed(long windowId)
+    {
+      super.checkpointed(windowId);
+    }
+
+    @Override
+    public void committed(long windowId)
+    {
+      super.committed(windowId);
+    }
+
+    @Override
+    public void emitTuples()
+    {
+    }
+  }
+
   public static class TestInputOperator implements InputOperator, 
IdleTimeHandler
   {
     public final transient DefaultOutputPort<Long> output = new 
DefaultOutputPort<>();

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f8b4d499/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
index 2e12f63..2595714 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
@@ -20,18 +20,26 @@ package com.datatorrent.stram.engine;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ProcessingMode;
 import com.datatorrent.api.StorageAgent;
 import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.FSStorageAgent;
+import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
 import com.datatorrent.stram.StramLocalCluster;
+import com.datatorrent.stram.engine.GenericNodeTest.GenericCheckpointOperator;
+import com.datatorrent.stram.engine.InputNodeTest.InputCheckpointOperator;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 
 /**
@@ -258,4 +266,83 @@ public class NodeTest
     node.deactivate();
   }
 
+  @SuppressWarnings("SleepWhileInLoop")
+  public static void testDoubleCheckpointHandling(ProcessingMode 
processingMode, boolean trueGenericFalseInput, String path)
+      throws Exception
+  {
+    WindowGenerator windowGenerator = new WindowGenerator(new 
ScheduledThreadPoolExecutor(1, "WindowGenerator"), 1024);
+    windowGenerator.setResetWindow(0L);
+    windowGenerator.setFirstWindow(0L);
+    windowGenerator.setWindowWidth(100);
+    windowGenerator.setCheckpointCount(1, 0);
+
+    GenericCheckpointOperator gco;
+
+    if (trueGenericFalseInput) {
+      gco = new GenericCheckpointOperator();
+    } else {
+      gco = new InputCheckpointOperator();
+    }
+    DefaultAttributeMap dam = new DefaultAttributeMap();
+    
dam.put(com.datatorrent.stram.engine.OperatorContext.APPLICATION_WINDOW_COUNT, 
2);
+    
dam.put(com.datatorrent.stram.engine.OperatorContext.CHECKPOINT_WINDOW_COUNT, 
2);
+    dam.put(com.datatorrent.stram.engine.OperatorContext.PROCESSING_MODE, 
processingMode);
+    dam.put(com.datatorrent.stram.engine.OperatorContext.STORAGE_AGENT, new 
FSStorageAgent(path, new Configuration()));
+
+    final Node in;
+
+    if (trueGenericFalseInput) {
+      in = new GenericNode(gco, new 
com.datatorrent.stram.engine.OperatorContext(0, dam, null));
+    } else {
+      in = new InputNode((InputCheckpointOperator) gco, new 
com.datatorrent.stram.engine.OperatorContext(0, dam, null));
+    }
+
+    in.setId(1);
+
+    TestSink testSink = new TestSink();
+    String inputPort;
+
+    if (trueGenericFalseInput) {
+      inputPort = "ip1";
+    } else {
+      inputPort = Node.INPUT;
+    }
+
+    in.connectInputPort(inputPort, 
windowGenerator.acquireReservoir(String.valueOf(in.id), 1024));
+    in.connectOutputPort("output", testSink);
+    in.firstWindowMillis = 0;
+    in.windowWidthMillis = 100;
+
+    windowGenerator.activate(null);
+
+    final AtomicBoolean ab = new AtomicBoolean(false);
+    Thread t = new Thread()
+    {
+      @Override
+      public void run()
+      {
+        ab.set(true);
+        in.activate();
+        in.run();
+        in.deactivate();
+      }
+    };
+
+    t.start();
+
+    long startTime = System.currentTimeMillis();
+    long endTime = 0;
+
+    while (gco.numWindows < 3 && ((endTime = System.currentTimeMillis()) - 
startTime) < 6000) {
+      Thread.sleep(50);
+    }
+
+    in.shutdown();
+    t.join();
+
+    windowGenerator.deactivate();
+
+    Assert.assertFalse(gco.checkpointTwice);
+    Assert.assertTrue("Timed out", (endTime - startTime) < 5000);
+  }
 }

Reply via email to