satishd commented on code in PR #13487:
URL: https://github.com/apache/kafka/pull/13487#discussion_r1158198885


##########
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##########
@@ -143,24 +216,44 @@ class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
                          topicIds: util.Map[String, Uuid]): Unit = {
     debug(s"Received leadership changes for leaders: $partitionsBecomeLeader 
and followers: $partitionsBecomeFollower")
 
-    // Partitions logs are available when this callback is invoked.
-    // Compact topics and internal topics are filtered here as they are not 
supported with tiered storage.
-    def filterPartitions(partitions: Set[Partition]): Set[TopicIdPartition] = {
+    def filterPartitions(partitions: Set[Partition]): Set[Partition] = {
       // We are not specifically checking for internal topics etc here as 
`log.remoteLogEnabled()` already handles that.
       partitions.filter(partition => partition.log.exists(log => 
log.remoteLogEnabled()))
-        .map(partition => new TopicIdPartition(topicIds.get(partition.topic), 
partition.topicPartition))
     }
 
-    val followerTopicPartitions = filterPartitions(partitionsBecomeFollower)
-    val leaderTopicPartitions = filterPartitions(partitionsBecomeLeader)
-    debug(s"Effective topic partitions after filtering compact and internal 
topics, leaders: $leaderTopicPartitions " +
-      s"and followers: $followerTopicPartitions")
+    val leaderPartitionsWithLeaderEpoch = 
filterPartitions(partitionsBecomeLeader)
+      .map(p => new TopicIdPartition(topicIds.get(p.topic), p.topicPartition) 
-> p.getLeaderEpoch).toMap
+    val leaderPartitions = leaderPartitionsWithLeaderEpoch.keySet
 
-    if (leaderTopicPartitions.nonEmpty || followerTopicPartitions.nonEmpty) {
-      leaderTopicPartitions.foreach(x => 
topicPartitionIds.put(x.topicPartition(), x.topicId()))
-      followerTopicPartitions.foreach(x => 
topicPartitionIds.put(x.topicPartition(), x.topicId()))
+    val followerPartitions = filterPartitions(partitionsBecomeFollower)
+      .map(p => new TopicIdPartition(topicIds.get(p.topic), p.topicPartition))
+
+    def cacheTopicPartitionIds(topicIdPartition: TopicIdPartition): Unit = {
+      val previousTopicId = 
topicPartitionIds.put(topicIdPartition.topicPartition(), 
topicIdPartition.topicId())
+      if (previousTopicId != null && previousTopicId != 
topicIdPartition.topicId()) {
+        warn(s"Previous cached topic id $previousTopicId for 
${topicIdPartition.topicPartition()} does " +

Review Comment:
   Changed this log statement from warn to info level; the message may still be useful for debugging.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to