This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new 5fa2a2433aa KAFKA-15802: Validate remote segment state before fetching index (#14727)
5fa2a2433aa is described below

commit 5fa2a2433aac430d24963a525acf9154cc88289a
Author: Jorge Esteban Quilcate Otoya <[email protected]>
AuthorDate: Tue Nov 14 18:28:53 2023 +0200

    KAFKA-15802: Validate remote segment state before fetching index (#14727)
    
    Reviewers: Satish Duggana <[email protected]>, Divij Vaidya <[email protected]>, Christo Lolov <[email protected]>, Luke Chen <[email protected]>, Kamal Chandraprakash <[email protected]>
---
 .../java/kafka/log/remote/RemoteLogManager.java    |  12 +-
 .../kafka/log/remote/RemoteLogManagerTest.java     | 125 ++++++++++++++++++---
 2 files changed, 118 insertions(+), 19 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index a5a30e6d156..e6abdb3c5a1 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -514,11 +514,15 @@ public class RemoteLogManager implements Closeable {
         while (maybeEpoch.isPresent()) {
             int epoch = maybeEpoch.getAsInt();
 
+            // KAFKA-15802: Add a new API for RLMM to choose how to implement 
the predicate.
+            // currently, all segments are returned and then iterated, and 
filtered
             Iterator<RemoteLogSegmentMetadata> iterator = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
             while (iterator.hasNext()) {
                 RemoteLogSegmentMetadata rlsMetadata = iterator.next();
-                if (rlsMetadata.maxTimestampMs() >= timestamp && 
rlsMetadata.endOffset() >= startingOffset &&
-                        isRemoteSegmentWithinLeaderEpochs(rlsMetadata, 
unifiedLog.logEndOffset(), epochWithOffsets)) {
+                if (rlsMetadata.maxTimestampMs() >= timestamp
+                    && rlsMetadata.endOffset() >= startingOffset
+                    && isRemoteSegmentWithinLeaderEpochs(rlsMetadata, 
unifiedLog.logEndOffset(), epochWithOffsets)
+                    && 
rlsMetadata.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED)) {
                     return lookupTimestamp(rlsMetadata, timestamp, 
startingOffset);
                 }
             }
@@ -988,6 +992,10 @@ public class RemoteLogManager implements Closeable {
                         return;
                     }
                     RemoteLogSegmentMetadata metadata = 
segmentsIterator.next();
+
+                    if 
(RemoteLogSegmentState.DELETE_SEGMENT_FINISHED.equals(metadata.state())) {
+                        continue;
+                    }
                     if (segmentsToDelete.contains(metadata)) {
                         continue;
                     }
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 8a6ba870712..424bf83921f 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -980,7 +980,7 @@ public class RemoteLogManagerTest {
         leaderEpochFileCache.assign(targetLeaderEpoch, startOffset);
         leaderEpochFileCache.assign(12, 500L);
 
-        doTestFindOffsetByTimestamp(ts, startOffset, targetLeaderEpoch, 
validSegmentEpochs);
+        doTestFindOffsetByTimestamp(ts, startOffset, targetLeaderEpoch, 
validSegmentEpochs, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
 
         // Fetching message for timestamp `ts` will return the message with 
startOffset+1, and `ts+1` as there are no
         // messages starting with the startOffset and with `ts`.
@@ -1014,7 +1014,7 @@ public class RemoteLogManagerTest {
         leaderEpochFileCache.assign(targetLeaderEpoch, startOffset);
         leaderEpochFileCache.assign(12, 500L);
 
-        doTestFindOffsetByTimestamp(ts, startOffset, targetLeaderEpoch, 
validSegmentEpochs);
+        doTestFindOffsetByTimestamp(ts, startOffset, targetLeaderEpoch, 
validSegmentEpochs, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
 
         // Fetch offsets for this segment returns empty as the segment epochs 
are not with in the leader epoch cache.
         Optional<FileRecords.TimestampAndOffset> maybeTimestampAndOffset1 = 
remoteLogManager.findOffsetByTimestamp(tp, ts, startOffset, 
leaderEpochFileCache);
@@ -1027,8 +1027,32 @@ public class RemoteLogManagerTest {
         assertEquals(Optional.empty(), maybeTimestampAndOffset3);
     }
 
+    @Test
+    void testFindOffsetByTimestampWithSegmentNotReady() throws IOException, 
RemoteStorageException {
+        TopicPartition tp = leaderTopicIdPartition.topicPartition();
+
+        long ts = time.milliseconds();
+        long startOffset = 120;
+        int targetLeaderEpoch = 10;
+
+        TreeMap<Integer, Long> validSegmentEpochs = new TreeMap<>();
+        validSegmentEpochs.put(targetLeaderEpoch, startOffset);
+
+        LeaderEpochFileCache leaderEpochFileCache = new 
LeaderEpochFileCache(tp, checkpoint);
+        leaderEpochFileCache.assign(4, 99L);
+        leaderEpochFileCache.assign(5, 99L);
+        leaderEpochFileCache.assign(targetLeaderEpoch, startOffset);
+        leaderEpochFileCache.assign(12, 500L);
+
+        doTestFindOffsetByTimestamp(ts, startOffset, targetLeaderEpoch, 
validSegmentEpochs, RemoteLogSegmentState.COPY_SEGMENT_STARTED);
+
+        Optional<FileRecords.TimestampAndOffset> maybeTimestampAndOffset = 
remoteLogManager.findOffsetByTimestamp(tp, ts, startOffset, 
leaderEpochFileCache);
+        assertEquals(Optional.empty(), maybeTimestampAndOffset);
+    }
+
     private void doTestFindOffsetByTimestamp(long ts, long startOffset, int 
targetLeaderEpoch,
-                                             TreeMap<Integer, Long> 
validSegmentEpochs) throws IOException, RemoteStorageException {
+                                             TreeMap<Integer, Long> 
validSegmentEpochs,
+                                             RemoteLogSegmentState state) 
throws IOException, RemoteStorageException {
         TopicPartition tp = leaderTopicIdPartition.topicPartition();
         RemoteLogSegmentId remoteLogSegmentId = new 
RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid());
 
@@ -1038,6 +1062,7 @@ public class RemoteLogManagerTest {
         when(segmentMetadata.startOffset()).thenReturn(startOffset);
         when(segmentMetadata.endOffset()).thenReturn(startOffset + 2);
         
when(segmentMetadata.segmentLeaderEpochs()).thenReturn(validSegmentEpochs);
+        when(segmentMetadata.state()).thenReturn(state);
 
         File tpDir = new File(logDir, tp.toString());
         Files.createDirectory(tpDir.toPath());
@@ -1392,9 +1417,9 @@ public class RemoteLogManagerTest {
         assertNotNull(remoteLogManager.task(followerTopicIdPartition));
 
         
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition)))
-                
.thenReturn(listRemoteLogSegmentMetadata(leaderTopicIdPartition, 5, 100, 
1024).iterator());
+                
.thenReturn(listRemoteLogSegmentMetadata(leaderTopicIdPartition, 5, 100, 1024, 
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED).iterator());
         
when(remoteLogMetadataManager.listRemoteLogSegments(eq(followerTopicIdPartition)))
-                
.thenReturn(listRemoteLogSegmentMetadata(followerTopicIdPartition, 3, 100, 
1024).iterator());
+                
.thenReturn(listRemoteLogSegmentMetadata(followerTopicIdPartition, 3, 100, 
1024, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED).iterator());
         CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
         dummyFuture.complete(null);
         when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any()))
@@ -1534,7 +1559,7 @@ public class RemoteLogManagerTest {
         when(mockLog.logEndOffset()).thenReturn(200L);
 
         List<RemoteLogSegmentMetadata> metadataList =
-                listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 
1024, epochEntries);
+                listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 
1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
         
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
                 .thenReturn(metadataList.iterator());
         
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
@@ -1562,7 +1587,7 @@ public class RemoteLogManagerTest {
         List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
 
         List<RemoteLogSegmentMetadata> metadataList =
-                listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 
1024, epochEntries);
+                listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 
1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
         
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
                 .thenReturn(metadataList.iterator());
         
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
@@ -1652,7 +1677,7 @@ public class RemoteLogManagerTest {
         
when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(localLogSegmentsSize);
 
         List<RemoteLogSegmentMetadata> segmentMetadataList = 
listRemoteLogSegmentMetadata(
-                leaderTopicIdPartition, segmentCount, recordsPerSegment, 
segmentSize, epochEntries);
+                leaderTopicIdPartition, segmentCount, recordsPerSegment, 
segmentSize, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
         verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount, 
currentLeaderEpoch);
     }
 
@@ -1689,7 +1714,7 @@ public class RemoteLogManagerTest {
         
when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(localLogSegmentsSize);
 
         List<RemoteLogSegmentMetadata> segmentMetadataList = 
listRemoteLogSegmentMetadataByTime(
-                leaderTopicIdPartition, segmentCount, deletableSegmentCount, 
recordsPerSegment, segmentSize, epochEntries);
+                leaderTopicIdPartition, segmentCount, deletableSegmentCount, 
recordsPerSegment, segmentSize, epochEntries, 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
         verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount, 
currentLeaderEpoch);
     }
 
@@ -1723,20 +1748,76 @@ public class RemoteLogManagerTest {
         }
     }
 
+
+    @Test
+    public void testDeleteRetentionMsOnExpiredSegment() throws 
RemoteStorageException, IOException {
+        AtomicLong logStartOffset = new AtomicLong(0);
+        try (RemoteLogManager remoteLogManager = new 
RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) -> logStartOffset.set(offset),
+                brokerTopicStats) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return remoteStorageManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+        }) {
+            RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+            task.convertToLeader(0);
+
+            
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+            when(mockLog.logEndOffset()).thenReturn(200L);
+
+            List<EpochEntry> epochEntries = 
Collections.singletonList(epochEntry0);
+
+            List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = 
listRemoteLogSegmentMetadata(
+                    leaderTopicIdPartition, 2, 100, 1024, epochEntries, 
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED);
+
+            
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                    .thenReturn(remoteLogSegmentMetadatas.iterator());
+            
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                    .thenReturn(remoteLogSegmentMetadatas.iterator())
+                    .thenReturn(remoteLogSegmentMetadatas.iterator());
+
+            checkpoint.write(epochEntries);
+            LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, 
checkpoint);
+            when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+            Map<String, Long> logProps = new HashMap<>();
+            logProps.put("retention.bytes", -1L);
+            logProps.put("retention.ms", 0L);
+            LogConfig mockLogConfig = new LogConfig(logProps);
+            when(mockLog.config()).thenReturn(mockLogConfig);
+
+            
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                    .thenAnswer(answer -> CompletableFuture.runAsync(() -> { 
}));
+
+            task.cleanupExpiredRemoteLogSegments();
+
+            verifyNoMoreInteractions(remoteStorageManager);
+            assertEquals(0L, logStartOffset.get());
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     private List<RemoteLogSegmentMetadata> 
listRemoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
                                                                         int 
segmentCount,
                                                                         int 
recordsPerSegment,
-                                                                        int 
segmentSize) {
-        return listRemoteLogSegmentMetadata(topicIdPartition, segmentCount, 
recordsPerSegment, segmentSize, Collections.emptyList());
+                                                                        int 
segmentSize,
+                                                                        
RemoteLogSegmentState state) {
+        return listRemoteLogSegmentMetadata(topicIdPartition, segmentCount, 
recordsPerSegment, segmentSize, Collections.emptyList(), state);
     }
 
     private List<RemoteLogSegmentMetadata> 
listRemoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
                                                                         int 
segmentCount,
                                                                         int 
recordsPerSegment,
                                                                         int 
segmentSize,
-                                                                        
List<EpochEntry> epochEntries) {
+                                                                        
List<EpochEntry> epochEntries,
+                                                                        
RemoteLogSegmentState state) {
         return listRemoteLogSegmentMetadataByTime(
-                topicIdPartition, segmentCount, 0, recordsPerSegment, 
segmentSize, epochEntries);
+                topicIdPartition, segmentCount, 0, recordsPerSegment, 
segmentSize, epochEntries, state);
     }
 
     private List<RemoteLogSegmentMetadata> 
listRemoteLogSegmentMetadataByTime(TopicIdPartition topicIdPartition,
@@ -1744,7 +1825,8 @@ public class RemoteLogManagerTest {
                                                                               
int deletableSegmentCount,
                                                                               
int recordsPerSegment,
                                                                               
int segmentSize,
-                                                                              
List<EpochEntry> epochEntries) {
+                                                                              
List<EpochEntry> epochEntries,
+                                                                              
RemoteLogSegmentState state) {
         List<RemoteLogSegmentMetadata> segmentMetadataList = new ArrayList<>();
         for (int idx = 0; idx < segmentCount; idx++) {
             long timestamp = time.milliseconds();
@@ -1755,9 +1837,18 @@ public class RemoteLogManagerTest {
             long endOffset = startOffset + recordsPerSegment - 1;
             List<EpochEntry> localTotalEpochEntries = epochEntries.isEmpty() ? 
totalEpochEntries : epochEntries;
             Map<Integer, Long> segmentLeaderEpochs = 
truncateAndGetLeaderEpochs(localTotalEpochEntries, startOffset, endOffset);
-            segmentMetadataList.add(new RemoteLogSegmentMetadata(new 
RemoteLogSegmentId(topicIdPartition,
-                    Uuid.randomUuid()), startOffset, endOffset, timestamp, 
brokerId, timestamp, segmentSize,
-                    segmentLeaderEpochs));
+            RemoteLogSegmentMetadata metadata = new RemoteLogSegmentMetadata(
+                    new RemoteLogSegmentId(topicIdPartition, 
Uuid.randomUuid()),
+                    startOffset,
+                    endOffset,
+                    timestamp,
+                    brokerId,
+                    timestamp,
+                    segmentSize,
+                    Optional.empty(),
+                    state,
+                    segmentLeaderEpochs);
+            segmentMetadataList.add(metadata);
         }
         return segmentMetadataList;
     }

Reply via email to