This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit a700895234360aad2602227324b7a6dff140cd77
Author: Bob Barrett <[email protected]>
AuthorDate: Sun Aug 12 23:30:09 2018 -0700

    KAFKA-7164; Follower should truncate after every missed leader epoch change (#5436)
    
    Currently, we skip the steps to make a replica a follower if the leader does
    not change, including truncating the follower log if necessary. This can
    cause problems if the follower has missed one or more leader updates. Change
    the logic to only skip the steps if the new epoch is the same or one greater
    than the old epoch. Tested with unit tests that verify the behavior of
    `Partition` and that show log truncation when the follower's log is ahead of
    the leader's, the follower has missed [...]
    
    Reviewers: Stanislav Kozlovski <[email protected]>, Jason Gustafson <[email protected]>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  11 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  23 +++
 .../unit/kafka/server/ReplicaManagerTest.scala     | 229 ++++++++++++++++++++-
 3 files changed, 256 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 9339d29..f953722 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -303,12 +303,15 @@ class Partition(val topic: String,
 
   /**
    *  Make the local replica the follower by setting the new leader and ISR to empty
-   *  If the leader replica id does not change, return false to indicate the replica manager
+   *  If the leader replica id does not change and the new epoch is equal or one
+   *  greater (that is, no updates have been missed), return false to indicate to the
+    * replica manager that state is already correct and the become-follower steps can be skipped
    */
   def makeFollower(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
       val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
-      val newLeaderBrokerId: Int = partitionStateInfo.basePartitionState.leader
+      val newLeaderBrokerId = partitionStateInfo.basePartitionState.leader
+      val oldLeaderEpoch = leaderEpoch
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
@@ -320,7 +323,9 @@ class Partition(val topic: String,
       leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
       zkVersion = partitionStateInfo.basePartitionState.zkVersion
 
-      if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
+      // If the leader is unchanged and the epochs are no more than one change apart, indicate that no follower changes are required
+      // Otherwise, we missed a leader epoch update, which means the leader's log may have been truncated prior to the current epoch.
+      if (leaderReplicaIdOpt.contains(newLeaderBrokerId) && (leaderEpoch == oldLeaderEpoch || leaderEpoch == oldLeaderEpoch + 1)) {
         false
       }
       else {
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index fe5d578..6de8e55 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -31,9 +31,11 @@ import org.apache.kafka.common.errors.ReplicaNotAvailableException
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests.LeaderAndIsrRequest
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 import org.scalatest.Assertions.assertThrows
+
 import scala.collection.JavaConverters._
 
 class PartitionTest {
@@ -162,6 +164,27 @@ class PartitionTest {
     }
   }
 
+  @Test
+  def testMakeFollowerWithNoLeaderIdChange(): Unit = {
+    val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+
+    // Start off as follower
+    var partitionStateInfo = new LeaderAndIsrRequest.PartitionState(0, 1, 1, List[Integer](0, 1, 2).asJava, 1, List[Integer](0, 1, 2).asJava, false)
+    partition.makeFollower(0, partitionStateInfo, 0)
+
+    // Request with same leader and epoch increases by more than 1, perform become-follower steps
+    partitionStateInfo = new LeaderAndIsrRequest.PartitionState(0, 1, 3, List[Integer](0, 1, 2).asJava, 1, List[Integer](0, 1, 2).asJava, false)
+    assertTrue(partition.makeFollower(0, partitionStateInfo, 1))
+
+    // Request with same leader and epoch increases by only 1, skip become-follower steps
+    partitionStateInfo = new LeaderAndIsrRequest.PartitionState(0, 1, 4, List[Integer](0, 1, 2).asJava, 1, List[Integer](0, 1, 2).asJava, false)
+    assertFalse(partition.makeFollower(0, partitionStateInfo, 2))
+
+    // Request with same leader and same epoch, skip become-follower steps
+    partitionStateInfo = new LeaderAndIsrRequest.PartitionState(0, 1, 4, List[Integer](0, 1, 2).asJava, 1, List[Integer](0, 1, 2).asJava, false)
+    assertFalse(partition.makeFollower(0, partitionStateInfo, 2))
+  }
+
   def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = {
     val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
     val builder = MemoryRecords.builder(
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 56d4b79..171bcf3 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -19,21 +19,26 @@ package kafka.server
 
 import java.io.File
 import java.util.Properties
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 
-import kafka.log.LogConfig
+import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
 import TestUtils.createBroker
+import kafka.cluster.BrokerEndPoint
+import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.timer.MockTimer
 import kafka.zk.KafkaZkClient
 import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{IsolationLevel, LeaderAndIsrRequest}
+import org.apache.kafka.common.requests.{EpochEndOffset, IsolationLevel, LeaderAndIsrRequest}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
+import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.zookeeper.data.Stat
 import org.easymock.EasyMock
@@ -51,6 +56,11 @@ class ReplicaManagerTest {
   var zkClient: ZkClient = _
   var kafkaZkClient: KafkaZkClient = _
 
+  // Constants defined for readability
+  val zkVersion = 0
+  val correlationId = 0
+  var controllerEpoch = 0
+
   @Before
   def setUp() {
     zkClient = EasyMock.createMock(classOf[ZkClient])
@@ -504,6 +514,216 @@ class ReplicaManagerTest {
     }
   }
 
+  /**
+    * If a partition becomes a follower and the leader is unchanged it should check for truncation
+    * if the epoch has increased by more than one (which suggests it has missed an update)
+    */
+  @Test
+  def testBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate() {
+    val topicPartition = 0
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val controllerId = 0
+    val controllerEpoch = 0
+    var leaderEpoch = 1
+    val aliveBrokerIds = Seq[Integer] (followerBrokerId, leaderBrokerId)
+    val countDownLatch = new CountDownLatch(1)
+
+    // Prepare the mocked components for the test
+    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
+      topicPartition, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = true)
+
+    // Initialize partition state to follower, with leader = 1, leaderEpoch = 1
+    val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, topicPartition))
+    partition.getOrCreateReplica(followerBrokerId)
+    partition.makeFollower(controllerId,
+      leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId, aliveBrokerIds),
+      correlationId)
+
+    // Make local partition a follower - because epoch increased by more than 1, truncation should
+    // trigger even though leader does not change
+    leaderEpoch += 2
+    val leaderAndIsrRequest0 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
+      controllerId, controllerEpoch,
+      collection.immutable.Map(new TopicPartition(topic, topicPartition) ->
+        leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId, aliveBrokerIds)).asJava,
+      Set(new Node(followerBrokerId, "host1", 0),
+        new Node(leaderBrokerId, "host2", 1)).asJava).build()
+    replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest0,
+      (_, followers) => assertEquals(followerBrokerId, followers.head.partitionId))
+    assertTrue(countDownLatch.await(1000L, TimeUnit.MILLISECONDS))
+
+    // Truncation should have happened once
+    EasyMock.verify(mockLogMgr)
+  }
+
+  /**
+    * If a partition becomes a follower and the leader is unchanged but no epoch update
+    * has been missed, it should not check for truncation
+    */
+  @Test
+  def testDontBecomeFollowerWhenNoMissedLeaderUpdate() {
+    val topicPartition = 0
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val controllerId = 0
+    var leaderEpoch = 1
+    val aliveBrokerIds = Seq[Integer] (followerBrokerId, leaderBrokerId)
+    val countDownLatch = new CountDownLatch(1)
+
+    // Prepare the mocked components for the test
+    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
+      topicPartition, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = false)
+
+    // Initialize partition state to follower, with leader = 1, leaderEpoch = 1
+    val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, topicPartition))
+    partition.getOrCreateReplica(followerBrokerId)
+    partition.makeFollower(controllerId,
+      leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId, aliveBrokerIds),
+      correlationId)
+
+    // Make local partition a follower - because epoch did not change, truncation should not trigger
+    val leaderAndIsrRequest0 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
+      controllerId, controllerEpoch,
+      collection.immutable.Map(new TopicPartition(topic, topicPartition) ->
+        leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId, aliveBrokerIds)).asJava,
+      Set(new Node(followerBrokerId, "host1", 0),
+        new Node(leaderBrokerId, "host2", 1)).asJava).build()
+    replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest0,
+      (_, followers) => assertTrue(followers.isEmpty))
+
+    // Make local partition a follower - because epoch increased by only 1 and leader did not change,
+    // truncation should not trigger
+    leaderEpoch += 1
+    val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
+      controllerId, controllerEpoch,
+      collection.immutable.Map(new TopicPartition(topic, topicPartition) ->
+        leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId, aliveBrokerIds)).asJava,
+      Set(new Node(followerBrokerId, "host1", 0),
+        new Node(leaderBrokerId, "host2", 1)).asJava).build()
+    replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1,
+      (_, followers) => assertTrue(followers.isEmpty))
+
+    // Truncation should not have happened
+    EasyMock.verify(mockLogMgr)
+  }
+
+  private def prepareReplicaManagerAndLogManager(topicPartition: Int,
+                                                 followerBrokerId: Int,
+                                                 leaderBrokerId: Int,
+                                                 countDownLatch: CountDownLatch,
+                                                 expectTruncation: Boolean) : (ReplicaManager, LogManager) = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
+    val config = KafkaConfig.fromProps(props)
+
+    // Setup mock local log to have leader epoch of 3 and offset of 10
+    val localLogOffset = 10
+    val offsetFromLeader = 5
+    val leaderEpochFromLeader = 3
+    val mockScheduler = new MockScheduler(time)
+    val mockBrokerTopicStats = new BrokerTopicStats
+    val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
+    val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochCache])
+    EasyMock.expect(mockLeaderEpochCache.latestEpoch()).andReturn(leaderEpochFromLeader)
+    EasyMock.expect(mockLeaderEpochCache.endOffsetFor(leaderEpochFromLeader))
+      .andReturn((leaderEpochFromLeader, localLogOffset))
+    EasyMock.replay(mockLeaderEpochCache)
+    val mockLog = new Log(
+      dir = new File(new File(config.logDirs.head), s"$topic-0"),
+      config = LogConfig(),
+      logStartOffset = 0L,
+      recoveryPoint = 0L,
+      scheduler = mockScheduler,
+      brokerTopicStats = mockBrokerTopicStats,
+      time = time,
+      maxProducerIdExpirationMs = 30000,
+      producerIdExpirationCheckIntervalMs = 30000,
+      topicPartition = new TopicPartition(topic, topicPartition),
+      producerStateManager = new ProducerStateManager(new TopicPartition(topic, topicPartition),
+        new File(new File(config.logDirs.head), s"$topic-$topicPartition"), 30000),
+      logDirFailureChannel = mockLogDirFailureChannel) {
+
+      override def leaderEpochCache: LeaderEpochCache = mockLeaderEpochCache
+
+      override def logEndOffsetMetadata = LogOffsetMetadata(localLogOffset)
+    }
+
+    // Expect to call LogManager.truncateTo exactly once
+    val mockLogMgr = EasyMock.createMock(classOf[LogManager])
+    EasyMock.expect(mockLogMgr.liveLogDirs).andReturn(config.logDirs.map(new File(_).getAbsoluteFile)).anyTimes
+    EasyMock.expect(mockLogMgr.currentDefaultConfig).andReturn(LogConfig())
+    EasyMock.expect(mockLogMgr.getOrCreateLog(new TopicPartition(topic, topicPartition),
+      LogConfig(), isNew = false, isFuture = false)).andReturn(mockLog).anyTimes
+    if (expectTruncation) {
+      EasyMock.expect(mockLogMgr.truncateTo(Map(new TopicPartition(topic, topicPartition) -> offsetFromLeader),
+        isFuture = false)).once
+    }
+    EasyMock.replay(mockLogMgr)
+
+    val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
+    val aliveBrokers = aliveBrokerIds.map(brokerId => createBroker(brokerId, s"host$brokerId", brokerId))
+
+    val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+    EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes
+    aliveBrokerIds.foreach { brokerId =>
+      EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(brokerId))).andReturn(true).anyTimes
+    }
+    EasyMock.replay(metadataCache)
+
+    val timer = new MockTimer
+    val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
+      purgatoryName = "Produce", timer, reaperEnabled = false)
+    val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
+      purgatoryName = "Fetch", timer, reaperEnabled = false)
+    val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords](
+      purgatoryName = "DeleteRecords", timer, reaperEnabled = false)
+
+    // Mock network client to show leader offset of 5
+    val quota = QuotaFactory.instantiate(config, metrics, time, "")
+    val blockingSend = new ReplicaFetcherMockBlockingSend(Map(new TopicPartition(topic, topicPartition) ->
+      new EpochEndOffset(leaderEpochFromLeader, offsetFromLeader)).asJava, BrokerEndPoint(1, "host1" ,1), time)
+    val replicaManager = new ReplicaManager(config, metrics, time, kafkaZkClient, mockScheduler, mockLogMgr,
+      new AtomicBoolean(false), quota, mockBrokerTopicStats,
+      metadataCache, mockLogDirFailureChannel, mockProducePurgatory, mockFetchPurgatory,
+      mockDeleteRecordsPurgatory, Option(this.getClass.getName)) {
+
+      override protected def createReplicaFetcherManager(metrics: Metrics,
+                                                     time: Time,
+                                                     threadNamePrefix: Option[String],
+                                                     quotaManager: ReplicationQuotaManager): ReplicaFetcherManager = {
+        new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager) {
+
+          override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
+            new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", fetcherId,
+              sourceBroker, config, replicaManager, metrics, time, quota.follower, Some(blockingSend)) {
+
+              override def doWork() = {
+                // In case the thread starts before the partition is added by AbstractFetcherManager,
+                // add it here (it's a no-op if already added)
+                addPartitions(Map(new TopicPartition(topic, topicPartition) -> 0L))
+                super.doWork()
+
+                // Shut the thread down after one iteration to avoid double-counting truncations
+                initiateShutdown()
+                countDownLatch.countDown()
+              }
+            }
+          }
+        }
+      }
+    }
+
+    (replicaManager, mockLogMgr)
+  }
+
+  private def leaderAndIsrPartitionState(leaderEpoch: Int,
+                                         leaderBrokerId: Int,
+                                         aliveBrokerIds: Seq[Integer]) : LeaderAndIsrRequest.PartitionState = {
+    new LeaderAndIsrRequest.PartitionState(controllerEpoch, leaderBrokerId, leaderEpoch, aliveBrokerIds.asJava,
+      zkVersion, aliveBrokerIds.asJava, false)
+  }
+
   private class CallbackResult[T] {
     private var value: Option[T] = None
     private var fun: Option[T => Unit] = None
@@ -532,7 +752,8 @@ class ReplicaManagerTest {
   private def appendRecords(replicaManager: ReplicaManager,
                             partition: TopicPartition,
                             records: MemoryRecords,
-                            isFromClient: Boolean = true): CallbackResult[PartitionResponse] = {
+                            isFromClient: Boolean = true,
+                            requiredAcks: Short = -1): CallbackResult[PartitionResponse] = {
     val result = new CallbackResult[PartitionResponse]()
     def appendCallback(responses: Map[TopicPartition, PartitionResponse]): Unit = {
       val response = responses.get(partition)
@@ -542,7 +763,7 @@ class ReplicaManagerTest {
 
     replicaManager.appendRecords(
       timeout = 1000,
-      requiredAcks = -1,
+      requiredAcks = requiredAcks,
       internalTopicsAllowed = false,
       isFromClient = isFromClient,
       entriesPerPartition = Map(partition -> records),

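As a quick cross-check against the scenarios the new tests exercise, the
hypothetical shouldSkipBecomeFollowerSteps predicate sketched under the commit
message (illustrative only, not part of the patch) evaluates as follows:

    // Same leader but the epoch jumps from 1 to 3: an update was missed, so the
    // become-follower steps (including the truncation check) must run.
    shouldSkipBecomeFollowerSteps(Some(1), newLeaderId = 1, oldEpoch = 1, newEpoch = 3)  // false

    // Same leader and the epoch advances by exactly one: nothing was missed,
    // so the steps can safely be skipped.
    shouldSkipBecomeFollowerSteps(Some(1), newLeaderId = 1, oldEpoch = 3, newEpoch = 4)  // true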