clolov commented on code in PR #14128:
URL: https://github.com/apache/kafka/pull/14128#discussion_r1279333354
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -538,35 +551,33 @@ public void copyLogSegmentsToRemote(UnifiedLog log)
throws InterruptedException
if (lso < 0) {
logger.warn("lastStableOffset for partition {} is {},
which should not be negative.", topicIdPartition, lso);
} else if (lso > 0 && copiedOffset < lso) {
- // Copy segments only till the last-stable-offset as
remote storage should contain only committed/acked
- // messages
- long toOffset = lso;
- logger.debug("Checking for segments to copy, copiedOffset:
{} and toOffset: {}", copiedOffset, toOffset);
- long activeSegBaseOffset =
log.activeSegment().baseOffset();
// log-start-offset can be ahead of the read-offset, when:
// 1) log-start-offset gets incremented via delete-records
API (or)
// 2) enabling the remote log for the first time
long fromOffset = Math.max(copiedOffset + 1,
log.logStartOffset());
- ArrayList<LogSegment> sortedSegments = new
ArrayList<>(JavaConverters.asJavaCollection(log.logSegments(fromOffset,
toOffset)));
-
sortedSegments.sort(Comparator.comparingLong(LogSegment::baseOffset));
- List<Long> sortedBaseOffsets =
sortedSegments.stream().map(LogSegment::baseOffset).collect(Collectors.toList());
- int activeSegIndex =
Collections.binarySearch(sortedBaseOffsets, activeSegBaseOffset);
-
- // sortedSegments becomes empty list when fromOffset and
toOffset are same, and activeSegIndex becomes -1
- if (activeSegIndex < 0) {
+ long activeSegmentBaseOffset =
log.activeSegment().baseOffset();
+
+ // Segments which match the following criteria are
eligible for copying to remote storage:
+ // 1) Segment is not the active segment and
+ // 2) Segment end-offset is less than the
last-stable-offset as remote storage should contain only
+ // committed/acked messages
+ List<EnrichedLogSegment> candidateSegments =
enrichedLogSegments(log, fromOffset)
+ .stream()
+ .filter(enrichedSegment ->
enrichedSegment.logSegment.baseOffset() != activeSegmentBaseOffset &&
enrichedSegment.nextSegmentOffset <= lso)
Review Comment:
Nit: Instead of doing this filtering after we have created the enriched
segments list and iterating again can we not push the condition as part of
creating the enriched segments list?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]