Neway Liu created STORM-1591:
--------------------------------
Summary: [Storm-Kafka] Continuously processing un-emitted messages
when retrying failed message
Key: STORM-1591
URL: https://issues.apache.org/jira/browse/STORM-1591
Project: Apache Storm
Issue Type: Improvement
Components: storm-kafka
Reporter: Neway Liu
Priority: Minor
This is an enhancement of failed message retrying logic in
PartitionManager.java, for the failed msg, it will be fetched again for
retrying in the fill() method.
Current logic was to fetch a bulk of messages from the failed msg offset, and
handle the failed msg, while discard the other un-failed msgs, which was kind
of a waste, if those discarded msgs are un-emitted(offset > _emittedToOffset).
So, the enhance point is to handle the un-failed msgs if those msgs are not
emitted, instead of discarding them. By changing code:
{quote}
if (processingNewTuples ||
this._failedMsgRetryManager.shouldRetryMsg(cur_offset))
{quote}
to
{quote}
if (processingNewTuples ||
this._failedMsgRetryManager.shouldRetryMsg(cur_offset)) || cur_offset >=
_emittedToOffset)
{quote}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)