This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 18199028a67 KAFKA-17995: Fix errors in remote segment cleanup when 
retention.ms is large (#17794)
18199028a67 is described below

commit 18199028a672fd973ac37bf26316994babc2a6da
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Nov 14 18:24:45 2024 +0800

    KAFKA-17995: Fix errors in remote segment cleanup when retention.ms is 
large (#17794)
    
    If a user has configured value of `retention.ms` to a value greater than 
current unix timestamp epoch, then we fail cleanup of a remote log segment with 
an error. This change fixes the bug by handling this case of large 
`retention.ms` correctly.
    
    Reviewers: Divij Vaidya <[email protected]>
---
 .../java/kafka/log/remote/RemoteLogManager.java    |  5 ++--
 .../kafka/log/remote/RemoteLogManagerTest.java     | 29 ++++++++++++++++++++++
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 237619bceea..fbe26e8a673 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -1399,8 +1399,9 @@ public class RemoteLogManager implements Closeable {
         }
 
         private Optional<RetentionTimeData> buildRetentionTimeData(long 
retentionMs) {
-            return retentionMs > -1
-                    ? Optional.of(new RetentionTimeData(retentionMs, 
time.milliseconds() - retentionMs))
+            long cleanupUntilMs = time.milliseconds() - retentionMs;
+            return retentionMs > -1 && cleanupUntilMs >= 0
+                    ? Optional.of(new RetentionTimeData(retentionMs, 
cleanupUntilMs))
                     : Optional.empty();
         }
 
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 881da2b32c9..21ae868aecc 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -2765,6 +2765,35 @@ public class RemoteLogManagerTest {
         verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
     }
 
+    @Test
+    public void testDeleteRetentionMsBiggerThanTimeMs() throws 
RemoteStorageException, ExecutionException, InterruptedException {
+        // add 1 month to the current time to avoid flaky test
+        LogConfig mockLogConfig = new LogConfig(Map.of("retention.ms", 
time.milliseconds() + 24 * 30 * 60 * 60 * 1000L));
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        RemoteLogManager.RLMExpirationTask leaderTask = remoteLogManager.new 
RLMExpirationTask(leaderTopicIdPartition);
+
+        
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.logEndOffset()).thenReturn(200L);
+
+        List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+
+        List<RemoteLogSegmentMetadata> metadataList =
+            listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, 
epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+        
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+            .thenReturn(metadataList.iterator());
+        
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+            .thenAnswer(ans -> metadataList.iterator());
+
+        checkpoint.write(epochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, 
scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        assertDoesNotThrow(leaderTask::cleanupExpiredRemoteLogSegments);
+
+        verify(remoteStorageManager, never()).deleteLogSegmentData(any());
+    }
+
     @ParameterizedTest(name = "testFailedDeleteExpiredSegments 
retentionSize={0} retentionMs={1}")
     @CsvSource(value = {"0, -1", "-1, 0"})
     public void testFailedDeleteExpiredSegments(long retentionSize,

Reply via email to