This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 74d5c9165423c545b3ce08629074afadbeb767b7
Author: Anna Povzner <[email protected]>
AuthorDate: Tue Aug 28 10:45:00 2018 -0700

    KAFKA-7128; Follower has to catch up to offset within current leader epoch to join ISR (#5557)
    
    If a follower is not in the ISR, it has to fetch up to the start offset of the current leader epoch before it can rejoin the ISR; otherwise we risk losing committed data. Added a unit test to verify this behavior.
    
    Reviewers: Jason Gustafson <[email protected]>
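    
    For readers skimming the diff: the core of the change is the extra condition in Partition.maybeExpandIsr. The standalone Scala sketch below (illustrative only, not Kafka code; all names are made up) condenses the decision: a follower may join the ISR only if its log end offset has reached the leader's high watermark AND its fetch offset has reached the start offset of the current leader epoch.
    
        // Minimal sketch of the ISR-join decision described above (not Kafka code).
        object IsrJoinCheckSketch {
          // leaderEpochStartOffsetOpt is None when this broker is not the leader;
          // Option.exists then returns false, so the ISR is never expanded in that case.
          def canJoinIsr(followerLeo: Long,
                         fetchOffset: Long,
                         leaderHw: Long,
                         leaderEpochStartOffsetOpt: Option[Long]): Boolean =
            followerLeo >= leaderHw && leaderEpochStartOffsetOpt.exists(fetchOffset >= _)
    
          def main(args: Array[String]): Unit = {
            // Follower has reached the leader's HW (2) but not the current epoch's start offset (5).
            println(canJoinIsr(followerLeo = 2L, fetchOffset = 2L, leaderHw = 2L,
                               leaderEpochStartOffsetOpt = Some(5L)))  // false
            // Follower has caught up to the current leader epoch's start offset.
            println(canJoinIsr(followerLeo = 5L, fetchOffset = 5L, leaderHw = 2L,
                               leaderEpochStartOffsetOpt = Some(5L)))  // true
          }
        }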
---
 core/src/main/scala/kafka/cluster/Partition.scala  | 25 ++++--
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 89 +++++++++++++++++++++-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  4 +-
 3 files changed, 107 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index f953722..e3a8186 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -63,6 +63,9 @@ class Partition(val topic: String,
   private val leaderIsrUpdateLock = new ReentrantReadWriteLock
   private var zkVersion: Int = LeaderAndIsr.initialZKVersion
   @volatile private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
+  // start offset for 'leaderEpoch' above (leader epoch of the current leader for this partition),
+  // defined when this broker is leader for partition
+  @volatile private var leaderEpochStartOffsetOpt: Option[Long] = None
   @volatile var leaderReplicaIdOpt: Option[Int] = None
   @volatile var inSyncReplicas: Set[Replica] = Set.empty[Replica]
 
@@ -241,6 +244,7 @@ class Partition(val topic: String,
       allReplicasMap.clear()
       inSyncReplicas = Set.empty[Replica]
       leaderReplicaIdOpt = None
+      leaderEpochStartOffsetOpt = None
       removePartitionMetrics()
       logManager.asyncDelete(topicPartition)
       logManager.asyncDelete(topicPartition, isFuture = true)
@@ -265,17 +269,19 @@ class Partition(val topic: String,
       // remove assigned replicas that have been removed by the controller
       (assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
       inSyncReplicas = newInSyncReplicas
+      newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew))
 
-      info(s"$topicPartition starts at Leader Epoch ${partitionStateInfo.basePartitionState.leaderEpoch} from offset ${getReplica().get.logEndOffset.messageOffset}. Previous Leader Epoch was: $leaderEpoch")
+      val leaderReplica = getReplica().get
+      val leaderEpochStartOffset = leaderReplica.logEndOffset.messageOffset
+      info(s"$topicPartition starts at Leader Epoch ${partitionStateInfo.basePartitionState.leaderEpoch} from " +
+        s"offset $leaderEpochStartOffset. Previous Leader Epoch was: $leaderEpoch")
 
       //We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
       leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
-      newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew))
-
+      leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
       zkVersion = partitionStateInfo.basePartitionState.zkVersion
       val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true)
 
-      val leaderReplica = getReplica().get
       val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
       val curTimeMs = time.milliseconds
       // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
@@ -321,6 +327,7 @@ class Partition(val topic: String,
       (assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
       inSyncReplicas = Set.empty[Replica]
       leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
+      leaderEpochStartOffsetOpt = None
       zkVersion = partitionStateInfo.basePartitionState.zkVersion
 
       // If the leader is unchanged and the epochs are no more than one change apart, indicate that no follower changes are required
@@ -365,7 +372,11 @@ class Partition(val topic: String,
 
   /**
    * Check and maybe expand the ISR of the partition.
-   * A replica will be added to ISR if its LEO >= current hw of the partition.
+   * A replica will be added to ISR if its LEO >= current hw of the partition and it is caught up to
+   * an offset within the current leader epoch. A replica must be caught up to the current leader
+   * epoch before it can join ISR, because otherwise, if there is committed data between current
+   * leader's HW and LEO, the replica may become the leader before it fetches the committed data
+   * and the data will be lost.
    *
    * Technically, a replica shouldn't be in ISR if it hasn't caught up for longer than replicaLagTimeMaxMs,
    * even if its log end offset is >= HW. However, to be consistent with how the follower determines
@@ -382,9 +393,11 @@ class Partition(val topic: String,
         case Some(leaderReplica) =>
           val replica = getReplica(replicaId).get
           val leaderHW = leaderReplica.highWatermark
+          val fetchOffset = logReadResult.info.fetchOffsetMetadata.messageOffset
           if (!inSyncReplicas.contains(replica) &&
              assignedReplicas.map(_.brokerId).contains(replicaId) &&
-             replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
+             replica.logEndOffset.offsetDiff(leaderHW) >= 0 &&
+             leaderEpochStartOffsetOpt.exists(fetchOffset >= _)) {
             val newInSyncReplicas = inSyncReplicas + replica
             info(s"Expanding ISR from ${inSyncReplicas.map(_.brokerId).mkString(",")} " +
               s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}")
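
A note on the new guard above: leaderEpochStartOffsetOpt.exists(fetchOffset >= _) is false whenever
the option is None (delete() and makeFollower() reset it to None), so the ISR is only ever expanded
while this broker holds leadership. A minimal, standalone illustration of the Option.exists
semantics used here (plain Scala, not Kafka code):

    object ExistsSemanticsSketch {
      def main(args: Array[String]): Unit = {
        val fetchOffset = 10L
        println(Some(5L).exists(fetchOffset >= _))              // true:  10 >= 5
        println(Some(20L).exists(fetchOffset >= _))             // false: 10 < 20
        println((None: Option[Long]).exists(fetchOffset >= _))  // false: no start offset defined
      }
    }
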
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 6de8e55..b238b8e 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -22,10 +22,10 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.common.UnexpectedAppendOffsetException
-import kafka.log.{Log, LogConfig, LogManager, CleanerConfig}
+import kafka.log.{LogConfig, LogManager, CleanerConfig}
 import kafka.server._
 import kafka.utils.{MockTime, TestUtils, MockScheduler}
-import kafka.utils.timer.MockTimer
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.ReplicaNotAvailableException
 import org.apache.kafka.common.metrics.Metrics
@@ -35,6 +35,7 @@ import org.apache.kafka.common.requests.LeaderAndIsrRequest
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 import org.scalatest.Assertions.assertThrows
+import org.easymock.EasyMock
 
 import scala.collection.JavaConverters._
 
@@ -69,10 +70,16 @@ class PartitionTest {
     val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
     brokerProps.put("log.dir", logDir.getAbsolutePath)
     val brokerConfig = KafkaConfig.fromProps(brokerProps)
+    val kafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
     replicaManager = new ReplicaManager(
-      config = brokerConfig, metrics, time, zkClient = null, new MockScheduler(time),
+      config = brokerConfig, metrics, time, zkClient = kafkaZkClient, new MockScheduler(time),
       logManager, new AtomicBoolean(false), QuotaFactory.instantiate(brokerConfig, metrics, time, ""),
       brokerTopicStats, new MetadataCache(brokerId), new LogDirFailureChannel(brokerConfig.logDirs.size))
+
+    EasyMock.expect(kafkaZkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(logProps).anyTimes()
+    EasyMock.expect(kafkaZkClient.conditionalUpdatePath(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
+      .andReturn((true, 0)).anyTimes()
+    EasyMock.replay(kafkaZkClient)
   }
 
   @After
@@ -185,6 +192,82 @@ class PartitionTest {
     assertFalse(partition.makeFollower(0, partitionStateInfo, 2))
   }
 
+  @Test
+  def testFollowerDoesNotJoinISRUntilCaughtUpToOffsetWithinCurrentLeaderEpoch(): Unit = {
+    val controllerEpoch = 3
+    val leader = brokerId
+    val follower1 = brokerId + 1
+    val follower2 = brokerId + 2
+    val controllerId = brokerId + 3
+    val replicas = List[Integer](leader, follower1, follower2).asJava
+    val isr = List[Integer](leader, follower2).asJava
+    val leaderEpoch = 8
+    val batch1 = TestUtils.records(records = List(new SimpleRecord("k1".getBytes, "v1".getBytes),
+                                                  new SimpleRecord("k2".getBytes, "v2".getBytes)))
+    val batch2 = TestUtils.records(records = List(new SimpleRecord("k3".getBytes, "v1".getBytes),
+                                                  new SimpleRecord("k4".getBytes, "v2".getBytes),
+                                                  new SimpleRecord("k5".getBytes, "v3".getBytes)))
+    val batch3 = TestUtils.records(records = List(new SimpleRecord("k6".getBytes, "v1".getBytes),
+                                                  new SimpleRecord("k7".getBytes, "v2".getBytes)))
+
+    val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+    assertTrue("Expected first makeLeader() to return 'leader changed'",
+               partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true), 0))
+    assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
+    assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas.map(_.brokerId))
+
+    // after the makeLeader() call, partition should know about all the replicas
+    val leaderReplica = partition.getReplica(leader).get
+    val follower1Replica = partition.getReplica(follower1).get
+    val follower2Replica = partition.getReplica(follower2).get
+
+    // append records with initial leader epoch
+    val lastOffsetOfFirstBatch = partition.appendRecordsToLeader(batch1, isFromClient = true).lastOffset
+    partition.appendRecordsToLeader(batch2, isFromClient = true)
+    assertEquals("Expected leader's HW not move", leaderReplica.logStartOffset, leaderReplica.highWatermark.messageOffset)
+
+    // let the follower in ISR move the leader's HW further, but keep it below the LEO
+    def readResult(fetchInfo: FetchDataInfo, leaderReplica: Replica): LogReadResult = {
+      LogReadResult(info = fetchInfo,
+                    highWatermark = leaderReplica.highWatermark.messageOffset,
+                    leaderLogStartOffset = leaderReplica.logStartOffset,
+                    leaderLogEndOffset = leaderReplica.logEndOffset.messageOffset,
+                    followerLogStartOffset = 0,
+                    fetchTimeMs = time.milliseconds,
+                    readSize = 10240,
+                    lastStableOffset = None)
+    }
+    partition.updateReplicaLogReadResult(
+      follower2Replica, readResult(FetchDataInfo(LogOffsetMetadata(0), batch1), leaderReplica))
+    partition.updateReplicaLogReadResult(
+      follower2Replica, readResult(FetchDataInfo(LogOffsetMetadata(lastOffsetOfFirstBatch), batch2), leaderReplica))
+    assertEquals("Expected leader's HW", lastOffsetOfFirstBatch, leaderReplica.highWatermark.messageOffset)
+
+    // current leader becomes follower and then leader again (without any new records appended)
+    partition.makeFollower(
+      controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, follower2, leaderEpoch + 1, isr, 1, replicas, false), 1)
+    assertTrue("Expected makeLeader() to return 'leader changed' after makeFollower()",
+               partition.makeLeader(controllerEpoch, new LeaderAndIsrRequest.PartitionState(
+                 controllerEpoch, leader, leaderEpoch + 2, isr, 1, replicas, false), 2))
+    val currentLeaderEpochStartOffset = leaderReplica.logEndOffset.messageOffset
+
+    // append records with the latest leader epoch
+    partition.appendRecordsToLeader(batch3, isFromClient = true)
+
+    // fetch from follower not in ISR from log start offset should not add this follower to ISR
+    partition.updateReplicaLogReadResult(follower1Replica,
+                                         readResult(FetchDataInfo(LogOffsetMetadata(0), batch1), leaderReplica))
+    partition.updateReplicaLogReadResult(follower1Replica,
+                                         readResult(FetchDataInfo(LogOffsetMetadata(lastOffsetOfFirstBatch), batch2), leaderReplica))
+    assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas.map(_.brokerId))
+
+    // fetch from the follower not in ISR from start offset of the current leader epoch should
+    // add this follower to ISR
+    partition.updateReplicaLogReadResult(follower1Replica,
+                                         readResult(FetchDataInfo(LogOffsetMetadata(currentLeaderEpochStartOffset), batch3), leaderReplica))
+    assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.inSyncReplicas.map(_.brokerId))
+ }
+
   def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = {
     val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
     val builder = MemoryRecords.builder(
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 171bcf3..3be33a2 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -627,7 +627,7 @@ class ReplicaManagerTest {
     val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochCache])
     EasyMock.expect(mockLeaderEpochCache.latestEpoch()).andReturn(leaderEpochFromLeader)
     EasyMock.expect(mockLeaderEpochCache.endOffsetFor(leaderEpochFromLeader))
-      .andReturn((leaderEpochFromLeader, localLogOffset))
+      .andReturn(localLogOffset)
     EasyMock.replay(mockLeaderEpochCache)
     val mockLog = new Log(
       dir = new File(new File(config.logDirs.head), s"$topic-0"),
@@ -682,7 +682,7 @@ class ReplicaManagerTest {
     // Mock network client to show leader offset of 5
     val quota = QuotaFactory.instantiate(config, metrics, time, "")
     val blockingSend = new ReplicaFetcherMockBlockingSend(Map(new TopicPartition(topic, topicPartition) ->
-      new EpochEndOffset(leaderEpochFromLeader, offsetFromLeader)).asJava, BrokerEndPoint(1, "host1" ,1), time)
+      new EpochEndOffset(offsetFromLeader)).asJava, BrokerEndPoint(1, "host1" ,1), time)
     val replicaManager = new ReplicaManager(config, metrics, time, kafkaZkClient, mockScheduler, mockLogMgr,
       new AtomicBoolean(false), quota, mockBrokerTopicStats,
       metadataCache, mockLogDirFailureChannel, mockProducePurgatory, mockFetchPurgatory,
