[ 
https://issues.apache.org/jira/browse/STORM-3081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16487878#comment-16487878
 ] 

Kush Khandelwal edited comment on STORM-3081 at 5/23/18 7:30 PM:
-----------------------------------------------------------------

We debugged it a little more and we are hitting several different failure modes, 
seemingly at random.
The spout randomly stops consuming messages from Kafka.

The kafka spout config looks like this:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.tuple.Fields;

KafkaSpoutConfig<String, String> kafkaSpoutConfig =
        KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"),
                stormProperties.getProperty("TOPIC"))
            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
            .setRecordTranslator(new MessageDeserializer(), new Fields("msg"))
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "message")
            .build();
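As far as we understand, with NO_GUARANTEE the spout commits offsets periodically via the consumer's auto-commit, independent of tuple acking. For comparison, a sketch of the same builder with AT_LEAST_ONCE, which (as we understand it) commits only the offsets of acked tuples — everything else is unchanged from our config above:

```java
// Sketch only: identical to our config except for the processing guarantee.
// With AT_LEAST_ONCE, offsets should be committed only after tuples are acked,
// so unprocessed messages should not be skipped over.
KafkaSpoutConfig<String, String> atLeastOnceConfig =
        KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"),
                stormProperties.getProperty("TOPIC"))
            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
            .setRecordTranslator(new MessageDeserializer(), new Fields("msg"))
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "message")
            .build();
```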


* Sometimes a message is not consumed by the spout, but when we kill and 
restart the topology, the messages that weren't consumed get consumed 
automatically.
* Sometimes the same set of messages is consumed multiple times.
* Sometimes, even after restarting the topology, the previous messages are not 
consumed at all.
* Sometimes, if we keep the topology running, some or all (randomly) of the 
missed messages eventually get processed.

Something in either committing the offsets or fetching the messages from the 
partition is going wrong.

I can't seem to figure out why this is happening so randomly.

And how/where are the offsets stored?
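Our current understanding (please correct us if this is wrong): since we set a group.id, the committed offsets live broker-side in Kafka's internal __consumer_offsets topic, and a throwaway consumer in the same group can read them back. A sketch — the broker address, topic name, and partition number below are placeholders for our real values:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedOffsetCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "message");                 // same group id as the spout
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder topic/partition
            // Last committed offset for this group on this partition, or null if none.
            OffsetAndMetadata committed = consumer.committed(tp);
            System.out.println(tp + " -> "
                    + (committed == null ? "no committed offset" : committed.offset()));
        }
    }
}
```

(Requires a running broker, so this is only a diagnostic sketch.)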

Can you please suggest something that would help us solve this?



> Storm kafka client not consuming messages properly
> --------------------------------------------------
>
>                 Key: STORM-3081
>                 URL: https://issues.apache.org/jira/browse/STORM-3081
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka, storm-kafka-client
>    Affects Versions: 1.2.1
>            Reporter: Kush Khandelwal
>            Priority: Blocker
>
> A single thread pushes messages to Kafka serially, and we consume them at 
> Storm's end using storm-kafka-client.
> After a few requests, the consumer stops consuming from the topic, blocking 
> the thread that waits for the response from Storm's end.
> We added another consumer with a different consumer group, and there the 
> messages are read properly.
> So we know there is some problem at the storm-kafka-client consumer's end.
> The kafka spout config is written like this:
>
> KafkaSpoutConfig<String, String> kafkaSpoutConfig =
>         KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"),
>                 stormProperties.getProperty("TOPIC"))
>             .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
>             .setRecordTranslator(new MessageDeserializer(), arguments)
>             .build();
> I can't seem to figure out the issue.
> Can someone please help me out?
> Thanks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)