kamalcph commented on code in PR #14349: URL: https://github.com/apache/kafka/pull/14349#discussion_r1320065640
########## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ########## @@ -1508,16 +1511,153 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() { } } + @Test + public void testDeleteRetentionSizeBreachingSegments() throws RemoteStorageException { + RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128); + task.convertToLeader(0); + + when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); + when(mockLog.logEndOffset()).thenReturn(200L); + + List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0); + + List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries); + + when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)) + .thenReturn(remoteLogSegmentMetadatas.iterator()); + when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)) + .thenReturn(remoteLogSegmentMetadatas.iterator()) + .thenReturn(remoteLogSegmentMetadatas.iterator()); + + checkpoint.write(epochEntries); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + + Map<String, Long> logProps = new HashMap<>(); + logProps.put("retention.bytes", 0L); + logProps.put("retention.ms", -1L); + LogConfig mockLogConfig = new LogConfig(logProps); + when(mockLog.config()).thenReturn(mockLogConfig); + + when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))) + .thenAnswer(answer -> { + // assert that log-start-offset has been moved accordingly + // we skip the first entry as it is the local replica ensuring it has the correct log start offset + assertEquals(200, events.get(1).get(leaderTopicIdPartition.topicPartition())); + return CompletableFuture.runAsync(() -> { }); + }); + + task.run(); + + 
verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0)); + verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1)); + } + + @Test + public void testDeleteRetentionMsBreachingSegments() throws RemoteStorageException { + RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128); + task.convertToLeader(0); + + when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); + when(mockLog.logEndOffset()).thenReturn(200L); + + List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0); + + List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries); + + when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)) + .thenReturn(remoteLogSegmentMetadatas.iterator()); + when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)) + .thenReturn(remoteLogSegmentMetadatas.iterator()) + .thenReturn(remoteLogSegmentMetadatas.iterator()); + + checkpoint.write(epochEntries); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + + Map<String, Long> logProps = new HashMap<>(); + logProps.put("retention.bytes", -1L); + logProps.put("retention.ms", 0L); + LogConfig mockLogConfig = new LogConfig(logProps); + when(mockLog.config()).thenReturn(mockLogConfig); + + when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))) + .thenAnswer(answer -> { + // assert that log-start-offset has been moved accordingly + // we skip the first entry as it is the local replica ensuring it has the correct log start offset + assertEquals(200, events.get(1).get(leaderTopicIdPartition.topicPartition())); + return CompletableFuture.runAsync(() -> { }); + }); + + task.run(); + + 
verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0)); + verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1)); + } + + @Test + public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws RemoteStorageException { + RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128); + task.convertToLeader(0); + + when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); + when(mockLog.logEndOffset()).thenReturn(200L); + + List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0); + + List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries); + + when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)) + .thenReturn(remoteLogSegmentMetadatas.iterator()); + when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)) + .thenReturn(remoteLogSegmentMetadatas.iterator()) + .thenReturn(remoteLogSegmentMetadatas.iterator()); + + checkpoint.write(epochEntries); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + + Map<String, Long> logProps = new HashMap<>(); + logProps.put("retention.bytes", -1L); + logProps.put("retention.ms", 0L); + LogConfig mockLogConfig = new LogConfig(logProps); + when(mockLog.config()).thenReturn(mockLogConfig); + + when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))) + .thenAnswer(answer -> { + // assert that log-start-offset has been moved accordingly + // we skip the first entry as it is the local replica ensuring it has the correct log start offset + assertEquals(200, events.get(1).get(leaderTopicIdPartition.topicPartition())); Review Comment: Yes, a single AtomicLong will be sufficient. We can assert before and after calling the `task.run()` method. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org