kafka-931; make zookeeper.connect a required property; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9bd2a114
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9bd2a114
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9bd2a114

Branch: refs/heads/trunk
Commit: 9bd2a11486420a313f800d734cab8a858fac6d0e
Parents: fb37ea8
Author: Jun Rao <jun...@gmail.com>
Authored: Mon Jun 3 17:58:48 2013 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Mon Jun 3 18:40:16 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/consumer/ConsumerConfig.scala | 8 ++++----
 core/src/main/scala/kafka/utils/ZkUtils.scala           | 2 +-
 core/src/test/scala/unit/kafka/utils/TestUtils.scala    | 1 +
 3 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9bd2a114/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 9e9a8bc..c8c4212 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -31,9 +31,9 @@ object ConsumerConfig extends Config {
   val DefaultFetcherBackoffMs = 1000
   val AutoCommit = true
   val AutoCommitInterval = 60 * 1000
-  val MaxQueuedChunks = 10
+  val MaxQueuedChunks = 2
   val MaxRebalanceRetries = 4
-  val AutoOffsetReset = OffsetRequest.SmallestTimeString
+  val AutoOffsetReset = OffsetRequest.LargestTimeString
   val ConsumerTimeoutMs = -1
   val MinFetchBytes = 1
   val MaxFetchWaitMs = 100
@@ -100,8 +100,8 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   /** the frequency in ms that the consumer offsets are committed to zookeeper */
   val autoCommitIntervalMs = props.getInt("auto.commit.interval.ms", AutoCommitInterval)
 
-  /** max number of messages buffered for consumption */
-  val queuedMaxMessages = props.getInt("queued.max.messages", MaxQueuedChunks)
+  /** max number of message chunks buffered for consumption, each chunk can be up to fetch.message.max.bytes*/
+  val queuedMaxMessages = props.getInt("queued.max.message.chunks", MaxQueuedChunks)
 
   /** max number of retries during rebalance */
   val rebalanceMaxRetries = props.getInt("rebalance.max.retries", MaxRebalanceRetries)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bd2a114/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 63ea87e..d53d511 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -765,7 +765,7 @@ class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group)
 
 class ZKConfig(props: VerifiableProperties) {
   /** ZK host string */
-  val zkConnect = props.getString("zookeeper.connect", null)
+  val zkConnect = props.getString("zookeeper.connect")
 
   /** zookeeper session timeout */
   val zkSessionTimeoutMs = props.getInt("zookeeper.session.timeout.ms", 6000)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bd2a114/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 3cb1d4a..a4dcca6 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -148,6 +148,7 @@ object TestUtils extends Logging {
     props.put("zookeeper.sync.time.ms", "200")
     props.put("auto.commit.interval.ms", "1000")
     props.put("rebalance.max.retries", "4")
+    props.put("auto.offset.reset", "smallest")
 
     props
   }

Reply via email to