This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new c9347cfd5f0 KAFKA-17062: handle dangling "copy_segment_start" state 
when deleting… (#17359)
c9347cfd5f0 is described below

commit c9347cfd5f022daad6dc27591da0a684dd2140ec
Author: Guillaume Mallet <[email protected]>
AuthorDate: Fri Oct 4 07:51:05 2024 +0100

    KAFKA-17062: handle dangling "copy_segment_start" state when deleting… 
(#17359)
    
    The COPY_SEGMENT_STARTED state segments are counted when calculating remote 
retention size. This causes unexpected retention size breached segment 
deletion. This PR fixes it by
      1. only counting COPY_SEGMENT_FINISHED and DELETE_SEGMENT_STARTED state 
segments when calculating remote log size.
      2. During copy Segment, if we encounter errors, we will delete the 
segment immediately.
      3. Tests added.
    
    Reviewers: Luke Chen <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../java/kafka/log/remote/RemoteLogManager.java    |  29 ++-
 .../kafka/log/remote/RemoteLogManagerTest.java     | 244 +++++++++++++++++++--
 3 files changed, 246 insertions(+), 29 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index a7b906324af..e068b47dc66 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -39,7 +39,7 @@
     <suppress 
checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
               
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
     <suppress checks="NPathComplexity" 
files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
-    <suppress 
checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling" 
files="(RemoteLogManager|RemoteLogManagerTest).java"/>
+    <suppress 
checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling|JavaNCSS"
 files="(RemoteLogManager|RemoteLogManagerTest).java"/>
     <suppress checks="ClassFanOutComplexity" 
files="RemoteLogManagerTest.java"/>
     <suppress checks="MethodLength"
               files="(KafkaClusterTestKit).java"/>
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 2e897e2836c..e129da29b7c 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -871,7 +871,18 @@ public class RemoteLogManager implements Closeable {
                     producerStateSnapshotFile.toPath(), leaderEpochsIndex);
             
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark();
             brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
-            Optional<CustomMetadata> customMetadata = 
remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+            Optional<CustomMetadata> customMetadata = Optional.empty();
+            try {
+                customMetadata = 
remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+            } catch (RemoteStorageException e) {
+                try {
+                    
remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm);
+                    logger.info("Successfully cleaned segment {} after failing 
to copy segment", id);
+                } catch (RemoteStorageException e1) {
+                    logger.error("Error while cleaning segment {}, consider 
cleaning manually", id, e1);
+                }
+                throw e;
+            }
 
             RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new 
RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
                     customMetadata, 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
@@ -1311,10 +1322,18 @@ public class RemoteLogManager implements Closeable {
                     Iterator<RemoteLogSegmentMetadata> segmentsIterator = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
                     while (segmentsIterator.hasNext()) {
                         RemoteLogSegmentMetadata segmentMetadata = 
segmentsIterator.next();
-                        RemoteLogSegmentId segmentId = 
segmentMetadata.remoteLogSegmentId();
-                        if (!visitedSegmentIds.contains(segmentId) && 
isRemoteSegmentWithinLeaderEpochs(segmentMetadata, logEndOffset, epochEntries)) 
{
-                            remoteLogSizeBytes += 
segmentMetadata.segmentSizeInBytes();
-                            visitedSegmentIds.add(segmentId);
+                        // Only count the size of "COPY_SEGMENT_FINISHED" and 
"DELETE_SEGMENT_STARTED" state segments
+                        // because "COPY_SEGMENT_STARTED" means copy didn't 
complete, and "DELETE_SEGMENT_FINISHED" means delete did complete.
+                        // Note: there might be some "COPY_SEGMENT_STARTED" 
segments not counted here.
+                        // Either they are being copied and will be counted 
next time or they are dangling and will be cleaned elsewhere,
+                        // either way, this won't cause more segment deletion.
+                        if 
(segmentMetadata.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED) ||
+                                
segmentMetadata.state().equals(RemoteLogSegmentState.DELETE_SEGMENT_STARTED)) {
+                            RemoteLogSegmentId segmentId = 
segmentMetadata.remoteLogSegmentId();
+                            if (!visitedSegmentIds.contains(segmentId) && 
isRemoteSegmentWithinLeaderEpochs(segmentMetadata, logEndOffset, epochEntries)) 
{
+                                remoteLogSizeBytes += 
segmentMetadata.segmentSizeInBytes();
+                                visitedSegmentIds.add(segmentId);
+                            }
                         }
                     }
                 }
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index eeac6bdbae6..4ddcf4a8762 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -110,6 +110,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -158,6 +159,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
@@ -664,10 +666,92 @@ public class RemoteLogManagerTest {
         // The metadata update should not be posted.
         verify(remoteLogMetadataManager, 
never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
 
-        // Verify the metric for remote writes are not updated.
+        // Verify the metrics
+        assertEquals(1, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
+        assertEquals(0, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyBytesRate().count());
+        assertEquals(1, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
+        // Verify aggregate metrics
+        assertEquals(1, 
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
+        assertEquals(0, 
brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
+        assertEquals(1, 
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
+    }
+
+    @Test
+    void testFailedCopyShouldDeleteTheDanglingSegment() throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+        long lastStableOffset = 150L;
+        long logEndOffset = 150L;
+
+        
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, 
scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(-1L));
+
+        File tempFile = TestUtils.tempFile();
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        File tempDir = TestUtils.tempDirectory();
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+        verify(oldSegment, times(0)).readNextOffset();
+        verify(activeSegment, times(0)).readNextOffset();
+
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
+
+        ProducerStateManager mockStateManager = 
mock(ProducerStateManager.class);
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+        when(mockLog.lastStableOffset()).thenReturn(lastStableOffset);
+        when(mockLog.logEndOffset()).thenReturn(logEndOffset);
+
+        OffsetIndex idx = 
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, 
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+        TimeIndex timeIdx = 
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, 
""), oldSegmentStartOffset, 1500).get();
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, 
oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new 
TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.timeIndex()).thenReturn(timeIdx);
+        when(oldSegment.offsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(false);
+
+        // throw exception when copyLogSegmentData
+        
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class)))
+                .thenThrow(new RemoteStorageException("test"));
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(epochEntry2.epoch);
+        task.copyLogSegmentsToRemote(mockLog);
+
+        ArgumentCaptor<RemoteLogSegmentMetadata> remoteLogSegmentMetadataArg = 
ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
+        
verify(remoteLogMetadataManager).addRemoteLogSegmentMetadata(remoteLogSegmentMetadataArg.capture());
+        // verify the segment is deleted
+        verify(remoteStorageManager, 
times(1)).deleteLogSegmentData(eq(remoteLogSegmentMetadataArg.getValue()));
+
+        // The metadata update should not be posted.
+        verify(remoteLogMetadataManager, 
never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+
+        // Verify the metrics
         assertEquals(1, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
         assertEquals(0, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyBytesRate().count());
-        // Verify we did not report any failure for remote writes
         assertEquals(1, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
         // Verify aggregate metrics
         assertEquals(1, 
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
@@ -2157,34 +2241,16 @@ public class RemoteLogManagerTest {
                 .thenReturn(CompletableFuture.runAsync(() -> { }));
 
         doAnswer(ans -> {
-            assertEquals(2048, 
safeLongYammerMetricValue("RemoteDeleteLagBytes"),
-                String.format("Expected to find 2048 for RemoteDeleteLagBytes 
metric value, but found %d", 
safeLongYammerMetricValue("RemoteDeleteLagBytes")));
-            assertEquals(2048, 
safeLongYammerMetricValue("RemoteDeleteLagBytes,topic=" + leaderTopic),
-                    String.format("Expected to find 2048 for 
RemoteDeleteLagBytes for 'Leader' topic metric value, but found %d", 
safeLongYammerMetricValue("RemoteDeleteLagBytes,topic=" + leaderTopic)));
-            assertEquals(2, 
safeLongYammerMetricValue("RemoteDeleteLagSegments"),
-                String.format("Expected to find 2 for RemoteDeleteLagSegments 
metric value, but found %d", 
safeLongYammerMetricValue("RemoteDeleteLagSegments")));
-            assertEquals(2, 
safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic),
-                    String.format("Expected to find 2 for 
RemoteDeleteLagSegments for 'Leader' topic metric value, but found %d", 
safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic)));
+            verifyRemoteDeleteMetrics(2048L, 2L);
             return Optional.empty();
         }).doAnswer(ans -> {
-            assertEquals(1024, 
safeLongYammerMetricValue("RemoteDeleteLagBytes"),
-                String.format("Expected to find 1024 for RemoteDeleteLagBytes 
metric value, but found %d", 
safeLongYammerMetricValue("RemoteDeleteLagBytes")));
-            assertEquals(1, 
safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic),
-                String.format("Expected to find 1 for RemoteDeleteLagSegments 
for 'Leader' topic metric value, but found %d", 
safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic)));
-            assertEquals(1024, 
safeLongYammerMetricValue("RemoteDeleteLagBytes"),
-                    String.format("Expected to find 1024 for 
RemoteDeleteLagBytes metric value, but found %d", 
safeLongYammerMetricValue("RemoteDeleteLagBytes")));
-            assertEquals(1, 
safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic),
-                    String.format("Expected to find 1 for 
RemoteDeleteLagSegments for 'Leader' topic metric value, but found %d", 
safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic)));
+            verifyRemoteDeleteMetrics(1024L, 1L);
             return Optional.empty();
         
}).when(remoteStorageManager).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
 
         RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
 
-        assertEquals(0L, yammerMetricValue("RemoteDeleteLagBytes"));
-        assertEquals(0L, yammerMetricValue("RemoteDeleteLagSegments"));
-
-        assertEquals(0L, 
safeLongYammerMetricValue("RemoteDeleteLagBytes,topic=" + leaderTopic));
-        assertEquals(0L, 
safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic));
+        verifyRemoteDeleteMetrics(0L, 0L);
 
         task.convertToLeader(0);
         task.cleanupExpiredRemoteLogSegments();
@@ -2194,6 +2260,99 @@ public class RemoteLogManagerTest {
         verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
     }
 
+    @Test
+    public void 
testRemoteLogSizeRetentionShouldFilterOutCopySegmentStartState()
+            throws RemoteStorageException, ExecutionException, 
InterruptedException {
+        int segmentSize = 1024;
+        Map<String, Long> logProps = new HashMap<>();
+        // set the retention.bytes to 10 segment size
+        logProps.put("retention.bytes", segmentSize * 10L);
+        logProps.put("retention.ms", -1L);
+        LogConfig mockLogConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+        checkpoint.write(epochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, 
scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.logEndOffset()).thenReturn(2000L);
+
+        // creating remote log metadata list:
+        // s1. One segment with "COPY_SEGMENT_STARTED" state to simulate the 
segment was failing on copying to remote storage.
+        //     it should be ignored for both remote log size calculation, but 
get deleted in the 1st run.
+        // s2. One segment with "DELETE_SEGMENT_FINISHED" state to simulate 
the remoteLogMetadataManager doesn't filter it out and returned.
+        //     We should filter it out when calculating remote storage log 
size and deletion
+        // s3. One segment with "DELETE_SEGMENT_STARTED" state to simulate the 
segment was failing on deleting remote log.
+        //     We should count it in when calculating remote storage log size.
+        // s4. Another segment with "COPY_SEGMENT_STARTED" state to simulate 
the segment is copying to remote storage.
+        //     The segment state will change to "COPY_SEGMENT_FINISHED" state 
before checking deletion.
+        //     In the 1st run, this segment should be skipped when calculating 
remote storage size.
+        //     In the 2nd run, we should count it in when calculating remote 
storage log size.
+        // s5. 11 segments with "COPY_SEGMENT_FINISHED" state. These are 
expected to be counted in when calculating remote storage log size
+        //
+        // Expected results (retention.size is 10240 (10 segments)):
+        // In the 1st run, the total remote storage size should be 1024 * 12 
(s3, s5), so 2 segments (s1, s3) will be deleted
+        // due to retention size breached. s1 will be deleted even though it 
is not included in size calculation. But it's fine.
+        // The segment intended to be deleted will be deleted in the next run.
+        // In the 2nd run, the total remote storage size should be 1024 * 12 
(s4, s5)
+        // so 2 segments will be deleted due to retention size breached.
+        RemoteLogSegmentMetadata s1 = createRemoteLogSegmentMetadata(new 
RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
+                0, 99, segmentSize, epochEntries, 
RemoteLogSegmentState.COPY_SEGMENT_STARTED);
+        RemoteLogSegmentMetadata s2 = createRemoteLogSegmentMetadata(new 
RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
+                0, 99, segmentSize, epochEntries, 
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED);
+        RemoteLogSegmentMetadata s3 = createRemoteLogSegmentMetadata(new 
RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
+                0, 99, segmentSize, epochEntries, 
RemoteLogSegmentState.DELETE_SEGMENT_STARTED);
+        RemoteLogSegmentMetadata s4CopyStarted = 
createRemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, 
Uuid.randomUuid()),
+                200, 299, segmentSize, epochEntries, 
RemoteLogSegmentState.COPY_SEGMENT_STARTED);
+        RemoteLogSegmentMetadata s4CopyFinished = 
createRemoteLogSegmentMetadata(s4CopyStarted.remoteLogSegmentId(),
+                s4CopyStarted.startOffset(), s4CopyStarted.endOffset(), 
segmentSize, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+        List<RemoteLogSegmentMetadata> s5 =
+                listRemoteLogSegmentMetadata(leaderTopicIdPartition, 11, 100, 
1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+
+        List<RemoteLogSegmentMetadata> metadataList = new LinkedList<>();
+        metadataList.addAll(Arrays.asList(s1, s2, s3, s4CopyStarted));
+        metadataList.addAll(s5);
+
+        
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                .thenReturn(metadataList.iterator());
+        
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                
.thenReturn(metadataList.iterator()).thenReturn(metadataList.iterator());
+        
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                .thenReturn(CompletableFuture.runAsync(() -> { }));
+        
doNothing().when(remoteStorageManager).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
+
+        // RUN 1
+        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(epochEntry2.epoch);
+        task.cleanupExpiredRemoteLogSegments();
+        verify(remoteStorageManager, 
times(2)).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
+        verify(remoteStorageManager).deleteLogSegmentData(s1);
+        // make sure the s2 segment with "DELETE_SEGMENT_FINISHED" state is 
not invoking "deleteLogSegmentData"
+        verify(remoteStorageManager, never()).deleteLogSegmentData(s2);
+        verify(remoteStorageManager).deleteLogSegmentData(s3);
+
+        clearInvocations(remoteStorageManager);
+
+        // RUN 2
+        // update the metadata list to remove deleted s1, s3, and set the 
state in s4 to COPY_SEGMENT_FINISHED
+        List<RemoteLogSegmentMetadata> updatedMetadataList = new 
LinkedList<>();
+        updatedMetadataList.addAll(Arrays.asList(s2, s4CopyFinished));
+        updatedMetadataList.addAll(s5);
+        
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                .thenReturn(updatedMetadataList.iterator());
+        
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                .thenAnswer(ans -> updatedMetadataList.iterator());
+
+        
doNothing().when(remoteStorageManager).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
+        task.cleanupExpiredRemoteLogSegments();
+
+        // make sure 2 segments got deleted
+        verify(remoteStorageManager, 
times(2)).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
+        verify(remoteStorageManager).deleteLogSegmentData(s4CopyFinished);
+        verify(remoteStorageManager).deleteLogSegmentData(s5.get(0));
+    }
+
     @Test
     public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws 
RemoteStorageException, ExecutionException, InterruptedException {
         RemoteLogManager.RLMTask leaderTask = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
@@ -2311,6 +2470,11 @@ public class RemoteLogManagerTest {
         // Verify aggregate metrics
         assertEquals(1, 
brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
         assertEquals(1, 
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
+
+        // make sure we'll retry the deletion in next run
+        doNothing().when(remoteStorageManager).deleteLogSegmentData(any());
+        task.cleanupExpiredRemoteLogSegments();
+        verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
     }
 
     @ParameterizedTest(name = "testDeleteLogSegmentDueToRetentionSizeBreach 
segmentCount={0} deletableSegmentCount={1}")
@@ -2389,6 +2553,21 @@ public class RemoteLogManagerTest {
         verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount, 
currentLeaderEpoch);
     }
 
+    private void verifyRemoteDeleteMetrics(long remoteDeleteLagBytes, long 
remoteDeleteLagSegments) {
+        assertEquals(remoteDeleteLagBytes, 
safeLongYammerMetricValue("RemoteDeleteLagBytes"),
+                String.format("Expected to find %d for RemoteDeleteLagBytes 
metric value, but found %d",
+                        remoteDeleteLagBytes, 
safeLongYammerMetricValue("RemoteDeleteLagBytes")));
+        assertEquals(remoteDeleteLagSegments, 
safeLongYammerMetricValue("RemoteDeleteLagSegments"),
+                String.format("Expected to find %d for RemoteDeleteLagSegments 
metric value, but found %d",
+                        remoteDeleteLagSegments, 
safeLongYammerMetricValue("RemoteDeleteLagSegments")));
+        assertEquals(remoteDeleteLagBytes, 
safeLongYammerMetricValue("RemoteDeleteLagBytes,topic=" + leaderTopic),
+                String.format("Expected to find %d for RemoteDeleteLagBytes 
for 'Leader' topic metric value, but found %d",
+                        remoteDeleteLagBytes, 
safeLongYammerMetricValue("RemoteDeleteLagBytes,topic=" + leaderTopic)));
+        assertEquals(remoteDeleteLagSegments, 
safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic),
+                String.format("Expected to find %d for RemoteDeleteLagSegments 
for 'Leader' topic metric value, but found %d",
+                        remoteDeleteLagSegments, 
safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic)));
+    }
+
     private void verifyDeleteLogSegment(List<RemoteLogSegmentMetadata> 
segmentMetadataList,
                                         int deletableSegmentCount,
                                         int currentLeaderEpoch)
@@ -2524,6 +2703,25 @@ public class RemoteLogManagerTest {
         return segmentMetadataList;
     }
 
+    private RemoteLogSegmentMetadata 
createRemoteLogSegmentMetadata(RemoteLogSegmentId segmentID,
+                                                                    long 
startOffset,
+                                                                    long 
endOffset,
+                                                                    int 
segmentSize,
+                                                                    
List<EpochEntry> epochEntries,
+                                                                    
RemoteLogSegmentState state) {
+        return new RemoteLogSegmentMetadata(
+                segmentID,
+                startOffset,
+                endOffset,
+                time.milliseconds(),
+                brokerId,
+                time.milliseconds(),
+                segmentSize,
+                Optional.empty(),
+                state,
+                truncateAndGetLeaderEpochs(epochEntries, startOffset, 
endOffset));
+    }
+
     private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> 
entries,
                                                           Long startOffset,
                                                           Long endOffset) {

Reply via email to