[ https://issues.apache.org/jira/browse/KAFKA-3312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16389047#comment-16389047 ]
ASF GitHub Bot commented on KAFKA-3312: --------------------------------------- ijuma closed pull request #1025: KAFKA-3312: Add utility offset methods to ZkUtils URL: https://github.com/apache/kafka/pull/1025 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 414e7baee44..674c138fc83 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -216,8 +216,8 @@ object ConsumerGroupCommand { // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage) try { - val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong - offsetMap.put(topicAndPartition, offset) + val offset = zkUtils.getOffset(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition) + offsetMap.put(topicAndPartition, offset.get) } catch { case z: ZkNoNodeException => println("Could not fetch offset from zookeeper for group %s partition %s due to missing offset data in zookeeper." diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index f776578f6d4..bd8be4a2c05 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -308,7 +308,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, def commitOffsetToZooKeeper(topicPartition: TopicAndPartition, offset: Long) { if (checkpointedZkOffsets.get(topicPartition) != offset) { val topicDirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic) - zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString) + zkUtils.updateOffset(topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString) checkpointedZkOffsets.put(topicPartition, offset) zkCommitMeter.mark() } @@ -416,9 +416,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def fetchOffsetFromZooKeeper(topicPartition: TopicAndPartition) = { val dirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic) - val offsetString = zkUtils.readDataMaybeNull(dirs.consumerOffsetDir + "/" + topicPartition.partition)._1 - offsetString match { - case Some(offsetStr) => (topicPartition, OffsetMetadataAndError(offsetStr.toLong)) + val offset = zkUtils.getOffset(dirs.consumerOffsetDir + "/" + topicPartition.partition) + offset match { + case Some(offsetVal) => (topicPartition, OffsetMetadataAndError(offsetVal)) case None => (topicPartition, OffsetMetadataAndError.NoOffset) } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 086bd4b893d..b8402595ca1 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -264,7 +264,7 @@ class KafkaApis(val requestChannel: RequestChannel, else if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize) (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code) else { - zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}", partitionData.offset.toString) + zkUtils.updateOffset(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}", partitionData.offset.toString) (topicPartition, Errors.NONE.code) } } catch { @@ -774,10 +774,10 @@ class KafkaApis(val requestChannel: RequestChannel, if (!metadataCache.hasTopicMetadata(topicPartition.topic)) (topicPartition, unknownTopicPartitionResponse) else { - val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1 - payloadOpt match { - case Some(payload) => - (topicPartition, new OffsetFetchResponse.PartitionData(payload.toLong, "", Errors.NONE.code)) + val payload = zkUtils.getOffset(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}") + payload match { + case Some(offsetVal) => + (topicPartition, new OffsetFetchResponse.PartitionData(offsetVal, "", Errors.NONE.code)) case None => (topicPartition, unknownTopicPartitionResponse) } diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index 5c01f34a1e2..bbf9149fe3f 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -177,8 +177,8 @@ object ConsumerOffsetChecker extends Logging { // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage) try { - val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong - offsetMap.put(topicAndPartition, offset) + val offset = zkUtils.getOffset(topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition)) + offsetMap.put(topicAndPartition, offset.get) } catch { case z: ZkNoNodeException => if(zkUtils.pathExists(topicDirs.consumerOffsetDir)) diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala index ccccae57923..a2b21a3867b 100644 --- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala +++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala @@ -101,7 +101,7 @@ object ExportZkOffsets extends Logging { for (bidPid <- bidPidList) { val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp,topic) val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid - zkUtils.readDataMaybeNull(offsetPath)._1 match { + zkUtils.getOffset(offsetPath) match { case Some(offsetVal) => fileWriter.write(offsetPath + ":" + offsetVal + "\n") debug(offsetPath + " => " + offsetVal) diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala index 60d48fa326c..a3d1b1159d7 100644 --- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala +++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala @@ -98,7 +98,7 @@ object ImportZkOffsets extends Logging { debug("updating [" + partition + "] with offset [" + offset + "]") try { - zkUtils.updatePersistentPath(partition, offset.toString) + zkUtils.updateOffset(partition, offset) } catch { case e: Throwable => e.printStackTrace() } diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala index 96a33b17de3..b27a4c38ee9 100755 --- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala +++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala @@ -76,7 +76,7 @@ object UpdateOffsetsInZK { val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) println("updating partition " + partition + " with new offset: " + offset) - zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir + "/" + partition, offset.toString) + zkUtils.updateOffset(topicDirs.consumerOffsetDir + "/" + partition, offset.toString) numParts += 1 case None => throw new KafkaException("Broker information for broker id %d does not exist in ZK".format(broker)) } diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 81eb24ad105..29c5e512e42 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -867,6 +867,17 @@ class ZkUtils(val zkClient: ZkClient, zkClient.close() } } + + def getOffset(path: String): Option[Long] = { + val offsetString = readDataMaybeNull(path)._1 + offsetString match { + case Some(offsetStr) => Some(offsetStr.toLong) + case None => None + } + } + + def updateOffset(path: String, offset: String) = + updatePersistentPath(path, offset) } private object ZKStringSerializer extends ZkSerializer { diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 7df87fc31c7..b1460e5cd10 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -590,7 +590,7 @@ object TestUtils extends Logging { def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = { val zkUtils = ZkUtils(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) - zkUtils.updatePersistentPath(path, offset.toString) + zkUtils.updateOffset(path, offset.toString) zkUtils.close() } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a offsets methods to ZkUtils and replace relevant usages > ------------------------------------------------------------ > > Key: KAFKA-3312 > URL: https://issues.apache.org/jira/browse/KAFKA-3312 > Project: Kafka > Issue Type: Improvement > Reporter: Grant Henke > Assignee: Vahid Hashemian > Priority: Major > > There are many places in the code that manually build a zookeeper path and > get or update offsets. Moving this logic to a common location in ZkUtils > would be nice. > Ex: > {code} > zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1 > {code} > {code} > zkUtils.readData(topicDirs.consumerOffsetDir + "/" + > topicAndPartition.partition)._1.toLong > {code} > {code} > zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}", > partitionData.offset.toString) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)