Repository: kafka
Updated Branches:
  refs/heads/trunk f4a263b5a -> b60af34d4


MINOR: Fix producer leak in `PlaintextProducerSendTest`

Author: Ismael Juma <[email protected]>

Reviewers: Sriharsha Chintalapani <[email protected]>, Guozhang Wang <[email protected]>

Closes #1471 from ijuma/fix-leaking-producers-in-plaintext-producer-send-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b60af34d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b60af34d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b60af34d

Branch: refs/heads/trunk
Commit: b60af34d4a200dbc5062ba40bfb7ffc7162e72d0
Parents: f4a263b
Author: Ismael Juma <[email protected]>
Authored: Sun Jun 5 19:32:51 2016 -0700
Committer: Sriharsha Chintalapani <[email protected]>
Committed: Sun Jun 5 19:32:51 2016 -0700

----------------------------------------------------------------------
 .../integration/kafka/api/BaseProducerSendTest.scala      |  4 ++++
 .../integration/kafka/api/PlaintextProducerSendTest.scala | 10 +++++-----
 2 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b60af34d/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 9489e70..0a2b49a 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -73,6 +73,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   private def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
     val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
       saslProperties = saslProperties, retries = retries, lingerMs = lingerMs, props = props)
+    registerProducer(producer)
+  }
+
+  protected def registerProducer(producer: KafkaProducer[Array[Byte], Array[Byte]]): KafkaProducer[Array[Byte], Array[Byte]] = {
     producers += producer
     producer
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b60af34d/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 111bc15..55fdbe3 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -40,16 +40,16 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     createNewProducerWithExplicitSerializer(brokerList)
   }
 
-  private def createNewProducerWithNoSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = {
+  private def createNewProducerWithNoSerializer(brokerList: String): KafkaProducer[Array[Byte], Array[Byte]] = {
     val producerProps = new Properties()
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+    registerProducer(new KafkaProducer(producerProps))
   }
 
-  private def createNewProducerWithExplicitSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = {
+  private def createNewProducerWithExplicitSerializer(brokerList: String): KafkaProducer[Array[Byte], Array[Byte]] = {
     val producerProps = new Properties()
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps, new ByteArraySerializer, new ByteArraySerializer)
+    registerProducer(new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer))
   }
 
   @Test
@@ -70,7 +70,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
-    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+    registerProducer(new KafkaProducer(producerProps))
   }
 
 }

Reply via email to