This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5db2f99  MINOR: Close ZooKeeperClient if waitUntilConnected fails during construction (#5411)
5db2f99 is described below

commit 5db2f9903a1e1d9fe574730e89aec0022333db71
Author: Manikumar Reddy O <[email protected]>
AuthorDate: Sun Jul 22 22:19:32 2018 +0530

    MINOR: Close ZooKeeperClient if waitUntilConnected fails during construction (#5411)
    
    This has always been an issue, but the recent upgrade to ZooKeeper
    3.4.13 means it is also an issue when an unresolvable ZK
    address is used, causing some tests to leak threads.
    
    The change in behaviour in ZK 3.4.13 is that no exception is thrown
    from the ZooKeeper constructor in case of an unresolvable address.
    Instead, ZooKeeper tries to re-resolve the address hoping it becomes
    resolvable again. We eventually throw a
    `ZooKeeperClientTimeoutException`, which is similar to the case
    where the address is resolvable but ZooKeeper is not
    reachable.
    
    Reviewers: Ismael Juma <[email protected]>
---
 .../src/main/scala/kafka/zookeeper/ZooKeeperClient.scala |  7 ++++++-
 .../scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala | 16 +++++++++++++---
 2 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 5cb127c..97ec9a4 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -92,7 +92,12 @@ class ZooKeeperClient(connectString: String,
   metricNames += "SessionState"
 
   expiryScheduler.startup()
-  waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
+  try waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
+  catch {
+    case e: Throwable =>
+      close()
+      throw e
+  }
 
   override def metricName(name: String, metricTags: 
scala.collection.Map[String, String]): MetricName = {
     explicitMetricName(metricGroup, metricType, name, metricTags)
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index fcbf699..0088c65 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -57,12 +57,22 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
   }
 
-  @Test(expected = classOf[ZooKeeperClientTimeoutException])
+  @Test
   def testUnresolvableConnectString(): Unit = {
-    new ZooKeeperClient("some.invalid.hostname.foo.bar.local", 
zkSessionTimeout, connectionTimeoutMs = 10,
-      Int.MaxValue, time, "testMetricGroup", "testMetricType").close()
+    try {
+      new ZooKeeperClient("some.invalid.hostname.foo.bar.local", 
zkSessionTimeout, connectionTimeoutMs = 10,
+        Int.MaxValue, time, "testMetricGroup", "testMetricType")
+    } catch {
+      case e: ZooKeeperClientTimeoutException =>
+        assertEquals("ZooKeeper client threads still running", Set.empty,  
runningZkSendThreads)
+    }
   }
 
+  private def runningZkSendThreads: collection.Set[String] = 
Thread.getAllStackTraces.keySet.asScala
+    .filter(_.isAlive)
+    .map(_.getName)
+    .filter(t => t.contains("SendThread()"))
+
   @Test(expected = classOf[ZooKeeperClientTimeoutException])
   def testConnectionTimeout(): Unit = {
     zookeeper.shutdown()

Reply via email to