kamalcph commented on code in PR #14004: URL: https://github.com/apache/kafka/pull/14004#discussion_r1266989386
########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java: ##########
@@ -100,17 +100,29 @@ void handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen
     void handleSegmentWithCopySegmentFinishedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId,
                                                    Long leaderEpochEndOffset) {
+        // If there are duplicate segments uploaded due to leader-election, then mark them as unreferenced.
+        // Duplicate segments can be uploaded when the previous leader had tier-lags and the next leader uploads the
+        // segment for the same leader-epoch which is a super-set of previously uploaded segments.
+        // (eg)
+        // case-1: Duplicate segment
+        //     L0 uploaded segment S0 with offsets 0-100 and L1 uploaded segment S1 with offsets 0-200.
+        //     We will mark the segment S0 as duplicate and add it to unreferencedSegmentIds.
+        // case-2: Overlapping segments
+        //     L0 uploaded segment S0 with offsets 10-90 and L1 uploaded segment S1 with offsets 5-100, S2-101-200,
+        //     and so on. When the consumer request for segment with offset 95, it should get the segment S1 and not S0.
+        Map.Entry<Long, RemoteLogSegmentId> lastEntry = offsetToId.lastEntry();
+        while (lastEntry != null && lastEntry.getKey() >= startOffset && highestLogOffset <= leaderEpochEndOffset) {

Review Comment:

1. `highestLogOffset` is the highest end offset across all the segments uploaded so far, so updating `highestLogOffset` while removing the last entry inside the while loop is not required. The last entry is eligible for removal only when the `highestLogOffset` seen so far is less than or equal to the new segment's end offset (see the sketch at the end of this comment).

2. The case you mentioned is about unclean leader election. A passive segment is eligible for upload to remote storage only when the last-stable-offset is ahead of the passive segment's end offset. When B2 was restored, B1 was the last standing replica and had died with a disk failure, so B2 can only be elected via unclean leader election. When B2 is elected as the unclean leader, it will have two segments, [1-11] and [12-101], where the second segment [12-101] contains messages written by more than one leader epoch (LE0: 12-80 and LE1: 81-101). B2 cannot upload segment S2 (12-101) because the highest offset uploaded so far to remote storage for LE0 is 112; B2 can only upload segments that contain offset 113 onwards. When B1 is brought back as a replica, it truncates itself up to the largest common log prefix offset (target-offset), including the leader-epoch-checkpoint file, so it cannot serve the uploaded segments from remote storage. If the target-offset is less than the local-log-start-offset, the uploaded remote log segments will eventually be cleared once the smallest (leader_epoch, start_offset) entry in the leader-epoch-checkpoint file is greater than the unreferenced segment's epoch. `RemoteLogLeaderEpochState` is maintained per leader epoch; we cannot extend it to multiple leader epochs. In remote storage, the records for offsets [81-112] will be maintained for both LE0 and LE1. With [KIP-320](https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation), fetchers can detect and handle log truncation themselves.

Thanks for the detailed review! Let me know if I'm missing something.
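To make point 1 concrete, here is a minimal, hypothetical sketch of the loop and the surrounding state, assuming the field names from the diff (`offsetToId`, `unreferencedSegmentIds`, `highestLogOffset`) and using a plain `String` as a stand-in for `RemoteLogSegmentId`; the loop body and the final `highestLogOffset` update are illustrative guesses, not the code under review.

```java
// Hypothetical, simplified sketch of the unreferencing loop discussed in point 1.
// Field names mirror the diff above; everything else is an assumption for illustration.
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

class LeaderEpochStateSketch {

    // Start offset of each referenced uploaded segment -> segment id (stand-in for RemoteLogSegmentId).
    private final NavigableMap<Long, String> offsetToId = new TreeMap<>();
    // Segments superseded by a later upload; these are no longer returned to fetchers.
    private final Set<String> unreferencedSegmentIds = new HashSet<>();
    // Highest end offset across all segments uploaded so far; it only moves forward.
    private Long highestLogOffset;

    void handleSegmentWithCopySegmentFinishedState(Long startOffset,
                                                   String segmentId,
                                                   Long leaderEpochEndOffset) {
        // Trailing entries are removed only while the new segment still covers them, i.e. while the
        // highest end offset seen so far does not exceed the new segment's end offset. Because the new
        // segment's end offset is then at least highestLogOffset, removing these entries never requires
        // recomputing highestLogOffset inside the loop (point 1 above).
        Map.Entry<Long, String> lastEntry = offsetToId.lastEntry();
        while (lastEntry != null && lastEntry.getKey() >= startOffset
                && highestLogOffset != null && highestLogOffset <= leaderEpochEndOffset) {
            offsetToId.remove(lastEntry.getKey());
            unreferencedSegmentIds.add(lastEntry.getValue());
            lastEntry = offsetToId.lastEntry();
        }

        // Register the new segment and advance the highest uploaded end offset if it moved forward.
        offsetToId.put(startOffset, segmentId);
        unreferencedSegmentIds.remove(segmentId);
        if (highestLogOffset == null || leaderEpochEndOffset > highestLogOffset) {
            highestLogOffset = leaderEpochEndOffset;
        }
    }
}
```

Running case-1 from the diff's comment through this sketch (S0 = 0-100 uploaded first, then S1 = 0-200), the loop removes the entry for S0 and adds it to `unreferencedSegmentIds` without touching `highestLogOffset`, which is advanced only once after the loop.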