This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 91bd1baff01 KAFKA-16890: Compute valid log-start-offset when deleting 
overlapping remote segments (#16237)
91bd1baff01 is described below

commit 91bd1baff014b1d19376cd0217f918358b295093
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Thu Jun 13 05:18:30 2024 +0530

    KAFKA-16890: Compute valid log-start-offset when deleting overlapping 
remote segments (#16237)
    
    RLMM#listRemoteLogSegments returns the metadata list sorted by start-offset. However, the returned list contains the information of every uploaded segment, including duplicate and overlapping remote log segments. Why duplicate/overlapping remote log segments occur is explained [here](https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java#L103).
    
    The list returned by RLMM#listRemoteLogSegments can contain duplicate segment metadata at the end of the list. So, when computing the next log-start-offset, we must take the maximum of (end-offset + 1) over all deleted segments, rather than the value from the last deleted segment.
    
    Reviewers: Satish Duggana <[email protected]>
---
 .../java/kafka/log/remote/RemoteLogManager.java    |  8 ++-
 .../kafka/log/remote/RemoteLogManagerTest.java     | 69 ++++++++++++++++++++++
 2 files changed, 75 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index c1c87d579ef..3eacbea475f 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -983,7 +983,9 @@ public class RemoteLogManager implements Closeable {
                     }
                 }
                 if (shouldDeleteSegment) {
-                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    if (!logStartOffset.isPresent() || 
logStartOffset.getAsLong() < metadata.endOffset() + 1) {
+                        logStartOffset = OptionalLong.of(metadata.endOffset() 
+ 1);
+                    }
                     logger.info("About to delete remote log segment {} due to 
retention size {} breach. Log size after deletion will be {}.",
                             metadata.remoteLogSegmentId(), 
retentionSizeData.get().retentionSize, remainingBreachedSize + 
retentionSizeData.get().retentionSize);
                 }
@@ -1000,7 +1002,9 @@ public class RemoteLogManager implements Closeable {
                     remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
                     // It is fine to have logStartOffset as 
`metadata.endOffset() + 1` as the segment offset intervals
                     // are ascending with in an epoch.
-                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    if (!logStartOffset.isPresent() || 
logStartOffset.getAsLong() < metadata.endOffset() + 1) {
+                        logStartOffset = OptionalLong.of(metadata.endOffset() 
+ 1);
+                    }
                     logger.info("About to delete remote log segment {} due to 
retention time {}ms breach based on the largest record timestamp in the 
segment",
                             metadata.remoteLogSegmentId(), 
retentionTimeData.get().retentionMs);
                 }
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 4c4976f060d..3c9b8a48e90 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -2055,6 +2055,75 @@ public class RemoteLogManagerTest {
         assertEquals(0, 
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
     }
 
+    @ParameterizedTest(name = 
"testDeletionOnOverlappingRetentionBreachedSegments retentionSize={0} 
retentionMs={1}")
+    @CsvSource(value = {"0, -1", "-1, 0"}) // (retention.bytes, retention.ms): one case per retention type (size, then time)
+    public void testDeletionOnOverlappingRetentionBreachedSegments(long 
retentionSize,
+                                                                   long 
retentionMs)
+            throws RemoteStorageException, ExecutionException, 
InterruptedException {
+        Map<String, Long> logProps = new HashMap<>();
+        logProps.put("retention.bytes", retentionSize);
+        logProps.put("retention.ms", retentionMs);
+        LogConfig mockLogConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+        checkpoint.write(epochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, 
scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.logEndOffset()).thenReturn(200L);
+
+        RemoteLogSegmentMetadata metadata1 = 
listRemoteLogSegmentMetadata(leaderTopicIdPartition, 1, 100, 1024,
+                epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED)
+                .get(0);
+        // Overlapping segment: same start offset as metadata1, but a larger end offset (+5), a different broker id and a larger size (+128)
+        RemoteLogSegmentMetadata metadata2 = new RemoteLogSegmentMetadata(new 
RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
+                metadata1.startOffset(), metadata1.endOffset() + 5, 
metadata1.maxTimestampMs(),
+                metadata1.brokerId() + 1, metadata1.eventTimestampMs(), 
metadata1.segmentSizeInBytes() + 128,
+                metadata1.customMetadata(), metadata1.state(), 
metadata1.segmentLeaderEpochs());
+
+        // When there are overlapping/duplicate segments, the 
RemoteLogMetadataManager#listRemoteLogSegments
+        // returns the valid segments first, followed by the unreferenced ones (valid ++ unreferenced):
+        // e.g. broker B0 uploaded segment S0 with offsets 0-100 and broker B1 uploaded 
segment S1 with offsets 0-200.
+        //      S0 is then marked as a duplicate and added to 
unreferencedSegmentIds.
+        //      so the order of segments returned by listRemoteLogSegments will 
be S1, S0.
+        // The next log-start-offset must therefore be the max of every 
deleted segment's end-offset + 1.
+        List<RemoteLogSegmentMetadata> metadataList = new ArrayList<>();
+        metadataList.add(metadata2); // plays the role of S1 (larger, listed first)
+        metadataList.add(metadata1); // plays the role of S0 (duplicate, listed last)
+
+        
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                .thenReturn(metadataList.iterator());
+        
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                .thenAnswer(ans -> metadataList.iterator());
+        
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                .thenReturn(CompletableFuture.runAsync(() -> { }));
+
+        // Verify the metrics for remote deletes and for failures is zero 
before attempt to delete segments
+        assertEquals(0, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteDeleteRequestRate().count());
+        assertEquals(0, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteDeleteRequestRate().count());
+        // Verify aggregate (all-topics) metrics
+        assertEquals(0, 
brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
+        assertEquals(0, 
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(0);
+        task.cleanupExpiredRemoteLogSegments();
+
+        assertEquals(metadata2.endOffset() + 1, currentLogStartOffset.get()); // max(end-offset + 1): metadata2's larger end offset wins although metadata1 is listed last
+        verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0)); // metadata2
+        verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1)); // metadata1
+
+        // Verify the remote delete metric counts both deleted segments
+        assertEquals(2, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteDeleteRequestRate().count());
+        // Verify we did not report any failure for remote deletes
+        assertEquals(0, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteDeleteRequestRate().count());
+        // Verify aggregate (all-topics) metrics
+        assertEquals(2, 
brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
+        assertEquals(0, 
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
+    }
+
     @ParameterizedTest(name = "testRemoteDeleteLagsOnRetentionBreachedSegments 
retentionSize={0} retentionMs={1}")
     @CsvSource(value = {"0, -1", "-1, 0"})
     public void testRemoteDeleteLagsOnRetentionBreachedSegments(long 
retentionSize,

Reply via email to