This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 24ed31739e2 KAFKA-16853: Split RemoteLogManagerScheduledThreadPool 
(#16502)
24ed31739e2 is described below

commit 24ed31739e2efc337f53040366c38dedae543308
Author: Abhijeet Kumar <[email protected]>
AuthorDate: Wed Jul 17 16:43:23 2024 +0530

    KAFKA-16853: Split RemoteLogManagerScheduledThreadPool (#16502)
    
    As part of KIP-950, we want to split the 
RemoteLogManagerScheduledThreadPool into separate thread pools (one for copy 
and another for expiration). In this change, we are splitting it into three 
thread pools (one for copy, one for expiration, and another one for follower). 
We are reusing the same thread pool configuration for all three thread pools. 
We can introduce new user-facing configurations later.
    
    Reviewers: Kamal Chandraprakash <[email protected]>, Luke Chen 
<[email protected]>, Christo Lolov <[email protected]>, Satish Duggana 
<[email protected]>
---
 checkstyle/suppressions.xml                        |   1 +
 .../java/kafka/log/remote/RemoteLogManager.java    | 346 +++++++++++++--------
 .../kafka/log/remote/RemoteLogManagerTest.java     | 289 +++++++++++------
 3 files changed, 415 insertions(+), 221 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index a36d66bd6fd..231b18c97a4 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -40,6 +40,7 @@
               
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
     <suppress checks="NPathComplexity" 
files="(ClusterTestExtensions|KafkaApisBuilder|SharePartition).java"/>
     <suppress 
checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling" 
files="(RemoteLogManager|RemoteLogManagerTest).java"/>
+    <suppress checks="MethodLength" files="RemoteLogManager.java"/>
     <suppress checks="ClassFanOutComplexity" 
files="RemoteLogManagerTest.java"/>
     <suppress checks="MethodLength"
               files="(KafkaClusterTestKit).java"/>
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 8beb6971aeb..11c34a0f535 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -179,11 +179,16 @@ public class RemoteLogManager implements Closeable {
 
     private final RemoteIndexCache indexCache;
     private final RemoteStorageThreadPool remoteStorageReaderThreadPool;
-    private final RLMScheduledThreadPool rlmScheduledThreadPool;
+    private final RLMScheduledThreadPool rlmCopyThreadPool;
+    private final RLMScheduledThreadPool rlmExpirationThreadPool;
+    private final RLMScheduledThreadPool followerThreadPool;
 
     private final long delayInMs;
 
-    private final ConcurrentHashMap<TopicIdPartition, RLMTaskWithFuture> 
leaderOrFollowerTasks = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<TopicIdPartition, RLMTaskWithFuture> 
leaderCopyRLMTasks = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<TopicIdPartition, RLMTaskWithFuture> 
leaderExpirationRLMTasks = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<TopicIdPartition, RLMTaskWithFuture> 
followerRLMTasks = new ConcurrentHashMap<>();
+    private final Set<RemoteLogSegmentId> segmentIdsBeingCopied = 
ConcurrentHashMap.newKeySet();
 
     // topic ids that are received on leadership changes, this map is cleared 
on stop partitions
     private final ConcurrentMap<TopicPartition, Uuid> topicIdByPartitionMap = 
new ConcurrentHashMap<>();
@@ -241,12 +246,17 @@ public class RemoteLogManager implements Closeable {
 
         indexCache = new 
RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), 
remoteLogStorageManager, logDir);
         delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
-        rlmScheduledThreadPool = new 
RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
+        rlmCopyThreadPool = new 
RLMScheduledThreadPool(rlmConfig.remoteLogManagerCopierThreadPoolSize(),
+            "RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-");
+        rlmExpirationThreadPool = new 
RLMScheduledThreadPool(rlmConfig.remoteLogManagerExpirationThreadPoolSize(),
+            "RLMExpirationThreadPool", "kafka-rlm-expiration-thread-pool-");
+        followerThreadPool = new 
RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
+            "RLMFollowerScheduledThreadPool", 
"kafka-rlm-follower-thread-pool-");
 
         
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC, new 
Gauge<Double>() {
             @Override
             public Double value() {
-                return rlmScheduledThreadPool.getIdlePercent();
+                return rlmCopyThreadPool.getIdlePercent();
             }
         });
         remoteReadTimer = 
metricsGroup.newTimer(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC,
@@ -433,11 +443,8 @@ public class RemoteLogManager implements Closeable {
             throw new KafkaException("RemoteLogManager is not configured when 
remote storage system is enabled");
         }
 
-        Map<TopicIdPartition, Integer> leaderPartitionsWithLeaderEpoch = 
filterPartitions(partitionsBecomeLeader)
-                .collect(Collectors.toMap(
-                        partition -> new 
TopicIdPartition(topicIds.get(partition.topic()), partition.topicPartition()),
-                        Partition::getLeaderEpoch));
-        Set<TopicIdPartition> leaderPartitions = 
leaderPartitionsWithLeaderEpoch.keySet();
+        Set<TopicIdPartition> leaderPartitions = 
filterPartitions(partitionsBecomeLeader)
+                .map(p -> new TopicIdPartition(topicIds.get(p.topic()), 
p.topicPartition())).collect(Collectors.toSet());
 
         Set<TopicIdPartition> followerPartitions = 
filterPartitions(partitionsBecomeFollower)
                 .map(p -> new TopicIdPartition(topicIds.get(p.topic()), 
p.topicPartition())).collect(Collectors.toSet());
@@ -450,16 +457,13 @@ public class RemoteLogManager implements Closeable {
             followerPartitions.forEach(this::cacheTopicPartitionIds);
 
             
remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions, 
followerPartitions);
-            followerPartitions.forEach(topicIdPartition ->
-                    doHandleLeaderOrFollowerPartitions(topicIdPartition, 
RLMTask::convertToFollower));
+            followerPartitions.forEach(this::doHandleFollowerPartition);
 
             // If this node was the previous leader for the partition, then 
the RLMTask might be running in the
             // background thread and might emit metrics. So, removing the 
metrics after marking this node as follower.
             
followerPartitions.forEach(this::removeRemoteTopicPartitionMetrics);
 
-            leaderPartitionsWithLeaderEpoch.forEach((topicIdPartition, 
leaderEpoch) ->
-                    doHandleLeaderOrFollowerPartitions(topicIdPartition,
-                            rlmTask -> rlmTask.convertToLeader(leaderEpoch)));
+            leaderPartitions.forEach(this::doHandleLeaderPartition);
         }
     }
 
@@ -479,11 +483,21 @@ public class RemoteLogManager implements Closeable {
             try {
                 if (topicIdByPartitionMap.containsKey(tp)) {
                     TopicIdPartition tpId = new 
TopicIdPartition(topicIdByPartitionMap.get(tp), tp);
-                    RLMTaskWithFuture task = 
leaderOrFollowerTasks.remove(tpId);
-                    if (task != null) {
-                        LOGGER.info("Cancelling the RLM task for tpId: {}", 
tpId);
+                    leaderCopyRLMTasks.computeIfPresent(tpId, 
(topicIdPartition, task) -> {
+                        LOGGER.info("Cancelling the copy RLM task for tpId: 
{}", tpId);
                         task.cancel();
-                    }
+                        return null;
+                    });
+                    leaderExpirationRLMTasks.computeIfPresent(tpId, 
(topicIdPartition, task) -> {
+                        LOGGER.info("Cancelling the expiration RLM task for 
tpId: {}", tpId);
+                        task.cancel();
+                        return null;
+                    });
+                    followerRLMTasks.computeIfPresent(tpId, (topicIdPartition, 
task) -> {
+                        LOGGER.info("Cancelling the follower RLM task for 
tpId: {}", tpId);
+                        task.cancel();
+                        return null;
+                    });
 
                     removeRemoteTopicPartitionMetrics(tpId);
 
@@ -686,62 +700,95 @@ public class RemoteLogManager implements Closeable {
     }
 
     // VisibleForTesting
-    RLMTask rlmTask(TopicIdPartition topicIdPartition) {
-        RLMTaskWithFuture task = leaderOrFollowerTasks.get(topicIdPartition);
+    RLMTask rlmCopyTask(TopicIdPartition topicIdPartition) {
+        RLMTaskWithFuture task = leaderCopyRLMTasks.get(topicIdPartition);
         if (task != null) {
             return task.rlmTask;
         }
         return null;
     }
 
-    class RLMTask extends CancellableRunnable {
+    abstract class RLMTask extends CancellableRunnable {
 
-        private final TopicIdPartition topicIdPartition;
-        private final int customMetadataSizeLimit;
+        protected final TopicIdPartition topicIdPartition;
         private final Logger logger;
 
-        private volatile int leaderEpoch = -1;
-
-        public RLMTask(TopicIdPartition topicIdPartition, int 
customMetadataSizeLimit) {
+        public RLMTask(TopicIdPartition topicIdPartition) {
             this.topicIdPartition = topicIdPartition;
-            this.customMetadataSizeLimit = customMetadataSizeLimit;
-            LogContext logContext = new LogContext("[RemoteLogManager=" + 
brokerId + " partition=" + topicIdPartition + "] ");
-            logger = logContext.logger(RLMTask.class);
+            this.logger = getLogContext().logger(RLMTask.class);
         }
 
-        boolean isLeader() {
-            return leaderEpoch >= 0;
+        protected LogContext getLogContext() {
+            return new LogContext("[RemoteLogManager=" + brokerId + " 
partition=" + topicIdPartition + "] ");
         }
 
-        // The copied and log-start offset is empty initially for a new leader 
RLMTask, and needs to be fetched inside
+        public void run() {
+            if (isCancelled())
+                return;
+
+            try {
+                Optional<UnifiedLog> unifiedLogOptional = 
fetchLog.apply(topicIdPartition.topicPartition());
+
+                if (!unifiedLogOptional.isPresent()) {
+                    return;
+                }
+
+                execute(unifiedLogOptional.get());
+            } catch (InterruptedException ex) {
+                if (!isCancelled()) {
+                    logger.warn("Current thread for topic-partition-id {} is 
interrupted", topicIdPartition, ex);
+                }
+            } catch (RetriableException ex) {
+                logger.debug("Encountered a retryable error while executing 
current task for topic-partition {}", topicIdPartition, ex);
+            } catch (Exception ex) {
+                if (!isCancelled()) {
+                    logger.warn("Current task for topic-partition {} received 
error but it will be scheduled", topicIdPartition, ex);
+                }
+            }
+        }
+
+        protected abstract void execute(UnifiedLog log) throws 
InterruptedException, RemoteStorageException, ExecutionException;
+
+        public String toString() {
+            return this.getClass() + "[" + topicIdPartition + "]";
+        }
+    }
+
+    class RLMCopyTask extends RLMTask {
+        private final int customMetadataSizeLimit;
+        private final Logger logger;
+
+        // The copied and log-start offset is empty initially for a new 
RLMCopyTask, and needs to be fetched inside
         // the task's run() method.
         private volatile Optional<OffsetAndEpoch> copiedOffsetOption = 
Optional.empty();
-        private volatile boolean isLogStartOffsetUpdatedOnBecomingLeader = 
false;
+        private volatile boolean isLogStartOffsetUpdated = false;
         private volatile Optional<String> logDirectory = Optional.empty();
 
-        public void convertToLeader(int leaderEpochVal) {
-            if (leaderEpochVal < 0) {
-                throw new KafkaException("leaderEpoch value for topic 
partition " + topicIdPartition + " can not be negative");
-            }
-            if (this.leaderEpoch != leaderEpochVal) {
-                leaderEpoch = leaderEpochVal;
-            }
-            // Reset copied and log-start offset, so that it is set in next 
run of RLMTask
-            copiedOffsetOption = Optional.empty();
-            isLogStartOffsetUpdatedOnBecomingLeader = false;
+        public RLMCopyTask(TopicIdPartition topicIdPartition, int 
customMetadataSizeLimit) {
+            super(topicIdPartition);
+            this.customMetadataSizeLimit = customMetadataSizeLimit;
+            this.logger = getLogContext().logger(RLMCopyTask.class);
         }
 
-        public void convertToFollower() {
-            leaderEpoch = -1;
+        @Override
+        protected void execute(UnifiedLog log) throws InterruptedException {
+            // In the first run after completing altering logDir within 
broker, we should make sure the state is reset. (KAFKA-16711)
+            if (!log.parentDir().equals(logDirectory.orElse(null))) {
+                copiedOffsetOption = Optional.empty();
+                isLogStartOffsetUpdated = false;
+                logDirectory = Optional.of(log.parentDir());
+            }
+
+            copyLogSegmentsToRemote(log);
         }
 
         private void maybeUpdateLogStartOffsetOnBecomingLeader(UnifiedLog log) 
throws RemoteStorageException {
-            if (!isLogStartOffsetUpdatedOnBecomingLeader) {
+            if (!isLogStartOffsetUpdated) {
                 long logStartOffset = findLogStartOffset(topicIdPartition, 
log);
                 
updateRemoteLogStartOffset.accept(topicIdPartition.topicPartition(), 
logStartOffset);
-                isLogStartOffsetUpdatedOnBecomingLeader = true;
-                logger.info("Found the logStartOffset: {} for partition: {} 
after becoming leader, leaderEpoch: {}",
-                        logStartOffset, topicIdPartition, leaderEpoch);
+                isLogStartOffsetUpdated = true;
+                logger.info("Found the logStartOffset: {} for partition: {} 
after becoming leader",
+                        logStartOffset, topicIdPartition);
             }
         }
 
@@ -752,8 +799,7 @@ public class RemoteLogManager implements Closeable {
                 // previous leader epoch till it finds an entry, If there are 
no entries till the earliest leader epoch in leader
                 // epoch cache then it starts copying the segments from the 
earliest epoch entry's offset.
                 copiedOffsetOption = 
Optional.of(findHighestRemoteOffset(topicIdPartition, log));
-                logger.info("Found the highest copiedRemoteOffset: {} for 
partition: {} after becoming leader, " +
-                                "leaderEpoch: {}", copiedOffsetOption, 
topicIdPartition, leaderEpoch);
+                logger.info("Found the highest copiedRemoteOffset: {} for 
partition: {} after becoming leader", copiedOffsetOption, topicIdPartition);
                 copiedOffsetOption.ifPresent(offsetAndEpoch ->  
log.updateHighestOffsetInRemoteStorage(offsetAndEpoch.offset()));
             }
         }
@@ -810,9 +856,9 @@ public class RemoteLogManager implements Closeable {
                                 topicIdPartition, copiedOffset, 
log.activeSegment().baseOffset());
                     } else {
                         for (EnrichedLogSegment candidateLogSegment : 
candidateLogSegments) {
-                            if (isCancelled() || !isLeader()) {
-                                logger.info("Skipping copying log segments as 
the current task state is changed, cancelled: {} leader:{}",
-                                        isCancelled(), isLeader());
+                            if (isCancelled()) {
+                                logger.info("Skipping copying log segments as 
the current task state is changed, cancelled: {}",
+                                        isCancelled());
                                 return;
                             }
 
@@ -835,7 +881,14 @@ public class RemoteLogManager implements Closeable {
                             } finally {
                                 copyQuotaManagerLock.unlock();
                             }
-                            copyLogSegment(log, 
candidateLogSegment.logSegment, candidateLogSegment.nextSegmentOffset);
+
+                            RemoteLogSegmentId segmentId = 
RemoteLogSegmentId.generateNew(topicIdPartition);
+                            segmentIdsBeingCopied.add(segmentId);
+                            try {
+                                copyLogSegment(log, 
candidateLogSegment.logSegment, segmentId, 
candidateLogSegment.nextSegmentOffset);
+                            } finally {
+                                segmentIdsBeingCopied.remove(segmentId);
+                            }
                         }
                     }
                 } else {
@@ -857,14 +910,13 @@ public class RemoteLogManager implements Closeable {
             }
         }
 
-        private void copyLogSegment(UnifiedLog log, LogSegment segment, long 
nextSegmentBaseOffset)
+        private void copyLogSegment(UnifiedLog log, LogSegment segment, 
RemoteLogSegmentId segmentId, long nextSegmentBaseOffset)
                 throws InterruptedException, ExecutionException, 
RemoteStorageException, IOException,
                 CustomMetadataSizeLimitExceededException {
             File logFile = segment.log().file();
             String logFileName = logFile.getName();
 
             logger.info("Copying {} to remote storage.", logFileName);
-            RemoteLogSegmentId id = 
RemoteLogSegmentId.generateNew(topicIdPartition);
 
             long endOffset = nextSegmentBaseOffset - 1;
             int tieredEpoch = 0;
@@ -874,7 +926,7 @@ public class RemoteLogManager implements Closeable {
             Map<Integer, Long> segmentLeaderEpochs = new 
HashMap<>(epochEntries.size());
             epochEntries.forEach(entry -> segmentLeaderEpochs.put(entry.epoch, 
entry.startOffset));
 
-            RemoteLogSegmentMetadata copySegmentStartedRlsm = new 
RemoteLogSegmentMetadata(id, segment.baseOffset(), endOffset,
+            RemoteLogSegmentMetadata copySegmentStartedRlsm = new 
RemoteLogSegmentMetadata(segmentId, segment.baseOffset(), endOffset,
                     segment.largestTimestamp(), brokerId, time.milliseconds(), 
segment.log().sizeInBytes(),
                     segmentLeaderEpochs, tieredEpoch);
 
@@ -888,7 +940,7 @@ public class RemoteLogManager implements Closeable {
             brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
             Optional<CustomMetadata> customMetadata = 
remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
 
-            RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new 
RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
+            RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new 
RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
                     customMetadata, 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
 
             if (customMetadata.isPresent()) {
@@ -932,7 +984,7 @@ public class RemoteLogManager implements Closeable {
 
         // VisibleForTesting
         void recordLagStats(long bytesLag, long segmentsLag) {
-            if (isLeader()) {
+            if (!isCancelled()) {
                 String topic = topicIdPartition.topic();
                 int partition = topicIdPartition.partition();
                 brokerTopicStats.recordRemoteCopyLagBytes(topic, partition, 
bytesLag);
@@ -943,55 +995,25 @@ public class RemoteLogManager implements Closeable {
         private Path toPathIfExists(File file) {
             return file.exists() ? file.toPath() : null;
         }
+    }
 
-        public void run() {
-            if (isCancelled())
-                return;
-
-            try {
-                Optional<UnifiedLog> unifiedLogOptional = 
fetchLog.apply(topicIdPartition.topicPartition());
-
-                if (!unifiedLogOptional.isPresent()) {
-                    return;
-                }
+    class RLMExpirationTask extends RLMTask {
+        private final Logger logger;
 
-                UnifiedLog log = unifiedLogOptional.get();
-                // In the first run after completing altering logDir within 
broker, we should make sure the state is reset. (KAFKA-16711)
-                if (!log.parentDir().equals(logDirectory.orElse(null))) {
-                    copiedOffsetOption = Optional.empty();
-                    isLogStartOffsetUpdatedOnBecomingLeader = false;
-                    logDirectory = Optional.of(log.parentDir());
-                }
+        public RLMExpirationTask(TopicIdPartition topicIdPartition) {
+            super(topicIdPartition);
+            this.logger = getLogContext().logger(RLMExpirationTask.class);
+        }
 
-                if (isLeader()) {
-                    // Copy log segments to remote storage
-                    copyLogSegmentsToRemote(log);
-                    // Cleanup/delete expired remote log segments
-                    cleanupExpiredRemoteLogSegments();
-                } else {
-                    OffsetAndEpoch offsetAndEpoch = 
findHighestRemoteOffset(topicIdPartition, log);
-                    // Update the highest offset in remote storage for this 
partition's log so that the local log segments
-                    // are not deleted before they are copied to remote 
storage.
-                    
log.updateHighestOffsetInRemoteStorage(offsetAndEpoch.offset());
-                }
-            } catch (InterruptedException ex) {
-                if (!isCancelled()) {
-                    logger.warn("Current thread for topic-partition-id {} is 
interrupted", topicIdPartition, ex);
-                }
-            } catch (RetriableException ex) {
-                logger.debug("Encountered a retryable error while executing 
current task for topic-partition {}", topicIdPartition, ex);
-            } catch (Exception ex) {
-                if (!isCancelled()) {
-                    logger.warn("Current task for topic-partition {} received 
error but it will be scheduled", topicIdPartition, ex);
-                }
-            }
+        @Override
+        protected void execute(UnifiedLog log) throws InterruptedException, 
RemoteStorageException, ExecutionException {
+            // Cleanup/delete expired remote log segments
+            cleanupExpiredRemoteLogSegments();
         }
 
         public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
-            if (isLeader()) {
-                logger.debug("Updating {} with remoteLogStartOffset: {}", 
topicPartition, remoteLogStartOffset);
-                updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
-            }
+            logger.debug("Updating {} with remoteLogStartOffset: {}", 
topicPartition, remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
         }
 
         class RemoteLogRetentionHandler {
@@ -1137,7 +1159,7 @@ public class RemoteLogManager implements Closeable {
         }
 
         void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, 
ExecutionException, InterruptedException {
-            if (isCancelled() || !isLeader()) {
+            if (isCancelled()) {
                 logger.info("Returning from remote log segments cleanup as the 
task state is changed");
                 return;
             }
@@ -1200,12 +1222,18 @@ public class RemoteLogManager implements Closeable {
                 Integer epoch = epochIterator.next();
                 Iterator<RemoteLogSegmentMetadata> segmentsIterator = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
                 while (canProcess && segmentsIterator.hasNext()) {
-                    if (isCancelled() || !isLeader()) {
+                    if (isCancelled()) {
                         logger.info("Returning from remote log segments 
cleanup for the remaining segments as the task state is changed.");
                         return;
                     }
                     RemoteLogSegmentMetadata metadata = 
segmentsIterator.next();
 
+                    if 
(segmentIdsBeingCopied.contains(metadata.remoteLogSegmentId())) {
+                        logger.debug("Copy for the segment {} is currently in 
process. Skipping cleanup for it and the remaining segments",
+                                metadata.remoteLogSegmentId());
+                        canProcess = false;
+                        continue;
+                    }
                     if 
(RemoteLogSegmentState.DELETE_SEGMENT_FINISHED.equals(metadata.state())) {
                         continue;
                     }
@@ -1252,7 +1280,7 @@ public class RemoteLogManager implements Closeable {
             updateRemoteDeleteLagWith(segmentsLeftToDelete, 
sizeOfDeletableSegmentsBytes);
             List<String> undeletedSegments = new ArrayList<>();
             for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
-                if 
(!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> 
!isCancelled() && isLeader())) {
+                if 
(!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> 
!isCancelled())) {
                     
undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
                 } else {
                     sizeOfDeletableSegmentsBytes -= 
segmentMetadata.segmentSizeInBytes();
@@ -1281,7 +1309,7 @@ public class RemoteLogManager implements Closeable {
                     int epoch = epochsToClean.next();
                     Iterator<RemoteLogSegmentMetadata> segmentsToBeCleaned = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
                     while (segmentsToBeCleaned.hasNext()) {
-                        if (!isCancelled() && isLeader()) {
+                        if (!isCancelled()) {
                             RemoteLogSegmentMetadata nextSegmentMetadata = 
segmentsToBeCleaned.next();
                             sizeOfDeletableSegmentsBytes += 
nextSegmentMetadata.segmentSizeInBytes();
                             listOfSegmentsToBeCleaned.add(nextSegmentMetadata);
@@ -1292,7 +1320,7 @@ public class RemoteLogManager implements Closeable {
                 segmentsLeftToDelete += listOfSegmentsToBeCleaned.size();
                 updateRemoteDeleteLagWith(segmentsLeftToDelete, 
sizeOfDeletableSegmentsBytes);
                 for (RemoteLogSegmentMetadata segmentMetadata : 
listOfSegmentsToBeCleaned) {
-                    if (!isCancelled() && isLeader()) {
+                    if (!isCancelled()) {
                         // No need to update the log-start-offset even though 
the segment is deleted as these epochs/offsets are earlier to that value.
                         if 
(remoteLogRetentionHandler.deleteLogSegmentsDueToLeaderEpochCacheTruncation(earliestEpochEntry,
 segmentMetadata)) {
                             sizeOfDeletableSegmentsBytes -= 
segmentMetadata.segmentSizeInBytes();
@@ -1348,9 +1376,20 @@ public class RemoteLogManager implements Closeable {
 
             return Optional.empty();
         }
+    }
 
-        public String toString() {
-            return this.getClass() + "[" + topicIdPartition + "]";
+    class RLMFollowerTask extends RLMTask {
+
+        public RLMFollowerTask(TopicIdPartition topicIdPartition) {
+            super(topicIdPartition);
+        }
+
+        @Override
+        protected void execute(UnifiedLog log) throws InterruptedException, 
RemoteStorageException, ExecutionException {
+            OffsetAndEpoch offsetAndEpoch = 
findHighestRemoteOffset(topicIdPartition, log);
+            // Update the highest offset in remote storage for this 
partition's log so that the local log segments
+            // are not deleted before they are copied to remote storage.
+            log.updateHighestOffsetInRemoteStorage(offsetAndEpoch.offset());
         }
     }
 
@@ -1761,19 +1800,48 @@ public class RemoteLogManager implements Closeable {
                 new RemoteLogReader(fetchInfo, this, callback, 
brokerTopicStats, rlmFetchQuotaManager, remoteReadTimer));
     }
 
-    void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
-                                            Consumer<RLMTask> 
convertToLeaderOrFollower) {
-        RLMTaskWithFuture rlmTaskWithFuture = 
leaderOrFollowerTasks.computeIfAbsent(topicPartition,
-                topicIdPartition -> {
-                    RLMTask task = new RLMTask(topicIdPartition, 
rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
-                    // set this upfront when it is getting initialized instead 
of doing it after scheduling.
-                    convertToLeaderOrFollower.accept(task);
-                    LOGGER.info("Created a new task: {} and getting 
scheduled", task);
-                    ScheduledFuture<?> future = 
rlmScheduledThreadPool.scheduleWithFixedDelay(task, 0, delayInMs, 
TimeUnit.MILLISECONDS);
-                    return new RLMTaskWithFuture(task, future);
-                }
-        );
-        convertToLeaderOrFollower.accept(rlmTaskWithFuture.rlmTask);
+    void doHandleLeaderPartition(TopicIdPartition topicPartition) {
+        RLMTaskWithFuture followerRLMTaskWithFuture = 
followerRLMTasks.remove(topicPartition);
+        if (followerRLMTaskWithFuture != null) {
+            LOGGER.info("Cancelling the follower task: {}", 
followerRLMTaskWithFuture.rlmTask);
+            followerRLMTaskWithFuture.cancel();
+        }
+
+        leaderCopyRLMTasks.computeIfAbsent(topicPartition, topicIdPartition -> 
{
+            RLMCopyTask task = new RLMCopyTask(topicIdPartition, 
this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
+            // set this upfront when it is getting initialized instead of 
doing it after scheduling.
+            LOGGER.info("Created a new copy task: {} and getting scheduled", 
task);
+            ScheduledFuture<?> future = 
rlmCopyThreadPool.scheduleWithFixedDelay(task, 0, delayInMs, 
TimeUnit.MILLISECONDS);
+            return new RLMTaskWithFuture(task, future);
+        });
+
+        leaderExpirationRLMTasks.computeIfAbsent(topicPartition, 
topicIdPartition -> {
+            RLMExpirationTask task = new RLMExpirationTask(topicIdPartition);
+            LOGGER.info("Created a new expiration task: {} and getting 
scheduled", task);
+            ScheduledFuture<?> future = 
rlmExpirationThreadPool.scheduleWithFixedDelay(task, 0, delayInMs, 
TimeUnit.MILLISECONDS);
+            return new RLMTaskWithFuture(task, future);
+        });
+    }
+
+    void doHandleFollowerPartition(TopicIdPartition topicPartition) {
+        RLMTaskWithFuture copyRLMTaskWithFuture = 
leaderCopyRLMTasks.remove(topicPartition);
+        if (copyRLMTaskWithFuture != null) {
+            LOGGER.info("Cancelling the copy task: {}", 
copyRLMTaskWithFuture.rlmTask);
+            copyRLMTaskWithFuture.cancel();
+        }
+
+        RLMTaskWithFuture expirationRLMTaskWithFuture = 
leaderExpirationRLMTasks.remove(topicPartition);
+        if (expirationRLMTaskWithFuture != null) {
+            LOGGER.info("Cancelling the expiration task: {}", 
expirationRLMTaskWithFuture.rlmTask);
+            expirationRLMTaskWithFuture.cancel();
+        }
+
+        followerRLMTasks.computeIfAbsent(topicPartition, topicIdPartition -> {
+            RLMFollowerTask task = new RLMFollowerTask(topicIdPartition);
+            LOGGER.info("Created a new follower task: {} and getting 
scheduled", task);
+            ScheduledFuture<?> future = 
followerThreadPool.scheduleWithFixedDelay(task, 0, delayInMs, 
TimeUnit.MILLISECONDS);
+            return new RLMTaskWithFuture(task, future);
+        });
     }
 
     static class RLMTaskWithFuture {
@@ -1803,19 +1871,25 @@ public class RemoteLogManager implements Closeable {
     public void close() {
         synchronized (this) {
             if (!closed) {
-                
leaderOrFollowerTasks.values().forEach(RLMTaskWithFuture::cancel);
+                leaderCopyRLMTasks.values().forEach(RLMTaskWithFuture::cancel);
+                
leaderExpirationRLMTasks.values().forEach(RLMTaskWithFuture::cancel);
+                followerRLMTasks.values().forEach(RLMTaskWithFuture::cancel);
                 Utils.closeQuietly(remoteLogStorageManager, 
"RemoteLogStorageManager");
                 Utils.closeQuietly(remoteLogMetadataManager, 
"RemoteLogMetadataManager");
                 Utils.closeQuietly(indexCache, "RemoteIndexCache");
 
-                rlmScheduledThreadPool.close();
+                rlmCopyThreadPool.close();
+                rlmExpirationThreadPool.close();
+                followerThreadPool.close();
                 try {
                     shutdownAndAwaitTermination(remoteStorageReaderThreadPool, 
"RemoteStorageReaderThreadPool", 10, TimeUnit.SECONDS);
                 } finally {
                     removeMetrics();
                 }
 
-                leaderOrFollowerTasks.clear();
+                leaderCopyRLMTasks.clear();
+                leaderExpirationRLMTasks.clear();
+                followerRLMTasks.clear();
                 closed = true;
             }
         }
@@ -1883,18 +1957,28 @@ public class RemoteLogManager implements Closeable {
     }
 
     //Visible for testing
-    RLMTaskWithFuture task(TopicIdPartition partition) {
-        return leaderOrFollowerTasks.get(partition);
+    RLMTaskWithFuture leaderCopyTask(TopicIdPartition partition) {
+        return leaderCopyRLMTasks.get(partition);
+    }
+    RLMTaskWithFuture leaderExpirationTask(TopicIdPartition partition) {
+        return leaderExpirationRLMTasks.get(partition);
+    }
+    RLMTaskWithFuture followerTask(TopicIdPartition partition) {
+        return followerRLMTasks.get(partition);
     }
 
     static class RLMScheduledThreadPool {
 
         private static final Logger LOGGER = 
LoggerFactory.getLogger(RLMScheduledThreadPool.class);
         private final int poolSize;
+        private final String threadPoolName;
+        private final String threadNamePrefix;
         private final ScheduledThreadPoolExecutor scheduledThreadPool;
 
-        public RLMScheduledThreadPool(int poolSize) {
+        public RLMScheduledThreadPool(int poolSize, String threadPoolName, 
String threadNamePrefix) {
             this.poolSize = poolSize;
+            this.threadPoolName = threadPoolName;
+            this.threadNamePrefix = threadNamePrefix;
             scheduledThreadPool = createPool();
         }
 
@@ -1907,7 +1991,7 @@ public class RemoteLogManager implements Closeable {
                 private final AtomicInteger sequence = new AtomicInteger();
 
                 public Thread newThread(Runnable r) {
-                    return KafkaThread.daemon("kafka-rlm-thread-pool-" + 
sequence.incrementAndGet(), r);
+                    return KafkaThread.daemon(threadNamePrefix + 
sequence.incrementAndGet(), r);
                 }
             });
 
@@ -1924,7 +2008,7 @@ public class RemoteLogManager implements Closeable {
         }
 
         public void close() {
-            shutdownAndAwaitTermination(scheduledThreadPool, 
"RLMScheduledThreadPool", 10, TimeUnit.SECONDS);
+            shutdownAndAwaitTermination(scheduledThreadPool, threadPoolName, 
10, TimeUnit.SECONDS);
         }
     }
 
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index c9c37de929b..4e5f70f9337 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -550,8 +550,7 @@ public class RemoteLogManagerTest {
         assertEquals(0, 
brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
         assertEquals(0, 
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
 
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
-        task.convertToLeader(2);
+        RemoteLogManager.RLMCopyTask task = remoteLogManager.new 
RLMCopyTask(leaderTopicIdPartition, 128);
         task.copyLogSegmentsToRemote(mockLog);
 
         // verify remoteLogMetadataManager did add the expected 
RemoteLogSegmentMetadata
@@ -658,8 +657,7 @@ public class RemoteLogManagerTest {
                 .thenReturn(Optional.of(customMetadata));
         
when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime);
 
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, customMetadataSizeLimit);
-        task.convertToLeader(2);
+        RemoteLogManager.RLMCopyTask task = remoteLogManager.new 
RLMCopyTask(leaderTopicIdPartition, customMetadataSizeLimit);
         task.copyLogSegmentsToRemote(mockLog);
 
         ArgumentCaptor<RemoteLogSegmentMetadata> remoteLogSegmentMetadataArg = 
ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
@@ -1032,7 +1030,9 @@ public class RemoteLogManagerTest {
 
         Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
 
-        when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(175L, 100L);
+        // This method is called by both the Copy and Expiration tasks. On 
the first call, both tasks should see 175 bytes as
+        // the local log segments size
+        when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(175L, 175L, 100L);
         when(activeSegment.size()).thenReturn(100);
         
when(mockLog.onlyLocalLogSegmentsCount()).thenReturn(2L).thenReturn(1L);
 
@@ -1154,8 +1154,7 @@ public class RemoteLogManagerTest {
         // Verify aggregate metrics
         assertEquals(0, 
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
         assertEquals(0, 
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
-        task.convertToLeader(2);
+        RemoteLogManager.RLMCopyTask task = remoteLogManager.new 
RLMCopyTask(leaderTopicIdPartition, 128);
         task.copyLogSegmentsToRemote(mockLog);
 
         // Verify we attempted to copy log segment metadata to remote storage
@@ -1171,40 +1170,6 @@ public class RemoteLogManagerTest {
         assertEquals(1, 
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
     }
 
-    @Test
-    void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws 
Exception {
-        long oldSegmentStartOffset = 0L;
-        long nextSegmentStartOffset = 150L;
-
-        // leader epoch preparation
-        checkpoint.write(totalEpochEntries);
-        LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, 
scheduler);
-        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
-        
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(0L));
-
-        // create 2 log segments, with 0 and 150 as log start offset
-        LogSegment oldSegment = mock(LogSegment.class);
-        LogSegment activeSegment = mock(LogSegment.class);
-
-        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
-        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
-
-        when(mockLog.activeSegment()).thenReturn(activeSegment);
-        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
-        when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
-        when(mockLog.lastStableOffset()).thenReturn(250L);
-
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
-        task.convertToFollower();
-        task.copyLogSegmentsToRemote(mockLog);
-
-        // verify the remoteLogMetadataManager never add any metadata and 
remoteStorageManager never copy log segments
-        verify(remoteLogMetadataManager, 
never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
-        verify(remoteStorageManager, 
never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), 
any(LogSegmentData.class));
-        verify(remoteLogMetadataManager, 
never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
-        verify(mockLog).updateHighestOffsetInRemoteStorage(anyLong());
-    }
-
     @Test
     void 
testRLMTaskDoesNotUploadSegmentsWhenRemoteLogMetadataManagerIsNotInitialized() 
throws Exception {
         long oldSegmentStartOffset = 0L;
@@ -1240,8 +1205,7 @@ public class RemoteLogManagerTest {
         assertEquals(0, 
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
         assertEquals(0, 
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
 
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
-        task.convertToLeader(0);
+        RemoteLogManager.RLMCopyTask task = remoteLogManager.new 
RLMCopyTask(leaderTopicIdPartition, 128);
         task.run();
 
         // verify the remoteLogMetadataManager never add any metadata and 
remoteStorageManager never copy log segments
@@ -1374,13 +1338,13 @@ public class RemoteLogManagerTest {
         RemoteLogManager spyRemoteLogManager = spy(remoteLogManager);
         spyRemoteLogManager.onLeadershipChange(
             Collections.emptySet(), 
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
-        
verify(spyRemoteLogManager).doHandleLeaderOrFollowerPartitions(eq(followerTopicIdPartition),
 any(java.util.function.Consumer.class));
+        
verify(spyRemoteLogManager).doHandleFollowerPartition(eq(followerTopicIdPartition));
 
         Mockito.reset(spyRemoteLogManager);
 
         spyRemoteLogManager.onLeadershipChange(
             Collections.singleton(mockPartition(leaderTopicIdPartition)), 
Collections.emptySet(), topicIds);
-        
verify(spyRemoteLogManager).doHandleLeaderOrFollowerPartitions(eq(leaderTopicIdPartition),
 any(java.util.function.Consumer.class));
+        
verify(spyRemoteLogManager).doHandleLeaderPartition(eq(leaderTopicIdPartition));
     }
 
     private MemoryRecords records(long timestamp,
@@ -1393,16 +1357,6 @@ public class RemoteLogManagerTest {
             );
     }
 
-    @Test
-    void testRLMTaskShouldSetLeaderEpochCorrectly() {
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
-        assertFalse(task.isLeader());
-        task.convertToLeader(1);
-        assertTrue(task.isLeader());
-        task.convertToFollower();
-        assertFalse(task.isLeader());
-    }
-
     @Test
     void testFindOffsetByTimestamp() throws IOException, 
RemoteStorageException {
         remoteLogManager.startup();
@@ -1813,7 +1767,7 @@ public class RemoteLogManagerTest {
         when(log.logSegments(5L, Long.MAX_VALUE))
                 
.thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1, 
segment2, activeSegment)));
 
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+        RemoteLogManager.RLMCopyTask task = remoteLogManager.new 
RLMCopyTask(leaderTopicIdPartition, 128);
         List<RemoteLogManager.EnrichedLogSegment> expected =
                 Arrays.asList(
                         new RemoteLogManager.EnrichedLogSegment(segment1, 10L),
@@ -1839,7 +1793,7 @@ public class RemoteLogManagerTest {
         when(log.logSegments(5L, Long.MAX_VALUE))
                 
.thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1, 
segment2, segment3, activeSegment)));
 
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+        RemoteLogManager.RLMCopyTask task = remoteLogManager.new 
RLMCopyTask(leaderTopicIdPartition, 128);
         List<RemoteLogManager.EnrichedLogSegment> expected =
                 Arrays.asList(
                         new RemoteLogManager.EnrichedLogSegment(segment1, 10L),
@@ -1887,12 +1841,14 @@ public class RemoteLogManagerTest {
         partitions.add(new 
StopPartition(followerTopicIdPartition.topicPartition(), true, false));
         
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
                 
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
-        assertNotNull(remoteLogManager.task(leaderTopicIdPartition));
-        assertNotNull(remoteLogManager.task(followerTopicIdPartition));
+        assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
+        
assertNotNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
+        assertNotNull(remoteLogManager.followerTask(followerTopicIdPartition));
 
         remoteLogManager.stopPartitions(partitions, errorHandler);
-        assertNull(remoteLogManager.task(leaderTopicIdPartition));
-        assertNull(remoteLogManager.task(followerTopicIdPartition));
+        assertNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
+        
assertNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
+        assertNull(remoteLogManager.followerTask(followerTopicIdPartition));
         verify(remoteLogMetadataManager, times(1)).onStopPartitions(any());
         verify(remoteStorageManager, times(0)).deleteLogSegmentData(any());
         verify(remoteLogMetadataManager, 
times(0)).updateRemoteLogSegmentMetadata(any());
@@ -1908,8 +1864,9 @@ public class RemoteLogManagerTest {
         partitions.add(new 
StopPartition(followerTopicIdPartition.topicPartition(), true, true));
         
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
                 
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
-        assertNotNull(remoteLogManager.task(leaderTopicIdPartition));
-        assertNotNull(remoteLogManager.task(followerTopicIdPartition));
+        assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
+        
assertNotNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
+        assertNotNull(remoteLogManager.followerTask(followerTopicIdPartition));
 
         
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition)))
                 
.thenReturn(listRemoteLogSegmentMetadata(leaderTopicIdPartition, 5, 100, 1024, 
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED).iterator());
@@ -1921,8 +1878,9 @@ public class RemoteLogManagerTest {
                 .thenReturn(dummyFuture);
 
         remoteLogManager.stopPartitions(partitions, errorHandler);
-        assertNull(remoteLogManager.task(leaderTopicIdPartition));
-        assertNull(remoteLogManager.task(followerTopicIdPartition));
+        assertNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
+        
assertNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
+        assertNull(remoteLogManager.followerTask(followerTopicIdPartition));
         verify(remoteLogMetadataManager, times(1)).onStopPartitions(any());
         verify(remoteStorageManager, times(8)).deleteLogSegmentData(any());
         verify(remoteLogMetadataManager, 
times(16)).updateRemoteLogSegmentMetadata(any());
@@ -2027,13 +1985,174 @@ public class RemoteLogManagerTest {
                 return remoteLogMetadataManager;
             }
         }) {
-            RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
-            task.convertToLeader(4);
+            RemoteLogManager.RLMCopyTask task = remoteLogManager.new 
RLMCopyTask(leaderTopicIdPartition, 128);
             task.copyLogSegmentsToRemote(mockLog);
             assertEquals(600L, logStartOffset.get());
         }
     }
 
+    @Test
+    public void testDeletionSkippedForSegmentsBeingCopied() throws 
RemoteStorageException, IOException, InterruptedException, ExecutionException {
+        RemoteLogMetadataManager remoteLogMetadataManager = new 
NoOpRemoteLogMetadataManager() {
+            List<RemoteLogSegmentMetadata> metadataList = new ArrayList<>();
+
+            @Override
+            public synchronized CompletableFuture<Void> 
addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+                metadataList.add(remoteLogSegmentMetadata);
+                return CompletableFuture.runAsync(() -> { });
+            }
+
+            @Override
+            public synchronized CompletableFuture<Void> 
updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate 
remoteLogSegmentMetadataUpdate) {
+                metadataList = metadataList.stream()
+                        .map(m -> {
+                            if 
(m.remoteLogSegmentId().equals(remoteLogSegmentMetadataUpdate.remoteLogSegmentId()))
 {
+                                return 
m.createWithUpdates(remoteLogSegmentMetadataUpdate);
+                            }
+                            return m;
+                        })
+                        .collect(Collectors.toList());
+                return CompletableFuture.runAsync(() -> { });
+            }
+
+            @Override
+            public Optional<Long> highestOffsetForEpoch(TopicIdPartition 
topicIdPartition, int leaderEpoch) {
+                return Optional.of(-1L);
+            }
+
+            @Override
+            public synchronized Iterator<RemoteLogSegmentMetadata> 
listRemoteLogSegments(TopicIdPartition topicIdPartition) {
+                return metadataList.iterator();
+            }
+
+            @Override
+            public synchronized Iterator<RemoteLogSegmentMetadata> 
listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch) {
+                return metadataList.iterator();
+            }
+        };
+
+        remoteLogManager = new 
RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, 
time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) -> currentLogStartOffset.set(offset),
+                brokerTopicStats, metrics) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return remoteStorageManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+            public RLMQuotaManager createRLMCopyQuotaManager() {
+                return rlmCopyQuotaManager;
+            }
+            public Duration quotaTimeout() {
+                return Duration.ofMillis(100);
+            }
+            @Override
+            long findLogStartOffset(TopicIdPartition topicIdPartition, 
UnifiedLog log) {
+                return 0L;
+            }
+        };
+
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+        long lastStableOffset = 150L;
+        long logEndOffset = 150L;
+
+        
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(Collections.singletonList(epochEntry0));
+        LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, 
scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        File tempFile = TestUtils.tempFile();
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
+
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        ProducerStateManager mockStateManager = 
mock(ProducerStateManager.class);
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+        when(mockLog.lastStableOffset()).thenReturn(lastStableOffset);
+        when(mockLog.logEndOffset()).thenReturn(logEndOffset);
+
+        File tempDir = TestUtils.tempDirectory();
+        OffsetIndex idx = 
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, 
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+        TimeIndex timeIdx = 
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, 
""), oldSegmentStartOffset, 1500).get();
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, 
oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new 
TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.timeIndex()).thenReturn(timeIdx);
+        when(oldSegment.offsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        CountDownLatch copyLogSegmentLatch = new CountDownLatch(1);
+        CountDownLatch copySegmentDataLatch = new CountDownLatch(1);
+        doAnswer(ans -> {
+            // unblock the expiration thread
+            copySegmentDataLatch.countDown();
+            // introduce a delay in copying segment data
+            copyLogSegmentLatch.await(5000, TimeUnit.MILLISECONDS);
+            return Optional.empty();
+        
}).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class));
+        
when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime);
+
+        // Set up expiration behaviour
+        Map<String, Long> logProps = new HashMap<>();
+        logProps.put("retention.bytes", 0L);
+        logProps.put("retention.ms", -1L);
+        LogConfig mockLogConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        RemoteLogManager.RLMCopyTask copyTask = remoteLogManager.new 
RLMCopyTask(leaderTopicIdPartition, 128);
+        Thread copyThread  = new Thread(() -> {
+            try {
+                copyTask.copyLogSegmentsToRemote(mockLog);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        RemoteLogManager.RLMExpirationTask expirationTask = 
remoteLogManager.new RLMExpirationTask(leaderTopicIdPartition);
+        Thread expirationThread = new Thread(() -> {
+            try {
+                // wait until copy thread has started copying segment data
+                copySegmentDataLatch.await();
+                expirationTask.cleanupExpiredRemoteLogSegments();
+            } catch (RemoteStorageException | ExecutionException | 
InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        copyThread.start();
+        expirationThread.start();
+
+        copyThread.join(10_000);
+        expirationThread.join(1_000);
+
+        // Verify no segments were deleted
+        verify(remoteStorageManager, 
times(0)).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
+
+        // Run expiration task again and verify the copied segment was deleted
+        RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).next();
+        expirationTask.cleanupExpiredRemoteLogSegments();
+        verify(remoteStorageManager, 
times(1)).deleteLogSegmentData(remoteLogSegmentMetadata);
+    }
+
     @ParameterizedTest(name = "testDeletionOnRetentionBreachedSegments 
retentionSize={0} retentionMs={1}")
     @CsvSource(value = {"0, -1", "-1, 0"})
     public void testDeletionOnRetentionBreachedSegments(long retentionSize,
@@ -2070,8 +2189,7 @@ public class RemoteLogManagerTest {
         assertEquals(0, 
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
 
 
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
-        task.convertToLeader(0);
+        RemoteLogManager.RLMExpirationTask task = remoteLogManager.new 
RLMExpirationTask(leaderTopicIdPartition);
         task.cleanupExpiredRemoteLogSegments();
 
         assertEquals(200L, currentLogStartOffset.get());
@@ -2139,8 +2257,7 @@ public class RemoteLogManagerTest {
         assertEquals(0, 
brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
         assertEquals(0, 
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
 
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
-        task.convertToLeader(0);
+        RemoteLogManager.RLMExpirationTask task = remoteLogManager.new 
RLMExpirationTask(leaderTopicIdPartition);
         task.cleanupExpiredRemoteLogSegments();
 
         assertEquals(metadata2.endOffset() + 1, currentLogStartOffset.get());
@@ -2206,7 +2323,7 @@ public class RemoteLogManagerTest {
             return Optional.empty();
         
}).when(remoteStorageManager).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
 
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+        RemoteLogManager.RLMExpirationTask task = remoteLogManager.new 
RLMExpirationTask(leaderTopicIdPartition);
 
         assertEquals(0L, yammerMetricValue("RemoteDeleteLagBytes"));
         assertEquals(0L, yammerMetricValue("RemoteDeleteLagSegments"));
@@ -2214,7 +2331,6 @@ public class RemoteLogManagerTest {
         assertEquals(0L, 
safeLongYammerMetricValue("RemoteDeleteLagBytes,topic=" + leaderTopic));
         assertEquals(0L, 
safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic));
 
-        task.convertToLeader(0);
         task.cleanupExpiredRemoteLogSegments();
 
         assertEquals(200L, currentLogStartOffset.get());
@@ -2224,8 +2340,7 @@ public class RemoteLogManagerTest {
 
     @Test
     public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws 
RemoteStorageException, ExecutionException, InterruptedException {
-        RemoteLogManager.RLMTask leaderTask = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
-        leaderTask.convertToLeader(0);
+        RemoteLogManager.RLMExpirationTask leaderTask = remoteLogManager.new 
RLMExpirationTask(leaderTopicIdPartition);
 
         
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
         when(mockLog.logEndOffset()).thenReturn(200L);
@@ -2264,8 +2379,7 @@ public class RemoteLogManagerTest {
         verify(remoteStorageManager, 
never()).deleteLogSegmentData(metadataList.get(1));
 
         // test that the 2nd log segment will be deleted by the new leader
-        RemoteLogManager.RLMTask newLeaderTask = remoteLogManager.new 
RLMTask(followerTopicIdPartition, 128);
-        newLeaderTask.convertToLeader(1);
+        RemoteLogManager.RLMExpirationTask newLeaderTask = 
remoteLogManager.new RLMExpirationTask(followerTopicIdPartition);
 
         Iterator<RemoteLogSegmentMetadata> firstIterator = 
metadataList.iterator();
         firstIterator.next();
@@ -2324,8 +2438,7 @@ public class RemoteLogManagerTest {
         assertEquals(0, 
brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
         assertEquals(0, 
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
 
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
-        task.convertToLeader(0);
+        RemoteLogManager.RLMExpirationTask task = remoteLogManager.new 
RLMExpirationTask(leaderTopicIdPartition);
         doThrow(new RemoteStorageException("Failed to delete 
segment")).when(remoteStorageManager).deleteLogSegmentData(any());
         assertThrows(RemoteStorageException.class, 
task::cleanupExpiredRemoteLogSegments);
 
@@ -2432,8 +2545,7 @@ public class RemoteLogManagerTest {
                 });
         
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
                 .thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
-        task.convertToLeader(currentLeaderEpoch);
+        RemoteLogManager.RLMExpirationTask task = remoteLogManager.new 
RLMExpirationTask(leaderTopicIdPartition);
         task.cleanupExpiredRemoteLogSegments();
 
         ArgumentCaptor<RemoteLogSegmentMetadata> deletedMetadataCapture = 
ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
@@ -2462,8 +2574,7 @@ public class RemoteLogManagerTest {
                 return remoteLogMetadataManager;
             }
         }) {
-            RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
-            task.convertToLeader(0);
+            RemoteLogManager.RLMExpirationTask task = remoteLogManager.new 
RLMExpirationTask(leaderTopicIdPartition);
 
             
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
             when(mockLog.logEndOffset()).thenReturn(200L);
@@ -2862,7 +2973,7 @@ public class RemoteLogManagerTest {
     @ParameterizedTest
     @ValueSource(booleans = {false, true})
     public void testCopyQuota(boolean quotaExceeded) throws Exception {
-        RemoteLogManager.RLMTask task = setupRLMTask(quotaExceeded);
+        RemoteLogManager.RLMCopyTask task = setupRLMTask(quotaExceeded);
 
         if (quotaExceeded) {
             // Verify that the copy operation times out, since no segments can 
be copied due to quota being exceeded
@@ -2921,7 +3032,7 @@ public class RemoteLogManagerTest {
     }
 
     // helper method to set up a RemoteLogManager.RLMTask for testing copy 
quota behaviour
-    private RemoteLogManager.RLMTask setupRLMTask(boolean quotaExceeded) 
throws RemoteStorageException, IOException {
+    private RemoteLogManager.RLMCopyTask setupRLMTask(boolean quotaExceeded) 
throws RemoteStorageException, IOException {
         long oldSegmentStartOffset = 0L;
         long nextSegmentStartOffset = 150L;
 
@@ -2981,8 +3092,7 @@ public class RemoteLogManagerTest {
         when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaExceeded 
? 1000L : 0L);
         doNothing().when(rlmCopyQuotaManager).record(anyInt());
 
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
-        task.convertToLeader(2);
+        RemoteLogManager.RLMCopyTask task = remoteLogManager.new 
RLMCopyTask(leaderTopicIdPartition, 128);
         return task;
     }
 
@@ -3053,8 +3163,7 @@ public class RemoteLogManagerTest {
         when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(0L, 1000L);
         doNothing().when(rlmCopyQuotaManager).record(anyInt());
 
-        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
-        task.convertToLeader(2);
+        RemoteLogManager.RLMCopyTask task = remoteLogManager.new 
RLMCopyTask(leaderTopicIdPartition, 128);
 
         // Verify that the copy operation times out, since the second segment 
cannot be copied due to quota being exceeded
         assertThrows(AssertionFailedError.class, () -> 
assertTimeoutPreemptively(Duration.ofMillis(200), () -> 
task.copyLogSegmentsToRemote(mockLog)));
@@ -3074,7 +3183,7 @@ public class RemoteLogManagerTest {
         remoteLogManager.startup();
         remoteLogManager.onLeadershipChange(
                 Collections.singleton(mockPartition(leaderTopicIdPartition)), 
Collections.emptySet(), topicIds);
-        RemoteLogManager.RLMTask rlmTask = 
remoteLogManager.rlmTask(leaderTopicIdPartition);
+        RemoteLogManager.RLMCopyTask rlmTask = (RemoteLogManager.RLMCopyTask) 
remoteLogManager.rlmCopyTask(leaderTopicIdPartition);
         assertNotNull(rlmTask);
         rlmTask.recordLagStats(1024, 2);
         assertEquals(1024, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagBytes());

Reply via email to