Neway Liu created STORM-1591:
--------------------------------

             Summary: [Storm-Kafka] Continuously processing un-emitted messages 
when retrying failed message
                 Key: STORM-1591
                 URL: https://issues.apache.org/jira/browse/STORM-1591
             Project: Apache Storm
          Issue Type: Improvement
          Components: storm-kafka
            Reporter: Neway Liu
            Priority: Minor


This is an enhancement to the failed-message retry logic in 
PartitionManager.java. A failed msg is fetched again in the fill() method so 
that it can be retried. 

The current logic fetches a batch of messages starting from the failed msg 
offset, handles the failed msg, and discards the other, un-failed msgs. That 
is wasteful when the discarded msgs have not been emitted yet 
(offset >= _emittedToOffset).

The enhancement is to handle the un-failed msgs that have not been emitted 
yet, instead of discarding them, by changing the code from: 

{quote}
if (processingNewTuples || 
this._failedMsgRetryManager.shouldRetryMsg(cur_offset)) 
{quote}

to

{quote}
if (processingNewTuples || 
this._failedMsgRetryManager.shouldRetryMsg(cur_offset) || cur_offset >= 
_emittedToOffset)
{quote}
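
The effect of the extra condition can be sketched with a small standalone 
example. This is only an illustration, not the PartitionManager code: the 
class RetryFilterSketch, the methods shouldEmit() and selectOffsets(), and the 
use of a plain Set of failed offsets in place of _failedMsgRetryManager are 
all hypothetical, and it assumes _emittedToOffset holds the next offset that 
has not been emitted yet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class RetryFilterSketch {

    // Hypothetical stand-in for the proposed condition in fill().
    // failedOffsets replaces _failedMsgRetryManager.shouldRetryMsg(cur_offset).
    static boolean shouldEmit(boolean processingNewTuples, Set<Long> failedOffsets,
                              long curOffset, long emittedToOffset) {
        return processingNewTuples
                || failedOffsets.contains(curOffset)
                || curOffset >= emittedToOffset; // proposed addition
    }

    // Simulates a retry fetch covering offsets fetchStart..fetchEnd (inclusive)
    // and returns the offsets that would be handled instead of discarded.
    static List<Long> selectOffsets(long fetchStart, long fetchEnd,
                                    Set<Long> failedOffsets, long emittedToOffset) {
        List<Long> selected = new ArrayList<>();
        for (long offset = fetchStart; offset <= fetchEnd; offset++) {
            // processingNewTuples is false because this fetch is a retry.
            if (shouldEmit(false, failedOffsets, offset, emittedToOffset)) {
                selected.add(offset);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        Set<Long> failedOffsets = new TreeSet<>();
        failedOffsets.add(5L);
        // Offsets 5..9 were emitted before, only 5 failed; 10..12 are new.
        System.out.println(selectOffsets(5, 12, failedOffsets, 10)); // prints [5, 10, 11, 12]
    }
}
```

With the original condition only offset 5 would be handled and offsets 10 to 
12 would be fetched again later. Offsets 6 to 9 are skipped in both cases 
because they were already emitted and did not fail.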



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
