This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 4a00b79 MINOR: Close ZooKeeperClient if waitUntilConnected fails
during construction (#5411)
4a00b79 is described below
commit 4a00b79d12761bd55fc01b1be2d9c60788945233
Author: Manikumar Reddy O <[email protected]>
AuthorDate: Sun Jul 22 22:19:32 2018 +0530
MINOR: Close ZooKeeperClient if waitUntilConnected fails during
construction (#5411)
This has always been an issue, but the recent upgrade to ZooKeeper
3.4.13 means it is also an issue when an unresolvable ZK
address is used, causing some tests to leak threads.
The change in behaviour in ZK 3.4.13 is that no exception is thrown
from the ZooKeeper constructor in case of an unresolvable address.
Instead, ZooKeeper tries to re-resolve the address hoping it becomes
resolvable again. We eventually throw a
`ZooKeeperClientTimeoutException`, which is similar to the case
where the address is resolvable but ZooKeeper is not
reachable.
Reviewers: Ismael Juma <[email protected]>
---
.../src/main/scala/kafka/zookeeper/ZooKeeperClient.scala | 7 ++++++-
.../scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala | 16 +++++++++++++---
2 files changed, 19 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 5cb127c..97ec9a4 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -92,7 +92,12 @@ class ZooKeeperClient(connectString: String,
metricNames += "SessionState"
expiryScheduler.startup()
- waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
+ try waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
+ catch {
+ case e: Throwable =>
+ close()
+ throw e
+ }
override def metricName(name: String, metricTags:
scala.collection.Map[String, String]): MetricName = {
explicitMetricName(metricGroup, metricType, name, metricTags)
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index fcbf699..0088c65 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -57,12 +57,22 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
}
- @Test(expected = classOf[ZooKeeperClientTimeoutException])
+ @Test
def testUnresolvableConnectString(): Unit = {
- new ZooKeeperClient("some.invalid.hostname.foo.bar.local",
zkSessionTimeout, connectionTimeoutMs = 10,
- Int.MaxValue, time, "testMetricGroup", "testMetricType").close()
+ try {
+ new ZooKeeperClient("some.invalid.hostname.foo.bar.local",
zkSessionTimeout, connectionTimeoutMs = 10,
+ Int.MaxValue, time, "testMetricGroup", "testMetricType")
+ } catch {
+ case e: ZooKeeperClientTimeoutException =>
+ assertEquals("ZooKeeper client threads still running", Set.empty,
runningZkSendThreads)
+ }
}
+ private def runningZkSendThreads: collection.Set[String] =
Thread.getAllStackTraces.keySet.asScala
+ .filter(_.isAlive)
+ .map(_.getName)
+ .filter(t => t.contains("SendThread()"))
+
@Test(expected = classOf[ZooKeeperClientTimeoutException])
def testConnectionTimeout(): Unit = {
zookeeper.shutdown()