kamalcph commented on code in PR #14330:
URL: https://github.com/apache/kafka/pull/14330#discussion_r1316896078
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -954,24 +953,29 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex
             RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
             Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator();
-            boolean isSegmentDeleted = true;
-            while (isSegmentDeleted && epochIterator.hasNext()) {
+            boolean canProcess = true;
+            while (canProcess && epochIterator.hasNext()) {
                 Integer epoch = epochIterator.next();
                 Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
-                while (isSegmentDeleted && segmentsIterator.hasNext()) {
+                while (canProcess && segmentsIterator.hasNext()) {
                     if (isCancelled() || !isLeader()) {
                         logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed.");
                         return;
                     }
                     RemoteLogSegmentMetadata metadata = segmentsIterator.next();
                     // check whether the segment contains the required epoch range with in the current leader epoch lineage.
-                    if (isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets)) {
+                    boolean isValidSegment = isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets);
+                    boolean isSegmentDeleted = false;
+                    if (isValidSegment) {
                         isSegmentDeleted =
                                 remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
-                                        remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) ||
-                                        remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, logStartOffset);
+                                        remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata);
+                    }
+                    if (!isSegmentDeleted) {
+                        isSegmentDeleted = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, logStartOffset);
Review Comment:
When the user moves the log-start-offset, the leader-epoch-checkpoint file is
truncated to the new log-start-offset. The remote log segments below that
offset are not removed until the rlm-cleaner-thread runs its next iteration.
`isRemoteSegmentWithinLeaderEpochs` verifies whether the epochs present in the
segment lie within the checkpoint file. For those segments it will always
return false, since the checkpoint file was already truncated.
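
To make the scenario concrete, here is a minimal, self-contained sketch of why the log-start-offset check has to run even when the lineage check fails. It is not the `RemoteLogManager` code: `Segment`, `isWithinLeaderEpochs`, `breachesLogStartOffset`, `deletedByOldFlow` and `deletedByNewFlow` are hypothetical names, the lineage check is heavily simplified, and the retention time/size checks are left out.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

class LogStartOffsetCleanupSketch {

    // Hypothetical, simplified stand-in for RemoteLogSegmentMetadata.
    static final class Segment {
        final int epoch;
        final long startOffset;
        final long endOffset;

        Segment(int epoch, long startOffset, long endOffset) {
            this.epoch = epoch;
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }
    }

    // Simplified lineage check (assumption): the segment's epoch must still be
    // present in the leader-epoch cache and the segment must not start before
    // that epoch's start offset.
    static boolean isWithinLeaderEpochs(Segment s, NavigableMap<Integer, Long> epochWithOffsets) {
        Long epochStart = epochWithOffsets.get(s.epoch);
        return epochStart != null && s.startOffset >= epochStart;
    }

    // Assumed breach condition: the whole segment lies below the log-start-offset.
    static boolean breachesLogStartOffset(Segment s, long logStartOffset) {
        return s.endOffset < logStartOffset;
    }

    // Old flow: the log-start-offset check is gated behind the lineage check.
    static boolean deletedByOldFlow(Segment s, NavigableMap<Integer, Long> epochs, long logStartOffset) {
        return isWithinLeaderEpochs(s, epochs) && breachesLogStartOffset(s, logStartOffset);
    }

    // New flow: the log-start-offset check runs whenever nothing else deleted the segment.
    static boolean deletedByNewFlow(Segment s, NavigableMap<Integer, Long> epochs, long logStartOffset) {
        boolean deleted = false;
        if (isWithinLeaderEpochs(s, epochs)) {
            // Retention time/size checks would go here; omitted in this sketch.
            deleted = false;
        }
        if (!deleted) {
            deleted = breachesLogStartOffset(s, logStartOffset);
        }
        return deleted;
    }

    public static void main(String[] args) {
        // Checkpoint after the user moved the log-start-offset to 150:
        // epoch 0 is gone and epoch 1 now starts at 150.
        NavigableMap<Integer, Long> truncated = new TreeMap<>();
        truncated.put(1, 150L);
        truncated.put(2, 200L);

        Segment stale = new Segment(0, 0L, 99L);
        System.out.println("old flow deletes stale segment: " + deletedByOldFlow(stale, truncated, 150L));
        System.out.println("new flow deletes stale segment: " + deletedByNewFlow(stale, truncated, 150L));
    }
}
```

In this model the segment from epoch 0 fails the lineage check because its epoch was truncated out of the checkpoint, so the old flow never reaches the log-start-offset check and the segment is left behind, while the new flow deletes it.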