kafka-920; zkclient jar 0.2.0 is not compatible with 0.1.0; patched by Jun Rao; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d93cbc61 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d93cbc61 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d93cbc61 Branch: refs/heads/trunk Commit: d93cbc610a412d099324c2b4ce04660a00178fa9 Parents: dc0de29 Author: Jun Rao <jun...@gmail.com> Authored: Thu May 30 20:32:10 2013 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Thu May 30 20:32:10 2013 -0700 ---------------------------------------------------------------------- core/build.sbt | 2 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 15 +++++---------- 2 files changed, 6 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d93cbc61/core/build.sbt ---------------------------------------------------------------------- diff --git a/core/build.sbt b/core/build.sbt index 405ea55..c54cf44 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -12,7 +12,7 @@ libraryDependencies <+= scalaVersion("org.scala-lang" % "scala-compiler" % _ ) libraryDependencies ++= Seq( "org.apache.zookeeper" % "zookeeper" % "3.3.4", - "com.101tec" % "zkclient" % "0.2", + "com.101tec" % "zkclient" % "0.3", "org.xerial.snappy" % "snappy-java" % "1.0.4.1", "com.yammer.metrics" % "metrics-core" % "2.2.0", "com.yammer.metrics" % "metrics-annotation" % "2.2.0", http://git-wip-us.apache.org/repos/asf/kafka/blob/d93cbc61/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 2f5dff6..63ea87e 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -298,22 +298,17 @@ object ZkUtils extends Logging { * create parrent directory if necessary. Never throw NodeExistException. * Return the updated path zkVersion */ - def updatePersistentPath(client: ZkClient, path: String, data: String): Int = { - var stat: Stat = null + def updatePersistentPath(client: ZkClient, path: String, data: String) = { try { - stat = client.writeData(path, data) - return stat.getVersion + client.writeData(path, data) } catch { case e: ZkNoNodeException => { createParentPath(client, path) try { client.createPersistent(path, data) - // When the new path is created, its zkVersion always starts from 0 - return 0 } catch { case e: ZkNodeExistsException => - stat = client.writeData(path, data) - return stat.getVersion + client.writeData(path, data) case e2 => throw e2 } } @@ -327,7 +322,7 @@ object ZkUtils extends Logging { */ def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = { try { - val stat = client.writeData(path, data, expectVersion) + val stat = client.writeDataReturnStat(path, data, expectVersion) debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d" .format(path, data, expectVersion, stat.getVersion)) (true, stat.getVersion) @@ -345,7 +340,7 @@ object ZkUtils extends Logging { */ def conditionalUpdatePersistentPathIfExists(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = { try { - val stat = client.writeData(path, data, expectVersion) + val stat = client.writeDataReturnStat(path, data, expectVersion) debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d" .format(path, data, expectVersion, stat.getVersion)) (true, stat.getVersion)