[ 
https://issues.apache.org/jira/browse/STORM-3081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16487955#comment-16487955
 ] 

Stig Rohde Døssing commented on STORM-3081:
-------------------------------------------

Just so I understand: how are you determining whether a message was 
consumed or not? If you're not already doing it, please use this log 
line to check what the spout emits: 
[https://github.com/apache/storm/blob/d156d25d991311eaa1f5131d3dc34787f87ce684/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L488]

 

Your spout configuration uses the NO_GUARANTEE processing guarantee, which 
means each message may be processed 0, 1, or more times. If you need every 
message to be processed, you shouldn't use this value. With NO_GUARANTEE the 
spout commits offsets at arbitrary points after messages have been emitted, 
so messages can be lost if, for example, a tuple disappears on your network 
or a transient error occurs. Use AT_LEAST_ONCE instead (and make sure acking 
is enabled in your topology) to guarantee that every message is processed at 
least once. That way, if a message disappears or fails in the topology, it 
will be retried a configurable number of times.
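As an illustration, switching the spout to at-least-once processing looks roughly like this (a minimal sketch; the bootstrap servers, topic name, and offset strategy below are placeholders, not taken from your configuration):

```java
import org.apache.storm.Config;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

public class AtLeastOnceSpoutConfig {
    public static void main(String[] args) {
        // Placeholder bootstrap servers and topic name.
        KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig
            .builder("localhost:9092", "my-topic")
            // Commit only offsets that the topology has acked.
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)
            // One reasonable choice: on (re)start, resume from the first
            // uncommitted offset instead of skipping to the latest.
            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
            .build();

        // AT_LEAST_ONCE only works if acking is enabled, i.e. at least
        // one acker executor is running in the topology.
        Config conf = new Config();
        conf.setNumAckers(1);
    }
}
```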

 

Committed offsets are stored in the Kafka __consumer_offsets topic. You can 
check the committed offset via the kafka-consumer-groups.sh command in your 
Kafka install. The script may not work with earlier versions of Kafka; I 
believe someone else mentioned having to upgrade to Kafka 0.11 or 1.x to get 
it to work.
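For example (the bootstrap server and consumer group id below are placeholders; substitute your own, and note this needs a running cluster):

```shell
# Describe the consumer group used by the spout; prints the committed
# offset, log-end offset, and lag for each partition.
bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --group my-spout-consumer-group
```

If the committed offsets stop advancing while the log-end offsets keep growing, the spout has stopped consuming (or stopped committing) for that group.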

 

Some of what you describe, e.g. processing the same set of messages multiple 
times, could be caused by the spout restarting. Given that you're using 
NO_GUARANTEE, though, I'd only expect that if the spout worker were 
restarting. I'm not sure what would cause the rest of what you describe.

 

How large is your Kafka cluster, and which Kafka version are you running? Is 
the version installed on your brokers the same as the Kafka client version 
your Storm topology uses?

> Storm kafka client not consuming messages properly
> --------------------------------------------------
>
>                 Key: STORM-3081
>                 URL: https://issues.apache.org/jira/browse/STORM-3081
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka, storm-kafka-client
>    Affects Versions: 1.2.1
>            Reporter: Kush Khandelwal
>            Priority: Blocker
>
> A single thread is pushing some messages to kafka serially and we consume it 
> at the storm's end using storm-kafka-client.
> After a few requests, the consumer is not able to consume from the queue 
> blocking the thread which waits for the response from storm's end. 
> We added one more consumer with a different consumer group and there the 
> messages are getting read properly.
> So we know there is some problem at the storm kafka client consumer's end.
> The kafka spout config is written like this - 
> KafkaSpoutConfig<String, String> kafkaSpoutConfig = 
> KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), 
> stormProperties.getProperty("TOPIC"))
>                 
> .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
>                 .setRecordTranslator(new MessageDeserializer(), arguments)
>                 .build();
> I can't seem to figure out the issue.
> Can someone please help me out?
> Thanks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)