This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9a7f29c  KAFKA-7152; Avoid moving a replica out of isr if its LEO equals leader's LEO
9a7f29c is described below

commit 9a7f29c1ede9f1ece67b73d2d6497d60f38fb62a
Author: Zhanxiang (Patrick) Huang <[email protected]>
AuthorDate: Sat Jul 21 16:52:54 2018 -0700

    KAFKA-7152; Avoid moving a replica out of isr if its LEO equals leader's LEO
    
    When there are many inactive partitions in the cluster, we observed constant
    churn of under-replicated partitions (URPs), even when followers could keep
    up with the leader's byte-in rate, because the leader broker frequently moved
    replicas of inactive partitions out of the ISR. This PR mitigates the issue
    by not moving a replica out of the ISR if the follower's LEO == the leader's
    LEO.
    
    Author: Zhanxiang (Patrick) Huang <[email protected]>
    
    Reviewers: Dong Lin <[email protected]>
    
    Closes #5412 from hzxa21/KAFKA-7152
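
For reference, the heart of the change is a single extra guard in the lagging-replica filter of Partition.getOutOfSyncReplicas (full diff below). Here is a minimal sketch of the adjusted predicate, using a hypothetical ReplicaState stand-in rather than Kafka's actual Replica class:

    // Hypothetical stand-in for kafka.cluster.Replica, for illustration only.
    case class ReplicaState(brokerId: Int, logEndOffset: Long, lastCaughtUpTimeMs: Long)

    def outOfSyncReplicas(leader: ReplicaState, followers: Set[ReplicaState],
                          nowMs: Long, maxLagMs: Long): Set[ReplicaState] =
      followers.filter { r =>
        // A follower whose LEO already equals the leader's is never out-of-sync,
        // even if it has not fetched anything for longer than maxLagMs.
        r.logEndOffset != leader.logEndOffset &&
          (nowMs - r.lastCaughtUpTimeMs) > maxLagMs
      }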
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  6 ++-
 .../unit/kafka/server/ISRExpirationTest.scala      | 62 +++++++++++++++++-----
 2 files changed, 53 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index b80c344..154a8f9 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -553,7 +553,8 @@ class Partition(val topic: String,
 
   def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
     /**
-     * there are two cases that will be handled here -
+     * If the follower already has the same leo as the leader, it will not be considered as out-of-sync,
+     * otherwise there are two cases that will be handled here -
      * 1. Stuck followers: If the leo of the replica hasn't been updated for maxLagMs ms,
      *                     the follower is stuck and should be removed from the ISR
      * 2. Slow followers: If the replica has not read up to the leo within the last maxLagMs ms,
@@ -565,7 +566,8 @@ class Partition(val topic: String,
      **/
     val candidateReplicas = inSyncReplicas - leaderReplica
 
-    val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
+    val laggingReplicas = candidateReplicas.filter(r =>
+      r.logEndOffset.messageOffset != leaderReplica.logEndOffset.messageOffset && (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
     if (laggingReplicas.nonEmpty)
       debug("Lagging replicas are 
%s".format(laggingReplicas.map(_.brokerId).mkString(",")))
 
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 8212ed6..c90a5b9 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -40,6 +40,7 @@ class IsrExpirationTest {
   var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
   val replicaLagTimeMaxMs = 100L
   val replicaFetchWaitMaxMs = 100
+  val leaderLogEndOffset = 20
 
   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
@@ -81,12 +82,12 @@ class IsrExpirationTest {
     assertEquals("All replicas should be in ISR", 
configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
 
-    // let the follower catch up to the Leader logEndOffset (15)
+    // let the follower catch up to the Leader logEndOffset - 1
     for (replica <- partition0.assignedReplicas - leaderReplica)
-      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
-                                                    highWatermark = 15L,
+      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset - 1), MemoryRecords.EMPTY),
+                                                    highWatermark = leaderLogEndOffset - 1,
                                                     leaderLogStartOffset = 0L,
-                                                    leaderLogEndOffset = 15L,
+                                                    leaderLogEndOffset = leaderLogEndOffset,
                                                     followerLogStartOffset = 0L,
                                                     fetchTimeMs = time.milliseconds,
                                                     readSize = -1,
@@ -138,10 +139,10 @@ class IsrExpirationTest {
 
     // Make the remote replica not read to the end of log. It should not be out of sync for at least 100 ms
     for (replica <- partition0.assignedReplicas - leaderReplica)
-      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY),
-                                                    highWatermark = 10L,
+      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset - 2), MemoryRecords.EMPTY),
+                                                    highWatermark = leaderLogEndOffset - 2,
                                                     leaderLogStartOffset = 0L,
-                                                    leaderLogEndOffset = 15L,
+                                                    leaderLogEndOffset = leaderLogEndOffset,
                                                     followerLogStartOffset = 0L,
                                                     fetchTimeMs = time.milliseconds,
                                                     readSize = -1,
@@ -155,10 +156,10 @@ class IsrExpirationTest {
     time.sleep(75)
 
     (partition0.assignedReplicas - leaderReplica).foreach { r =>
-      r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY),
-                            highWatermark = 11L,
+      r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset - 1), MemoryRecords.EMPTY),
+                            highWatermark = leaderLogEndOffset - 1,
                             leaderLogStartOffset = 0L,
-                            leaderLogEndOffset = 15L,
+                            leaderLogEndOffset = leaderLogEndOffset,
                             followerLogStartOffset = 0L,
                             fetchTimeMs = time.milliseconds,
                             readSize = -1,
@@ -175,10 +176,10 @@ class IsrExpirationTest {
 
     // Now actually make a fetch to the end of the log. The replicas should be back in ISR
     (partition0.assignedReplicas - leaderReplica).foreach { r =>
-      r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
-                            highWatermark = 15L,
+      r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset), MemoryRecords.EMPTY),
+                            highWatermark = leaderLogEndOffset,
                             leaderLogStartOffset = 0L,
-                            leaderLogEndOffset = 15L,
+                            leaderLogEndOffset = leaderLogEndOffset,
                             followerLogStartOffset = 0L,
                             fetchTimeMs = time.milliseconds,
                             readSize = -1,
@@ -190,6 +191,40 @@ class IsrExpirationTest {
     EasyMock.verify(log)
   }
 
+  /*
+   * Test the case where a follower has already caught up to the same log end offset as the leader. Such a follower should not be considered out-of-sync
+   */
+  @Test
+  def testIsrExpirationForCaughtUpFollowers() {
+    val log = logMock
+
+    // create one partition and all replicas
+    val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
+    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
+    val leaderReplica = partition0.getReplica(configs.head.brokerId).get
+
+    // let the follower catch up to the Leader logEndOffset
+    for (replica <- partition0.assignedReplicas - leaderReplica)
+      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset), MemoryRecords.EMPTY),
+        highWatermark = leaderLogEndOffset,
+        leaderLogStartOffset = 0L,
+        leaderLogEndOffset = leaderLogEndOffset,
+        followerLogStartOffset = 0L,
+        fetchTimeMs = time.milliseconds,
+        readSize = -1,
+        lastStableOffset = None))
+    var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+
+    // let some time pass
+    time.sleep(150)
+
+    // even though the follower hasn't fetched any data for more than replicaLagTimeMaxMs ms, it has already caught up, so it is not out-of-sync
+    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+    EasyMock.verify(log)
+  }
+
   private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig,
                                                localLog: Log): Partition = {
     val leaderId = config.brokerId
@@ -222,6 +257,7 @@ class IsrExpirationTest {
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
     EasyMock.expect(log.leaderEpochCache).andReturn(cache).anyTimes()
     EasyMock.expect(log.onHighWatermarkIncremented(0L))
+    EasyMock.expect(log.logEndOffsetMetadata).andReturn(LogOffsetMetadata(leaderLogEndOffset)).anyTimes()
     EasyMock.replay(log)
     log
   }
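
To see the intended behavior end to end, here is a small self-contained Scala sketch of the same check (again using a hypothetical ReplicaState stand-in, not Kafka's actual classes), mirroring what testIsrExpirationForCaughtUpFollowers asserts: an idle follower whose LEO equals the leader's stays in sync no matter how much time passes, while a follower that is both behind and stale does not:

    object IsrCheckDemo extends App {
      case class ReplicaState(brokerId: Int, logEndOffset: Long, lastCaughtUpTimeMs: Long)

      // Same predicate as the Partition.scala change above, on simplified types.
      def isOutOfSync(leader: ReplicaState, r: ReplicaState, nowMs: Long, maxLagMs: Long): Boolean =
        r.logEndOffset != leader.logEndOffset && (nowMs - r.lastCaughtUpTimeMs) > maxLagMs

      val maxLagMs = 100L
      val leader   = ReplicaState(brokerId = 0, logEndOffset = 20L, lastCaughtUpTimeMs = 0L)
      val caughtUp = ReplicaState(brokerId = 1, logEndOffset = 20L, lastCaughtUpTimeMs = 0L)
      val lagging  = ReplicaState(brokerId = 2, logEndOffset = 18L, lastCaughtUpTimeMs = 0L)

      // 150 ms later, well past maxLagMs, and neither follower has fetched again.
      val nowMs = 150L
      assert(!isOutOfSync(leader, caughtUp, nowMs, maxLagMs)) // LEO matches leader: kept in ISR
      assert(isOutOfSync(leader, lagging, nowMs, maxLagMs))   // behind and stale: out of sync
      println("caught-up follower kept in ISR; lagging follower marked out-of-sync")
    }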
