This is an automated email from the ASF dual-hosted git repository.
kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e26e7c06e75 KAFKA-20091: Fix inconsistency in time-based retention
checks between remote and local segment deletion logic. (#21352)
e26e7c06e75 is described below
commit e26e7c06e756e26022fb9f769f5f71fbc79c18b3
Author: Jian <[email protected]>
AuthorDate: Wed Jan 28 18:24:51 2026 +0800
KAFKA-20091: Fix inconsistency in time-based retention checks between
remote and local segment deletion logic. (#21352)
When currentTime - largestTimestamp of a segment equals retentionMs, the
behavior differs:
1. Local Segment delete: **Not delete**
```
org.apache.kafka.storage.internals.log.UnifiedLog#deleteRetentionMsBreachedSegments
boolean delete = startMs - segment.largestTimestamp() > retentionMs;
```
2. Remote segment delete: **Delete**
```
org.apache.kafka.server.log.remote.storage.RemoteLogManager.RLMExpirationTask.RemoteLogRetentionHandler#isSegmentBreachedByRetentionTime
shouldDeleteSegment = metadata.maxTimestampMs() <=
retentionTimeData.get().cleanupUntilMs;
//cleanupUntilMs is time.milliseconds() - retentionMs;
private Optional<RetentionTimeData> buildRetentionTimeData(long
retentionMs) {
long cleanupUntilMs = time.milliseconds() - retentionMs;
return retentionMs > -1 && cleanupUntilMs >= 0
? Optional.of(new RetentionTimeData(retentionMs,
cleanupUntilMs))
: Optional.empty();
}
```
cc @kamalcph
Thanks for your comments on
[KIP-1241](https://cwiki.apache.org/confluence/x/A4LMFw). While I was
thinking through one of your comments and reading the code, I noticed a
potential problem in this area, so I opened this PR. Many thanks.
---------
Signed-off-by: stroller <[email protected]>
Reviewers: Saket Ranjan <[email protected]>, Kamal Chandraprakash
<[email protected]>
---
.../log/remote/storage/RemoteLogManager.java | 2 +-
.../log/remote/storage/RemoteLogManagerTest.java | 57 ++++++++++++++++------
2 files changed, 42 insertions(+), 17 deletions(-)
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index 443f955a5fb..08c0f25b90c 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -1207,7 +1207,7 @@ public class RemoteLogManager implements Closeable,
AsyncOffsetReader {
if (retentionTimeData.isEmpty()) {
return shouldDeleteSegment;
}
- shouldDeleteSegment = metadata.maxTimestampMs() <=
retentionTimeData.get().cleanupUntilMs;
+ shouldDeleteSegment = metadata.maxTimestampMs() <
retentionTimeData.get().cleanupUntilMs;
if (shouldDeleteSegment) {
remainingBreachedSize = Math.max(0, remainingBreachedSize
- metadata.segmentSizeInBytes());
// It is fine to have logStartOffset as
`metadata.endOffset() + 1` as the segment offset intervals
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
index c8efe9884d7..435c9958b5d 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
@@ -2593,10 +2593,12 @@ public class RemoteLogManagerTest {
verify(remoteStorageManager,
times(1)).deleteLogSegmentData(remoteLogSegmentMetadata);
}
- @ParameterizedTest(name = "testDeletionOnRetentionBreachedSegments
retentionSize={0} retentionMs={1}")
- @CsvSource(value = {"0, -1", "-1, 0"})
+ // expectDeletion=false tests that segments with maxTimestampMs ==
cleanupUntilMs are NOT deleted (strict less-than)
+ @ParameterizedTest(name = "testDeletionOnRetentionBreachedSegments
retentionSize={0} retentionMs={1} expectDeletion={2}")
+ @CsvSource(value = {"0, -1, true", "-1, 0, true", "-1, 0, false"})
public void testDeletionOnRetentionBreachedSegments(long retentionSize,
- long retentionMs)
+ long retentionMs,
+ boolean expectDeletion)
throws RemoteStorageException, ExecutionException,
InterruptedException {
Map<String, Long> logProps = new HashMap<>();
logProps.put("retention.bytes", retentionSize);
@@ -2630,19 +2632,27 @@ public class RemoteLogManagerTest {
RemoteLogManager.RLMExpirationTask task = remoteLogManager.new
RLMExpirationTask(leaderTopicIdPartition);
+ if (expectDeletion) {
+ advanceTimeToMakeSegmentDeletable();
+ }
task.cleanupExpiredRemoteLogSegments();
- assertEquals(200L, currentLogStartOffset.get());
- verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
- verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
-
- // Verify the metric for remote delete is updated correctly
- assertEquals(2,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteDeleteRequestRate().count());
- // Verify we did not report any failure for remote deletes
- assertEquals(0,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteDeleteRequestRate().count());
- // Verify aggregate metrics
- assertEquals(2,
brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
- assertEquals(0,
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
+ if (expectDeletion) {
+ assertEquals(200L, currentLogStartOffset.get());
+
verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
+
verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
+
+ // Verify the metric for remote delete is updated correctly
+ assertEquals(2,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteDeleteRequestRate().count());
+ // Verify we did not report any failure for remote deletes
+ assertEquals(0,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteDeleteRequestRate().count());
+ // Verify aggregate metrics
+ assertEquals(2,
brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
+ assertEquals(0,
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
+ } else {
+ assertEquals(0L, currentLogStartOffset.get());
+ verify(remoteStorageManager, never()).deleteLogSegmentData(any());
+ }
}
@ParameterizedTest(name =
"testDeletionOnOverlappingRetentionBreachedSegments retentionSize={0}
retentionMs={1}")
@@ -2698,6 +2708,7 @@ public class RemoteLogManagerTest {
assertEquals(0,
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
RemoteLogManager.RLMExpirationTask task = remoteLogManager.new
RLMExpirationTask(leaderTopicIdPartition);
+ advanceTimeToMakeSegmentDeletable();
task.cleanupExpiredRemoteLogSegments();
assertEquals(metadata2.endOffset() + 1, currentLogStartOffset.get());
@@ -2752,7 +2763,7 @@ public class RemoteLogManagerTest {
RemoteLogManager.RLMExpirationTask task = remoteLogManager.new
RLMExpirationTask(leaderTopicIdPartition);
verifyRemoteDeleteMetrics(0L, 0L);
-
+ advanceTimeToMakeSegmentDeletable();
task.cleanupExpiredRemoteLogSegments();
assertEquals(200L, currentLogStartOffset.get());
@@ -2884,6 +2895,7 @@ public class RemoteLogManagerTest {
});
});
+ advanceTimeToMakeSegmentDeletable();
leaderTask.cleanupExpiredRemoteLogSegments();
assertEquals(200L, currentLogStartOffset.get());
@@ -2981,6 +2993,7 @@ public class RemoteLogManagerTest {
RemoteLogManager.RLMExpirationTask task = remoteLogManager.new
RLMExpirationTask(leaderTopicIdPartition);
doThrow(new RemoteStorageException("Failed to delete
segment")).when(remoteStorageManager).deleteLogSegmentData(any());
+ advanceTimeToMakeSegmentDeletable();
assertThrows(RemoteStorageException.class,
task::cleanupExpiredRemoteLogSegments);
assertEquals(100L, currentLogStartOffset.get());
@@ -3036,6 +3049,7 @@ public class RemoteLogManagerTest {
RemoteLogManager.RLMExpirationTask task = remoteLogManager.new
RLMExpirationTask(leaderTopicIdPartition);
doThrow(new RetriableRemoteStorageException("Failed to delete segment
with retriable
exception")).when(remoteStorageManager).deleteLogSegmentData(any());
+ advanceTimeToMakeSegmentDeletable();
assertThrows(RetriableRemoteStorageException.class,
task::cleanupExpiredRemoteLogSegments);
assertEquals(100L, currentLogStartOffset.get());
@@ -3129,6 +3143,15 @@ public class RemoteLogManagerTest {
verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount);
}
+ /**
+ * Segments created with current time won't be deleted immediately since
+ * retention check uses {@code maxTimestampMs < cleanupUntilMs} (not
{@code <=}).
+ * Advance time by 1ms to make segments eligible for deletion.
+ */
+ private void advanceTimeToMakeSegmentDeletable() {
+ ((MockTime) time).sleep(1);
+ }
+
private void verifyRemoteDeleteMetrics(long remoteDeleteLagBytes, long
remoteDeleteLagSegments) {
assertEquals(remoteDeleteLagBytes,
safeLongYammerMetricValue("RemoteDeleteLagBytes"),
String.format("Expected to find %d for RemoteDeleteLagBytes
metric value, but found %d",
@@ -3256,7 +3279,9 @@ public class RemoteLogManagerTest {
for (int idx = 0; idx < segmentCount; idx++) {
long timestamp = time.milliseconds();
if (idx < deletableSegmentCount) {
- timestamp = time.milliseconds() - 1;
+ // Use -2 instead of -1 because some test cases use
retentionMs=1.
+ // With -1, segment's maxTimestampMs == cleanupUntilMs, so the
segment won't be deleted.
+ timestamp = time.milliseconds() - 2;
}
long startOffset = (long) idx * recordsPerSegment;
long endOffset = startOffset + recordsPerSegment - 1;