divijvaidya commented on code in PR #14349:
URL: https://github.com/apache/kafka/pull/14349#discussion_r1318506043


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -818,56 +818,51 @@ public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData,
                 remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
             }
 
-            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) {
                 if (!retentionSizeData.isPresent()) {
                     return false;
                 }
 
-                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
-                    // Assumption that segments contain size >= 0
-                    if (remainingBreachedSize > 0) {
-                        long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes();
-                        if (remainingBytes >= 0) {
-                            remainingBreachedSize = remainingBytes;
-                            return true;
-                        }
+                boolean isSegmentDeleted = false;
+                // Assumption that segments contain size >= 0
+                if (remainingBreachedSize > 0) {
+                    long remainingBytes = remainingBreachedSize - metadata.segmentSizeInBytes();
+                    if (remainingBytes >= 0) {
+                        remainingBreachedSize = remainingBytes;
+                        isSegmentDeleted = true;
                     }
+                }
 
-                    return false;
-                });
                 if (isSegmentDeleted) {
                     logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
-                    logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
+                    logger.info("About to delete remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
                             metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize);
                 }
                 return isSegmentDeleted;
             }
 
-            public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
-                    throws RemoteStorageException, ExecutionException, InterruptedException {
+            public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) {
                 if (!retentionTimeData.isPresent()) {
                     return false;
                 }
 
-                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
-                        x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
+                boolean isSegmentDeleted = metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs;

Review Comment:
   Should this boolean be renamed to `shouldDeleteSegment`? With this change the segment is only marked for deletion here; the actual delete happens later.
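
   If it helps, a minimal stand-alone sketch of that bookkeeping with the rename applied (the `RetentionSizeCheck` class and its method names are hypothetical, invented for illustration; only the arithmetic mirrors the diff):

   ```java
   // Hypothetical stand-alone mirror of the size-breach check in the diff.
   // shouldDeleteSegment() returns true when the segment should be scheduled
   // for deletion, deducting its size from the remaining breached bytes.
   final class RetentionSizeCheck {
       private long remainingBreachedSize;

       RetentionSizeCheck(long remainingBreachedSize) {
           this.remainingBreachedSize = remainingBreachedSize;
       }

       boolean shouldDeleteSegment(long segmentSizeInBytes) {
           // Assumption carried over from the diff: segment sizes are >= 0.
           if (remainingBreachedSize > 0) {
               long remainingBytes = remainingBreachedSize - segmentSizeInBytes;
               if (remainingBytes >= 0) {
                   remainingBreachedSize = remainingBytes;
                   return true;
               }
           }
           return false;
       }

       long remainingBreachedBytes() {
           return remainingBreachedSize;
       }
   }
   ```

   Note that a segment larger than the remaining breached bytes is kept, which matches the `remainingBytes >= 0` guard in the diff.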



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1006,6 +1005,16 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex
 
             // Update log start offset with the computed value after retention cleanup is done
             remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));
+
+            List<String> undeletedSegments = new ArrayList<>();
+            for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
+                if (!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled() && isLeader())) {
+                    undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
+                }
+            }
+            if (!undeletedSegments.isEmpty()) {
+                logger.error("The following remote segments could not be deleted: {}", String.join(",", undeletedSegments));

Review Comment:
   This is not an error, since it is recoverable and expected when leadership has transitioned. Perhaps log it at info level instead.
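
   As a sketch, the skip-and-report behaviour of the loop could look like the following (the `SegmentCleanup` helper and its names are hypothetical; the predicate stands in for `!isCancelled() && isLeader()` from the diff):

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Predicate;

   // Hypothetical mirror of the deletion loop: each segment is deleted only
   // while the predicate holds (e.g. "!isCancelled() && isLeader()"); the
   // skipped segments are returned so the caller can log them at info level,
   // since losing leadership mid-loop is expected and recoverable.
   final class SegmentCleanup {
       static List<String> collectUndeleted(List<String> segmentIds, Predicate<String> mayDelete) {
           List<String> undeleted = new ArrayList<>();
           for (String id : segmentIds) {
               if (!mayDelete.test(id)) {
                   undeleted.add(id);
               }
           }
           return undeleted;
       }
   }
   ```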



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1006,6 +1005,16 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex
 
             // Update log start offset with the computed value after retention cleanup is done
             remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));

Review Comment:
   We might have a situation where we don't delete the segments because we are not the leader anymore, but we still update the log start offset. It is OK to do that, but please add a comment about it here. For example:
   1. We update the log start offset.
   2. A replica fetch takes place and completes.
   3. The replica updates its log start offset based on our new log start offset.
   4. That replica becomes the new leader.
   5. We skip deleting the files here because we are not the leader anymore.
   6. The new leader will still delete these files, but it will delete them under the "delete everything below the log start offset" case.
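
   A possible wording for such a comment, anchored at the update (sketch only; the wording condenses the scenario above, and the code line is the one from the diff):

   ```java
   // Even if the deletions below are skipped because we are no longer the
   // leader, it is safe to have updated the log start offset first: a replica
   // that fetched our new log start offset and later became leader will delete
   // these segments itself, under the "delete everything below the log start
   // offset" case.
   remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));
   ```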



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
