This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 18199028a67 KAFKA-17995: Fix errors in remote segment cleanup when
retention.ms is large (#17794)
18199028a67 is described below
commit 18199028a672fd973ac37bf26316994babc2a6da
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Nov 14 18:24:45 2024 +0800
KAFKA-17995: Fix errors in remote segment cleanup when retention.ms is
large (#17794)
If a user has configured `retention.ms` to a value greater than the
current unix epoch timestamp, then cleanup of a remote log segment fails with
an error. This change fixes the bug by handling this case of large
`retention.ms` correctly.
Reviewers: Divij Vaidya <[email protected]>
---
.../java/kafka/log/remote/RemoteLogManager.java | 5 ++--
.../kafka/log/remote/RemoteLogManagerTest.java | 29 ++++++++++++++++++++++
2 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 237619bceea..fbe26e8a673 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -1399,8 +1399,9 @@ public class RemoteLogManager implements Closeable {
}
private Optional<RetentionTimeData> buildRetentionTimeData(long
retentionMs) {
- return retentionMs > -1
- ? Optional.of(new RetentionTimeData(retentionMs,
time.milliseconds() - retentionMs))
+ long cleanupUntilMs = time.milliseconds() - retentionMs;
+ return retentionMs > -1 && cleanupUntilMs >= 0
+ ? Optional.of(new RetentionTimeData(retentionMs,
cleanupUntilMs))
: Optional.empty();
}
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 881da2b32c9..21ae868aecc 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -2765,6 +2765,35 @@ public class RemoteLogManagerTest {
verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
}
+ @Test
+ public void testDeleteRetentionMsBiggerThanTimeMs() throws
RemoteStorageException, ExecutionException, InterruptedException {
+ // add 1 month to the current time to avoid flaky test
+ LogConfig mockLogConfig = new LogConfig(Map.of("retention.ms",
time.milliseconds() + 24 * 30 * 60 * 60 * 1000L));
+ when(mockLog.config()).thenReturn(mockLogConfig);
+
+ RemoteLogManager.RLMExpirationTask leaderTask = remoteLogManager.new
RLMExpirationTask(leaderTopicIdPartition);
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+ when(mockLog.logEndOffset()).thenReturn(200L);
+
+ List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+
+ List<RemoteLogSegmentMetadata> metadataList =
+ listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024,
epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+ .thenReturn(metadataList.iterator());
+
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+ .thenAnswer(ans -> metadataList.iterator());
+
+ checkpoint.write(epochEntries);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint,
scheduler);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+ assertDoesNotThrow(leaderTask::cleanupExpiredRemoteLogSegments);
+
+ verify(remoteStorageManager, never()).deleteLogSegmentData(any());
+ }
+
@ParameterizedTest(name = "testFailedDeleteExpiredSegments
retentionSize={0} retentionMs={1}")
@CsvSource(value = {"0, -1", "-1, 0"})
public void testFailedDeleteExpiredSegments(long retentionSize,