Repository: kafka Updated Branches: refs/heads/trunk bf2563e2f -> 0273c4379
MINOR: follow-up KAFKA-2730 to use two tags for broker id and fetcher id combination Author: Guozhang Wang <[email protected]> Reviewers: Ismael Juma, Guozhang Wang Closes #434 from guozhangwang/K2730-hotfix Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0273c437 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0273c437 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0273c437 Branch: refs/heads/trunk Commit: 0273c4379f12d7c3daedc89b0838485270e16bf4 Parents: bf2563e Author: Guozhang Wang <[email protected]> Authored: Thu Nov 5 15:49:29 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Thu Nov 5 15:49:29 2015 -0800 ---------------------------------------------------------------------- core/src/main/scala/kafka/server/ReplicaFetcherManager.scala | 2 +- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/0273c437/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala index 779876b..96c2a38 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala @@ -32,7 +32,7 @@ class ReplicaFetcherManager(brokerConfig: KafkaConfig, replicaMgr: ReplicaManage case Some(p) => "%s:ReplicaFetcherThread-%d-%d".format(p, fetcherId, sourceBroker.id) } - new ReplicaFetcherThread(threadName, sourceBroker, brokerConfig, + new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig, replicaMgr, metrics, time) } http://git-wip-us.apache.org/repos/asf/kafka/blob/0273c437/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index fad0e3d..745ea2e 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -40,6 +40,7 @@ import scala.collection.{JavaConverters, Map, mutable} import JavaConverters._ class ReplicaFetcherThread(name: String, + fetcherId: Int, sourceBroker: BrokerEndPoint, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager, @@ -65,8 +66,9 @@ class ReplicaFetcherThread(name: String, private val sourceNode = new Node(sourceBroker.id, sourceBroker.host, sourceBroker.port) - // we need to include the full thread id composed of the broker and the thread index - // as the metrics tag to avoid metric name conflicts with more than one thread to the same broker + // we need to include both the broker id and the fetcher id + // as the metrics tag to avoid metric name conflicts with + // more than one fetcher thread to the same broker private val networkClient = { val selector = new Selector( NetworkReceive.UNLIMITED, @@ -74,7 +76,7 @@ class ReplicaFetcherThread(name: String, metrics, time, "replica-fetcher", - Map("thread-id" -> name).asJava, + Map("broker-id" -> sourceBroker.id.toString, "fetcher-id" -> fetcherId.toString).asJava, false, ChannelBuilders.create(brokerConfig.interBrokerSecurityProtocol, Mode.CLIENT, LoginType.SERVER, brokerConfig.values) )
