[ 
https://issues.apache.org/jira/browse/KAFKA-3312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16389047#comment-16389047
 ] 

ASF GitHub Bot commented on KAFKA-3312:
---------------------------------------

ijuma closed pull request #1025: KAFKA-3312: Add utility offset methods to 
ZkUtils
URL: https://github.com/apache/kafka/pull/1025
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer shows its
diff once it has been merged, so the diff is reproduced below:

diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 414e7baee44..674c138fc83 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -216,8 +216,8 @@ object ConsumerGroupCommand {
           // this group may not have migrated off zookeeper for offsets 
storage (we don't expose the dual-commit option in this tool
           // (meaning the lag may be off until all the consumers in the group 
have the same setting for offsets storage)
           try {
-            val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/" + 
topicAndPartition.partition)._1.toLong
-            offsetMap.put(topicAndPartition, offset)
+            val offset = zkUtils.getOffset(topicDirs.consumerOffsetDir + "/" + 
topicAndPartition.partition)
+            offsetMap.put(topicAndPartition, offset.get)
           } catch {
             case z: ZkNoNodeException =>
               println("Could not fetch offset from zookeeper for group %s 
partition %s due to missing offset data in zookeeper."
diff --git 
a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index f776578f6d4..bd8be4a2c05 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -308,7 +308,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: 
ConsumerConfig,
   def commitOffsetToZooKeeper(topicPartition: TopicAndPartition, offset: Long) 
{
     if (checkpointedZkOffsets.get(topicPartition) != offset) {
       val topicDirs = new ZKGroupTopicDirs(config.groupId, 
topicPartition.topic)
-      zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir + "/" + 
topicPartition.partition, offset.toString)
+      zkUtils.updateOffset(topicDirs.consumerOffsetDir + "/" + 
topicPartition.partition, offset.toString)
       checkpointedZkOffsets.put(topicPartition, offset)
       zkCommitMeter.mark()
     }
@@ -416,9 +416,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: 
ConsumerConfig,
 
   private def fetchOffsetFromZooKeeper(topicPartition: TopicAndPartition) = {
     val dirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
-    val offsetString = zkUtils.readDataMaybeNull(dirs.consumerOffsetDir + "/" 
+ topicPartition.partition)._1
-    offsetString match {
-      case Some(offsetStr) => (topicPartition, 
OffsetMetadataAndError(offsetStr.toLong))
+    val offset = zkUtils.getOffset(dirs.consumerOffsetDir + "/" + 
topicPartition.partition)
+    offset match {
+      case Some(offsetVal) => (topicPartition, 
OffsetMetadataAndError(offsetVal))
       case None => (topicPartition, OffsetMetadataAndError.NoOffset)
     }
   }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 086bd4b893d..b8402595ca1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -264,7 +264,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               else if (partitionData.metadata != null && 
partitionData.metadata.length > config.offsetMetadataMaxSize)
                 (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
               else {
-                
zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}",
 partitionData.offset.toString)
+                
zkUtils.updateOffset(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}",
 partitionData.offset.toString)
                 (topicPartition, Errors.NONE.code)
               }
             } catch {
@@ -774,10 +774,10 @@ class KafkaApis(val requestChannel: RequestChannel,
             if (!metadataCache.hasTopicMetadata(topicPartition.topic))
               (topicPartition, unknownTopicPartitionResponse)
             else {
-              val payloadOpt = 
zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
-              payloadOpt match {
-                case Some(payload) =>
-                  (topicPartition, new 
OffsetFetchResponse.PartitionData(payload.toLong, "", Errors.NONE.code))
+              val payload = 
zkUtils.getOffset(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")
+              payload match {
+                case Some(offsetVal) => 
+                  (topicPartition, new 
OffsetFetchResponse.PartitionData(offsetVal, "", Errors.NONE.code))
                 case None =>
                   (topicPartition, unknownTopicPartitionResponse)
               }
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala 
b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 5c01f34a1e2..bbf9149fe3f 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -177,8 +177,8 @@ object ConsumerOffsetChecker extends Logging {
           // this group may not have migrated off zookeeper for offsets 
storage (we don't expose the dual-commit option in this tool
           // (meaning the lag may be off until all the consumers in the group 
have the same setting for offsets storage)
           try {
-            val offset = zkUtils.readData(topicDirs.consumerOffsetDir + 
"/%d".format(topicAndPartition.partition))._1.toLong
-            offsetMap.put(topicAndPartition, offset)
+            val offset = zkUtils.getOffset(topicDirs.consumerOffsetDir + 
"/%d".format(topicAndPartition.partition))
+            offsetMap.put(topicAndPartition, offset.get)
           } catch {
             case z: ZkNoNodeException =>
               if(zkUtils.pathExists(topicDirs.consumerOffsetDir))
diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala 
b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
index ccccae57923..a2b21a3867b 100644
--- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
@@ -101,7 +101,7 @@ object ExportZkOffsets extends Logging {
           for (bidPid <- bidPidList) {
             val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp,topic)
             val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid
-            zkUtils.readDataMaybeNull(offsetPath)._1 match {
+            zkUtils.getOffset(offsetPath) match {
               case Some(offsetVal) =>
                 fileWriter.write(offsetPath + ":" + offsetVal + "\n")
                 debug(offsetPath + " => " + offsetVal)
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala 
b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index 60d48fa326c..a3d1b1159d7 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -98,7 +98,7 @@ object ImportZkOffsets extends Logging {
       debug("updating [" + partition + "] with offset [" + offset + "]")
       
       try {
-        zkUtils.updatePersistentPath(partition, offset.toString)
+        zkUtils.updateOffset(partition, offset)
       } catch {
         case e: Throwable => e.printStackTrace()
       }
diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala 
b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
index 96a33b17de3..b27a4c38ee9 100755
--- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
+++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
@@ -76,7 +76,7 @@ object UpdateOffsetsInZK {
           val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
 
           println("updating partition " + partition + " with new offset: " + 
offset)
-          zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir + "/" + 
partition, offset.toString)
+          zkUtils.updateOffset(topicDirs.consumerOffsetDir + "/" + partition, 
offset.toString)
           numParts += 1
         case None => throw new KafkaException("Broker information for broker 
id %d does not exist in ZK".format(broker))
       }
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala 
b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 81eb24ad105..29c5e512e42 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -867,6 +867,17 @@ class ZkUtils(val zkClient: ZkClient,
       zkClient.close()
     }
   }
+  
+  def getOffset(path: String): Option[Long] = {
+    val offsetString = readDataMaybeNull(path)._1
+    offsetString match {
+      case Some(offsetStr) => Some(offsetStr.toLong)
+      case None => None
+    }
+  }
+
+  def updateOffset(path: String, offset: String) =
+    updatePersistentPath(path, offset)
 }
 
 private object ZKStringSerializer extends ZkSerializer {
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 7df87fc31c7..b1460e5cd10 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -590,7 +590,7 @@ object TestUtils extends Logging {
 
   def updateConsumerOffset(config : ConsumerConfig, path : String, offset : 
Long) = {
     val zkUtils = ZkUtils(config.zkConnect, config.zkSessionTimeoutMs, 
config.zkConnectionTimeoutMs, false)
-    zkUtils.updatePersistentPath(path, offset.toString)
+    zkUtils.updateOffset(path, offset.toString)
     zkUtils.close()
 
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add offset methods to ZkUtils and replace relevant usages
> ---------------------------------------------------------
>
>                 Key: KAFKA-3312
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3312
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Grant Henke
>            Assignee: Vahid Hashemian
>            Priority: Major
>
> There are many places in the code that manually build a ZooKeeper path and 
> get or update offsets. Moving this logic to a common location in ZkUtils 
> would be nice. 
> Ex:
> {code}
> zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
> {code}
> {code}
>  zkUtils.readData(topicDirs.consumerOffsetDir + "/" + 
> topicAndPartition.partition)._1.toLong
> {code}
> {code}
> zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}",
>  partitionData.offset.toString)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to