Hello All,

Thank you very much for your feedback on our experience with storm-kafka-client.
We're going to take your suggestions into account to dig into our issues, and
we'll report back as soon as we have more to share.
Best regards,
Alexandre Vermeerbergen

2017-06-29 1:46 GMT+02:00 Jungtaek Lim <[email protected]>:
> Hi Alexandre,
>
> I don't know much about storm-kafka-client, but at a glance I can't find
> any misuse of HashMap in KafkaSpout, so I'd rather suspect the
> OffsetManager is growing really huge. If you are willing to dig more into
> the KafkaSpout OOME issue, you can get more information out of KafkaSpout
> by changing its log level to DEBUG or even TRACE.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Jun 29, 2017 at 4:58 AM, Stig Døssing <[email protected]> wrote:
>
> > Hi Alexandre,
> >
> > About issue 1:
> > This issue is not by design. It is a side effect of the spout internally
> > using the KafkaConsumer's subscribe API instead of the assign API.
> > Support for using the assign API was added a while back, but has a bug
> > that is preventing the spout from starting when configured to use that
> > API. We are working on fixing the issues with that implementation in
> > these PRs: https://github.com/apache/storm/pull/2150 and
> > https://github.com/apache/storm/pull/2174. I think it is very likely
> > that we will remove support for the subscribe API at some point as well,
> > making the assign API the default, since several users have had issues
> > with the subscribe API's behavior.
> >
> > Once the assign API support is fixed, you can switch to using it via
> > this KafkaSpoutConfig Builder constructor
> > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L136
> > and this Subscription implementation
> > https://github.com/srdo/storm/blob/f524868baa5929f29b69258f0da2948d0f6e152b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
> >
> > If you'd like to try out the code from the PR branch, you can check it
> > out with some of the steps described here:
> > https://help.github.com/articles/checking-out-pull-requests-locally/#modifying-an-inactive-pull-request-locally
> >
> > Note that the PRs for fixing the manual partitioning option are against
> > the master branch right now, which is targeting Storm version 2.0.0, but
> > I believe you may be able to use the 2.0.0 storm-kafka-client jar with a
> > 1.1.0 cluster.
> >
> > Switching to the assign API should remove the instability as the spouts
> > start.
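> >
> > To make that concrete, here is a rough sketch of what the assign-API
> > wiring could look like once the PRs land. The Builder overload and the
> > ManualPartitionSubscription/RoundRobinManualPartitioner names come from
> > the PR branches linked above and may still change, and NamedTopicFilter
> > is just an illustrative topic filter, so treat this as a sketch rather
> > than a stable API:
> >
> >     // Sketch only, not a stable API; names follow the linked PR branches
> >     // (all under org.apache.storm.kafka.spout).
> >     // Pick partitions manually (assign API) instead of subscribing:
> >     Subscription subscription = new ManualPartitionSubscription(
> >             new RoundRobinManualPartitioner(),  // spread partitions over spout tasks
> >             new NamedTopicFilter("my-topic"));  // topics to take partitions from
> >
> >     KafkaSpoutConfig<String, String> spoutConfig =
> >             new KafkaSpoutConfig.Builder<String, String>("broker1:9092", subscription)
> >                     .build();
> >
> >     KafkaSpout<String, String> spout = new KafkaSpout<>(spoutConfig);
> >
> > The point is that with assign, each spout task decides which partitions
> > it owns, so there is no consumer group rebalance while the remaining
> > tasks start up.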
> >
> > About issue 2:
> > The spout lag shouldn't have an effect on memory use. I'm wondering if
> > your spout instances are not progressing at all, which might explain the
> > lag? You should be able to check this using the kafka-consumer-groups.sh
> > script in the Kafka /bin directory. Once you've started the spout, you
> > can use the script to inspect which offsets the consumer group has
> > committed. Try checking whether the offsets are moving once the spouts
> > have started running.
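> >
> > For example, something along these lines (substitute your broker list
> > and the group id your spout is configured with; on some 0.10.x releases
> > the tool also needs the --new-consumer flag):
> >
> >     bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
> >         --describe --group my-spout-group
> >
> > If CURRENT-OFFSET never advances between runs while LOG-END-OFFSET keeps
> > growing, the spouts are stuck rather than merely slow.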
> >
> > I can't spot any suspicious use of HashMap in the 1.x KafkaSpout. Your
> > attachment didn't make it through, could you post it somewhere?
> >
> > About issue 3:
> > The CommitFailedException you are getting is likely because we use the
> > subscribe API. When using the subscribe API Kafka is in control of
> > partition assignment, and will reassign partitions if a consumer doesn't
> > call poll on the consumer often enough. The default is that the consumer
> > must be polled at least every 5 minutes. See max.poll.interval.ms in the
> > Kafka consumer configuration. The assign API doesn't require this, and
> > won't shut down spouts because they are too slow.
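> >
> > If you need a stopgap before the assign API is usable, the knobs the
> > exception message points at are plain Kafka consumer properties, so you
> > can put them in whatever consumer property map you hand to the spout
> > (the values below are only examples):
> >
> >     // java.util.Map / java.util.HashMap; standard Kafka consumer configs:
> >     Map<String, Object> kafkaProps = new HashMap<>();
> >     kafkaProps.put("max.poll.interval.ms", 600000); // allow 10 minutes between polls
> >     kafkaProps.put("max.poll.records", 500);        // smaller batches per poll
> >
> > Both are standard consumer configs as of Kafka 0.10.1.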
> >
> > There's likely still another underlying issue, because it seems strange
> > that your spout instances are not polling at least once per 5 minutes,
> > at least if you didn't set a high max.poll.records. It's almost
> > certainly related to issue 2. Do you have acking enabled, and what is
> > your topology.message.timeout.secs?
> >
> > I'm not sure I understand why you would want to write your own spout
> > from scratch, rather than contributing fixes to the storm-kafka-client
> > spout. It seems likely to be more effort than fixing the problems with
> > storm-kafka-client.
> >
> > 2017-06-28 14:25 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
> >
> > > More about this thread: we noticed that with the latest
> > > storm-kafka-client 1.1.x, we get an OutOfMemoryError after ~2 hours of
> > > running our simple test topology.
> > >
> > > We reproduce it every time, so we decided to generate a heap dump
> > > before the OutOfMemoryError, and viewed the result using Eclipse MAT.
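> > >
> > > (In case it helps others reproduce this: rather than racing the
> > > Supervisor's kill, the worker JVM can also be told to write the dump
> > > itself when the OOME hits. A sketch, using the standard HotSpot flags
> > > and Storm's topology.worker.childopts; the dump path is just an
> > > example:)
> > >
> > >     Config conf = new Config();  // org.apache.storm.Config
> > >     conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS,
> > >             "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/tmp/storm");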
> > >
> > > The results tend to show that there's a memory leak in KafkaSpout:
> > > =====================================================================
> > >
> > > One instance of "org.apache.storm.kafka.spout.KafkaSpout" loaded by
> > > "sun.misc.Launcher$AppClassLoader @ 0x80023d98" occupies
> > > 1,662,509,664 (93.87%) bytes. The memory is accumulated in one
> > > instance of "java.util.HashMap$Node[]" loaded by "<system class
> > > loader>".
> > >
> > > Keywords
> > > sun.misc.Launcher$AppClassLoader @ 0x80023d98
> > > java.util.HashMap$Node[]
> > > org.apache.storm.kafka.spout.KafkaSpout
> > >
> > > =====================================================================
> > >
> > > See the attached screenshots of the Eclipse MAT session showing a
> > > graphical representation of memory usage.
> > >
> > > FYI we tried to follow the instructions from
> > > https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.5.5/bk_storm-component-guide/content/storm-kafkaspout-perf.html
> > > to avoid using too much memory, but still after 2 hours the memory
> > > fills up and the process hosting our spout is killed by the
> > > Supervisor...
> > >
> > > Any clue of what we may have missed?
> > >
> > > Best regards,
> > > Alexandre Vermeerbergen
> > >
> > > 2017-06-28 9:17 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
> > >
> > >> Oops, sent my last mail too fast, let me continue it:
> > >>
> > >> Hello,
> > >>
> > >> Coming back to my original post in this list, we have 3 issues with
> > >> the latest 1.1.x storm-kafka-client spout in our setup:
> > >>
> > >> Issue#1:
> > >> Initial lag (which we didn't have with the classic Storm Kafka spout).
> > >> For this issue, my understanding of Kristopher's answer is that this
> > >> is "by design" in the storm-kafka-client spout, whose instances
> > >> progressively join the Kafka consumer group, causing consumer
> > >> rebalancing. This rebalancing is "slow", which means that until all
> > >> spout instances are started, the topology starts with an "initial
> > >> Kafka lag".
> > >> => Is my understanding correct?
> > >> => Why don't we have such behavior with the old Storm Kafka spout?
> > >> => Is this annoying initial lag tracked by a JIRA?
> > >>
> > >> Issue#2:
> > >> The Kafka lag increases constantly, and this leads to the overload of
> > >> the Storm worker running the Kafka spout. At the end, the worker
> > >> crashes and is automatically restarted by Storm.
> > >> => This is unlike what we observe with the old Storm Kafka spout.
> > >> => What is the recommended way to analyze this issue?
> > >>
> > >> Issue#3:
> > >> With the new Kafka spout, we have faced this exception many times:
> > >>
> > >> org.apache.kafka.clients.consumer.CommitFailedException: Commit
> > >> cannot be completed since the group has already rebalanced and
> > >> assigned the partitions to another member. This means that the time
> > >> between subsequent calls to poll() was longer than the configured
> > >> max.poll.interval.ms, which typically implies that the poll loop is
> > >> spending too much time message processing. You can address this
> > >> either by increasing the session timeout or by reducing the maximum
> > >> size of batches returned in poll() with max.poll.records.
> > >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
> > >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
> > >> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > >> at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
> > >> at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
> > >> at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
> > >> at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
> > >> at clojure.lang.AFn.run(AFn.java:22)
> > >> at java.lang.Thread.run(Thread.java:748)
> > >>
> > >> => Are we the only ones experiencing such issues with Storm
> > >> 1.1.0/1.1.x latest?
> > >>
> > >> Note: We are considering writing our own Kafka spout, as we're
> > >> time-bound to move to Kafka 0.10.x consumers & producers (to prepare
> > >> our next step with Kafka security, which isn't available with Kafka
> > >> 0.9.x). We will miss the integration of Kafka lag in Storm UI, but
> > >> currently we do not understand how to solve the regressions we
> > >> observe with the latest Storm Kafka client spout.
> > >>
> > >> Are there other Storm developers/users who have jumped to this
> > >> alternative?
> > >>
> > >> Best regards,
> > >> Alexandre Vermeerbergen
> > >>
> > >> 2017-06-28 9:09 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
> > >>
> > >>> Hello,
> > >>>
> > >>> Coming back to my original post in this list, we have two issues
> > >>> with the latest 1.1.x storm-kafka-client spout in our setup:
> > >>>
> > >>> Issue#1:
> > >>> Initial lag (which we didn't have with the classic Storm Kafka
> > >>> spout).
> > >>> For this issue, my understanding of Kristopher's answer is that this
> > >>> is "by design" in the storm-kafka-client spout, whose instances
> > >>> progressively join the Kafka consumer group, causing consumer
> > >>> rebalancing. This rebalancing is "slow", which means that until all
> > >>> spout instances are started, the topology starts with an "initial
> > >>> Kafka lag".
> > >>> => Is my understanding correct?
> > >>> => Why don't we have such behavior with the old Storm Kafka spout?
> > >>> => Is this annoying initial lag tracked by a JIRA?
> > >>>
> > >>> 2017-06-27 17:15 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
> > >>>
> > >>>> Hello Kristopher,
> > >>>>
> > >>>> We built the latest Storm 1.1.1 using yesterday's (2017-06-26)
> > >>>> sources downloaded from
> > >>>> https://github.com/apache/storm/tree/1.x-branch
> > >>>>
> > >>>> Is your latest PR supposed to be in what we downloaded & built, or
> > >>>> do we need to upgrade in some way (which?)
> > >>>>
> > >>>> Should we change anything in our settings?
> > >>>>
> > >>>> Please note that I mistakenly wrote that our "Kafka consumer
> > >>>> strategy is set to EARLY" whereas it's "Kafka consumer strategy is
> > >>>> set to LATEST", if that matters.
> > >>>>
> > >>>> Best regards,
> > >>>> Alexandre Vermeerbergen
> > >>>>
> > >>>> 2017-06-27 16:37 GMT+02:00 Kristopher Kane <[email protected]>:
> > >>>>
> > >>>>> Correction: https://github.com/apache/storm/pull/2174 has all of
> > >>>>> what I was doing and more.
> > >>>>>
> > >>>>> On Tue, Jun 27, 2017 at 9:33 AM, Kristopher Kane <[email protected]>
> > >>>>> wrote:
> > >>>>>
> > >>>>> > Alexandre,
> > >>>>> >
> > >>>>> > There are quite a few JIRAs and discussions around this
> > >>>>> > recently. The default behavior for storm-kafka-client is the
> > >>>>> > 'subscribe' API, which causes the immediate lag you see, since a
> > >>>>> > rebalance will happen spout(n)-1 times just from the spouts
> > >>>>> > spinning up.
> > >>>>> >
> > >>>>> > There is a Builder for ManualPartitionNamedSubscription and the
> > >>>>> > RoundRobinManualPartitioner (which use the 'assign' Kafka
> > >>>>> > consumer API), but they don't work at all. I hope to have a PR
> > >>>>> > in today to fix these on 1.x-branch.
> > >>>>> >
> > >>>>> > The other JIRAs I mentioned are for a redesign of this spout or
> > >>>>> > other more drastic changes. My goal is a bug fix for a version
> > >>>>> > of the spout that doesn't produce unnecessary duplicates.
> > >>>>> >
> > >>>>> > Kris
> > >>>>> >
> > >>>>> > On Tue, Jun 27, 2017 at 8:00 AM, Alexandre Vermeerbergen <
> > >>>>> > [email protected]> wrote:
> > >>>>> >
> > >>>>> >> Hello All,
> > >>>>> >>
> > >>>>> >> We have been running our real-time supervision application,
> > >>>>> >> based on Apache Storm 1.0.3 with the Storm Kafka spouts (old
> > >>>>> >> consumer: storm-kafka), for a while against our Kafka broker
> > >>>>> >> cluster based on Apache Kafka 0.10.1.0.
> > >>>>> >>
> > >>>>> >> Backpressure is activated with default parameters:
> > >>>>> >>
> > >>>>> >> Key                                     Value
> > >>>>> >> backpressure.disruptor.high.watermark   0.9
> > >>>>> >> backpressure.disruptor.low.watermark    0.4
> > >>>>> >> task.backpressure.poll.secs             30
> > >>>>> >> topology.backpressure.enable            true
> > >>>>> >>
> > >>>>> >> We decided to upgrade to Apache Storm 1.1.0 to benefit from the
> > >>>>> >> new Kafka spout (the storm-kafka-client lib), whose consumer no
> > >>>>> >> longer depends on ZooKeeper.
> > >>>>> >>
> > >>>>> >> After the upgrade, we had several issues with Kafka consumption.
> > >>>>> >>
> > >>>>> >> We saw that several JIRAs were opened and resolved on Apache
> > >>>>> >> Storm 1.1.1.
> > >>>>> >>
> > >>>>> >> So we decided to upgrade to the latest available Apache Storm
> > >>>>> >> 1.1.x code built from source (2017-06-26), but we still have
> > >>>>> >> issues:
> > >>>>> >>
> > >>>>> >> 1. The Kafka lag increases constantly, and this leads to the
> > >>>>> >> overload of the Storm worker running the Kafka spout. At the
> > >>>>> >> end, the worker crashes and is automatically restarted by Storm.
> > >>>>> >>
> > >>>>> >> With the old Kafka spout, the lag was below 10000 most of the
> > >>>>> >> time. With the new one, we start with a Kafka lag of about
> > >>>>> >> 30000, and it increases until the crash.
> > >>>>> >>
> > >>>>> >> 2. With the new Kafka spout, we have faced this exception many
> > >>>>> >> times:
> > >>>>> >>
> > >>>>> >> org.apache.kafka.clients.consumer.CommitFailedException: Commit
> > >>>>> >> cannot be completed since the group has already rebalanced and
> > >>>>> >> assigned the partitions to another member. This means that the
> > >>>>> >> time between subsequent calls to poll() was longer than the
> > >>>>> >> configured max.poll.interval.ms, which typically implies that
> > >>>>> >> the poll loop is spending too much time message processing. You
> > >>>>> >> can address this either by increasing the session timeout or by
> > >>>>> >> reducing the maximum size of batches returned in poll() with
> > >>>>> >> max.poll.records.
> > >>>>> >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
> > >>>>> >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
> > >>>>> >> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > >>>>> >> at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
> > >>>>> >> at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
> > >>>>> >> at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
> > >>>>> >> at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
> > >>>>> >> at clojure.lang.AFn.run(AFn.java:22)
> > >>>>> >> at java.lang.Thread.run(Thread.java:748)
> > >>>>> >>
> > >>>>> >> We are using the following configuration for the Kafka spout:
> > >>>>> >>
> > >>>>> >> poll.timeout.ms          200
> > >>>>> >> offset.commit.period.ms  30000 (30 seconds)
> > >>>>> >> max.uncommitted.offsets  10000000 (ten million)
> > >>>>> >>
> > >>>>> >> The Kafka consumer strategy is set to EARLY. We also set the
> > >>>>> >> following Kafka consumer parameter:
> > >>>>> >>
> > >>>>> >> session.timeout.ms       120000
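> > >>>>> >>
> > >>>>> >> In code, the spout configuration is built roughly like this (a
> > >>>>> >> simplified sketch of our setup: the broker list and topic are
> > >>>>> >> placeholders, the method names are as we understand the 1.1.x
> > >>>>> >> storm-kafka-client builder API, and session.timeout.ms goes
> > >>>>> >> into the underlying consumer properties):
> > >>>>> >>
> > >>>>> >>     KafkaSpoutConfig<String, String> config =
> > >>>>> >>             KafkaSpoutConfig.builder("broker1:9092", "our-topic")
> > >>>>> >>                     .setPollTimeoutMs(200)
> > >>>>> >>                     .setOffsetCommitPeriodMs(30_000)
> > >>>>> >>                     .setMaxUncommittedOffsets(10_000_000)
> > >>>>> >>                     // "EARLY" above, i.e. the EARLIEST strategy
> > >>>>> >>                     // (corrected to LATEST upthread):
> > >>>>> >>                     .setFirstPollOffsetStrategy(
> > >>>>> >>                             KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
> > >>>>> >>                     .build();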
> > >>>>> >>
> > >>>>> >> Are there any traces/options we could turn on in Storm or on
> > >>>>> >> the Kafka broker that might help us understand how to stabilize
> > >>>>> >> our topologies with this new branch?
> > >>>>> >>
> > >>>>> >> Thanks!
> > >>>>> >>
> > >>>>> >> Alexandre Vermeerbergen
