Repository: storm
Updated Branches:
  refs/heads/1.x-branch ac7b828ad -> 39e8f0e9c


STORM-1680: Added Kafka spout config option minFetchByte to storm-kafka


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/89f4d44d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/89f4d44d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/89f4d44d

Branch: refs/heads/1.x-branch
Commit: 89f4d44dee45567d6d855842856d5439f9501987
Parents: a240df5
Author: narendra_bidari <[email protected]>
Authored: Mon Apr 4 12:28:09 2016 -0700
Committer: narendra_bidari <[email protected]>
Committed: Mon Apr 4 12:28:09 2016 -0700

----------------------------------------------------------------------
 .../storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java | 5 ++++-
 .../storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java  | 2 +-
 2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/89f4d44d/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
index e1e1d24..c845531 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
@@ -17,10 +17,12 @@
  */
 package org.apache.storm.kafka;
 
+import java.io.Serializable;
+
 import org.apache.storm.spout.MultiScheme;
 import org.apache.storm.spout.RawMultiScheme;
 
-import java.io.Serializable;
+import kafka.api.FetchRequest;
 
 public class KafkaConfig implements Serializable {
     private static final long serialVersionUID = 5276718734571623855L;
@@ -39,6 +41,7 @@ public class KafkaConfig implements Serializable {
     public long maxOffsetBehind = Long.MAX_VALUE;
     public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
     public int metricsTimeBucketSizeInSecs = 60;
+    public int minFetchByte = FetchRequest.DefaultMinBytes();
 
     public KafkaConfig(BrokerHosts hosts, String topic) {
         this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());

http://git-wip-us.apache.org/repos/asf/storm/blob/89f4d44d/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
index a2be825..9eb8268 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
@@ -185,7 +185,7 @@ public class KafkaUtils {
         int partitionId = partition.partition;
         FetchRequestBuilder builder = new FetchRequestBuilder();
         FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
-                clientId(config.clientId).maxWait(config.fetchMaxWait).build();
+                       clientId(config.clientId).maxWait(config.fetchMaxWait).minBytes(config.minFetchByte).build();
         FetchResponse fetchResponse;
         try {
             fetchResponse = consumer.fetch(fetchRequest);

Reply via email to