This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b99be76ff1e KAFKA-9087 Replace log high watermark by future log high watermark wh… (#13075)
b99be76ff1e is described below

commit b99be76ff1e79b10dd284375c41437f214ca587f
Author: Chia-Ping Tsai <chia7...@gmail.com>
AuthorDate: Sat Jan 7 15:24:36 2023 +0800

    KAFKA-9087 Replace log high watermark by future log high watermark wh… (#13075)
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>, Justine Olshan <jols...@confluent.io>, Jun Rao <jun...@gmail.com>
---
 .../main/scala/kafka/server/ReplicaManager.scala   |  8 +--
 .../unit/kafka/server/ReplicaManagerTest.scala     | 57 ++++++++++++++++++++++
 2 files changed, 61 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6aa08d048da..ac002c6774b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1534,14 +1534,14 @@ class ReplicaManager(val config: KafkaConfig,
     leaderTopicSet.diff(newFollowerTopics).foreach(brokerTopicStats.removeOldFollowerMetrics)
   }
 
-  protected def maybeAddLogDirFetchers(partitions: Set[Partition],
+  protected[server] def maybeAddLogDirFetchers(partitions: Set[Partition],
                                        offsetCheckpoints: OffsetCheckpoints,
                                        topicIds: String => Option[Uuid]): Unit = {
     val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
     for (partition <- partitions) {
       val topicPartition = partition.topicPartition
-      if (logManager.getLog(topicPartition, isFuture = true).isDefined) {
-        partition.log.foreach { log =>
+      logManager.getLog(topicPartition, isFuture = true).foreach { futureLog =>
+        partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
           // Add future replica log to partition's map
@@ -1556,7 +1556,7 @@ class ReplicaManager(val config: KafkaConfig,
           logManager.abortAndPauseCleaning(topicPartition)
 
           futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
-            partition.getLeaderEpoch, log.highWatermark))
+            partition.getLeaderEpoch, futureLog.highWatermark))
         }
       }
     }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 9832059c7c0..7bb2cd42482 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -205,6 +205,63 @@ class ReplicaManagerTest {
     when(cache.getAliveBrokerNodes(any[ListenerName])).thenReturn(aliveBrokers)
   }
 
+  @Test
+  def testMaybeAddLogDirFetchersWithoutEpochCache(): Unit = {
+    val dir1 = TestUtils.tempDir()
+    val dir2 = TestUtils.tempDir()
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+    val config = KafkaConfig.fromProps(props)
+    val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
+    val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+    mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+    when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = logManager,
+      quotaManagers = quotaManager,
+      metadataCache = metadataCache,
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+    val partition = rm.createPartition(new TopicPartition(topic, 0))
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+      new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+    rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+      Seq(new LeaderAndIsrPartitionState()
+        .setTopicName(topic)
+        .setPartitionIndex(0)
+        .setControllerEpoch(0)
+        .setLeader(0)
+        .setLeaderEpoch(0)
+        .setIsr(Seq[Integer](0).asJava)
+        .setPartitionEpoch(0)
+        .setReplicas(Seq[Integer](0).asJava)
+        .setIsNew(false)).asJava,
+      Collections.singletonMap(topic, Uuid.randomUuid()),
+      Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ())
+    appendRecords(rm, new TopicPartition(topic, 0),
+      MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first message".getBytes()), new SimpleRecord("second message".getBytes())))
+    logManager.maybeUpdatePreferredLogDir(new TopicPartition(topic, 0), dir2.getAbsolutePath)
+
+    partition.createLogIfNotExists(isNew = true, isFutureReplica = true,
+      new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+    // remove cache to disable OffsetsForLeaderEpoch API
+    partition.futureLog.get.leaderEpochCache = None
+
+    // this method should use hw of future log to create log dir fetcher. Otherwise, it causes offset mismatch error
+    rm.maybeAddLogDirFetchers(Set(partition), new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), _ => None)
+    rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => t.fetchState(new TopicPartition(topic, 0)).foreach(s => assertEquals(0L, s.fetchOffset)))
+    // make sure alter log dir thread has processed the data
+    rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => t.doWork())
+    assertEquals(Set.empty, rm.replicaAlterLogDirsManager.failedPartitions.partitions())
+    // the future log becomes the current log, so the partition state should get removed
+    rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => assertEquals(None, t.fetchState(new TopicPartition(topic, 0))))
+  }
+
   @Test
   def testClearPurgatoryOnBecomingFollower(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)

Reply via email to