Could we simplify the test to the following version? Here are the improvements
the suggested approach may provide:
1) In general we want the test to be as efficient and concise as possible as
long as it catches the bug. So it is probably better to implement the test
without repeating the scenario 100 times.
2) The test in the current patch actually tests two scenarios, i.e. log cleanup
with concurrent topic deletion, and log cleanup with concurrent log truncation.
In general these are independent scenarios and it may be better (and more
consistent with existing test patterns) to separate them into two tests.
3) The current test name
`testLogsRetentionDeletionTruncationHandlePauseAndResumeCleaning` is not very
easy to understand. The new test names
`testConcurrentLogCleanupAndLogTruncation` and
`testConcurrentLogCleanupAndTopicDeletion` may be easier to understand.
BTW, if we use `TestUtils.assertConcurrent`, it would be better to also make its
message more informative, e.g. `Should support log cleanup with concurrent
topic deletion` rather than `Concurrent log race condition test`.
```
@Test
def testConcurrentLogCleanupAndLogTruncation(): Unit = {
  val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
  val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
  val cleanerManager: LogCleanerManager = createCleanerManager(log)

  // LogManager.cleanup() starts
  val pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions()
  // Log truncation happens due to unclean leader election
  cleanerManager.abortAndPauseCleaning(log.topicPartition)
  cleanerManager.resumeCleaning(Seq(log.topicPartition))
  // LogManager.cleanup() finishes and pausedPartitions are resumed
  cleanerManager.resumeCleaning(pausedPartitions.map(_._1))

  assertEquals(None, cleanerManager.cleaningState(log.topicPartition))
}

@Test
def testConcurrentLogCleanupAndTopicDeletion(): Unit = {
  val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
  val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
  val cleanerManager: LogCleanerManager = createCleanerManager(log)

  // LogManager.cleanup() starts
  val pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions()
  // Broker processes StopReplicaRequest with delete=true
  cleanerManager.abortCleaning(log.topicPartition)
  // LogManager.cleanup() finishes and pausedPartitions are resumed
  cleanerManager.resumeCleaning(pausedPartitions.map(_._1))

  assertEquals(None, cleanerManager.cleaningState(log.topicPartition))
}
```
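To illustrate the point about the `TestUtils.assertConcurrent` message, here is a rough, self-contained sketch. The `assertConcurrent` below is a hypothetical local stand-in written only for this example (it is not Kafka's implementation, and `ConcurrentMessageSketch` is a made-up name). It just shows how a descriptive message ends up in the failure output when one of the concurrent functions throws.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, Executors, TimeUnit}

object ConcurrentMessageSketch {
  // Hypothetical stand-in for a concurrent assertion helper (assumption, not Kafka code).
  // Runs all functions at roughly the same time and fails with `message` as a prefix.
  def assertConcurrent(message: String, functions: Seq[() => Any], timeoutMs: Int): Unit = {
    val pool = Executors.newFixedThreadPool(functions.size)
    val failures = new ConcurrentLinkedQueue[Throwable]()
    val start = new CountDownLatch(1)
    val done = new CountDownLatch(functions.size)
    try {
      functions.foreach { f =>
        pool.execute(new Runnable {
          override def run(): Unit = {
            try {
              start.await() // hold every worker until all are submitted
              f()
              ()
            } catch {
              case t: Throwable => failures.add(t); ()
            } finally {
              done.countDown()
            }
          }
        })
      }
      start.countDown() // release all workers together
      val finished = done.await(timeoutMs.toLong, TimeUnit.MILLISECONDS)
      assert(finished, s"$message: timed out after $timeoutMs ms")
    } finally {
      pool.shutdownNow()
    }
    assert(failures.isEmpty, s"$message: failed with $failures")
  }

  def main(args: Array[String]): Unit = {
    // The message states the expected behavior, so a failure is self-explanatory.
    assertConcurrent(
      "Should support log cleanup with concurrent topic deletion",
      Seq(() => (), () => ()),
      timeoutMs = 1000)
  }
}
```

With a message like this, a failing run reads as a statement of the violated expectation instead of a generic test label.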
[ Full content available at: https://github.com/apache/kafka/pull/5694 ]