Repository: storm
Updated Branches:
  refs/heads/master d593918a5 -> ce36b6dbf


STORM-1680, Added Kafka Spout Config FetchByte to Storm-Kafka


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8b0af500
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8b0af500
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8b0af500

Branch: refs/heads/master
Commit: 8b0af5004d8472117d709caa8af997c6842d982a
Parents: d593918
Author: narendra_bidari <[email protected]>
Authored: Mon Apr 4 12:28:09 2016 -0700
Committer: Jungtaek Lim <[email protected]>
Committed: Wed Apr 20 16:31:02 2016 +0900

----------------------------------------------------------------------
 .../storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java | 5 ++++-
 .../storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java  | 2 +-
 2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8b0af500/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
index e1e1d24..c845531 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
@@ -17,10 +17,12 @@
  */
 package org.apache.storm.kafka;
 
+import java.io.Serializable;
+
 import org.apache.storm.spout.MultiScheme;
 import org.apache.storm.spout.RawMultiScheme;
 
-import java.io.Serializable;
+import kafka.api.FetchRequest;
 
 public class KafkaConfig implements Serializable {
     private static final long serialVersionUID = 5276718734571623855L;
@@ -39,6 +41,7 @@ public class KafkaConfig implements Serializable {
     public long maxOffsetBehind = Long.MAX_VALUE;
     public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
     public int metricsTimeBucketSizeInSecs = 60;
+    public int minFetchByte = FetchRequest.DefaultMinBytes();
 
     public KafkaConfig(BrokerHosts hosts, String topic) {
         this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());

http://git-wip-us.apache.org/repos/asf/storm/blob/8b0af500/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
index 7fa4340..090b6d1 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
@@ -189,7 +189,7 @@ public class KafkaUtils {
         int partitionId = partition.partition;
         FetchRequestBuilder builder = new FetchRequestBuilder();
         FetchRequest fetchRequest = builder.addFetch(topic, partitionId, 
offset, config.fetchSizeBytes).
-                clientId(config.clientId).maxWait(config.fetchMaxWait).build();
+                       
clientId(config.clientId).maxWait(config.fetchMaxWait).minBytes(config.minFetchByte).build();
         FetchResponse fetchResponse;
         try {
             fetchResponse = consumer.fetch(fetchRequest);

Reply via email to