Repository: apex-core Updated Branches: refs/heads/master 1e9896bb4 -> 527c70bf8
APEXCORE-627 : Unit test AtMostOnceTest intermittently fails Fixed a race between the calls to checkpointed() and committed() in RecoverableInputOperator. The original implementation of the class used the method checkpointed() of the interface CheckpointListener to set the value of checkpointedWindowId, one of the variables checked by the failure-simulation condition in committed(). The fix instead uses the method beforeCheckpoint() of the interface CheckpointNotificationListener. This guarantees that the variable checkpointedWindowId is updated before the method committed() is called. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/527c70bf Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/527c70bf Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/527c70bf Branch: refs/heads/master Commit: 527c70bf8b4060d04d713f2b6e0ffb113a17ece6 Parents: 1e9896b Author: Sergey Golovko <[email protected]> Authored: Fri Jan 27 15:15:09 2017 -0800 Committer: Sergey Golovko <[email protected]> Committed: Mon Jan 30 16:20:14 2017 -0800 ---------------------------------------------------------------------- .../stram/engine/RecoverableInputOperator.java | 44 +++++++++++++------- 1 file changed, 30 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/527c70bf/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java b/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java index ed95874..6412da1 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java @@ -26,24 +26,28 @@ import java.util.Map; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; + import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Operator; import com.datatorrent.bufferserver.util.Codec; import com.datatorrent.common.util.BaseOperator; /** * */ -public class RecoverableInputOperator implements InputOperator, com.datatorrent.api.Operator.CheckpointListener +public class RecoverableInputOperator implements InputOperator, Operator.CheckpointNotificationListener { public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>(); private long checkpointedWindowId; - boolean firstRun = true; - transient boolean first; - transient long windowId; - int maximumTuples = 20; - boolean simulateFailure; + private transient boolean firstRun = true; + private transient boolean first; + private transient long windowId; + private int maximumTuples = 20; + private boolean simulateFailure; private static final Map<Long, Long> idMap = new HashMap<>(); private static long tuple = 0; @@ -95,7 +99,7 @@ public class RecoverableInputOperator implements InputOperator, com.datatorrent. public void setup(OperatorContext context) { firstRun = (checkpointedWindowId == 0); - logger.debug("firstRun={} checkpointedWindowId={}", firstRun, Codec.getStringWindowId(checkpointedWindowId)); + logger.debug("{}", this); } @Override @@ -106,18 +110,12 @@ public class RecoverableInputOperator implements InputOperator, com.datatorrent. 
@Override public void checkpointed(long windowId) { - if (checkpointedWindowId == 0) { - checkpointedWindowId = windowId; - logger.debug("firstRun={} checkpointedWindowId={}", firstRun, Codec.getStringWindowId(checkpointedWindowId)); - } - - logger.debug("{} checkpointed at {}", this, Codec.getStringWindowId(windowId)); } @Override public void committed(long windowId) { - logger.debug("{} committed at {} firstRun {}, checkpointedWindowId {}", this, Codec.getStringWindowId(windowId), firstRun, Codec.getStringWindowId(checkpointedWindowId)); + logger.debug("{}, windowId={}", this, Codec.getStringWindowId(windowId)); if (simulateFailure && firstRun && checkpointedWindowId > 0 && windowId > checkpointedWindowId) { throw new RuntimeException("Failure Simulation from " + this); } @@ -134,4 +132,22 @@ public class RecoverableInputOperator implements InputOperator, com.datatorrent. { simulateFailure = flag; } + + @Override + public void beforeCheckpoint(long windowId) + { + if (checkpointedWindowId == 0) { + checkpointedWindowId = windowId; + } + logger.debug("{}, windowId={}", this, Codec.getStringWindowId(windowId)); + } + + @Override + public String toString() + { + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("firstRun", this.firstRun) + .append("checkpointedWindowId", Codec.getStringWindowId(checkpointedWindowId)) + .toString(); + } }
