This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b485f92647 KAFKA-13790; ReplicaManager should be robust to all 
partition updates from kraft metadata log (#12085)
b485f92647 is described below

commit b485f92647faecc3594bdf4164999d52c859b1bb
Author: David Jacot <[email protected]>
AuthorDate: Mon May 9 20:47:14 2022 +0200

    KAFKA-13790; ReplicaManager should be robust to all partition updates from 
kraft metadata log (#12085)
    
    This patch refactors the `Partition.makeLeader` and 
`Partition.makeFollower` to be robust to all partition updates from the KRaft 
metadata log. Particularly, it ensures the following invariants:
    
    - A partition update is accepted if the partition epoch is equal or newer. 
The partition epoch is updated by the AlterPartition path as well so we accept 
an update from the metadata log with the same partition epoch in order to fully 
update the partition state.
    - The leader epoch state offset is only updated when the leader epoch is 
bumped.
    - The follower states are only updated when the leader epoch is bumped.
    - Fetchers are only restarted when the leader epoch is bumped. This was 
already the case but this patch adds unit tests to prove/maintain it.
    
    In the mean time, the patch unifies the state change logs to be similar in 
both ZK and KRaft world.
    
    Reviewers: Jason Gustafson <[email protected]>
---
 core/src/main/scala/kafka/cluster/Partition.scala  | 143 ++++++++------
 .../main/scala/kafka/server/ReplicaManager.scala   |  26 +--
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 201 ++++++++++++++++++-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 218 ++++++++++++++++++++-
 4 files changed, 510 insertions(+), 78 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 126aa71e7c..907c0e3397 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -257,7 +257,7 @@ class Partition(val topicPartition: TopicPartition,
   @volatile private var leaderEpoch: Int = LeaderAndIsr.InitialLeaderEpoch - 1
   // start offset for 'leaderEpoch' above (leader epoch of the current leader 
for this partition),
   // defined when this broker is leader for partition
-  @volatile private var leaderEpochStartOffsetOpt: Option[Long] = None
+  @volatile private[cluster] var leaderEpochStartOffsetOpt: Option[Long] = None
   // Replica ID of the leader, defined when this broker is leader or follower 
for the partition.
   @volatile var leaderReplicaIdOpt: Option[Int] = None
   @volatile private[cluster] var partitionState: PartitionState = 
CommittedPartitionState(Set.empty, LeaderRecoveryState.RECOVERED)
@@ -548,21 +548,35 @@ class Partition(val topicPartition: TopicPartition,
                  highWatermarkCheckpoints: OffsetCheckpoints,
                  topicId: Option[Uuid]): Boolean = {
     val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
-      // record the epoch of the controller that made the leadership decision. 
This is useful while updating the isr
-      // to maintain the decision maker controller's epoch in the zookeeper 
path
+      // Partition state changes are expected to have an partition epoch 
larger or equal
+      // to the current partition epoch. The latter is allowed because the 
partition epoch
+      // is also updated by the AlterPartition response so the new epoch might 
be known
+      // before a LeaderAndIsr request is received or before an update is 
received via
+      // the metadata log.
+      if (partitionState.partitionEpoch < partitionEpoch) {
+        stateChangeLogger.info(s"Skipped the become-leader state change for 
$topicPartition with topic id $topicId " +
+          s"and partition state $partitionState since the leader is already at 
a newer partition epoch $partitionEpoch.")
+        return false
+      }
+
+      // Record the epoch of the controller that made the leadership decision. 
This is useful while updating the isr
+      // to maintain the decision maker controller's epoch in the zookeeper 
path.
       controllerEpoch = partitionState.controllerEpoch
 
+      val currentTimeMs = time.milliseconds
+      val isNewLeader = !isLeader
+      val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch
       val isr = partitionState.isr.asScala.map(_.toInt).toSet
       val addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt)
       val removingReplicas = 
partitionState.removingReplicas.asScala.map(_.toInt)
 
-      if (partitionState.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING.value()) {
-        stateChangeLogger.info(
-          s"The topic partition $topicPartition was marked as RECOVERING. 
Leader log recovery is not implemented. " +
-          "Marking the topic partition as RECOVERED."
-        )
+      if (partitionState.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING.value) {
+        stateChangeLogger.info(s"The topic partition $topicPartition was 
marked as RECOVERING. " +
+          "Marking the topic partition as RECOVERED.")
       }
 
+      // Updating the assignment and ISR state is safe if the partition epoch 
is
+      // larger or equal to the current partition epoch.
       updateAssignmentAndIsr(
         assignment = partitionState.replicas.asScala.map(_.toInt),
         isr = isr,
@@ -576,51 +590,60 @@ class Partition(val topicPartition: TopicPartition,
       } catch {
         case e: ZooKeeperClientException =>
           stateChangeLogger.error(s"A ZooKeeper client exception has occurred 
and makeLeader will be skipping the " +
-            s"state change for the partition $topicPartition with leader 
epoch: $leaderEpoch ", e)
-
+            s"state change for the partition $topicPartition with leader 
epoch: $leaderEpoch.", e)
           return false
       }
 
       val leaderLog = localLogOrException
-      val leaderEpochStartOffset = leaderLog.logEndOffset
-      stateChangeLogger.info(s"Leader $topicPartition starts at leader epoch 
${partitionState.leaderEpoch} from " +
-        s"offset $leaderEpochStartOffset with high watermark 
${leaderLog.highWatermark} " +
-        s"ISR ${isr.mkString("[", ",", "]")} addingReplicas 
${addingReplicas.mkString("[", ",", "]")} " +
-        s"removingReplicas ${removingReplicas.mkString("[", ",", "]")}. 
Previous leader epoch was $leaderEpoch.")
 
-      // We cache the leader epoch here, persisting it only if it's local 
(hence having a log dir)
-      leaderEpoch = partitionState.leaderEpoch
-      leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
-      partitionEpoch = partitionState.partitionEpoch
-
-      // In the case of successive leader elections in a short time period, a 
follower may have
-      // entries in its log from a later epoch than any entry in the new 
leader's log. In order
-      // to ensure that these followers can truncate to the right offset, we 
must cache the new
-      // leader epoch and the start offset since it should be larger than any 
epoch that a follower
-      // would try to query.
-      leaderLog.maybeAssignEpochStartOffset(leaderEpoch, 
leaderEpochStartOffset)
-
-      val isNewLeader = !isLeader
-      val currentTimeMs = time.milliseconds
+      // We update the epoch start offset and the replicas' state only if the 
leader epoch
+      // has changed.
+      if (isNewLeaderEpoch) {
+        val leaderEpochStartOffset = leaderLog.logEndOffset
+        stateChangeLogger.info(s"Leader $topicPartition with topic id $topicId 
starts at " +
+          s"leader epoch ${partitionState.leaderEpoch} from offset 
$leaderEpochStartOffset " +
+          s"with partition epoch ${partitionState.partitionEpoch}, high 
watermark ${leaderLog.highWatermark}, " +
+          s"ISR ${isr.mkString("[", ",", "]")}, adding replicas 
${addingReplicas.mkString("[", ",", "]")} and " +
+          s"removing replicas ${removingReplicas.mkString("[", ",", "]")}. 
Previous leader epoch was $leaderEpoch.")
+
+        // In the case of successive leader elections in a short time period, 
a follower may have
+        // entries in its log from a later epoch than any entry in the new 
leader's log. In order
+        // to ensure that these followers can truncate to the right offset, we 
must cache the new
+        // leader epoch and the start offset since it should be larger than 
any epoch that a follower
+        // would try to query.
+        leaderLog.maybeAssignEpochStartOffset(partitionState.leaderEpoch, 
leaderEpochStartOffset)
+
+        // Initialize lastCaughtUpTime of replicas as well as their 
lastFetchTimeMs and
+        // lastFetchLeaderLogEndOffset.
+        remoteReplicas.foreach { replica =>
+          replica.resetReplicaState(
+            currentTimeMs = currentTimeMs,
+            leaderEndOffset = leaderEpochStartOffset,
+            isNewLeader = isNewLeader,
+            isFollowerInSync = partitionState.isr.contains(replica.brokerId)
+          )
+        }
 
-      // Initialize lastCaughtUpTime of replicas as well as their 
lastFetchTimeMs and
-      // lastFetchLeaderLogEndOffset.
-      remoteReplicas.foreach { replica =>
-        replica.resetReplicaState(
-          currentTimeMs = currentTimeMs,
-          leaderEndOffset = leaderEpochStartOffset,
-          isNewLeader = isNewLeader,
-          isFollowerInSync = partitionState.isr.contains(replica.brokerId)
-        )
+        // We update the leader epoch and the leader epoch start offset iff the
+        // leader epoch changed.
+        leaderEpoch = partitionState.leaderEpoch
+        leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
+      } else {
+        stateChangeLogger.info(s"Skipped the become-leader state change for 
$topicPartition with topic id $topicId " +
+          s"and partition state $partitionState since it is already the leader 
with leader epoch $leaderEpoch. " +
+          s"Current high watermark ${leaderLog.highWatermark}, ISR 
${isr.mkString("[", ",", "]")}, " +
+          s"adding replicas ${addingReplicas.mkString("[", ",", "]")} and " +
+          s"removing replicas ${removingReplicas.mkString("[", ",", "]")}.")
       }
 
+      partitionEpoch = partitionState.partitionEpoch
       leaderReplicaIdOpt = Some(localBrokerId)
 
-      // we may need to increment high watermark since ISR could be down to 1
+      // We may need to increment high watermark since ISR could be down to 1.
       (maybeIncrementLeaderHW(leaderLog, currentTimeMs = currentTimeMs), 
isNewLeader)
     }
 
-    // some delayed operations may be unblocked after HW changed
+    // Some delayed operations may be unblocked after HW changed.
     if (leaderHWIncremented)
       tryCompleteDelayedRequests()
 
@@ -631,15 +654,20 @@ class Partition(val topicPartition: TopicPartition,
    * Make the local replica the follower by setting the new leader and ISR to 
empty
    * If the leader replica id does not change and the new epoch is equal or one
    * greater (that is, no updates have been missed), return false to indicate 
to the
-   * replica manager that state is already correct and the become-follower 
steps can be skipped
+   * replica manager that state is already correct and the become-follower 
steps can
+   * be skipped.
    */
   def makeFollower(partitionState: LeaderAndIsrPartitionState,
                    highWatermarkCheckpoints: OffsetCheckpoints,
                    topicId: Option[Uuid]): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
-      val newLeaderBrokerId = partitionState.leader
-      val oldLeaderEpoch = leaderEpoch
-      // record the epoch of the controller that made the leadership decision. 
This is useful while updating the isr
+      if (partitionState.partitionEpoch < partitionEpoch) {
+        stateChangeLogger.info(s"Skipped the become-follower state change for 
$topicPartition with topic id $topicId " +
+          s"and partition state $partitionState since the follower is already 
at a newer partition epoch $partitionEpoch.")
+        return false
+      }
+
+      // Record the epoch of the controller that made the leadership decision. 
This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper 
path
       controllerEpoch = partitionState.controllerEpoch
 
@@ -650,32 +678,37 @@ class Partition(val topicPartition: TopicPartition,
         removingReplicas = 
partitionState.removingReplicas.asScala.map(_.toInt),
         LeaderRecoveryState.of(partitionState.leaderRecoveryState)
       )
+
       try {
         createLogIfNotExists(partitionState.isNew, isFutureReplica = false, 
highWatermarkCheckpoints, topicId)
       } catch {
         case e: ZooKeeperClientException =>
           stateChangeLogger.error(s"A ZooKeeper client exception has occurred. 
makeFollower will be skipping the " +
             s"state change for the partition $topicPartition with leader 
epoch: $leaderEpoch.", e)
-
           return false
       }
 
       val followerLog = localLogOrException
-      val leaderEpochEndOffset = followerLog.logEndOffset
-      stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch 
${partitionState.leaderEpoch} from " +
-        s"offset $leaderEpochEndOffset with high watermark 
${followerLog.highWatermark}. " +
-        s"Previous leader epoch was $leaderEpoch.")
+      val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch
 
+      if (isNewLeaderEpoch) {
+        val leaderEpochEndOffset = followerLog.logEndOffset
+        stateChangeLogger.info(s"Follower $topicPartition starts at leader 
epoch ${partitionState.leaderEpoch} from " +
+          s"offset $leaderEpochEndOffset with partition epoch 
${partitionState.partitionEpoch} and " +
+          s"high watermark ${followerLog.highWatermark}. Previous leader epoch 
was $leaderEpoch.")
+      } else {
+        stateChangeLogger.info(s"Skipped the become-follower state change for 
$topicPartition with topic id $topicId " +
+          s"and partition state $partitionState since it is already a follower 
with leader epoch $leaderEpoch.")
+      }
+
+      leaderReplicaIdOpt = Some(partitionState.leader)
       leaderEpoch = partitionState.leaderEpoch
       leaderEpochStartOffsetOpt = None
       partitionEpoch = partitionState.partitionEpoch
 
-      if (leaderReplicaIdOpt.contains(newLeaderBrokerId) && leaderEpoch == 
oldLeaderEpoch) {
-        false
-      } else {
-        leaderReplicaIdOpt = Some(newLeaderBrokerId)
-        true
-      }
+      // We must restart the fetchers when the leader epoch changed regardless 
of
+      // whether the leader changed as well.
+      isNewLeaderEpoch
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index fff7fb273d..1c97671c26 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1599,13 +1599,9 @@ class ReplicaManager(val config: KafkaConfig,
       // Update the partition information to be the leader
       partitionStates.forKeyValue { (partition, partitionState) =>
         try {
-          if (partition.makeLeader(partitionState, highWatermarkCheckpoints, 
topicIds(partitionState.topicName)))
+          if (partition.makeLeader(partitionState, highWatermarkCheckpoints, 
topicIds(partitionState.topicName))) {
             partitionsToMakeLeaders += partition
-          else
-            stateChangeLogger.info(s"Skipped the become-leader state change 
after marking its " +
-              s"partition as leader with correlation id $correlationId from 
controller $controllerId epoch $controllerEpoch for " +
-              s"partition ${partition.topicPartition} (last update controller 
epoch ${partitionState.controllerEpoch}) " +
-              s"since it is already the leader for the partition.")
+          }
         } catch {
           case e: KafkaStorageException =>
             stateChangeLogger.error(s"Skipped the become-leader state change 
with " +
@@ -1681,14 +1677,9 @@ class ReplicaManager(val config: KafkaConfig,
         try {
           if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
             // Only change partition state when the leader is available
-            if (partition.makeFollower(partitionState, 
highWatermarkCheckpoints, topicIds(partitionState.topicName)))
+            if (partition.makeFollower(partitionState, 
highWatermarkCheckpoints, topicIds(partitionState.topicName))) {
               partitionsToMakeFollower += partition
-            else
-              stateChangeLogger.info(s"Skipped the become-follower state 
change after marking its partition as " +
-                s"follower with correlation id $correlationId from controller 
$controllerId epoch $controllerEpoch " +
-                s"for partition ${partition.topicPartition} (last update " +
-                s"controller epoch ${partitionState.controllerEpoch}) " +
-                s"since the new leader $newLeaderBrokerId is the same as the 
old leader")
+            }
           } else {
             // The leader broker should always be present in the metadata 
cache.
             // If not, we should record the error message and abort the 
transition process for this partition
@@ -2180,11 +2171,7 @@ class ReplicaManager(val config: KafkaConfig,
       getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, 
isNew) =>
         try {
           val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
-          if (!partition.makeLeader(state, offsetCheckpoints, 
Some(info.topicId))) {
-            stateChangeLogger.info("Skipped the become-leader state change for 
" +
-              s"$tp with topic id ${info.topicId} because this partition is " +
-              "already a local leader.")
-          }
+          partition.makeLeader(state, offsetCheckpoints, Some(info.topicId))
           changedPartitions.add(partition)
         } catch {
           case e: KafkaStorageException =>
@@ -2234,9 +2221,6 @@ class ReplicaManager(val config: KafkaConfig,
               val state = info.partition.toLeaderAndIsrPartitionState(tp, 
isNew)
               if (partition.makeFollower(state, offsetCheckpoints, 
Some(info.topicId))) {
                 partitionsToMakeFollower.put(tp, partition)
-              } else {
-                stateChangeLogger.info("Skipped the become-follower state 
change after marking its " +
-                  s"partition as follower for partition $tp with id 
${info.topicId} and partition state $state.")
               }
             }
           }
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 042d2500fc..8645c4c9f7 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -21,6 +21,7 @@ import kafka.common.UnexpectedAppendOffsetException
 import kafka.log.{Defaults => _, _}
 import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpoints
+import kafka.server.epoch.EpochEntry
 import kafka.utils._
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.errors.{ApiException, 
InconsistentTopicIdException, NotLeaderOrFollowerException, 
OffsetNotAvailableException, OffsetOutOfRangeException}
@@ -39,10 +40,10 @@ import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.{any, anyString}
 import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
+
 import java.nio.ByteBuffer
 import java.util.Optional
 import java.util.concurrent.{CountDownLatch, Semaphore}
-
 import kafka.server.epoch.LeaderEpochFileCache
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
@@ -2105,6 +2106,204 @@ class PartitionTest extends AbstractPartitionTest {
     verify(spyConfigRepository, times(2)).topicConfig(topicPartition.topic())
   }
 
+  @Test
+  def testDoNotResetReplicaStateIfLeaderEpochIsNotBumped(): Unit = {
+    val controllerEpoch = 3
+    val leaderId = brokerId
+    val followerId = brokerId + 1
+    val replicas = List(leaderId, followerId)
+    val leaderEpoch = 8
+    val topicId = Uuid.randomUuid()
+
+    val initialLeaderState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(leaderId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(List(leaderId).map(Int.box).asJava)
+      .setPartitionEpoch(1)
+      .setReplicas(replicas.map(Int.box).asJava)
+      .setIsNew(true)
+
+    assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, 
Some(topicId)))
+    assertEquals(1, partition.getPartitionEpoch)
+    assertEquals(leaderEpoch, partition.getLeaderEpoch)
+    assertEquals(Set(leaderId), partition.partitionState.isr)
+
+    // Follower's state is initialized with unknown offset because it is not
+    // in the ISR.
+    assertReplicaState(partition, followerId,
+      lastCaughtUpTimeMs = 0L,
+      logStartOffset = UnifiedLog.UnknownOffset,
+      logEndOffset = UnifiedLog.UnknownOffset
+    )
+
+    // Follower fetches and updates its replica state.
+    partition.updateFollowerFetchState(
+      followerId = followerId,
+      followerFetchOffsetMetadata = LogOffsetMetadata(0L),
+      followerStartOffset = 0L,
+      followerFetchTimeMs = time.milliseconds(),
+      leaderEndOffset = partition.localLogOrException.logEndOffset
+    )
+
+    assertReplicaState(partition, followerId,
+      lastCaughtUpTimeMs = time.milliseconds(),
+      logStartOffset = 0L,
+      logEndOffset = 0L
+    )
+
+    // makeLeader is called again with the same leader epoch but with
+    // a newer partition epoch. This can happen in KRaft when a partition
+    // is reassigned. The leader epoch is not bumped when we add replicas.
+    val updatedLeaderState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(leaderId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(List(leaderId).map(Int.box).asJava)
+      .setPartitionEpoch(2)
+      .setReplicas(replicas.map(Int.box).asJava)
+      .setIsNew(false)
+
+    assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, 
Some(topicId)))
+    assertEquals(2, partition.getPartitionEpoch)
+    assertEquals(leaderEpoch, partition.getLeaderEpoch)
+    assertEquals(Set(leaderId), partition.partitionState.isr)
+
+    // Follower's state has not been reset.
+    assertReplicaState(partition, followerId,
+      lastCaughtUpTimeMs = time.milliseconds(),
+      logStartOffset = 0L,
+      logEndOffset = 0L
+    )
+  }
+
+  @Test
+  def testDoNotUpdateEpochStartOffsetIfLeaderEpochIsNotBumped(): Unit = {
+    val controllerEpoch = 3
+    val leaderId = brokerId
+    val followerId = brokerId + 1
+    val replicas = List(leaderId, followerId)
+    val leaderEpoch = 8
+    val topicId = Uuid.randomUuid()
+
+    val initialLeaderState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(leaderId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(List(leaderId).map(Int.box).asJava)
+      .setPartitionEpoch(1)
+      .setReplicas(replicas.map(Int.box).asJava)
+      .setIsNew(true)
+
+    assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, 
Some(topicId)))
+    assertEquals(1, partition.getPartitionEpoch)
+    assertEquals(leaderEpoch, partition.getLeaderEpoch)
+    assertEquals(Set(leaderId), partition.partitionState.isr)
+    assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt)
+
+    val leaderLog = partition.localLogOrException
+    assertEquals(Some(EpochEntry(leaderEpoch, 0L)), 
leaderLog.leaderEpochCache.flatMap(_.latestEntry))
+
+    // Write to the log to increment the log end offset.
+    leaderLog.appendAsLeader(MemoryRecords.withRecords(0L, 
CompressionType.NONE, 0,
+      new SimpleRecord("k1".getBytes, "v1".getBytes),
+      new SimpleRecord("k1".getBytes, "v1".getBytes)
+    ), leaderEpoch = leaderEpoch)
+
+    // makeLeader is called again with the same leader epoch but with
+    // a newer partition epoch.
+    val updatedLeaderState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(leaderId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(List(leaderId).map(Int.box).asJava)
+      .setPartitionEpoch(2)
+      .setReplicas(replicas.map(Int.box).asJava)
+      .setIsNew(false)
+
+    assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, 
Some(topicId)))
+    assertEquals(2, partition.getPartitionEpoch)
+    assertEquals(leaderEpoch, partition.getLeaderEpoch)
+    assertEquals(Set(leaderId), partition.partitionState.isr)
+    assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt)
+    assertEquals(Some(EpochEntry(leaderEpoch, 0L)), 
leaderLog.leaderEpochCache.flatMap(_.latestEntry))
+  }
+
+  @Test
+  def testIgnoreLeaderPartitionStateChangeWithOlderPartitionEpoch(): Unit = {
+    val controllerEpoch = 3
+    val leaderId = brokerId
+    val replicas = List(leaderId)
+    val leaderEpoch = 8
+    val topicId = Uuid.randomUuid()
+
+    val initialLeaderState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(leaderId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(List(leaderId).map(Int.box).asJava)
+      .setPartitionEpoch(1)
+      .setReplicas(replicas.map(Int.box).asJava)
+      .setIsNew(true)
+
+    assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, 
Some(topicId)))
+    assertEquals(1, partition.getPartitionEpoch)
+    assertEquals(leaderEpoch, partition.getLeaderEpoch)
+
+    // makeLeader is called again with the same leader epoch but with
+    // a older partition epoch.
+    val updatedLeaderState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(leaderId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(List(leaderId).map(Int.box).asJava)
+      .setPartitionEpoch(0)
+      .setReplicas(replicas.map(Int.box).asJava)
+      .setIsNew(false)
+
+    assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, 
Some(topicId)))
+    assertEquals(1, partition.getPartitionEpoch)
+    assertEquals(leaderEpoch, partition.getLeaderEpoch)
+  }
+
+  @Test
+  def testIgnoreFollowerPartitionStateChangeWithOlderPartitionEpoch(): Unit = {
+    val controllerEpoch = 3
+    val leaderId = brokerId
+    val followerId = brokerId + 1
+    val replicas = List(leaderId, followerId)
+    val leaderEpoch = 8
+    val topicId = Uuid.randomUuid()
+
+    val initialFollowerState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(followerId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(List(leaderId, followerId).map(Int.box).asJava)
+      .setPartitionEpoch(1)
+      .setReplicas(replicas.map(Int.box).asJava)
+      .setIsNew(true)
+
+    assertTrue(partition.makeFollower(initialFollowerState, offsetCheckpoints, 
Some(topicId)))
+    assertEquals(1, partition.getPartitionEpoch)
+    assertEquals(leaderEpoch, partition.getLeaderEpoch)
+
+    // makeLeader is called again with the same leader epoch but with
+    // a older partition epoch.
+    val updatedFollowerState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(followerId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(List(leaderId, followerId).map(Int.box).asJava)
+      .setPartitionEpoch(1)
+      .setReplicas(replicas.map(Int.box).asJava)
+      .setIsNew(true)
+
+    assertFalse(partition.makeFollower(updatedFollowerState, 
offsetCheckpoints, Some(topicId)))
+    assertEquals(1, partition.getPartitionEpoch)
+    assertEquals(leaderEpoch, partition.getLeaderEpoch)
+  }
+
   private def makeLeader(
     topicId: Option[Uuid],
     controllerEpoch: Int,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index da2d2a9084..ac2fc9926d 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -53,6 +53,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, 
TopicPartition, Uuid}
 import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, 
ConfigurationsImage, FeaturesImage, MetadataImage, ProducerIdsImage, 
TopicsDelta, TopicsImage}
+import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.raft.{OffsetAndEpoch => RaftOffsetAndEpoch}
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
 import org.junit.jupiter.api.Assertions._
@@ -63,7 +64,7 @@ import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.{any, anyInt}
-import org.mockito.Mockito.{mock, times, verify, when}
+import org.mockito.Mockito.{mock, reset, times, verify, when}
 
 import scala.collection.{Map, Seq, mutable}
 import scala.jdk.CollectionConverters._
@@ -3622,6 +3623,221 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testFetcherAreNotRestartedIfLeaderEpochIsNotBumpedWithZkPath(): Unit = {
+    val localId = 0
+    val topicPartition = new TopicPartition("foo", 0)
+
+    val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager])
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(
+      timer = new MockTimer(time),
+      brokerId = localId,
+      aliveBrokerIds = Seq(localId, localId + 1, localId + 2),
+      mockReplicaFetcherManager = Some(mockReplicaFetcherManager)
+    )
+
+    try {
+      when(mockReplicaFetcherManager.removeFetcherForPartitions(
+        Set(topicPartition))
+      ).thenReturn(Map.empty[TopicPartition, PartitionFetchState])
+
+      // Make the local replica the follower.
+      var request = makeLeaderAndIsrRequest(
+        topicId = FOO_UUID,
+        topicPartition = topicPartition,
+        replicas = Seq(localId, localId + 1),
+        leaderAndIsr = LeaderAndIsr(
+          leader = localId + 1,
+          leaderEpoch = 0,
+          isr = List(localId, localId + 1),
+          leaderRecoveryState = LeaderRecoveryState.RECOVERED,
+          partitionEpoch = 0
+        )
+      )
+
+      replicaManager.becomeLeaderOrFollower(0, request, (_, _) => ())
+
+      // Check the state of that partition.
+      val HostedPartition.Online(followerPartition) = 
replicaManager.getPartition(topicPartition)
+      assertFalse(followerPartition.isLeader)
+      assertEquals(0, followerPartition.getLeaderEpoch)
+      assertEquals(0, followerPartition.getPartitionEpoch)
+
+      // Verify that the partition was removed and added back.
+      
verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set(topicPartition))
+      
verify(mockReplicaFetcherManager).addFetcherForPartitions(Map(topicPartition -> 
InitialFetchState(
+        topicId = Some(FOO_UUID),
+        leader = BrokerEndPoint(localId + 1, s"host${localId + 1}", localId + 
1),
+        currentLeaderEpoch = 0,
+        initOffset = 0
+      )))
+
+      reset(mockReplicaFetcherManager)
+
+      // Apply changes that bumps the partition epoch.
+      request = makeLeaderAndIsrRequest(
+        topicId = FOO_UUID,
+        topicPartition = topicPartition,
+        replicas = Seq(localId, localId + 1, localId + 2),
+        leaderAndIsr = LeaderAndIsr(
+          leader = localId + 1,
+          leaderEpoch = 0,
+          isr = List(localId, localId + 1),
+          leaderRecoveryState = LeaderRecoveryState.RECOVERED,
+          partitionEpoch = 1
+        )
+      )
+
+      replicaManager.becomeLeaderOrFollower(0, request, (_, _) => ())
+
+      assertFalse(followerPartition.isLeader)
+      assertEquals(0, followerPartition.getLeaderEpoch)
+      // Partition updates is fenced based on the leader epoch on the ZK path.
+      assertEquals(0, followerPartition.getPartitionEpoch)
+
+      // As the update is fenced based on the leader epoch, 
removeFetcherForPartitions and
+      // addFetcherForPartitions are not called at all.
+      reset(mockReplicaFetcherManager)
+
+      // Apply changes that bumps the leader epoch.
+      request = makeLeaderAndIsrRequest(
+        topicId = FOO_UUID,
+        topicPartition = topicPartition,
+        replicas = Seq(localId, localId + 1, localId + 2),
+        leaderAndIsr = LeaderAndIsr(
+          leader = localId + 2,
+          leaderEpoch = 1,
+          isr = List(localId, localId + 1, localId + 2),
+          leaderRecoveryState = LeaderRecoveryState.RECOVERED,
+          partitionEpoch = 2
+        )
+      )
+
+      replicaManager.becomeLeaderOrFollower(0, request, (_, _) => ())
+
+      assertFalse(followerPartition.isLeader)
+      assertEquals(1, followerPartition.getLeaderEpoch)
+      assertEquals(2, followerPartition.getPartitionEpoch)
+
+      // Verify that the partition was removed and added back.
+      
verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set(topicPartition))
+      
verify(mockReplicaFetcherManager).addFetcherForPartitions(Map(topicPartition -> 
InitialFetchState(
+        topicId = Some(FOO_UUID),
+        leader = BrokerEndPoint(localId + 2, s"host${localId + 2}", localId + 
2),
+        currentLeaderEpoch = 1,
+        initOffset = 0
+      )))
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testFetcherAreNotRestartedIfLeaderEpochIsNotBumpedWithKRaftPath(): Unit 
= {
+    val localId = 0
+    val topicPartition = new TopicPartition("foo", 0)
+
+    val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager])
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(
+      timer = new MockTimer(time),
+      brokerId = localId,
+      mockReplicaFetcherManager = Some(mockReplicaFetcherManager)
+    )
+
+    try {
+      when(mockReplicaFetcherManager.removeFetcherForPartitions(
+        Set(topicPartition))
+      ).thenReturn(Map.empty[TopicPartition, PartitionFetchState])
+
+      // Make the local replica the follower.
+      var followerTopicsDelta = new TopicsDelta(TopicsImage.EMPTY)
+      followerTopicsDelta.replay(new 
TopicRecord().setName("foo").setTopicId(FOO_UUID))
+      followerTopicsDelta.replay(new PartitionRecord()
+        .setPartitionId(0)
+        .setTopicId(FOO_UUID)
+        .setReplicas(util.Arrays.asList(localId, localId + 1))
+        .setIsr(util.Arrays.asList(localId, localId + 1))
+        .setRemovingReplicas(Collections.emptyList())
+        .setAddingReplicas(Collections.emptyList())
+        .setLeader(localId + 1)
+        .setLeaderEpoch(0)
+        .setPartitionEpoch(0)
+      )
+      var followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
+
+      // Check the state of that partition.
+      val HostedPartition.Online(followerPartition) = 
replicaManager.getPartition(topicPartition)
+      assertFalse(followerPartition.isLeader)
+      assertEquals(0, followerPartition.getLeaderEpoch)
+      assertEquals(0, followerPartition.getPartitionEpoch)
+
+      // Verify that the partition was removed and added back.
+      
verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set(topicPartition))
+      
verify(mockReplicaFetcherManager).addFetcherForPartitions(Map(topicPartition -> 
InitialFetchState(
+        topicId = Some(FOO_UUID),
+        leader = BrokerEndPoint(localId + 1, "localhost", 9093),
+        currentLeaderEpoch = 0,
+        initOffset = 0
+      )))
+
+      reset(mockReplicaFetcherManager)
+
+      // Apply changes that bumps the partition epoch.
+      followerTopicsDelta = new TopicsDelta(followerMetadataImage.topics())
+      followerTopicsDelta.replay(new PartitionChangeRecord()
+        .setPartitionId(0)
+        .setTopicId(FOO_UUID)
+        .setReplicas(util.Arrays.asList(localId, localId + 1, localId + 2))
+        .setIsr(util.Arrays.asList(localId, localId + 1))
+      )
+      followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
+
+      assertFalse(followerPartition.isLeader)
+      assertEquals(0, followerPartition.getLeaderEpoch)
+      assertEquals(1, followerPartition.getPartitionEpoch)
+
+      // Verify that partition's fetcher was not impacted.
+      verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set.empty)
+      verify(mockReplicaFetcherManager).addFetcherForPartitions(Map.empty)
+
+      reset(mockReplicaFetcherManager)
+
+      // Apply changes that bumps the leader epoch.
+      followerTopicsDelta = new TopicsDelta(followerMetadataImage.topics())
+      followerTopicsDelta.replay(new PartitionChangeRecord()
+        .setPartitionId(0)
+        .setTopicId(FOO_UUID)
+        .setReplicas(util.Arrays.asList(localId, localId + 1, localId + 2))
+        .setIsr(util.Arrays.asList(localId, localId + 1, localId + 2))
+        .setLeader(localId + 2)
+      )
+      println(followerTopicsDelta.changedTopic(FOO_UUID).partitionChanges())
+      followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
+
+      assertFalse(followerPartition.isLeader)
+      assertEquals(1, followerPartition.getLeaderEpoch)
+      assertEquals(2, followerPartition.getPartitionEpoch)
+
+      // Verify that the partition was removed and added back.
+      
verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set(topicPartition))
+      
verify(mockReplicaFetcherManager).addFetcherForPartitions(Map(topicPartition -> 
InitialFetchState(
+        topicId = Some(FOO_UUID),
+        leader = BrokerEndPoint(localId + 2, "localhost", 9093),
+        currentLeaderEpoch = 1,
+        initOffset = 0
+      )))
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
   private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean): 
TopicsDelta = {
     val leader = if (isStartIdLeader) startId else startId + 1
     val delta = new TopicsDelta(TopicsImage.EMPTY)

Reply via email to