kamalcph commented on code in PR #14128:
URL: https://github.com/apache/kafka/pull/14128#discussion_r1279280694


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -538,35 +551,33 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
                 if (lso < 0) {
                     logger.warn("lastStableOffset for partition {} is {}, which should not be negative.", topicIdPartition, lso);
                 } else if (lso > 0 && copiedOffset < lso) {
-                    // Copy segments only till the last-stable-offset as remote storage should contain only committed/acked
-                    // messages
-                    long toOffset = lso;
-                    logger.debug("Checking for segments to copy, copiedOffset: {} and toOffset: {}", copiedOffset, toOffset);
-                    long activeSegBaseOffset = log.activeSegment().baseOffset();
                     // log-start-offset can be ahead of the read-offset, when:
                     // 1) log-start-offset gets incremented via delete-records API (or)
                     // 2) enabling the remote log for the first time
                     long fromOffset = Math.max(copiedOffset + 1, log.logStartOffset());
-                    ArrayList<LogSegment> sortedSegments = new ArrayList<>(JavaConverters.asJavaCollection(log.logSegments(fromOffset, toOffset)));
-                    sortedSegments.sort(Comparator.comparingLong(LogSegment::baseOffset));
-                    List<Long> sortedBaseOffsets = sortedSegments.stream().map(LogSegment::baseOffset).collect(Collectors.toList());
-                    int activeSegIndex = Collections.binarySearch(sortedBaseOffsets, activeSegBaseOffset);
-
-                    // sortedSegments becomes empty list when fromOffset and toOffset are same, and activeSegIndex becomes -1
-                    if (activeSegIndex < 0) {
+                    long activeSegmentBaseOffset = log.activeSegment().baseOffset();
+
+                    // Segments which match the following criteria are eligible for copying to remote storage:
+                    // 1) Segment is not the active segment and
+                    // 2) Segment end-offset is less than the last-stable-offset as remote storage should contain only
+                    //    committed/acked messages
+                    List<EnrichedLogSegment> candidateSegments = enrichedLogSegments(log, fromOffset)
+                            .stream()
+                            .filter(enrichedSegment -> enrichedSegment.logSegment.baseOffset() != activeSegmentBaseOffset && enrichedSegment.nextSegmentOffset <= lso)

Review Comment:
   > If a segment contains 0 records and the log.roll.ms timeout has passed, then what will be the baseOffset of the next active segment?
   
   If the active segment is empty, then it [won't be rotated](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L71).
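   
   For illustration, here is a minimal, self-contained sketch of that behaviour (the names are simplified stand-ins, not the actual `LogSegment.shouldRoll` signature, and the real check covers additional conditions): a time-based roll only applies when the segment already has data, so an empty active segment keeps its base offset even after `log.roll.ms` has elapsed.
   
   ```java
   // Minimal sketch with hypothetical names; the real check lives in LogSegment.shouldRoll.
   final class RollCheckSketch {
   
       // Returns true when the segment would be rolled. A time-based roll additionally
       // requires the segment to be non-empty.
       static boolean shouldRoll(long segmentSizeBytes,
                                 long timeWaitedForRollMs,
                                 long maxSegmentMs,
                                 long maxSegmentBytes,
                                 long incomingBatchBytes) {
           boolean reachedRollMs = timeWaitedForRollMs > maxSegmentMs;
           return segmentSizeBytes > maxSegmentBytes - incomingBatchBytes
                   || (segmentSizeBytes > 0 && reachedRollMs);
       }
   
       public static void main(String[] args) {
           // Empty segment, roll timeout long passed: not rolled.
           System.out.println(shouldRoll(0, 3_600_000, 600_000, 1_073_741_824, 100));      // false
           // Non-empty segment, roll timeout passed: rolled.
           System.out.println(shouldRoll(4_096, 3_600_000, 600_000, 1_073_741_824, 100));  // true
       }
   }
   ```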
 
   
   The in-memory check `enrichedSegment.logSegment.baseOffset() != activeSegmentBaseOffset` is somewhat redundant; we can remove it later if required.
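   
   To sketch the idea (illustrative only; `Candidate` and `candidates` are made-up names, not the PR's `enrichedLogSegments`): if candidates are built by pairing each segment with the base offset of the following segment, the active (last) segment never produces a candidate in the first place, which would make the extra base-offset comparison a no-op.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Illustrative sketch only; Candidate is a hypothetical stand-in for EnrichedLogSegment.
   final class CandidateSegmentsSketch {
   
       record Candidate(long baseOffset, long nextSegmentOffset) {}
   
       // Pairs each segment base offset with the next segment's base offset and keeps only
       // those fully below the last-stable-offset; the last (active) segment has no
       // successor here, so it can never appear in the result.
       static List<Candidate> candidates(List<Long> sortedBaseOffsets, long lastStableOffset) {
           List<Candidate> result = new ArrayList<>();
           for (int i = 0; i + 1 < sortedBaseOffsets.size(); i++) {
               long nextOffset = sortedBaseOffsets.get(i + 1);
               if (nextOffset <= lastStableOffset) {
                   result.add(new Candidate(sortedBaseOffsets.get(i), nextOffset));
               }
           }
           return result;
       }
   
       public static void main(String[] args) {
           // Segments at 0, 100, 200 (200 is active), lso = 150: only [0, 100) is a candidate.
           System.out.println(candidates(List.of(0L, 100L, 200L), 150L));
       }
   }
   ```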
   
   


