Regarding the offsetManagers map, I think the only thing it contains that
should take up any space is uncommitted offsets. Those should be cleared
out when the spout commits offsets to Kafka. There have been at least a few
issues related to that recently, e.g.
https://github.com/apache/storm/pull/2153, which may be what you're running
into. The spout should be able to limit how many uncommitted offsets it
will accept before slowing down, but this functionality is still buggy in
some cases (see https://github.com/apache/storm/pull/2156,
https://github.com/apache/storm/pull/2152).
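
To illustrate the idea of limiting uncommitted offsets, here is a minimal
Java sketch. It is not the spout's actual code: the class
UncommittedOffsetGate and its method names are made up for this example.
It assumes only that the spout counts offsets it has emitted but not yet
committed, and stops polling Kafka once that count reaches a configured
maximum (the role max.uncommitted.offsets plays in the spout
configuration).

```java
// Hypothetical sketch, not the real KafkaSpout implementation.
final class UncommittedOffsetGate {
    private final long maxUncommittedOffsets;
    // Offsets emitted into the topology but not yet committed to Kafka.
    private long uncommitted;

    UncommittedOffsetGate(long maxUncommittedOffsets) {
        if (maxUncommittedOffsets <= 0) {
            throw new IllegalArgumentException("maxUncommittedOffsets must be positive");
        }
        this.maxUncommittedOffsets = maxUncommittedOffsets;
    }

    // The spout would check this before polling Kafka for more messages.
    boolean mayPoll() {
        return uncommitted < maxUncommittedOffsets;
    }

    // Called after emitting `count` new messages into the topology.
    void onEmitted(long count) {
        uncommitted += count;
    }

    // Called after `count` offsets were successfully committed to Kafka.
    void onCommitted(long count) {
        uncommitted = Math.max(0, uncommitted - count);
    }

    long uncommitted() {
        return uncommitted;
    }
}
```

If the commit step never clears the count (the kind of bug the PRs above
deal with), a gate like this either stops the spout or, if the limit is
set very high, lets the tracked offsets grow until memory runs out.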

The storm-kafka-client spout doesn't share code with storm-kafka. It's an
implementation based on a different Kafka client, the KafkaConsumer
(https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html),
whereas the storm-kafka spout used the now deprecated SimpleConsumer
(https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example).
There are still some bugs in the new spout that we're working out.

Auto commit is not enabled in storm-kafka, because the SimpleConsumer
doesn't have that concept. The old spout kept track of processed offsets by
storing how far it had read in Zookeeper. The new client allows us to mark
how far the spout has read in Kafka, which can either be done via the
commit methods on the KafkaConsumer (auto commit disabled), or periodically
by a background thread in the consumer (auto commit enabled). Normally when
the spout emits a message into the topology, we want to only mark that
message as "done" once Storm tells the spout to ack the tuple. This is the
behavior when auto commit is disabled; the spout emits some tuples,
receives some acks, and periodically uses the commitSync method to mark the
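acked tuples as "done" in Kafka. If auto commit is enabled, the
KafkaConsumer will instead periodically commit the offsets of the messages
it has returned from polls, whether or not the corresponding tuples have
been acked. The effect is that Storm can't control when an offset is
considered "done", which means that some messages can be lost, for example
if the worker crashes after an offset was committed but before the tuple
was fully processed. This is fine for some use cases where you don't
necessarily need to process all messages.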
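
To make the auto commit disabled case more concrete, here is a minimal
Java sketch of the bookkeeping for one partition. The class
AckedOffsetTracker and its methods are made up for illustration; this is
not the actual OffsetManager code in storm-kafka-client. It assumes the
commit position may only move past a contiguous run of acked offsets, and
it uses the Kafka convention that the committed offset is the offset of
the next message to read.

```java
import java.util.TreeSet;

// Hypothetical sketch of ack-driven commit tracking for a single partition.
final class AckedOffsetTracker {
    // Acked offsets that cannot be committed yet because of a gap before them.
    private final TreeSet<Long> acked = new TreeSet<>();
    // First offset that has not been committed yet.
    private long nextToCommit;

    AckedOffsetTracker(long firstUncommittedOffset) {
        this.nextToCommit = firstUncommittedOffset;
    }

    // Called when Storm acks the tuple for the message at `offset`.
    void ack(long offset) {
        if (offset >= nextToCommit) {
            acked.add(offset);
        }
    }

    // Moves the commit position past every contiguous acked offset and
    // returns the position that would be handed to commitSync.
    long advance() {
        while (acked.remove(nextToCommit)) {
            nextToCommit++;
        }
        return nextToCommit;
    }

    // Number of acked offsets still held in memory, waiting for a gap to close.
    int pendingAcked() {
        return acked.size();
    }
}
```

For example, after acking offsets 0, 1 and 3, advance() returns 2 and
offset 3 stays in memory until offset 2 is acked. With auto commit
enabled none of this tracking is needed, which is also why the spout
can't promise at-least-once processing in that mode.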

Leaving auto commit disabled is the closest match to how the storm-kafka
spout handled messages. We disable auto commit by default in the new spout
because at-least-once processing guarantees should be the default, not
something people have to opt in to.
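
As a rough sketch, the two modes come down to the following Kafka
consumer properties. The class and method names here are made up for the
example, and how the properties are handed to the spout depends on the
storm-kafka-client version, so that part is left out. The keys
enable.auto.commit and auto.commit.interval.ms are standard Kafka
consumer settings.

```java
import java.util.Properties;

// Hypothetical helper, only meant to show the relevant consumer settings.
final class ConsumerCommitModes {
    // Spout commits acked offsets itself: at-least-once processing.
    static Properties spoutManagedCommits() {
        Properties props = new Properties();
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    // KafkaConsumer commits in the background: messages may be lost.
    static Properties autoCommit(long intervalMs) {
        Properties props = new Properties();
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", Long.toString(intervalMs));
        return props;
    }
}
```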

2017-07-05 15:22 GMT+02:00 Alexandre Vermeerbergen <[email protected]
>:

> Hello,
>
> To continue on this thread:
>
> Since we are facing some OOMEs and Kafka lag peaks with our topologies
> using Storm 1.1.0 and Kafka spouts (storm-kafka-client based on Kafka
> 0.10.2.0 libs), we decided to test a much simpler Storm topology.
>
>  This test topology is very basic and is composed of :
>
> - one instance of Kafka spout consuming a topic composed of 4 partitions
> with a throughput of about 3000 messages per second and with message size
> of about 1500 bytes, the consumption strategy is set to "LATEST", other
> Kafka Spout parameters are default ones
>
> - one basic bolt which just counts received tuples and logs statistics
> every minute (min, max, avg for number of tuples received per minute),
> acking every tuple
>
> Back pressure is activated with default watermark parameters.
>
> topology.message.timeout.secs is not changed (default value is 30).
>
> Messages are encrypted (using our own encryption method) serialized Java
> objects, but we are just using the provided ByteArrayDeserializer for this
> test since we just want to count tuples without any custom deserialization
> interference.
>
> This topology is running alone on a single VM (8 vCPU) with one worker
> configured with 2 GB of RAM for Xmx JVM option.
>
> Please also note that our cluster of Kafka Brokers is based on Apache Kafka
> 0.10.1.0
>
>
>
> We are tracking the Kafka lag for the topic we are consuming.
>
> Kafka lag is 10000 at topology start but it decreased slowly to 2000 after
> 24h of execution.
>
>
>
> We did not face any OOM error this time so we are suspecting that the
> previous OOMEs were due to some lag in the much more complex topology we
> were running and due to Storm backpressure slowing down the Spout
> consumption.
>
> In fact, it seems that the following Map in the
> org.apache.storm.kafka.spout.KafkaSpout class was the root cause of the
> errors :
>
> =====================================================================
> // Tuples that were successfully acked/emitted. These tuples will be
> // committed periodically when the commit timer expires, or after a
> // consumer rebalance, or during close/deactivate
> private transient Map<TopicPartition, OffsetManager> offsetManagers;
> =====================================================================
>
>
>
> Please note however that these errors did not occur with the "old" version
> of Kafka spout (storm-kafka lib).
>
> We ran another test with the same test topology but with enable.auto.commit
> set to true in Kafka consumer properties, lag was starting at about 2000
> and staying constant for at least 24h.
>
> Our understanding is that auto commit is enabled by default in Kafka
> consumers and also in old version of Kafka spouts (storm-kafka) but it is
> disabled in the new Kafka spout version (storm-kafka-client) to manage
> tuple acking, is it correct please ?
>
>  What are the consequences of activating auto commit for consumers with the
> new Kafka Spout then ?
>
> Thank you for your feedback,
>
> Alexandre Vermeerbergen
>
> 2017-06-29 12:09 GMT+02:00 Alexandre Vermeerbergen <
> [email protected]
> >:
>
> > Hello All,
> >
> > Thank you very much for your feedbacks on our experience with
> > storm-kafka-client, we're going to take your suggestions into account to
> > dig into our issues and we'll feedback as soon as we have more to share.
> >
> > Best regards,
> > Alexandre Vermeerbergen
> >
> > 2017-06-29 1:46 GMT+02:00 Jungtaek Lim <[email protected]>:
> >
> >> Hi Alexandre,
> >>
> >> I don't know much about storm-kafka-client, but at a glance I can't find
> >> any misuse of HashMap in KafkaSpout, so I'd rather suspect that the
> >> OffsetManager is really huge. If you are willing to dig more into the
> >> KafkaSpout OOME issue, you can get more tracking information from
> >> KafkaSpout by changing the log level to DEBUG or even TRACE.
> >>
> >> Thanks,
> >> Jungtaek Lim (HeartSaVioR)
> >>
> >> On Thu, Jun 29, 2017 at 4:58 AM, Stig Døssing <[email protected]> wrote:
> >>
> >> > Hi Alexandre,
> >> >
> >> > About issue 1:
> >> > This issue is not by design. It is a side effect of the spout internally
> >> > using the KafkaConsumer's subscribe API instead of the assign API. Support
> >> > for using the assign API was added a while back, but has a bug that is
> >> > preventing the spout from starting when configured to use that API. We are
> >> > working on fixing the issues with that implementation in these PRs:
> >> > https://github.com/apache/storm/pull/2150,
> >> > https://github.com/apache/storm/pull/2174. I think it is very likely that
> >> > we will remove support for the subscribe API at some point as well, making
> >> > the assign API the default, since several users have had issues with the
> >> > subscribe API's behavior.
> >> >
> >> > Once the assign API support is fixed, you can switch to using it via this
> >> > KafkaSpoutConfig Builder constructor
> >> > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L136
> >> > and this Subscription implementation
> >> > https://github.com/srdo/storm/blob/f524868baa5929f29b69258f0da2948d0f6e152b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
> >> >
> >> > If you'd like to try out the code from the PR branch, you can check it out
> >> > with some of the steps described here:
> >> > https://help.github.com/articles/checking-out-pull-requests-locally/#modifying-an-inactive-pull-request-locally
> >> >
> >> > Note that the PRs for fixing the manual partitioning option are against the
> >> > master branch right now, which is targeting Storm version 2.0.0, but I
> >> > believe you may be able to use the 2.0.0 storm-kafka-client jar with a
> >> > 1.1.0 cluster.
> >> >
> >> > Switching to the assign API should remove the instability as the spouts
> >> > start.
> >> >
> >> > About issue 2:
> >> > The spout lag shouldn't have an effect on memory use. I'm wondering if your
> >> > spout instances are not progressing at all, which might explain the lag?
> >> > You should be able to check this using the kafka-consumer-groups.sh script
> >> > in the Kafka /bin directory. Once you've started the spout, you can use the
> >> > script to inspect which offsets the consumer group has committed. Try
> >> > checking if the offsets are moving once the spouts have started running.
> >> >
> >> > I can't spot any suspicious use of HashMap in the 1.x KafkaSpout. Your
> >> > attachment didn't make it through, could you post it somewhere?
> >> >
> >> > About issue 3:
> >> > The CommitFailedException you are getting is likely because we use the
> >> > subscribe API. When using the subscribe API, Kafka is in control of
> >> > partition assignment, and will reassign partitions if a consumer doesn't
> >> > call poll on the consumer often enough. The default is that the consumer
> >> > must be polled at least every 5 minutes. See max.poll.interval.ms in the
> >> > Kafka consumer configuration. The assign API doesn't require this, and
> >> > won't shut down spouts because they are too slow.
> >> >
> >> > There's likely still another underlying issue, because it seems strange
> >> > that your spout instances are not polling at least once per 5 minutes, at
> >> > least if you didn't set a high max.poll.records. It's almost certainly
> >> > related to issue 2. Do you have acking enabled, and what is your
> >> > topology.message.timeout.secs?
> >> >
> >> > I'm not sure I understand why you would want to write your own spout from
> >> > scratch, rather than contributing fixes to the storm-kafka-client spout. It
> >> > seems likely to be more effort than fixing the problems with
> >> > storm-kafka-client.
> >> >
> >> > 2017-06-28 14:25 GMT+02:00 Alexandre Vermeerbergen <
> >> > [email protected]
> >> > >:
> >> >
> >> > > More about this thread: we noticed that with StormKafkaClient 1.1.x
> >> > > latest, we get OutOfMemoryError after ~2 hours of running our simple test
> >> > > topology.
> >> > >
> >> > > We reproduce it every time, so we decided to generate a heap dump before
> >> > > the OutOfMemoryError, and viewed the result using EclipseMAT.
> >> > >
> >> > > The results tend to show that there's a memory leak in KafkaSpoutClient:
> >> > > =====================================================================
> >> > >
> >> > > One instance of *"org.apache.storm.kafka.spout.KafkaSpout"* loaded by
> >> > > *"sun.misc.Launcher$AppClassLoader @ 0x80023d98"* occupies
> >> > > *1,662,509,664 (93.87%)* bytes. The memory is accumulated in one
> >> > > instance of *"java.util.HashMap$Node[]"* loaded by
> >> > > *"<system class loader>"*.
> >> > >
> >> > > *Keywords*
> >> > > sun.misc.Launcher$AppClassLoader @ 0x80023d98
> >> > > java.util.HashMap$Node[]
> >> > > org.apache.storm.kafka.spout.KafkaSpout
> >> > >
> >> > > =====================================================================
> >> > >
> >> > > See attached screenshots of EclipseMAT session showing graphical
> >> > > representation of memory usage
> >> > >
> >> > > FYI we tried to follow the instructions from
> >> > > https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.5.5/bk_storm-component-guide/content/storm-kafkaspout-perf.html
> >> > > to avoid the use of too much memory, but still after 2 hours the memory
> >> > > fills up and the process hosting our spout is killed by Supervisor...
> >> > >
> >> > > Any clue of what we may have missed?
> >> > >
> >> > > Best regards,
> >> > > Alexandre Vermeerbergen
> >> > >
> >> > >
> >> > >
> >> > > 2017-06-28 9:17 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
> >> > >
> >> > >> Oops, sent my last mail too fast, let me continue it:
> >> > >>
> >> > >> Hello,
> >> > >>
> >> > >> Coming back to my original post in this list, we have 3 issues with
> >> > >> latest 1.1.x StormKafkaClient spout with our setup:
> >> > >>
> >> > >> Issue#1:
> >> > >>  Initial lag (which we didn't have using the classic Storm Kafka spout)
> >> > >>    For this issue, my understanding of Kristopher's answer is that this
> >> > >> is "by design" of the StormKafkaClient spout, whose instances
> >> > >> progressively join the Kafka consumer group, which causes consumer
> >> > >> rebalancing. This rebalancing is "slow", which means that until all spout
> >> > >> instances are started, the topology starts with an "initial Kafka lag"
> >> > >>    => Is my understanding correct?
> >> > >>    => Why don't we have such behavior with the old Storm Kafka spout?
> >> > >>    => Is this annoying initial lag tracked by a JIRA?
> >> > >>
> >> > >> Issue#2:
> >> > >>     The Kafka lag is increasing constantly and this leads to the overload
> >> > >> of the Storm worker running the Kafka spout. At the end, the worker
> >> > >> crashes and it is automatically restarted by Storm.
> >> > >>     => This is unlike what we observe with the old Storm Kafka spout
> >> > >>     => What is the recommended way to analyze this issue?
> >> > >>
> >> > >> Issue#3:
> >> > >>   With the new Kafka Spout, we have faced this exception many times:
> >> > >>
> >> > >> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
> >> > >> be completed since the group has already rebalanced and assigned the
> >> > >> partitions to another member. This means that the time between subsequent
> >> > >> calls to poll() was longer than the configured max.poll.interval.ms,
> >> > >> which typically implies that the poll loop is spending too much time
> >> > >> message processing. You can address this either by increasing the session
> >> > >> timeout or by reducing the maximum size of batches returned in poll() with
> >> > >> max.poll.records.
> >> > >>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
> >> > >>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
> >> > >>   at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> >> > >>   at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
> >> > >>   at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
> >> > >>   at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
> >> > >>   at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
> >> > >>   at clojure.lang.AFn.run(AFn.java:22)
> >> > >>   at java.lang.Thread.run(Thread.java:748)
> >> > >>
> >> > >>
> >> > >>   => Are we the only ones experiencing such issues with Storm 1.1.0/1.1.x
> >> > >> latest?
> >> > >>
> >> > >> Note: We are considering writing our own Kafka Spout, as we're time-bound
> >> > >> to move to Kafka 0.10.x consumers & producers (to prepare our next step
> >> > >> with Kafka security, which isn't available with Kafka 0.9.x). We will miss
> >> > >> the integration of Kafka lag in StormUI, but currently we do not
> >> > >> understand how to solve the regressions we observe with the latest Storm
> >> > >> Kafka client spout.
> >> > >>
> >> > >> Are there other Storm developers/users who jumped into this alternative?
> >> > >>
> >> > >> Best regards,
> >> > >>
> >> > >> Alexandre Vermeerbergen
> >> > >>
> >> > >>
> >> > >>
> >> > >>
> >> > >> 2017-06-28 9:09 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
> >> > >>
> >> > >>> Hello,
> >> > >>>
> >> > >>> Coming back to my original post in this list, we have two issues with
> >> > >>> latest 1.1.x StormKafkaClient spout with our setup:
> >> > >>>
> >> > >>> Issue#1:
> >> > >>>  Initial lag (which we didn't have using the classic Storm Kafka spout)
> >> > >>>    For this issue, my understanding of Kristopher's answer is that this
> >> > >>> is "by design" of the StormKafkaClient spout, whose instances
> >> > >>> progressively join the Kafka consumer group, which causes consumer
> >> > >>> rebalancing. This rebalancing is "slow", which means that until all spout
> >> > >>> instances are started, the topology starts with an "initial Kafka lag"
> >> > >>>    => Is my understanding correct?
> >> > >>>    => Why don't we have such behavior with the old Storm Kafka spout?
> >> > >>>    => Is this annoying initial lag tracked by a JIRA?
> >> > >>>
> >> > >>>
> >> > >>>
> >> > >>> 2017-06-27 17:15 GMT+02:00 Alexandre Vermeerbergen <
> >> > >>> [email protected]>:
> >> > >>>
> >> > >>>> Hello Kristopher,
> >> > >>>>
> >> > >>>> We built Storm 1.1.1-latest using yesterday's (2017-06-26) artifacts
> >> > >>>> downloaded from https://github.com/apache/storm/tree/1.x-branch.
> >> > >>>>
> >> > >>>> Is your latest PR supposed to be in what we downloaded & built, or do
> >> > >>>> we need to upgrade in some way (which?)
> >> > >>>>
> >> > >>>> Should we change anything in our settings?
> >> > >>>>
> >> > >>>> Please note that I mistakenly wrote that our "Kafka consumer strategy
> >> > >>>> is set to EARLY" whereas it's "Kafka consumer strategy is set to LATEST",
> >> > >>>> if that matters.
> >> > >>>>
> >> > >>>> Best regards,
> >> > >>>> Alexandre Vermeerbergen
> >> > >>>>
> >> > >>>> 2017-06-27 16:37 GMT+02:00 Kristopher Kane <[email protected]
> >:
> >> > >>>>
> >> > >>>>> Correction: https://github.com/apache/storm/pull/2174 has all of what
> >> > >>>>> I was doing and more.
> >> > >>>>>
> >> > >>>>> On Tue, Jun 27, 2017 at 9:33 AM, Kristopher Kane <
> >> > [email protected]
> >> > >>>>> >
> >> > >>>>> wrote:
> >> > >>>>>
> >> > >>>>> > Alexandre,
> >> > >>>>> >
> >> > >>>>> > There are quite a few JIRAs and discussions around this recently. The
> >> > >>>>> > default behavior for storm-kafka-client is the 'subscribe' API which
> >> > >>>>> > causes the immediate lag you see since rebalance will happen
> >> > >>>>> > spout(n)-1 times just from the spouts spinning up.
> >> > >>>>> >
> >> > >>>>> > There is a Builder for ManualPartitionNamedSubscription and the
> >> > >>>>> > RoundRobinManualPartitioner (which use the 'assign' Kafka consumer
> >> > >>>>> > API) but they don't work at all.  I hope to have a PR in today
> >> > >>>>> > to fix these on 1.x-branch
> >> > >>>>> >
> >> > >>>>> > The other JIRAs I mentioned are for a redesign of this spout or other
> >> > >>>>> > more drastic changes.  My goal is a bug fix for a version of the spout
> >> > >>>>> > that doesn't provide unnecessary duplicates.
> >> > >>>>> >
> >> > >>>>> > Kris
> >> > >>>>> >
> >> > >>>>> > On Tue, Jun 27, 2017 at 8:00 AM, Alexandre Vermeerbergen <
> >> > >>>>> > [email protected]> wrote:
> >> > >>>>> >
> >> > >>>>> >> Hello All,
> >> > >>>>> >>
> >> > >>>>> >> We have been running for a while our real-time supervision
> >> > >>>>> >> application based on Apache Storm 1.0.3 with Storm Kafka Spouts (old
> >> > >>>>> >> consumer: storm-kafka) and with our Kafka Broker cluster based on
> >> > >>>>> >> Apache Kafka 0.10.1.0.
> >> > >>>>> >>
> >> > >>>>> >> Backpressure is activated with default parameters.
> >> > >>>>> >>
> >> > >>>>> >> Key                                      Value
> >> > >>>>> >> backpressure.disruptor.high.watermark    0.9
> >> > >>>>> >> backpressure.disruptor.low.watermark     0.4
> >> > >>>>> >> task.backpressure.poll.secs              30
> >> > >>>>> >> topology.backpressure.enable             true
> >> > >>>>> >>
> >> > >>>>> >>
> >> > >>>>> >> We decided to upgrade to Apache Storm 1.1.0 to benefit from the new
> >> > >>>>> >> Kafka Spout (storm-kafka-client lib) with a consumer which has no
> >> > >>>>> >> more dependency on Zookeeper.
> >> > >>>>> >>
> >> > >>>>> >> After upgrade, we had several issues with Kafka consumption.
> >> > >>>>> >>
> >> > >>>>> >> We saw that several JIRAs were opened and resolved on Apache Storm
> >> > >>>>> >> 1.1.1.
> >> > >>>>> >>
> >> > >>>>> >> So we decided to upgrade to the latest available Apache Storm 1.1.x
> >> > >>>>> >> code built from source (2017-06-26) but we still have issues:
> >> > >>>>> >>
> >> > >>>>> >>
> >> > >>>>> >>
> >> > >>>>> >> 1. The Kafka lag is increasing constantly and this leads to the
> >> > >>>>> >> overload of the Storm worker running the Kafka spout. At the end, the
> >> > >>>>> >> worker crashes and it is automatically restarted by Storm.
> >> > >>>>> >>
> >> > >>>>> >> With the old Kafka spout version, the lag was below 10000 most of
> >> > >>>>> >> the time.
> >> > >>>>> >>
> >> > >>>>> >> With the new one, we are starting with a Kafka lag of about 30000,
> >> > >>>>> >> increasing until crash.
> >> > >>>>> >>
> >> > >>>>> >>
> >> > >>>>> >>
> >> > >>>>> >> 2. With the new Kafka Spout, we have faced this exception many times:
> >> > >>>>> >>
> >> > >>>>> >> org.apache.kafka.clients.consumer.CommitFailedException: Commit
> >> > >>>>> >> cannot be completed since the group has already rebalanced and
> >> > >>>>> >> assigned the partitions to another member. This means that the time
> >> > >>>>> >> between subsequent calls to poll() was longer than the configured
> >> > >>>>> >> max.poll.interval.ms, which typically implies that the poll loop is
> >> > >>>>> >> spending too much time message processing. You can address this
> >> > >>>>> >> either by increasing the session timeout or by reducing the maximum
> >> > >>>>> >> size of batches returned in poll() with max.poll.records.
> >> > >>>>> >>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
> >> > >>>>> >>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
> >> > >>>>> >>   at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> >> > >>>>> >>   at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
> >> > >>>>> >>   at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
> >> > >>>>> >>   at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
> >> > >>>>> >>   at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
> >> > >>>>> >>   at clojure.lang.AFn.run(AFn.java:22)
> >> > >>>>> >>   at java.lang.Thread.run(Thread.java:748)
> >> > >>>>> >>
> >> > >>>>> >>
> >> > >>>>> >>
> >> > >>>>> >> We are using the following configuration for Kafka Spout :
> >> > >>>>> >>
> >> > >>>>> >> poll.timeout.ms 200.
> >> > >>>>> >>
> >> > >>>>> >> offset.commit.period.ms  30000          (30 seconds).
> >> > >>>>> >>
> >> > >>>>> >> max.uncommitted.offsets 10000000  (ten million).
> >> > >>>>> >>
> >> > >>>>> >>
> >> > >>>>> >>
> >> > >>>>> >> The Kafka consumer strategy is set to EARLY. We also set the
> >> > >>>>> >> following Kafka consumer parameter:
> >> > >>>>> >>
> >> > >>>>> >> session.timeout.ms  120000
> >> > >>>>> >>
> >> > >>>>> >> Are there any traces/options which we could turn on in Storm or on
> >> > >>>>> >> the Kafka Broker that might help us understand how to stabilize our
> >> > >>>>> >> topologies with this new branch?
> >> > >>>>> >>
> >> > >>>>> >> Thanks!
> >> > >>>>> >>
> >> > >>>>> >> Alexandre Vermeerbergen
> >> > >>>>> >>
> >> > >>>>> >
> >> > >>>>> >
> >> > >>>>>
> >> > >>>>
> >> > >>>>
> >> > >>>
> >> > >>
> >> > >
> >> >
> >>
> >
> >
>
