[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15299409#comment-15299409
 ] 

ASF GitHub Bot commented on STORM-822:
--------------------------------------

Github user jianbzhou commented on the pull request:

    https://github.com/apache/storm/pull/1131#issuecomment-221465746
  
    @hmcl, sorry for the late reply. I was on leave and have just sent the 
updated spout to you; please help review it. The major changes are below:
    1. In the poll method, changed numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets()
    to emitted.size() < kafkaSpoutConfig.getMaxUncommittedOffsets();
    2. In the method doSeekRetriableTopicPartitions, your code seems to 
contradict its comment, so I changed it 
    **From:**
        else {
            kafkaConsumer.seekToEnd(rtp);    // Seek to last committed offset
        }
    **To:**
        else {
            // kafkaConsumer.seekToEnd(rtp);    // Seek to last committed offset
            OffsetAndMetadata commitOffset = kafkaConsumer.committed(rtp);
            kafkaConsumer.seek(rtp, commitOffset.offset());  // Seek to last committed offset
        }
    
    3. In the ack method, we found that acked.get(msgId.getTopicPartition()) 
might return null, so we added a defensive check. The null is possibly due to a 
Kafka consumer rebalance, after which the partition no longer belongs to this spout.
    4. In the OffsetEntry.add method, we added a condition so that the message 
is only added when it is met: if (msgId.offset() > committedOffset). This 
change was also applied in the method doSeekRetriableTopicPartitions.
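
Items 3 and 4 can be sketched roughly as below. This is a minimal, self-contained illustration under stated assumptions, not the actual spout code: the class OffsetTrackingSketch, the String partition keys, and the assign/revoke/pending helpers are hypothetical stand-ins for the spout's acked map, its TopicPartition keys, and its rebalance handling.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical sketch: plain Java types stand in for the Kafka and Storm classes.
class OffsetTrackingSketch {

    // Stand-in for the spout's per-partition OffsetEntry.
    static final class OffsetEntry {
        private final long committedOffset;
        private final TreeSet<Long> ackedOffsets = new TreeSet<>();

        OffsetEntry(long committedOffset) {
            this.committedOffset = committedOffset;
        }

        // Item 4: only track offsets that are newer than the last committed one.
        boolean add(long offset) {
            if (offset > committedOffset) {
                return ackedOffsets.add(offset);
            }
            return false; // already committed, nothing to track
        }

        int size() {
            return ackedOffsets.size();
        }
    }

    // Partition name -> acked offsets (the real code is keyed by TopicPartition).
    private final Map<String, OffsetEntry> acked = new HashMap<>();

    // Assumed hook: called when a partition is assigned to this spout.
    void assign(String topicPartition, long committedOffset) {
        acked.put(topicPartition, new OffsetEntry(committedOffset));
    }

    // Assumed hook: called when a rebalance takes the partition away.
    void revoke(String topicPartition) {
        acked.remove(topicPartition);
    }

    // Item 3: tolerate a missing entry instead of failing on a null lookup.
    boolean ack(String topicPartition, long offset) {
        OffsetEntry entry = acked.get(topicPartition);
        if (entry == null) {
            return false; // partition no longer belongs to this spout
        }
        return entry.add(offset);
    }

    // Number of acked-but-uncommitted offsets tracked for a partition.
    int pending(String topicPartition) {
        OffsetEntry entry = acked.get(topicPartition);
        return entry == null ? 0 : entry.size();
    }
}
```

In this sketch, an ack that arrives after revoke() is dropped silently, and an ack at or below the committed offset is never tracked, so neither case can affect a later commit.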
    
    



> Kafka Spout New Consumer API
> ----------------------------
>
>                 Key: STORM-822
>                 URL: https://issues.apache.org/jira/browse/STORM-822
>             Project: Apache Storm
>          Issue Type: Story
>          Components: storm-kafka
>            Reporter: Thomas Becker
>            Assignee: Hugo Louro
>             Fix For: 1.0.0, 2.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
