Repository: apex-malhar
Updated Branches:
  refs/heads/master c528980a9 -> 170072533


APEXMALHAR-2154 Update the Kafka Input Operator to use CheckpointNotificationListener


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9d15fe29
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9d15fe29
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9d15fe29

Branch: refs/heads/master
Commit: 9d15fe29287697ff2b660e637c495d76a01ab910
Parents: 9b6e11d
Author: chaitanya <[email protected]>
Authored: Thu Aug 18 12:53:40 2016 +0530
Committer: chaitanya <[email protected]>
Committed: Thu Aug 18 12:53:40 2016 +0530

----------------------------------------------------------------------
 .../apache/apex/malhar/kafka/AbstractKafkaInputOperator.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9d15fe29/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index 3e709eb..9fbc418 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -74,7 +74,7 @@ import com.datatorrent.netlet.util.DTThrowable;
  * @since 3.3.0
  */
 @InterfaceStability.Evolving
-public abstract class AbstractKafkaInputOperator implements InputOperator, Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointListener, Partitioner<AbstractKafkaInputOperator>, StatsListener, OffsetCommitCallback
+public abstract class AbstractKafkaInputOperator implements InputOperator, Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointNotificationListener, Partitioner<AbstractKafkaInputOperator>, StatsListener, OffsetCommitCallback
 {
 
   private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class);
@@ -191,6 +191,12 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
   }
 
   @Override
+  public void beforeCheckpoint(long windowId)
+  {
+
+  }
+
+  @Override
   public void committed(long windowId)
   {
     if (initialOffset == InitialOffset.LATEST || initialOffset == InitialOffset.EARLIEST) {

Reply via email to