Hi Alexandre,

About issue 1:
This issue is not by design. It is a side effect of the spout internally
using the KafkaConsumer's subscribe API instead of the assign API. Support
for using the assign API was added a while back, but has a bug that is
preventing the spout from starting when configured to use that API. We are
working on fixing the issues with that implementation in these PRs:
https://github.com/apache/storm/pull/2150 and
https://github.com/apache/storm/pull/2174. I think it is very likely that we
will remove support for the subscribe API at some point as well, making the
assign API the default, since several users have had issues with the
subscribe API's behavior.

Once the assign API support is fixed, you can switch to using it via this
KafkaSpoutConfig Builder constructor
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L136
and this Subscription implementation
https://github.com/srdo/storm/blob/f524868baa5929f29b69258f0da2948d0f6e152b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
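
To give an idea of why manual assignment avoids the startup rebalances: each
spout task can work out its own share of the partitions locally, without
waiting for the other consumers in the group. Below is a minimal sketch of
that idea, assuming a simple round-robin split by task index. The class and
method names are made up for illustration; this is not the code from
ManualPartitionSubscription or the partitioner in the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only, not the storm-kafka-client implementation.
final class RoundRobinAssignmentSketch {

    // Returns the partition ids the task at taskIndex would own when
    // partitions 0..partitionCount-1 are dealt out round-robin to totalTasks tasks.
    static List<Integer> partitionsForTask(int partitionCount, int taskIndex, int totalTasks) {
        if (totalTasks <= 0 || taskIndex < 0 || taskIndex >= totalTasks) {
            throw new IllegalArgumentException("invalid task index or task count");
        }
        List<Integer> owned = new ArrayList<>();
        // Start at this task's index and step by the number of tasks.
        for (int partition = taskIndex; partition < partitionCount; partition += totalTasks) {
            owned.add(partition);
        }
        return owned;
    }

    public static void main(String[] args) {
        // Example: 8 partitions spread over 3 spout tasks.
        for (int task = 0; task < 3; task++) {
            System.out.println("task " + task + " -> " + partitionsForTask(8, task, 3));
        }
    }
}
```

Because the result depends only on the task's own index and the task count,
one task starting or stopping does not change what the others consume, which
is the property the subscribe API lacks.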

If you'd like to try out the code from the PR branch, you can check it out
with some of the steps described here:
https://help.github.com/articles/checking-out-pull-requests-locally/#modifying-an-inactive-pull-request-locally

Note that the PRs for fixing the manual partitioning option are against the
master branch right now, which is targeting Storm version 2.0.0, but I
believe you may be able to use the 2.0.0 storm-kafka-client jar with a
1.1.0 cluster.

Switching to the assign API should remove the instability as the spouts
start.

About issue 2:
The spout lag shouldn't have an effect on memory use. I'm wondering if your
spout instances are not progressing at all, which might explain the lag?
You should be able to check this using the kafka-consumer-groups.sh script
in the Kafka /bin directory. Once you've started the spout, you can use the
script to inspect which offsets the consumer group has committed. Try
checking if the offsets are moving once the spouts have started running.
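
If you end up taking two readings of the committed offsets a few minutes
apart, the comparison I have in mind is roughly the following. This is only
a sketch: the maps stand in for numbers you would copy by hand from the
script output, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for comparing two hand-collected offset snapshots.
final class OffsetProgressSketch {

    // Returns the partitions whose committed offset did not increase between
    // the earlier and the later snapshot. A partition missing from the later
    // snapshot is reported as well. Note that a partition with no new
    // messages will also show up here, so check the log end offset too.
    static List<Integer> stalledPartitions(Map<Integer, Long> earlier, Map<Integer, Long> later) {
        List<Integer> stalled = new ArrayList<>();
        // TreeMap gives a stable, sorted iteration order over partition ids.
        for (Map.Entry<Integer, Long> entry : new TreeMap<>(earlier).entrySet()) {
            Long laterOffset = later.get(entry.getKey());
            if (laterOffset == null || laterOffset <= entry.getValue()) {
                stalled.add(entry.getKey());
            }
        }
        return stalled;
    }

    public static void main(String[] args) {
        Map<Integer, Long> earlier = new TreeMap<>();
        earlier.put(0, 100L);
        earlier.put(1, 200L);
        earlier.put(2, 300L);
        Map<Integer, Long> later = new TreeMap<>();
        later.put(0, 150L);
        later.put(1, 200L);
        later.put(2, 310L);
        System.out.println("stalled partitions: " + stalledPartitions(earlier, later));
    }
}
```

If every partition shows up as stalled while the lag keeps growing, that
would suggest the spouts are not committing at all rather than just being
slow.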

I can't spot any suspicious use of HashMap in the 1.x KafkaSpout. Your
attachment didn't make it through, could you post it somewhere?

About issue 3:
The CommitException you are getting is likely because we use the subscribe
API. When using the subscribe API, Kafka is in control of partition
assignment, and will reassign partitions if a consumer doesn't call poll on
the consumer often enough. The default is that the consumer must be polled
at least every 5 minutes. See max.poll.interval.ms in the Kafka consumer
configuration. The assign API doesn't require this, and won't shut down
spouts because they are too slow.
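
For reference, the two consumer settings involved are max.poll.interval.ms
and max.poll.records. Here is a small sketch of setting them and of the
check Kafka is effectively making. The property keys are real Kafka consumer
configuration names; the class, the method names, and the simplified
"exceeded" check are my own illustration, and how the properties reach the
consumer depends on how you build your spout configuration.

```java
import java.util.Properties;

// Hypothetical illustration of the poll-related consumer settings.
final class ConsumerPollSettingsSketch {

    // Builds consumer properties with the given poll limits.
    static Properties pollSettings(long maxPollIntervalMs, int maxPollRecords) {
        Properties props = new Properties();
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    // Simplified model: true if the gap between two poll calls is longer
    // than the allowed interval, in which case a consumer using the
    // subscribe API would lose its partitions to a rebalance.
    static boolean exceedsPollInterval(long lastPollMs, long nowMs, long maxPollIntervalMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // 300000 ms is the 5 minute default mentioned above.
        Properties props = pollSettings(300_000L, 500);
        System.out.println(props.getProperty("max.poll.interval.ms"));
        System.out.println(exceedsPollInterval(0L, 300_001L, 300_000L));
    }
}
```

Raising max.poll.interval.ms or lowering max.poll.records only hides the
symptom, though, if the spout is not polling for some other reason.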

There's likely still another underlying issue, because it seems strange
that your spout instances are not polling at least once per 5 minutes, at
least if you didn't set a high max.poll.records. It's almost certainly
related to issue 2. Do you have acking enabled, and what is your
topology.message.timeout.secs?

I'm not sure I understand why you would want to write your own spout from
scratch, rather than contributing fixes to the storm-kafka-client spout. It
seems likely to be more effort than fixing the problems with
storm-kafka-client.

2017-06-28 14:25 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:

> More about this thread: we noticed that with StormKafkaClient 1.1.x
> latest, we get OutOfMemoryError after ~2hours of running our simple test
> topology.
>
> We reproduce it everytime, so we decided to generate a heap dump before
> the OutOfMemoryError, and viewed the result using EclipseMAT.
>
> The results tends to show that there's a memory leak in KafkaSpoutClient:
> =====================================================================
>
> One instance of *"org.apache.storm.kafka.spout.KafkaSpout"* loaded by 
> *"sun.misc.Launcher$AppClassLoader
> @ 0x80023d98"* occupies *1,662,509,664 (93.87%)* bytes. The memory is
> accumulated in one instance of *"java.util.HashMap$Node[]"* loaded by 
> *"<system
> class loader>"*.
>
> *Keywords*
> sun.misc.Launcher$AppClassLoader @ 0x80023d98
> java.util.HashMap$Node[]
> org.apache.storm.kafka.spout.KafkaSpout
>
> =====================================================================
>
> See attached screenshots of EclipseMAT session showing graphical
> representation of memory usage
>
> FYI we tried to follow instructions from https://docs.hortonworks.com/
> HDPDocuments/HDP2/HDP-2.5.5/bk_storm-component-guide/
> content/storm-kafkaspout-perf.html to avoid the use of too much memory,
> but still after 2 hours the memory fills up and the process hosting our
> spout is killed by Supervisor...
>
> Any clue of what we may have missed?
>
> Best regards,
> Alexandre Vermeerbergen
>
>
>
> 2017-06-28 9:17 GMT+02:00 Alexandre Vermeerbergen <
> [email protected]>:
>
>> Oops, sent my last mail too fast, let me continue it:
>>
>> Hello,
>>
>> Coming back to my original post in this list, we have 3 issues with
>> latest 1.1.x StormKafkaClient spout with our setup:
>>
>> Issue#1:
>>  Initial lag (which we hadn't using the classic Storm Kafka spout)
>>    For this issue, my understanding of Kristopher's answer is that this
>> is "by design" of the StormKafkaClient spout, which instances progressively
>> joins Kafka consumers group, which causes consumers rebalancing. This
>> rebalancing is "slow", which means that until all spout instances are
>> started, the topology starts with an "initial Kafka Lag"
>>    => Is my understanding correct?
>>    => Why don't we have such behavior with the old Storm Kafka spout ?
>>    => Is this annoying initial lag tracked by a JIRA ?
>>
>> Issue#2:
>>     The kafka Lag is increasing constantly and this leads to the overload
>> of the storm worker running the kafka spout. At the end, the worker crashes
>> and it is automatically restarted by Storm.
>>     => This is unlike what we observe with the old Storm Kafka spout
>>     => What is the recommended way to analyze this issue?
>>
>> Issue3:
>>   With the new Kafka Spout, we have faced this exception many times:
>>
>>  org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
>> be completed since the group has already rebalanced and assigned the
>> partitions to another member. This means that the time between subsequent
>> calls to poll() was longer than the configured max.poll.interval.ms,
>> which typically implies that the poll loop is spending too much time
>> message processing. You can address this either by increasing the session
>> timeout or by reducing the maximum size of batches returned in poll() with
>> max.poll.records.
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
>> at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
>> at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
>> at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
>> at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
>> at clojure.lang.AFn.run(AFn.java:22)
>> at java.lang.Thread.run(Thread.java:748)
>>
>>
>>   => Are we the only ones experiencing such issues with Storm 1.1.0/1.1.x
>> latest ?
>>
>> Note: We are considering writing our own Kafka Spout, as we're time-bound
>> to move to Kafka 0.10.x consumers & producers (to prepare our next step
>> with Kafka security, which isn't available with Kafka 0.9.x). We will miss
>> the integration of Kafka lag in StormUI, but currently we do not understand
>> how to solve the regressions we observe with latest Storm Kafka client
>> spout.
>>
>> Are there other Storm developers/users who jumped into this alternative?
>>
>> Best regards,
>>
>> Alexandre Vermeerbergen
>>
>>
>>
>>
>> 2017-06-28 9:09 GMT+02:00 Alexandre Vermeerbergen <
>> [email protected]>:
>>
>>> Hello,
>>>
>>> Coming back to my original post in this list, we have two issues with
>>> latest 1.1.x StormKafkaClient spout with our setup:
>>>
>>> Issue#1:
>>>  Initial lag (which we hadn't using the classic Storm Kafka spout)
>>>    For this issue, my understanding of Kristopher's answer is that this
>>> is "by design" of the StormKafkaClient spout, which instances progressively
>>> joins Kafka consumers group, which causes consumers rebalancing. This
>>> rebalancing is "slow", which means that until all spout instances are
>>> started, the topology starts with an "initial Kafka Lag"
>>>    => Is my understanding correct?
>>>    => Why don't we have such behavior with the old Storm Kafka spout ?
>>>    => Is this annoying initial lag tracked by a JIRA ?
>>>
>>>
>>>
>>> 2017-06-27 17:15 GMT+02:00 Alexandre Vermeerbergen <
>>> [email protected]>:
>>>
>>>> Hello Kristopher,
>>>>
>>>> We built Storm 1.1.1-latest using yesterday's (2017-06-26) artifacts
>>>> downloaded from https://github.com/apache/storm/tree/1.x-branch.
>>>>
>>>> Is your latest PR supposed to be in what we downloaded & built, or do
>>>> we need to upgrade in some way (which?)
>>>>
>>>> Should we change anything to our settings?
>>>>
>>>> Please note that I mistakenly wrote that our "Kafka consumer strategy
>>>> is set to EARLY" whereas it's "Kafka consumer strategy is set to LATEST",
>>>> if that matters.
>>>>
>>>> Best regards,
>>>> Alexandre Vermeerbergen
>>>>
>>>> 2017-06-27 16:37 GMT+02:00 Kristopher Kane <[email protected]>:
>>>>
>>>>> Correction: https://github.com/apache/storm/pull/2174 has all of what
>>>>> I was
>>>>> doing and more.
>>>>>
>>>>> On Tue, Jun 27, 2017 at 9:33 AM, Kristopher Kane <[email protected]
>>>>> >
>>>>> wrote:
>>>>>
>>>>> > Alexandre,
>>>>> >
>>>>> > There are quite a few JIRAs and discussions around this recently.
>>>>> The
>>>>> > default behavior for storm-kafka-client is the 'subscribe' API which
>>>>> causes
>>>>> > the immediate lag you see since rebalance will happen spout(n)-1
>>>>> times
>>>>> > just from the spouts spinning up.
>>>>> >
>>>>> > There is a Builder for ManualPartitionNamedSubscription and the
>>>>> > RoundRobinManualPartitioner (which use the 'assign' Kafka consumer
>>>>> API) but
>>>>> > they don't work at all.  I hope to have a PR in today
>>>>> > to fix these on 1.x-branch
>>>>> >
>>>>> > The other JIRAs I mentioned are for a redesign of this spout or
>>>>> other more
>>>>> > drastic changes.  My goal is a bug fix for a version of the spout
>>>>> that
>>>>> > doesn't provide unnecessary duplicates.
>>>>> >
>>>>> > Kris
>>>>> >
>>>>> > On Tue, Jun 27, 2017 at 8:00 AM, Alexandre Vermeerbergen <
>>>>> > [email protected]> wrote:
>>>>> >
>>>>> >> Hello All,
>>>>> >>
>>>>> >> We have been running for a while our real-time supervision
>>>>> application
>>>>> >> based on Apache Storm 1.0.3 with Storm Kafka Spouts (old consumer:
>>>>> >> storm-kafka) and with our Kafka Broker cluster based on Apache Kafka
>>>>> >> 0.10.1.0.
>>>>> >>
>>>>> >> Backpressure is activated with default parameters.
>>>>> >>
>>>>> >>  Key
>>>>> >> Value
>>>>> >>
>>>>> >> backpressure.disruptor.high.watermark              0.9
>>>>> >>
>>>>> >> backpressure.disruptor.low.watermark               0.4
>>>>> >>
>>>>> >> task.backpressure.poll.secs                                     30
>>>>> >>
>>>>> >> topology.backpressure.enable                               true
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> We decided to upgrade to Apache Storm 1.1.0 to benefit from the new
>>>>> Kafka
>>>>> >> Spout  (storm-kafka-client lib) with a consumer which has no more
>>>>> >> dependency on Zookeeper.
>>>>> >>
>>>>> >> After upgrade, we had several issues with kafka consumption.
>>>>> >>
>>>>> >>
>>>>> >> We saw that several JIRAs were opened and resolved on Apache Storm
>>>>> 1.1.1.
>>>>> >>
>>>>> >> So we decided to upgrade to the latest available Apache Storm 1.1.x
>>>>> code
>>>>> >> built from source (2017-06-26) but we still
>>>>> have issues :
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> 1. The kafka Lag is increasing constantly and this leads to the
>>>>> overload
>>>>> >> of
>>>>> >> the storm worker running the kafka spout. At the end, the worker
>>>>> crashes
>>>>> >> and it is automatically restarted by Storm.
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> With old kafka spout version, we had a lag most of the times bellow
>>>>> 10000.
>>>>> >>
>>>>> >> With the new one, we are starting with Kafka lag about 30000 and
>>>>> >> increasing
>>>>> >> until crash.
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> 2. With the new Kafka Spout, we have faced this exception many
>>>>> times:
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> org.apache.kafka.clients.consumer.CommitFailedException: Commit
>>>>> cannot be
>>>>> >> completed since the group has already rebalanced and assigned the
>>>>> >> partitions to another member. This means that the time between
>>>>> subsequent
>>>>> >> calls to poll() was longer than the configured max.poll.interval.ms
>>>>> ,
>>>>> >> which
>>>>> >> typically implies that the poll loop is spending too much time
>>>>> message
>>>>> >> processing. You can address this either by increasing the session
>>>>> timeout
>>>>> >> or by reducing the maximum size of batches returned in poll() with
>>>>> >> max.poll.records.
>>>>> >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
>>>>> >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
>>>>> >> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
>>>>> >> at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
>>>>> >> at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
>>>>> >> at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
>>>>> >> at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
>>>>> >> at clojure.lang.AFn.run(AFn.java:22)
>>>>> >> at java.lang.Thread.run(Thread.java:748)
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> We are using the following configuration for Kafka Spout :
>>>>> >>
>>>>> >> poll.timeout.ms 200.
>>>>> >>
>>>>> >> offset.commit.period.ms  30000          (30 seconds).
>>>>> >>
>>>>> >> max.uncommitted.offsets 10000000  (ten million).
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> The Kafka consumer strategy is set to EARLY. We also set the
>>>>> following
>>>>> >> Kafka consumer parameter :
>>>>> >>
>>>>> >> session.timeout.ms  120000
>>>>> >>
>>>>> >> Are there any traces/options which we could turn on on Storm or on
>>>>> Kafka
>>>>> >> Broker that might help understanding how to stabilize our
>>>>> topologies with
>>>>> >> this new branch?
>>>>> >>
>>>>> >> Thanks!
>>>>> >>
>>>>> >> Alexandre Vermeerbergen
>>>>> >>
>>>>> >
>>>>> >
>>>>>
>>>>
>>>>
>>>
>>
>
