Repository: storm
Updated Branches:
  refs/heads/master d593918a5 -> ce36b6dbf
STORM-1680, Added Kafka Spout Config FetchByte to Storm-Kafka

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8b0af500
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8b0af500
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8b0af500

Branch: refs/heads/master
Commit: 8b0af5004d8472117d709caa8af997c6842d982a
Parents: d593918
Author: narendra_bidari <[email protected]>
Authored: Mon Apr 4 12:28:09 2016 -0700
Committer: Jungtaek Lim <[email protected]>
Committed: Wed Apr 20 16:31:02 2016 +0900

----------------------------------------------------------------------
 .../storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java | 5 ++++-
 .../storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java  | 2 +-
 2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/8b0af500/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
index e1e1d24..c845531 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
@@ -17,10 +17,12 @@
  */
 package org.apache.storm.kafka;
 
+import java.io.Serializable;
+
 import org.apache.storm.spout.MultiScheme;
 import org.apache.storm.spout.RawMultiScheme;
 
-import java.io.Serializable;
+import kafka.api.FetchRequest;
 
 public class KafkaConfig implements Serializable {
     private static final long serialVersionUID = 5276718734571623855L;
@@ -39,6 +41,7 @@ public class KafkaConfig implements Serializable {
     public long maxOffsetBehind = Long.MAX_VALUE;
     public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
     public int metricsTimeBucketSizeInSecs = 60;
+    public int minFetchByte = FetchRequest.DefaultMinBytes();
 
     public KafkaConfig(BrokerHosts hosts, String topic) {
         this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());

http://git-wip-us.apache.org/repos/asf/storm/blob/8b0af500/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
index 7fa4340..090b6d1 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
@@ -189,7 +189,7 @@ public class KafkaUtils {
         int partitionId = partition.partition;
         FetchRequestBuilder builder = new FetchRequestBuilder();
         FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
-                clientId(config.clientId).maxWait(config.fetchMaxWait).build();
+                clientId(config.clientId).maxWait(config.fetchMaxWait).minBytes(config.minFetchByte).build();
         FetchResponse fetchResponse;
         try {
             fetchResponse = consumer.fetch(fetchRequest);
