satishd commented on code in PR #13487: URL: https://github.com/apache/kafka/pull/13487#discussion_r1158198885
########## core/src/main/scala/kafka/log/remote/RemoteLogManager.scala: ########## @@ -143,24 +216,44 @@ class RemoteLogManager(rlmConfig: RemoteLogManagerConfig, topicIds: util.Map[String, Uuid]): Unit = { debug(s"Received leadership changes for leaders: $partitionsBecomeLeader and followers: $partitionsBecomeFollower") - // Partitions logs are available when this callback is invoked. - // Compact topics and internal topics are filtered here as they are not supported with tiered storage. - def filterPartitions(partitions: Set[Partition]): Set[TopicIdPartition] = { + def filterPartitions(partitions: Set[Partition]): Set[Partition] = { // We are not specifically checking for internal topics etc here as `log.remoteLogEnabled()` already handles that. partitions.filter(partition => partition.log.exists(log => log.remoteLogEnabled())) - .map(partition => new TopicIdPartition(topicIds.get(partition.topic), partition.topicPartition)) } - val followerTopicPartitions = filterPartitions(partitionsBecomeFollower) - val leaderTopicPartitions = filterPartitions(partitionsBecomeLeader) - debug(s"Effective topic partitions after filtering compact and internal topics, leaders: $leaderTopicPartitions " + - s"and followers: $followerTopicPartitions") + val leaderPartitionsWithLeaderEpoch = filterPartitions(partitionsBecomeLeader) + .map(p => new TopicIdPartition(topicIds.get(p.topic), p.topicPartition) -> p.getLeaderEpoch).toMap + val leaderPartitions = leaderPartitionsWithLeaderEpoch.keySet - if (leaderTopicPartitions.nonEmpty || followerTopicPartitions.nonEmpty) { - leaderTopicPartitions.foreach(x => topicPartitionIds.put(x.topicPartition(), x.topicId())) - followerTopicPartitions.foreach(x => topicPartitionIds.put(x.topicPartition(), x.topicId())) + val followerPartitions = filterPartitions(partitionsBecomeFollower) + .map(p => new TopicIdPartition(topicIds.get(p.topic), p.topicPartition)) + + def cacheTopicPartitionIds(topicIdPartition: TopicIdPartition): Unit = { + val previousTopicId = topicPartitionIds.put(topicIdPartition.topicPartition(), topicIdPartition.topicId()) + if (previousTopicId != null && previousTopicId != topicIdPartition.topicId()) { + warn(s"Previous cached topic id $previousTopicId for ${topicIdPartition.topicPartition()} does " + Review Comment: Changed it to info level, which may be useful for debugging. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org