Repository: kafka Updated Branches: refs/heads/trunk 9cac38c02 -> 9f5a1f876
KAFKA-3217: Close producers in unit tests Producers that are not closed auto-create topics in subsequent tests when Kafka server port is reused. Added missing close(). Author: Rajini Sivaram <[email protected]> Reviewers: Ismael Juma <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #882 from rajinisivaram/KAFKA-3217 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9f5a1f87 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9f5a1f87 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9f5a1f87 Branch: refs/heads/trunk Commit: 9f5a1f87667c23db557a712d51c45541372f3c5d Parents: 9cac38c Author: Rajini Sivaram <[email protected]> Authored: Tue Feb 9 09:49:32 2016 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Tue Feb 9 09:49:32 2016 -0800 ---------------------------------------------------------------------- .../kafka/api/AuthorizerIntegrationTest.scala | 2 ++ .../integration/kafka/api/BaseProducerSendTest.scala | 11 +++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9f5a1f87/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index a54cbef..db2040f 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -154,6 +154,8 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { @After override def tearDown() = { + producers.foreach(_.close()) + consumers.foreach(_.close()) removeAllAcls super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/9f5a1f87/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index 29291d4..42928a3 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -30,6 +30,7 @@ import org.apache.kafka.clients.producer._ import org.apache.kafka.common.errors.SerializationException import org.junit.Assert._ import org.junit.{After, Before, Test} +import scala.collection.mutable.Buffer abstract class BaseProducerSendTest extends KafkaServerTestHarness { @@ -43,6 +44,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { private var consumer1: SimpleConsumer = null private var consumer2: SimpleConsumer = null + private val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() private val topic = "topic" private val numRecords = 100 @@ -60,13 +62,18 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { override def tearDown() { consumer1.close() consumer2.close() + // Ensure that all producers are closed since unclosed producers impact other tests when Kafka server ports are reused + producers.foreach(_.close()) super.tearDown() } - private def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = - TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile, + private def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = { + val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile, retries = retries, lingerMs = lingerMs, props = props) + producers += producer + producer + } /** * testSendOffset checks the basic send API behavior
