divijvaidya commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1252999633

##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -471,6 +472,9 @@ class ReplicaManager(val config: KafkaConfig,
         case HostedPartition.Online(partition) =>
           val currentLeaderEpoch = partition.getLeaderEpoch
           val requestLeaderEpoch = partitionState.leaderEpoch
+
+          if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete && remoteLogManager.isDefined)

Review Comment:
   Everywhere that you use `remoteLogManager.isDefined`, please also add a check for `log.remoteLogEnabled()`. The remote storage feature may be ON in the cluster, so `remoteLogManager` is present, while still not being turned ON for that particular topic/partition.

##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }
 
+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {

Review Comment:
   As an optimization, we also want to remove the index entries for this partition from the RemoteIndexCache.
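
To make the two suggestions above concrete, here is a rough, self-contained Java sketch. It is not the PR's code and does not use Kafka's classes: `Log`, `Manager`, `IndexCache`, `shouldCleanRemote` and `removePartition` are hypothetical stand-ins, the partition key is a plain `String`, and the cache layout (segment id mapped to owning partition) is an assumption made only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class RemoteCleanupSketch {

    // Hypothetical stand-in for the log object; only the per-topic flag matters here.
    interface Log {
        boolean remoteLogEnabled();
    }

    // Hypothetical stand-in for the cluster-level remote log manager.
    static final class Manager { }

    // Remote cleanup applies only when the manager exists (feature ON in the cluster)
    // AND remote storage is enabled for this particular topic/partition.
    static boolean shouldCleanRemote(Optional<Manager> remoteLogManager, Log log) {
        return remoteLogManager.isPresent() && log.remoteLogEnabled();
    }

    // Hypothetical index cache: segment id -> owning partition. Not thread-safe.
    static final class IndexCache {
        private final Map<String, String> partitionBySegment = new HashMap<>();

        void put(String segmentId, String partition) {
            partitionBySegment.put(segmentId, partition);
        }

        // Evicts every cached entry that belongs to the given partition
        // and returns how many entries were removed.
        int removePartition(String partition) {
            int before = partitionBySegment.size();
            partitionBySegment.values().removeIf(partition::equals);
            return before - partitionBySegment.size();
        }

        int size() {
            return partitionBySegment.size();
        }
    }

    public static void main(String[] args) {
        Optional<Manager> manager = Optional.of(new Manager());
        Log tieredTopic = () -> true;   // remote storage enabled for this topic
        Log localTopic = () -> false;   // manager present, but topic is not tiered

        System.out.println(shouldCleanRemote(manager, tieredTopic));
        System.out.println(shouldCleanRemote(manager, localTopic));

        IndexCache cache = new IndexCache();
        cache.put("seg-1", "topicA-0");
        cache.put("seg-2", "topicA-0");
        cache.put("seg-3", "topicB-0");

        if (shouldCleanRemote(manager, tieredTopic)) {
            // Drop cached index entries for the deleted partition along with its segments.
            System.out.println(cache.removePartition("topicA-0"));
        }
        System.out.println(cache.size());
    }
}
```

The point of the combined guard is that the presence of the manager only reflects the cluster-wide setting, so the per-log flag has to be consulted before any remote cleanup. How the real RemoteIndexCache should expose eviction is left open here.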