More about this thread: we noticed that with the latest StormKafkaClient 1.1.x we get an OutOfMemoryError after ~2 hours of running our simple test topology. We reproduce it every time, so we decided to generate a heap dump before the OutOfMemoryError and viewed the result using Eclipse MAT.
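For reference, this is roughly how the worker JVM can be armed to produce that dump; a minimal sketch assuming topology.worker.childopts is not already set elsewhere in your topology or cluster configuration (the dump path and flags are only an example):

    import org.apache.storm.Config;

    // Ask the worker JVM to write a heap dump when it hits OutOfMemoryError, so the
    // dump can be opened in Eclipse MAT afterwards. Path and flags are illustrative.
    Config conf = new Config();
    conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS,
             "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/worker-heap.hprof");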
The results tend to show that there is a memory leak in the KafkaSpout client:

=====================================================================
One instance of *"org.apache.storm.kafka.spout.KafkaSpout"* loaded by
*"sun.misc.Launcher$AppClassLoader @ 0x80023d98"* occupies *1,662,509,664 (93.87%)* bytes.
The memory is accumulated in one instance of *"java.util.HashMap$Node[]"* loaded by
*"<system class loader>"*.

*Keywords*
sun.misc.Launcher$AppClassLoader @ 0x80023d98
java.util.HashMap$Node[]
org.apache.storm.kafka.spout.KafkaSpout
=====================================================================

See the attached screenshots of the Eclipse MAT session showing a graphical representation of the memory usage.

FYI, we tried to follow the instructions from https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.5.5/bk_storm-component-guide/content/storm-kafkaspout-perf.html to avoid using too much memory, but after 2 hours the memory still fills up and the process hosting our spout is killed by the Supervisor...

Any clue as to what we may have missed?

Best regards,
Alexandre Vermeerbergen

2017-06-28 9:17 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:

> Oops, sent my last mail too fast, let me continue it:
>
> Hello,
>
> Coming back to my original post in this list, we have 3 issues with the latest 1.1.x StormKafkaClient spout in our setup:
>
> Issue#1:
> Initial lag (which we did not have with the classic Storm Kafka spout).
> For this issue, my understanding of Kristopher's answer is that this is "by design" of the StormKafkaClient spout, whose instances progressively join the Kafka consumer group, which causes consumer rebalancing. This rebalancing is "slow", which means that until all spout instances are started, the topology starts with an "initial Kafka lag".
> => Is my understanding correct?
> => Why don't we have such behavior with the old Storm Kafka spout?
> => Is this annoying initial lag tracked by a JIRA?
>
> Issue#2:
> The Kafka lag increases constantly, and this leads to the overload of the Storm worker running the Kafka spout. In the end, the worker crashes and is automatically restarted by Storm.
> => This is unlike what we observe with the old Storm Kafka spout.
> => What is the recommended way to analyze this issue?
>
> Issue#3:
> With the new Kafka spout, we have faced this exception many times:
>
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
>   at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
>   at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
>   at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
>   at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
>   at clojure.lang.AFn.run(AFn.java:22)
>   at java.lang.Thread.run(Thread.java:748)
>
> => Are we the only ones experiencing such issues with Storm 1.1.0 / the latest 1.1.x?
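The exception text itself names the knobs to try: give the poll loop more room with max.poll.interval.ms (separate from session.timeout.ms as of Kafka 0.10.1) or shrink the batches with max.poll.records. A minimal sketch of the consumer properties involved, assuming they are passed to the spout as its Kafka consumer properties map; the values are illustrative, not recommendations:

    import java.util.HashMap;
    import java.util.Map;

    // Consumer settings suggested by the CommitFailedException message, plus the
    // minimum context (broker list, group id). Adjust to your own throughput.
    Map<String, Object> kafkaProps = new HashMap<>();
    kafkaProps.put("bootstrap.servers", "broker1:9092");   // placeholder broker list
    kafkaProps.put("group.id", "my-consumer-group");       // placeholder group id
    kafkaProps.put("session.timeout.ms", 120000);          // already raised to 120s in our setup
    kafkaProps.put("max.poll.interval.ms", 300000);        // more time allowed between poll() calls
    kafkaProps.put("max.poll.records", 500);               // smaller batches per poll()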
> Note: We are considering writing our own Kafka spout, as we are time-bound to move to Kafka 0.10.x consumers & producers (to prepare our next step with Kafka security, which isn't available with Kafka 0.9.x). We will miss the integration of the Kafka lag in Storm UI, but currently we do not understand how to solve the regressions we observe with the latest Storm Kafka client spout.
>
> Are there other Storm developers/users who have jumped to this alternative?
>
> Best regards,
>
> Alexandre Vermeerbergen
>
> 2017-06-28 9:09 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
>
>> Hello,
>>
>> Coming back to my original post in this list, we have two issues with the latest 1.1.x StormKafkaClient spout in our setup:
>>
>> Issue#1:
>> Initial lag (which we did not have with the classic Storm Kafka spout).
>> For this issue, my understanding of Kristopher's answer is that this is "by design" of the StormKafkaClient spout, whose instances progressively join the Kafka consumer group, which causes consumer rebalancing. This rebalancing is "slow", which means that until all spout instances are started, the topology starts with an "initial Kafka lag".
>> => Is my understanding correct?
>> => Why don't we have such behavior with the old Storm Kafka spout?
>> => Is this annoying initial lag tracked by a JIRA?
>>
>> 2017-06-27 17:15 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
>>
>>> Hello Kristopher,
>>>
>>> We built Storm 1.1.1-latest using yesterday's (2017-06-26) artifacts downloaded from https://github.com/apache/storm/tree/1.x-branch.
>>>
>>> Is your latest PR supposed to be in what we downloaded & built, or do we need to upgrade in some way (and if so, how)?
>>>
>>> Should we change anything in our settings?
>>>
>>> Please note that I mistakenly wrote that our "Kafka consumer strategy is set to EARLY" whereas it is "Kafka consumer strategy is set to LATEST", if that matters.
>>>
>>> Best regards,
>>> Alexandre Vermeerbergen
>>>
>>> 2017-06-27 16:37 GMT+02:00 Kristopher Kane <[email protected]>:
>>>
>>>> Correction: https://github.com/apache/storm/pull/2174 has all of what I was doing and more.
>>>>
>>>> On Tue, Jun 27, 2017 at 9:33 AM, Kristopher Kane <[email protected]> wrote:
>>>>
>>>> > Alexandre,
>>>> >
>>>> > There are quite a few JIRAs and discussions around this recently. The default behavior for storm-kafka-client is the 'subscribe' API, which causes the immediate lag you see, since a rebalance will happen spout(n)-1 times just from the spouts spinning up.
>>>> >
>>>> > There is a Builder for ManualPartitionNamedSubscription and the RoundRobinManualPartitioner (which use the 'assign' Kafka consumer API), but they don't work at all. I hope to have a PR in today to fix these on 1.x-branch.
>>>> >
>>>> > The other JIRAs I mentioned are for a redesign of this spout or other, more drastic changes. My goal is a bug fix for a version of the spout that doesn't provide unnecessary duplicates.
>>>> >
>>>> > Kris
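For anyone not familiar with the two consumer modes Kristopher contrasts above, here is a rough sketch using the plain kafka-clients API (outside Storm); broker, topic, group id, and partition numbers are placeholders, not our actual settings:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");
    props.put("group.id", "my-group");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // 'subscribe': group-managed assignment. Every consumer that joins the group
    // triggers a rebalance, which is why n spout instances starting one after the
    // other cause roughly n-1 rebalances before the topology settles.
    KafkaConsumer<String, String> dynamic = new KafkaConsumer<>(props);
    dynamic.subscribe(Collections.singletonList("my-topic"));

    // 'assign': partitions are pinned explicitly per consumer. There is no
    // group-managed rebalancing, so instances coming up one by one do not
    // disturb the consumers that are already running.
    KafkaConsumer<String, String> manual = new KafkaConsumer<>(props);
    manual.assign(Arrays.asList(new TopicPartition("my-topic", 0),
                                new TopicPartition("my-topic", 1)));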
>>>> > On Tue, Jun 27, 2017 at 8:00 AM, Alexandre Vermeerbergen <[email protected]> wrote:
>>>> >
>>>> >> Hello All,
>>>> >>
>>>> >> We have been running our real-time supervision application for a while on Apache Storm 1.0.3 with Storm Kafka spouts (old consumer: storm-kafka) and with our Kafka broker cluster based on Apache Kafka 0.10.1.0.
>>>> >>
>>>> >> Backpressure is activated with the default parameters:
>>>> >>
>>>> >>   Key                                     Value
>>>> >>   backpressure.disruptor.high.watermark   0.9
>>>> >>   backpressure.disruptor.low.watermark    0.4
>>>> >>   task.backpressure.poll.secs             30
>>>> >>   topology.backpressure.enable            true
>>>> >>
>>>> >> We decided to upgrade to Apache Storm 1.1.0 to benefit from the new Kafka spout (storm-kafka-client lib), whose consumer no longer has a dependency on ZooKeeper.
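For context on that ZooKeeper dependency: the classic storm-kafka spout is configured through ZkHosts, so broker discovery and offset storage both go through ZooKeeper. A rough sketch of what that looks like (topic, ZooKeeper address, and spout id are placeholders):

    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.StringScheme;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.spout.SchemeAsMultiScheme;

    // Old storm-kafka spout: brokers are discovered and offsets are stored via
    // ZooKeeper, which is the dependency storm-kafka-client removes.
    BrokerHosts hosts = new ZkHosts("zookeeper1:2181");
    SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "/my-topic", "my-spout-id");
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout oldSpout = new KafkaSpout(spoutConfig);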
>>>> >> After the upgrade, we had several issues with Kafka consumption.
>>>> >>
>>>> >> We saw that several JIRAs were opened and resolved on Apache Storm 1.1.1.
>>>> >>
>>>> >> So we decided to upgrade to the latest available Apache Storm 1.1.x code built from source (2017-06-26), but we still have issues:
>>>> >>
>>>> >> 1. The Kafka lag increases constantly, and this leads to the overload of the Storm worker running the Kafka spout. In the end, the worker crashes and is automatically restarted by Storm.
>>>> >>
>>>> >> With the old Kafka spout version, the lag was below 10000 most of the time. With the new one, we start with a Kafka lag of about 30000, and it keeps increasing until the crash.
>>>> >>
>>>> >> 2. With the new Kafka spout, we have faced this exception many times:
>>>> >>
>>>> >> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
>>>> >>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
>>>> >>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
>>>> >>   at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
>>>> >>   at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
>>>> >>   at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
>>>> >>   at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
>>>> >>   at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
>>>> >>   at clojure.lang.AFn.run(AFn.java:22)
>>>> >>   at java.lang.Thread.run(Thread.java:748)
>>>> >>
>>>> >> We are using the following configuration for the Kafka spout:
>>>> >>
>>>> >>   poll.timeout.ms          200
>>>> >>   offset.commit.period.ms  30000 (30 seconds)
>>>> >>   max.uncommitted.offsets  10000000 (ten million)
>>>> >>
>>>> >> The Kafka consumer strategy is set to EARLY. We also set the following Kafka consumer parameter:
>>>> >>
>>>> >>   session.timeout.ms 120000
>>>> >>
>>>> >> Are there any traces/options we could turn on in Storm or on the Kafka broker that might help us understand how to stabilize our topologies with this new branch?
>>>> >>
>>>> >> Thanks!
>>>> >>
>>>> >> Alexandre Vermeerbergen
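One observation that may tie the heap dump at the top of this mail to the configuration quoted above: with max.uncommitted.offsets at ten million and commits failing repeatedly, the spout can keep millions of emitted-but-uncommitted message ids in memory. A back-of-the-envelope sketch, where the per-entry size is an assumption rather than a measured figure:

    // Rough estimate only: assume ~150 bytes retained per uncommitted offset entry
    // (the real figure depends on the spout internals and the tuple metadata).
    long maxUncommittedOffsets = 10_000_000L;   // max.uncommitted.offsets from the mail above
    long assumedBytesPerEntry  = 150L;          // assumption, not a measurement
    long estimatedBytes = maxUncommittedOffsets * assumedBytesPerEntry;
    System.out.println((estimatedBytes / (1024 * 1024)) + " MB");   // ~1430 MB
    // Same order of magnitude as the 1,662,509,664 bytes retained by KafkaSpout in MAT.

If that is indeed what is happening, lowering max.uncommitted.offsets (or getting the commits to succeed again, see the consumer settings sketched earlier) should stop the heap from growing.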
