This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9a7f29c  KAFKA-7152; Avoid moving a replica out of isr if its LEO equals leader's LEO
9a7f29c is described below
commit 9a7f29c1ede9f1ece67b73d2d6497d60f38fb62a
Author: Zhanxiang (Patrick) Huang <[email protected]>
AuthorDate: Sat Jul 21 16:52:54 2018 -0700
KAFKA-7152; Avoid moving a replica out of isr if its LEO equals leader's LEO
When there are many inactive partitions in the cluster, we observed constant
churn of under-replicated partitions (URPs) even though the followers could
keep up with the leader's byte-in rate, because the leader frequently moved
replicas of inactive partitions out of the ISR. This PR mitigates the issue by
not moving a replica out of the ISR if the follower's LEO equals the leader's LEO.
Author: Zhanxiang (Patrick) Huang <[email protected]>
Reviewers: Dong Lin <[email protected]>
Closes #5412 from hzxa21/KAFKA-7152
---
core/src/main/scala/kafka/cluster/Partition.scala | 6 ++-
.../unit/kafka/server/ISRExpirationTest.scala | 62 +++++++++++++++++-----
2 files changed, 53 insertions(+), 15 deletions(-)
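The essence of the change in the diff below is one extra guard in the lagging-replica
filter of Partition.getOutOfSyncReplicas: a follower whose log end offset (LEO) already
matches the leader's is never treated as out of sync, no matter how long ago it last
fetched. The following minimal Scala sketch illustrates that predicate in isolation;
ReplicaState and outOfSync are simplified stand-ins made up for this example, not
Kafka's actual kafka.cluster.Replica or Partition APIs.

object IsrCheckSketch {
  // Simplified stand-in for a follower's fetch state (hypothetical type, for illustration only).
  final case class ReplicaState(brokerId: Int, logEndOffset: Long, lastCaughtUpTimeMs: Long)

  // Mirrors the new filter: a follower whose LEO already equals the leader's LEO
  // is never reported as out of sync, even if it has not fetched for longer than maxLagMs.
  def outOfSync(leader: ReplicaState, followers: Set[ReplicaState], maxLagMs: Long, nowMs: Long): Set[ReplicaState] =
    followers.filter { r =>
      r.logEndOffset != leader.logEndOffset && (nowMs - r.lastCaughtUpTimeMs) > maxLagMs
    }

  def main(args: Array[String]): Unit = {
    val now = 1000L
    val leader   = ReplicaState(brokerId = 0, logEndOffset = 20L, lastCaughtUpTimeMs = now)
    val caughtUp = ReplicaState(brokerId = 1, logEndOffset = 20L, lastCaughtUpTimeMs = now - 500L) // idle, fully caught up
    val lagging  = ReplicaState(brokerId = 2, logEndOffset = 18L, lastCaughtUpTimeMs = now - 500L) // idle, behind
    // With maxLagMs = 100, only broker 2 is out of sync; broker 1 stays in the ISR.
    println(outOfSync(leader, Set(caughtUp, lagging), maxLagMs = 100L, nowMs = now).map(_.brokerId))
  }
}

Under the old check, the idle-but-caught-up broker 1 above would also have been flagged
as soon as more than maxLagMs passed without a fetch refreshing lastCaughtUpTimeMs,
which is the URP churn the commit message describes.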
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index b80c344..154a8f9 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -553,7 +553,8 @@ class Partition(val topic: String,
def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
/**
- * there are two cases that will be handled here -
+ * If the follower already has the same leo as the leader, it will not be considered as out-of-sync,
+ * otherwise there are two cases that will be handled here -
* 1. Stuck followers: If the leo of the replica hasn't been updated for maxLagMs ms,
*                     the follower is stuck and should be removed from the ISR
* 2. Slow followers: If the replica has not read up to the leo within the last maxLagMs ms,
@@ -565,7 +566,8 @@ class Partition(val topic: String,
**/
val candidateReplicas = inSyncReplicas - leaderReplica
- val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
+ val laggingReplicas = candidateReplicas.filter(r =>
+ r.logEndOffset.messageOffset != leaderReplica.logEndOffset.messageOffset && (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
if (laggingReplicas.nonEmpty)
debug("Lagging replicas are
%s".format(laggingReplicas.map(_.brokerId).mkString(",")))
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 8212ed6..c90a5b9 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -40,6 +40,7 @@ class IsrExpirationTest {
var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
val replicaLagTimeMaxMs = 100L
val replicaFetchWaitMaxMs = 100
+ val leaderLogEndOffset = 20
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
@@ -81,12 +82,12 @@ class IsrExpirationTest {
assertEquals("All replicas should be in ISR",
configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
val leaderReplica = partition0.getReplica(configs.head.brokerId).get
- // let the follower catch up to the Leader logEndOffset (15)
+ // let the follower catch up to the Leader logEndOffset - 1
for (replica <- partition0.assignedReplicas - leaderReplica)
- replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
- highWatermark = 15L,
+ replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset - 1), MemoryRecords.EMPTY),
+ highWatermark = leaderLogEndOffset - 1,
leaderLogStartOffset = 0L,
- leaderLogEndOffset = 15L,
+ leaderLogEndOffset = leaderLogEndOffset,
followerLogStartOffset = 0L,
fetchTimeMs = time.milliseconds,
readSize = -1,
@@ -138,10 +139,10 @@ class IsrExpirationTest {
// Make the remote replica not read to the end of log. It should not be out of sync for at least 100 ms
for (replica <- partition0.assignedReplicas - leaderReplica)
- replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY),
- highWatermark = 10L,
+ replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset - 2), MemoryRecords.EMPTY),
+ highWatermark = leaderLogEndOffset - 2,
leaderLogStartOffset = 0L,
- leaderLogEndOffset = 15L,
+ leaderLogEndOffset = leaderLogEndOffset,
followerLogStartOffset = 0L,
fetchTimeMs = time.milliseconds,
readSize = -1,
@@ -155,10 +156,10 @@ class IsrExpirationTest {
time.sleep(75)
(partition0.assignedReplicas - leaderReplica).foreach { r =>
- r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY),
- highWatermark = 11L,
+ r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset - 1), MemoryRecords.EMPTY),
+ highWatermark = leaderLogEndOffset - 1,
leaderLogStartOffset = 0L,
- leaderLogEndOffset = 15L,
+ leaderLogEndOffset = leaderLogEndOffset,
followerLogStartOffset = 0L,
fetchTimeMs = time.milliseconds,
readSize = -1,
@@ -175,10 +176,10 @@ class IsrExpirationTest {
// Now actually make a fetch to the end of the log. The replicas should be back in ISR
(partition0.assignedReplicas - leaderReplica).foreach { r =>
- r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
- highWatermark = 15L,
+ r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset), MemoryRecords.EMPTY),
+ highWatermark = leaderLogEndOffset,
leaderLogStartOffset = 0L,
- leaderLogEndOffset = 15L,
+ leaderLogEndOffset = leaderLogEndOffset,
followerLogStartOffset = 0L,
fetchTimeMs = time.milliseconds,
readSize = -1,
@@ -190,6 +191,40 @@ class IsrExpirationTest {
EasyMock.verify(log)
}
+ /*
+ * Test the case where a follower has already caught up to the leader's log end offset.
+ * Such a follower should not be considered out-of-sync.
+ */
+ @Test
+ def testIsrExpirationForCaughtUpFollowers() {
+ val log = logMock
+
+ // create one partition and all replicas
+ val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
+ assertEquals("All replicas should be in ISR",
configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
+ val leaderReplica = partition0.getReplica(configs.head.brokerId).get
+
+ // let the follower catch up to the Leader logEndOffset
+ for (replica <- partition0.assignedReplicas - leaderReplica)
+ replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset), MemoryRecords.EMPTY),
+ highWatermark = leaderLogEndOffset,
+ leaderLogStartOffset = 0L,
+ leaderLogEndOffset = leaderLogEndOffset,
+ followerLogStartOffset = 0L,
+ fetchTimeMs = time.milliseconds,
+ readSize = -1,
+ lastStableOffset = None))
+ var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
+ assertEquals("No replica should be out of sync", Set.empty[Int],
partition0OSR.map(_.brokerId))
+
+ // let some time pass
+ time.sleep(150)
+
+ // even though the follower hasn't fetched any data for more than replicaLagTimeMaxMs, it has already caught up to the leader's LEO, so it is not out-of-sync
+ partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
+ assertEquals("No replica should be out of sync", Set.empty[Int],
partition0OSR.map(_.brokerId))
+ EasyMock.verify(log)
+ }
+
private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig,
localLog: Log): Partition = {
val leaderId = config.brokerId
@@ -222,6 +257,7 @@ class IsrExpirationTest {
EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
EasyMock.expect(log.leaderEpochCache).andReturn(cache).anyTimes()
EasyMock.expect(log.onHighWatermarkIncremented(0L))
+ EasyMock.expect(log.logEndOffsetMetadata).andReturn(LogOffsetMetadata(leaderLogEndOffset)).anyTimes()
EasyMock.replay(log)
log
}