Repository: apex-malhar
Updated Branches:
  refs/heads/master 451250818 -> 719cf952d


APEXMALHAR-2121: Updating KafkaInputOperator to add emitTuple method with 
access to more kafka parameters


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/dc821843
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/dc821843
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/dc821843

Branch: refs/heads/master
Commit: dc821843a8ff8064ef4bdba39adaa0b1595175ff
Parents: 882e456
Author: Priyanka Gugale <[email protected]>
Authored: Mon Jun 20 12:55:12 2016 +0530
Committer: Priyanka Gugale <[email protected]>
Committed: Mon Jun 20 12:55:12 2016 +0530

----------------------------------------------------------------------
 .../contrib/kafka/AbstractKafkaInputOperator.java   | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dc821843/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
 
b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index b026d16..9a5917b 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -29,12 +29,12 @@ import com.datatorrent.api.StatsListener;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.util.KryoCloneUtils;
-
 import com.google.common.base.Joiner;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
 import kafka.cluster.Broker;
@@ -54,6 +54,7 @@ import org.slf4j.LoggerFactory;
 import javax.validation.Valid;
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
+
 import java.io.IOException;
 import java.lang.reflect.Array;
 import java.util.ArrayList;
@@ -203,6 +204,14 @@ public abstract class AbstractKafkaInputOperator<K extends 
KafkaConsumer> implem
    */
   protected abstract void emitTuple(Message message);
 
+  /**
+   * Concrete class derived from KafkaInputOpertor should implement this 
method if it wants to access kafka offset and partitionId along with kafka 
message. 
+   */
+  protected void emitTuple(KafkaConsumer.KafkaMessage message)
+  {
+    emitTuple(message.msg);
+  }
+
   public int getMaxTuplesPerWindow()
   {
     return maxTuplesPerWindow;
@@ -304,7 +313,8 @@ public abstract class AbstractKafkaInputOperator<K extends 
KafkaConsumer> implem
             FetchResponse fetchResponse = ksc.fetch(req);
             Integer count = 0;
             for (MessageAndOffset msg : 
fetchResponse.messageSet(consumer.topic, kp.getPartitionId())) {
-              emitTuple(msg.message());
+              KafkaConsumer.KafkaMessage kafkaMessage = new 
KafkaConsumer.KafkaMessage(kp, msg.message(), msg.offset());
+              emitTuple(kafkaMessage);
               offsetStats.put(kp, msg.offset());
               count = count + 1;
               if (count.equals(rc.getValue().right))
@@ -430,7 +440,7 @@ public abstract class AbstractKafkaInputOperator<K extends 
KafkaConsumer> implem
         pendingMessage = message;
         break;
       }
-      emitTuple(message.msg);
+      emitTuple(message);
       emitCount++;
       emitTotalMsgSize += message.msg.size();
       offsetStats.put(message.kafkaPart, message.offSet);

Reply via email to