divijvaidya commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1252976430


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }
 
+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {

Review Comment:
   As an optimization, we should also remove the index entries for this 
partition from the RemoteIndexCache.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -615,6 +661,9 @@ public void run() {
             try {
                 Optional<UnifiedLog> unifiedLogOptional = fetchLog.apply(topicIdPartition.topicPartition());
 
+                // CleanUp/delete deleted remote log segments
+                cleanupDeletedRemoteLogSegments();

Review Comment:
   We do not want the existing archival path to block while the deleted 
segments are cleaned up. Cleanup for deletion should be done asynchronously 
with RemotePartitionRemover, as specified by KIP-405.
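
   To illustrate the shape I have in mind, here is a minimal sketch that hands 
the cleanup to a dedicated executor so the archival thread returns immediately. 
The class and method names are hypothetical and this does not model 
RemotePartitionRemover itself; it only shows the non-blocking hand-off.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: none of these names exist in Kafka.
final class AsyncCleanupSketch {
    // Dedicated daemon thread for deletions, so the archival thread never waits on them.
    private final ExecutorService cleanupExecutor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "remote-cleanup");
        t.setDaemon(true);
        return t;
    });

    /** Called from the archival loop; submits the work and returns without waiting for it. */
    CompletableFuture<Void> scheduleCleanup(Runnable deleteRemoteSegments) {
        return CompletableFuture.runAsync(deleteRemoteSegments, cleanupExecutor);
    }

    void shutdown() {
        cleanupExecutor.shutdown();
    }
}
```

   The returned future lets the caller log failures without ever joining on it 
in the archival path.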



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -343,21 +345,78 @@ public void onLeadershipChange(Set<Partition> partitionsBecomeLeader,
     /**
      * Deletes the internal topic partition info if delete flag is set as true.
      *
-     * @param topicPartition topic partition to be stopped.
+     * @param topicPartitions topic partitions that needs to be stopped.
      * @param delete         flag to indicate whether the given topic partitions to be deleted or not.
      */
-    public void stopPartitions(TopicPartition topicPartition, boolean delete) {
+    public void stopPartitions(Set<TopicPartition> topicPartitions,
+                               boolean delete,
+                               BiConsumer<TopicPartition, Throwable> errorHandler) {
+        LOGGER.debug("Stopping {} partitions, delete: {}", topicPartitions.size(), delete);
+        Set<TopicIdPartition> topicIdPartitions = topicPartitions.stream()
+                .filter(topicIdByPartitionMap::containsKey)
+                .map(tp -> new TopicIdPartition(topicIdByPartitionMap.get(tp), tp))
+                .collect(Collectors.toSet());
+
+        topicIdPartitions.forEach(tpId -> {
+            try {
+                RLMTaskWithFuture task = leaderOrFollowerTasks.remove(tpId);
+                if (task != null) {
+                    LOGGER.info("Cancelling the RLM task for tpId: {}", tpId);
+                    task.cancel();
+                }
+                if (delete) {
+                    LOGGER.info("Deleting the remote log segments task for partition: {}", tpId);
+                    deleteRemoteLogPartition(tpId);

Review Comment:
   Downside of the current implementation: 
   `replicaManager.stopPartitions` depends on RSM availability. If the remote 
storage is unavailable, stopPartitions will block. 
   
   This downside can be removed if, instead of deleting the segments 
synchronously in the stopPartitions path, we leave it up to the RLMM plugin to 
perform the cleanup when RLMM.onStopPartitions() is called.
   
   The proposed alternative has the following benefits:
   1. Deletion can be implemented asynchronously, since the RLMM plugin can 
choose to delete data from remote storage in the background when stopPartitions 
is called.
   2. Availability of remote storage does not affect the critical path, i.e. if 
the remote store is intermittently unavailable, stopPartitions can still go 
through.
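
   A rough sketch of the alternative, under the assumption that the plugin 
keeps a queue of pending deletions: the stop callback only records the request, 
and a background pass retries anything that could not be deleted. Partitions 
are plain strings here, and `DeferredDeletionSketch`, `drainOnce` and 
`pending` are hypothetical names, not part of the RLMM interface.

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;

// Hypothetical sketch; this class is not a Kafka type.
final class DeferredDeletionSketch {
    private final Queue<String> pendingDeletes = new ConcurrentLinkedQueue<>();

    /** Critical path: only records the request and never touches remote storage. */
    void onStopPartitions(Set<String> partitions) {
        pendingDeletes.addAll(partitions);
    }

    /**
     * Background path: tries each currently pending partition once.
     * deleteFromRemote returns false when remote storage is unavailable;
     * such partitions are re-queued for the next pass.
     * Returns the number of partitions deleted in this pass.
     */
    int drainOnce(Predicate<String> deleteFromRemote) {
        int deleted = 0;
        int attempts = pendingDeletes.size();
        for (int i = 0; i < attempts; i++) {
            String partition = pendingDeletes.poll();
            if (partition == null) {
                break;
            }
            if (deleteFromRemote.test(partition)) {
                deleted++;
            } else {
                pendingDeletes.add(partition);
            }
        }
        return deleted;
    }

    int pending() {
        return pendingDeletes.size();
    }
}
```

   With this shape, an outage of the remote store only delays the background 
pass; the stop call itself returns as soon as the request is queued.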
   



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -343,21 +345,78 @@ public void onLeadershipChange(Set<Partition> partitionsBecomeLeader,
     /**
      * Deletes the internal topic partition info if delete flag is set as true.
      *
-     * @param topicPartition topic partition to be stopped.
+     * @param topicPartitions topic partitions that needs to be stopped.
      * @param delete         flag to indicate whether the given topic partitions to be deleted or not.
      */
-    public void stopPartitions(TopicPartition topicPartition, boolean delete) {
+    public void stopPartitions(Set<TopicPartition> topicPartitions,

Review Comment:
   Please consider invalidating the entries for segments belonging to this 
partition in RemoteIndexCache when we stop partitions. 
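
   As a sketch of what partition-level invalidation could look like, assuming a 
cache keyed by segment id that also knows the owning partition. This is a 
stand-in with hypothetical names and does not reflect the actual 
RemoteIndexCache API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for an index cache; not the RemoteIndexCache API.
final class PartitionIndexCacheSketch {
    // segmentId -> owning partition (a real cache would hold index entries instead)
    private final Map<String, String> partitionBySegment = new ConcurrentHashMap<>();

    void put(String segmentId, String partition) {
        partitionBySegment.put(segmentId, partition);
    }

    /**
     * Drops every cached entry that belongs to the given partition.
     * Returns how many entries were dropped; the count is only exact
     * when no other thread modifies the cache concurrently.
     */
    int invalidatePartition(String partition) {
        int before = partitionBySegment.size();
        partitionBySegment.values().removeIf(partition::equals);
        return before - partitionBySegment.size();
    }

    int size() {
        return partitionBySegment.size();
    }
}
```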



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -471,6 +472,9 @@ class ReplicaManager(val config: KafkaConfig,
             case HostedPartition.Online(partition) =>
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
+
+              if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete && remoteLogManager.isDefined)

Review Comment:
   Everywhere that you use `remoteLogManager.isDefined`, please also add a 
check for `log.remoteLogEnabled()`. The remote storage feature may be ON in the 
cluster, so remoteLogManager is present, while it is not turned ON for that 
particular topic/partition.
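
   The combined condition, written out as a small Java sketch. The helper name 
and its parameters are hypothetical; the point is only that both the 
cluster-level and the partition-level condition must hold.

```java
import java.util.Optional;

// Hypothetical helper; not part of ReplicaManager.
final class RemoteGuardSketch {
    /**
     * True only when the cluster-level manager exists AND remote storage
     * is enabled for the specific partition's log.
     */
    static boolean shouldUseRemoteStorage(Optional<?> remoteLogManager, boolean logRemoteLogEnabled) {
        return remoteLogManager.isPresent() && logRemoteLogEnabled;
    }
}
```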



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
