[
https://issues.apache.org/jira/browse/STORM-3081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16487330#comment-16487330
]
Kush Khandelwal edited comment on STORM-3081 at 5/23/18 2:41 PM:
-----------------------------------------------------------------
The topology has 39 bolts.
There are no tuple failures in the UI.
We tried enabling debug logging, but we didn't see any abnormal behaviour there
either. It's polling every 200 ms as it should, and even when we push messages
to Kafka, it logs "Polled [0] records from Kafka.".
The problem is not sequential. Even if a message is not consumed, a different
message is unaffected by it; that one gets consumed properly in spite of the
last unconsumed message.
So somehow Storm is missing a message and incrementing the offset.
I changed the first poll offset strategy to UNCOMMITTED_EARLIEST, but there was no change.
Can you please tell us something more that could help with the issue?
Also, how can I set TOPOLOGY_BACKPRESSURE_ENABLE to false for the KafkaSpout?
I tried -
KafkaSpoutConfig<String, String> kafkaSpoutConfig =
    KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"),
            stormProperties.getProperty("TOPIC"))
        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
        .setRecordTranslator(new MessageDeserializer(), arguments)
        .setProp(Config.TOPOLOGY_BACKPRESSURE_ENABLE, false)
        .build();
It didn't seem to work.
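If I understand the API correctly, TOPOLOGY_BACKPRESSURE_ENABLE is a topology-level
setting rather than a spout property, so it may need to go into the Config map passed
at submission time instead of KafkaSpoutConfig.setProp. A minimal sketch of that,
assuming a hypothetical topology name "my-topology" and with the spout/bolt wiring
elided:

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class SubmitWithoutBackpressure {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // ... spout and bolt wiring elided ...

        // Backpressure is a topology-level config, so set it on the Config
        // map passed to submitTopology rather than on the spout config.
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_BACKPRESSURE_ENABLE, false);

        // "my-topology" is a placeholder name.
        StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());
    }
}
```

Is that the intended mechanism, or is there a way to scope it to the spout?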
> Storm kafka client not consuming messages properly
> --------------------------------------------------
>
> Key: STORM-3081
> URL: https://issues.apache.org/jira/browse/STORM-3081
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka, storm-kafka-client
> Affects Versions: 1.2.1
> Reporter: Kush Khandelwal
> Priority: Blocker
>
> A single thread pushes messages to Kafka serially and we consume them at
> Storm's end using storm-kafka-client.
> After a few requests, the consumer is no longer able to consume from the topic,
> blocking the thread that waits for the response from Storm's end.
> We added one more consumer with a different consumer group, and there the
> messages are being read properly.
> So we know there is some problem at the storm-kafka-client consumer's end.
> The kafka spout config is written like this -
> KafkaSpoutConfig<String, String> kafkaSpoutConfig =
>     KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"),
>             stormProperties.getProperty("TOPIC"))
>         .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
>         .setRecordTranslator(new MessageDeserializer(), arguments)
>         .build();
> I can't seem to figure out the issue.
> Can someone please help me out?
> Thanks.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)