[
https://issues.apache.org/jira/browse/STORM-3102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511474#comment-16511474
]
Stig Rohde Døssing commented on STORM-3102:
-------------------------------------------
Nice find, great job. The change seems to come from
https://issues.apache.org/jira/browse/KAFKA-5273. Seems like they removed the
committed offset cache because commits can also be done by the producer now, so
it was unsafe to cache the committed offsets.
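To illustrate why that cache was unsafe: below is a minimal sketch (not actual Kafka code; BrokerState and CachedCommittedOffsets are made-up names modeling a single partition) of the pre-KAFKA-5273 behavior, where a cached committed offset goes stale once anything other than the consumer itself (e.g. a transactional producer) commits:

```java
// Stand-in for broker-side state; in reality this is the __consumer_offsets topic.
class BrokerState {
    long committedOffset;

    BrokerState(long initialOffset) {
        this.committedOffset = initialOffset;
    }
}

// Hypothetical model of the cache KafkaConsumer dropped in KAFKA-5273:
// fetch the committed offset remotely once, then serve it from the cache.
class CachedCommittedOffsets {
    private final BrokerState broker;
    private Long cached;          // null = not fetched yet
    private int remoteFetches = 0;

    CachedCommittedOffsets(BrokerState broker) {
        this.broker = broker;
    }

    long committed() {
        if (cached == null) {
            remoteFetches++;              // only the first call pays for a round trip
            cached = broker.committedOffset;
        }
        return cached;                    // later external commits are never seen
    }

    int remoteFetches() {
        return remoteFetches;
    }
}
```

With producer-side commits in the picture, the cached value can silently diverge from the broker, which is why Kafka now makes every committed() call a remote fetch.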
The check is simply there to ensure that we don't reintroduce a fixed bug
(https://issues.apache.org/jira/browse/STORM-2666). I felt it made sense to
have, because it didn't really raise complexity, and it was an extremely cheap
check. Now that calling KafkaConsumer.committed is expensive, I think the check
isn't worth it anymore. I don't have a strong opinion on whether we should keep
it around for testing/debugging and disable it with a flag, or delete it
outright. I'd probably lean toward deleting it to avoid adding unnecessary
configuration though.
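For the sake of discussion, the flag-guarded variant could look roughly like this. This is a hypothetical sketch, not actual KafkaSpout code: EmitGuard, mayEmit, and the LongSupplier standing in for the blocking KafkaConsumer.committed call are all illustrative names.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: gate the STORM-2666 regression check behind a flag so
// the expensive committed() lookup is only paid when debugging.
class EmitGuard {
    private final boolean checkEnabled;
    private final LongSupplier committedFetcher; // models a blocking remote call

    EmitGuard(boolean checkEnabled, LongSupplier committedFetcher) {
        this.checkEnabled = checkEnabled;
        this.committedFetcher = committedFetcher;
    }

    // Returns true if the record at recordOffset may be emitted.
    boolean mayEmit(long recordOffset) {
        if (checkEnabled) {
            long committed = committedFetcher.getAsLong();
            if (recordOffset < committed) {
                return false; // would re-emit an already-committed tuple
            }
        }
        return true; // flag off: no remote call at all on the emit path
    }
}
```

With the flag off, mayEmit never touches the fetcher, so the per-tuple remote call disappears; deleting the check outright achieves the same result without a new config knob.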
[~acseidel] If you'd like to make this fix, please open a PR at
https://github.com/apache/storm/pulls.
> Storm Kafka Client performance issues with Kafka Client v1.0.0
> --------------------------------------------------------------
>
> Key: STORM-3102
> URL: https://issues.apache.org/jira/browse/STORM-3102
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 2.0.0, 1.0.6, 1.1.3, 1.2.2
> Reporter: Andy Seidel
> Priority: Major
>
> Recently I upgraded our storm topology to use the storm-kafka-client instead
> of storm-kafka. After the upgrade in our production environment we saw a
> significant (2x) reduction in our processing throughput.
> We process ~20,000 Kafka messages per second on a 10-machine Kafka 1.0.0
> broker cluster.
> After some investigation, it looks like the issue only occurs when using
> kafka clients 0.11 or newer.
> In Kafka 0.11, the KafkaConsumer method committed always blocks on an
> external call to get the last committed offsets
> [https://github.com/apache/kafka/blob/e18335dd953107a61d89451932de33d33c0fd207/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1326-L1351]
> In Kafka 0.10.2, the consumer only made the blocking remote call if the
> partition was not assigned to the consumer
> [https://github.com/apache/kafka/blob/695596977c7f293513f255e07f5a4b0240a7595c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1274-L1311]
>
> The impact of this is that every tuple requires a blocking remote call
> before it can be emitted.
> [https://github.com/apache/storm/blob/2dc3d53a11aa3fea621666690d1e44fa8b621466/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L464-L473]
> Removing this check returns performance to expected levels.
> Looking through the storm-kafka-client code, the impact of ignoring the
> check is not clear to me. In our case we want at-least-once processing, but
> for other processing guarantees the call to KafkaConsumer.committed(tp) is
> not needed, as its value is only examined when the processing mode is
> at-least-once.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)