This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a15012078b9 KAFKA-15479: Remote log segments should be considered once for retention breach (#14407)
a15012078b9 is described below
commit a15012078b9fa7a31ede40121bbb509cb17af793
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Mon Sep 25 17:41:53 2023 +0530
KAFKA-15479: Remote log segments should be considered once for retention breach (#14407)
When a remote log segment contains multiple epochs, it gets considered
multiple times during retention breach checks by size/time/start-offset.
This skews deletion by remote log retention size, as fewer segments than
expected get deleted. This is a follow-up of KAFKA-15352.
Reviewers: Divij Vaidya <[email protected]>, Christo Lolov <[email protected]>, Satish Duggana <[email protected]>
---
.../java/kafka/log/remote/RemoteLogManager.java | 90 ++---
.../kafka/log/remote/RemoteLogManagerTest.java | 367 ++++++++++++---------
2 files changed, 250 insertions(+), 207 deletions(-)
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 484777dcf75..b25beb2e016 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -562,7 +562,6 @@ public class RemoteLogManager implements Closeable {
}
cache.truncateFromEnd(endOffset);
}
-
return checkpoint;
}
@@ -707,7 +706,8 @@ public class RemoteLogManager implements Closeable {
}
}
- private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset) throws InterruptedException, ExecutionException, RemoteStorageException, IOException,
+ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset)
+ throws InterruptedException, ExecutionException, RemoteStorageException, IOException,
CustomMetadataSizeLimitExceededException {
File logFile = segment.log().file();
String logFileName = logFile.getName();
@@ -833,13 +833,11 @@ public class RemoteLogManager implements Closeable {
remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
}
- private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) {
+ private boolean isSegmentBreachedByRetentionSize(RemoteLogSegmentMetadata metadata) {
+ boolean shouldDeleteSegment = false;
if (!retentionSizeData.isPresent()) {
- return false;
+ return shouldDeleteSegment;
}
-
- boolean shouldDeleteSegment = false;
-
// Assumption that segments contain size >= 0
if (remainingBreachedSize > 0) {
long remainingBytes = remainingBreachedSize - metadata.segmentSizeInBytes();
@@ -848,7 +846,6 @@ public class RemoteLogManager implements Closeable {
shouldDeleteSegment = true;
}
}
-
if (shouldDeleteSegment) {
logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
logger.info("About to delete remote log segment {} due to
retention size {} breach. Log size after deletion will be {}.",
@@ -857,12 +854,12 @@ public class RemoteLogManager implements Closeable {
return shouldDeleteSegment;
}
- public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) {
+ public boolean isSegmentBreachedByRetentionTime(RemoteLogSegmentMetadata metadata) {
+ boolean shouldDeleteSegment = false;
if (!retentionTimeData.isPresent()) {
- return false;
+ return shouldDeleteSegment;
}
-
- boolean shouldDeleteSegment = metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs;
+ shouldDeleteSegment = metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs;
if (shouldDeleteSegment) {
remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
// It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
@@ -874,9 +871,9 @@ public class RemoteLogManager implements Closeable {
return shouldDeleteSegment;
}
- private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata,
- long logStartOffset,
- NavigableMap<Integer, Long> leaderEpochEntries) {
+ private boolean isSegmentBreachByLogStartOffset(RemoteLogSegmentMetadata metadata,
+ long logStartOffset,
+ NavigableMap<Integer, Long> leaderEpochEntries) {
boolean shouldDeleteSegment = false;
if (!leaderEpochEntries.isEmpty()) {
// Note that `logStartOffset` and `leaderEpochEntries.firstEntry().getValue()` should be same
@@ -917,10 +914,8 @@ public class RemoteLogManager implements Closeable {
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
-
// Delete the segment in remote storage.
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
-
// Publish delete segment finished event.
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
@@ -933,7 +928,7 @@ public class RemoteLogManager implements Closeable {
}
- private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
+ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
if (isCancelled() || !isLeader()) {
logger.info("Returning from remote log segments cleanup as the task state is changed");
return;
@@ -994,13 +989,15 @@ public class RemoteLogManager implements Closeable {
return;
}
RemoteLogSegmentMetadata metadata = segmentsIterator.next();
-
+ if (segmentsToDelete.contains(metadata)) {
+ continue;
+ }
// When the log-start-offset is moved by the user, the leader-epoch-checkpoint file gets truncated
// as per the log-start-offset. Until the rlm-cleaner-thread runs in the next iteration, those
// remote log segments won't be removed. The `isRemoteSegmentWithinLeaderEpoch` validates whether
// the epochs present in the segment lies in the checkpoint file. It will always return false
// since the checkpoint file was already truncated.
- boolean shouldDeleteSegment = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(
+ boolean shouldDeleteSegment = remoteLogRetentionHandler.isSegmentBreachByLogStartOffset(
metadata, logStartOffset, epochWithOffsets);
boolean isValidSegment = false;
if (!shouldDeleteSegment) {
@@ -1008,8 +1005,8 @@ public class RemoteLogManager implements Closeable {
isValidSegment = isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets);
if (isValidSegment) {
shouldDeleteSegment =
- remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
- remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata);
+ remoteLogRetentionHandler.isSegmentBreachedByRetentionTime(metadata) ||
+ remoteLogRetentionHandler.isSegmentBreachedByRetentionSize(metadata);
}
}
if (shouldDeleteSegment) {
@@ -1019,6 +1016,27 @@ public class RemoteLogManager implements Closeable {
}
}
+ // Update log start offset with the computed value after retention cleanup is done
+ remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));
+
+ // At this point in time we have updated the log start offsets, but not initiated a deletion.
+ // Either a follower has picked up the changes to the log start offset, or they have not.
+ // If the follower HAS picked up the changes, and they become the leader this replica won't successfully complete
+ // the deletion.
+ // However, the new leader will correctly pick up all breaching segments as log start offset breaching ones
+ // and delete them accordingly.
+ // If the follower HAS NOT picked up the changes, and they become the leader then they will go through this process
+ // again and delete them with the original deletion reason i.e. size, time or log start offset breach.
+ List<String> undeletedSegments = new ArrayList<>();
+ for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
+ if (!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled() && isLeader())) {
+ undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
+ }
+ }
+ if (!undeletedSegments.isEmpty()) {
+ logger.info("The following remote segments could not be deleted: {}", String.join(",", undeletedSegments));
+ }
+
// Remove the remote log segments whose segment-leader-epochs are less than the earliest-epoch known
// to the leader. This will remove the unreferenced segments in the remote storage. This is needed for
// unclean leader election scenarios as the remote storage can have epochs earlier to the current leader's
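
A note on the `deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled() && isLeader())` call added above: the predicate is re-evaluated per segment, so a cancelled task or a demoted leader stops deleting and leaves the rest for the next leader. A rough sketch of the pattern, with an assumed simplified signature (the real handler also publishes the DELETE_SEGMENT_STARTED/FINISHED events shown in the earlier hunk):

    // Sketch only: delete a remote segment iff the predicate still holds;
    // returning false lets the caller report the segment as undeleted.
    private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata,
                                           Predicate<RemoteLogSegmentMetadata> predicate)
            throws RemoteStorageException {
        if (!predicate.test(segmentMetadata)) {
            return false; // task cancelled or leadership lost; next leader retries
        }
        remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
        return true;
    }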
@@ -1041,27 +1059,6 @@ public class RemoteLogManager implements Closeable {
}
}
}
-
- // Update log start offset with the computed value after retention cleanup is done
- remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));
-
- // At this point in time we have updated the log start offsets, but not initiated a deletion.
- // Either a follower has picked up the changes to the log start offset, or they have not.
- // If the follower HAS picked up the changes, and they become the leader this replica won't successfully complete
- // the deletion.
- // However, the new leader will correctly pick up all breaching segments as log start offset breaching ones
- // and delete them accordingly.
- // If the follower HAS NOT picked up the changes, and they become the leader then they will go through this process
- // again and delete them with the original deletion reason i.e. size, time or log start offset breach.
- List<String> undeletedSegments = new ArrayList<>();
- for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
- if (!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled() && isLeader())) {
- undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
- }
- }
- if (!undeletedSegments.isEmpty()) {
- logger.info("The following remote segments could not be deleted: {}", String.join(",", undeletedSegments));
- }
}
private Optional<RetentionTimeData> buildRetentionTimeData(long retentionMs) {
@@ -1180,7 +1177,12 @@ public class RemoteLogManager implements Closeable {
}
}
// segment end offset should be with in the log end offset.
- return segmentEndOffset < logEndOffset;
+ if (segmentEndOffset >= logEndOffset) {
+ LOGGER.debug("Segment {} end offset {} is more than log end offset {}.",
+ segmentMetadata.remoteLogSegmentId(), segmentEndOffset, logEndOffset);
+ return false;
+ }
+ return true;
}
/**
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 5c0578d359f..07045ed811e 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -74,6 +74,7 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
@@ -105,6 +106,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
@@ -189,6 +191,7 @@ public class RemoteLogManagerTest {
return epochs;
}
};
+ private final AtomicLong currentLogStartOffset = new AtomicLong(0L);
private final UnifiedLog mockLog = mock(UnifiedLog.class);
@@ -204,7 +207,7 @@ public class RemoteLogManagerTest {
kafka.utils.TestUtils.clearYammerMetrics();
remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog),
- (topicPartition, offset) -> { },
+ (topicPartition, offset) -> currentLogStartOffset.set(offset),
brokerTopicStats) {
public RemoteStorageManager createRemoteStorageManager() {
return remoteStorageManager;
@@ -1259,14 +1262,15 @@ public class RemoteLogManagerTest {
segmentEpochs7), logEndOffset, leaderEpochToStartOffset));
// Test a remote segment having larger end offset than the log end offset
- TreeMap<Integer, Long> segmentEpochs8 = new TreeMap<>();
- segmentEpochs8.put(1, 15L);
- segmentEpochs8.put(2, 20L);
-
assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(
15,
95, // larger than log end offset
- segmentEpochs8), logEndOffset, leaderEpochToStartOffset));
+ leaderEpochToStartOffset), logEndOffset, leaderEpochToStartOffset));
+
+ assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(
+ 15,
+ 90, // equal to the log end offset
+ leaderEpochToStartOffset), logEndOffset, leaderEpochToStartOffset));
// Test whether a segment's first offset is earlier to the respective epoch's start offset
TreeMap<Integer, Long> segmentEpochs9 = new TreeMap<>();
@@ -1521,188 +1525,212 @@ public class RemoteLogManagerTest {
}
}
- @Test
- public void testDeleteRetentionSizeBreachingSegments() throws RemoteStorageException, IOException {
- AtomicLong logStartOffset = new AtomicLong(0);
- try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
- tp -> Optional.of(mockLog),
- (topicPartition, offset) -> logStartOffset.set(offset),
- brokerTopicStats) {
- public RemoteStorageManager createRemoteStorageManager() {
- return remoteStorageManager;
- }
- public RemoteLogMetadataManager createRemoteLogMetadataManager() {
- return remoteLogMetadataManager;
- }
- }) {
- RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
- task.convertToLeader(0);
-
- when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
- when(mockLog.logEndOffset()).thenReturn(200L);
-
- List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
-
- List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
-
- when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
- .thenReturn(remoteLogSegmentMetadatas.iterator());
- when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
- .thenReturn(remoteLogSegmentMetadatas.iterator())
- .thenReturn(remoteLogSegmentMetadatas.iterator())
- .thenReturn(remoteLogSegmentMetadatas.iterator());
-
- checkpoint.write(epochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
- when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+ @ParameterizedTest(name = "testDeletionOnRetentionBreachedSegments retentionSize={0} retentionMs={1}")
+ @CsvSource(value = {"0, -1", "-1, 0"})
+ public void testDeletionOnRetentionBreachedSegments(long retentionSize,
+ long retentionMs)
+ throws RemoteStorageException, ExecutionException, InterruptedException {
+ Map<String, Long> logProps = new HashMap<>();
+ logProps.put("retention.bytes", retentionSize);
+ logProps.put("retention.ms", retentionMs);
+ LogConfig mockLogConfig = new LogConfig(logProps);
+ when(mockLog.config()).thenReturn(mockLogConfig);
+
+ List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+ checkpoint.write(epochEntries);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
- Map<String, Long> logProps = new HashMap<>();
- logProps.put("retention.bytes", 0L);
- logProps.put("retention.ms", -1L);
- LogConfig mockLogConfig = new LogConfig(logProps);
- when(mockLog.config()).thenReturn(mockLogConfig);
+ when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+ when(mockLog.logEndOffset()).thenReturn(200L);
- when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
- .thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
+ List<RemoteLogSegmentMetadata> metadataList =
+ listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
+ when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+ .thenReturn(metadataList.iterator());
+ when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+ .thenAnswer(ans -> metadataList.iterator());
+ when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+ .thenReturn(CompletableFuture.runAsync(() -> { }));
- task.run();
+ RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+ task.convertToLeader(0);
+ task.cleanupExpiredRemoteLogSegments();
- assertEquals(200L, logStartOffset.get());
- verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
- verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
- }
+ assertEquals(200L, currentLogStartOffset.get());
+ verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
+ verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
}
@Test
- public void testDeleteRetentionMsBreachingSegments() throws RemoteStorageException, IOException {
- AtomicLong logStartOffset = new AtomicLong(0);
- try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
- tp -> Optional.of(mockLog),
- (topicPartition, offset) -> logStartOffset.set(offset),
- brokerTopicStats) {
- public RemoteStorageManager createRemoteStorageManager() {
- return remoteStorageManager;
- }
- public RemoteLogMetadataManager createRemoteLogMetadataManager() {
- return remoteLogMetadataManager;
- }
- }) {
- RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
- task.convertToLeader(0);
+ public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws RemoteStorageException, ExecutionException, InterruptedException {
+ RemoteLogManager.RLMTask leaderTask = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+ leaderTask.convertToLeader(0);
- when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
- when(mockLog.logEndOffset()).thenReturn(200L);
+ when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+ when(mockLog.logEndOffset()).thenReturn(200L);
- List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+ List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
- List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
+ List<RemoteLogSegmentMetadata> metadataList =
+ listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
+ when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+ .thenReturn(metadataList.iterator());
+ when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+ .thenAnswer(ans -> metadataList.iterator());
- when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
- .thenReturn(remoteLogSegmentMetadatas.iterator());
- when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
- .thenReturn(remoteLogSegmentMetadatas.iterator())
- .thenReturn(remoteLogSegmentMetadatas.iterator());
+ checkpoint.write(epochEntries);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
- checkpoint.write(epochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
- when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+ Map<String, Long> logProps = new HashMap<>();
+ logProps.put("retention.bytes", -1L);
+ logProps.put("retention.ms", 0L);
+ LogConfig mockLogConfig = new LogConfig(logProps);
+ when(mockLog.config()).thenReturn(mockLogConfig);
+
+ when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+ .thenAnswer(answer -> {
+ // cancel the task so that we don't delete the second segment
+ leaderTask.cancel();
+ return CompletableFuture.runAsync(() -> {
+ });
+ });
- Map<String, Long> logProps = new HashMap<>();
- logProps.put("retention.bytes", -1L);
- logProps.put("retention.ms", 0L);
- LogConfig mockLogConfig = new LogConfig(logProps);
- when(mockLog.config()).thenReturn(mockLogConfig);
+ leaderTask.cleanupExpiredRemoteLogSegments();
- when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
- .thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
+ assertEquals(200L, currentLogStartOffset.get());
+ verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
+ verify(remoteStorageManager, never()).deleteLogSegmentData(metadataList.get(1));
- task.run();
+ // test that the 2nd log segment will be deleted by the new leader
+ RemoteLogManager.RLMTask newLeaderTask = remoteLogManager.new RLMTask(followerTopicIdPartition, 128);
+ newLeaderTask.convertToLeader(1);
- assertEquals(200L, logStartOffset.get());
- verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
- verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
- }
- }
+ Iterator<RemoteLogSegmentMetadata> firstIterator = metadataList.iterator();
+ firstIterator.next();
+ Iterator<RemoteLogSegmentMetadata> secondIterator = metadataList.iterator();
+ secondIterator.next();
+ Iterator<RemoteLogSegmentMetadata> thirdIterator = metadataList.iterator();
+ thirdIterator.next();
- @Test
- public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws RemoteStorageException, IOException {
- AtomicLong logStartOffset = new AtomicLong(0);
- try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
- tp -> Optional.of(mockLog),
- (topicPartition, offset) -> logStartOffset.set(offset),
- brokerTopicStats) {
- public RemoteStorageManager createRemoteStorageManager() {
- return remoteStorageManager;
- }
- public RemoteLogMetadataManager createRemoteLogMetadataManager() {
- return remoteLogMetadataManager;
- }
- }) {
- RemoteLogManager.RLMTask leaderTask = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
- leaderTask.convertToLeader(0);
-
- when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
- when(mockLog.logEndOffset()).thenReturn(200L);
-
- List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
-
- List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
-
- when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
- .thenReturn(remoteLogSegmentMetadatas.iterator());
- when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
- .thenReturn(remoteLogSegmentMetadatas.iterator())
- .thenReturn(remoteLogSegmentMetadatas.iterator());
-
- checkpoint.write(epochEntries);
- LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
- when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
-
- Map<String, Long> logProps = new HashMap<>();
- logProps.put("retention.bytes", -1L);
- logProps.put("retention.ms", 0L);
- LogConfig mockLogConfig = new LogConfig(logProps);
- when(mockLog.config()).thenReturn(mockLogConfig);
-
- when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
- .thenAnswer(answer -> {
- // cancel the task so that we don't delete the second segment
- leaderTask.cancel();
- return CompletableFuture.runAsync(() -> {
- });
- });
+ when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition))
+ .thenReturn(firstIterator);
+ when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition, 0))
+ .thenReturn(secondIterator)
+ .thenReturn(thirdIterator);
- leaderTask.run();
+ when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+ .thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
- assertEquals(200L, logStartOffset.get());
- verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
- verify(remoteStorageManager, never()).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
+ newLeaderTask.cleanupExpiredRemoteLogSegments();
- // test that the 2nd log segment will be deleted by the new leader
- RemoteLogManager.RLMTask newLeaderTask = remoteLogManager.new RLMTask(followerTopicIdPartition, 128);
- newLeaderTask.convertToLeader(1);
+ assertEquals(200L, currentLogStartOffset.get());
+ verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
+ verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
+ }
- Iterator<RemoteLogSegmentMetadata> firstIterator = remoteLogSegmentMetadatas.iterator();
- firstIterator.next();
- Iterator<RemoteLogSegmentMetadata> secondIterator = remoteLogSegmentMetadatas.iterator();
- secondIterator.next();
- Iterator<RemoteLogSegmentMetadata> thirdIterator = remoteLogSegmentMetadatas.iterator();
- thirdIterator.next();
+ @ParameterizedTest(name = "testDeleteLogSegmentDueToRetentionSizeBreach segmentCount={0} deletableSegmentCount={1}")
+ @CsvSource(value = {"50, 0", "50, 1", "50, 23", "50, 50"})
+ public void testDeleteLogSegmentDueToRetentionSizeBreach(int segmentCount,
+ int deletableSegmentCount)
+ throws RemoteStorageException, ExecutionException, InterruptedException {
+ int recordsPerSegment = 100;
+ int segmentSize = 1024;
+ List<EpochEntry> epochEntries = Arrays.asList(
+ new EpochEntry(0, 0L),
+ new EpochEntry(1, 20L),
+ new EpochEntry(3, 50L),
+ new EpochEntry(4, 100L)
+ );
+ checkpoint.write(epochEntries);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+ int currentLeaderEpoch = epochEntries.get(epochEntries.size() - 1).epoch;
+
+ long localLogSegmentsSize = 512L;
+ long retentionSize = ((long) segmentCount - deletableSegmentCount) * segmentSize + localLogSegmentsSize;
+ Map<String, Long> logProps = new HashMap<>();
+ logProps.put("retention.bytes", retentionSize);
+ logProps.put("retention.ms", -1L);
+ LogConfig mockLogConfig = new LogConfig(logProps);
+ when(mockLog.config()).thenReturn(mockLogConfig);
+
+ long localLogStartOffset = (long) segmentCount * recordsPerSegment;
+ long logEndOffset = ((long) segmentCount * recordsPerSegment) + 1;
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+ when(mockLog.localLogStartOffset()).thenReturn(localLogStartOffset);
+ when(mockLog.logEndOffset()).thenReturn(logEndOffset);
+ when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(localLogSegmentsSize);
- when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition))
- .thenReturn(firstIterator);
- when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition, 0))
- .thenReturn(secondIterator)
- .thenReturn(thirdIterator);
+ List<RemoteLogSegmentMetadata> segmentMetadataList = listRemoteLogSegmentMetadata(
+ leaderTopicIdPartition, segmentCount, recordsPerSegment, segmentSize, epochEntries);
+ verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount, currentLeaderEpoch);
+ }
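
To make the retention-size parameters concrete: assuming the check counts remote plus local bytes as the mocks above suggest, segmentCount=50, deletableSegmentCount=23, segmentSize=1024 and localLogSegmentsSize=512 give retentionSize = (50 - 23) * 1024 + 512 = 28160 bytes against a total of 50 * 1024 + 512 = 51712 bytes; the 23552-byte excess is exactly 23 segments, so the first 23 remote segments breach and get deleted.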
- when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
- .thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
+ @ParameterizedTest(name = "testDeleteLogSegmentDueToRetentionTimeBreach segmentCount={0} deletableSegmentCount={1}")
+ @CsvSource(value = {"50, 0", "50, 1", "50, 23", "50, 50"})
+ public void testDeleteLogSegmentDueToRetentionTimeBreach(int segmentCount,
+ int deletableSegmentCount)
+ throws RemoteStorageException, ExecutionException, InterruptedException {
+ int recordsPerSegment = 100;
+ int segmentSize = 1024;
+ List<EpochEntry> epochEntries = Arrays.asList(
+ new EpochEntry(0, 0L),
+ new EpochEntry(1, 20L),
+ new EpochEntry(3, 50L),
+ new EpochEntry(4, 100L)
+ );
+ checkpoint.write(epochEntries);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+ int currentLeaderEpoch = epochEntries.get(epochEntries.size() - 1).epoch;
+
+ long localLogSegmentsSize = 512L;
+ long retentionSize = -1L;
+ Map<String, Long> logProps = new HashMap<>();
+ logProps.put("retention.bytes", retentionSize);
+ logProps.put("retention.ms", 1L);
+ LogConfig mockLogConfig = new LogConfig(logProps);
+ when(mockLog.config()).thenReturn(mockLogConfig);
+
+ long localLogStartOffset = (long) segmentCount * recordsPerSegment;
+ long logEndOffset = ((long) segmentCount * recordsPerSegment) + 1;
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+ when(mockLog.localLogStartOffset()).thenReturn(localLogStartOffset);
+ when(mockLog.logEndOffset()).thenReturn(logEndOffset);
+ when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(localLogSegmentsSize);
- newLeaderTask.run();
+ List<RemoteLogSegmentMetadata> segmentMetadataList = listRemoteLogSegmentMetadataByTime(
+ leaderTopicIdPartition, segmentCount, deletableSegmentCount, recordsPerSegment, segmentSize, epochEntries);
+ verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount, currentLeaderEpoch);
+ }
- assertEquals(200L, logStartOffset.get());
- verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
- verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
+ private void verifyDeleteLogSegment(List<RemoteLogSegmentMetadata> segmentMetadataList,
+ int deletableSegmentCount,
+ int currentLeaderEpoch)
+ throws RemoteStorageException, ExecutionException, InterruptedException {
+ when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+ .thenReturn(segmentMetadataList.iterator());
+ when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt()))
+ .thenAnswer(invocation -> {
+ int leaderEpoch = invocation.getArgument(1);
+ return segmentMetadataList.stream()
+ .filter(segmentMetadata -> segmentMetadata.segmentLeaderEpochs().containsKey(leaderEpoch))
+ .iterator();
+ });
+ when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+ .thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
+ RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+ task.convertToLeader(currentLeaderEpoch);
+ task.cleanupExpiredRemoteLogSegments();
+
+ ArgumentCaptor<RemoteLogSegmentMetadata> deletedMetadataCapture = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
+ verify(remoteStorageManager, times(deletableSegmentCount)).deleteLogSegmentData(deletedMetadataCapture.capture());
+ if (deletableSegmentCount > 0) {
+ List<RemoteLogSegmentMetadata> deletedMetadataList = deletedMetadataCapture.getAllValues();
+ RemoteLogSegmentMetadata expectedEndMetadata = segmentMetadataList.get(deletableSegmentCount - 1);
+ assertEquals(segmentMetadataList.get(0), deletedMetadataList.get(0));
+ assertEquals(expectedEndMetadata, deletedMetadataList.get(deletedMetadataList.size() - 1));
+ assertEquals(currentLogStartOffset.get(), expectedEndMetadata.endOffset() + 1);
}
}
@@ -1718,9 +1746,22 @@ public class RemoteLogManagerTest {
int recordsPerSegment,
int segmentSize,
List<EpochEntry> epochEntries) {
+ return listRemoteLogSegmentMetadataByTime(
+ topicIdPartition, segmentCount, 0, recordsPerSegment, segmentSize, epochEntries);
+ }
+
+ private List<RemoteLogSegmentMetadata> listRemoteLogSegmentMetadataByTime(TopicIdPartition topicIdPartition,
+ int segmentCount,
+ int deletableSegmentCount,
+ int recordsPerSegment,
+ int segmentSize,
+ List<EpochEntry> epochEntries) {
List<RemoteLogSegmentMetadata> segmentMetadataList = new ArrayList<>();
for (int idx = 0; idx < segmentCount; idx++) {
long timestamp = time.milliseconds();
+ if (idx < deletableSegmentCount) {
+ timestamp = time.milliseconds() - 1;
+ }
long startOffset = (long) idx * recordsPerSegment;
long endOffset = startOffset + recordsPerSegment - 1;
List<EpochEntry> localTotalEpochEntries = epochEntries.isEmpty() ? totalEpochEntries : epochEntries;