Repository: kafka
Updated Branches:
  refs/heads/trunk f4a263b5a -> b60af34d4


MINOR: Fix producer leak in `PlaintextProducerSendTest`

Author: Ismael Juma <[email protected]>

Reviewers: Sriharsha Chintalapani <[email protected]>, Guozhang Wang <[email protected]>

Closes #1471 from ijuma/fix-leaking-producers-in-plaintext-producer-send-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b60af34d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b60af34d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b60af34d

Branch: refs/heads/trunk
Commit: b60af34d4a200dbc5062ba40bfb7ffc7162e72d0
Parents: f4a263b
Author: Ismael Juma <[email protected]>
Authored: Sun Jun 5 19:32:51 2016 -0700
Committer: Sriharsha Chintalapani <[email protected]>
Committed: Sun Jun 5 19:32:51 2016 -0700

----------------------------------------------------------------------
 .../integration/kafka/api/BaseProducerSendTest.scala      |  4 ++++
 .../integration/kafka/api/PlaintextProducerSendTest.scala | 10 +++++-----
 2 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b60af34d/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 9489e70..0a2b49a 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -73,6 +73,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   private def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
     val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
       saslProperties = saslProperties, retries = retries, lingerMs = lingerMs, props = props)
+    registerProducer(producer)
+  }
+
+  protected def registerProducer(producer: KafkaProducer[Array[Byte], Array[Byte]]): KafkaProducer[Array[Byte], Array[Byte]] = {
     producers += producer
     producer
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b60af34d/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 111bc15..55fdbe3 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -40,16 +40,16 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     createNewProducerWithExplicitSerializer(brokerList)
   }
 
-  private def createNewProducerWithNoSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = {
+  private def createNewProducerWithNoSerializer(brokerList: String): KafkaProducer[Array[Byte], Array[Byte]] = {
     val producerProps = new Properties()
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+    registerProducer(new KafkaProducer(producerProps))
   }
 
-  private def createNewProducerWithExplicitSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = {
+  private def createNewProducerWithExplicitSerializer(brokerList: String): KafkaProducer[Array[Byte], Array[Byte]] = {
     val producerProps = new Properties()
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps, new ByteArraySerializer, new ByteArraySerializer)
+    registerProducer(new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer))
   }
 
   @Test
@@ -70,7 +70,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
       producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
       producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
       producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
-      return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+      registerProducer(new KafkaProducer(producerProps))
     }
 
 }
