[
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15299409#comment-15299409
]
ASF GitHub Bot commented on STORM-822:
--------------------------------------
Github user jianbzhou commented on the pull request:
https://github.com/apache/storm/pull/1131#issuecomment-221465746
@hmcl, sorry for the late reply. I was on leave and have just sent the
updated spout to you; please help review it. Below are the major changes:
1. In the poll method, changed the condition
numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets()
to
emitted.size() < kafkaSpoutConfig.getMaxUncommittedOffsets()
2. In the method doSeekRetriableTopicPartitions, your code seems to
contradict its comment: it calls seekToEnd, while the comment says "Seek to
last committed offset". I changed it
**From:**

    else {
        kafkaConsumer.seekToEnd(rtp); // Seek to last committed offset
    }

**To:**

    else {
        // kafkaConsumer.seekToEnd(rtp); // Seek to last committed offset
        OffsetAndMetadata commitOffset = kafkaConsumer.committed(rtp);
        kafkaConsumer.seek(rtp, commitOffset.offset()); // Seek to last committed offset
    }

3. In the ack method, we found that acked.get(msgId.getTopicPartition())
might return null, so we added some defensive validation. This is possibly
due to a Kafka consumer rebalance, after which the partition no longer
belongs to this spout.
4. In the OffsetEntry.add method, we added a condition so that the message
is only added when it is met: if (msgId.offset() > committedOffset). This
change was also applied in the method doSeekRetriableTopicPartitions.
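
To make changes 3 and 4 easier to review, here is a minimal, self-contained sketch of the two guards. It is not the spout code itself: the class AckGuardSketch, its method names, and the use of plain strings and longs instead of TopicPartition and message ids are assumptions made purely for illustration, so that it runs without Kafka or Storm on the classpath.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for the spout's ack bookkeeping.
// Partitions are plain strings and offsets plain longs (assumption for illustration).
class AckGuardSketch {

    // Per-partition record of acked offsets, loosely modelled on OffsetEntry.
    static final class OffsetEntry {
        private final long committedOffset;
        private final TreeSet<Long> ackedOffsets = new TreeSet<>();

        OffsetEntry(long committedOffset) {
            this.committedOffset = committedOffset;
        }

        // Change 4: only track offsets strictly greater than the committed offset.
        boolean add(long offset) {
            if (offset > committedOffset) {
                return ackedOffsets.add(offset);
            }
            return false; // at or before the committed offset, nothing to track
        }
    }

    // Partition name -> entry; an entry exists only while the partition is assigned.
    private final Map<String, OffsetEntry> acked = new HashMap<>();

    void onPartitionAssigned(String partition, long committedOffset) {
        acked.put(partition, new OffsetEntry(committedOffset));
    }

    void onPartitionRevoked(String partition) {
        acked.remove(partition);
    }

    // Change 3: the lookup can return null, e.g. after a rebalance moved the
    // partition to another spout instance, so check before using the entry.
    boolean ack(String partition, long offset) {
        OffsetEntry entry = acked.get(partition);
        if (entry == null) {
            return false; // partition no longer owned here; ignore the ack
        }
        return entry.add(offset);
    }

    public static void main(String[] args) {
        AckGuardSketch spout = new AckGuardSketch();
        spout.onPartitionAssigned("topic-0", 10L);
        System.out.println("ack past committed offset: " + spout.ack("topic-0", 11L));
        System.out.println("ack at committed offset: " + spout.ack("topic-0", 10L));
        spout.onPartitionRevoked("topic-0");
        System.out.println("ack after revocation: " + spout.ack("topic-0", 12L));
    }
}
```

Returning false instead of throwing keeps a late ack for a revoked partition from failing the spout; whether it should also be logged is a separate choice that this sketch leaves open.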
> Kafka Spout New Consumer API
> ----------------------------
>
> Key: STORM-822
> URL: https://issues.apache.org/jira/browse/STORM-822
> Project: Apache Storm
> Issue Type: Story
> Components: storm-kafka
> Reporter: Thomas Becker
> Assignee: Hugo Louro
> Fix For: 1.0.0, 2.0.0
>
>