divijvaidya commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1252999633


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -471,6 +472,9 @@ class ReplicaManager(val config: KafkaConfig,
             case HostedPartition.Online(partition) =>
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
+
+              if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete && 
remoteLogManager.isDefined)

Review Comment:
   Everywhere that you are using remoteLogManager.isDefined, please add a check 
for `log.remoteLogEnabled()`. This is because while remote storage feature may 
be ON in the cluster and hence, remoteLogManager is present but it may not be 
turned ON for that particular topic/partition.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws 
InterruptedException
             }
         }
 
+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {

Review Comment:
   as an optimization, we also want to remove index entries for this partition 
in the RemoteIndexCache



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to