This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5db2f99 MINOR: Close ZooKeeperClient if waitUntilConnected fails
during construction (#5411)
5db2f99 is described below
commit 5db2f9903a1e1d9fe574730e89aec0022333db71
Author: Manikumar Reddy O <[email protected]>
AuthorDate: Sun Jul 22 22:19:32 2018 +0530
MINOR: Close ZooKeeperClient if waitUntilConnected fails during
construction (#5411)
This has always been an issue, but the recent upgrade to ZooKeeper
3.4.13 means it is also an issue when an unresolvable ZK
address is used, causing some tests to leak threads.
The change in behaviour in ZK 3.4.13 is that no exception is thrown
from the ZooKeeper constructor in case of an unresolvable address.
Instead, ZooKeeper tries to re-resolve the address hoping it becomes
resolvable again. We eventually throw a
`ZooKeeperClientTimeoutException`, which is similar to the case
where the address is resolvable but ZooKeeper is not
reachable.
Reviewers: Ismael Juma <[email protected]>
---
.../src/main/scala/kafka/zookeeper/ZooKeeperClient.scala | 7 ++++++-
.../scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala | 16 +++++++++++++---
2 files changed, 19 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 5cb127c..97ec9a4 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -92,7 +92,12 @@ class ZooKeeperClient(connectString: String,
metricNames += "SessionState"
expiryScheduler.startup()
- waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
+ try waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
+ catch {
+ case e: Throwable =>
+ close()
+ throw e
+ }
override def metricName(name: String, metricTags:
scala.collection.Map[String, String]): MetricName = {
explicitMetricName(metricGroup, metricType, name, metricTags)
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index fcbf699..0088c65 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -57,12 +57,22 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
}
- @Test(expected = classOf[ZooKeeperClientTimeoutException])
+ @Test
def testUnresolvableConnectString(): Unit = {
- new ZooKeeperClient("some.invalid.hostname.foo.bar.local",
zkSessionTimeout, connectionTimeoutMs = 10,
- Int.MaxValue, time, "testMetricGroup", "testMetricType").close()
+ try {
+ new ZooKeeperClient("some.invalid.hostname.foo.bar.local",
zkSessionTimeout, connectionTimeoutMs = 10,
+ Int.MaxValue, time, "testMetricGroup", "testMetricType")
+ } catch {
+ case e: ZooKeeperClientTimeoutException =>
+ assertEquals("ZooKeeper client threads still running", Set.empty,
runningZkSendThreads)
+ }
}
+ private def runningZkSendThreads: collection.Set[String] =
Thread.getAllStackTraces.keySet.asScala
+ .filter(_.isAlive)
+ .map(_.getName)
+ .filter(t => t.contains("SendThread()"))
+
@Test(expected = classOf[ZooKeeperClientTimeoutException])
def testConnectionTimeout(): Unit = {
zookeeper.shutdown()