This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new ef89cf4  KAFKA-7838: Log leader and follower end offsets when shrinking ISR (#6168)
ef89cf4 is described below

commit ef89cf4eb687dbcca719acca09c98ded001d12dd
Author: Dhruvil Shah <dhru...@confluent.io>
AuthorDate: Fri Jan 25 14:11:31 2019 -0800

    KAFKA-7838: Log leader and follower end offsets when shrinking ISR (#6168)

    Reviewers: Jun Rao <jun...@gmail.com>
---
 core/src/main/scala/kafka/cluster/Partition.scala | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ca3abbb..e731111 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -634,16 +634,25 @@ class Partition(val topicPartition: TopicPartition,
       leaderReplicaIfLocal match {
         case Some(leaderReplica) =>
           val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
-          if(outOfSyncReplicas.nonEmpty) {
+          if (outOfSyncReplicas.nonEmpty) {
             val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
             assert(newInSyncReplicas.nonEmpty)
-            info("Shrinking ISR from %s to %s".format(inSyncReplicas.map(_.brokerId).mkString(","),
-              newInSyncReplicas.map(_.brokerId).mkString(",")))
+            info("Shrinking ISR from %s to %s. Leader: (highWatermark: %d, endOffset: %d). Out of sync replicas: %s."
+              .format(inSyncReplicas.map(_.brokerId).mkString(","),
+                newInSyncReplicas.map(_.brokerId).mkString(","),
+                leaderReplica.highWatermark.messageOffset,
+                leaderReplica.logEndOffset.messageOffset,
+                outOfSyncReplicas.map { replica =>
+                  s"(brokerId: ${replica.brokerId}, endOffset: ${replica.logEndOffset.messageOffset})"
+                }.mkString(" ")
+              )
+            )
+
             // update ISR in zk and in cache
             updateIsr(newInSyncReplicas)
-            // we may need to increment high watermark since ISR could be down to 1
-
             replicaManager.isrShrinkRate.mark()
+
+            // we may need to increment high watermark since ISR could be down to 1
             maybeIncrementLeaderHW(leaderReplica)
           } else {
             false