Hello Stig,

Thank you very much for your detailed explanations about storm-kafka-client
behavior, it helps us a lot.

We have topologies for which at-least-one matters, so we're OK to try to
converge them with this default behavior of storm-kafka-client.

However, our main topology is our "realtime alerting" one. It evaluates
incoming "metrics" (our system is a Supervision system) against "triggers"
(same meaning as in Zabbix), and here we prefer to drop anything that we
couldn't evaluate quickly enough. Indeed, to do "real-time alerting", we
know that if we skip a few metrics, then, shortly after, there will be
newer ones that will suffice to get a "near real time enough" view of the
"current state of our supervised devices".

This is the reason why we recently turned on "autocommit" in
storm-kafka-client, to overcome the growing "lag" or OOM we eventually got.

Now our topology works like a charm.

Except that with autocommit turned on, no acks are sent from
storm-kafka-client spouts, so Storm UI isn't displaying any statistics
information about our topology, which is a big annoyance for us.

Question:
* Is our approach to use autocommit OK, or given our scenario, do you
recommend othewise?
* If it's OK to use autocommit, then how can we configure the
storm-kafka-client spout to "blindly" ACK all tuples it sends?

Best regards,
Alexandre Vermeerbergen




2017-07-05 17:22 GMT+02:00 Stig Døssing <[email protected]>:

> Regarding the offsetManagers map, I think the only thing it contains that
> should take up any space is uncommitted offsets. Those should be cleared
> out when the spout commits offsets to Kafka. There have been at least a few
> issues related to that recently, e.g.
> https://github.com/apache/storm/pull/2153, which may be what you're
> running
> into. The spout should be able to limit how many uncommitted offsets it
> will accept before slowing down, but this functionality is still buggy in
> some cases (see https://github.com/apache/storm/pull/2156,
> https://github.com/apache/storm/pull/2152).
>
> The storm-kafka-client spout doesn't share code with storm-kafka, it's an
> implementation based on a different Kafka client (this one
> https://kafka.apache.org/0100/javadoc/index.html?org/apache/
> kafka/clients/consumer/KafkaConsumer.html),
> where the storm-kafka client used the now deprecated
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.
> 0+SimpleConsumer+Example.
> There are still some bugs on the new spout we're working out.
>
> Auto commit is not enabled in storm-kafka, because the SimpleConsumer
> doesn't have that concept. The old spout kept track of processed offsets by
> storing how far it had read in Zookeeper. The new client allows us to mark
> how far the spout has read in Kafka, which can either be done via the
> commit methods on the KafkaConsumer (auto commit disabled), or periodically
> by a background thread in the consumer (auto commit enabled). Normally when
> the spout emits a message into the topology, we want to only mark that
> message as "done" once Storm tells the spout to ack the tuple. This is the
> behavior when auto commit is disabled; the spout emits some tuples,
> receives some acks, and periodically uses the commitSync method to mark the
> acked tuples as "done" in Kafka. If auto commit is enabled, the
> KafkaConsumer will instead periodically commit the tuples it has returned
> from polls. The effect is that Storm can't control when an offset is
> considered "done", which means that some messages can be lost. This is fine
> for some use cases where you don't necessarily need to process all
> messages.
>
> Leaving auto commit disabled is the closest match to how the storm-kafka
> spout handled messages. We disable auto commit by default in the new spout
> because at-least-once processing guarantees should be the default, not
> something people have to opt in to.
>
> 2017-07-05 15:22 GMT+02:00 Alexandre Vermeerbergen <
> [email protected]
> >:
>
> > Hello,
> >
> > To continue on this thread:
> >
> > Since we are facing some OOMEs and Kafka lag peaks with our topologies
> > using Storm 1.1.0 and Kafka spouts (storm-kafka-client based on Kafka
> > 0.10.2.0 libs), we decided to test a much simpler Storm topology.
> >
> >  This test topology is very basic and is composed of :
> >
> > - one instance of Kafka spout consuming a topic composed of 4 partitions
> > with a throughput of about 3000 messages per second and with message size
> > of about 1500 bytes, the consumption strategy is set to "LATEST", other
> > Kafka Spout parameters are default ones
> >
> > - one basic bolt which is just counting received tuples and logging
> > statistics every minutes (min ,max, avg for number of tuples received per
> > min) with acking every tuple
> >
> > Back pressure is activated with default watermark parameters.
> >
> > topology.message.timeout.secs is not changed (default value is 30).
> >
> >  Message are encrypted (using our own encryption method) serialized java
> > objects but we are just using the provided ByteArrayDeserializer for this
> > test since we just want to count tuples without any custom
> deserialization
> > interference.
> >
> > This topology is running alone on a single VM (8 vCPU) with one worker
> > configured with 2 GB of RAM for Xmx JVM option.
> >
> > Please also note that our cluster of Kafka Brokers is based on Apache
> Kafka
> > 0.10.1.0
> >
> >
> >
> > We are tracking the Kafka lag for the topic we are consuming.
> >
> > Kafka lag is 10000 at topology start but it decreased slowly to 2000
> after
> > 24h of execution.
> >
> >
> >
> > We did not face any OOM error this time so we are suspecting that the
> > previous OOMEs were due to some lag in the much more complex topology we
> > were running and due to Storm backpressure slowing down the Spout
> > consumption.
> >
> > In fact, it seems that the following Map in the
> > org.apache.storm.kafka.spout.KafkaSpout class was the root cause of the
> > errors :
> >
> >  ============================================================
> > =========================================
> >
> > // Tuples that were successfully acked/emitted. These tuples will be
> > committed periodically when the commit timer expires, //or after a
> consumer
> > rebalance, or during close/deactivate private transient
> Map<TopicPartition,
> > OffsetManager> offsetManagers;
> > ============================================================
> > =========================================
> >
> >
> >
> > Please note however that these errors did not occur with the "old"
> version
> > of Kafka spout (storm-kafka lib).
> >
> > We ran another test with the same test topology but with
> enable.auto.commit
> > set to true in Kafka consumer properties, lag was starting at about 2000
> > and staying constant for at least 24h.
> >
> > Our understanding is that auto commit is enabled by default in Kafka
> > consumers and also in old version of Kafka spouts (storm-kafka) but it is
> > disabled in the new Kafka spout version (storm-kafka-client) to manage
> > tuple acking, is it correct please ?
> >
> >  What are the consequences of activating auto commit for consumers with
> the
> > new Kafka Spout then ?
> >
> >  Thank you for your feedbacks,
> >
> > Alexandre Vermeerbergen
> >
> > 2017-06-29 12:09 GMT+02:00 Alexandre Vermeerbergen <
> > [email protected]
> > >:
> >
> > > Hello All,
> > >
> > > Thank you very much for your feedbacks on our experience with
> > > storm-kafka-client, we're going to take your suggestions into account
> to
> > > dig into our issues and we'll feedback as soon as we have more to
> share.
> > >
> > > Best regards,
> > > Alexandre Vermeerbergen
> > >
> > > 2017-06-29 1:46 GMT+02:00 Jungtaek Lim <[email protected]>:
> > >
> > >> Hi Alexandre,
> > >>
> > >> I don't know much of storm-kafka-client, but at a glimpse, I can't
> find
> > >> misuse of HashMap in KafkaSpout so I'd rather suspect that
> OffsetManager
> > >> being really huge. If you are willing to dig more on KafkaSpout OOME
> > >> issue,
> > >> you can get more information of KafkaSpout for tracking with changing
> > log
> > >> level to DEBUG or even TRACE.
> > >>
> > >> Thanks,
> > >> Jungtaek Lim (HeartSaVioR)
> > >>
> > >> 2017년 6월 29일 (목) 오전 4:58, Stig Døssing <[email protected]>님이
> 작성:
> > >>
> > >> > Hi Alexandre,
> > >> >
> > >> > About issue 1:
> > >> > This issue is not by design. It is a side effect of the spout
> > internally
> > >> > using the KafkaConsumer's subscribe API instead of the assign API.
> > >> Support
> > >> > for using the assign API was added a while back, but has a bug that
> is
> > >> > preventing the spout from starting when configured to use that API.
> We
> > >> are
> > >> > working on fixing the issues with that implementation in these PRs
> > >> > https://github.com/apache/storm/pull/2150,
> https://github.com/apache/
> > >> > storm/pull/2174 <https://github.com/apache/storm/pull/2174>. I
> think
> > it
> > >> > is very likely that we will remove support for
> > >> > the subscribe API at some point as well, making the assign API the
> > >> default,
> > >> > since several users have had issues with the subscribe API's
> behavior.
> > >> >
> > >> > Once the assign API support is fixed, you can switch to using it via
> > >> this
> > >> > KafkaSpoutConfig Builder constructor https://github.com/apache/
> > >> > storm/blob/master/external/storm-kafka-client/src/main/
> > >> > java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L136 and
> this
> > >> > Subscription implementation https://github.com/srdo/storm/blob/
> > >> > f524868baa5929f29b69258f0da2948d0f6e152b/external/storm-
> > >> > kafka-client/src/main/java/org/apache/storm/kafka/spout/
> > >> > ManualPartitionSubscription.java
> > >> >
> > >> > If you'd like to try out the code from the PR branch, you can check
> it
> > >> out
> > >> > with some of the steps described here
> > >> >
> > >> > https://help.github.com/articles/checking-out-pull-requests-
> > >> locally/#modifying-an-inactive-pull-request-locally
> > >> > .
> > >> >
> > >> > Note that the PRs for fixing the manual partitioning option are
> > against
> > >> the
> > >> > master branch right now, which is targeting Storm version 2.0.0,
> but I
> > >> > believe you may be able to use the 2.0.0 storm-kafka-client jar
> with a
> > >> > 1.1.0 cluster.
> > >> >
> > >> > Switching to the assign API should solve remove the instability as
> the
> > >> > spouts start.
> > >> >
> > >> > About issue 2:
> > >> > The spout lag shouldn't have an effect on memory use. I'm wondering
> if
> > >> your
> > >> > spout instances are not progressing at all, which might explain the
> > lag?
> > >> > You should be able to check this using the kafka-consumer-groups.sh
> > >> script
> > >> > in the Kafka /bin directory. Once you've started the spout, you can
> > use
> > >> the
> > >> > script to inspect which offsets the consumer group have committed.
> Try
> > >> > checking if the offsets are moving once the spouts have started
> > running.
> > >> >
> > >> > I can't spout any suspicious use of HashMap in the 1.x KafkaSpout.
> > Your
> > >> > attachment didn't make it through, could you post it somewhere?
> > >> >
> > >> > About issue 3:
> > >> > The CommitException you are getting is likely because we use the
> > >> subscribe
> > >> > API. When using the subscribe API Kafka is in control of partition
> > >> > assigment, and will reassign partitions if a consumer doesn't call
> > poll
> > >> on
> > >> > the consumer often enough. The default is that the consumer must be
> > >> polled
> > >> > at least every 5 minutes. See max.poll.interval.ms in the Kafka
> > >> consumer
> > >> > configuration. The assign API doesn't require this, and won't shut
> > down
> > >> > spouts because they are too slow.
> > >> >
> > >> > There's likely still another underlying issue, because it seems
> > strange
> > >> > that your spout instances are not polling at least once per 5
> minutes,
> > >> at
> > >> > least if you didn't set a high max.poll.records. It's almost
> certainly
> > >> > related to issue 2. Do you have acking enabled, and what is your
> > >> > topology.message.timeout.secs?
> > >> >
> > >> > I'm not sure I understand why you would want to write your own spout
> > >> from
> > >> > scratch, rather than contributing fixes to the storm-kafka-client
> > >> spout. It
> > >> > seems likely to be more effort than fixing the problems with
> > >> > storm-kafka-client.
> > >> >
> > >> > 2017-06-28 14:25 GMT+02:00 Alexandre Vermeerbergen <
> > >> > [email protected]
> > >> > >:
> > >> >
> > >> > > More about this thread: we noticed that with StormKafkaClient
> 1.1.x
> > >> > > latest, we get OutOfMemoryError after ~2hours of running our
> simple
> > >> test
> > >> > > topology.
> > >> > >
> > >> > > We reproduce it everytime, so we decided to generate a heap dump
> > >> before
> > >> > > the OutOfMemoryError, and viewed the result using EclipseMAT.
> > >> > >
> > >> > > The results tends to show that there's a memory leak in
> > >> KafkaSpoutClient:
> > >> > > ============================================================
> > =========
> > >> > >
> > >> > > One instance of *"org.apache.storm.kafka.spout.KafkaSpout"*
> loaded
> > by
> > >> > *"sun.misc.Launcher$AppClassLoader
> > >> > > @ 0x80023d98"* occupies *1,662,509,664 (93.87%)* bytes. The memory
> > is
> > >> > > accumulated in one instance of *"java.util.HashMap$Node[]"* loaded
> > by
> > >> > *"<system
> > >> > > class loader>"*.
> > >> > >
> > >> > > *Keywords*
> > >> > > sun.misc.Launcher$AppClassLoader @ 0x80023d98
> > >> > > java.util.HashMap$Node[]
> > >> > > org.apache.storm.kafka.spout.KafkaSpout
> > >> > >
> > >> > > ============================================================
> > =========
> > >> > >
> > >> > > See attached screenshots of EclipseMAT session showing graphical
> > >> > > representation of memory usage
> > >> > >
> > >> > > FYI we tried to follow instructions from
> > >> https://docs.hortonworks.com/
> > >> > > HDPDocuments/HDP2/HDP-2.5.5/bk_storm-component-guide/
> > >> > > content/storm-kafkaspout-perf.html to avoid the use of too much
> > >> memory,
> > >> > > but still after 2 hours the memory fills up and the process
> hosting
> > >> our
> > >> > > spout is killed by Supervisor...
> > >> > >
> > >> > > Any clue of what we may have missed?
> > >> > >
> > >> > > Best regards,
> > >> > > Alexandre Vermeerbergen
> > >> > >
> > >> > >
> > >> > >
> > >> > > 2017-06-28 <20%2017%2006%2028> 9:17 GMT+02:00 Alexandre
> > Vermeerbergen
> > >> <
> > >> > > [email protected]>:
> > >> > >
> > >> > >> Oops, sent my last mail too fast, let me continue it:
> > >> > >>
> > >> > >> Hello,
> > >> > >>
> > >> > >> Coming back to my original post in this list, we have 3 issues
> with
> > >> > >> latest 1.1.x StormKafkaClient spout with our setup:
> > >> > >>
> > >> > >> Issue#1:
> > >> > >>  Initial lag (which we hadn't using the classic Storm Kafka
> spout)
> > >> > >>    For this issue, my understanding of Kristopher's answer is
> that
> > >> this
> > >> > >> is "by design" of the StormKafkaClient spout, which instances
> > >> > progressively
> > >> > >> joins Kafka consumers group, which causes consumers rebalancing.
> > This
> > >> > >> rebalancing is "slow", which means that until all spout instances
> > are
> > >> > >> started, the topology starts with an "initial Kafka Lag"
> > >> > >>    => Is my understanding correct?
> > >> > >>    => Why don't we have such behavior with the old Storm Kafka
> > spout
> > >> ?
> > >> > >>    => Is this annoying initial lag tracked by a JIRA ?
> > >> > >>
> > >> > >> Issue#2:
> > >> > >>     The kafka Lag is increasing constantly and this leads to the
> > >> > overload
> > >> > >> of the storm worker running the kafka spout. At the end, the
> worker
> > >> > crashes
> > >> > >> and it is automatically restarted by Storm.
> > >> > >>     => This is unlike what we observe with the old Storm Kafka
> > spout
> > >> > >>     => What is the recommended way to analyze this issue?
> > >> > >>
> > >> > >> Issue3:
> > >> > >>   With the new Kafka Spout, we have faced this exception many
> > times:
> > >> > >>
> > >> > >>  org.apache.kafka.clients.consumer.CommitFailedException: Commit
> > >> cannot
> > >> > >> be completed since the group has already rebalanced and assigned
> > the
> > >> > >> partitions to another member. This means that the time between
> > >> > subsequent
> > >> > >> calls to poll() was longer than the configured
> > max.poll.interval.ms,
> > >> > >> which typically implies that the poll loop is spending too much
> > time
> > >> > >> message processing. You can address this either by increasing the
> > >> > session
> > >> > >> timeout or by reducing the maximum size of batches returned in
> > poll()
> > >> > with
> > >> > >> max.poll.records. at org.apache.kafka.clients.consu
> > >> > >>
> > >> > mer.internals.ConsumerCoordinator.sendOffsetCommitRequest(Co
> > >> nsumerCoordinator.java:702)
> > >> > >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordina
> > >> > >> tor.commitOffsetsSync(ConsumerCoordinator.java:581) at
> > >> > >>
> > >> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(
> > >> KafkaConsumer.java:1124)
> > >> > >> at
> > >> > org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAcke
> > >> dTuples(KafkaSpout.java:384)
> > >> > >> at
> > >> > org.apache.storm.kafka.spout.KafkaSpout.nextTuple(
> > KafkaSpout.java:220)
> > >> > >> at
> > >> > org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__
> > >> 10826.invoke(executor.clj:646)
> > >> > >> at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
> > at
> > >> > >> clojure.lang.AFn.run(AFn.java:22) at
> > java.lang.Thread.run(Thread.ja
> > >> > >> va:748)
> > >> > >>
> > >> > >>
> > >> > >>   => Are we the only ones experiencing such issues with Storm
> > >> > 1.1.0/1.1.x
> > >> > >> latest ?
> > >> > >>
> > >> > >> Note: We are considering writing our own Kafka Spout, as we're
> > >> > time-bound
> > >> > >> to move to Kafka 0.10.x consumers & producers (to prepare our
> next
> > >> step
> > >> > >> with Kafka security, which isn't available with Kafka 0.9.x). We
> > will
> > >> > miss
> > >> > >> the integration of Kafka lag in StormUI, but currently we do not
> > >> > understand
> > >> > >> how to solve the regressions we observe with latest Storm Kafka
> > >> client
> > >> > >> spout.
> > >> > >>
> > >> > >> Are there other Storm developers/users who jumped into this
> > >> alternative?
> > >> > >>
> > >> > >> Best regards,
> > >> > >>
> > >> > >> Alexandre Vermeerbergen
> > >> > >>
> > >> > >>
> > >> > >>
> > >> > >>
> > >> > >> 2017-06-28 <20%2017%2006%2028> 9:09 GMT+02:00 Alexandre
> > >> Vermeerbergen <
> > >> > >> [email protected]>:
> > >> > >>
> > >> > >>> Hello,
> > >> > >>>
> > >> > >>> Coming back to my original post in this list, we have two issues
> > >> with
> > >> > >>> latest 1.1.x StormKafkaClient spout with our setup:
> > >> > >>>
> > >> > >>> Issue#1:
> > >> > >>>  Initial lag (which we hadn't using the classic Storm Kafka
> spout)
> > >> > >>>    For this issue, my understanding of Kristopher's answer is
> that
> > >> this
> > >> > >>> is "by design" of the StormKafkaClient spout, which instances
> > >> > progressively
> > >> > >>> joins Kafka consumers group, which causes consumers rebalancing.
> > >> This
> > >> > >>> rebalancing is "slow", which means that until all spout
> instances
> > >> are
> > >> > >>> started, the topology starts with an "initial Kafka Lag"
> > >> > >>>    => Is my understanding correct?
> > >> > >>>    => Why don't we have such behavior with the old Storm Kafka
> > >> spout ?
> > >> > >>>    => Is this annoying initial lag tracked by a JIRA ?
> > >> > >>>
> > >> > >>>
> > >> > >>>
> > >> > >>> 2017-06-27 17:15 GMT+02:00 Alexandre Vermeerbergen <
> > >> > >>> [email protected]>:
> > >> > >>>
> > >> > >>>> Hello Kristopher,
> > >> > >>>>
> > >> > >>>> We built Storm 1.1.1-latest using yesterday's (2017-06-26
> > >> > >>>> <20%2017%2006%2026>)  artifacts downloaded from
> > >> > >>>> https://github.com/apache/storm/tree/1.x-branch.
> > >> > >>>> <https://github.com/apache/storm/tree/1.x-branch>
> > >> > >>>>
> > >> > >>>> Is your latest PR supposed to be in what we downloaded & built,
> > or
> > >> do
> > >> > >>>> we need to upgrade in some way (which?)
> > >> > >>>>
> > >> > >>>> Should we change anything to our settings?
> > >> > >>>>
> > >> > >>>> Please note that I mistakenly wrote that our "Kafka consumer
> > >> strategy
> > >> > >>>> is set to EARLY" whereas it's "Kafka consumer strategy is set
> to
> > >> > LATEST",
> > >> > >>>> if that matters.
> > >> > >>>>
> > >> > >>>> Best regards,
> > >> > >>>> Alexandre Vermeerbergen
> > >> > >>>>
> > >> > >>>> 2017-06-27 16:37 GMT+02:00 Kristopher Kane <
> [email protected]
> > >:
> > >> > >>>>
> > >> > >>>>> Correction: https://github.com/apache/storm/pull/2174 has all
> > of
> > >> > what
> > >> > >>>>> I was
> > >> > >>>>> doing and more.
> > >> > >>>>>
> > >> > >>>>> On Tue, Jun 27, 2017 at 9:33 AM, Kristopher Kane <
> > >> > [email protected]
> > >> > >>>>> >
> > >> > >>>>> wrote:
> > >> > >>>>>
> > >> > >>>>> > Alexandre,
> > >> > >>>>> >
> > >> > >>>>> > There are quite a few JIRAs and discussions around this
> > >> recently.
> > >> > >>>>> The
> > >> > >>>>> > default behavior for storm-kafka-client is the 'subscribe'
> API
> > >> > which
> > >> > >>>>> causes
> > >> > >>>>> > the immediate lag you see since rebalance will happen
> > spout(n)-1
> > >> > >>>>> times
> > >> > >>>>> > just from the spouts spinning up.
> > >> > >>>>> >
> > >> > >>>>> > There is a Builder for ManualPartitionNamedSubscription and
> > the
> > >> > >>>>> > RoundRobinManualPartitioner (which use the 'assign' Kafka
> > >> consumer
> > >> > >>>>> API) but
> > >> > >>>>> > they don't work at all.  I hope to have a PR in today
> > >> > >>>>> > to fix these on 1.x-branch
> > >> > >>>>> >
> > >> > >>>>> > The other JIRAs I mentioned are for a redesign of this spout
> > or
> > >> > >>>>> other more
> > >> > >>>>> > drastic changes.  My goal is a bug fix for a version of the
> > >> spout
> > >> > >>>>> that
> > >> > >>>>> > doesn't provide unnecessary duplicates.
> > >> > >>>>> >
> > >> > >>>>> > Kris
> > >> > >>>>> >
> > >> > >>>>> > On Tue, Jun 27, 2017 at 8:00 AM, Alexandre Vermeerbergen <
> > >> > >>>>> > [email protected]> wrote:
> > >> > >>>>> >
> > >> > >>>>> >> Hello All,
> > >> > >>>>> >>
> > >> > >>>>> >> We have been running for a while our real-time supervision
> > >> > >>>>> application
> > >> > >>>>> >> based on Apache Storm 1.0.3 with Storm Kafka Spouts (old
> > >> consumer:
> > >> > >>>>> >> storm-kafka) and with our Kafka Broker cluster based on
> > Apache
> > >> > Kafka
> > >> > >>>>> >> 0.10.1.0.
> > >> > >>>>> >>
> > >> > >>>>> >> Backpressure is activated with default parameters.
> > >> > >>>>> >>
> > >> > >>>>> >>  Key
> > >> > >>>>> >> Value
> > >> > >>>>> >>
> > >> > >>>>> >> backpressure.disruptor.high.watermark              0.9
> > >> > >>>>> >>
> > >> > >>>>> >> backpressure.disruptor.low.watermark               0.4
> > >> > >>>>> >>
> > >> > >>>>> >> task.backpressure.poll.secs
> > >>  30
> > >> > >>>>> >>
> > >> > >>>>> >> topology.backpressure.enable
> >  true
> > >> > >>>>> >>
> > >> > >>>>> >>
> > >> > >>>>> >>
> > >> > >>>>> >> We decided to upgrade to Apache Storm 1.1.0 to benefit from
> > the
> > >> > new
> > >> > >>>>> Kafka
> > >> > >>>>> >> Spout  (storm-kafka-client lib) with a consumer which has
> no
> > >> more
> > >> > >>>>> >> dependency on Zookeeper.
> > >> > >>>>> >>
> > >> > >>>>> >> After upgrade, we had several issues with kafka
> consumption.
> > >> > >>>>> >>
> > >> > >>>>> >>
> > >> > >>>>> >> We saw that several JIRAs were opened and resolved on
> Apache
> > >> Storm
> > >> > >>>>> 1.1.1.
> > >> > >>>>> >>
> > >> > >>>>> >> So we decided to upgrade to the latest available Apache
> Storm
> > >> > 1.1.x
> > >> > >>>>> code
> > >> > >>>>> >> built from source (2017-06-26 <20%2017%2006%2026>) but  we
> > >> still
> > >> > >>>>> have issues :
> > >> > >>>>> >>
> > >> > >>>>> >>
> > >> > >>>>> >>
> > >> > >>>>> >> 1. The kafka Lag is increasing constantly and this leads to
> > the
> > >> > >>>>> overload
> > >> > >>>>> >> of
> > >> > >>>>> >> the storm worker running the kafka spout. At the end, the
> > >> worker
> > >> > >>>>> crashes
> > >> > >>>>> >> and it is automatically restarted by Storm.
> > >> > >>>>> >>
> > >> > >>>>> >>
> > >> > >>>>> >>
> > >> > >>>>> >> With old kafka spout version, we had a lag most of the
> times
> > >> > bellow
> > >> > >>>>> 10000.
> > >> > >>>>> >>
> > >> > >>>>> >> With the new one, we are starting with Kafka lag about
> 30000
> > >> and
> > >> > >>>>> >> increasing
> > >> > >>>>> >> until crash.
> > >> > >>>>> >>
> > >> > >>>>> >>
> > >> > >>>>> >>
> > >> > >>>>> >> 2. With the new Kafka Spout, we have faced this exception
> > many
> > >> > >>>>> times:
> > >> > >>>>> >>
> > >> > >>>>> >>
> > >> > >>>>> >>
> > >> > >>>>> >> org.apache.kafka.clients.consumer.CommitFailedException:
> > >> Commit
> > >> > >>>>> cannot be
> > >> > >>>>> >> completed since the group has already rebalanced and
> assigned
> > >> the
> > >> > >>>>> >> partitions to another member. This means that the time
> > between
> > >> > >>>>> subsequent
> > >> > >>>>> >> calls to poll() was longer than the configured
> > >> > max.poll.interval.ms
> > >> > >>>>> ,
> > >> > >>>>> >> which
> > >> > >>>>> >> typically implies that the poll loop is spending too much
> > time
> > >> > >>>>> message
> > >> > >>>>> >> processing. You can address this either by increasing the
> > >> session
> > >> > >>>>> timeout
> > >> > >>>>> >> or by reducing the maximum size of batches returned in
> poll()
> > >> with
> > >> > >>>>> >> max.poll.records. at
> > >> > >>>>> >> org.apache.kafka.clients.consumer.internals.
> ConsumerCoordina
> > >> > >>>>> >> tor.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
> > >> > >>>>> >> at
> > >> > >>>>> >> org.apache.kafka.clients.consumer.internals.
> ConsumerCoordina
> > >> > >>>>> >> tor.commitOffsetsSync(ConsumerCoordinator.java:581)
> > >> > >>>>> >> at
> > >> > >>>>> >> org.apache.kafka.clients.consumer.KafkaConsumer.
> commitSync(
> > >> > >>>>> >> KafkaConsumer.java:1124)
> > >> > >>>>> >> at
> > >> > >>>>> >> org.apache.storm.kafka.spout.KafkaSpout.
> commitOffsetsForAcke
> > >> > >>>>> >> dTuples(KafkaSpout.java:384)
> > >> > >>>>> >> at org.apache.storm.kafka.spout.K
> > >> afkaSpout.nextTuple(KafkaSpout
> > >> > >>>>> .java:220)
> > >> > >>>>> >> at
> > >> > >>>>> >> org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__
> > >> > >>>>> >> 10826.invoke(executor.clj:646)
> > >> > >>>>> >> at org.apache.storm.util$async_lo
> > >> op$fn__555.invoke(util.clj:484)
> > >> > at
> > >> > >>>>> >> clojure.lang.AFn.run(AFn.java:22) at
> > >> > java.lang.Thread.run(Thread.ja
> > >> > >>>>> >> va:748)
> > >> > >>>>> >>
> > >> > >>>>> >>
> > >> > >>>>> >>
> > >> > >>>>> >> We are using the following configuration for Kafka Spout :
> > >> > >>>>> >>
> > >> > >>>>> >> poll.timeout.ms 200.
> > >> > >>>>> >>
> > >> > >>>>> >> offset.commit.period.ms  30000          (30 seconds).
> > >> > >>>>> >>
> > >> > >>>>> >> max.uncommitted.offsets 10000000  (ten million).
> > >> > >>>>> >>
> > >> > >>>>> >>
> > >> > >>>>> >>
> > >> > >>>>> >> The Kafka consumer strategy is set to EARLY. We also set
> the
> > >> > >>>>> following
> > >> > >>>>> >> Kafka consumer parameter :
> > >> > >>>>> >>
> > >> > >>>>> >> session.timeout.ms  120000
> > >> > >>>>> >>
> > >> > >>>>> >> Are there any traces/options which we could turn on on
> Storm
> > >> or on
> > >> > >>>>> Kafka
> > >> > >>>>> >> Broker that might help understanding how to stabilize our
> > >> > >>>>> topologies with
> > >> > >>>>> >> this new branch?
> > >> > >>>>> >>
> > >> > >>>>> >> Thanks!
> > >> > >>>>> >>
> > >> > >>>>> >> Alexandre Vermeerbergen
> > >> > >>>>> >>
> > >> > >>>>> >
> > >> > >>>>> >
> > >> > >>>>>
> > >> > >>>>
> > >> > >>>>
> > >> > >>>
> > >> > >>
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>

Reply via email to