Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/release-3.2 5c7dae147 -> bbbb7f842
SPOI-1885: Adding getters to the variables of KafkaMessage nested class Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/cd3d7a70 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/cd3d7a70 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/cd3d7a70 Branch: refs/heads/release-3.2 Commit: cd3d7a70c45e040191ae69de6692ebb4ef97b9e6 Parents: 5c7dae1 Author: Chaitanya <[email protected]> Authored: Thu Oct 29 13:58:29 2015 +0530 Committer: Chaitanya <[email protected]> Committed: Thu Oct 29 13:58:29 2015 +0530 ---------------------------------------------------------------------- .../contrib/kafka/KafkaConsumer.java | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cd3d7a70/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java index d6e06c4..cf5179c 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java @@ -32,8 +32,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import kafka.message.Message; - import javax.validation.constraints.NotNull; import javax.validation.constraints.Pattern; import javax.validation.constraints.Pattern.Flag; @@ -41,13 +39,16 @@ import javax.validation.constraints.Pattern.Flag; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; -import com.datatorrent.api.Context; import 
com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; import com.esotericsoftware.kryo.serializers.JavaSerializer; import com.google.common.collect.HashMultimap; import com.google.common.collect.Maps; import com.google.common.collect.SetMultimap; +import com.datatorrent.api.Context; + +import kafka.message.Message; + /** * Base Kafka Consumer class used by kafka input operator * @@ -342,7 +343,20 @@ public abstract class KafkaConsumer implements Closeable this.msg = msg; this.offSet = offset; } + public KafkaPartition getKafkaPart() + { + return kafkaPart; + } + public Message getMsg() + { + return msg; + } + + public long getOffSet() + { + return offSet; + } } public static class KafkaMeterStatsUtil {
