This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2bb74bf  KAFKA-7979 - Clean up threads and increase timeout in 
PartitionTest (#6378)
2bb74bf is described below

commit 2bb74bfc3bc7e94c00280a7de884b95f160f8797
Author: Rajini Sivaram <[email protected]>
AuthorDate: Wed Mar 6 14:13:32 2019 +0000

    KAFKA-7979 - Clean up threads and increase timeout in PartitionTest (#6378)
    
    Stack trace generated from the test failure shows that test failed even 
though threads were runnable and making progress, indicating that the timeout 
may be too small when test machine is slow. Increasing timeout from 10 to 15 
seconds, consistent with the default wait in other tests. Thread dump also 
showed a lot of left over threads from other tests, so added clean up of those 
as well.
    
    Reviewers: Ismael Juma <[email protected]>
---
 core/src/test/scala/unit/kafka/cluster/PartitionTest.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index cba0e38..ba53cd0 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.{ApiVersion, Request}
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.log.{Defaults => _, _}
+import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server._
 import kafka.utils._
 import kafka.zk.KafkaZkClient
@@ -57,6 +58,7 @@ class PartitionTest {
   var replicaManager: ReplicaManager = _
   var logManager: LogManager = _
   var logConfig: LogConfig = _
+  var quotaManagers: QuotaManagers = _
 
   @Before
   def setup(): Unit = {
@@ -74,9 +76,10 @@ class PartitionTest {
     brokerProps.put(KafkaConfig.LogDirsProp, Seq(logDir1, 
logDir2).map(_.getAbsolutePath).mkString(","))
     val brokerConfig = KafkaConfig.fromProps(brokerProps)
     val kafkaZkClient: KafkaZkClient = 
EasyMock.createMock(classOf[KafkaZkClient])
+    quotaManagers = QuotaFactory.instantiate(brokerConfig, metrics, time, "")
     replicaManager = new ReplicaManager(
       config = brokerConfig, metrics, time, zkClient = kafkaZkClient, new 
MockScheduler(time),
-      logManager, new AtomicBoolean(false), 
QuotaFactory.instantiate(brokerConfig, metrics, time, ""),
+      logManager, new AtomicBoolean(false), quotaManagers,
       brokerTopicStats, new MetadataCache(brokerId), new 
LogDirFailureChannel(brokerConfig.logDirs.size))
 
     EasyMock.expect(kafkaZkClient.getEntityConfigs(EasyMock.anyString(), 
EasyMock.anyString())).andReturn(logProps).anyTimes()
@@ -103,6 +106,7 @@ class PartitionTest {
     Utils.delete(tmpDir)
     logManager.liveLogDirs.foreach(Utils.delete)
     replicaManager.shutdown(checkpointHW = false)
+    quotaManagers.shutdown()
   }
 
   @Test
@@ -996,7 +1000,7 @@ class PartitionTest {
           (1 to 10000).foreach { _ => 
partition.appendRecordsToLeader(createRecords(baseOffset = 0), isFromClient = 
true) }
         })
       }
-      futures.foreach(_.get(10, TimeUnit.SECONDS))
+      futures.foreach(_.get(15, TimeUnit.SECONDS))
       done.set(true)
     } catch {
       case e: TimeoutException =>

Reply via email to