KAFKA-991; Reduce the queue size in the Hadoop producer; patched by Swapnil Ghike, reviewed by Jay Kreps and Joel Koshy.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8edd3e63 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8edd3e63 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8edd3e63 Branch: refs/heads/trunk Commit: 8edd3e63024832f2c05b5819ab3dfc2e0b300729 Parents: 76d3905 Author: Joel Koshy <[email protected]> Authored: Thu Aug 1 14:58:25 2013 -0700 Committer: Joel Koshy <[email protected]> Committed: Thu Aug 1 14:58:25 2013 -0700 ---------------------------------------------------------------------- .../src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java | 7 +++++-- core/src/main/scala/kafka/server/KafkaConfig.scala | 4 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8edd3e63/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java index 0b435b9..709a609 100644 --- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java +++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java @@ -40,8 +40,11 @@ public class KafkaOutputFormat<K, V> extends OutputFormat<K, V> private Logger log = Logger.getLogger(KafkaOutputFormat.class); public static final String KAFKA_URL = "kafka.output.url"; - /** Bytes to buffer before the OutputFormat does a send (i.e., the amortization window) */ - public static final int KAFKA_QUEUE_SIZE = 10*1024*1024; + /** Bytes to buffer before the OutputFormat does a send (i.e., the amortization window): + * We set the default to a million bytes so that the server will not reject the batch of messages + * with a 
MessageSizeTooLargeException. The actual size will be smaller after compression. + */ + public static final int KAFKA_QUEUE_SIZE = 1000000; public static final String KAFKA_CONFIG_PREFIX = "kafka.output"; private static final Map<String, String> kafkaConfigMap; http://git-wip-us.apache.org/repos/asf/kafka/blob/8edd3e63/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index b774431..41c9626 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -18,7 +18,7 @@ package kafka.server import java.util.Properties -import kafka.message.Message +import kafka.message.{MessageSet, Message} import kafka.consumer.ConsumerConfig import kafka.utils.{VerifiableProperties, ZKConfig, Utils} @@ -38,7 +38,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue)) /* the maximum size of message that the server can receive */ - val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000, (0, Int.MaxValue)) + val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead, (0, Int.MaxValue)) /* the number of network threads that the server uses for handling network requests */ val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue))
