This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new b99be76ff1e KAFKA-9087 Replace log high watermark by future log high watermark wh… (#13075) b99be76ff1e is described below commit b99be76ff1e79b10dd284375c41437f214ca587f Author: Chia-Ping Tsai <chia7...@gmail.com> AuthorDate: Sat Jan 7 15:24:36 2023 +0800 KAFKA-9087 Replace log high watermark by future log high watermark wh… (#13075) Reviewers: Ismael Juma <ism...@juma.me.uk>, Justine Olshan <jols...@confluent.io>, Jun Rao <jun...@gmail.com> --- .../main/scala/kafka/server/ReplicaManager.scala | 8 +-- .../unit/kafka/server/ReplicaManagerTest.scala | 57 ++++++++++++++++++++++ 2 files changed, 61 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6aa08d048da..ac002c6774b 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1534,14 +1534,14 @@ class ReplicaManager(val config: KafkaConfig, leaderTopicSet.diff(newFollowerTopics).foreach(brokerTopicStats.removeOldFollowerMetrics) } - protected def maybeAddLogDirFetchers(partitions: Set[Partition], + protected[server] def maybeAddLogDirFetchers(partitions: Set[Partition], offsetCheckpoints: OffsetCheckpoints, topicIds: String => Option[Uuid]): Unit = { val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState] for (partition <- partitions) { val topicPartition = partition.topicPartition - if (logManager.getLog(topicPartition, isFuture = true).isDefined) { - partition.log.foreach { log => + logManager.getLog(topicPartition, isFuture = true).foreach { futureLog => + partition.log.foreach { _ => val leader = BrokerEndPoint(config.brokerId, "localhost", -1) // Add future replica log to partition's map @@ -1556,7 +1556,7 @@ class ReplicaManager(val config: KafkaConfig, logManager.abortAndPauseCleaning(topicPartition) futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader, - partition.getLeaderEpoch, log.highWatermark)) + partition.getLeaderEpoch, futureLog.highWatermark)) } } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 9832059c7c0..7bb2cd42482 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -205,6 +205,63 @@ class ReplicaManagerTest { when(cache.getAliveBrokerNodes(any[ListenerName])).thenReturn(aliveBrokers) } + @Test + def testMaybeAddLogDirFetchersWithoutEpochCache(): Unit = { + val dir1 = TestUtils.tempDir() + val dir2 = TestUtils.tempDir() + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) + props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath) + val config = KafkaConfig.fromProps(props) + val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties())) + val metadataCache: MetadataCache = mock(classOf[MetadataCache]) + mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0))) + when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion) + val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = logManager, + quotaManagers = quotaManager, + metadataCache = metadataCache, + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) + val partition = rm.createPartition(new TopicPartition(topic, 0)) + partition.createLogIfNotExists(isNew = false, isFutureReplica = false, + new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None) + + rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(topic) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(0) + .setIsr(Seq[Integer](0).asJava) + .setPartitionEpoch(0) + .setReplicas(Seq[Integer](0).asJava) + .setIsNew(false)).asJava, + Collections.singletonMap(topic, Uuid.randomUuid()), + Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ()) + appendRecords(rm, new TopicPartition(topic, 0), + MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first message".getBytes()), new SimpleRecord("second message".getBytes()))) + logManager.maybeUpdatePreferredLogDir(new TopicPartition(topic, 0), dir2.getAbsolutePath) + + partition.createLogIfNotExists(isNew = true, isFutureReplica = true, + new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None) + // remove cache to disable OffsetsForLeaderEpoch API + partition.futureLog.get.leaderEpochCache = None + + // this method should use hw of future log to create log dir fetcher. Otherwise, it causes offset mismatch error + rm.maybeAddLogDirFetchers(Set(partition), new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), _ => None) + rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => t.fetchState(new TopicPartition(topic, 0)).foreach(s => assertEquals(0L, s.fetchOffset))) + // make sure alter log dir thread has processed the data + rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => t.doWork()) + assertEquals(Set.empty, rm.replicaAlterLogDirsManager.failedPartitions.partitions()) + // the future log becomes the current log, so the partition state should get removed + rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => assertEquals(None, t.fetchState(new TopicPartition(topic, 0)))) + } + @Test def testClearPurgatoryOnBecomingFollower(): Unit = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)