Could we simplify the test to the following version? Here are the improvements 
the suggested approach may provide:

1) In general we want the test to be as efficient and concise as possible as 
long as it catches the bug. So it is probably preferred to implement the test 
without running something 100 times.

2) The test in the current patch actually tests to scenarios, i.e. log cleanup 
with concurrent topic deletion, and log cleanup with concurrent log truncation. 
In general these are independent scenarios and it may be better (and most 
consistent with existing test patterns) to separate them into two tests.

3) The current test name 
`testLogsRetentionDeletionTruncationHandlePauseAndResumeCleaning` is not very 
easy to understand. The new test names 
`testConcurrentLogCleanupAndLogTruncation` and 
`testConcurrentLogCleanupAndTopicDeletion` may be easier to understand.

BTW, if we use `TestUtils.assertConcurrent `, it is preferred to also make its 
comment more informative, e.g. use `Should support log cleanup with concurrent 
topic deletion` may be better than `Concurrent log race condition test`.


```
  @Test
  def testConcurrentLogCleanupAndLogTruncation(): Unit = {
    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
    val cleanerManager: LogCleanerManager = createCleanerManager(log)

    // LogManager.cleanup() starts
    val pausedPartitions = 
cleanerManager.pauseCleaningForNonCompactedPartitions()
    // Log truncation happens due to unclean leader election
    cleanerManager.abortAndPauseCleaning(log.topicPartition)
    cleanerManager.resumeCleaning(Seq(log.topicPartition))
    // LogManager.cleanup() finishes and pausedPartitions are resumed
    cleanerManager.resumeCleaning(pausedPartitions.map(_._1))

    assertEquals(None, cleanerManager.cleaningState(log.topicPartition))
  }

  @Test
  def testConcurrentLogCleanupAndTopicDeletion(): Unit = {
    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
    val cleanerManager: LogCleanerManager = createCleanerManager(log)

    // LogManager.cleanup() starts
    val pausedPartitions = 
cleanerManager.pauseCleaningForNonCompactedPartitions()
    // Broker processes StopReplicaRequest with delete=true
    cleanerManager.abortCleaning(log.topicPartition)
    // LogManager.cleanup() finishes and pausedPartitions are resumed
    cleanerManager.resumeCleaning(pausedPartitions.map(_._1))

    assertEquals(None, cleanerManager.cleaningState(log.topicPartition))
  }
```

[ Full content available at: https://github.com/apache/kafka/pull/5694 ]
This message was relayed via gitbox.apache.org for [email protected]

Reply via email to