Updated Branches: refs/heads/trunk 269d16d3c -> 9c1d8e35c
KAFKA-1184 High-Level Consumer: expose fetcher threads number as a parameter; reviewed by Neha Narkhede Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9c1d8e35 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9c1d8e35 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9c1d8e35 Branch: refs/heads/trunk Commit: 9c1d8e35c5913d22098cf80ae69035131cfee87f Parents: 269d16d Author: Evelina Stepanova <[email protected]> Authored: Thu Jan 30 08:36:53 2014 -0800 Committer: Neha Narkhede <[email protected]> Committed: Thu Jan 30 08:37:28 2014 -0800 ---------------------------------------------------------------------- core/src/main/scala/kafka/consumer/ConsumerConfig.scala | 4 ++++ .../src/main/scala/kafka/consumer/ConsumerFetcherManager.scala | 2 +- core/src/test/scala/unit/kafka/utils/TestUtils.scala | 1 + perf/src/main/scala/kafka/perf/ConsumerPerformance.scala | 6 ++++++ 4 files changed, 12 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9c1d8e35/core/src/main/scala/kafka/consumer/ConsumerConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala index c8c4212..e6875d6 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala @@ -28,6 +28,7 @@ object ConsumerConfig extends Config { val SocketBufferSize = 64*1024 val FetchSize = 1024 * 1024 val MaxFetchSize = 10*FetchSize + val NumConsumerFetchers = 1 val DefaultFetcherBackoffMs = 1000 val AutoCommit = true val AutoCommitInterval = 60 * 1000 @@ -93,6 +94,9 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig( /** the number of byes of messages to attempt to fetch */ val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize) + + /** the number threads used to fetch data */ + val numConsumerFetchers = props.getInt("num.consumer.fetchers", NumConsumerFetchers) /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */ val autoCommitEnable = props.getBoolean("auto.commit.enable", AutoCommit) http://git-wip-us.apache.org/repos/asf/kafka/blob/9c1d8e35/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index e4451bb..b9e2bea 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -41,7 +41,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, private val config: ConsumerConfig, private val zkClient : ZkClient) extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), - config.clientId, 1) { + config.clientId, config.numConsumerFetchers) { private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null private var cluster: Cluster = null private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition] http://git-wip-us.apache.org/repos/asf/kafka/blob/9c1d8e35/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index d88b6c3..426b1a7 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -150,6 +150,7 @@ object TestUtils extends Logging { props.put("auto.commit.interval.ms", "1000") props.put("rebalance.max.retries", "4") props.put("auto.offset.reset", "smallest") + props.put("num.consumer.fetchers", "2") props } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c1d8e35/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala ---------------------------------------------------------------------- diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala index ec3cd29..55ee01b 100644 --- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala @@ -112,6 +112,11 @@ object ConsumerPerformance { .describedAs("count") .ofType(classOf[java.lang.Integer]) .defaultsTo(10) + val numFetchersOpt = parser.accepts("num-fetch-threads", "Number of fetcher threads.") + .withRequiredArg + .describedAs("count") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) val options = parser.parse(args : _*) @@ -130,6 +135,7 @@ object ConsumerPerformance { props.put("auto.offset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else "smallest") props.put("zookeeper.connect", options.valueOf(zkConnectOpt)) props.put("consumer.timeout.ms", "5000") + props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString) val consumerConfig = new ConsumerConfig(props) val numThreads = options.valueOf(numThreadsOpt).intValue val topic = options.valueOf(topicOpt)
