divijvaidya commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1293244517


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -698,11 +707,329 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+            if (isLeader()) {
+                logger.debug("Updating {} with remoteLogStartOffset: {}", 
topicPartition, remoteLogStartOffset);
+                updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+            }
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private final Optional<RetentionSizeData> retentionSizeData;
+            private final Optional<RetentionTimeData> retentionTimeData;
+
+            private long remainingBreachedSize;
+
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(Optional<RetentionSizeData> 
retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+                this.retentionSizeData = retentionSizeData;
+                this.retentionTimeData = retentionTimeData;
+                remainingBreachedSize = retentionSizeData.map(sizeData -> 
sizeData.remainingBreachedSize).orElse(0L);
+            }
+
+            private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws 
RemoteStorageException, ExecutionException, InterruptedException {
+                if (!retentionSizeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+                    // Assumption that segments contain size >= 0
+                    if (remainingBreachedSize > 0) {
+                        long remainingBytes = remainingBreachedSize - 
x.segmentSizeInBytes();
+                        if (remainingBytes >= 0) {
+                            remainingBreachedSize = remainingBytes;
+                            return true;
+                        }
+                    }
+
+                    return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention size {} breach. Log size after deletion will be {}.",
+                            metadata.remoteLogSegmentId(), 
retentionSizeData.get().retentionSize, remainingBreachedSize + 
retentionSizeData.get().retentionSize);
+                }
+                return isSegmentDeleted;
+            }
+
+            public boolean 
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (!retentionTimeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> x.maxTimestampMs() <= 
retentionTimeData.get().cleanupUntilMs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as 
`metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention time {}ms breach based on the largest record timestamp in the 
segment",
+                            metadata.remoteLogSegmentId(), 
retentionTimeData.get().retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+                if (isSegmentDeleted && retentionSizeData.isPresent()) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                return isSegmentDeleted;
+            }
+
+            // It removes the segments beyond the current leader's earliest 
epoch. Those segments are considered as
+            // unreferenced because they are not part of the current leader 
epoch lineage.
+            private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
->
+                        
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < 
earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment {} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, 
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these 
epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    logger.info("Deleting remote log segment {}", 
segmentMetadata.remoteLogSegmentId());
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    segmentMetadata.customMetadata(), 
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    segmentMetadata.customMetadata(), 
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    logger.info("Deleted remote log segment {}", 
segmentMetadata.remoteLogSegmentId());
+                    return true;
+                }
+
+                return false;
+            }
+
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled() || !isLeader()) {
+                logger.info("Returning from remote log segments cleanup as the 
task state is changed");
+                return;
+            }
+
+            // Cleanup remote log segments and update the log start offset if 
applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                logger.debug("No remote log segments available on remote 
storage for partition: {}", topicIdPartition);
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = 
fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                logger.debug("No UnifiedLog instance available for partition: 
{}", topicIdPartition);
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = 
log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                logger.debug("No leader epoch cache available for partition: 
{}", topicIdPartition);
+                return;
+            }
+
+            final Set<Integer> epochsSet = new HashSet<>();
+            // Good to have an API from RLMM to get all the remote leader 
epochs of all the segments of a partition
+            // instead of going through all the segments and building it here.
+            while (segmentMetadataIter.hasNext()) {
+                RemoteLogSegmentMetadata segmentMetadata = 
segmentMetadataIter.next();
+                
epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
+            }
+
+            // All the leader epochs in sorted order that exists in remote 
storage
+            final List<Integer> remoteLeaderEpochs = new 
ArrayList<>(epochsSet);
+            Collections.sort(remoteLeaderEpochs);
+
+            LeaderEpochFileCache leaderEpochCache = 
leaderEpochCacheOption.get();
+            NavigableMap<Integer, Long> epochWithOffsets = 
leaderEpochCache.epochWithOffsets();
+            Optional<EpochEntry> earliestEpochEntryOptional = 
leaderEpochCache.earliestEntry();
+
+            Optional<RetentionSizeData> retentionSizeData = 
buildRetentionSizeData(log.config().retentionSize,
+                    log.onlyLocalLogSegmentsSize(), log.logEndOffset(), 
epochWithOffsets);

Review Comment:
   could we please store the value of log.logEndOffset() at the beginning of 
clean up process and use the stored value for all calculations? Asking because 
endOffset may move behind the scenes while we are processing cleaning. 
   
   The overall idea is that this cleanup should be executing on a snapshot of 
log state.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -698,11 +707,329 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+            if (isLeader()) {
+                logger.debug("Updating {} with remoteLogStartOffset: {}", 
topicPartition, remoteLogStartOffset);
+                updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+            }
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private final Optional<RetentionSizeData> retentionSizeData;
+            private final Optional<RetentionTimeData> retentionTimeData;
+
+            private long remainingBreachedSize;
+
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(Optional<RetentionSizeData> 
retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+                this.retentionSizeData = retentionSizeData;
+                this.retentionTimeData = retentionTimeData;
+                remainingBreachedSize = retentionSizeData.map(sizeData -> 
sizeData.remainingBreachedSize).orElse(0L);
+            }
+
+            private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws 
RemoteStorageException, ExecutionException, InterruptedException {
+                if (!retentionSizeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+                    // Assumption that segments contain size >= 0
+                    if (remainingBreachedSize > 0) {
+                        long remainingBytes = remainingBreachedSize - 
x.segmentSizeInBytes();
+                        if (remainingBytes >= 0) {
+                            remainingBreachedSize = remainingBytes;
+                            return true;
+                        }
+                    }
+
+                    return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention size {} breach. Log size after deletion will be {}.",
+                            metadata.remoteLogSegmentId(), 
retentionSizeData.get().retentionSize, remainingBreachedSize + 
retentionSizeData.get().retentionSize);
+                }
+                return isSegmentDeleted;
+            }
+
+            public boolean 
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (!retentionTimeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> x.maxTimestampMs() <= 
retentionTimeData.get().cleanupUntilMs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as 
`metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention time {}ms breach based on the largest record timestamp in the 
segment",
+                            metadata.remoteLogSegmentId(), 
retentionTimeData.get().retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+                if (isSegmentDeleted && retentionSizeData.isPresent()) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                return isSegmentDeleted;
+            }
+
+            // It removes the segments beyond the current leader's earliest 
epoch. Those segments are considered as
+            // unreferenced because they are not part of the current leader 
epoch lineage.
+            private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
->
+                        
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < 
earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment {} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, 
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these 
epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    logger.info("Deleting remote log segment {}", 
segmentMetadata.remoteLogSegmentId());
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    segmentMetadata.customMetadata(), 
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    segmentMetadata.customMetadata(), 
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    logger.info("Deleted remote log segment {}", 
segmentMetadata.remoteLogSegmentId());
+                    return true;
+                }
+
+                return false;
+            }
+
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled() || !isLeader()) {
+                logger.info("Returning from remote log segments cleanup as the 
task state is changed");
+                return;
+            }
+
+            // Cleanup remote log segments and update the log start offset if 
applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                logger.debug("No remote log segments available on remote 
storage for partition: {}", topicIdPartition);
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = 
fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                logger.debug("No UnifiedLog instance available for partition: 
{}", topicIdPartition);
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = 
log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                logger.debug("No leader epoch cache available for partition: 
{}", topicIdPartition);
+                return;
+            }
+
+            final Set<Integer> epochsSet = new HashSet<>();
+            // Good to have an API from RLMM to get all the remote leader 
epochs of all the segments of a partition
+            // instead of going through all the segments and building it here.
+            while (segmentMetadataIter.hasNext()) {
+                RemoteLogSegmentMetadata segmentMetadata = 
segmentMetadataIter.next();
+                
epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
+            }
+
+            // All the leader epochs in sorted order that exists in remote 
storage
+            final List<Integer> remoteLeaderEpochs = new 
ArrayList<>(epochsSet);
+            Collections.sort(remoteLeaderEpochs);
+
+            LeaderEpochFileCache leaderEpochCache = 
leaderEpochCacheOption.get();
+            NavigableMap<Integer, Long> epochWithOffsets = 
leaderEpochCache.epochWithOffsets();
+            Optional<EpochEntry> earliestEpochEntryOptional = 
leaderEpochCache.earliestEntry();
+
+            Optional<RetentionSizeData> retentionSizeData = 
buildRetentionSizeData(log.config().retentionSize,
+                    log.onlyLocalLogSegmentsSize(), log.logEndOffset(), 
epochWithOffsets);
+            Optional<RetentionTimeData> retentionTimeData = 
buildRetentionTimeData(log.config().retentionMs);
+
+            RemoteLogRetentionHandler remoteLogRetentionHandler = new 
RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
+            Iterator<Integer> epochIterator = 
epochWithOffsets.navigableKeySet().iterator();
+            boolean isSegmentDeleted = true;
+            while (isSegmentDeleted && epochIterator.hasNext()) {
+                Integer epoch = epochIterator.next();
+                Iterator<RemoteLogSegmentMetadata> segmentsIterator = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
+                while (isSegmentDeleted && segmentsIterator.hasNext()) {
+                    if (isCancelled() || !isLeader()) {
+                        logger.info("Returning from remote log segments 
cleanup for the remaining segments as the task state is changed.");
+                        return;
+                    }
+                    RemoteLogSegmentMetadata metadata = 
segmentsIterator.next();
+
+                    // check whether the segment contains the required epoch 
range with in the current leader epoch lineage.
+                    if (isRemoteSegmentWithinLeaderEpochs(metadata, 
log.logEndOffset(), epochWithOffsets)) {
+                        isSegmentDeleted =
+                                
remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
+                                        
remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) ||
+                                        
remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, 
log.logStartOffset());
+                    }
+                }
+            }
+
+            // Remove the remote log segments whose segment-leader-epochs are 
less than the earliest-epoch known
+            // to the leader. This will remove the unreferenced segments in 
the remote storage. This is needed for
+            // unclean leader election scenarios as the remote storage can 
have epochs earlier to the current leader's
+            // earliest leader epoch.
+            if (earliestEpochEntryOptional.isPresent()) {
+                EpochEntry earliestEpochEntry = 
earliestEpochEntryOptional.get();
+                Iterator<Integer> epochsToClean = 
remoteLeaderEpochs.stream().filter(x -> x < 
earliestEpochEntry.epoch).iterator();
+                while (epochsToClean.hasNext()) {
+                    int epoch = epochsToClean.next();
+                    Iterator<RemoteLogSegmentMetadata> segmentsToBeCleaned = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
+                    while (segmentsToBeCleaned.hasNext()) {
+                        if (isCancelled() || !isLeader()) {
+                            return;
+                        }
+                        // No need to update the log-start-offset even though 
the segment is deleted as these epochs/offsets are earlier to that value.
+                        
remoteLogRetentionHandler.deleteLogSegmentsDueToLeaderEpochCacheTruncation(earliestEpochEntry,
 segmentsToBeCleaned.next());
+                    }
+                }
+            }
+
+            // Update log start offset with the computed value after retention 
cleanup is done
+            remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> 
handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));
+        }
+
+        private Optional<RetentionTimeData> buildRetentionTimeData(long 
retentionMs) {
+            return retentionMs > -1
+                    ? Optional.of(new RetentionTimeData(retentionMs, 
time.milliseconds() - retentionMs))
+                    : Optional.empty();
+        }
+
+        private Optional<RetentionSizeData> buildRetentionSizeData(long 
retentionSize,
+                                                                   long 
onlyLocalLogSegmentsSize,
+                                                                   long 
logEndOffset,
+                                                                   
NavigableMap<Integer, Long> epochEntries) throws RemoteStorageException {
+            if (retentionSize > -1) {
+                long remoteLogSizeBytes = 0L;
+                for (Integer epoch : epochEntries.navigableKeySet()) {
+                    // remoteLogSize(topicIdPartition, epochEntry.epoch) may 
not be completely accurate as the remote
+                    // log size may be computed for all the segments but not 
for segments with in the current
+                    // partition's leader epoch lineage. Better to revisit 
this API.
+                    // remoteLogSizeBytes += 
remoteLogMetadataManager.remoteLogSize(topicIdPartition, epochEntry.epoch);
+                    Iterator<RemoteLogSegmentMetadata> segmentsIterator = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);

Review Comment:
   Note that same segment may span across multiple epochs. Hence, same segment 
ID will be returned multiple times here and we will count it's size multiple 
times. 
   
   May I suggest:
   
   ```
   epochEntries.navigableKeySet().iterator
       .flatMap(epochEntry => 
remoteLogMetadataManager.listRemoteLogSegments(tpId, epochEntry.epoch).asScala)
       .filter(isRemoteSegmentWithinLeaderEpochs(epochEntries, _, logEndOffset))
       .distinctBy(_.remoteLogSegmentId.id())
       .map(segment => segment.segmentSizeInBytes())
       .reduceOption((a, b) => a.add(b))
       .getOrElse(0)
   ```
   
   
   Also, if you agree that this was a bug, please add a unit test that should 
have failed.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -698,11 +707,329 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+            if (isLeader()) {
+                logger.debug("Updating {} with remoteLogStartOffset: {}", 
topicPartition, remoteLogStartOffset);
+                updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+            }
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private final Optional<RetentionSizeData> retentionSizeData;
+            private final Optional<RetentionTimeData> retentionTimeData;
+
+            private long remainingBreachedSize;
+
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(Optional<RetentionSizeData> 
retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+                this.retentionSizeData = retentionSizeData;
+                this.retentionTimeData = retentionTimeData;
+                remainingBreachedSize = retentionSizeData.map(sizeData -> 
sizeData.remainingBreachedSize).orElse(0L);
+            }
+
+            private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws 
RemoteStorageException, ExecutionException, InterruptedException {
+                if (!retentionSizeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+                    // Assumption that segments contain size >= 0
+                    if (remainingBreachedSize > 0) {
+                        long remainingBytes = remainingBreachedSize - 
x.segmentSizeInBytes();
+                        if (remainingBytes >= 0) {
+                            remainingBreachedSize = remainingBytes;
+                            return true;
+                        }
+                    }
+
+                    return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention size {} breach. Log size after deletion will be {}.",
+                            metadata.remoteLogSegmentId(), 
retentionSizeData.get().retentionSize, remainingBreachedSize + 
retentionSizeData.get().retentionSize);
+                }
+                return isSegmentDeleted;
+            }
+
+            public boolean 
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (!retentionTimeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> x.maxTimestampMs() <= 
retentionTimeData.get().cleanupUntilMs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as 
`metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention time {}ms breach based on the largest record timestamp in the 
segment",
+                            metadata.remoteLogSegmentId(), 
retentionTimeData.get().retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+                if (isSegmentDeleted && retentionSizeData.isPresent()) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                return isSegmentDeleted;
+            }
+
+            // It removes the segments beyond the current leader's earliest 
epoch. Those segments are considered as
+            // unreferenced because they are not part of the current leader 
epoch lineage.
+            private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
->
+                        
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < 
earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment {} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, 
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these 
epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    logger.info("Deleting remote log segment {}", 
segmentMetadata.remoteLogSegmentId());
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    segmentMetadata.customMetadata(), 
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    segmentMetadata.customMetadata(), 
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    logger.info("Deleted remote log segment {}", 
segmentMetadata.remoteLogSegmentId());
+                    return true;
+                }
+
+                return false;
+            }
+
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled() || !isLeader()) {
+                logger.info("Returning from remote log segments cleanup as the 
task state is changed");
+                return;
+            }
+
+            // Cleanup remote log segments and update the log start offset if 
applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                logger.debug("No remote log segments available on remote 
storage for partition: {}", topicIdPartition);
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = 
fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                logger.debug("No UnifiedLog instance available for partition: 
{}", topicIdPartition);
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = 
log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                logger.debug("No leader epoch cache available for partition: 
{}", topicIdPartition);
+                return;
+            }
+
+            final Set<Integer> epochsSet = new HashSet<>();
+            // Good to have an API from RLMM to get all the remote leader 
epochs of all the segments of a partition
+            // instead of going through all the segments and building it here.
+            while (segmentMetadataIter.hasNext()) {
+                RemoteLogSegmentMetadata segmentMetadata = 
segmentMetadataIter.next();
+                
epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
+            }
+
+            // All the leader epochs in sorted order that exists in remote 
storage
+            final List<Integer> remoteLeaderEpochs = new 
ArrayList<>(epochsSet);
+            Collections.sort(remoteLeaderEpochs);
+
+            LeaderEpochFileCache leaderEpochCache = 
leaderEpochCacheOption.get();
+            NavigableMap<Integer, Long> epochWithOffsets = 
leaderEpochCache.epochWithOffsets();
+            Optional<EpochEntry> earliestEpochEntryOptional = 
leaderEpochCache.earliestEntry();
+
+            Optional<RetentionSizeData> retentionSizeData = 
buildRetentionSizeData(log.config().retentionSize,
+                    log.onlyLocalLogSegmentsSize(), log.logEndOffset(), 
epochWithOffsets);
+            Optional<RetentionTimeData> retentionTimeData = 
buildRetentionTimeData(log.config().retentionMs);
+
+            RemoteLogRetentionHandler remoteLogRetentionHandler = new 
RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
+            Iterator<Integer> epochIterator = 
epochWithOffsets.navigableKeySet().iterator();
+            boolean isSegmentDeleted = true;
+            while (isSegmentDeleted && epochIterator.hasNext()) {
+                Integer epoch = epochIterator.next();
+                Iterator<RemoteLogSegmentMetadata> segmentsIterator = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
+                while (isSegmentDeleted && segmentsIterator.hasNext()) {
+                    if (isCancelled() || !isLeader()) {
+                        logger.info("Returning from remote log segments 
cleanup for the remaining segments as the task state is changed.");
+                        return;
+                    }
+                    RemoteLogSegmentMetadata metadata = 
segmentsIterator.next();
+
+                    // check whether the segment contains the required epoch 
range with in the current leader epoch lineage.
+                    if (isRemoteSegmentWithinLeaderEpochs(metadata, 
log.logEndOffset(), epochWithOffsets)) {
+                        isSegmentDeleted =
+                                
remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
+                                        
remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) ||
+                                        
remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, 
log.logStartOffset());
+                    }
+                }
+            }
+
+            // Remove the remote log segments whose segment-leader-epochs are 
less than the earliest-epoch known
+            // to the leader. This will remove the unreferenced segments in 
the remote storage. This is needed for
+            // unclean leader election scenarios as the remote storage can 
have epochs earlier to the current leader's
+            // earliest leader epoch.
+            if (earliestEpochEntryOptional.isPresent()) {
+                EpochEntry earliestEpochEntry = 
earliestEpochEntryOptional.get();
+                Iterator<Integer> epochsToClean = 
remoteLeaderEpochs.stream().filter(x -> x < 
earliestEpochEntry.epoch).iterator();
+                while (epochsToClean.hasNext()) {
+                    int epoch = epochsToClean.next();
+                    Iterator<RemoteLogSegmentMetadata> segmentsToBeCleaned = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
+                    while (segmentsToBeCleaned.hasNext()) {
+                        if (isCancelled() || !isLeader()) {
+                            return;
+                        }
+                        // No need to update the log-start-offset even though 
the segment is deleted as these epochs/offsets are earlier to that value.
+                        
remoteLogRetentionHandler.deleteLogSegmentsDueToLeaderEpochCacheTruncation(earliestEpochEntry,
 segmentsToBeCleaned.next());
+                    }
+                }
+            }
+
+            // Update log start offset with the computed value after retention 
cleanup is done
+            remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> 
handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));
+        }
+
+        private Optional<RetentionTimeData> buildRetentionTimeData(long 
retentionMs) {
+            return retentionMs > -1
+                    ? Optional.of(new RetentionTimeData(retentionMs, 
time.milliseconds() - retentionMs))
+                    : Optional.empty();
+        }
+
+        private Optional<RetentionSizeData> buildRetentionSizeData(long 
retentionSize,
+                                                                   long 
onlyLocalLogSegmentsSize,
+                                                                   long 
logEndOffset,
+                                                                   
NavigableMap<Integer, Long> epochEntries) throws RemoteStorageException {
+            if (retentionSize > -1) {
+                long remoteLogSizeBytes = 0L;
+                for (Integer epoch : epochEntries.navigableKeySet()) {
+                    // remoteLogSize(topicIdPartition, epochEntry.epoch) may 
not be completely accurate as the remote
+                    // log size may be computed for all the segments but not 
for segments with in the current
+                    // partition's leader epoch lineage. Better to revisit 
this API.
+                    // remoteLogSizeBytes += 
remoteLogMetadataManager.remoteLogSize(topicIdPartition, epochEntry.epoch);
+                    Iterator<RemoteLogSegmentMetadata> segmentsIterator = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
+                    while (segmentsIterator.hasNext()) {
+                        RemoteLogSegmentMetadata segmentMetadata = 
segmentsIterator.next();
+                        if (isRemoteSegmentWithinLeaderEpochs(segmentMetadata, 
logEndOffset, epochEntries)) {
+                            remoteLogSizeBytes += 
segmentMetadata.segmentSizeInBytes();
+                        }
+                    }
+                }
+
+                // This is the total size of segments in local log that have 
their base-offset > local-log-start-offset
+                // and size of the segments in remote storage which have their 
end-offset < local-log-start-offset.
+                long totalSize = onlyLocalLogSegmentsSize + remoteLogSizeBytes;
+                if (totalSize > retentionSize) {
+                    long remainingBreachedSize = totalSize - retentionSize;
+                    RetentionSizeData retentionSizeData = new 
RetentionSizeData(retentionSize, remainingBreachedSize);
+                    return Optional.of(retentionSizeData);
+                }
+            }
+
+            return Optional.empty();
+        }
+
         public String toString() {
             return this.getClass().toString() + "[" + topicIdPartition + "]";
         }
     }
 
+    /**
+     * Returns true if the remote segment's epoch/offsets are within the 
leader epoch lineage of the partition.
+     * The constraints here are as follows:
+     * - The segment's first epoch's offset should be more than or equal to 
the respective leader epoch's offset in the partition leader epoch lineage.
+     * - The segment's end offset should be less than or equal to the 
respective leader epoch's offset in the partition leader epoch lineage.
+     * - The segment's epoch lineage(epoch and offset) should be same as 
leader epoch lineage((epoch and offset)) except
+     * for the first and the last epochs in the segment.
+     *
+     * @param segmentMetadata The remote segment metadata to be validated.
+     * @param logEndOffset    The log end offset of the partition.
+     * @param leaderEpochs    The leader epoch lineage of the partition.
+     * @return true if the remote segment's epoch/offsets are within the 
leader epoch lineage of the partition.
+     */
+    // Visible for testing
+    public static boolean 
isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,

Review Comment:
   you need to use this to correctly filter out segments at 
`findOffsetByTimestamp` method as well please.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -2173,6 +2225,14 @@ object UnifiedLog extends Logging {
     }
   }
 
+  private[log] def localRetentionMs(config: LogConfig): Long = {
+    if (config.remoteLogConfig.remoteStorageEnable) 
config.remoteLogConfig.localRetentionMs else config.retentionMs
+  }
+
+  private[log] def localRetentionSize(config: LogConfig): Long = {
+    if (config.remoteLogConfig.remoteStorageEnable) 
config.remoteLogConfig.localRetentionBytes else config.retentionSize
+  }
+

Review Comment:
   You can instead use similar methods already present in LogConfig.
   
   see LogConfig.localRetentionBytes() and LogConfig.localRetentionMs()
   
   (you will probably have to modify them to add new case of `if 
(config.remoteLogConfig.remoteStorageEnable)`
   
   



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -102,9 +102,9 @@ public String topicWarningMessage(String topicName) {
 
     public static class RemoteLogConfig {
 
-        private final boolean remoteStorageEnable;
-        private final long localRetentionMs;
-        private final long localRetentionBytes;
+        public final boolean remoteStorageEnable;
+        public final long localRetentionMs;
+        public final long localRetentionBytes;

Review Comment:
   I believe we already have public accessor functions in LogConfig for these.
   
   See LogConfig.localRetentionMs(), LogConfig.localRetentionBytes() and 
LogConfig.remoteStorageEnable()



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -152,16 +152,42 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   locally {
     initializePartitionMetadata()
     updateLogStartOffset(logStartOffset)
+    updateLocalLogStartOffset(math.max(logStartOffset, 
localLog.segments.firstSegmentBaseOffset.getOrElse(0L)))
+    if (!remoteLogEnabled())
+      logStartOffset = localLogStartOffset()
     maybeIncrementFirstUnstableOffset()
     initializeTopicId()
 
     
logOffsetsListener.onHighWatermarkUpdated(highWatermarkMetadata.messageOffset)
+
+    info(s"Completed load of log with ${localLog.segments.numberOfSegments} 
segments, local log start offset ${localLogStartOffset()} and " +
+      s"log end offset $logEndOffset")
   }
 
   def setLogOffsetsListener(listener: LogOffsetsListener): Unit = {
     logOffsetsListener = listener
   }
 
+  private def updateLocalLogStartOffset(offset: Long): Unit = {

Review Comment:
   The code in this PR still uses this method. No? What am I missing?



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -696,11 +704,327 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+            if (isLeader()) {
+                logger.debug("Updating {} with remoteLogStartOffset: {}", 
topicPartition, remoteLogStartOffset);
+                updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+            }
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private final Optional<RetentionSizeData> retentionSizeData;
+            private final Optional<RetentionTimeData> retentionTimeData;
+
+            private long remainingBreachedSize;
+
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(Optional<RetentionSizeData> 
retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+                this.retentionSizeData = retentionSizeData;
+                this.retentionTimeData = retentionTimeData;
+                remainingBreachedSize = retentionSizeData.map(sizeData -> 
sizeData.remainingBreachedSize).orElse(0L);
+            }
+
+            private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws 
RemoteStorageException, ExecutionException, InterruptedException {
+                if (!retentionSizeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+                    // Assumption that segments contain size >= 0
+                    if (remainingBreachedSize > 0) {
+                        long remainingBytes = remainingBreachedSize - 
x.segmentSizeInBytes();
+                        if (remainingBytes >= 0) {
+                            remainingBreachedSize = remainingBytes;
+                            return true;
+                        }
+                    }
+
+                    return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention size {} breach. Log size after deletion will be {}.",
+                            metadata.remoteLogSegmentId(), 
retentionSizeData.get().retentionSize, remainingBreachedSize + 
retentionSizeData.get().retentionSize);
+                }
+                return isSegmentDeleted;
+            }
+
+            public boolean 
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (!retentionTimeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> x.maxTimestampMs() <= 
retentionTimeData.get().cleanupUntilMs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as 
`metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention time {}ms breach based on the largest record timestamp in the 
segment",
+                            metadata.remoteLogSegmentId(), 
retentionTimeData.get().retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+                if (isSegmentDeleted && retentionSizeData.isPresent()) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                return isSegmentDeleted;
+            }
+
+            // It removes the segments beyond the current leader's earliest 
epoch. Those segments are considered as
+            // unreferenced because they are not part of the current leader 
epoch lineage.
+            private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
->
+                        
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < 
earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment {} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, 
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these 
epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    logger.info("Deleting remote log segment {}", 
segmentMetadata.remoteLogSegmentId());
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    segmentMetadata.customMetadata(), 
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    segmentMetadata.customMetadata(), 
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    logger.info("Deleted remote log segment {}", 
segmentMetadata.remoteLogSegmentId());
+                    return true;
+                }
+
+                return false;
+            }
+
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled() || !isLeader()) {
+                logger.info("Returning from remote log segments cleanup as the 
task state is changed");
+                return;
+            }
+
+            // Cleanup remote log segments and update the log start offset if 
applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                logger.debug("No remote log segments available on remote 
storage for partition: {}", topicIdPartition);
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = 
fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                logger.debug("No UnifiedLog instance available for partition: 
{}", topicIdPartition);
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = 
log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                logger.debug("No leader epoch cache available for partition: 
{}", topicIdPartition);
+                return;
+            }
+
+            final Set<Integer> epochsSet = new HashSet<>();
+            // Good to have an API from RLMM to get all the remote leader 
epochs of all the segments of a partition
+            // instead of going through all the segments and building it here.
+            while (segmentMetadataIter.hasNext()) {
+                RemoteLogSegmentMetadata segmentMetadata = 
segmentMetadataIter.next();
+                
epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
+            }
+
+            // All the leader epochs in sorted order that exists in remote 
storage
+            final List<Integer> remoteLeaderEpochs = new 
ArrayList<>(epochsSet);
+            Collections.sort(remoteLeaderEpochs);
+
+            LeaderEpochFileCache leaderEpochCache = 
leaderEpochCacheOption.get();
+            NavigableMap<Integer, Long> epochWithOffsets = 
leaderEpochCache.epochWithOffsets();
+            Optional<EpochEntry> earliestEpochEntryOptional = 
leaderEpochCache.earliestEntry();
+
+            Optional<RetentionSizeData> retentionSizeData = 
buildRetentionSizeData(log.config().retentionSize,
+                    log.onlyLocalLogSegmentsSize(), log.logEndOffset(), 
epochWithOffsets);
+            Optional<RetentionTimeData> retentionTimeData = 
buildRetentionTimeData(log.config().retentionMs);
+
+            RemoteLogRetentionHandler remoteLogRetentionHandler = new 
RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
+            Iterator<Integer> epochIterator = 
epochWithOffsets.navigableKeySet().iterator();
+            boolean isSegmentDeleted = true;
+            while (isSegmentDeleted && epochIterator.hasNext()) {
+                Integer epoch = epochIterator.next();
+                Iterator<RemoteLogSegmentMetadata> segmentsIterator = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
+                while (isSegmentDeleted && segmentsIterator.hasNext()) {
+                    if (isCancelled() || !isLeader()) {
+                        logger.info("Returning from remote log segments 
cleanup for the remaining segments as the task state is changed.");
+                        return;
+                    }
+                    RemoteLogSegmentMetadata metadata = 
segmentsIterator.next();
+
+                    // check whether the segment contains the required epoch 
range with in the current leader epoch lineage.
+                    if (isRemoteSegmentWithinLeaderEpochs(metadata, 
log.logEndOffset(), epochWithOffsets)) {
+                        isSegmentDeleted =
+                                
remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
+                                        
remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) ||
+                                        
remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, 
log.logStartOffset());
+                    }
+                }
+            }
+
+            // Remove the remote log segments whose segment-leader-epochs are 
less than the earliest-epoch known

Review Comment:
   Isn't it possible for older epoch chain to become the current chain after 
another unclean election?
   
   For example:
   
   Time T1: Leader epoch chain
   ```
   -----------------------------
   leader-epoch | start-offset |
   -----------------------------
        0              0
        1              180
        2              400
   -----------------------------
   ```
   Time T2: Unclean leader election occurs where the new leader loses all 
existing data and starts with new leader epoch
   ```
   -----------------------------
   leader-epoch | start-offset |
   -----------------------------
        3              700
        4              780
        6              900
        7              990                                      
   -----------------------------
   ```
   Time T3: Unclean leader election occurs again but the old leader from T1 
becomes new leader (epoch 8). In this case, the current epoch chain will be 
0->1->2->8. But we have deleted data from remote already pertaining to 0,1 and 
2, even if it was not eligible for deletion based on retention. 
   
   To remedy this situation, may I suggest that we delete the unreferenced 
segments "only" if we definitely know that they can be cleaned i.e. when they 
have exceeded the retention time or when the size in remote itself is greater 
than retention size. I have to check but I believe that local log solves it in 
a similar manner.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to