This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new 5fa2a2433aa KAFKA-15802: Validate remote segment state before fetching
index (#14727)
5fa2a2433aa is described below
commit 5fa2a2433aac430d24963a525acf9154cc88289a
Author: Jorge Esteban Quilcate Otoya <[email protected]>
AuthorDate: Tue Nov 14 18:28:53 2023 +0200
KAFKA-15802: Validate remote segment state before fetching index (#14727)
Reviewers: Satish Duggana <[email protected]>, Divij Vaidya
<[email protected]>, Christo Lolov <[email protected]>, Luke Chen
<[email protected]>, Kamal Chandraprakash <[email protected]>
---
.../java/kafka/log/remote/RemoteLogManager.java | 12 +-
.../kafka/log/remote/RemoteLogManagerTest.java | 125 ++++++++++++++++++---
2 files changed, 118 insertions(+), 19 deletions(-)
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index a5a30e6d156..e6abdb3c5a1 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -514,11 +514,15 @@ public class RemoteLogManager implements Closeable {
while (maybeEpoch.isPresent()) {
int epoch = maybeEpoch.getAsInt();
+ // KAFKA-15802: Add a new API for RLMM to choose how to implement
the predicate.
+ // currently, all segments are returned and then iterated, and
filtered
Iterator<RemoteLogSegmentMetadata> iterator =
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
while (iterator.hasNext()) {
RemoteLogSegmentMetadata rlsMetadata = iterator.next();
- if (rlsMetadata.maxTimestampMs() >= timestamp &&
rlsMetadata.endOffset() >= startingOffset &&
- isRemoteSegmentWithinLeaderEpochs(rlsMetadata,
unifiedLog.logEndOffset(), epochWithOffsets)) {
+ if (rlsMetadata.maxTimestampMs() >= timestamp
+ && rlsMetadata.endOffset() >= startingOffset
+ && isRemoteSegmentWithinLeaderEpochs(rlsMetadata,
unifiedLog.logEndOffset(), epochWithOffsets)
+ &&
rlsMetadata.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED)) {
return lookupTimestamp(rlsMetadata, timestamp,
startingOffset);
}
}
@@ -988,6 +992,10 @@ public class RemoteLogManager implements Closeable {
return;
}
RemoteLogSegmentMetadata metadata =
segmentsIterator.next();
+
+ if
(RemoteLogSegmentState.DELETE_SEGMENT_FINISHED.equals(metadata.state())) {
+ continue;
+ }
if (segmentsToDelete.contains(metadata)) {
continue;
}
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 8a6ba870712..424bf83921f 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -980,7 +980,7 @@ public class RemoteLogManagerTest {
leaderEpochFileCache.assign(targetLeaderEpoch, startOffset);
leaderEpochFileCache.assign(12, 500L);
- doTestFindOffsetByTimestamp(ts, startOffset, targetLeaderEpoch,
validSegmentEpochs);
+ doTestFindOffsetByTimestamp(ts, startOffset, targetLeaderEpoch,
validSegmentEpochs, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
// Fetching message for timestamp `ts` will return the message with
startOffset+1, and `ts+1` as there are no
// messages starting with the startOffset and with `ts`.
@@ -1014,7 +1014,7 @@ public class RemoteLogManagerTest {
leaderEpochFileCache.assign(targetLeaderEpoch, startOffset);
leaderEpochFileCache.assign(12, 500L);
- doTestFindOffsetByTimestamp(ts, startOffset, targetLeaderEpoch,
validSegmentEpochs);
+ doTestFindOffsetByTimestamp(ts, startOffset, targetLeaderEpoch,
validSegmentEpochs, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
// Fetch offsets for this segment returns empty as the segment epochs
are not within the leader epoch cache.
Optional<FileRecords.TimestampAndOffset> maybeTimestampAndOffset1 =
remoteLogManager.findOffsetByTimestamp(tp, ts, startOffset,
leaderEpochFileCache);
@@ -1027,8 +1027,32 @@ public class RemoteLogManagerTest {
assertEquals(Optional.empty(), maybeTimestampAndOffset3);
}
+ @Test
+ void testFindOffsetByTimestampWithSegmentNotReady() throws IOException,
RemoteStorageException {
+ TopicPartition tp = leaderTopicIdPartition.topicPartition();
+
+ long ts = time.milliseconds();
+ long startOffset = 120;
+ int targetLeaderEpoch = 10;
+
+ TreeMap<Integer, Long> validSegmentEpochs = new TreeMap<>();
+ validSegmentEpochs.put(targetLeaderEpoch, startOffset);
+
+ LeaderEpochFileCache leaderEpochFileCache = new
LeaderEpochFileCache(tp, checkpoint);
+ leaderEpochFileCache.assign(4, 99L);
+ leaderEpochFileCache.assign(5, 99L);
+ leaderEpochFileCache.assign(targetLeaderEpoch, startOffset);
+ leaderEpochFileCache.assign(12, 500L);
+
+ doTestFindOffsetByTimestamp(ts, startOffset, targetLeaderEpoch,
validSegmentEpochs, RemoteLogSegmentState.COPY_SEGMENT_STARTED);
+
+ Optional<FileRecords.TimestampAndOffset> maybeTimestampAndOffset =
remoteLogManager.findOffsetByTimestamp(tp, ts, startOffset,
leaderEpochFileCache);
+ assertEquals(Optional.empty(), maybeTimestampAndOffset);
+ }
+
private void doTestFindOffsetByTimestamp(long ts, long startOffset, int
targetLeaderEpoch,
- TreeMap<Integer, Long>
validSegmentEpochs) throws IOException, RemoteStorageException {
+ TreeMap<Integer, Long>
validSegmentEpochs,
+ RemoteLogSegmentState state)
throws IOException, RemoteStorageException {
TopicPartition tp = leaderTopicIdPartition.topicPartition();
RemoteLogSegmentId remoteLogSegmentId = new
RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid());
@@ -1038,6 +1062,7 @@ public class RemoteLogManagerTest {
when(segmentMetadata.startOffset()).thenReturn(startOffset);
when(segmentMetadata.endOffset()).thenReturn(startOffset + 2);
when(segmentMetadata.segmentLeaderEpochs()).thenReturn(validSegmentEpochs);
+ when(segmentMetadata.state()).thenReturn(state);
File tpDir = new File(logDir, tp.toString());
Files.createDirectory(tpDir.toPath());
@@ -1392,9 +1417,9 @@ public class RemoteLogManagerTest {
assertNotNull(remoteLogManager.task(followerTopicIdPartition));
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition)))
-
.thenReturn(listRemoteLogSegmentMetadata(leaderTopicIdPartition, 5, 100,
1024).iterator());
+
.thenReturn(listRemoteLogSegmentMetadata(leaderTopicIdPartition, 5, 100, 1024,
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED).iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(eq(followerTopicIdPartition)))
-
.thenReturn(listRemoteLogSegmentMetadata(followerTopicIdPartition, 3, 100,
1024).iterator());
+
.thenReturn(listRemoteLogSegmentMetadata(followerTopicIdPartition, 3, 100,
1024, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED).iterator());
CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
dummyFuture.complete(null);
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any()))
@@ -1534,7 +1559,7 @@ public class RemoteLogManagerTest {
when(mockLog.logEndOffset()).thenReturn(200L);
List<RemoteLogSegmentMetadata> metadataList =
- listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100,
1024, epochEntries);
+ listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100,
1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
.thenReturn(metadataList.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
@@ -1562,7 +1587,7 @@ public class RemoteLogManagerTest {
List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
List<RemoteLogSegmentMetadata> metadataList =
- listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100,
1024, epochEntries);
+ listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100,
1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
.thenReturn(metadataList.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
@@ -1652,7 +1677,7 @@ public class RemoteLogManagerTest {
when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(localLogSegmentsSize);
List<RemoteLogSegmentMetadata> segmentMetadataList =
listRemoteLogSegmentMetadata(
- leaderTopicIdPartition, segmentCount, recordsPerSegment,
segmentSize, epochEntries);
+ leaderTopicIdPartition, segmentCount, recordsPerSegment,
segmentSize, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount,
currentLeaderEpoch);
}
@@ -1689,7 +1714,7 @@ public class RemoteLogManagerTest {
when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(localLogSegmentsSize);
List<RemoteLogSegmentMetadata> segmentMetadataList =
listRemoteLogSegmentMetadataByTime(
- leaderTopicIdPartition, segmentCount, deletableSegmentCount,
recordsPerSegment, segmentSize, epochEntries);
+ leaderTopicIdPartition, segmentCount, deletableSegmentCount,
recordsPerSegment, segmentSize, epochEntries,
RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount,
currentLeaderEpoch);
}
@@ -1723,20 +1748,76 @@ public class RemoteLogManagerTest {
}
}
+
+ @Test
+ public void testDeleteRetentionMsOnExpiredSegment() throws
RemoteStorageException, IOException {
+ AtomicLong logStartOffset = new AtomicLong(0);
+ try (RemoteLogManager remoteLogManager = new
RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
+ tp -> Optional.of(mockLog),
+ (topicPartition, offset) -> logStartOffset.set(offset),
+ brokerTopicStats) {
+ public RemoteStorageManager createRemoteStorageManager() {
+ return remoteStorageManager;
+ }
+ public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+ return remoteLogMetadataManager;
+ }
+ }) {
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
+ task.convertToLeader(0);
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+ when(mockLog.logEndOffset()).thenReturn(200L);
+
+ List<EpochEntry> epochEntries =
Collections.singletonList(epochEntry0);
+
+ List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas =
listRemoteLogSegmentMetadata(
+ leaderTopicIdPartition, 2, 100, 1024, epochEntries,
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED);
+
+
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+ .thenReturn(remoteLogSegmentMetadatas.iterator());
+
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+ .thenReturn(remoteLogSegmentMetadatas.iterator())
+ .thenReturn(remoteLogSegmentMetadatas.iterator());
+
+ checkpoint.write(epochEntries);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp,
checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+ Map<String, Long> logProps = new HashMap<>();
+ logProps.put("retention.bytes", -1L);
+ logProps.put("retention.ms", 0L);
+ LogConfig mockLogConfig = new LogConfig(logProps);
+ when(mockLog.config()).thenReturn(mockLogConfig);
+
+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+ .thenAnswer(answer -> CompletableFuture.runAsync(() -> {
}));
+
+ task.cleanupExpiredRemoteLogSegments();
+
+ verifyNoMoreInteractions(remoteStorageManager);
+ assertEquals(0L, logStartOffset.get());
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private List<RemoteLogSegmentMetadata>
listRemoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
int
segmentCount,
int
recordsPerSegment,
- int
segmentSize) {
- return listRemoteLogSegmentMetadata(topicIdPartition, segmentCount,
recordsPerSegment, segmentSize, Collections.emptyList());
+ int
segmentSize,
+
RemoteLogSegmentState state) {
+ return listRemoteLogSegmentMetadata(topicIdPartition, segmentCount,
recordsPerSegment, segmentSize, Collections.emptyList(), state);
}
private List<RemoteLogSegmentMetadata>
listRemoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
int
segmentCount,
int
recordsPerSegment,
int
segmentSize,
-
List<EpochEntry> epochEntries) {
+
List<EpochEntry> epochEntries,
+
RemoteLogSegmentState state) {
return listRemoteLogSegmentMetadataByTime(
- topicIdPartition, segmentCount, 0, recordsPerSegment,
segmentSize, epochEntries);
+ topicIdPartition, segmentCount, 0, recordsPerSegment,
segmentSize, epochEntries, state);
}
private List<RemoteLogSegmentMetadata>
listRemoteLogSegmentMetadataByTime(TopicIdPartition topicIdPartition,
@@ -1744,7 +1825,8 @@ public class RemoteLogManagerTest {
int deletableSegmentCount,
int recordsPerSegment,
int segmentSize,
-
List<EpochEntry> epochEntries) {
+
List<EpochEntry> epochEntries,
+
RemoteLogSegmentState state) {
List<RemoteLogSegmentMetadata> segmentMetadataList = new ArrayList<>();
for (int idx = 0; idx < segmentCount; idx++) {
long timestamp = time.milliseconds();
@@ -1755,9 +1837,18 @@ public class RemoteLogManagerTest {
long endOffset = startOffset + recordsPerSegment - 1;
List<EpochEntry> localTotalEpochEntries = epochEntries.isEmpty() ?
totalEpochEntries : epochEntries;
Map<Integer, Long> segmentLeaderEpochs =
truncateAndGetLeaderEpochs(localTotalEpochEntries, startOffset, endOffset);
- segmentMetadataList.add(new RemoteLogSegmentMetadata(new
RemoteLogSegmentId(topicIdPartition,
- Uuid.randomUuid()), startOffset, endOffset, timestamp,
brokerId, timestamp, segmentSize,
- segmentLeaderEpochs));
+ RemoteLogSegmentMetadata metadata = new RemoteLogSegmentMetadata(
+ new RemoteLogSegmentId(topicIdPartition,
Uuid.randomUuid()),
+ startOffset,
+ endOffset,
+ timestamp,
+ brokerId,
+ timestamp,
+ segmentSize,
+ Optional.empty(),
+ state,
+ segmentLeaderEpochs);
+ segmentMetadataList.add(metadata);
}
return segmentMetadataList;
}