[
https://issues.apache.org/jira/browse/KAFKA-873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Scott Clasen updated KAFKA-873:
-------------------------------
Status: Patch Available (was: Open)
---
core/build.sbt | 2 ++
core/src/main/scala/kafka/common/KafkaZookeperClient.scala | 12 ++++++++++--
2 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/core/build.sbt b/core/build.sbt
index 405ea55..9e01605 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -13,6 +13,8 @@ libraryDependencies <+= scalaVersion("org.scala-lang" %
"scala-compiler" % _ )
libraryDependencies ++= Seq(
"org.apache.zookeeper" % "zookeeper" % "3.3.4",
"com.101tec" % "zkclient" % "0.2",
+ "com.netflix.curator" % "curator-framework" % "1.3.3"
exclude("org.apache.zookeeper", "zookeeper"),
+ "com.netflix.curator" % "curator-x-zkclient-bridge" % "1.3.3"
exclude("org.apache.zookeeper", "zookeeper"),
"org.xerial.snappy" % "snappy-java" % "1.0.4.1",
"com.yammer.metrics" % "metrics-core" % "2.2.0",
"com.yammer.metrics" % "metrics-annotation" % "2.2.0",
diff --git a/core/src/main/scala/kafka/common/KafkaZookeperClient.scala
b/core/src/main/scala/kafka/common/KafkaZookeperClient.scala
index bace1d2..aeae5c9 100644
--- a/core/src/main/scala/kafka/common/KafkaZookeperClient.scala
+++ b/core/src/main/scala/kafka/common/KafkaZookeperClient.scala
@@ -20,6 +20,9 @@ package kafka.common
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZKStringSerializer, ZKConfig}
import java.util.concurrent.atomic.AtomicReference
+import com.netflix.curator.x.zkclientbridge.CuratorZKClientBridge
+import com.netflix.curator.framework.CuratorFrameworkFactory
+import com.netflix.curator.{RetrySleeper, RetryPolicy}
object KafkaZookeeperClient {
private val INSTANCE = new AtomicReference[ZkClient](null)
@@ -28,8 +31,13 @@ object KafkaZookeeperClient {
// TODO: This cannot be a singleton since unit tests break if we do that
// INSTANCE.compareAndSet(null, new ZkClient(config.zkConnect,
config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
// ZKStringSerializer))
- INSTANCE.set(new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
config.zkConnectionTimeoutMs,
- ZKStringSerializer))
+ val policy = new RetryPolicy {
+ def allowRetry(retryCount: Int, elapsedTimeMs: Long, sleeper:
RetrySleeper): Boolean = false
+ }
+ val curator = CuratorFrameworkFactory.newClient(config.zkConnect,
config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, policy)
+ val bridge = new CuratorZKClientBridge(curator)
+ val client = new ZkClient(bridge,config.zkConnectionTimeoutMs,
ZKStringSerializer)
+ INSTANCE.set(client)
INSTANCE.get()
}
}
--
1.8.0.1
> Consider replacing zkclient with curator (with zkclient-bridge)
> ---------------------------------------------------------------
>
> Key: KAFKA-873
> URL: https://issues.apache.org/jira/browse/KAFKA-873
> Project: Kafka
> Issue Type: Improvement
> Affects Versions: 0.8
> Reporter: Scott Clasen
>
> If zkclient was replaced with curator and curator-x-zkclient-bridge it would
> be initially a drop-in replacement
> https://github.com/Netflix/curator/wiki/ZKClient-Bridge
> With the addition of a few more props to ZkConfig, and a bit of code this
> would open up the possibility of using ACLs in zookeeper (which arent
> supported directly by zkclient), as well as integrating with netflix
> exhibitor for those of us using that.
> Looks like KafkaZookeeperClient needs some love anyhow...
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira