This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ef89cf4  KAFKA-7838: Log leader and follower end offsets when shrinking ISR (#6168)
ef89cf4 is described below

commit ef89cf4eb687dbcca719acca09c98ded001d12dd
Author: Dhruvil Shah <dhru...@confluent.io>
AuthorDate: Fri Jan 25 14:11:31 2019 -0800

    KAFKA-7838: Log leader and follower end offsets when shrinking ISR (#6168)
    
    Reviewers: Jun Rao <jun...@gmail.com>
---
 core/src/main/scala/kafka/cluster/Partition.scala | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ca3abbb..e731111 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -634,16 +634,25 @@ class Partition(val topicPartition: TopicPartition,
       leaderReplicaIfLocal match {
         case Some(leaderReplica) =>
           val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, 
replicaMaxLagTimeMs)
-          if(outOfSyncReplicas.nonEmpty) {
+          if (outOfSyncReplicas.nonEmpty) {
             val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
             assert(newInSyncReplicas.nonEmpty)
-            info("Shrinking ISR from %s to 
%s".format(inSyncReplicas.map(_.brokerId).mkString(","),
-              newInSyncReplicas.map(_.brokerId).mkString(",")))
+            info("Shrinking ISR from %s to %s. Leader: (highWatermark: %d, 
endOffset: %d). Out of sync replicas: %s."
+              .format(inSyncReplicas.map(_.brokerId).mkString(","),
+                newInSyncReplicas.map(_.brokerId).mkString(","),
+                leaderReplica.highWatermark.messageOffset,
+                leaderReplica.logEndOffset.messageOffset,
+                outOfSyncReplicas.map { replica =>
+                  s"(brokerId: ${replica.brokerId}, endOffset: 
${replica.logEndOffset.messageOffset})"
+                }.mkString(" ")
+              )
+            )
+
             // update ISR in zk and in cache
             updateIsr(newInSyncReplicas)
-            // we may need to increment high watermark since ISR could be down 
to 1
-
             replicaManager.isrShrinkRate.mark()
+
+            // we may need to increment high watermark since ISR could be down 
to 1
             maybeIncrementLeaderHW(leaderReplica)
           } else {
             false

Reply via email to