kafka-931; make zookeeper.connect a required property; patched by Jun Rao; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9bd2a114
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9bd2a114
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9bd2a114

Branch: refs/heads/trunk
Commit: 9bd2a11486420a313f800d734cab8a858fac6d0e
Parents: fb37ea8
Author: Jun Rao <jun...@gmail.com>
Authored: Mon Jun 3 17:58:48 2013 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Mon Jun 3 18:40:16 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/consumer/ConsumerConfig.scala | 8 ++++----
 core/src/main/scala/kafka/utils/ZkUtils.scala | 2 +-
 core/src/test/scala/unit/kafka/utils/TestUtils.scala | 1 +
 3 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bd2a114/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 9e9a8bc..c8c4212 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -31,9 +31,9 @@ object ConsumerConfig extends Config {
   val DefaultFetcherBackoffMs = 1000
   val AutoCommit = true
   val AutoCommitInterval = 60 * 1000
-  val MaxQueuedChunks = 10
+  val MaxQueuedChunks = 2
   val MaxRebalanceRetries = 4
-  val AutoOffsetReset = OffsetRequest.SmallestTimeString
+  val AutoOffsetReset = OffsetRequest.LargestTimeString
   val ConsumerTimeoutMs = -1
   val MinFetchBytes = 1
   val MaxFetchWaitMs = 100
@@ -100,8 +100,8 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   /** the frequency in ms that the consumer offsets are committed to zookeeper */
   val autoCommitIntervalMs = props.getInt("auto.commit.interval.ms", AutoCommitInterval)
 
-  /** max number of messages buffered for consumption */
-  val queuedMaxMessages = props.getInt("queued.max.messages", MaxQueuedChunks)
+  /** max number of message chunks buffered for consumption, each chunk can be up to fetch.message.max.bytes*/
+  val queuedMaxMessages = props.getInt("queued.max.message.chunks", MaxQueuedChunks)
 
   /** max number of retries during rebalance */
   val rebalanceMaxRetries = props.getInt("rebalance.max.retries", MaxRebalanceRetries)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bd2a114/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 63ea87e..d53d511 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -765,7 +765,7 @@ class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group)
 
 class ZKConfig(props: VerifiableProperties) {
   /** ZK host string */
-  val zkConnect = props.getString("zookeeper.connect", null)
+  val zkConnect = props.getString("zookeeper.connect")
 
   /** zookeeper session timeout */
   val zkSessionTimeoutMs = props.getInt("zookeeper.session.timeout.ms", 6000)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bd2a114/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 3cb1d4a..a4dcca6 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -148,6 +148,7 @@ object TestUtils extends Logging {
     props.put("zookeeper.sync.time.ms", "200")
     props.put("auto.commit.interval.ms", "1000")
     props.put("rebalance.max.retries", "4")
+    props.put("auto.offset.reset", "smallest")
 
     props
   }