kamalcph commented on code in PR #14004:
URL: https://github.com/apache/kafka/pull/14004#discussion_r1266989386


##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java:
##########
@@ -100,17 +100,29 @@ void handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen
 
     void handleSegmentWithCopySegmentFinishedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId,
                                                    Long leaderEpochEndOffset) {
+        // If there are duplicate segments uploaded due to leader-election, then mark them as unreferenced.
+        // Duplicate segments can be uploaded when the previous leader had tier-lags and the next leader uploads the
+        // segment for the same leader-epoch which is a super-set of previously uploaded segments.
+        // (eg)
+        // case-1: Duplicate segment
+        //      L0 uploaded segment S0 with offsets 0-100 and L1 uploaded segment S1 with offsets 0-200.
+        //      We will mark the segment S0 as duplicate and add it to unreferencedSegmentIds.
+        // case-2: Overlapping segments
+        //      L0 uploaded segment S0 with offsets 10-90 and L1 uploaded segment S1 with offsets 5-100, S2-101-200,
+        //      and so on. When the consumer request for segment with offset 95, it should get the segment S1 and not S0.
+        Map.Entry<Long, RemoteLogSegmentId> lastEntry = offsetToId.lastEntry();
+        while (lastEntry != null && lastEntry.getKey() >= startOffset && highestLogOffset <= leaderEpochEndOffset) {

Review Comment:
   1. `highestLogOffset` is the highest end offset of all the segments uploaded so 
far, so updating the `highestLogOffset` value while removing the last entry 
inside the while loop is not required. The last entry is eligible for removal 
only when the `highestLogOffset` seen so far is less than the current segment 
end offset (see the sketch just after this list).
   2. The case that you mentioned is about unclean leader election. A passive 
segment is eligible for upload to remote storage only when the 
last-stable-offset is ahead of the passive segment end offset. When B2 was 
restored, B1 (the last standing replica) had already died with a disk failure, 
so B2 can only be elected via unclean leader election.
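   
   To make point 1 concrete, here is a minimal, illustrative sketch of how I read 
the loop (not the PR code: plain Strings stand in for `RemoteLogSegmentId`, and 
the per-epoch state is collapsed into a single class):
   
```java
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

// Illustrative sketch only: plain Strings stand in for RemoteLogSegmentId and
// the epoch state is collapsed into a single class.
public class LeaderEpochStateSketch {
    // segment start offset -> segment id, ordered by start offset
    private final NavigableMap<Long, String> offsetToId = new TreeMap<>();
    private final Set<String> unreferencedSegmentIds = new HashSet<>();
    // highest end offset across all segments uploaded so far for this epoch
    private long highestLogOffset = -1L;

    void handleCopySegmentFinished(long startOffset, String segmentId, long endOffset) {
        // Walk back from the last entry: any earlier segment whose start offset is
        // covered by the incoming segment becomes unreferenced. highestLogOffset is
        // compared against the incoming segment's end offset only; removing entries
        // does not change it, so it is not updated inside the loop.
        Map.Entry<Long, String> lastEntry = offsetToId.lastEntry();
        while (lastEntry != null && lastEntry.getKey() >= startOffset
                && highestLogOffset <= endOffset) {
            unreferencedSegmentIds.add(lastEntry.getValue());
            offsetToId.remove(lastEntry.getKey());
            lastEntry = offsetToId.lastEntry();
        }
        offsetToId.put(startOffset, segmentId);
        highestLogOffset = Math.max(highestLogOffset, endOffset);
    }

    public static void main(String[] args) {
        // case-1 from the code comment: S0 (0-100) then S1 (0-200) -> S0 is unreferenced.
        LeaderEpochStateSketch state = new LeaderEpochStateSketch();
        state.handleCopySegmentFinished(0, "S0", 100);
        state.handleCopySegmentFinished(0, "S1", 200);
        System.out.println(state.unreferencedSegmentIds); // [S0]
        System.out.println(state.offsetToId);             // {0=S1}
    }
}
```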
   
   When B2 is elected as the unclean leader, it will have 2 segments, [1-11] and 
[12-101], where the second segment [12-101] contains messages written by more 
than one leader epoch (LE0: 12-80 and LE1: 81-101). B2 cannot upload the 
segment S2 (12-101) because the highest offset uploaded so far to remote 
storage for LE0 is 112. B2 can only upload from segments which contain 
offset 113.
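   
   A rough sketch of that eligibility check under the assumptions above (a 
hypothetical helper, not the actual RemoteLogManager code; the 
last-stable-offset value is made up for the example):
   
```java
// Hypothetical helper illustrating the two conditions described above; the real
// upload path in RemoteLogManager is more involved.
final class UploadEligibilitySketch {
    // A rolled-over (passive) segment is a candidate only when it is fully below
    // the last-stable-offset and it goes beyond what is already in remote storage.
    static boolean isUploadCandidate(long segmentEndOffset, long lastStableOffset,
                                     long highestUploadedRemoteOffset) {
        return segmentEndOffset < lastStableOffset
                && segmentEndOffset > highestUploadedRemoteOffset;
    }

    public static void main(String[] args) {
        // B2's segment [12-101] with offset 112 already uploaded for LE0: not a candidate.
        System.out.println(isUploadCandidate(101, 200, 112)); // false
        // A later segment that contains offset 113 (say, ending at 150) would qualify.
        System.out.println(isUploadCandidate(150, 200, 112)); // true
    }
}
```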
   
   When B1 was brought back as a replica, it truncates itself up to the largest 
common log prefix offset (target-offset), including the 
leader-epoch-checkpoint file, so it cannot serve the uploaded segments from 
remote storage. If the target-offset is less than the local-log-start-offset, 
then the uploaded remote log segments will eventually be cleared, once the 
smallest (leader_epoch, start_offset) in the leader-epoch-checkpoint file is 
greater than the unreferenced segment epoch.
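   
   A sketch of that cleanup rule as I understand it (illustrative only; epoch and 
offset types are simplified, and the start offset for LE1 is taken from the 
example above):
   
```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative only: my reading of when an unreferenced remote segment becomes
// eligible for deletion, once the leader-epoch-checkpoint no longer covers its epoch.
final class UnreferencedCleanupSketch {
    // checkpoint: leader epoch -> start offset of that epoch, smallest epoch first
    static boolean isEligibleForDeletion(NavigableMap<Integer, Long> leaderEpochCheckpoint,
                                         int unreferencedSegmentEpoch) {
        Map.Entry<Integer, Long> smallest = leaderEpochCheckpoint.firstEntry();
        // Keep the segment while the checkpoint is empty or still starts at or below its epoch.
        return smallest != null && smallest.getKey() > unreferencedSegmentEpoch;
    }

    public static void main(String[] args) {
        NavigableMap<Integer, Long> checkpoint = new TreeMap<>();
        checkpoint.put(1, 81L); // smallest remaining entry is (LE1, 81)
        System.out.println(isEligibleForDeletion(checkpoint, 0)); // true: LE0 segment can go
        System.out.println(isEligibleForDeletion(checkpoint, 1)); // false: still referenced
    }
}
```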
   
   `RemoteLogLeaderEpochState` is maintained for one epoch; we cannot extend it to 
cover multiple leader_epochs.
   
   In remote storage, the records for the offsets in [81-112] will be maintained 
for both LE0 and LE1. With 
[KIP-320](https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation), 
the fetchers can detect and handle log truncation themselves.
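   
   For reference, a minimal consumer-side sketch of that handling (not part of 
this PR; topic name and bootstrap servers are placeholders, and it assumes 
`auto.offset.reset=none` so that detected divergence surfaces as 
`LogTruncationException` rather than a silent reset):
   
```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.LogTruncationException;
import org.apache.kafka.common.serialization.StringDeserializer;

// Illustrative KIP-320 handling on the consumer side.
public class TruncationAwareConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "truncation-demo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                try {
                    consumer.poll(Duration.ofSeconds(1))
                            .forEach(r -> System.out.println(r.offset() + ": " + r.value()));
                } catch (LogTruncationException e) {
                    // The fetch position diverged from the current leader's log; re-seek
                    // to the divergent offsets reported by the broker (may be empty if
                    // they could not be determined) and continue consuming.
                    e.divergentOffsets().forEach((tp, oam) -> consumer.seek(tp, oam.offset()));
                }
            }
        }
    }
}
```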
   
   Thanks for the detailed review! Let me know if I'm missing something.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
