[ https://issues.apache.org/jira/browse/STORM-2994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16400983#comment-16400983 ]

Stig Rohde Døssing edited comment on STORM-2994 at 3/15/18 7:18 PM:
--------------------------------------------------------------------

Yes, I think that's the bug. Nice find. It seems like there was a mistake when 
adding the null filtering code. See 
https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L602.

Just going to give a quick summary of how we got to this code, and what I think 
is wrong.

Originally the offset manager simply kept track of acked tuples, and committed 
offsets once there was a contiguous sequence of acked offsets to commit (e.g. if 
1,2,3,5,6 were acked, the offset manager would commit 1,2,3 and wait for 4 to 
be acked). A later PR added support for compacted topics, i.e. topics where an 
offset sequence might look like 1,2,3,5,6, with offset 4 missing entirely. The 
offset manager now allows skipping offset 4 if it is known that it was never 
emitted by the spout.
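A minimal sketch of the commit rule described above, assuming a single 
partition and Kafka's convention that the committed offset is the next offset 
to consume. `SimpleOffsetManager` and its methods are hypothetical and are not 
the storm-kafka-client `OffsetManager` API. The gap-skipping check only 
approximates how the real class handles compacted topics.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical, simplified model of the spout's per-partition offset manager.
// Not the real storm-kafka-client OffsetManager API.
class SimpleOffsetManager {
    private final NavigableSet<Long> emitted = new TreeSet<>();
    private final NavigableSet<Long> acked = new TreeSet<>();
    private long committed; // next offset to consume (Kafka commit convention)

    SimpleOffsetManager(long initialOffset) {
        this.committed = initialOffset;
    }

    void addEmitted(long offset) { emitted.add(offset); }

    void addAcked(long offset) { acked.add(offset); }

    // Walks the acked offsets in order and returns the offset that could be committed.
    long findNextCommitOffset() {
        long next = committed;
        for (long offset : acked) {
            if (offset < next) {
                continue; // already covered by an earlier commit
            }
            if (offset == next) {
                next = offset + 1; // contiguous ack: extend the committable range
                continue;
            }
            // offset > next: the offsets in [next, offset) have not been acked.
            Long firstEmittedInGap = emitted.ceiling(next);
            if (firstEmittedInGap != null && firstEmittedInGap == offset) {
                // Nothing in the gap was emitted (e.g. compacted away): skip it.
                next = offset + 1;
            } else {
                break; // an emitted offset in the gap is still waiting for its ack
            }
        }
        return next;
    }

    // Commits, then forgets everything below the new commit offset.
    long commit() {
        committed = findNextCommitOffset();
        emitted.headSet(committed).clear();
        acked.headSet(committed).clear();
        return committed;
    }
}
```

With offsets 1, 2, 3, 5 and 6 emitted and acked (4 compacted away), `commit()` 
returns 7. If 4 had been emitted but not yet acked, it returns 4, i.e. only 
1, 2 and 3 are committed.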

When null tuple filtering was added, I think this functionality wasn't kept in 
mind. When a null tuple is filtered out, it's just discarded without being 
added to the offset manager's emitted list or acked list. The result seems to 
be that the offset manager is never told about the null tuples, so it won't 
commit them either. When the next non-null tuple gets acked, the offset manager 
is told about it. From the offset manager's perspective, it now looks like 
there was a gap of offsets that were never emitted by the spout, so it reacts 
by committing past them. Unfortunately this only happens once a non-null tuple 
is acked, which can take arbitrarily long.
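The stall can be reproduced with a small model. This is a hypothetical sketch: 
the commit logic repeats a simplified gap-skipping rule (a gap containing no 
emitted offsets is skipped), and `BuggyNullFilterReplay`, `poll` and `ack` are 
invented for illustration, not taken from KafkaSpout.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical replay of the reported behaviour; not storm-kafka-client code.
class BuggyNullFilterReplay {
    final NavigableSet<Long> emitted = new TreeSet<>();
    final NavigableSet<Long> acked = new TreeSet<>();
    long committed = 0; // next offset to consume

    // Buggy handling: a null tuple is dropped without telling the offset manager.
    void poll(long offset, boolean tupleIsNull) {
        if (tupleIsNull) {
            return;
        }
        emitted.add(offset);
    }

    void ack(long offset) { acked.add(offset); }

    // Simplified rule: advance over contiguous acks and over gaps with no emitted offsets.
    long commit() {
        long next = committed;
        for (long offset : acked) {
            if (offset < next) continue;
            Long firstEmitted = emitted.ceiling(next);
            if (offset == next || (firstEmitted != null && firstEmitted == offset)) {
                next = offset + 1;
            } else {
                break;
            }
        }
        committed = next;
        return committed;
    }
}
```

In this model, after offset 0 is acked and offsets 1 to 4 arrive as null 
tuples, repeated commits stay at 1. Once offset 5 is emitted and acked, the gap 
looks like compaction and the commit jumps straight to 6.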

I think the fix should be fairly simple: we add the null-filtered tuples to the 
offset manager's acked and emitted lists, so null filtering will behave as if 
the null tuples were acked immediately. If you'd like to give fixing it a shot, 
the two places to look would be 
https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L503 
and 
https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L602. 
If you don't, let me know and I'll take a look when I get a chance.
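A minimal sketch of the proposed null-tuple branch, assuming a hypothetical 
`OffsetTracker` interface standing in for the offset manager and a hypothetical 
`emitNullTuples` flag standing in for the spout's null filtering setting. 
`NullTupleFix` and `handleRecord` are illustrative names; the real change 
belongs at the two KafkaSpout locations linked above.

```java
import java.util.List;

// Hypothetical sketch of the proposed fix; names are illustrative, not KafkaSpout's.
class NullTupleFix {
    // Minimal stand-in for the per-partition offset manager.
    interface OffsetTracker {
        void addEmitted(long offset);
        void addAcked(long offset);
    }

    // Returns true if a tuple was sent downstream.
    static boolean handleRecord(long offset, List<?> tuple, boolean emitNullTuples,
                                OffsetTracker tracker, List<Long> emittedDownstream) {
        if (tuple == null && !emitNullTuples) {
            // Fix: treat the filtered record as emitted and immediately acked,
            // so the offset manager can commit past it without a later ack.
            tracker.addEmitted(offset);
            tracker.addAcked(offset);
            return false;
        }
        tracker.addEmitted(offset);
        emittedDownstream.add(offset); // stands in for the collector emit call
        return true;
    }
}
```

Under a gap-skipping offset manager, a null-filtered offset recorded this way 
becomes committable as soon as the offsets before it are acked, instead of 
waiting for the next non-null tuple to be acked.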


> KafkaSpout consumes messages but doesn't commit offsets
> -------------------------------------------------------
>
>                 Key: STORM-2994
>                 URL: https://issues.apache.org/jira/browse/STORM-2994
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.0, 1.1.2
>            Reporter: RAbreu
>            Priority: Major
>
> A topology that consumes from two different Kafka clusters: 0.10.1.1 and 
> 0.10.2.1.
> Spouts consuming from 0.10.2.1 have a low lag (and regularly commit offsets). 
> The Spout that consumes from 0.10.1.1 exhibits either:
> 1- Unknown lag
> 2- Lag that increases as the Spout reads messages from Kafka
>
> In DEBUG, the offset manager logs: "topic-partition has NO offsets ready to be 
> committed", despite continuing to consume messages.
> Several configuration tweaks were tried, including setting maxRetries to 1, 
> in case messages with a lower offset were being retried (the logs didn't show 
> that, though).
> offsetCommitPeriodMs was also lowered, to no avail.
> The only configuration that works is to have 
> ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG=true, but this is undesirable since 
> we lose processing guarantees.
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
