Repository: kafka Updated Branches: refs/heads/trunk b6af35161 -> 9e72c12e9
KAFKA-4458; add per partition in-sync and assigned replica count (KIP-96) Author: Xavier Léauté <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #2186 from xvrl/per-partition-replica-metrics Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9e72c12e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9e72c12e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9e72c12e Branch: refs/heads/trunk Commit: 9e72c12e9811811997b6ca99a6dc56a9af46e43d Parents: b6af351 Author: Xavier LeÌauteÌ <[email protected]> Authored: Wed Dec 7 00:28:56 2016 +0000 Committer: Ismael Juma <[email protected]> Committed: Wed Dec 7 00:31:49 2016 +0000 ---------------------------------------------------------------------- .../main/scala/kafka/cluster/Partition.scala | 32 +++++++++++++++----- 1 file changed, 24 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9e72c12e/core/src/main/scala/kafka/cluster/Partition.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index c8db015..7e52a91 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -75,14 +75,28 @@ class Partition(val topic: String, tags ) - def isUnderReplicated(): Boolean = { - leaderReplicaIfLocal() match { - case Some(_) => - inSyncReplicas.size < assignedReplicas.size - case None => - false - } - } + newGauge("InSyncReplicasCount", + new Gauge[Int] { + def value = { + if (isLeaderReplicaLocal) inSyncReplicas.size else 0 + } + }, + tags + ) + + newGauge("ReplicasCount", + new Gauge[Int] { + def value = { + if (isLeaderReplicaLocal) assignedReplicas.size else 0 + } + }, + tags + ) + + private def isLeaderReplicaLocal: Boolean = leaderReplicaIfLocal.isDefined + + def isUnderReplicated: Boolean = + isLeaderReplicaLocal && inSyncReplicas.size < assignedReplicas.size def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = { val replicaOpt = getReplica(replicaId) @@ -480,6 +494,8 @@ class Partition(val topic: String, */ private def removePartitionMetrics() { removeMetric("UnderReplicated", tags) + removeMetric("InSyncReplicasCount", tags) + removeMetric("ReplicasCount", tags) } override def equals(that: Any): Boolean = {
