Repository: apex-core
Updated Branches:
  refs/heads/master 1e9896bb4 -> 527c70bf8


APEXCORE-627 : Unit test AtMostOnceTest intermittently fails

Fixed a race problem for calling checkpointed and committed in 
RecoverableInputOperator. The original implementation of the class used the 
method checkpointed() of the interface CheckpointListener to refresh a value of 
one of the criteria variables checkpointedWindowId. The fix update uses the 
method beforeCheckpoint() of the interface CheckpointNotificationListener. It 
guaranties that the update of the variable checkpointedWindowId will be done 
before the call of the method committed().


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/527c70bf
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/527c70bf
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/527c70bf

Branch: refs/heads/master
Commit: 527c70bf8b4060d04d713f2b6e0ffb113a17ece6
Parents: 1e9896b
Author: Sergey Golovko <[email protected]>
Authored: Fri Jan 27 15:15:09 2017 -0800
Committer: Sergey Golovko <[email protected]>
Committed: Mon Jan 30 16:20:14 2017 -0800

----------------------------------------------------------------------
 .../stram/engine/RecoverableInputOperator.java  | 44 +++++++++++++-------
 1 file changed, 30 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/527c70bf/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java
 
b/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java
index ed95874..6412da1 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java
@@ -26,24 +26,28 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
 import com.datatorrent.bufferserver.util.Codec;
 import com.datatorrent.common.util.BaseOperator;
 
 /**
  *
  */
-public class RecoverableInputOperator implements InputOperator, 
com.datatorrent.api.Operator.CheckpointListener
+public class RecoverableInputOperator implements InputOperator, 
Operator.CheckpointNotificationListener
 {
   public final transient DefaultOutputPort<Long> output = new 
DefaultOutputPort<>();
   private long checkpointedWindowId;
-  boolean firstRun = true;
-  transient boolean first;
-  transient long windowId;
-  int maximumTuples = 20;
-  boolean simulateFailure;
+  private transient boolean firstRun = true;
+  private transient boolean first;
+  private transient long windowId;
+  private int maximumTuples = 20;
+  private boolean simulateFailure;
 
   private static final Map<Long, Long> idMap = new HashMap<>();
   private static long tuple = 0;
@@ -95,7 +99,7 @@ public class RecoverableInputOperator implements 
InputOperator, com.datatorrent.
   public void setup(OperatorContext context)
   {
     firstRun = (checkpointedWindowId == 0);
-    logger.debug("firstRun={} checkpointedWindowId={}", firstRun, 
Codec.getStringWindowId(checkpointedWindowId));
+    logger.debug("{}", this);
   }
 
   @Override
@@ -106,18 +110,12 @@ public class RecoverableInputOperator implements 
InputOperator, com.datatorrent.
   @Override
   public void checkpointed(long windowId)
   {
-    if (checkpointedWindowId == 0) {
-      checkpointedWindowId = windowId;
-      logger.debug("firstRun={} checkpointedWindowId={}", firstRun, 
Codec.getStringWindowId(checkpointedWindowId));
-    }
-
-    logger.debug("{} checkpointed at {}", this, 
Codec.getStringWindowId(windowId));
   }
 
   @Override
   public void committed(long windowId)
   {
-    logger.debug("{} committed at {} firstRun {}, checkpointedWindowId {}", 
this, Codec.getStringWindowId(windowId), firstRun, 
Codec.getStringWindowId(checkpointedWindowId));
+    logger.debug("{}, windowId={}", this, Codec.getStringWindowId(windowId));
     if (simulateFailure && firstRun && checkpointedWindowId > 0 && windowId > 
checkpointedWindowId) {
       throw new RuntimeException("Failure Simulation from " + this);
     }
@@ -134,4 +132,22 @@ public class RecoverableInputOperator implements 
InputOperator, com.datatorrent.
   {
     simulateFailure = flag;
   }
+
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+    if (checkpointedWindowId == 0) {
+      checkpointedWindowId = windowId;
+    }
+    logger.debug("{}, windowId={}", this, Codec.getStringWindowId(windowId));
+  }
+
+  @Override
+  public String toString()
+  {
+    return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+            .append("firstRun", this.firstRun)
+            .append("checkpointedWindowId", 
Codec.getStringWindowId(checkpointedWindowId))
+            .toString();
+  }
 }

Reply via email to