Repository: apex-malhar
Updated Branches:
  refs/heads/master c528980a9 -> 170072533
APEXMALHAR-2154 Update the Kafka Input Operator to use CheckpointNotificationListener

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9d15fe29
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9d15fe29
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9d15fe29

Branch: refs/heads/master
Commit: 9d15fe29287697ff2b660e637c495d76a01ab910
Parents: 9b6e11d
Author: chaitanya <[email protected]>
Authored: Thu Aug 18 12:53:40 2016 +0530
Committer: chaitanya <[email protected]>
Committed: Thu Aug 18 12:53:40 2016 +0530

----------------------------------------------------------------------
 .../apache/apex/malhar/kafka/AbstractKafkaInputOperator.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9d15fe29/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index 3e709eb..9fbc418 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -74,7 +74,7 @@ import com.datatorrent.netlet.util.DTThrowable;
  * @since 3.3.0
  */
 @InterfaceStability.Evolving
-public abstract class AbstractKafkaInputOperator implements InputOperator, Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointListener, Partitioner<AbstractKafkaInputOperator>, StatsListener, OffsetCommitCallback
+public abstract class AbstractKafkaInputOperator implements InputOperator, Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointNotificationListener, Partitioner<AbstractKafkaInputOperator>, StatsListener, OffsetCommitCallback
 {
 
   private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class);
@@ -191,6 +191,12 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
   }
 
   @Override
+  public void beforeCheckpoint(long windowId)
+  {
+
+  }
+
+  @Override
   public void committed(long windowId)
   {
     if (initialOffset == InitialOffset.LATEST || initialOffset == InitialOffset.EARLIEST) {
