Repository: kafka
Updated Branches:
  refs/heads/trunk 435e5e196 -> aaca1b478


KAFKA-5439; Verify that no unexpected threads are left behind in tests

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #3314 from rajinisivaram/KAFKA-5439


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aaca1b47
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aaca1b47
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aaca1b47

Branch: refs/heads/trunk
Commit: aaca1b478fcce99860e2eb89aa468ff863458572
Parents: 435e5e1
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Tue Jun 13 13:47:41 2017 +0100
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Tue Jun 13 13:47:41 2017 +0100

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java |  3 +-
 .../kafka/clients/producer/KafkaProducer.java   |  3 +-
 .../controller/ControllerEventManager.scala     |  5 +-
 .../kafka/api/ConsumerBounceTest.scala          |  4 +-
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    | 54 +++++++++++++++++++-
 5 files changed, 63 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/aaca1b47/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index aa3807e..d36f711 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -93,6 +93,7 @@ import org.apache.kafka.common.errors.InterruptException;
 public abstract class AbstractCoordinator implements Closeable {
 
     private static final Logger log = 
LoggerFactory.getLogger(AbstractCoordinator.class);
+    public static final String HEARTBEAT_THREAD_PREFIX = 
"kafka-coordinator-heartbeat-thread";
 
     private enum MemberState {
         UNJOINED,    // the client is not part of a group
@@ -863,7 +864,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
         private AtomicReference<RuntimeException> failed = new 
AtomicReference<>(null);
 
         private HeartbeatThread() {
-            super("kafka-coordinator-heartbeat-thread" + (groupId.isEmpty() ? 
"" : " | " + groupId), true);
+            super(HEARTBEAT_THREAD_PREFIX + (groupId.isEmpty() ? "" : " | " + 
groupId), true);
         }
 
         public void enable() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/aaca1b47/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 9e84a31..1be30f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -148,6 +148,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private static final Logger log = 
LoggerFactory.getLogger(KafkaProducer.class);
     private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new 
AtomicInteger(1);
     private static final String JMX_PREFIX = "kafka.producer";
+    public static final String NETWORK_THREAD_PREFIX = 
"kafka-producer-network-thread";
 
     private String clientId;
     // Visible for testing
@@ -322,7 +323,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                     this.transactionManager,
                     apiVersions);
-            String ioThreadName = "kafka-producer-network-thread" + 
(clientId.length() > 0 ? " | " + clientId : "");
+            String ioThreadName = NETWORK_THREAD_PREFIX + (clientId.length() > 
0 ? " | " + clientId : "");
             this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
             this.ioThread.start();
             this.errors = this.metrics.sensor("errors");

http://git-wip-us.apache.org/repos/asf/kafka/blob/aaca1b47/core/src/main/scala/kafka/controller/ControllerEventManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala 
b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
index 3c0da23..f7ed54e 100644
--- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -24,13 +24,16 @@ import scala.collection._
 import kafka.metrics.KafkaTimer
 import kafka.utils.ShutdownableThread
 
+object ControllerEventManager {
+  val ControllerEventThreadName = "controller-event-thread"
+}
 class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, 
KafkaTimer],
                              eventProcessedListener: ControllerEvent => Unit) {
 
   @volatile private var _state: ControllerState = ControllerState.Idle
 
   private val queue = new LinkedBlockingQueue[ControllerEvent]
-  private val thread = new ControllerEventThread("controller-event-thread")
+  private val thread = new 
ControllerEventThread(ControllerEventManager.ControllerEventThreadName)
 
   def state: ControllerState = _state
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/aaca1b47/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 
b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 4057ccf..dc51d67 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -348,7 +348,9 @@ class ConsumerBounceTest extends IntegrationTestHarness 
with Logging {
 
   private def createConsumer(groupId: String) : KafkaConsumer[Array[Byte], 
Array[Byte]] = {
     this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
-    createNewConsumer
+    val consumer = createNewConsumer
+    consumers += consumer
+    consumer
   }
 
   private def createConsumerAndReceive(groupId: String, manualAssign: Boolean, 
numRecords: Int) : KafkaConsumer[Array[Byte], Array[Byte]] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/aaca1b47/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala 
b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index a250633..0a7e631 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,13 +19,20 @@ package kafka.zk
 
 import javax.security.auth.login.Configuration
 
-import kafka.utils.{CoreUtils, Logging, ZkUtils}
-import org.junit.{After, Before}
+import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
+import org.junit.{After, AfterClass, Before, BeforeClass}
+import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.test.IntegrationTest
 import org.junit.experimental.categories.Category
 
+import scala.collection.Set
+import scala.collection.JavaConverters._
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
+import kafka.controller.ControllerEventManager
+
 @Category(Array(classOf[IntegrationTest]))
 abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
 
@@ -54,3 +61,46 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with 
Logging {
     Configuration.setConfiguration(null)
   }
 }
+
+object ZooKeeperTestHarness {
+  val ZkClientEventThreadPrefix = "ZkClient-EventThread"
+
+  // Threads which may cause transient failures in subsequent tests if not 
shutdown.
+  // These include threads which make connections to brokers and may cause 
issues
+  // when broker ports are reused (e.g. auto-create topics) as well as threads
+  // which reset static JAAS configuration.
+  val unexpectedThreadNames = 
Set(ControllerEventManager.ControllerEventThreadName,
+                                  KafkaProducer.NETWORK_THREAD_PREFIX,
+                                  AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
+                                  ZkClientEventThreadPrefix)
+
+  /**
+   * Verify that a previous test that doesn't use ZooKeeperTestHarness hasn't 
left behind an unexpected thread.
+   * This assumes that brokers, ZooKeeper clients, producers and consumers are 
not created in another @BeforeClass,
+   * which is true for core tests where this harness is used.
+   */
+  @BeforeClass
+  def setUpClass() {
+    verifyNoUnexpectedThreads()
+  }
+
+  /**
+   * Verify that tests from the current test class using ZooKeeperTestHarness 
haven't left behind an unexpected thread
+   */
+  @AfterClass
+  def tearDownClass() {
+    verifyNoUnexpectedThreads()
+  }
+
+  /**
+   * Verifies that threads which are known to cause transient failures in 
subsequent tests
+   * have been shutdown.
+   */
+  def verifyNoUnexpectedThreads() {
+    def allThreads = Thread.getAllStackTraces.keySet.asScala.map(thread => 
thread.getName)
+    val (threads, noUnexpected) = TestUtils.computeUntilTrue(allThreads) { 
threads =>
+      threads.forall(t => unexpectedThreadNames.forall(s => !t.contains(s)))
+    }
+    assertTrue(s"Found unexpected threads, allThreads=$threads", noUnexpected)
+  }
+}

Reply via email to