clolov commented on code in PR #14349: URL: https://github.com/apache/kafka/pull/14349#discussion_r1321614726
########## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ########## @@ -1508,16 +1509,184 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() { } } + @Test + public void testDeleteRetentionSizeBreachingSegments() throws RemoteStorageException, IOException { + AtomicLong logStartOffset = new AtomicLong(0); + try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + tp -> Optional.of(mockLog), + (topicPartition, offset) -> logStartOffset.set(offset), + brokerTopicStats) { + public RemoteStorageManager createRemoteStorageManager() { + return remoteStorageManager; + } + public RemoteLogMetadataManager createRemoteLogMetadataManager() { + return remoteLogMetadataManager; + } + }) { + RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128); + task.convertToLeader(0); + + when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); + when(mockLog.logEndOffset()).thenReturn(200L); + + List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0); + + List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries); + + when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)) + .thenReturn(remoteLogSegmentMetadatas.iterator()); + when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)) + .thenReturn(remoteLogSegmentMetadatas.iterator()) + .thenReturn(remoteLogSegmentMetadatas.iterator()) + .thenReturn(remoteLogSegmentMetadatas.iterator()); + + checkpoint.write(epochEntries); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + + Map<String, Long> logProps = new HashMap<>(); + logProps.put("retention.bytes", 0L); + logProps.put("retention.ms", -1L); + LogConfig mockLogConfig = new LogConfig(logProps); + when(mockLog.config()).thenReturn(mockLogConfig); + + when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))) + .thenAnswer(answer -> CompletableFuture.runAsync(() -> { })); + + task.run(); + + assertEquals(200L, logStartOffset.get()); + verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0)); + verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1)); + } + } + + @Test + public void testDeleteRetentionMsBreachingSegments() throws RemoteStorageException, IOException { + AtomicLong logStartOffset = new AtomicLong(0); + try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + tp -> Optional.of(mockLog), + (topicPartition, offset) -> logStartOffset.set(offset), + brokerTopicStats) { + public RemoteStorageManager createRemoteStorageManager() { + return remoteStorageManager; + } + public RemoteLogMetadataManager createRemoteLogMetadataManager() { + return remoteLogMetadataManager; + } + }) { + RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128); + task.convertToLeader(0); + + when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); + when(mockLog.logEndOffset()).thenReturn(200L); + + List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0); + + List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries); + + when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)) + .thenReturn(remoteLogSegmentMetadatas.iterator()); + when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)) + .thenReturn(remoteLogSegmentMetadatas.iterator()) + .thenReturn(remoteLogSegmentMetadatas.iterator()); + + checkpoint.write(epochEntries); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + + Map<String, Long> logProps = new HashMap<>(); + logProps.put("retention.bytes", -1L); + logProps.put("retention.ms", 0L); + LogConfig mockLogConfig = new LogConfig(logProps); + when(mockLog.config()).thenReturn(mockLogConfig); + + when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))) + .thenAnswer(answer -> CompletableFuture.runAsync(() -> { })); + + task.run(); + + assertEquals(200L, logStartOffset.get()); + verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0)); + verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1)); + } + } + + @Test + public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws RemoteStorageException, IOException { + AtomicLong logStartOffset = new AtomicLong(0); + try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, + tp -> Optional.of(mockLog), + (topicPartition, offset) -> logStartOffset.set(offset), + brokerTopicStats) { + public RemoteStorageManager createRemoteStorageManager() { + return remoteStorageManager; + } + public RemoteLogMetadataManager createRemoteLogMetadataManager() { + return remoteLogMetadataManager; + } + }) { + RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128); + task.convertToLeader(0); + + when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); + when(mockLog.logEndOffset()).thenReturn(200L); + + List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0); + + List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries); + + when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)) + .thenReturn(remoteLogSegmentMetadatas.iterator()); + when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)) + .thenReturn(remoteLogSegmentMetadatas.iterator()) + .thenReturn(remoteLogSegmentMetadatas.iterator()); + + checkpoint.write(epochEntries); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + + Map<String, Long> logProps = new HashMap<>(); + logProps.put("retention.bytes", -1L); + logProps.put("retention.ms", 0L); + LogConfig mockLogConfig = new LogConfig(logProps); + when(mockLog.config()).thenReturn(mockLogConfig); + + when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))) + .thenAnswer(answer -> { + // cancel the task so that we don't delete the second segment + task.cancel(); + return CompletableFuture.runAsync(() -> { + }); + }); + + task.run(); + + assertEquals(200L, logStartOffset.get()); + verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0)); + verify(remoteStorageManager, never()).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1)); Review Comment: This is a great idea, apologies for not doing it myself. Hopefully the next commit addresses this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org