This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
new 91bd1baff01 KAFKA-16890: Compute valid log-start-offset when deleting
overlapping remote segments (#16237)
91bd1baff01 is described below
commit 91bd1baff014b1d19376cd0217f918358b295093
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Thu Jun 13 05:18:30 2024 +0530
KAFKA-16890: Compute valid log-start-offset when deleting overlapping
remote segments (#16237)
The listRemoteLogSegments returns the metadata list sorted by the
start-offset. However, the returned metadata list contains all the uploaded
segment information, including duplicate and overlapping
remote-log-segments. The reason for duplicate/overlapping remote-log-segment
cases is explained
[here](https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java#L103).
The list returned by RLMM#listRemoteLogSegments can contain
duplicate segment metadata at the end of the list. So, while computing the next
log-start-offset, we should take the maximum of the deleted segments' (end-offset + 1).
Reviewers: Satish Duggana <[email protected]>
---
.../java/kafka/log/remote/RemoteLogManager.java | 8 ++-
.../kafka/log/remote/RemoteLogManagerTest.java | 69 ++++++++++++++++++++++
2 files changed, 75 insertions(+), 2 deletions(-)
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index c1c87d579ef..3eacbea475f 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -983,7 +983,9 @@ public class RemoteLogManager implements Closeable {
}
}
if (shouldDeleteSegment) {
- logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+ if (!logStartOffset.isPresent() ||
logStartOffset.getAsLong() < metadata.endOffset() + 1) {
+ logStartOffset = OptionalLong.of(metadata.endOffset()
+ 1);
+ }
logger.info("About to delete remote log segment {} due to
retention size {} breach. Log size after deletion will be {}.",
metadata.remoteLogSegmentId(),
retentionSizeData.get().retentionSize, remainingBreachedSize +
retentionSizeData.get().retentionSize);
}
@@ -1000,7 +1002,9 @@ public class RemoteLogManager implements Closeable {
remainingBreachedSize = Math.max(0, remainingBreachedSize
- metadata.segmentSizeInBytes());
// It is fine to have logStartOffset as
`metadata.endOffset() + 1` as the segment offset intervals
// are ascending with in an epoch.
- logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+ if (!logStartOffset.isPresent() ||
logStartOffset.getAsLong() < metadata.endOffset() + 1) {
+ logStartOffset = OptionalLong.of(metadata.endOffset()
+ 1);
+ }
logger.info("About to delete remote log segment {} due to
retention time {}ms breach based on the largest record timestamp in the
segment",
metadata.remoteLogSegmentId(),
retentionTimeData.get().retentionMs);
}
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 4c4976f060d..3c9b8a48e90 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -2055,6 +2055,75 @@ public class RemoteLogManagerTest {
assertEquals(0,
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
}
+ @ParameterizedTest(name =
"testDeletionOnOverlappingRetentionBreachedSegments retentionSize={0}
retentionMs={1}")
+ @CsvSource(value = {"0, -1", "-1, 0"})
+ public void testDeletionOnOverlappingRetentionBreachedSegments(long
retentionSize,
+ long
retentionMs)
+ throws RemoteStorageException, ExecutionException,
InterruptedException {
+ Map<String, Long> logProps = new HashMap<>();
+ logProps.put("retention.bytes", retentionSize);
+ logProps.put("retention.ms", retentionMs);
+ LogConfig mockLogConfig = new LogConfig(logProps);
+ when(mockLog.config()).thenReturn(mockLogConfig);
+
+ List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+ checkpoint.write(epochEntries);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint,
scheduler);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+ when(mockLog.logEndOffset()).thenReturn(200L);
+
+ RemoteLogSegmentMetadata metadata1 =
listRemoteLogSegmentMetadata(leaderTopicIdPartition, 1, 100, 1024,
+ epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED)
+ .get(0);
+ // overlapping segment
+ RemoteLogSegmentMetadata metadata2 = new RemoteLogSegmentMetadata(new
RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
+ metadata1.startOffset(), metadata1.endOffset() + 5,
metadata1.maxTimestampMs(),
+ metadata1.brokerId() + 1, metadata1.eventTimestampMs(),
metadata1.segmentSizeInBytes() + 128,
+ metadata1.customMetadata(), metadata1.state(),
metadata1.segmentLeaderEpochs());
+
+ // When there are overlapping/duplicate segments, the
RemoteLogMetadataManager#listRemoteLogSegments
+ // returns the segments in order of (valid ++ unreferenced) segments:
+ // (eg) B0 uploaded segment S0 with offsets 0-100 and B1 uploaded
segment S1 with offsets 0-200.
+ // We will mark the segment S0 as duplicate and add it to
unreferencedSegmentIds.
+ // The order of segments returned by listRemoteLogSegments will
be S1, S0.
+ // While computing the next-log-start-offset, taking the max of
deleted segment's end-offset + 1.
+ List<RemoteLogSegmentMetadata> metadataList = new ArrayList<>();
+ metadataList.add(metadata2);
+ metadataList.add(metadata1);
+
+
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+ .thenReturn(metadataList.iterator());
+
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+ .thenAnswer(ans -> metadataList.iterator());
+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+ .thenReturn(CompletableFuture.runAsync(() -> { }));
+
+ // Verify the metrics for remote deletes and for failures is zero
before attempt to delete segments
+ assertEquals(0,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteDeleteRequestRate().count());
+ assertEquals(0,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteDeleteRequestRate().count());
+ // Verify aggregate metrics
+ assertEquals(0,
brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
+ assertEquals(0,
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
+
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
+ task.convertToLeader(0);
+ task.cleanupExpiredRemoteLogSegments();
+
+ assertEquals(metadata2.endOffset() + 1, currentLogStartOffset.get());
+ verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
+ verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
+
+ // Verify the metric for remote delete is updated correctly
+ assertEquals(2,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteDeleteRequestRate().count());
+ // Verify we did not report any failure for remote deletes
+ assertEquals(0,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteDeleteRequestRate().count());
+ // Verify aggregate metrics
+ assertEquals(2,
brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
+ assertEquals(0,
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
+ }
+
@ParameterizedTest(name = "testRemoteDeleteLagsOnRetentionBreachedSegments
retentionSize={0} retentionMs={1}")
@CsvSource(value = {"0, -1", "-1, 0"})
public void testRemoteDeleteLagsOnRetentionBreachedSegments(long
retentionSize,