[ https://issues.apache.org/jira/browse/KAFKA-1012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13751517#comment-13751517 ]
Guozhang Wang commented on KAFKA-1012:
--------------------------------------

Thanks for the patch, happy to see some Scala features not used before. Some more (many of them stylistic) comments:

26. OffsetMetadataAndError

26.1 BrokerNotAvailableCode -> BrokerNotAvailable

27. ZookeeperConsumerConnector

27.1 Rename connectZK -> createZKConnection (it now returns a zkClient instead of instantiating a zkClient)

27.2 val gtp = GroupTopicPartition(config.groupId, currTopicPartition) -> groupTopicPartition

27.3 throw new InvalidOffsetException("Offset fetch request could not fetch all requested offsets as some offsets are being loaded.") -> we can improve this message to "... as the offset backend storage is not fully started up, will try later"

28. KafkaApis

28.1 I think we use if { } else { } instead of if { } else { }

29. OffsetManager

29.1
    val OffsetsTopicProperties = new Properties()
    private val Decoder = new StringDecoder
    private var Manager: OffsetManager = null
    private var OffsetTopicPartitions = 1
    private val Partitioner = new DefaultPartitioner

Use a lowercase first letter?

29.2 Should we synchronize the check in

    val offsetMetadataError = if(loading.contains(OffsetManager.partitionFor(key.group))) OffsetMetadataAndError.OffsetLoading

with

    def triggerLoadOffsets(offsetsPartition: Int) {
      loading.add(offsetsPartition) // prevent any offset fetch directed to this partition of the offsets topic

Otherwise, would it be possible that the check passes before the partition is added to loading?

30. Another high-level question: currently handleOffsetFetchRequest seems to forward the request to the leader, wait for the response, and forward it back to the consumer in a blocking way. When the leader is loaded, would this propagate a high-latency issue, meaning that one slow broker could cause another broker to be slow, and hence another, and hence the whole cluster?
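To make the concern in 29.2 concrete, here is a minimal sketch of one way the check and the update could share a lock. It is not the patch's code: OffsetCacheSketch, FetchResult, the (group, offsetsPartition) key and the by-name `load` parameter are all hypothetical names chosen for illustration, and only `loading` and `triggerLoadOffsets` echo identifiers from the patch.

```scala
import scala.collection.mutable

// Hypothetical result types standing in for OffsetMetadataAndError.
sealed trait FetchResult
case object OffsetLoading extends FetchResult
case object NoOffset extends FetchResult
final case class Offset(value: Long) extends FetchResult

// Hypothetical cache; not the OffsetManager from the patch.
class OffsetCacheSketch {
  private val lock = new Object
  // Partitions of the offsets topic that are currently being loaded.
  private val loading = mutable.Set.empty[Int]
  // Assumed key: (group, offsetsPartition).
  private val offsets = mutable.Map.empty[(String, Int), Long]

  def triggerLoadOffsets(offsetsPartition: Int)(load: => Map[String, Long]): Unit = {
    // Mark the partition under the lock so no fetch can interleave with the add.
    lock.synchronized { loading += offsetsPartition }
    // The (possibly slow) read runs outside the lock so fetches are not blocked.
    val loaded = load
    lock.synchronized {
      loaded.foreach { case (group, offset) => offsets((group, offsetsPartition)) = offset }
      loading -= offsetsPartition
    }
  }

  // The "is it loading?" check and the read happen atomically under the same lock.
  def fetch(group: String, offsetsPartition: Int): FetchResult = lock.synchronized {
    if (loading.contains(offsetsPartition)) OffsetLoading
    else offsets.get((group, offsetsPartition)).map(Offset(_)).getOrElse(NoOffset)
  }
}
```

With this shape a fetch either observes the partition as loading or observes a consistent view of the cache; it cannot pass the check and then read a half-populated map. A fetch that arrives before triggerLoadOffsets is called at all would still see the pre-load state, so the ordering of the trigger relative to leadership changes would need separate handling.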

> Implement an Offset Manager and hook offset requests to it
> ----------------------------------------------------------
>
>                 Key: KAFKA-1012
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1012
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>            Reporter: Tejas Patil
>            Assignee: Tejas Patil
>            Priority: Minor
>         Attachments: KAFKA-1012.patch, KAFKA-1012-v2.patch
>
>
> After KAFKA-657, we have a protocol for consumers to commit and fetch offsets
> from brokers. Currently, consumers are not using this API and directly
> talking with Zookeeper.
> This Jira will involve following:
> 1. Add a special topic in kafka for storing offsets
> 2. Add an OffsetManager interface which would handle storing, accessing,
> loading and maintaining consumer offsets
> 3. Implement offset managers for both of these 2 choices : existing ZK based
> storage or inbuilt storage for offsets.
> 4. Leader brokers would now maintain an additional hash table of offsets for
> the group-topic-partitions that they lead
> 5. Consumers should now use the OffsetCommit and OffsetFetch API

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira