Hello All,

Thank you very much for your feedback on our experience with
storm-kafka-client. We're going to take your suggestions into account to
dig into our issues, and we'll report back as soon as we have more to share.

Best regards,
Alexandre Vermeerbergen

2017-06-29 1:46 GMT+02:00 Jungtaek Lim <[email protected]>:

> Hi Alexandre,
>
> I don't know much about storm-kafka-client, but at a glance I can't find
> any misuse of HashMap in KafkaSpout, so I'd rather suspect that the
> OffsetManager is getting really huge. If you are willing to dig further into
> the KafkaSpout OOME issue, you can get more information out of KafkaSpout
> by changing its log level to DEBUG or even TRACE.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Jun 29, 2017 at 4:58 AM, Stig Døssing <[email protected]> wrote:
>
> > Hi Alexandre,
> >
> > About issue 1:
> > This issue is not by design. It is a side effect of the spout internally
> > using the KafkaConsumer's subscribe API instead of the assign API. Support
> > for using the assign API was added a while back, but it has a bug that
> > prevents the spout from starting when configured to use that API. We are
> > working on fixing the issues with that implementation in these PRs:
> > https://github.com/apache/storm/pull/2150 and
> > https://github.com/apache/storm/pull/2174. I think it is very likely that
> > we will remove support for the subscribe API at some point as well, making
> > the assign API the default, since several users have had issues with the
> > subscribe API's behavior.
> >
> > Once the assign API support is fixed, you can switch to using it via this
> > KafkaSpoutConfig Builder constructor
> > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L136
> > and this Subscription implementation
> > https://github.com/srdo/storm/blob/f524868baa5929f29b69258f0da2948d0f6e152b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
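> >
> > To make that concrete, here is a minimal sketch of what the wiring might
> > look like once the fix lands. The class and constructor names are taken
> > from the linked branch and Builder constructor and may change before the
> > PRs are merged, so treat it as illustrative only:
> >
> > // Sketch only: names follow the linked PR branch and are assumptions,
> > // not necessarily the final merged API.
> > import org.apache.storm.kafka.spout.KafkaSpout;
> > import org.apache.storm.kafka.spout.KafkaSpoutConfig;
> > import org.apache.storm.kafka.spout.ManualPartitionSubscription;
> > import org.apache.storm.kafka.spout.NamedTopicFilter;
> > import org.apache.storm.kafka.spout.RoundRobinManualPartitioner;
> > import org.apache.storm.topology.TopologyBuilder;
> >
> > public class AssignApiTopologySketch {
> >     public static void main(String[] args) {
> >         // Partitions are handed to spout tasks directly (assign API),
> >         // so no consumer-group rebalance happens while spouts start up.
> >         KafkaSpoutConfig<String, String> spoutConfig =
> >             new KafkaSpoutConfig.Builder<String, String>(
> >                     "broker1:9092",
> >                     new ManualPartitionSubscription(
> >                         new RoundRobinManualPartitioner(),
> >                         new NamedTopicFilter("my-topic")))
> >                 .build();
> >
> >         // Wire the spout into a topology as usual (submission omitted).
> >         TopologyBuilder builder = new TopologyBuilder();
> >         builder.setSpout("kafka-spout", new KafkaSpout<>(spoutConfig), 4);
> >     }
> > }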
> >
> > If you'd like to try out the code from the PR branch, you can check it out
> > with some of the steps described here:
> > https://help.github.com/articles/checking-out-pull-requests-locally/#modifying-an-inactive-pull-request-locally
> >
> > Note that the PRs for fixing the manual partitioning option are against the
> > master branch right now, which is targeting Storm version 2.0.0, but I
> > believe you may be able to use the 2.0.0 storm-kafka-client jar with a
> > 1.1.0 cluster.
> >
> > Switching to the assign API should remove the instability as the spouts
> > start.
> >
> > About issue 2:
> > The spout lag shouldn't have an effect on memory use. I'm wondering if your
> > spout instances are not progressing at all, which might explain the lag?
> > You should be able to check this using the kafka-consumer-groups.sh script
> > in the Kafka /bin directory. Once you've started the spout, you can use the
> > script to inspect which offsets the consumer group has committed. Try
> > checking whether the offsets are moving once the spouts have started running.
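> >
> > For example, something along these lines (the exact options vary a bit by
> > Kafka version; releases around 0.10.x may also need --new-consumer, and the
> > group id below is just a placeholder for whatever your spout uses):
> >
> > kafka-consumer-groups.sh --bootstrap-server broker1:9092 --describe --group my-spout-group
> >
> > If the CURRENT-OFFSET column never advances between runs while
> > LOG-END-OFFSET keeps growing, the spouts are not making progress and the
> > lag will keep climbing.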
> >
> > I can't spot any suspicious use of HashMap in the 1.x KafkaSpout. Your
> > attachment didn't make it through; could you post it somewhere?
> >
> > About issue 3:
> > The CommitFailedException you are getting is likely because we use the
> > subscribe API. When using the subscribe API, Kafka is in control of
> > partition assignment, and will reassign partitions if poll isn't called on
> > the consumer often enough. The default is that the consumer must be polled
> > at least every 5 minutes; see max.poll.interval.ms in the Kafka consumer
> > configuration. The assign API doesn't require this, and won't shut down
> > spouts because they are too slow.
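> >
> > If you want to stay on the subscribe API for now, a minimal sketch of the
> > two consumer properties involved is below. How the map gets handed to the
> > spout differs between storm-kafka-client versions (check your
> > KafkaSpoutConfig.Builder), so the wiring itself is left out here:
> >
> > import java.util.HashMap;
> > import java.util.Map;
> > import org.apache.kafka.clients.consumer.ConsumerConfig;
> >
> > public class PollTuningSketch {
> >     public static Map<String, Object> consumerProps() {
> >         Map<String, Object> props = new HashMap<>();
> >         // Allow more time between poll() calls before Kafka revokes the
> >         // spout's partitions (default is 5 minutes).
> >         props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000);
> >         // Or return fewer records per poll so each batch finishes sooner.
> >         props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
> >         return props;
> >     }
> > }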
> >
> > There's likely still another underlying issue, because it seems strange
> > that your spout instances are not polling at least once per 5 minutes, at
> > least if you didn't set a high max.poll.records. It's almost certainly
> > related to issue 2. Do you have acking enabled, and what is your
> > topology.message.timeout.secs?
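> >
> > In case it helps, acking and the message timeout are topology-level
> > settings; a minimal sketch (the values are only placeholders):
> >
> > import org.apache.storm.Config;
> >
> > public class TopologyConfSketch {
> >     public static Config build() {
> >         Config conf = new Config();
> >         conf.setNumAckers(2);              // > 0 enables acking
> >         conf.setMessageTimeoutSecs(120);   // topology.message.timeout.secs
> >         return conf;
> >     }
> > }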
> >
> > I'm not sure I understand why you would want to write your own spout from
> > scratch, rather than contributing fixes to the storm-kafka-client spout.
> It
> > seems likely to be more effort than fixing the problems with
> > storm-kafka-client.
> >
> > 2017-06-28 14:25 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
> >
> > > More about this thread: we noticed that with the latest storm-kafka-client
> > > 1.1.x, we get an OutOfMemoryError after ~2 hours of running our simple
> > > test topology.
> > >
> > > We reproduce it every time, so we decided to generate a heap dump before
> > > the OutOfMemoryError and viewed the result using Eclipse MAT.
> > >
> > > The results tend to show that there's a memory leak in the KafkaSpout
> > > client:
> > > =====================================================================
> > >
> > > One instance of "org.apache.storm.kafka.spout.KafkaSpout" loaded by
> > > "sun.misc.Launcher$AppClassLoader @ 0x80023d98" occupies 1,662,509,664
> > > (93.87%) bytes. The memory is accumulated in one instance of
> > > "java.util.HashMap$Node[]" loaded by "<system class loader>".
> > >
> > > Keywords
> > > sun.misc.Launcher$AppClassLoader @ 0x80023d98
> > > java.util.HashMap$Node[]
> > > org.apache.storm.kafka.spout.KafkaSpout
> > >
> > > =====================================================================
> > >
> > > See the attached screenshots of the Eclipse MAT session showing a
> > > graphical representation of memory usage.
> > >
> > > FYI, we tried to follow the instructions from
> > > https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.5.5/bk_storm-component-guide/content/storm-kafkaspout-perf.html
> > > to avoid using too much memory, but after 2 hours the memory still fills
> > > up and the process hosting our spout is killed by the Supervisor...
> > >
> > > Any clue of what we may have missed?
> > >
> > > Best regards,
> > > Alexandre Vermeerbergen
> > >
> > >
> > >
> > > 2017-06-28 9:17 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
> > >
> > >> Oops, sent my last mail too fast, let me continue it:
> > >>
> > >> Hello,
> > >>
> > >> Coming back to my original post in this list, we have 3 issues with the
> > >> latest 1.1.x storm-kafka-client spout with our setup:
> > >>
> > >> Issue#1:
> > >>  Initial lag (which we didn't have with the classic Storm Kafka spout)
> > >>    For this issue, my understanding of Kristopher's answer is that this
> > >> is "by design" of the storm-kafka-client spout, whose instances
> > >> progressively join the Kafka consumer group, which causes consumer
> > >> rebalancing. This rebalancing is "slow", which means that until all
> > >> spout instances are started, the topology starts with an "initial Kafka
> > >> lag".
> > >>    => Is my understanding correct?
> > >>    => Why don't we have such behavior with the old Storm Kafka spout?
> > >>    => Is this annoying initial lag tracked by a JIRA?
> > >>
> > >> Issue#2:
> > >>     The Kafka lag increases constantly, and this leads to the overload
> > >> of the Storm worker running the Kafka spout. Eventually, the worker
> > >> crashes and is automatically restarted by Storm.
> > >>     => This is unlike what we observe with the old Storm Kafka spout
> > >>     => What is the recommended way to analyze this issue?
> > >>
> > >> Issue#3:
> > >>   With the new Kafka spout, we have faced this exception many times:
> > >>
> > >> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
> > >> be completed since the group has already rebalanced and assigned the
> > >> partitions to another member. This means that the time between subsequent
> > >> calls to poll() was longer than the configured max.poll.interval.ms,
> > >> which typically implies that the poll loop is spending too much time
> > >> message processing. You can address this either by increasing the session
> > >> timeout or by reducing the maximum size of batches returned in poll() with
> > >> max.poll.records.
> > >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
> > >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
> > >> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > >> at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
> > >> at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
> > >> at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
> > >> at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
> > >> at clojure.lang.AFn.run(AFn.java:22)
> > >> at java.lang.Thread.run(Thread.java:748)
> > >>
> > >>
> > >>   => Are we the only ones experiencing such issues with Storm 1.1.0 /
> > >> the latest 1.1.x?
> > >>
> > >> Note: We are considering writing our own Kafka spout, as we're time-bound
> > >> to move to Kafka 0.10.x consumers & producers (to prepare our next step
> > >> with Kafka security, which isn't available with Kafka 0.9.x). We will miss
> > >> the integration of Kafka lag in Storm UI, but currently we do not
> > >> understand how to solve the regressions we observe with the latest
> > >> storm-kafka-client spout.
> > >>
> > >> Are there other Storm developers/users who have jumped to this
> > >> alternative?
> > >>
> > >> Best regards,
> > >>
> > >> Alexandre Vermeerbergen
> > >>
> > >>
> > >>
> > >>
> > >> 2017-06-28 9:09 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
> > >>
> > >>> Hello,
> > >>>
> > >>> Coming back to my original post in this list, we have two issues with
> > >>> the latest 1.1.x storm-kafka-client spout with our setup:
> > >>>
> > >>> Issue#1:
> > >>>  Initial lag (which we didn't have with the classic Storm Kafka spout)
> > >>>    For this issue, my understanding of Kristopher's answer is that this
> > >>> is "by design" of the storm-kafka-client spout, whose instances
> > >>> progressively join the Kafka consumer group, which causes consumer
> > >>> rebalancing. This rebalancing is "slow", which means that until all
> > >>> spout instances are started, the topology starts with an "initial Kafka
> > >>> lag".
> > >>>    => Is my understanding correct?
> > >>>    => Why don't we have such behavior with the old Storm Kafka spout?
> > >>>    => Is this annoying initial lag tracked by a JIRA?
> > >>>
> > >>>
> > >>>
> > >>> 2017-06-27 17:15 GMT+02:00 Alexandre Vermeerbergen <
> > >>> [email protected]>:
> > >>>
> > >>>> Hello Kristopher,
> > >>>>
> > >>>> We built Storm 1.1.1-latest using yesterday's (2017-06-26) artifacts
> > >>>> downloaded from https://github.com/apache/storm/tree/1.x-branch.
> > >>>>
> > >>>> Is your latest PR supposed to be in what we downloaded & built, or do
> > >>>> we need to upgrade in some way (and if so, how)?
> > >>>>
> > >>>> Should we change anything in our settings?
> > >>>>
> > >>>> Please note that I mistakenly wrote that our "Kafka consumer
> strategy
> > >>>> is set to EARLY" whereas it's "Kafka consumer strategy is set to
> > LATEST",
> > >>>> if that matters.
> > >>>>
> > >>>> Best regards,
> > >>>> Alexandre Vermeerbergen
> > >>>>
> > >>>> 2017-06-27 16:37 GMT+02:00 Kristopher Kane <[email protected]>:
> > >>>>
> > >>>>> Correction: https://github.com/apache/storm/pull/2174 has all of
> > >>>>> what I was doing and more.
> > >>>>>
> > >>>>> On Tue, Jun 27, 2017 at 9:33 AM, Kristopher Kane <[email protected]> wrote:
> > >>>>>
> > >>>>> > Alexandre,
> > >>>>> >
> > >>>>> > There are quite a few JIRAs and discussions around this recently.
> > >>>>> > The default behavior for storm-kafka-client is the 'subscribe' API
> > >>>>> > which causes the immediate lag you see since rebalance will happen
> > >>>>> > spout(n)-1 times just from the spouts spinning up.
> > >>>>> >
> > >>>>> > There is a Builder for ManualPartitionNamedSubscription and the
> > >>>>> > RoundRobinManualPartitioner (which use the 'assign' Kafka consumer
> > >>>>> > API), but they don't work at all. I hope to have a PR in today to
> > >>>>> > fix these on the 1.x-branch.
> > >>>>> >
> > >>>>> > The other JIRAs I mentioned are for a redesign of this spout or
> > >>>>> > other more drastic changes.  My goal is a bug fix for a version of
> > >>>>> > the spout that doesn't provide unnecessary duplicates.
> > >>>>> >
> > >>>>> > Kris
> > >>>>> >
> > >>>>> > On Tue, Jun 27, 2017 at 8:00 AM, Alexandre Vermeerbergen <
> > >>>>> > [email protected]> wrote:
> > >>>>> >
> > >>>>> >> Hello All,
> > >>>>> >>
> > >>>>> >> We have been running our real-time supervision application for a
> > >>>>> >> while, based on Apache Storm 1.0.3 with the Storm Kafka spouts (old
> > >>>>> >> consumer: storm-kafka) and with our Kafka broker cluster based on
> > >>>>> >> Apache Kafka 0.10.1.0.
> > >>>>> >>
> > >>>>> >> Backpressure is activated with default parameters.
> > >>>>> >>
> > >>>>> >> Key                                      Value
> > >>>>> >> backpressure.disruptor.high.watermark    0.9
> > >>>>> >> backpressure.disruptor.low.watermark     0.4
> > >>>>> >> task.backpressure.poll.secs              30
> > >>>>> >> topology.backpressure.enable             true
> > >>>>> >>
> > >>>>> >>
> > >>>>> >>
> > >>>>> >> We decided to upgrade to Apache Storm 1.1.0 to benefit from the new
> > >>>>> >> Kafka spout (storm-kafka-client lib) with a consumer which no longer
> > >>>>> >> depends on Zookeeper.
> > >>>>> >>
> > >>>>> >> After the upgrade, we had several issues with Kafka consumption.
> > >>>>> >>
> > >>>>> >>
> > >>>>> >> We saw that several JIRAs were opened and resolved on Apache
> Storm
> > >>>>> 1.1.1.
> > >>>>> >>
> > >>>>> >> So we decided to upgrade to the latest available Apache Storm 1.1.x
> > >>>>> >> code built from source (2017-06-26), but we still have issues:
> > >>>>> >>
> > >>>>> >>
> > >>>>> >>
> > >>>>> >> 1. The Kafka lag increases constantly, and this leads to the
> > >>>>> >> overload of the Storm worker running the Kafka spout. Eventually,
> > >>>>> >> the worker crashes and is automatically restarted by Storm.
> > >>>>> >>
> > >>>>> >> With the old Kafka spout version, the lag was below 10000 most of
> > >>>>> >> the time.
> > >>>>> >>
> > >>>>> >> With the new one, we start with a Kafka lag of about 30000, and it
> > >>>>> >> keeps increasing until the crash.
> > >>>>> >>
> > >>>>> >>
> > >>>>> >>
> > >>>>> >> 2. With the new Kafka Spout, we have faced this exception many
> > >>>>> >> times:
> > >>>>> >>
> > >>>>> >> org.apache.kafka.clients.consumer.CommitFailedException: Commit
> > >>>>> >> cannot be completed since the group has already rebalanced and
> > >>>>> >> assigned the partitions to another member. This means that the time
> > >>>>> >> between subsequent calls to poll() was longer than the configured
> > >>>>> >> max.poll.interval.ms, which typically implies that the poll loop is
> > >>>>> >> spending too much time message processing. You can address this
> > >>>>> >> either by increasing the session timeout or by reducing the maximum
> > >>>>> >> size of batches returned in poll() with max.poll.records.
> > >>>>> >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
> > >>>>> >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
> > >>>>> >> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > >>>>> >> at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
> > >>>>> >> at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
> > >>>>> >> at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
> > >>>>> >> at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
> > >>>>> >> at clojure.lang.AFn.run(AFn.java:22)
> > >>>>> >> at java.lang.Thread.run(Thread.java:748)
> > >>>>> >>
> > >>>>> >>
> > >>>>> >>
> > >>>>> >> We are using the following configuration for the Kafka spout:
> > >>>>> >>
> > >>>>> >> poll.timeout.ms          200
> > >>>>> >> offset.commit.period.ms  30000     (30 seconds)
> > >>>>> >> max.uncommitted.offsets  10000000  (ten million)
> > >>>>> >>
> > >>>>> >> The Kafka consumer strategy is set to EARLY. We also set the
> > >>>>> >> following Kafka consumer parameter:
> > >>>>> >>
> > >>>>> >> session.timeout.ms  120000
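> > >>>>> >>
> > >>>>> >> For reference, a rough sketch of how these values might be expressed
> > >>>>> >> with the KafkaSpoutConfig builder is below. The builder method names
> > >>>>> >> are written from memory of the 1.1.x API and may differ slightly in
> > >>>>> >> your exact release, and the broker/topic names are placeholders:
> > >>>>> >>
> > >>>>> >> import org.apache.storm.kafka.spout.KafkaSpoutConfig;
> > >>>>> >>
> > >>>>> >> public class SpoutConfigSketch {
> > >>>>> >>     public static KafkaSpoutConfig<String, String> build() {
> > >>>>> >>         return KafkaSpoutConfig.builder("broker1:9092", "my-topic")
> > >>>>> >>             .setPollTimeoutMs(200)                // poll.timeout.ms
> > >>>>> >>             .setOffsetCommitPeriodMs(30_000)      // offset.commit.period.ms (30 s)
> > >>>>> >>             .setMaxUncommittedOffsets(10_000_000) // max.uncommitted.offsets
> > >>>>> >>             // session.timeout.ms (120000) is a raw Kafka consumer
> > >>>>> >>             // property; how it is attached depends on the Builder
> > >>>>> >>             // version, so it is left out of this sketch.
> > >>>>> >>             .build();
> > >>>>> >>     }
> > >>>>> >> }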
> > >>>>> >>
> > >>>>> >> Are there any traces/options we could turn on in Storm or on the
> > >>>>> >> Kafka broker that might help us understand how to stabilize our
> > >>>>> >> topologies with this new branch?
> > >>>>> >>
> > >>>>> >> Thanks!
> > >>>>> >>
> > >>>>> >> Alexandre Vermeerbergen
> > >>>>> >>
> > >>>>> >
> > >>>>> >
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> > >
> >
>
