This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
     new c9347cfd5f0 KAFKA-17062: handle dangling "copy_segment_start" state when deleting… (#17359)
c9347cfd5f0 is described below
commit c9347cfd5f022daad6dc27591da0a684dd2140ec
Author: Guillaume Mallet <[email protected]>
AuthorDate: Fri Oct 4 07:51:05 2024 +0100
KAFKA-17062: handle dangling "copy_segment_start" state when deleting… (#17359)
The COPY_SEGMENT_STARTED state segments are currently counted when calculating the remote
retention size, which causes unexpected segment deletion from a spurious retention-size
breach. This PR fixes it by:
1. Only counting COPY_SEGMENT_FINISHED and DELETE_SEGMENT_STARTED state segments when
   calculating the remote log size (see the illustrative sketch below).
2. Deleting the segment immediately if an error is encountered while copying it.
3. Adding tests.
Reviewers: Luke Chen <[email protected]>
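
For illustration only, here is a minimal, self-contained Java sketch of the size-accounting
rule from point 1. The State enum, SegmentInfo record, and remoteLogSizeBytes helper below are
simplified, hypothetical stand-ins for Kafka's RemoteLogSegmentState and
RemoteLogSegmentMetadata types and for the RemoteLogManager logic; only the filtering
condition mirrors the change made by this patch.

    import java.util.Arrays;
    import java.util.EnumSet;
    import java.util.List;
    import java.util.Set;

    public class RemoteLogSizeSketch {
        // Simplified stand-in for RemoteLogSegmentState.
        enum State { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED }

        // Hypothetical record carrying only the fields the size calculation needs.
        record SegmentInfo(State state, long sizeInBytes) { }

        // Only fully copied segments and segments whose deletion has merely started count toward
        // the retention size; dangling COPY_SEGMENT_STARTED segments and already-removed
        // DELETE_SEGMENT_FINISHED segments are skipped.
        private static final Set<State> COUNTED =
            EnumSet.of(State.COPY_SEGMENT_FINISHED, State.DELETE_SEGMENT_STARTED);

        static long remoteLogSizeBytes(List<SegmentInfo> segments) {
            return segments.stream()
                .filter(s -> COUNTED.contains(s.state()))
                .mapToLong(SegmentInfo::sizeInBytes)
                .sum();
        }

        public static void main(String[] args) {
            List<SegmentInfo> segments = Arrays.asList(
                new SegmentInfo(State.COPY_SEGMENT_STARTED, 1024),   // dangling copy: ignored
                new SegmentInfo(State.COPY_SEGMENT_FINISHED, 1024),  // counted
                new SegmentInfo(State.DELETE_SEGMENT_STARTED, 1024), // counted
                new SegmentInfo(State.DELETE_SEGMENT_FINISHED, 1024) // already deleted: ignored
            );
            System.out.println(remoteLogSizeBytes(segments));        // prints 2048
        }
    }

With this rule only the COPY_SEGMENT_FINISHED and DELETE_SEGMENT_STARTED sample segments
contribute, so the computed size is 2048 bytes; a dangling COPY_SEGMENT_STARTED segment no
longer inflates the total and cannot by itself trigger a retention-size breach.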
---
checkstyle/suppressions.xml | 2 +-
.../java/kafka/log/remote/RemoteLogManager.java | 29 ++-
.../kafka/log/remote/RemoteLogManagerTest.java | 244 +++++++++++++++++++--
3 files changed, 246 insertions(+), 29 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index a7b906324af..e068b47dc66 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -39,7 +39,7 @@
     <suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)" files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
     <suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
-    <suppress checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling" files="(RemoteLogManager|RemoteLogManagerTest).java"/>
+    <suppress checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling|JavaNCSS" files="(RemoteLogManager|RemoteLogManagerTest).java"/>
     <suppress checks="ClassFanOutComplexity" files="RemoteLogManagerTest.java"/>
     <suppress checks="MethodLength" files="(KafkaClusterTestKit).java"/>
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 2e897e2836c..e129da29b7c 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -871,7 +871,18 @@ public class RemoteLogManager implements Closeable {
producerStateSnapshotFile.toPath(), leaderEpochsIndex);
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark();
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
-            Optional<CustomMetadata> customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+            Optional<CustomMetadata> customMetadata = Optional.empty();
+            try {
+                customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+            } catch (RemoteStorageException e) {
+                try {
+                    remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm);
+                    logger.info("Successfully cleaned segment {} after failing to copy segment", id);
+                } catch (RemoteStorageException e1) {
+                    logger.error("Error while cleaning segment {}, consider cleaning manually", id, e1);
+                }
+                throw e;
+            }
             RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
                     customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
@@ -1311,10 +1322,18 @@ public class RemoteLogManager implements Closeable {
            Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
            while (segmentsIterator.hasNext()) {
                RemoteLogSegmentMetadata segmentMetadata = segmentsIterator.next();
-               RemoteLogSegmentId segmentId = segmentMetadata.remoteLogSegmentId();
-               if (!visitedSegmentIds.contains(segmentId) && isRemoteSegmentWithinLeaderEpochs(segmentMetadata, logEndOffset, epochEntries)) {
-                   remoteLogSizeBytes += segmentMetadata.segmentSizeInBytes();
-                   visitedSegmentIds.add(segmentId);
+               // Only count the size of "COPY_SEGMENT_FINISHED" and "DELETE_SEGMENT_STARTED" state segments
+               // because "COPY_SEGMENT_STARTED" means copy didn't complete, and "DELETE_SEGMENT_FINISHED" means delete did complete.
+               // Note: there might be some "COPY_SEGMENT_STARTED" segments not counted here.
+               // Either they are being copied and will be counted next time or they are dangling and will be cleaned elsewhere,
+               // either way, this won't cause more segment deletion.
+               if (segmentMetadata.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED) ||
+                       segmentMetadata.state().equals(RemoteLogSegmentState.DELETE_SEGMENT_STARTED)) {
+                   RemoteLogSegmentId segmentId = segmentMetadata.remoteLogSegmentId();
+                   if (!visitedSegmentIds.contains(segmentId) && isRemoteSegmentWithinLeaderEpochs(segmentMetadata, logEndOffset, epochEntries)) {
+                       remoteLogSizeBytes += segmentMetadata.segmentSizeInBytes();
+                       visitedSegmentIds.add(segmentId);
+                   }
                }
            }
        }
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index eeac6bdbae6..4ddcf4a8762 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -110,6 +110,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -158,6 +159,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
@@ -664,10 +666,92 @@ public class RemoteLogManagerTest {
// The metadata update should not be posted.
        verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
-        // Verify the metric for remote writes are not updated.
+        // Verify the metrics
+        assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
+        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyBytesRate().count());
+        assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
+        // Verify aggregate metrics
+        assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
+        assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
+        assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
+    }
+
+    @Test
+    void testFailedCopyShouldDeleteTheDanglingSegment() throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+        long lastStableOffset = 150L;
+        long logEndOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(-1L));
+
+        File tempFile = TestUtils.tempFile();
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        File tempDir = TestUtils.tempDirectory();
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+        verify(oldSegment, times(0)).readNextOffset();
+        verify(activeSegment, times(0)).readNextOffset();
+
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+        when(mockLog.lastStableOffset()).thenReturn(lastStableOffset);
+        when(mockLog.logEndOffset()).thenReturn(logEndOffset);
+
+        OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+        TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.timeIndex()).thenReturn(timeIdx);
+        when(oldSegment.offsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(false);
+
+        // throw exception when copyLogSegmentData
+        when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)))
+            .thenThrow(new RemoteStorageException("test"));
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(epochEntry2.epoch);
+        task.copyLogSegmentsToRemote(mockLog);
+
+        ArgumentCaptor<RemoteLogSegmentMetadata> remoteLogSegmentMetadataArg = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
+        verify(remoteLogMetadataManager).addRemoteLogSegmentMetadata(remoteLogSegmentMetadataArg.capture());
+        // verify the segment is deleted
+        verify(remoteStorageManager, times(1)).deleteLogSegmentData(eq(remoteLogSegmentMetadataArg.getValue()));
+
+        // The metadata update should not be posted.
+        verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+
+        // Verify the metrics
         assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
         assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyBytesRate().count());
-        // Verify we did not report any failure for remote writes
         assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
         // Verify aggregate metrics
         assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
@@ -2157,34 +2241,16 @@ public class RemoteLogManagerTest {
.thenReturn(CompletableFuture.runAsync(() -> { }));
doAnswer(ans -> {
-            assertEquals(2048, safeLongYammerMetricValue("RemoteDeleteLagBytes"),
-                String.format("Expected to find 2048 for RemoteDeleteLagBytes metric value, but found %d",
-                    safeLongYammerMetricValue("RemoteDeleteLagBytes")));
-            assertEquals(2048, safeLongYammerMetricValue("RemoteDeleteLagBytes,topic=" + leaderTopic),
-                String.format("Expected to find 2048 for RemoteDeleteLagBytes for 'Leader' topic metric value, but found %d",
-                    safeLongYammerMetricValue("RemoteDeleteLagBytes,topic=" + leaderTopic)));
-            assertEquals(2, safeLongYammerMetricValue("RemoteDeleteLagSegments"),
-                String.format("Expected to find 2 for RemoteDeleteLagSegments metric value, but found %d",
-                    safeLongYammerMetricValue("RemoteDeleteLagSegments")));
-            assertEquals(2, safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic),
-                String.format("Expected to find 2 for RemoteDeleteLagSegments for 'Leader' topic metric value, but found %d",
-                    safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic)));
+            verifyRemoteDeleteMetrics(2048L, 2L);
             return Optional.empty();
         }).doAnswer(ans -> {
-            assertEquals(1024, safeLongYammerMetricValue("RemoteDeleteLagBytes"),
-                String.format("Expected to find 1024 for RemoteDeleteLagBytes metric value, but found %d",
-                    safeLongYammerMetricValue("RemoteDeleteLagBytes")));
-            assertEquals(1, safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic),
-                String.format("Expected to find 1 for RemoteDeleteLagSegments for 'Leader' topic metric value, but found %d",
-                    safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic)));
-            assertEquals(1024, safeLongYammerMetricValue("RemoteDeleteLagBytes"),
-                String.format("Expected to find 1024 for RemoteDeleteLagBytes metric value, but found %d",
-                    safeLongYammerMetricValue("RemoteDeleteLagBytes")));
-            assertEquals(1, safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic),
-                String.format("Expected to find 1 for RemoteDeleteLagSegments for 'Leader' topic metric value, but found %d",
-                    safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic)));
+            verifyRemoteDeleteMetrics(1024L, 1L);
             return Optional.empty();
         }).when(remoteStorageManager).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
         RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
-        assertEquals(0L, yammerMetricValue("RemoteDeleteLagBytes"));
-        assertEquals(0L, yammerMetricValue("RemoteDeleteLagSegments"));
-
-        assertEquals(0L, safeLongYammerMetricValue("RemoteDeleteLagBytes,topic=" + leaderTopic));
-        assertEquals(0L, safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic));
+        verifyRemoteDeleteMetrics(0L, 0L);
task.convertToLeader(0);
task.cleanupExpiredRemoteLogSegments();
@@ -2194,6 +2260,99 @@ public class RemoteLogManagerTest {
verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
}
+    @Test
+    public void testRemoteLogSizeRetentionShouldFilterOutCopySegmentStartState()
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        int segmentSize = 1024;
+        Map<String, Long> logProps = new HashMap<>();
+        // set the retention.bytes to 10 segment size
+        logProps.put("retention.bytes", segmentSize * 10L);
+        logProps.put("retention.ms", -1L);
+        LogConfig mockLogConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+        checkpoint.write(epochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.logEndOffset()).thenReturn(2000L);
+
+        // creating remote log metadata list:
+        // s1. One segment with "COPY_SEGMENT_STARTED" state to simulate the segment was failing on copying to remote storage.
+        //     it should be ignored for both remote log size calculation, but get deleted in the 1st run.
+        // s2. One segment with "DELETE_SEGMENT_FINISHED" state to simulate the remoteLogMetadataManager doesn't filter it out and returned.
+        //     We should filter it out when calculating remote storage log size and deletion
+        // s3. One segment with "DELETE_SEGMENT_STARTED" state to simulate the segment was failing on deleting remote log.
+        //     We should count it in when calculating remote storage log size.
+        // s4. Another segment with "COPY_SEGMENT_STARTED" state to simulate the segment is copying to remote storage.
+        //     The segment state will change to "COPY_SEGMENT_FINISHED" state before checking deletion.
+        //     In the 1st run, this segment should be skipped when calculating remote storage size.
+        //     In the 2nd run, we should count it in when calculating remote storage log size.
+        // s5. 11 segments with "COPY_SEGMENT_FINISHED" state. These are expected to be counted in when calculating remote storage log size
+        //
+        // Expected results (retention.size is 10240 (10 segments)):
+        // In the 1st run, the total remote storage size should be 1024 * 12 (s3, s5), so 2 segments (s1, s3) will be deleted
+        // due to retention size breached. s1 will be deleted even though it is not included in size calculation. But it's fine.
+        // The segment intended to be deleted will be deleted in the next run.
+        // In the 2nd run, the total remote storage size should be 1024 * 12 (s4, s5)
+        // so 2 segments will be deleted due to retention size breached.
+        RemoteLogSegmentMetadata s1 = createRemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
+            0, 99, segmentSize, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_STARTED);
+        RemoteLogSegmentMetadata s2 = createRemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
+            0, 99, segmentSize, epochEntries, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED);
+        RemoteLogSegmentMetadata s3 = createRemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
+            0, 99, segmentSize, epochEntries, RemoteLogSegmentState.DELETE_SEGMENT_STARTED);
+        RemoteLogSegmentMetadata s4CopyStarted = createRemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
+            200, 299, segmentSize, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_STARTED);
+        RemoteLogSegmentMetadata s4CopyFinished = createRemoteLogSegmentMetadata(s4CopyStarted.remoteLogSegmentId(),
+            s4CopyStarted.startOffset(), s4CopyStarted.endOffset(), segmentSize, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+        List<RemoteLogSegmentMetadata> s5 =
+            listRemoteLogSegmentMetadata(leaderTopicIdPartition, 11, 100, 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+
+        List<RemoteLogSegmentMetadata> metadataList = new LinkedList<>();
+        metadataList.addAll(Arrays.asList(s1, s2, s3, s4CopyStarted));
+        metadataList.addAll(s5);
+
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+            .thenReturn(metadataList.iterator());
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+            .thenReturn(metadataList.iterator()).thenReturn(metadataList.iterator());
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+            .thenReturn(CompletableFuture.runAsync(() -> { }));
+        doNothing().when(remoteStorageManager).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
+
+        // RUN 1
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(epochEntry2.epoch);
+        task.cleanupExpiredRemoteLogSegments();
+        verify(remoteStorageManager, times(2)).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
+        verify(remoteStorageManager).deleteLogSegmentData(s1);
+        // make sure the s2 segment with "DELETE_SEGMENT_FINISHED" state is not invoking "deleteLogSegmentData"
+        verify(remoteStorageManager, never()).deleteLogSegmentData(s2);
+        verify(remoteStorageManager).deleteLogSegmentData(s3);
+
+        clearInvocations(remoteStorageManager);
+
+        // RUN 2
+        // update the metadata list to remove deleted s1, s3, and set the state in s4 to COPY_SEGMENT_FINISHED
+        List<RemoteLogSegmentMetadata> updatedMetadataList = new LinkedList<>();
+        updatedMetadataList.addAll(Arrays.asList(s2, s4CopyFinished));
+        updatedMetadataList.addAll(s5);
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+            .thenReturn(updatedMetadataList.iterator());
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+            .thenAnswer(ans -> updatedMetadataList.iterator());
+
+        doNothing().when(remoteStorageManager).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
+        task.cleanupExpiredRemoteLogSegments();
+
+        // make sure 2 segments got deleted
+        verify(remoteStorageManager, times(2)).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
+        verify(remoteStorageManager).deleteLogSegmentData(s4CopyFinished);
+        verify(remoteStorageManager).deleteLogSegmentData(s5.get(0));
+    }
+
@Test
    public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws RemoteStorageException, ExecutionException, InterruptedException {
        RemoteLogManager.RLMTask leaderTask = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
@@ -2311,6 +2470,11 @@ public class RemoteLogManagerTest {
// Verify aggregate metrics
        assertEquals(1, brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
        assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
+
+ // make sure we'll retry the deletion in next run
+ doNothing().when(remoteStorageManager).deleteLogSegmentData(any());
+ task.cleanupExpiredRemoteLogSegments();
+ verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
}
    @ParameterizedTest(name = "testDeleteLogSegmentDueToRetentionSizeBreach segmentCount={0} deletableSegmentCount={1}")
@@ -2389,6 +2553,21 @@ public class RemoteLogManagerTest {
        verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount, currentLeaderEpoch);
}
+    private void verifyRemoteDeleteMetrics(long remoteDeleteLagBytes, long remoteDeleteLagSegments) {
+        assertEquals(remoteDeleteLagBytes, safeLongYammerMetricValue("RemoteDeleteLagBytes"),
+            String.format("Expected to find %d for RemoteDeleteLagBytes metric value, but found %d",
+                remoteDeleteLagBytes, safeLongYammerMetricValue("RemoteDeleteLagBytes")));
+        assertEquals(remoteDeleteLagSegments, safeLongYammerMetricValue("RemoteDeleteLagSegments"),
+            String.format("Expected to find %d for RemoteDeleteLagSegments metric value, but found %d",
+                remoteDeleteLagSegments, safeLongYammerMetricValue("RemoteDeleteLagSegments")));
+        assertEquals(remoteDeleteLagBytes, safeLongYammerMetricValue("RemoteDeleteLagBytes,topic=" + leaderTopic),
+            String.format("Expected to find %d for RemoteDeleteLagBytes for 'Leader' topic metric value, but found %d",
+                remoteDeleteLagBytes, safeLongYammerMetricValue("RemoteDeleteLagBytes,topic=" + leaderTopic)));
+        assertEquals(remoteDeleteLagSegments, safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic),
+            String.format("Expected to find %d for RemoteDeleteLagSegments for 'Leader' topic metric value, but found %d",
+                remoteDeleteLagSegments, safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic)));
+    }
+
    private void verifyDeleteLogSegment(List<RemoteLogSegmentMetadata> segmentMetadataList,
                                        int deletableSegmentCount,
                                        int currentLeaderEpoch)
@@ -2524,6 +2703,25 @@ public class RemoteLogManagerTest {
return segmentMetadataList;
}
+    private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(RemoteLogSegmentId segmentID,
+                                                                     long startOffset,
+                                                                     long endOffset,
+                                                                     int segmentSize,
+                                                                     List<EpochEntry> epochEntries,
+                                                                     RemoteLogSegmentState state) {
+        return new RemoteLogSegmentMetadata(
+            segmentID,
+            startOffset,
+            endOffset,
+            time.milliseconds(),
+            brokerId,
+            time.milliseconds(),
+            segmentSize,
+            Optional.empty(),
+            state,
+            truncateAndGetLeaderEpochs(epochEntries, startOffset, endOffset));
+    }
+
    private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries,
                                                          Long startOffset,
                                                          Long endOffset) {