[ https://issues.apache.org/jira/browse/STORM-2505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vivek Mittal updated STORM-2505:
--------------------------------
Description:
Kafka stores the spout's progress (the committed offset for each partition). The committed offset, or the offset immediately after it (offset+1), may no longer exist in the topic for the following reasons:
* The topology stopped processing (or died) and the topic was compacted (cleanup.policy=compact), leaving offset voids in the topic.
* The topology stopped processing (or died) and the topic was cleaned up (cleanup.policy=delete), deleting the offset.
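To illustrate how compaction produces voids, the sketch below models a partition as (offset, key) pairs and keeps only the latest offset per key. This is a deliberately simplified model of cleanup.policy=compact that ignores segments, the uncompacted head of the log, and tombstones. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of log compaction: keep only the latest
// record per key. Real Kafka compaction works per segment and never touches
// the active segment, which this sketch ignores.
class CompactionGapDemo {
    static List<Long> compact(long[] offsets, String[] keys) {
        Map<String, Long> latest = new HashMap<>();
        for (int i = 0; i < offsets.length; i++) {
            latest.put(keys[i], offsets[i]); // a later record overwrites an earlier one
        }
        List<Long> survivors = new ArrayList<>(latest.values());
        Collections.sort(survivors);
        return survivors; // surviving offsets, ascending, with voids where records were removed
    }

    public static void main(String[] args) {
        long[] offsets = {0, 1, 2, 3, 4, 5};
        String[] keys = {"a", "b", "a", "c", "a", "d"};
        // Offsets 0 and 2 are superseded by later records for key "a".
        System.out.println(compact(offsets, keys)); // [1, 3, 4, 5]
    }
}
```

If the spout had committed offset 1 before compaction ran, offset 2 (committedOffset+1) is now gone for good.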
When the topology starts processing again (or is restarted), the spout logic requires the next offset to commit to be exactly (committedOffset+1). Because (committedOffset+1) has been removed from the topic, it will never be emitted or acked, so the spout can never commit any further progress for that partition.
{code:title=OffsetManager.java|borderStyle=solid}
if (currOffset == nextCommitOffset + 1) { // found the next offset to commit
    found = true;
    nextCommitMsg = currAckedMsg;
    nextCommitOffset = currOffset;
} else if (currOffset > nextCommitOffset + 1) {
    LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
}
{code}
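The following self-contained sketch reproduces the strict +1 rule from the excerpt above to show the stall. It is a simplified, hypothetical stand-in for the OffsetManager loop (acked offsets as a sorted set, no message objects, no logging), not the actual Storm code.

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical reduction of the commit loop: advance only while the next
// acked offset is exactly nextCommitOffset + 1.
class StrictCommitDemo {
    static long findNextCommitOffset(long committedOffset, NavigableSet<Long> ackedOffsets) {
        long nextCommitOffset = committedOffset;
        for (long currOffset : ackedOffsets) { // iterates in ascending order
            if (currOffset == nextCommitOffset + 1) {
                nextCommitOffset = currOffset; // contiguous: advance
            } else if (currOffset > nextCommitOffset + 1) {
                break; // gap: wait for an offset that may never arrive
            }
        }
        return nextCommitOffset;
    }

    public static void main(String[] args) {
        // Offset 2 was compacted away, so it is never emitted or acked.
        NavigableSet<Long> acked = new TreeSet<>(Arrays.asList(3L, 4L, 5L));
        System.out.println(findNextCommitOffset(1L, acked)); // 1, i.e. no progress
    }
}
```

No matter how many later offsets are acked, the result stays at the committed offset while offset 2 is missing.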
The spout needs a smarter forwarding mechanism that advances the commit position to the next offset that actually exists in the topic, instead of a hardcoded step of exactly one offset.
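One possible shape for such a mechanism, sketched under the assumption that the spout records every offset it emits: since the Kafka consumer only returns offsets that exist, the next offset to commit is the next *emitted* offset rather than committedOffset+1. The names below (GapAwareCommitDemo, emittedOffsets) are hypothetical and are not part of the Storm codebase.

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical gap-aware variant: the "next" offset is the next one the spout
// actually emitted, so compacted or deleted offsets are skipped automatically.
class GapAwareCommitDemo {
    static long findNextCommitOffset(long committedOffset,
                                     NavigableSet<Long> emittedOffsets,
                                     NavigableSet<Long> ackedOffsets) {
        long nextCommitOffset = committedOffset;
        while (true) {
            Long expected = emittedOffsets.higher(nextCommitOffset); // next offset that exists
            if (expected == null || !ackedOffsets.contains(expected)) {
                break; // nothing emitted beyond this point, or still waiting for its ack
            }
            nextCommitOffset = expected;
        }
        return nextCommitOffset;
    }

    public static void main(String[] args) {
        // Offset 2 no longer exists, so the consumer never returned it.
        NavigableSet<Long> emitted = new TreeSet<>(Arrays.asList(3L, 4L, 5L, 6L));
        NavigableSet<Long> acked = new TreeSet<>(Arrays.asList(3L, 4L, 6L));
        System.out.println(findNextCommitOffset(1L, emitted, acked)); // 4
    }
}
```

An emitted but not-yet-acked offset (5 above) still blocks the commit, so at-least-once semantics are preserved; only offsets that were never emitted are jumped over.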
> Kafka Spout doesn't support voids in the topic (topic compaction not supported)
> -------------------------------------------------------------------------------
>
> Key: STORM-2505
> URL: https://issues.apache.org/jira/browse/STORM-2505
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka
> Affects Versions: 1.x
> Reporter: Vivek Mittal
> Time Spent: 10m
> Remaining Estimate: 0h
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)