Repository: apex-malhar Updated Branches: refs/heads/master 451250818 -> 719cf952d
APEXMALHAR-2121: Updating KafkaInputOperator to add emitTuple method with access to more kafka parameters Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/dc821843 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/dc821843 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/dc821843 Branch: refs/heads/master Commit: dc821843a8ff8064ef4bdba39adaa0b1595175ff Parents: 882e456 Author: Priyanka Gugale <[email protected]> Authored: Mon Jun 20 12:55:12 2016 +0530 Committer: Priyanka Gugale <[email protected]> Committed: Mon Jun 20 12:55:12 2016 +0530 ---------------------------------------------------------------------- .../contrib/kafka/AbstractKafkaInputOperator.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dc821843/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java index b026d16..9a5917b 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java @@ -29,12 +29,12 @@ import com.datatorrent.api.StatsListener; import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.api.annotation.Stateless; import com.datatorrent.lib.util.KryoCloneUtils; - import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Sets; + import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; import kafka.cluster.Broker; @@ -54,6 +54,7 @@ import org.slf4j.LoggerFactory; import javax.validation.Valid; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; + import java.io.IOException; import java.lang.reflect.Array; import java.util.ArrayList; @@ -203,6 +204,14 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem */ protected abstract void emitTuple(Message message); + /** + * Concrete class derived from KafkaInputOpertor should implement this method if it wants to access kafka offset and partitionId along with kafka message. + */ + protected void emitTuple(KafkaConsumer.KafkaMessage message) + { + emitTuple(message.msg); + } + public int getMaxTuplesPerWindow() { return maxTuplesPerWindow; @@ -304,7 +313,8 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem FetchResponse fetchResponse = ksc.fetch(req); Integer count = 0; for (MessageAndOffset msg : fetchResponse.messageSet(consumer.topic, kp.getPartitionId())) { - emitTuple(msg.message()); + KafkaConsumer.KafkaMessage kafkaMessage = new KafkaConsumer.KafkaMessage(kp, msg.message(), msg.offset()); + emitTuple(kafkaMessage); offsetStats.put(kp, msg.offset()); count = count + 1; if (count.equals(rc.getValue().right)) @@ -430,7 +440,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem pendingMessage = message; break; } - emitTuple(message.msg); + emitTuple(message); emitCount++; emitTotalMsgSize += message.msg.size(); offsetStats.put(message.kafkaPart, message.offSet);
