This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new f7e56c6 KAFKA-7979 - Clean up threads and increase timeout in
PartitionTest (#6378)
f7e56c6 is described below
commit f7e56c64fc9a6232cc33f119b7eaf8de4559c311
Author: Rajini Sivaram <[email protected]>
AuthorDate: Wed Mar 6 14:13:32 2019 +0000
KAFKA-7979 - Clean up threads and increase timeout in PartitionTest (#6378)
The stack trace generated from the test failure shows that the test failed even
though threads were runnable and making progress, indicating that the timeout
may be too small when the test machine is slow. This change increases the timeout
from 10 to 15 seconds, consistent with the default wait in other tests. The thread
dump also showed a lot of leftover threads from other tests, so cleanup of those
was added as well.
Reviewers: Ismael Juma <[email protected]>
---
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 1d2b71c..540be03 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import kafka.api.{ApiVersion, Request}
import kafka.common.UnexpectedAppendOffsetException
import kafka.log.{Defaults => _, _}
+import kafka.server.QuotaFactory.QuotaManagers
import kafka.server._
import kafka.utils._
import kafka.zk.KafkaZkClient
@@ -57,6 +58,7 @@ class PartitionTest {
var replicaManager: ReplicaManager = _
var logManager: LogManager = _
var logConfig: LogConfig = _
+ var quotaManagers: QuotaManagers = _
@Before
def setup(): Unit = {
@@ -74,9 +76,10 @@ class PartitionTest {
brokerProps.put(KafkaConfig.LogDirsProp, Seq(logDir1,
logDir2).map(_.getAbsolutePath).mkString(","))
val brokerConfig = KafkaConfig.fromProps(brokerProps)
val kafkaZkClient: KafkaZkClient =
EasyMock.createMock(classOf[KafkaZkClient])
+ quotaManagers = QuotaFactory.instantiate(brokerConfig, metrics, time, "")
replicaManager = new ReplicaManager(
config = brokerConfig, metrics, time, zkClient = kafkaZkClient, new
MockScheduler(time),
- logManager, new AtomicBoolean(false),
QuotaFactory.instantiate(brokerConfig, metrics, time, ""),
+ logManager, new AtomicBoolean(false), quotaManagers,
brokerTopicStats, new MetadataCache(brokerId), new
LogDirFailureChannel(brokerConfig.logDirs.size))
EasyMock.expect(kafkaZkClient.getEntityConfigs(EasyMock.anyString(),
EasyMock.anyString())).andReturn(logProps).anyTimes()
@@ -103,6 +106,7 @@ class PartitionTest {
Utils.delete(tmpDir)
logManager.liveLogDirs.foreach(Utils.delete)
replicaManager.shutdown(checkpointHW = false)
+ quotaManagers.shutdown()
}
@Test
@@ -996,7 +1000,7 @@ class PartitionTest {
(1 to 10000).foreach { _ =>
partition.appendRecordsToLeader(createRecords(baseOffset = 0), isFromClient =
true) }
})
}
- futures.foreach(_.get(10, TimeUnit.SECONDS))
+ futures.foreach(_.get(15, TimeUnit.SECONDS))
done.set(true)
} catch {
case e: TimeoutException =>