Repository: kafka Updated Branches: refs/heads/trunk 435e5e196 -> aaca1b478
KAFKA-5439; Verify that no unexpected threads are left behind in tests Author: Rajini Sivaram <rajinisiva...@googlemail.com> Reviewers: Ismael Juma <ism...@juma.me.uk> Closes #3314 from rajinisivaram/KAFKA-5439 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aaca1b47 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aaca1b47 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aaca1b47 Branch: refs/heads/trunk Commit: aaca1b478fcce99860e2eb89aa468ff863458572 Parents: 435e5e1 Author: Rajini Sivaram <rajinisiva...@googlemail.com> Authored: Tue Jun 13 13:47:41 2017 +0100 Committer: Rajini Sivaram <rajinisiva...@googlemail.com> Committed: Tue Jun 13 13:47:41 2017 +0100 ---------------------------------------------------------------------- .../consumer/internals/AbstractCoordinator.java | 3 +- .../kafka/clients/producer/KafkaProducer.java | 3 +- .../controller/ControllerEventManager.scala | 5 +- .../kafka/api/ConsumerBounceTest.scala | 4 +- .../unit/kafka/zk/ZooKeeperTestHarness.scala | 54 +++++++++++++++++++- 5 files changed, 63 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/aaca1b47/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index aa3807e..d36f711 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -93,6 +93,7 @@ import org.apache.kafka.common.errors.InterruptException; public abstract class AbstractCoordinator implements Closeable { private static final Logger log = LoggerFactory.getLogger(AbstractCoordinator.class); + public static final String HEARTBEAT_THREAD_PREFIX = "kafka-coordinator-heartbeat-thread"; private enum MemberState { UNJOINED, // the client is not part of a group @@ -863,7 +864,7 @@ public abstract class AbstractCoordinator implements Closeable { private AtomicReference<RuntimeException> failed = new AtomicReference<>(null); private HeartbeatThread() { - super("kafka-coordinator-heartbeat-thread" + (groupId.isEmpty() ? "" : " | " + groupId), true); + super(HEARTBEAT_THREAD_PREFIX + (groupId.isEmpty() ? "" : " | " + groupId), true); } public void enable() { http://git-wip-us.apache.org/repos/asf/kafka/blob/aaca1b47/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 9e84a31..1be30f1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -148,6 +148,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class); private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); private static final String JMX_PREFIX = "kafka.producer"; + public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread"; private String clientId; // Visible for testing @@ -322,7 +323,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), this.transactionManager, apiVersions); - String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : ""); + String ioThreadName = NETWORK_THREAD_PREFIX + (clientId.length() > 0 ? " | " + clientId : ""); this.ioThread = new KafkaThread(ioThreadName, this.sender, true); this.ioThread.start(); this.errors = this.metrics.sensor("errors"); http://git-wip-us.apache.org/repos/asf/kafka/blob/aaca1b47/core/src/main/scala/kafka/controller/ControllerEventManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala index 3c0da23..f7ed54e 100644 --- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala @@ -24,13 +24,16 @@ import scala.collection._ import kafka.metrics.KafkaTimer import kafka.utils.ShutdownableThread +object ControllerEventManager { + val ControllerEventThreadName = "controller-event-thread" +} class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, KafkaTimer], eventProcessedListener: ControllerEvent => Unit) { @volatile private var _state: ControllerState = ControllerState.Idle private val queue = new LinkedBlockingQueue[ControllerEvent] - private val thread = new ControllerEventThread("controller-event-thread") + private val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName) def state: ControllerState = _state http://git-wip-us.apache.org/repos/asf/kafka/blob/aaca1b47/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index 4057ccf..dc51d67 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -348,7 +348,9 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { private def createConsumer(groupId: String) : KafkaConsumer[Array[Byte], Array[Byte]] = { this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId) - createNewConsumer + val consumer = createNewConsumer + consumers += consumer + consumer } private def createConsumerAndReceive(groupId: String, manualAssign: Boolean, numRecords: Int) : KafkaConsumer[Array[Byte], Array[Byte]] = { http://git-wip-us.apache.org/repos/asf/kafka/blob/aaca1b47/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index a250633..0a7e631 100755 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -19,13 +19,20 @@ package kafka.zk import javax.security.auth.login.Configuration -import kafka.utils.{CoreUtils, Logging, ZkUtils} -import org.junit.{After, Before} +import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils} +import org.junit.{After, AfterClass, Before, BeforeClass} +import org.junit.Assert._ import org.scalatest.junit.JUnitSuite import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.test.IntegrationTest import org.junit.experimental.categories.Category +import scala.collection.Set +import scala.collection.JavaConverters._ +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.consumer.internals.AbstractCoordinator +import kafka.controller.ControllerEventManager + @Category(Array(classOf[IntegrationTest])) abstract class ZooKeeperTestHarness extends JUnitSuite with Logging { @@ -54,3 +61,46 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging { Configuration.setConfiguration(null) } } + +object ZooKeeperTestHarness { + val ZkClientEventThreadPrefix = "ZkClient-EventThread" + + // Threads which may cause transient failures in subsequent tests if not shutdown. + // These include threads which make connections to brokers and may cause issues + // when broker ports are reused (e.g. auto-create topics) as well as threads + // which reset static JAAS configuration. + val unexpectedThreadNames = Set(ControllerEventManager.ControllerEventThreadName, + KafkaProducer.NETWORK_THREAD_PREFIX, + AbstractCoordinator.HEARTBEAT_THREAD_PREFIX, + ZkClientEventThreadPrefix) + + /** + * Verify that a previous test that doesn't use ZooKeeperTestHarness hasn't left behind an unexpected thread. + * This assumes that brokers, ZooKeeper clients, producers and consumers are not created in another @BeforeClass, + * which is true for core tests where this harness is used. + */ + @BeforeClass + def setUpClass() { + verifyNoUnexpectedThreads() + } + + /** + * Verify that tests from the current test class using ZooKeeperTestHarness haven't left behind an unexpected thread + */ + @AfterClass + def tearDownClass() { + verifyNoUnexpectedThreads() + } + + /** + * Verifies that threads which are known to cause transient failures in subsequent tests + * have been shutdown. + */ + def verifyNoUnexpectedThreads() { + def allThreads = Thread.getAllStackTraces.keySet.asScala.map(thread => thread.getName) + val (threads, noUnexpected) = TestUtils.computeUntilTrue(allThreads) { threads => + threads.forall(t => unexpectedThreadNames.forall(s => !t.contains(s))) + } + assertTrue(s"Found unexpected threads, allThreads=$threads", noUnexpected) + } +}