[
https://issues.apache.org/jira/browse/STORM-3081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16487501#comment-16487501
]
Stig Rohde Døssing commented on STORM-3081:
-------------------------------------------
The properties you pass to the KafkaSpoutConfig are the KafkaConsumer
properties documented at
https://kafka.apache.org/documentation/#newconsumerconfigs. The
TOPOLOGY_BACKPRESSURE_ENABLE setting is a Storm parameter, so you should set it
in your Storm config (you're likely creating a Config object when you create
your topology).
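A minimal sketch of what that looks like (the topology name and submission call are illustrative, not taken from your code):

```java
import org.apache.storm.Config;

public class BackpressureConfigExample {
    public static void main(String[] args) {
        // Storm-level settings belong in the topology Config, not in the
        // KafkaConsumer properties you pass to KafkaSpoutConfig.
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_BACKPRESSURE_ENABLE, true);
        // then submit as usual, e.g.:
        // StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());
    }
}
```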
{quote}
{quote}
Even if a message is not consumed, some different message is unaffected by it.
It gets consumed properly in spite of the last unconsumed message.
{quote}
I'm not sure I follow. Do you mean that if you put e.g. messages "a" and "b"
into partition 0, then "b" is emitted but "a" isn't?
Are you producing into Kafka with acks enabled, so you're sure the message is
actually written to Kafka?
To narrow it down to the spout, you might want to run a KafkaConsumer manually
and verify that it doesn't drop messages. Then try a test topology containing
only the spout and a logging bolt, and check whether messages are dropped
there.
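A rough sketch of such a manual check (broker address, group id, and topic name are placeholders; this targets the older poll(long) API that ships with Storm 1.2.x, and obviously needs a live broker to run):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualConsumerCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // adjust to your cluster
        props.put("group.id", "manual-verify");             // fresh group, separate from the spout's
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("your-topic")); // same topic the spout reads
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                for (ConsumerRecord<String, String> r : records) {
                    // Log partition/offset/value so any gap in the offset sequence is visible
                    System.out.printf("p=%d offset=%d value=%s%n",
                            r.partition(), r.offset(), r.value());
                }
            }
        }
    }
}
```

If this plain consumer sees every offset, the broker side is fine and the problem is in the spout or topology.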
Finally, do the skipped offsets contain null values? The spout skips such
records by default:
https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L655
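If you do want tuples emitted for null values, the builder exposes a flag for that in 1.2.x (broker and topic below are placeholders; check that your exact version has this setter):

```java
// Configuration fragment only: opts the spout back in to emitting
// tuples for records whose value deserialized to null.
KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig
    .builder("localhost:9092", "your-topic")
    .setEmitNullTuples(true)
    .build();
```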
> Storm kafka client not consuming messages properly
> --------------------------------------------------
>
> Key: STORM-3081
> URL: https://issues.apache.org/jira/browse/STORM-3081
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka, storm-kafka-client
> Affects Versions: 1.2.1
> Reporter: Kush Khandelwal
> Priority: Blocker
>
> A single thread pushes messages to Kafka serially, and we consume them on
> Storm's end using storm-kafka-client.
> After a few requests, the consumer is no longer able to consume from the
> topic, blocking the thread that waits for the response from Storm's end.
> We added another consumer with a different consumer group, and there the
> messages are read properly.
> So we know there is some problem at the storm-kafka-client consumer's end.
> The Kafka spout config is written like this:
> KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig
>     .builder(kafkaProperties.getProperty("bootstrap.servers"),
>              stormProperties.getProperty("TOPIC"))
>     .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
>     .setRecordTranslator(new MessageDeserializer(), arguments)
>     .build();
> I can't seem to figure out the issue.
> Can someone please help me out?
> Thanks.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)