Repository: apex-malhar Updated Branches: refs/heads/master 22c65c4c0 -> 6c42103f8
APEXMALHAR-2411 Avoid isreplaystate variable, incorporate logic in activate() and replay() for Kinesis Input Operator Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/6c42103f Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/6c42103f Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/6c42103f Branch: refs/heads/master Commit: 6c42103f8a9721649d2c6ce3905cffc8baf61fb9 Parents: 22c65c4 Author: deepak-narkhede <[email protected]> Authored: Fri Feb 17 11:43:05 2017 +0530 Committer: deepak-narkhede <[email protected]> Committed: Mon Feb 20 14:24:32 2017 +0530 ---------------------------------------------------------------------- .../kinesis/AbstractKinesisInputOperator.java | 33 ++++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6c42103f/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java index fc10bea..18a6399 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java @@ -56,6 +56,7 @@ import com.datatorrent.api.Operator.ActivationListener; import com.datatorrent.api.Partitioner; import com.datatorrent.api.Stats; import com.datatorrent.api.StatsListener; +import com.datatorrent.api.annotation.Stateless; import com.datatorrent.common.util.Pair; import com.datatorrent.lib.util.KryoCloneUtils; @@ -128,8 +129,6 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, private transient long lastRepartitionTime = 0L; - private transient boolean isReplayState = false; - //No of shards per partition in dynamic MANY_TO_ONE strategy // If the value is more than 1, then it enables the dynamic partitioning @Min(1) @@ -425,9 +424,6 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, operatorId = context.getId(); windowDataManager.setup(context); shardPosition.clear(); - if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestCompletedWindow()) { - isReplayState = true; - } } /** @@ -477,6 +473,18 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, throw new RuntimeException(e); } } + + /* + * Set the shard positions and start the consumer if last recovery windowid + * match with current completed windowid. + */ + if (windowId == windowDataManager.getLargestCompletedWindow()) { + // Set the shard positions to the consumer + Map<String, String> statsData = new HashMap<String, String>(getConsumer().getShardPosition()); + statsData.putAll(shardPosition); + getConsumer().resetShardPositions(statsData); + consumer.start(); + } } catch (IOException e) { throw new RuntimeException("replay", e); @@ -507,9 +515,9 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, @Override public void activate(OperatorContext ctx) { - if(isReplayState) - { - // If it is a replay state, don't start the consumer + // If it is a replay state, don't start the consumer + if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID && + context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestCompletedWindow()) { return; } consumer.start(); @@ -573,15 +581,6 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, } shardPosition.put(shardId, recordId); } - if(isReplayState) - { - isReplayState = false; - // Set the shard positions to the consumer - Map<String, String> statsData = new HashMap<String, String>(getConsumer().getShardPosition()); - statsData.putAll(shardPosition); - getConsumer().resetShardPositions(statsData); - consumer.start(); - } emitCount += count; }
