kamalcph commented on code in PR #14330:
URL: https://github.com/apache/kafka/pull/14330#discussion_r1317237804
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -954,24 +953,29 @@ private void cleanupExpiredRemoteLogSegments() throws
RemoteStorageException, Ex
RemoteLogRetentionHandler remoteLogRetentionHandler = new
RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
Iterator<Integer> epochIterator =
epochWithOffsets.navigableKeySet().iterator();
- boolean isSegmentDeleted = true;
- while (isSegmentDeleted && epochIterator.hasNext()) {
+ boolean canProcess = true;
+ while (canProcess && epochIterator.hasNext()) {
Integer epoch = epochIterator.next();
Iterator<RemoteLogSegmentMetadata> segmentsIterator =
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
- while (isSegmentDeleted && segmentsIterator.hasNext()) {
+ while (canProcess && segmentsIterator.hasNext()) {
if (isCancelled() || !isLeader()) {
logger.info("Returning from remote log segments
cleanup for the remaining segments as the task state is changed.");
return;
}
RemoteLogSegmentMetadata metadata =
segmentsIterator.next();
// check whether the segment contains the required epoch
range with in the current leader epoch lineage.
- if (isRemoteSegmentWithinLeaderEpochs(metadata,
logEndOffset, epochWithOffsets)) {
+ boolean isValidSegment =
isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets);
+ boolean isSegmentDeleted = false;
+ if (isValidSegment) {
isSegmentDeleted =
remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
-
remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) ||
-
remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata,
logStartOffset);
+
remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata);
+ }
+ if (!isSegmentDeleted) {
+ isSegmentDeleted =
remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata,
logStartOffset);
Review Comment:
Addressed the review comments.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]