[ 
https://issues.apache.org/jira/browse/STORM-2505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16001959#comment-16001959
 ] 

Prasanna Ranganathan edited comment on STORM-2505 at 5/9/17 3:31 AM:
---------------------------------------------------------------------

Compaction in Kafka topics creates permanent gaps in the offsets that are 
visible to all consumers at all times.

While the delete policy does not create permanent holes in the Kafka topic 
offsets, the spout/consumer could still see gaps under a combination of the 
following factors:

 - Kafka topic with short retention period
 - Storm topology that consumes from such a topic with a slow processing rate
 - Spout offset fetch strategy of earliest OR uncommitted_earliest

Given the above, it is possible that while the spout/consumer is busy 
processing messages fetched from a particular partition, the async log cleaner 
has run and cleaned up expired log segments on the Kafka broker managing that 
partition. When the spout fetches the next batch during a subsequent poll, it 
is likely to see message offsets that are not sequential to the ones it 
received in the previous batch.

Version info:
Kafka brokers - 0.10.0.1
Kafka Client in spout - 0.10.2
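The effect described above can be illustrated with a small sketch (not Storm code; class and method names below are invented for illustration): given the last offset seen in the previous batch and the offsets returned by the next poll, list the offsets that have disappeared from the partition.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only (not Storm code): detect the void left behind when
// the broker's cleaner removes log segments between two polls.
public class OffsetGapCheck {

    public static List<Long> findGaps(long lastSeenOffset, long[] batchOffsets) {
        List<Long> gaps = new ArrayList<>();
        long expected = lastSeenOffset + 1;
        for (long offset : batchOffsets) {
            // Every offset between `expected` and the fetched offset was
            // removed by compaction or retention cleanup.
            while (expected < offset) {
                gaps.add(expected++);
            }
            expected = offset + 1;
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Previous batch ended at offset 41; after the cleaner ran, the
        // next poll returns offsets 45 and 46.
        System.out.println(findGaps(41, new long[] {45, 46}));
        // prints [42, 43, 44]
    }
}
```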



> Kafka Spout doesn't support voids in the topic (topic compaction not 
> supported)
> -------------------------------------------------------------------------------
>
>                 Key: STORM-2505
>                 URL: https://issues.apache.org/jira/browse/STORM-2505
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>    Affects Versions: 1.x
>            Reporter: Vivek Mittal
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Kafka maintains the spout's progress (offsets for partitions), which can 
> hold a value that no longer exists (or for which offset+1 doesn't exist) in 
> the topic, due to the following reasons:
> * Topology stopped processing (or died) & the topic got compacted 
> (cleanup.policy=compact), leaving offset voids in the topic.
> * Topology stopped processing (or died) & the topic got cleaned up 
> (cleanup.policy=delete), removing the committed offset.
> When the topology starts processing again (or is restarted), the spout logic 
> requires that the next offset be (committedOffset+1) for the spout to make 
> progress. This will never happen, as (committedOffset+1) has been removed 
> from the topic and will never be acked.
> {code:title=OffsetManager.java|borderStyle=solid}
> if (currOffset == nextCommitOffset + 1) {   // found the next offset to commit
>     found = true;
>     nextCommitMsg = currAckedMsg;
>     nextCommitOffset = currOffset;
> } else if (currOffset > nextCommitOffset + 1) {
>     LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
> }
> {code}
> A smarter forwarding mechanism has to be built to advance the spout pivot to 
> the next available offset, instead of a hardcoded single-step advance.
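As a rough sketch of what such forwarding could look like (illustrative names only, not Storm's actual OffsetManager fields): before committing contiguously, consult the partition's earliest retained offset (obtainable, for example, via the Kafka consumer's beginningOffsets API) and jump the pivot over the void if the next expected offset has been removed.

```java
import java.util.Arrays;
import java.util.TreeSet;

// Illustrative sketch of "smart forwarding" (not Storm's actual
// OffsetManager): when committedOffset + 1 has been removed from the
// topic, advance the pivot instead of stalling forever.
public class SmartForward {

    // nextCommitOffset: last committed offset; ackedOffsets: offsets acked
    // so far (sorted); earliestRetained: earliest offset still present in
    // the partition (e.g. from Consumer#beginningOffsets).
    public static long nextCommit(long nextCommitOffset,
                                  TreeSet<Long> ackedOffsets,
                                  long earliestRetained) {
        // If the next expected offset no longer exists in the log,
        // forward the pivot to just before the earliest retained offset.
        if (nextCommitOffset + 1 < earliestRetained) {
            nextCommitOffset = earliestRetained - 1;
        }
        // Then commit contiguously, as the existing logic does.
        for (long acked : ackedOffsets) {
            if (acked == nextCommitOffset + 1) {
                nextCommitOffset = acked;   // contiguous: advance
            } else if (acked > nextCommitOffset + 1) {
                break;                      // real in-flight gap: wait
            }                               // else: already committed
        }
        return nextCommitOffset;
    }

    public static void main(String[] args) {
        // Offsets 42-44 were deleted by the cleaner; 45 and 46 are acked.
        System.out.println(nextCommit(41, new TreeSet<>(Arrays.asList(45L, 46L)), 45));
        // prints 46
    }
}
```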



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
