[ https://issues.apache.org/jira/browse/STORM-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15507512#comment-15507512 ]

Jeff Fenchel commented on STORM-2106:
-------------------------------------

[~Srdo] I don't think it quite works that way. In the case you provided, I 
agree that x+1..y will be skipped while x is processing. However, in 
doSeekRetriableTopicPartitions we seek back to the next offset that may be 
committed, based on the acked messages. So in this case, since x is still 
processing, the spout seeks back to x and then filters out all of x..y again: 
x because it is still in flight, and the rest because they are already acked. 
The result is that nothing gets emitted until x finishes processing.
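
To make that concrete, here is a minimal sketch of the filtering I'm 
describing (the class and field names are illustrative, paraphrased from 
memory rather than copied from the spout source):

{code}
import java.util.HashSet;
import java.util.Set;

// Simplified model of the emit-side filter: a record is dropped if it was
// already acked, or if it was emitted and is still pending (like offset x).
final class EmitFilterSketch {
    private final Set<Long> ackedOffsets = new HashSet<>();    // holds x+1..y
    private final Set<Long> pendingOffsets = new HashSet<>();  // holds x while it is in flight

    boolean shouldEmit(long offset) {
        if (ackedOffsets.contains(offset)) {
            return false;  // x+1..y: already acked, filtered out
        }
        if (pendingOffsets.contains(offset)) {
            return false;  // x: emitted but not yet acked or failed, filtered out
        }
        return true;       // never reached for a batch that only spans x..y
    }
}
{code}

Since the seek puts the next poll() back at x, every record in the batch 
falls into one of those two buckets, so nothing is emitted.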

For me, the batch size Kafka provided (~300 records per poll) was far smaller 
than maxUncommittedOffsets, which severely crippled the throughput of my 
topology.
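
For reference, these are the two knobs in play (a hedged sketch; the numbers 
are just the ones from my topology, and setMaxUncommittedOffsets is the 
spout-side builder setting):

{code}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

class ThroughputKnobsSketch {
    static Properties consumerProps() {
        Properties props = new Properties();
        // Consumer-side: caps how many records a single poll() can return.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 300);
        return props;
    }
    // Spout-side (storm-kafka-client builder): caps the offsets in flight.
    // With a poll batch of ~300 << maxUncommittedOffsets of 10000, one
    // blocked batch stalls the spout long before the in-flight budget is hit:
    //   builder.setMaxUncommittedOffsets(10000);
}
{code}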

I agree that we should poll Kafka again at y; that is why I filed this 
ticket.

----
Code snippet: 
{code}
    private void doSeekRetriableTopicPartitions() {
        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();

        for (TopicPartition rtp : retriableTopicPartitions) {
            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
            if (offsetAndMeta != null) {
                // seek to the next offset that is ready to commit in the next commit cycle
                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);
            } else {
                // seek to the last committed offset
                kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);
            }
        }
    }
{code}
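
What I have in mind for the fix is per-partition bookkeeping of the next 
offset that has not yet been emitted, and seeking there unless a failed tuple 
is actually due for retry. A rough sketch of the idea only, with hypothetical 
names, not a patch:

{code}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Sketch only: field and method names are hypothetical.
final class NextEmittableBookkeepingSketch {
    // Highest offset emitted so far, plus one, per partition.
    private final Map<TopicPartition, Long> nextEmittableOffset = new HashMap<>();

    void onEmit(TopicPartition tp, long offset) {
        nextEmittableOffset.merge(tp, offset + 1, Math::max);
    }

    // Where the consumer should resume fetching: past y, not back at x,
    // so fresh records keep flowing while x waits out its retry backoff.
    long seekTarget(TopicPartition tp, long nextCommitOffset) {
        return Math.max(nextCommitOffset, nextEmittableOffset.getOrDefault(tp, nextCommitOffset));
    }
}
{code}

A retriable tuple would then be re-fetched with a targeted seek only when the 
retry service says it is due, instead of rewinding the whole partition on 
every pass.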




> Storm Kafka Client is paused while failed tuples are replayed
> -------------------------------------------------------------
>
>                 Key: STORM-2106
>                 URL: https://issues.apache.org/jira/browse/STORM-2106
>             Project: Apache Storm
>          Issue Type: Bug
>            Reporter: Jeff Fenchel
>
> With the changes in STORM-2087, the kafka 10 spout will be limited to emitting 
> tuples that are within the poll() size for kafka. This means that if the 
> first tuple in a batch from kafka is failed, the spout will not emit more 
> than the size of the batch from kafka until the tuple is either processed 
> successfully or given up on. This behavior is exacerbated by the exponential 
> backoff retry policy.
> There probably needs to be bookkeeping for the next emittable offset.


