More about this thread: we noticed that with the latest StormKafkaClient 1.1.x,
we get an OutOfMemoryError after ~2 hours of running our simple test topology.

We reproduce it every time, so we decided to generate a heap dump before the
OutOfMemoryError and viewed the result using Eclipse MAT.

The results tend to show that there's a memory leak in KafkaSpoutClient:
=====================================================================

One instance of *"org.apache.storm.kafka.spout.KafkaSpout"* loaded by
*"sun.misc.Launcher$AppClassLoader @ 0x80023d98"* occupies
*1,662,509,664 (93.87%)* bytes. The memory is accumulated in one instance of
*"java.util.HashMap$Node[]"* loaded by *"<system class loader>"*.

*Keywords*
sun.misc.Launcher$AppClassLoader @ 0x80023d98
java.util.HashMap$Node[]
org.apache.storm.kafka.spout.KafkaSpout

=====================================================================
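
As a rough sanity check on the figure above, the retained size can be compared
with the max.uncommitted.offsets value of 10000000 given in the original
message of this thread. The sketch below is plain arithmetic and rests on an
unverified assumption: that the large HashMap holds about one entry per
uncommitted offset and is allowed to grow up to that cap. The class and method
names are made up for illustration and are not part of Storm.

```java
public class RetainedHeapEstimate {
    // Retained size of the KafkaSpout instance as reported by Eclipse MAT.
    static final long RETAINED_BYTES = 1_662_509_664L;
    // max.uncommitted.offsets from the spout configuration quoted in this thread.
    static final long MAX_UNCOMMITTED_OFFSETS = 10_000_000L;

    // Average bytes per entry (integer division), ASSUMING one map entry
    // per uncommitted offset. This assumption has not been verified.
    static long bytesPerEntry(long retainedBytes, long entries) {
        return retainedBytes / entries;
    }

    public static void main(String[] args) {
        // prints 166
        System.out.println(bytesPerEntry(RETAINED_BYTES, MAX_UNCOMMITTED_OFFSETS));
    }
}
```

Under that assumption each entry would cost about 166 bytes, so a cap of ten
million entries could by itself account for a heap of this size. Lowering
max.uncommitted.offsets and repeating the test would be one way to check the
hypothesis.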

See the attached screenshots of the Eclipse MAT session showing a graphical
representation of memory usage.

FYI we tried to follow the instructions from
https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.5.5/bk_storm-component-guide/content/storm-kafkaspout-perf.html
to avoid using too much memory, but still after 2 hours the memory
fills up and the process hosting our spout is killed by the Supervisor...

Any clue as to what we may have missed?

Best regards,
Alexandre Vermeerbergen



2017-06-28 9:17 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:

> Oops, sent my last mail too fast, let me continue it:
>
> Hello,
>
> Coming back to my original post in this list, we have 3 issues with the latest
> 1.1.x StormKafkaClient spout in our setup:
>
> Issue#1:
>  Initial lag (which we didn't have with the classic Storm Kafka spout)
>    For this issue, my understanding of Kristopher's answer is that this is
> "by design" in the StormKafkaClient spout, whose instances progressively
> join the Kafka consumer group, which causes consumer rebalancing. This
> rebalancing is "slow", which means that until all spout instances are
> started, the topology starts with an "initial Kafka lag".
>    => Is my understanding correct?
>    => Why don't we have such behavior with the old Storm Kafka spout?
>    => Is this annoying initial lag tracked by a JIRA?
>
> Issue#2:
>     The Kafka lag is increasing constantly, and this leads to overload
> of the Storm worker running the Kafka spout. In the end, the worker crashes
> and is automatically restarted by Storm.
>     => This is unlike what we observe with the old Storm Kafka spout
>     => What is the recommended way to analyze this issue?
>
> Issue#3:
>   With the new Kafka Spout, we have faced this exception many times:
>
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
> be completed since the group has already rebalanced and assigned the
> partitions to another member. This means that the time between subsequent
> calls to poll() was longer than the configured max.poll.interval.ms,
> which typically implies that the poll loop is spending too much time
> message processing. You can address this either by increasing the session
> timeout or by reducing the maximum size of batches returned in poll() with
> max.poll.records.
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
>     at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
>     at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
>     at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
>     at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
>     at clojure.lang.AFn.run(AFn.java:22)
>     at java.lang.Thread.run(Thread.java:748)
>
>
>   => Are we the only ones experiencing such issues with the latest Storm
> 1.1.0/1.1.x?
>
> Note: We are considering writing our own Kafka Spout, as we're time-bound
> to move to Kafka 0.10.x consumers & producers (to prepare our next step
> with Kafka security, which isn't available with Kafka 0.9.x). We would miss
> the integration of Kafka lag in Storm UI, but currently we do not understand
> how to solve the regressions we observe with the latest Storm Kafka client
> spout.
>
> Are there other Storm developers/users who have taken this alternative route?
>
> Best regards,
>
> Alexandre Vermeerbergen
>
>
>
>
> 2017-06-28 9:09 GMT+02:00 Alexandre Vermeerbergen <
> [email protected]>:
>
>> Hello,
>>
>> Coming back to my original post in this list, we have two issues with
>> the latest 1.1.x StormKafkaClient spout in our setup:
>>
>> Issue#1:
>>  Initial lag (which we didn't have with the classic Storm Kafka spout)
>>    For this issue, my understanding of Kristopher's answer is that this
>> is "by design" in the StormKafkaClient spout, whose instances progressively
>> join the Kafka consumer group, which causes consumer rebalancing. This
>> rebalancing is "slow", which means that until all spout instances are
>> started, the topology starts with an "initial Kafka lag".
>>    => Is my understanding correct?
>>    => Why don't we have such behavior with the old Storm Kafka spout?
>>    => Is this annoying initial lag tracked by a JIRA?
>>
>>
>>
>> 2017-06-27 17:15 GMT+02:00 Alexandre Vermeerbergen <
>> [email protected]>:
>>
>>> Hello Kristopher,
>>>
>>> We built Storm 1.1.1-latest using yesterday's (2017-06-26) artifacts
>>> downloaded from https://github.com/apache/storm/tree/1.x-branch.
>>>
>>> Is your latest PR supposed to be in what we downloaded & built, or do we
>>> need to upgrade in some way (which?)?
>>>
>>> Should we change anything in our settings?
>>>
>>> Please note that I mistakenly wrote that our "Kafka consumer strategy is
>>> set to EARLY" whereas it's "Kafka consumer strategy is set to LATEST", if
>>> that matters.
>>>
>>> Best regards,
>>> Alexandre Vermeerbergen
>>>
>>> 2017-06-27 16:37 GMT+02:00 Kristopher Kane <[email protected]>:
>>>
>>>> Correction: https://github.com/apache/storm/pull/2174 has all of what
>>>> I was doing and more.
>>>>
>>>> On Tue, Jun 27, 2017 at 9:33 AM, Kristopher Kane <[email protected]>
>>>> wrote:
>>>>
>>>> > Alexandre,
>>>> >
>>>> > There are quite a few JIRAs and discussions around this recently.  The
>>>> > default behavior for storm-kafka-client is the 'subscribe' API, which
>>>> > causes the immediate lag you see, since a rebalance will happen
>>>> > spout(n)-1 times just from the spouts spinning up.
>>>> >
>>>> > There is a Builder for ManualPartitionNamedSubscription and the
>>>> > RoundRobinManualPartitioner (which use the 'assign' Kafka consumer API),
>>>> > but they don't work at all.  I hope to have a PR in today
>>>> > to fix these on 1.x-branch.
>>>> >
>>>> > The other JIRAs I mentioned are for a redesign of this spout or other
>>>> > more drastic changes.  My goal is a bug fix for a version of the spout
>>>> > that doesn't provide unnecessary duplicates.
>>>> >
>>>> > Kris
>>>> >
>>>> > On Tue, Jun 27, 2017 at 8:00 AM, Alexandre Vermeerbergen <
>>>> > [email protected]> wrote:
>>>> >
>>>> >> Hello All,
>>>> >>
>>>> >> For a while, we have been running our real-time supervision
>>>> >> application based on Apache Storm 1.0.3 with Storm Kafka Spouts (old
>>>> >> consumer: storm-kafka) and with our Kafka Broker cluster based on
>>>> >> Apache Kafka 0.10.1.0.
>>>> >>
>>>> >> Backpressure is activated with default parameters.
>>>> >>
>>>> >> Key                                      Value
>>>> >>
>>>> >> backpressure.disruptor.high.watermark    0.9
>>>> >> backpressure.disruptor.low.watermark     0.4
>>>> >> task.backpressure.poll.secs              30
>>>> >> topology.backpressure.enable             true
>>>> >>
>>>> >>
>>>> >>
>>>> >> We decided to upgrade to Apache Storm 1.1.0 to benefit from the new
>>>> >> Kafka Spout (storm-kafka-client lib), whose consumer no longer
>>>> >> depends on Zookeeper.
>>>> >>
>>>> >> After the upgrade, we had several issues with Kafka consumption.
>>>> >>
>>>> >>
>>>> >> We saw that several JIRAs were opened and resolved on Apache Storm
>>>> >> 1.1.1.
>>>> >>
>>>> >> So we decided to upgrade to the latest available Apache Storm 1.1.x
>>>> >> code built from source (2017-06-26), but we still have issues:
>>>> >>
>>>> >>
>>>> >>
>>>> >> 1. The Kafka lag is increasing constantly, and this leads to overload
>>>> >> of the Storm worker running the Kafka spout. In the end, the worker
>>>> >> crashes and is automatically restarted by Storm.
>>>> >>
>>>> >>
>>>> >>
>>>> >> With the old Kafka spout version, we had a lag below 10000 most of
>>>> >> the time.
>>>> >>
>>>> >> With the new one, we start with a Kafka lag of about 30000, which
>>>> >> keeps increasing until the crash.
>>>> >>
>>>> >>
>>>> >>
>>>> >> 2. With the new Kafka Spout, we have faced this exception many times:
>>>> >>
>>>> >>
>>>> >>
>>>> >> org.apache.kafka.clients.consumer.CommitFailedException: Commit
>>>> >> cannot be completed since the group has already rebalanced and
>>>> >> assigned the partitions to another member. This means that the time
>>>> >> between subsequent calls to poll() was longer than the configured
>>>> >> max.poll.interval.ms, which typically implies that the poll loop is
>>>> >> spending too much time message processing. You can address this
>>>> >> either by increasing the session timeout or by reducing the maximum
>>>> >> size of batches returned in poll() with max.poll.records.
>>>> >>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
>>>> >>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
>>>> >>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
>>>> >>     at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
>>>> >>     at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
>>>> >>     at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
>>>> >>     at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
>>>> >>     at clojure.lang.AFn.run(AFn.java:22)
>>>> >>     at java.lang.Thread.run(Thread.java:748)
>>>> >>
>>>> >>
>>>> >>
>>>> >> We are using the following configuration for the Kafka Spout:
>>>> >>
>>>> >> poll.timeout.ms           200
>>>> >>
>>>> >> offset.commit.period.ms   30000      (30 seconds)
>>>> >>
>>>> >> max.uncommitted.offsets   10000000   (ten million)
>>>> >>
>>>> >>
>>>> >>
>>>> >> The Kafka consumer strategy is set to EARLY. We also set the
>>>> >> following Kafka consumer parameter:
>>>> >>
>>>> >> session.timeout.ms  120000
>>>> >>
>>>> >> Are there any traces/options that we could turn on in Storm or on the
>>>> >> Kafka Broker that might help us understand how to stabilize our
>>>> >> topologies with this new branch?
>>>> >>
>>>> >> Thanks!
>>>> >>
>>>> >> Alexandre Vermeerbergen
>>>> >>
>>>> >
>>>> >
>>>>
>>>
>>>
>>
>
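
The CommitFailedException quoted in this thread names two consumer settings,
max.poll.interval.ms and max.poll.records, alongside the session timeout. A
minimal sketch of a consumer property set that touches all three is shown
below. Only the session.timeout.ms value comes from the thread; the other two
values are illustrative assumptions, not recommendations, and the class name
is hypothetical. How the properties are handed to the spout depends on the
storm-kafka-client version and is deliberately left out.

```java
import java.util.Properties;

public class ConsumerTuningSketch {
    // Builds the Kafka consumer properties discussed in the exception message.
    static Properties consumerProps() {
        Properties props = new Properties();
        // Value reported in the original message of this thread.
        props.setProperty("session.timeout.ms", "120000");
        // ASSUMED value: upper bound on the time allowed between two poll() calls.
        props.setProperty("max.poll.interval.ms", "300000");
        // ASSUMED value: smaller batches mean less work between two poll() calls.
        props.setProperty("max.poll.records", "200");
        return props;
    }

    public static void main(String[] args) {
        // Print each key=value pair; iteration order is not guaranteed.
        consumerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Keeping the three settings in one place makes it easier to change one value at
a time and observe whether the rebalances and failed commits become less
frequent.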
