[ 
https://issues.apache.org/jira/browse/KAFKA-873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16300836#comment-16300836
 ] 

ASF GitHub Bot commented on KAFKA-873:
--------------------------------------

ijuma closed pull request #53: KAFKA-873, KAFKA-2079: curator + exhibitor 
integration
URL: https://github.com/apache/kafka/pull/53
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/.gitignore b/.gitignore
index 99b32a6770e..4c2a5bbb20c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,4 +20,6 @@ TAGS
 .settings
 .gradle
 kafka.ipr
-kafka.iws
\ No newline at end of file
+kafka.iws
+core/data
+gradle/wrapper
\ No newline at end of file
diff --git a/build.gradle b/build.gradle
index 7982fe7308f..61258edb472 100644
--- a/build.gradle
+++ b/build.gradle
@@ -205,7 +205,9 @@ project(':core') {
     compile project(':clients')
     compile "org.scala-lang:scala-library:$scalaVersion"
     compile 'org.apache.zookeeper:zookeeper:3.4.6'
-    compile 'com.101tec:zkclient:0.3'
+    compile 'com.101tec:zkclient:0.4'
+    compile 'org.apache.curator:curator-framework:2.7.1'
+    compile 'com.netflix.curator:curator-x-zkclient-bridge:3.0.0'
     compile 'com.yammer.metrics:metrics-core:2.2.0'
     compile 'net.sf.jopt-simple:jopt-simple:3.2'
 
diff --git 
a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 3e1718bc7ca..dd9dcd175a9 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -167,8 +167,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: 
ConsumerConfig,
   }
 
   private def connectZk() {
-    info("Connecting to zookeeper instance at " + config.zkConnect)
-    zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, 
config.zkConnectionTimeoutMs, ZKStringSerializer)
+    info("Connecting to zookeeper instance at " + config.zkConnect +
+      (if (!config.exhibitorHosts.isEmpty) " (" + config.exhibitorHosts + ")" 
else ""))
+    zkClient = ZkUtils.bridgeCurator(config, ZkUtils.makeCuratorClient(config))
   }
 
   // Blocks until the offset manager is located and a channel is established 
to it.
@@ -489,6 +490,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: 
ConsumerConfig,
       // The child change watchers will be set inside rebalance when we read 
the children list.
     }
 
+    @throws(classOf[Exception])
+    def handleSessionEstablishmentError(error: Throwable): Unit = {
+    }
   }
 
   class ZKTopicPartitionChangeListener(val loadBalancerListener: 
ZKRebalancerListener)
diff --git 
a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala 
b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index 38f4ec0bd1b..34e6348fdd4 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -93,6 +93,10 @@ class ZookeeperTopicEventWatcher(val zkClient: ZkClient,
         }
       }
     }
+
+    @throws(classOf[Exception])
+    def handleSessionEstablishmentError(error: Throwable): Unit = {
+    }
   }
 }
 
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 66df6d2fbdb..bd6ed2a3b79 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1109,6 +1109,10 @@ class KafkaController(val config : KafkaConfig, 
zkClient: ZkClient, val brokerSt
         controllerElector.elect
       }
     }
+
+    @throws(classOf[Exception])
+    def handleSessionEstablishmentError(error: Throwable): Unit = {
+    }
   }
 
   private def checkAndTriggerPartitionRebalance(): Unit = {
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 7907987e434..30e3eac0d8d 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -82,6 +82,10 @@ class KafkaHealthcheck(private val brokerId: Int,
       info("done re-registering broker")
       info("Subscribing to %s path to watch for new 
topics".format(ZkUtils.BrokerTopicsPath))
     }
+
+    @throws(classOf[Exception])
+    def handleSessionEstablishmentError(error: Throwable): Unit = {
+    }
   }
 
 }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5cd4c84fc48..a89b355581a 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -137,29 +137,35 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime) extends Logg
   }
 
   private def initZk(): ZkClient = {
-    info("Connecting to zookeeper on " + config.zkConnect)
+    // If configuration contains exhibitor config we will create cxn to 
exhibitor,
+    // and use zookeeper connect string as a backup. Otherwise we will use the
+    // configured zookeeper connect string. In either case (exhibitor or 
direct zookeeper),
+    // if the zkConnect string contains a chroot/namespace, we will use that 
namespace for
+    // the zk connection we return.
+    info("Connecting to zookeeper on " + config.zkConnect +
+      (if (!config.exhibitorHosts.isEmpty) " (" + config.exhibitorHosts + ")" 
else ""))
 
-    val chroot = {
+    val curatorClient = ZkUtils.makeCuratorClient(config)
+
+    val namespace = {
       if (config.zkConnect.indexOf("/") > 0)
-        config.zkConnect.substring(config.zkConnect.indexOf("/"))
+        config.zkConnect.substring(config.zkConnect.indexOf("/") + 1)
       else
         ""
     }
 
-    if (chroot.length > 1) {
-      val zkConnForChrootCreation = config.zkConnect.substring(0, 
config.zkConnect.indexOf("/"))
-      val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, 
config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
-      ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot)
-      info("Created zookeeper path " + chroot)
+    if (namespace.length > 1) {
+      val zkClientForChrootCreation = ZkUtils.bridgeCurator(config, 
curatorClient.usingNamespace(null))
+      ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, "/" + 
namespace)
+      info("Created zookeeper path /" + namespace)
       zkClientForChrootCreation.close()
     }
 
-    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, 
config.zkConnectionTimeoutMs, ZKStringSerializer)
+    val zkClient = ZkUtils.bridgeCurator(config, curatorClient)
     ZkUtils.setupCommonPaths(zkClient)
     zkClient
   }
 
-
   /**
    *  Forces some dynamic jmx beans to be registered on server startup.
    */
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala 
b/core/src/main/scala/kafka/utils/ZkUtils.scala
index a6f4d461ac0..c0232a15efc 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -17,12 +17,18 @@
 
 package kafka.utils
 
+import com.netflix.curator.x.zkclientbridge.CuratorZKClientBridge
 import kafka.cluster.{Broker, Cluster}
 import kafka.consumer.{ConsumerThreadId, TopicCount}
+import kafka.server.KafkaConfig
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
   ZkMarshallingError, ZkBadVersionException}
 import org.I0Itec.zkclient.serialize.ZkSerializer
+import 
org.apache.curator.ensemble.exhibitor.Exhibitors.BackupConnectionStringProvider
+import org.apache.curator.ensemble.exhibitor.{DefaultExhibitorRestClient, 
Exhibitors, ExhibitorEnsembleProvider}
+import org.apache.curator.framework.{CuratorFrameworkFactory, CuratorFramework}
+import org.apache.curator.retry.BoundedExponentialBackoffRetry
 import collection._
 import kafka.api.LeaderAndIsr
 import org.apache.zookeeper.data.Stat
@@ -30,10 +36,9 @@ import kafka.admin._
 import kafka.common.{KafkaException, NoEpochForPartitionException}
 import kafka.controller.ReassignedPartitionsContext
 import kafka.controller.KafkaController
-import scala.Some
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.common.TopicAndPartition
-import scala.collection
+import collection.JavaConversions._
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
@@ -707,6 +712,66 @@ object ZkUtils extends Logging {
       }.flatten.toSet
     }
   }
+
+  /**
+   * Adapt a CuratorFramework instance to a ZkClient.
+   */
+  def bridgeCurator(config: ZKConfig, curator: CuratorFramework): ZkClient = {
+    new ZkClient(new CuratorZKClientBridge(curator),
+      config.zkConnectionTimeoutMs, ZKStringSerializer)
+  }
+
+  /**
+   * Make curator client using configured values but with the explicit 
provided zkConnect string and namespace.
+   */
+  def makeCuratorClient(config: ZKConfig): CuratorFramework = {
+    val useZkConnect = {
+      if (config.zkConnect.indexOf("/") > 0)
+        config.zkConnect.substring(0, config.zkConnect.indexOf("/"))
+      else
+        config.zkConnect
+    }
+    val useNamespace = {
+      if (config.zkConnect.indexOf("/") > 0)
+        config.zkConnect.substring(config.zkConnect.indexOf("/") + 1)
+      else
+        ""
+    }
+    val builder: CuratorFrameworkFactory.Builder = 
CuratorFrameworkFactory.builder
+    builder.connectionTimeoutMs(config.zkConnectionTimeoutMs)
+    builder.sessionTimeoutMs(config.zkSessionTimeoutMs)
+    builder.retryPolicy(new BoundedExponentialBackoffRetry(
+      config.zkRetryInterval,
+      config.zkRetryIntervalCeiling,
+      config.zkRetryTimes))
+    if (useNamespace != null && !useNamespace.isEmpty) {
+      builder.namespace(useNamespace)
+    }
+    if (config.exhibitorHosts.isEmpty) {
+      // no exhibitor configured; connect directly to zookeeper
+      builder.connectString(useZkConnect)
+    } else {
+      builder.ensembleProvider(
+        new ExhibitorEnsembleProvider(
+          new Exhibitors(
+            config.exhibitorHosts.split(",").toSeq,
+            config.exhibitorPort,
+            new BackupConnectionStringProvider {
+              // use zkconnect as backup cxn string
+              override def getBackupConnectionString: String = useZkConnect
+            }),
+          new DefaultExhibitorRestClient,
+          config.exhibitorPollUriPath,
+          config.exhibitorPollMs,
+          new BoundedExponentialBackoffRetry(
+            config.exhibitorRetryInterval,
+            config.exhibitorRetryIntervalCeiling,
+            config.exhibitorRetryTimes)))
+    }
+    val curator = builder.build()
+    curator.start()
+    curator
+  }
 }
 
 object ZKStringSerializer extends ZkSerializer {
@@ -737,7 +802,7 @@ class ZKGroupTopicDirs(group: String, topic: String) 
extends ZKGroupDirs(group)
 
 class ZKConfig(props: VerifiableProperties) {
   /** ZK host string */
-  val zkConnect = props.getString("zookeeper.connect")
+  val zkConnect = props.getString("zookeeper.connect", "")
 
   /** zookeeper session timeout */
   val zkSessionTimeoutMs = props.getInt("zookeeper.session.timeout.ms", 6000)
@@ -745,6 +810,36 @@ class ZKConfig(props: VerifiableProperties) {
   /** the max time that the client waits to establish a connection to 
zookeeper */
   val zkConnectionTimeoutMs = 
props.getInt("zookeeper.connection.timeout.ms",zkSessionTimeoutMs)
 
+  /** zookeeper retry times */
+  val zkRetryTimes = props.getInt("zookeeper.retry.times", 5)
+
+  /** zookeeper retry interval */
+  val zkRetryInterval = props.getInt("zookeeper.retry.interval", 1000)
+
+  /** zookeeper retry interval */
+  val zkRetryIntervalCeiling = 
props.getInt("zookeeper.retry.interval.ceiling", 30000)
+
   /** how far a ZK follower can be behind a ZK leader */
   val zkSyncTimeMs = props.getInt("zookeeper.sync.time.ms", 2000)
+
+  /** Exhibitor host string */
+  val exhibitorHosts = props.getString("exhibitor.connect", "")
+
+  /** Exhibitor host port */
+  val exhibitorPort = props.getInt("exhibitor.port", 8080)
+
+  /** Exhbitor poll */
+  val exhibitorPollMs = props.getInt("exhibitor.poll.ms", 1000)
+
+  /** Exhibitor uri */
+  val exhibitorPollUriPath = props.getString("exhibitor.poll.uripath", 
"/exhibitor/v1/cluster/list")
+
+  /** Exhibitor retry times */
+  val exhibitorRetryTimes = props.getInt("exhibitor.retry.times", 5)
+
+  /** Exhibitor retry interval */
+  val exhibitorRetryInterval = props.getInt("exhibitor.retry.interval", 1000)
+
+  /** Exhibitor retry interval */
+  val exhibitorRetryIntervalCeiling = 
props.getInt("exhibitor.retry.interval.ceiling", 30000)
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Consider replacing zkclient with curator (with zkclient-bridge)
> ---------------------------------------------------------------
>
>                 Key: KAFKA-873
>                 URL: https://issues.apache.org/jira/browse/KAFKA-873
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.8.0
>            Reporter: Scott Clasen
>            Assignee: Grant Henke
>
> If zkclient was replaced with curator and curator-x-zkclient-bridge it would 
> be initially a drop-in replacement
> https://github.com/Netflix/curator/wiki/ZKClient-Bridge
> With the addition of a few more props to ZkConfig, and a bit of code this 
> would open up the possibility of using ACLs in zookeeper (which arent 
> supported directly by zkclient), as well as integrating with netflix 
> exhibitor for those of us using that.
> Looks like KafkaZookeeperClient needs some love anyhow...



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to