divijvaidya commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1166622209


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -581,11 +588,18 @@ public void run() {
                 if (isLeader()) {
                     // Copy log segments to remote storage
                     copyLogSegmentsToRemote();
+                    // Cleanup/delete expired remote log segments
+                    cleanupExpiredRemoteLogSegments();

Review Comment:
   Should this operation be performed in a separate thread pool that can have a defined quota (similar to how we clean the local log using separate cleaner/background threads)?
   
   I am concerned that this may impact the rate of copying to remote if the amount of cleaning is large. It is also probably better to have different scaling characteristics for cleaning from remote vs. copying. Copying may be considered urgent, since slow copying can potentially fill up the disk, whereas cleaning from remote may be a lower-priority activity.
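   
   Something like the following, as a rough sketch only (the pool name, pool size, and scheduling interval are hypothetical, and `rlmTask` stands in for whatever handle the cleanup would be scheduled on):
   
   ```java
   import java.util.concurrent.ScheduledThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;
   
   // Hypothetical: a pool dedicated to remote-log expiration, sized (and potentially
   // quota-limited) independently of the copy pool, so that a large cleanup backlog
   // cannot slow down copying segments to remote storage.
   ScheduledThreadPoolExecutor remoteLogExpirationPool = new ScheduledThreadPoolExecutor(2);
   remoteLogExpirationPool.scheduleWithFixedDelay(() -> {
       try {
           rlmTask.cleanupExpiredRemoteLogSegments(); // assumed handle to the per-partition task
       } catch (Exception e) {
           logger.error("Error while cleaning up expired remote log segments", e);
       }
   }, 0, 30, TimeUnit.SECONDS);
   ```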



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -595,6 +609,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: 
{}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean 
checkSizeRetention) throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+                    // Assumption that segments contain size > 0
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment 
${metadata.remoteLogSegmentId()} due to retention size " +
+                            "${log.config.retentionSize} breach. Log size 
after deletion will be " +
+                            "${remainingBreachedSize + 
log.config.retentionSize}.");
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // There are two cases:
+            // 1) When there are offline partitions and a new replica with 
empty disk is brought as leader, then the
+            //    leader-epoch gets bumped but the log-start-offset gets 
truncated back to 0.
+            // 2) To remove the unreferenced segments.
+            private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
->
+                        
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < 
earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment ${} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, 
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these 
epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            public boolean 
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata,
+                                                               boolean 
checkTimestampRetention,
+                                                               long cleanupTs,
+                                                               long 
retentionMs)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> checkTimestampRetention && x.maxTimestampMs() <= 
cleanupTs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as 
`metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention time " +
+                            "{}ms breach based on the largest record timestamp 
in the segment", metadata.remoteLogSegmentId(), retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled())
+                return;
+
+            // Cleanup remote log segments and update the log start offset if 
applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = 
fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = 
log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                return;
+            }
+
+            final long retentionMs = log.config().retentionMs;
+
+            // Iterate once
+            //  - to build the log size of segments with base-offset < 
local-log-start-offset
+            //  - to collect all the epochs of remote log segments
+            // These values can be cached and updated in RLMTask for this 
topic partition without computing in each
+            // iteration. But the logic can become little complex and need to 
cover different scenarios to avoid any
+            // leaks. We can have a followup to improve it by maintaining 
these values through both copying and deletion.
+            final Set<Integer> epochsSet = new HashSet<>();
+            long totalSizeEarlierToLocalLogStartOffset = 0L;
+            while (segmentMetadataIter.hasNext()) {

Review Comment:
   We don't need to calculate this for time-based retention, right? If so, can we refactor the code here so that we perform the size calculation (since it requires a full scan over all log segments) only when it's required, i.e. for size-based retention?
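   
   For example, roughly (this keeps the epoch collection as-is, since it may be needed for the leader-epoch based deletion, and assumes `checkSizeRetention` is computed before the loop):
   
   ```java
   // Sketch: only accumulate remote segment sizes when size-based retention is configured,
   // since the sum is not needed for time-based retention.
   while (segmentMetadataIter.hasNext()) {
       RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next();
       epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
   
       if (checkSizeRetention && segmentMetadata.endOffset() < log.localLogStartOffset()) {
           totalSizeEarlierToLocalLogStartOffset += segmentMetadata.segmentSizeInBytes();
       }
   }
   ```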



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -595,6 +609,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: 
{}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean 
checkSizeRetention) throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+                    // Assumption that segments contain size > 0
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment 
${metadata.remoteLogSegmentId()} due to retention size " +
+                            "${log.config.retentionSize} breach. Log size 
after deletion will be " +
+                            "${remainingBreachedSize + 
log.config.retentionSize}.");
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // There are two cases:
+            // 1) When there are offline partitions and a new replica with 
empty disk is brought as leader, then the
+            //    leader-epoch gets bumped but the log-start-offset gets 
truncated back to 0.
+            // 2) To remove the unreferenced segments.
+            private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
->
+                        
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < 
earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment ${} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, 
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these 
epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            public boolean 
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata,
+                                                               boolean 
checkTimestampRetention,
+                                                               long cleanupTs,
+                                                               long 
retentionMs)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> checkTimestampRetention && x.maxTimestampMs() <= 
cleanupTs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as 
`metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention time " +
+                            "{}ms breach based on the largest record timestamp 
in the segment", metadata.remoteLogSegmentId(), retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled())
+                return;
+
+            // Cleanup remote log segments and update the log start offset if 
applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = 
fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = 
log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                return;
+            }
+
+            final long retentionMs = log.config().retentionMs;
+
+            // Iterate once
+            //  - to build the log size of segments with base-offset < 
local-log-start-offset
+            //  - to collect all the epochs of remote log segments
+            // These values can be cached and updated in RLMTask for this 
topic partition without computing in each
+            // iteration. But the logic can become little complex and need to 
cover different scenarios to avoid any
+            // leaks. We can have a followup to improve it by maintaining 
these values through both copying and deletion.
+            final Set<Integer> epochsSet = new HashSet<>();
+            long totalSizeEarlierToLocalLogStartOffset = 0L;
+            while (segmentMetadataIter.hasNext()) {
+                RemoteLogSegmentMetadata segmentMetadata = 
segmentMetadataIter.next();
+                
epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
+
+                if (segmentMetadata.endOffset() < log.localLogStartOffset()) {
+                    totalSizeEarlierToLocalLogStartOffset += 
segmentMetadata.segmentSizeInBytes();
+                }
+            }
+
+            // This is the total size of segments in local log that have their 
base-offset > local-log-start-offset
+            // and size of the segments in remote storage which have their 
end-offset < local-log-start-offset.
+            long totalSize = log.validLocalLogSegmentsSize() + 
totalSizeEarlierToLocalLogStartOffset;
+
+            // All the leader epochs in sorted order that exists in remote 
storage
+            List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet);
+            Collections.sort(remoteLeaderEpochs);
+
+            long remainingBreachedSize = totalSize - 
log.config().retentionSize;
+
+            RemoteLogRetentionHandler remoteLogRetentionHandler = new 
RemoteLogRetentionHandler(remainingBreachedSize);
+
+            boolean checkTimestampRetention = retentionMs > -1;
+            long cleanupTs = time.milliseconds() - retentionMs;
+            boolean checkSizeRetention = log.config().retentionSize > -1;
+            LeaderEpochFileCache leaderEpochCache = 
leaderEpochCacheOption.get();
+            Iterator<EpochEntry> epochEntryIterator = 
leaderEpochCache.epochEntries().iterator();
+            boolean isSegmentDeleted = true;
+            while (isSegmentDeleted && epochEntryIterator.hasNext()) {
+                EpochEntry epochEntry = epochEntryIterator.next();
+                Iterator<RemoteLogSegmentMetadata> segmentsIterator = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, 
epochEntry.epoch);

Review Comment:
   We are only interested in segments in state COPY_SEGMENT_FINISHED or DELETE_SEGMENT_STARTED here, right? (DELETE_SEGMENT_STARTED to clean up any stragglers.)
   
   Can we make this more explicit by filtering on them?
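   
   For instance, a rough sketch (assuming the segment state is exposed via `RemoteLogSegmentMetadata#state()`):
   
   ```java
   // Sketch: make the eligible states explicit instead of deleting whatever the listing returns.
   while (isSegmentDeleted && segmentsIterator.hasNext()) {
       RemoteLogSegmentMetadata metadata = segmentsIterator.next();
       RemoteLogSegmentState state = metadata.state();
       if (state != RemoteLogSegmentState.COPY_SEGMENT_FINISHED
               && state != RemoteLogSegmentState.DELETE_SEGMENT_STARTED) {
           continue; // skip segments that are still being copied or already fully deleted
       }
       // ... existing retention checks ...
   }
   ```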



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -595,6 +609,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: 
{}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);

Review Comment:
   Please guard this update with `isLeader()`.
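   
   i.e. something along the lines of the sketch below (which also passes the topic partition as a logger argument, since the `$topicPartition` placeholder in the current message is not interpolated in Java):
   
   ```java
   public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
       if (isLeader()) {
           logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
           updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
       }
   }
   ```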



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -595,6 +609,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: 
{}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean 
checkSizeRetention) throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+                    // Assumption that segments contain size > 0
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment 
${metadata.remoteLogSegmentId()} due to retention size " +
+                            "${log.config.retentionSize} breach. Log size 
after deletion will be " +
+                            "${remainingBreachedSize + 
log.config.retentionSize}.");
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // There are two cases:
+            // 1) When there are offline partitions and a new replica with 
empty disk is brought as leader, then the
+            //    leader-epoch gets bumped but the log-start-offset gets 
truncated back to 0.
+            // 2) To remove the unreferenced segments.
+            private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
->
+                        
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < 
earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment ${} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, 
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these 
epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            public boolean 
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata,
+                                                               boolean 
checkTimestampRetention,
+                                                               long cleanupTs,
+                                                               long 
retentionMs)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> checkTimestampRetention && x.maxTimestampMs() <= 
cleanupTs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as 
`metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention time " +
+                            "{}ms breach based on the largest record timestamp 
in the segment", metadata.remoteLogSegmentId(), retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled())
+                return;
+
+            // Cleanup remote log segments and update the log start offset if 
applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = 
fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = 
log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                return;
+            }
+
+            final long retentionMs = log.config().retentionMs;
+
+            // Iterate once
+            //  - to build the log size of segments with base-offset < 
local-log-start-offset
+            //  - to collect all the epochs of remote log segments
+            // These values can be cached and updated in RLMTask for this 
topic partition without computing in each
+            // iteration. But the logic can become little complex and need to 
cover different scenarios to avoid any
+            // leaks. We can have a followup to improve it by maintaining 
these values through both copying and deletion.
+            final Set<Integer> epochsSet = new HashSet<>();
+            long totalSizeEarlierToLocalLogStartOffset = 0L;
+            while (segmentMetadataIter.hasNext()) {
+                RemoteLogSegmentMetadata segmentMetadata = 
segmentMetadataIter.next();
+                
epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
+
+                if (segmentMetadata.endOffset() < log.localLogStartOffset()) {
+                    totalSizeEarlierToLocalLogStartOffset += 
segmentMetadata.segmentSizeInBytes();
+                }
+            }
+
+            // This is the total size of segments in local log that have their 
base-offset > local-log-start-offset
+            // and size of the segments in remote storage which have their 
end-offset < local-log-start-offset.
+            long totalSize = log.validLocalLogSegmentsSize() + 
totalSizeEarlierToLocalLogStartOffset;
+
+            // All the leader epochs in sorted order that exists in remote 
storage
+            List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet);
+            Collections.sort(remoteLeaderEpochs);
+
+            long remainingBreachedSize = totalSize - 
log.config().retentionSize;
+
+            RemoteLogRetentionHandler remoteLogRetentionHandler = new 
RemoteLogRetentionHandler(remainingBreachedSize);
+
+            boolean checkTimestampRetention = retentionMs > -1;
+            long cleanupTs = time.milliseconds() - retentionMs;
+            boolean checkSizeRetention = log.config().retentionSize > -1;
+            LeaderEpochFileCache leaderEpochCache = 
leaderEpochCacheOption.get();
+            Iterator<EpochEntry> epochEntryIterator = 
leaderEpochCache.epochEntries().iterator();
+            boolean isSegmentDeleted = true;
+            while (isSegmentDeleted && epochEntryIterator.hasNext()) {
+                EpochEntry epochEntry = epochEntryIterator.next();
+                Iterator<RemoteLogSegmentMetadata> segmentsIterator = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, 
epochEntry.epoch);
+                while (isSegmentDeleted && segmentsIterator.hasNext()) {
+                    RemoteLogSegmentMetadata metadata = 
segmentsIterator.next();
+                    isSegmentDeleted =

Review Comment:
   Could we check `if (isCancelled() || !isLeader())` here again, please, to short-circuit this expensive loop during shutdown?
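   
   Roughly, as a sketch:
   
   ```java
   while (isSegmentDeleted && segmentsIterator.hasNext()) {
       if (isCancelled() || !isLeader()) {
           logger.info("Skipping the remaining remote log segment cleanup for {} as the task is cancelled or this broker is no longer the leader", topicIdPartition);
           return;
       }
       RemoteLogSegmentMetadata metadata = segmentsIterator.next();
       // ... existing retention checks ...
   }
   ```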



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -595,6 +609,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: 
{}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean 
checkSizeRetention) throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+                    // Assumption that segments contain size > 0
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment 
${metadata.remoteLogSegmentId()} due to retention size " +
+                            "${log.config.retentionSize} breach. Log size 
after deletion will be " +
+                            "${remainingBreachedSize + 
log.config.retentionSize}.");
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // There are two cases:
+            // 1) When there are offline partitions and a new replica with 
empty disk is brought as leader, then the
+            //    leader-epoch gets bumped but the log-start-offset gets 
truncated back to 0.
+            // 2) To remove the unreferenced segments.
+            private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
->
+                        
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < 
earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment ${} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, 
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these 
epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            public boolean 
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata,
+                                                               boolean 
checkTimestampRetention,
+                                                               long cleanupTs,
+                                                               long 
retentionMs)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> checkTimestampRetention && x.maxTimestampMs() <= 
cleanupTs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as 
`metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention time " +
+                            "{}ms breach based on the largest record timestamp 
in the segment", metadata.remoteLogSegmentId(), retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled())
+                return;
+
+            // Cleanup remote log segments and update the log start offset if 
applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = 
fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = 
log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                return;
+            }
+
+            final long retentionMs = log.config().retentionMs;
+
+            // Iterate once
+            //  - to build the log size of segments with base-offset < 
local-log-start-offset
+            //  - to collect all the epochs of remote log segments
+            // These values can be cached and updated in RLMTask for this 
topic partition without computing in each

Review Comment:
   I would appreciate your response on https://lists.apache.org/thread/pc6x3xr4b8y84g43dcbdzh170kb5oz1v, where we discussed that even with a caching mechanism, warming up the cache is going to slow down copying. I agree that this can be discussed outside the scope of this PR, but I am adding the above thread as an FYI.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1378,20 +1416,30 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     }
   }
 
+  private[log] def localRetentionMs: Long = {

Review Comment:
   I would suggest addressing local retention in a separate PR. We can limit this PR to handling remote log retention only.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -154,16 +154,41 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   locally {
     initializePartitionMetadata()
     updateLogStartOffset(logStartOffset)
+    updateLocalLogStartOffset(math.max(logStartOffset, 
localLog.segments.firstSegmentBaseOffset.getOrElse(0L)))
+    if (!remoteLogEnabled())
+      logStartOffset = localLogStartOffset()
     maybeIncrementFirstUnstableOffset()
     initializeTopicId()
 
     
logOffsetsListener.onHighWatermarkUpdated(highWatermarkMetadata.messageOffset)
+
+    info(s"Completed load of log with ${localLog.segments.numberOfSegments} 
segments, local log start offset ${localLogStartOffset()} and " +
+      s"log end offset $logEndOffset")
   }
 
   def setLogOffsetsListener(listener: LogOffsetsListener): Unit = {
     logOffsetsListener = listener
   }
 
+  private def updateLocalLogStartOffset(offset: Long): Unit = {
+    _localLogStartOffset = offset
+
+    if (highWatermark < offset) {
+      updateHighWatermark(offset)
+    }
+
+    if (this.recoveryPoint < offset) {
+      localLog.updateRecoveryPoint(offset)
+    }
+  }
+
+  def updateLogStartOffsetFromRemoteTier(remoteLogStartOffset: Long): Unit = {
+    if (!remoteLogEnabled()) {
+      warn("Ignoring the call as the remote log storage is disabled")

Review Comment:
   Do we need this at a warn level?



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -595,6 +609,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: 
{}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean 
checkSizeRetention) throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+                    // Assumption that segments contain size > 0
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment 
${metadata.remoteLogSegmentId()} due to retention size " +
+                            "${log.config.retentionSize} breach. Log size 
after deletion will be " +
+                            "${remainingBreachedSize + 
log.config.retentionSize}.");
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // There are two cases:
+            // 1) When there are offline partitions and a new replica with 
empty disk is brought as leader, then the
+            //    leader-epoch gets bumped but the log-start-offset gets 
truncated back to 0.
+            // 2) To remove the unreferenced segments.

Review Comment:
   Unreferenced segments are not only the ones that have a lower epoch than the earliest known epoch; they could also be ones whose epoch is not part of the active epoch chain. How are we handling that?
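   
   As a made-up example: after an unclean leader election the active epoch chain could be {0, 2}, while a remote segment was written entirely under epoch 1. That segment's epoch is not lower than the earliest known epoch (0), so the `allMatch(epoch < earliestEpochEntry.epoch)` predicate above would never select it, yet it is not referenced by the active chain either.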



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -595,6 +609,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: 
{}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean 
checkSizeRetention) throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+                    // Assumption that segments contain size > 0
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment 
${metadata.remoteLogSegmentId()} due to retention size " +
+                            "${log.config.retentionSize} breach. Log size 
after deletion will be " +
+                            "${remainingBreachedSize + 
log.config.retentionSize}.");
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // There are two cases:
+            // 1) When there are offline partitions and a new replica with 
empty disk is brought as leader, then the
+            //    leader-epoch gets bumped but the log-start-offset gets 
truncated back to 0.
+            // 2) To remove the unreferenced segments.
+            private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
->
+                        
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < 
earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment ${} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, 
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these 
epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            public boolean 
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata,
+                                                               boolean 
checkTimestampRetention,
+                                                               long cleanupTs,
+                                                               long 
retentionMs)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> checkTimestampRetention && x.maxTimestampMs() <= 
cleanupTs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as 
`metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention time " +
+                            "{}ms breach based on the largest record timestamp 
in the segment", metadata.remoteLogSegmentId(), retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled())

Review Comment:
   `isCancelled() || !isLeader()`
   
   Also, please add a log line that helps an operator understand that the cleanup was actually cancelled.
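   
   For example, a sketch:
   
   ```java
   private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
       if (isCancelled() || !isLeader()) {
           logger.info("Returning from remote log segment cleanup for {} as the task is cancelled ({}) or this broker is no longer the leader ({})",
                   topicIdPartition, isCancelled(), !isLeader());
           return;
       }
       // ... rest of the cleanup ...
   }
   ```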



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -595,6 +609,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: 
{}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean 
checkSizeRetention) throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+                    // Assumption that segments contain size > 0
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment 
${metadata.remoteLogSegmentId()} due to retention size " +
+                            "${log.config.retentionSize} breach. Log size 
after deletion will be " +
+                            "${remainingBreachedSize + 
log.config.retentionSize}.");
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // There are two cases:
+            // 1) When there are offline partitions and a new replica with 
empty disk is brought as leader, then the
+            //    leader-epoch gets bumped but the log-start-offset gets 
truncated back to 0.
+            // 2) To remove the unreferenced segments.
+            private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
->
+                        
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < 
earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment ${} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, 
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these 
epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            public boolean 
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata,
+                                                               boolean 
checkTimestampRetention,
+                                                               long cleanupTs,
+                                                               long 
retentionMs)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> checkTimestampRetention && x.maxTimestampMs() <= 
cleanupTs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as 
`metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention time " +
+                            "{}ms breach based on the largest record timestamp 
in the segment", metadata.remoteLogSegmentId(), retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled())
+                return;
+
+            // Cleanup remote log segments and update the log start offset if 
applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = 
fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = 
log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                return;
+            }
+
+            final long retentionMs = log.config().retentionMs;
+
+            // Iterate once
+            //  - to build the log size of segments with base-offset < 
local-log-start-offset
+            //  - to collect all the epochs of remote log segments
+            // These values can be cached and updated in RLMTask for this 
topic partition without computing in each
+            // iteration. But the logic can become little complex and need to 
cover different scenarios to avoid any
+            // leaks. We can have a followup to improve it by maintaining 
these values through both copying and deletion.
+            final Set<Integer> epochsSet = new HashSet<>();
+            long totalSizeEarlierToLocalLogStartOffset = 0L;
+            while (segmentMetadataIter.hasNext()) {
+                RemoteLogSegmentMetadata segmentMetadata = 
segmentMetadataIter.next();
+                
epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
+
+                if (segmentMetadata.endOffset() < log.localLogStartOffset()) {

Review Comment:
   The current logic may cause deletion of more data than anticipated.
   
   This is because it is possible to have remote segments satisfying this condition which are not part of the current leadership epoch chain. The calculation of totalSizeEarlierToLocalLogStartOffset may include these segments as well, and hence the calculation of totalSize will include the local segments plus all remote segments (< local log start offset).
   The calculated totalSize will then be larger than the actual total size (where the actual total size = size of the remote + local log for the active epoch chain). This leads to a higher value of remainingBreachedSize than the actual one, and hence more data gets deleted than necessary.
   
   Does this make sense? If not, I can provide an example to explain it better.
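   
   To illustrate with made-up numbers: say retention.bytes is 100 GB and the active epoch chain holds 90 GB across local and remote storage, but there are another 30 GB of remote segments below the local-log-start-offset that belong to an abandoned epoch chain. totalSize would then be computed as 120 GB and remainingBreachedSize as 20 GB, so the retention loop would delete 20 GB of active-chain data even though the active chain is within the configured retention.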
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
