Okay, I think this might be fixable then.

I think storm-kafka-client in autocommit mode should also raise "fails"
> whenever it has Kafka consumption issues.  Is it the case with current
> implementation?

Acking and failing tuples only happens once a tuple has been emitted from
the spout, so the spout doesn't have the behavior you describe. If the
spout is having problems acquiring messages internally, it will not be
reflected in acks/fails. If the problem the spout is having causes an
exception, you should be able to see this in Storm UI. If the problem is
some sort of delay or hanging, you should be able to see this by the Kafka
lag, or the spout's emit rate dropping.

In order to support having acks/fails and complete latency when using auto
commit, we need to make the spout always emit tuples with anchors. Since
the spout doesn't care about acks or fails when auto commit is enabled, we
should make the spout ack/fail methods return immediately if auto commit is
enabled.

I think we don't lose much by doing it this way. Users that want the
statistics in Storm UI can enable auto commit and leave topology ackers at
some non-zero number. Users that don't care and don't want the overhead of
having Storm track the tuples can set topology ackers to 0, which should
make the spout behave a lot like it does now. The only use case I can think
of where this won't work is if someone with multiple spouts in a topology
needs one to be auto commit and one to be at-least-once, and they can't
live with the overhead of tuple tracking for the auto commit spout. If this
is a real use case, it can be worked around with an extra configuration
parameter to switch whether tuples are emitted unanchored, but I'd rather
keep it simple for now.

If we want to do this fix I don't think we need to break the API, so it
could probably go in 1.1.1. I've only given it a cursory look though, so
take that statement with a grain of salt :)

2017-07-20 9:30 GMT+02:00 Alexandre Vermeerbergen <[email protected]>
:

> Hello Stig,
>
> Thank you very much for your detailed answer.
>
> Yes, we get the same behavior as yours in Storm UI using storm-kafka-client
> in autocommit mode : the count of emitted & transferred tuples is
> available, but we have 0 acks, 0 failed and "complete latency" remains at
> 0.
>
> And YES, we need the complete latency to be functional while using
> storm-kafka-client with autocommit.
>
> I think storm-kafka-client in autocommit mode should also raise "fails"
> whenever it has Kafka consumption issues.  Is it the case with current
> implementation?
>
> Last, would it be possible to have complete latency for storm-kafka-client
> spouts in autocommit mode for Storm 1.1.1 release ?
>
> Best regards,
> Alexandre Vermeerbergen
>
>
> 2017-07-19 15:55 GMT+02:00 Stig Rohde Døssing <[email protected]>:
>
> > Hi Alexandre,
> >
> > Happy to hear that the spout is working better for you. I think that if
> you
> > don't need at-least-once, then using auto commit is a good option.
> >
> > When you enable auto commit, the spout emits tuples unanchored (see the
> > calls to collector.emit here
> > https://github.com/apache/storm/blob/master/external/
> > storm-kafka-client/src/main/java/org/apache/storm/kafka/
> > spout/KafkaSpout.java#L346).
> > This causes Storm to not bother tracking acks and fails for the emitted
> > tuples, which means the counters in Storm UI for acks, fails and complete
> > latency are zeroed out. The complete latency doesn't make sense to track
> if
> > the tuples are not acked. The rest of the statistics should still be
> there.
> > I think we implemented it this way from the reasoning that it doesn't
> make
> > sense to make Storm track and ack/fail tuples if the spout is configured
> to
> > not care about failures.
> >
> > I tried out a small test topology on a local 2.0.0 Storm setup using auto
> > commit, and here's what I'm seeing: http://imgur.com/a/CoJBa. As you can
> > see the execute latency and emit/transfer counters still work. Is this
> > different from what you're experiencing?
> >
> > If you need the complete latency to be functional while using auto
> commit,
> > we'll need to update the spout so it emits the tuples with anchors, and
> > then ignores the acks/fails.
> >
> > 2017-07-19 10:12 GMT+02:00 Alexandre Vermeerbergen <
> > [email protected]
> > >:
> >
> > > Hello Stig,
> > >
> > > Thank you very much for your detailed explanations about
> > storm-kafka-client
> > > behavior, it helps us a lot.
> > >
> > > We have topologies for which at-least-one matters, so we're OK to try
> to
> > > converge them with this default behavior of storm-kafka-client.
> > >
> > > However, our main topology is our "realtime alerting" one. It evaluates
> > > incoming "metrics" (our system is a Supervision system) against
> > "triggers"
> > > (same meaning as in Zabbix), and here we prefer to drop anything that
> we
> > > couldn't evaluate quickly enough. Indeed, to do "real-time alerting",
> we
> > > know that if we skip a few metrics, then, shortly after, there will be
> > > newer ones that will suffice to get a "near real time enough" view of
> the
> > > "current state of our supervised devices".
> > >
> > > This is the reason why we recently turned on "autocommit" in
> > > storm-kafka-client, to overcome the growing "lag" or OOM we eventually
> > got.
> > >
> > > Now our topology works like a charm.
> > >
> > > Except that with autocommit turned on, no acks are sent from
> > > storm-kafka-client spouts, so Storm UI isn't displaying any statistics
> > > information about our topology, which is a big annoyance for us.
> > >
> > > Question:
> > > * Is our approach to use autocommit OK, or given our scenario, do you
> > > recommend othewise?
> > > * If it's OK to use autocommit, then how can we configure the
> > > storm-kafka-client spout to "blindly" ACK all tuples it sends?
> > >
> > > Best regards,
> > > Alexandre Vermeerbergen
> > >
> > >
> > >
> > >
> > > 2017-07-05 17:22 GMT+02:00 Stig Døssing <[email protected]>:
> > >
> > > > Regarding the offsetManagers map, I think the only thing it contains
> > that
> > > > should take up any space is uncommitted offsets. Those should be
> > cleared
> > > > out when the spout commits offsets to Kafka. There have been at
> least a
> > > few
> > > > issues related to that recently, e.g.
> > > > https://github.com/apache/storm/pull/2153, which may be what you're
> > > > running
> > > > into. The spout should be able to limit how many uncommitted offsets
> it
> > > > will accept before slowing down, but this functionality is still
> buggy
> > in
> > > > some cases (see https://github.com/apache/storm/pull/2156,
> > > > https://github.com/apache/storm/pull/2152).
> > > >
> > > > The storm-kafka-client spout doesn't share code with storm-kafka,
> it's
> > an
> > > > implementation based on a different Kafka client (this one
> > > > https://kafka.apache.org/0100/javadoc/index.html?org/apache/
> > > > kafka/clients/consumer/KafkaConsumer.html),
> > > > where the storm-kafka client used the now deprecated
> > > > https://cwiki.apache.org/confluence/display/KAFKA/0.8.
> > > > 0+SimpleConsumer+Example.
> > > > There are still some bugs on the new spout we're working out.
> > > >
> > > > Auto commit is not enabled in storm-kafka, because the SimpleConsumer
> > > > doesn't have that concept. The old spout kept track of processed
> > offsets
> > > by
> > > > storing how far it had read in Zookeeper. The new client allows us to
> > > mark
> > > > how far the spout has read in Kafka, which can either be done via the
> > > > commit methods on the KafkaConsumer (auto commit disabled), or
> > > periodically
> > > > by a background thread in the consumer (auto commit enabled).
> Normally
> > > when
> > > > the spout emits a message into the topology, we want to only mark
> that
> > > > message as "done" once Storm tells the spout to ack the tuple. This
> is
> > > the
> > > > behavior when auto commit is disabled; the spout emits some tuples,
> > > > receives some acks, and periodically uses the commitSync method to
> mark
> > > the
> > > > acked tuples as "done" in Kafka. If auto commit is enabled, the
> > > > KafkaConsumer will instead periodically commit the tuples it has
> > returned
> > > > from polls. The effect is that Storm can't control when an offset is
> > > > considered "done", which means that some messages can be lost. This
> is
> > > fine
> > > > for some use cases where you don't necessarily need to process all
> > > > messages.
> > > >
> > > > Leaving auto commit disabled is the closest match to how the
> > storm-kafka
> > > > spout handled messages. We disable auto commit by default in the new
> > > spout
> > > > because at-least-once processing guarantees should be the default,
> not
> > > > something people have to opt in to.
> > > >
> > > > 2017-07-05 15:22 GMT+02:00 Alexandre Vermeerbergen <
> > > > [email protected]
> > > > >:
> > > >
> > > > > Hello,
> > > > >
> > > > > To continue on this thread:
> > > > >
> > > > > Since we are facing some OOMEs and Kafka lag peaks with our
> > topologies
> > > > > using Storm 1.1.0 and Kafka spouts (storm-kafka-client based on
> Kafka
> > > > > 0.10.2.0 libs), we decided to test a much simpler Storm topology.
> > > > >
> > > > >  This test topology is very basic and is composed of :
> > > > >
> > > > > - one instance of Kafka spout consuming a topic composed of 4
> > > partitions
> > > > > with a throughput of about 3000 messages per second and with
> message
> > > size
> > > > > of about 1500 bytes, the consumption strategy is set to "LATEST",
> > other
> > > > > Kafka Spout parameters are default ones
> > > > >
> > > > > - one basic bolt which is just counting received tuples and logging
> > > > > statistics every minutes (min ,max, avg for number of tuples
> received
> > > per
> > > > > min) with acking every tuple
> > > > >
> > > > > Back pressure is activated with default watermark parameters.
> > > > >
> > > > > topology.message.timeout.secs is not changed (default value is 30).
> > > > >
> > > > >  Message are encrypted (using our own encryption method) serialized
> > > java
> > > > > objects but we are just using the provided ByteArrayDeserializer
> for
> > > this
> > > > > test since we just want to count tuples without any custom
> > > > deserialization
> > > > > interference.
> > > > >
> > > > > This topology is running alone on a single VM (8 vCPU) with one
> > worker
> > > > > configured with 2 GB of RAM for Xmx JVM option.
> > > > >
> > > > > Please also note that our cluster of Kafka Brokers is based on
> Apache
> > > > Kafka
> > > > > 0.10.1.0
> > > > >
> > > > >
> > > > >
> > > > > We are tracking the Kafka lag for the topic we are consuming.
> > > > >
> > > > > Kafka lag is 10000 at topology start but it decreased slowly to
> 2000
> > > > after
> > > > > 24h of execution.
> > > > >
> > > > >
> > > > >
> > > > > We did not face any OOM error this time so we are suspecting that
> the
> > > > > previous OOMEs were due to some lag in the much more complex
> topology
> > > we
> > > > > were running and due to Storm backpressure slowing down the Spout
> > > > > consumption.
> > > > >
> > > > > In fact, it seems that the following Map in the
> > > > > org.apache.storm.kafka.spout.KafkaSpout class was the root cause
> of
> > > the
> > > > > errors :
> > > > >
> > > > >  ============================================================
> > > > > =========================================
> > > > >
> > > > > // Tuples that were successfully acked/emitted. These tuples will
> be
> > > > > committed periodically when the commit timer expires, //or after a
> > > > consumer
> > > > > rebalance, or during close/deactivate private transient
> > > > Map<TopicPartition,
> > > > > OffsetManager> offsetManagers;
> > > > > ============================================================
> > > > > =========================================
> > > > >
> > > > >
> > > > >
> > > > > Please note however that these errors did not occur with the "old"
> > > > version
> > > > > of Kafka spout (storm-kafka lib).
> > > > >
> > > > > We ran another test with the same test topology but with
> > > > enable.auto.commit
> > > > > set to true in Kafka consumer properties, lag was starting at about
> > > 2000
> > > > > and staying constant for at least 24h.
> > > > >
> > > > > Our understanding is that auto commit is enabled by default in
> Kafka
> > > > > consumers and also in old version of Kafka spouts (storm-kafka) but
> > it
> > > is
> > > > > disabled in the new Kafka spout version (storm-kafka-client) to
> > manage
> > > > > tuple acking, is it correct please ?
> > > > >
> > > > >  What are the consequences of activating auto commit for consumers
> > with
> > > > the
> > > > > new Kafka Spout then ?
> > > > >
> > > > >  Thank you for your feedbacks,
> > > > >
> > > > > Alexandre Vermeerbergen
> > > > >
> > > > > 2017-06-29 12:09 GMT+02:00 Alexandre Vermeerbergen <
> > > > > [email protected]
> > > > > >:
> > > > >
> > > > > > Hello All,
> > > > > >
> > > > > > Thank you very much for your feedbacks on our experience with
> > > > > > storm-kafka-client, we're going to take your suggestions into
> > account
> > > > to
> > > > > > dig into our issues and we'll feedback as soon as we have more to
> > > > share.
> > > > > >
> > > > > > Best regards,
> > > > > > Alexandre Vermeerbergen
> > > > > >
> > > > > > 2017-06-29 1:46 GMT+02:00 Jungtaek Lim <[email protected]>:
> > > > > >
> > > > > >> Hi Alexandre,
> > > > > >>
> > > > > >> I don't know much of storm-kafka-client, but at a glimpse, I
> can't
> > > > find
> > > > > >> misuse of HashMap in KafkaSpout so I'd rather suspect that
> > > > OffsetManager
> > > > > >> being really huge. If you are willing to dig more on KafkaSpout
> > OOME
> > > > > >> issue,
> > > > > >> you can get more information of KafkaSpout for tracking with
> > > changing
> > > > > log
> > > > > >> level to DEBUG or even TRACE.
> > > > > >>
> > > > > >> Thanks,
> > > > > >> Jungtaek Lim (HeartSaVioR)
> > > > > >>
> > > > > >> 2017년 6월 29일 (목) 오전 4:58, Stig Døssing <
> [email protected]
> > >님이
> > > > 작성:
> > > > > >>
> > > > > >> > Hi Alexandre,
> > > > > >> >
> > > > > >> > About issue 1:
> > > > > >> > This issue is not by design. It is a side effect of the spout
> > > > > internally
> > > > > >> > using the KafkaConsumer's subscribe API instead of the assign
> > API.
> > > > > >> Support
> > > > > >> > for using the assign API was added a while back, but has a bug
> > > that
> > > > is
> > > > > >> > preventing the spout from starting when configured to use that
> > > API.
> > > > We
> > > > > >> are
> > > > > >> > working on fixing the issues with that implementation in these
> > PRs
> > > > > >> > https://github.com/apache/storm/pull/2150,
> > > > https://github.com/apache/
> > > > > >> > storm/pull/2174 <https://github.com/apache/storm/pull/2174>.
> I
> > > > think
> > > > > it
> > > > > >> > is very likely that we will remove support for
> > > > > >> > the subscribe API at some point as well, making the assign API
> > the
> > > > > >> default,
> > > > > >> > since several users have had issues with the subscribe API's
> > > > behavior.
> > > > > >> >
> > > > > >> > Once the assign API support is fixed, you can switch to using
> it
> > > via
> > > > > >> this
> > > > > >> > KafkaSpoutConfig Builder constructor
> https://github.com/apache/
> > > > > >> > storm/blob/master/external/storm-kafka-client/src/main/
> > > > > >> > java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L136
> > and
> > > > this
> > > > > >> > Subscription implementation https://github.com/srdo/storm/
> blob/
> > > > > >> > f524868baa5929f29b69258f0da2948d0f6e152b/external/storm-
> > > > > >> > kafka-client/src/main/java/org/apache/storm/kafka/spout/
> > > > > >> > ManualPartitionSubscription.java
> > > > > >> >
> > > > > >> > If you'd like to try out the code from the PR branch, you can
> > > check
> > > > it
> > > > > >> out
> > > > > >> > with some of the steps described here
> > > > > >> >
> > > > > >> > https://help.github.com/articles/checking-out-pull-requests-
> > > > > >> locally/#modifying-an-inactive-pull-request-locally
> > > > > >> > .
> > > > > >> >
> > > > > >> > Note that the PRs for fixing the manual partitioning option
> are
> > > > > against
> > > > > >> the
> > > > > >> > master branch right now, which is targeting Storm version
> 2.0.0,
> > > > but I
> > > > > >> > believe you may be able to use the 2.0.0 storm-kafka-client
> jar
> > > > with a
> > > > > >> > 1.1.0 cluster.
> > > > > >> >
> > > > > >> > Switching to the assign API should solve remove the
> instability
> > as
> > > > the
> > > > > >> > spouts start.
> > > > > >> >
> > > > > >> > About issue 2:
> > > > > >> > The spout lag shouldn't have an effect on memory use. I'm
> > > wondering
> > > > if
> > > > > >> your
> > > > > >> > spout instances are not progressing at all, which might
> explain
> > > the
> > > > > lag?
> > > > > >> > You should be able to check this using the
> > > kafka-consumer-groups.sh
> > > > > >> script
> > > > > >> > in the Kafka /bin directory. Once you've started the spout,
> you
> > > can
> > > > > use
> > > > > >> the
> > > > > >> > script to inspect which offsets the consumer group have
> > committed.
> > > > Try
> > > > > >> > checking if the offsets are moving once the spouts have
> started
> > > > > running.
> > > > > >> >
> > > > > >> > I can't spout any suspicious use of HashMap in the 1.x
> > KafkaSpout.
> > > > > Your
> > > > > >> > attachment didn't make it through, could you post it
> somewhere?
> > > > > >> >
> > > > > >> > About issue 3:
> > > > > >> > The CommitException you are getting is likely because we use
> the
> > > > > >> subscribe
> > > > > >> > API. When using the subscribe API Kafka is in control of
> > partition
> > > > > >> > assigment, and will reassign partitions if a consumer doesn't
> > call
> > > > > poll
> > > > > >> on
> > > > > >> > the consumer often enough. The default is that the consumer
> must
> > > be
> > > > > >> polled
> > > > > >> > at least every 5 minutes. See max.poll.interval.ms in the
> Kafka
> > > > > >> consumer
> > > > > >> > configuration. The assign API doesn't require this, and won't
> > shut
> > > > > down
> > > > > >> > spouts because they are too slow.
> > > > > >> >
> > > > > >> > There's likely still another underlying issue, because it
> seems
> > > > > strange
> > > > > >> > that your spout instances are not polling at least once per 5
> > > > minutes,
> > > > > >> at
> > > > > >> > least if you didn't set a high max.poll.records. It's almost
> > > > certainly
> > > > > >> > related to issue 2. Do you have acking enabled, and what is
> your
> > > > > >> > topology.message.timeout.secs?
> > > > > >> >
> > > > > >> > I'm not sure I understand why you would want to write your own
> > > spout
> > > > > >> from
> > > > > >> > scratch, rather than contributing fixes to the
> > storm-kafka-client
> > > > > >> spout. It
> > > > > >> > seems likely to be more effort than fixing the problems with
> > > > > >> > storm-kafka-client.
> > > > > >> >
> > > > > >> > 2017-06-28 14:25 GMT+02:00 Alexandre Vermeerbergen <
> > > > > >> > [email protected]
> > > > > >> > >:
> > > > > >> >
> > > > > >> > > More about this thread: we noticed that with
> StormKafkaClient
> > > > 1.1.x
> > > > > >> > > latest, we get OutOfMemoryError after ~2hours of running our
> > > > simple
> > > > > >> test
> > > > > >> > > topology.
> > > > > >> > >
> > > > > >> > > We reproduce it everytime, so we decided to generate a heap
> > dump
> > > > > >> before
> > > > > >> > > the OutOfMemoryError, and viewed the result using
> EclipseMAT.
> > > > > >> > >
> > > > > >> > > The results tends to show that there's a memory leak in
> > > > > >> KafkaSpoutClient:
> > > > > >> > > ==============================
> ==============================
> > > > > =========
> > > > > >> > >
> > > > > >> > > One instance of *"org.apache.storm.kafka.spout.KafkaSpout"*
> > > > loaded
> > > > > by
> > > > > >> > *"sun.misc.Launcher$AppClassLoader
> > > > > >> > > @ 0x80023d98"* occupies *1,662,509,664 (93.87%)* bytes. The
> > > memory
> > > > > is
> > > > > >> > > accumulated in one instance of *"java.util.HashMap$Node[]"*
> > > loaded
> > > > > by
> > > > > >> > *"<system
> > > > > >> > > class loader>"*.
> > > > > >> > >
> > > > > >> > > *Keywords*
> > > > > >> > > sun.misc.Launcher$AppClassLoader @ 0x80023d98
> > > > > >> > > java.util.HashMap$Node[]
> > > > > >> > > org.apache.storm.kafka.spout.KafkaSpout
> > > > > >> > >
> > > > > >> > > ==============================
> ==============================
> > > > > =========
> > > > > >> > >
> > > > > >> > > See attached screenshots of EclipseMAT session showing
> > graphical
> > > > > >> > > representation of memory usage
> > > > > >> > >
> > > > > >> > > FYI we tried to follow instructions from
> > > > > >> https://docs.hortonworks.com/
> > > > > >> > > HDPDocuments/HDP2/HDP-2.5.5/bk_storm-component-guide/
> > > > > >> > > content/storm-kafkaspout-perf.html to avoid the use of too
> > much
> > > > > >> memory,
> > > > > >> > > but still after 2 hours the memory fills up and the process
> > > > hosting
> > > > > >> our
> > > > > >> > > spout is killed by Supervisor...
> > > > > >> > >
> > > > > >> > > Any clue of what we may have missed?
> > > > > >> > >
> > > > > >> > > Best regards,
> > > > > >> > > Alexandre Vermeerbergen
> > > > > >> > >
> > > > > >> > >
> > > > > >> > >
> > > > > >> > > 2017-06-28 <20%2017%2006%2028> 9:17 GMT+02:00 Alexandre
> > > > > Vermeerbergen
> > > > > >> <
> > > > > >> > > [email protected]>:
> > > > > >> > >
> > > > > >> > >> Oops, sent my last mail too fast, let me continue it:
> > > > > >> > >>
> > > > > >> > >> Hello,
> > > > > >> > >>
> > > > > >> > >> Coming back to my original post in this list, we have 3
> > issues
> > > > with
> > > > > >> > >> latest 1.1.x StormKafkaClient spout with our setup:
> > > > > >> > >>
> > > > > >> > >> Issue#1:
> > > > > >> > >>  Initial lag (which we hadn't using the classic Storm Kafka
> > > > spout)
> > > > > >> > >>    For this issue, my understanding of Kristopher's answer
> is
> > > > that
> > > > > >> this
> > > > > >> > >> is "by design" of the StormKafkaClient spout, which
> instances
> > > > > >> > progressively
> > > > > >> > >> joins Kafka consumers group, which causes consumers
> > > rebalancing.
> > > > > This
> > > > > >> > >> rebalancing is "slow", which means that until all spout
> > > instances
> > > > > are
> > > > > >> > >> started, the topology starts with an "initial Kafka Lag"
> > > > > >> > >>    => Is my understanding correct?
> > > > > >> > >>    => Why don't we have such behavior with the old Storm
> > Kafka
> > > > > spout
> > > > > >> ?
> > > > > >> > >>    => Is this annoying initial lag tracked by a JIRA ?
> > > > > >> > >>
> > > > > >> > >> Issue#2:
> > > > > >> > >>     The kafka Lag is increasing constantly and this leads
> to
> > > the
> > > > > >> > overload
> > > > > >> > >> of the storm worker running the kafka spout. At the end,
> the
> > > > worker
> > > > > >> > crashes
> > > > > >> > >> and it is automatically restarted by Storm.
> > > > > >> > >>     => This is unlike what we observe with the old Storm
> > Kafka
> > > > > spout
> > > > > >> > >>     => What is the recommended way to analyze this issue?
> > > > > >> > >>
> > > > > >> > >> Issue3:
> > > > > >> > >>   With the new Kafka Spout, we have faced this exception
> many
> > > > > times:
> > > > > >> > >>
> > > > > >> > >>  org.apache.kafka.clients.consumer.CommitFailedException:
> > > Commit
> > > > > >> cannot
> > > > > >> > >> be completed since the group has already rebalanced and
> > > assigned
> > > > > the
> > > > > >> > >> partitions to another member. This means that the time
> > between
> > > > > >> > subsequent
> > > > > >> > >> calls to poll() was longer than the configured
> > > > > max.poll.interval.ms,
> > > > > >> > >> which typically implies that the poll loop is spending too
> > much
> > > > > time
> > > > > >> > >> message processing. You can address this either by
> increasing
> > > the
> > > > > >> > session
> > > > > >> > >> timeout or by reducing the maximum size of batches returned
> > in
> > > > > poll()
> > > > > >> > with
> > > > > >> > >> max.poll.records. at org.apache.kafka.clients.consu
> > > > > >> > >>
> > > > > >> > mer.internals.ConsumerCoordinator.sendOffsetCommitRequest(Co
> > > > > >> nsumerCoordinator.java:702)
> > > > > >> > >> at org.apache.kafka.clients.consumer.internals.
> > > ConsumerCoordina
> > > > > >> > >> tor.commitOffsetsSync(ConsumerCoordinator.java:581) at
> > > > > >> > >>
> > > > > >> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(
> > > > > >> KafkaConsumer.java:1124)
> > > > > >> > >> at
> > > > > >> > org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAcke
> > > > > >> dTuples(KafkaSpout.java:384)
> > > > > >> > >> at
> > > > > >> > org.apache.storm.kafka.spout.KafkaSpout.nextTuple(
> > > > > KafkaSpout.java:220)
> > > > > >> > >> at
> > > > > >> > org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__
> > > > > >> 10826.invoke(executor.clj:646)
> > > > > >> > >> at org.apache.storm.util$async_
> loop$fn__555.invoke(util.clj:
> > > 484)
> > > > > at
> > > > > >> > >> clojure.lang.AFn.run(AFn.java:22) at
> > > > > java.lang.Thread.run(Thread.ja
> > > > > >> > >> va:748)
> > > > > >> > >>
> > > > > >> > >>
> > > > > >> > >>   => Are we the only ones experiencing such issues with
> Storm
> > > > > >> > 1.1.0/1.1.x
> > > > > >> > >> latest ?
> > > > > >> > >>
> > > > > >> > >> Note: We are considering writing our own Kafka Spout, as
> > we're
> > > > > >> > time-bound
> > > > > >> > >> to move to Kafka 0.10.x consumers & producers (to prepare
> our
> > > > next
> > > > > >> step
> > > > > >> > >> with Kafka security, which isn't available with Kafka
> 0.9.x).
> > > We
> > > > > will
> > > > > >> > miss
> > > > > >> > >> the integration of Kafka lag in StormUI, but currently we
> do
> > > not
> > > > > >> > understand
> > > > > >> > >> how to solve the regressions we observe with latest Storm
> > Kafka
> > > > > >> client
> > > > > >> > >> spout.
> > > > > >> > >>
> > > > > >> > >> Are there other Storm developers/users who jumped into this
> > > > > >> alternative?
> > > > > >> > >>
> > > > > >> > >> Best regards,
> > > > > >> > >>
> > > > > >> > >> Alexandre Vermeerbergen
> > > > > >> > >>
> > > > > >> > >>
> > > > > >> > >>
> > > > > >> > >>
> > > > > >> > >> 2017-06-28 <20%2017%2006%2028> 9:09 GMT+02:00 Alexandre
> > > > > >> Vermeerbergen <
> > > > > >> > >> [email protected]>:
> > > > > >> > >>
> > > > > >> > >>> Hello,
> > > > > >> > >>>
> > > > > >> > >>> Coming back to my original post in this list, we have two
> > > issues
> > > > > >> with
> > > > > >> > >>> latest 1.1.x StormKafkaClient spout with our setup:
> > > > > >> > >>>
> > > > > >> > >>> Issue#1:
> > > > > >> > >>>  Initial lag (which we hadn't using the classic Storm
> Kafka
> > > > spout)
> > > > > >> > >>>    For this issue, my understanding of Kristopher's answer
> > is
> > > > that
> > > > > >> this
> > > > > >> > >>> is "by design" of the StormKafkaClient spout, which
> > instances
> > > > > >> > progressively
> > > > > >> > >>> joins Kafka consumers group, which causes consumers
> > > rebalancing.
> > > > > >> This
> > > > > >> > >>> rebalancing is "slow", which means that until all spout
> > > > instances
> > > > > >> are
> > > > > >> > >>> started, the topology starts with an "initial Kafka Lag"
> > > > > >> > >>>    => Is my understanding correct?
> > > > > >> > >>>    => Why don't we have such behavior with the old Storm
> > Kafka
> > > > > >> spout ?
> > > > > >> > >>>    => Is this annoying initial lag tracked by a JIRA ?
> > > > > >> > >>>
> > > > > >> > >>>
> > > > > >> > >>>
> > > > > >> > >>> 2017-06-27 17:15 GMT+02:00 Alexandre Vermeerbergen <
> > > > > >> > >>> [email protected]>:
> > > > > >> > >>>
> > > > > >> > >>>> Hello Kristopher,
> > > > > >> > >>>>
> > > > > >> > >>>> We built Storm 1.1.1-latest using yesterday's
> (2017-06-26
> > > > > >> > >>>> <20%2017%2006%2026>)  artifacts downloaded from
> > > > > >> > >>>> https://github.com/apache/storm/tree/1.x-branch.
> > > > > >> > >>>> <https://github.com/apache/storm/tree/1.x-branch>
> > > > > >> > >>>>
> > > > > >> > >>>> Is your latest PR supposed to be in what we downloaded &
> > > built,
> > > > > or
> > > > > >> do
> > > > > >> > >>>> we need to upgrade in some way (which?)
> > > > > >> > >>>>
> > > > > >> > >>>> Should we change anything to our settings?
> > > > > >> > >>>>
> > > > > >> > >>>> Please note that I mistakenly wrote that our "Kafka
> > consumer
> > > > > >> strategy
> > > > > >> > >>>> is set to EARLY" whereas it's "Kafka consumer strategy is
> > set
> > > > to
> > > > > >> > LATEST",
> > > > > >> > >>>> if that matters.
> > > > > >> > >>>>
> > > > > >> > >>>> Best regards,
> > > > > >> > >>>> Alexandre Vermeerbergen
> > > > > >> > >>>>
> > > > > >> > >>>> 2017-06-27 16:37 GMT+02:00 Kristopher Kane <
> > > > [email protected]
> > > > > >:
> > > > > >> > >>>>
> > > > > >> > >>>>> Correction: https://github.com/apache/storm/pull/2174
> has
> > > all
> > > > > of
> > > > > >> > what
> > > > > >> > >>>>> I was
> > > > > >> > >>>>> doing and more.
> > > > > >> > >>>>>
> > > > > >> > >>>>> On Tue, Jun 27, 2017 at 9:33 AM, Kristopher Kane <
> > > > > >> > [email protected]
> > > > > >> > >>>>> >
> > > > > >> > >>>>> wrote:
> > > > > >> > >>>>>
> > > > > >> > >>>>> > Alexandre,
> > > > > >> > >>>>> >
> > > > > >> > >>>>> > There are quite a few JIRAs and discussions around
> this
> > > > > >> recently.
> > > > > >> > >>>>> The
> > > > > >> > >>>>> > default behavior for storm-kafka-client is the
> > 'subscribe'
> > > > API
> > > > > >> > which
> > > > > >> > >>>>> causes
> > > > > >> > >>>>> > the immediate lag you see since rebalance will happen
> > > > > spout(n)-1
> > > > > >> > >>>>> times
> > > > > >> > >>>>> > just from the spouts spinning up.
> > > > > >> > >>>>> >
> > > > > >> > >>>>> > There is a Builder for ManualPartitionNamedSubscripti
> on
> > > and
> > > > > the
> > > > > >> > >>>>> > RoundRobinManualPartitioner (which use the 'assign'
> > Kafka
> > > > > >> consumer
> > > > > >> > >>>>> API) but
> > > > > >> > >>>>> > they don't work at all.  I hope to have a PR in today
> > > > > >> > >>>>> > to fix these on 1.x-branch
> > > > > >> > >>>>> >
> > > > > >> > >>>>> > The other JIRAs I mentioned are for a redesign of this
> > > spout
> > > > > or
> > > > > >> > >>>>> other more
> > > > > >> > >>>>> > drastic changes.  My goal is a bug fix for a version
> of
> > > the
> > > > > >> spout
> > > > > >> > >>>>> that
> > > > > >> > >>>>> > doesn't provide unnecessary duplicates.
> > > > > >> > >>>>> >
> > > > > >> > >>>>> > Kris
> > > > > >> > >>>>> >
> > > > > >> > >>>>> > On Tue, Jun 27, 2017 at 8:00 AM, Alexandre
> > Vermeerbergen <
> > > > > >> > >>>>> > [email protected]> wrote:
> > > > > >> > >>>>> >
> > > > > >> > >>>>> >> Hello All,
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> We have been running for a while our real-time
> > > supervision
> > > > > >> > >>>>> application
> > > > > >> > >>>>> >> based on Apache Storm 1.0.3 with Storm Kafka Spouts
> > (old
> > > > > >> consumer:
> > > > > >> > >>>>> >> storm-kafka) and with our Kafka Broker cluster based
> on
> > > > > Apache
> > > > > >> > Kafka
> > > > > >> > >>>>> >> 0.10.1.0.
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> Backpressure is activated with default parameters.
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >>  Key
> > > > > >> > >>>>> >> Value
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> backpressure.disruptor.high.watermark
> 0.9
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> backpressure.disruptor.low.watermark
>  0.4
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> task.backpressure.poll.secs
> > > > > >>  30
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> topology.backpressure.enable
> > > > >  true
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> We decided to upgrade to Apache Storm 1.1.0 to
> benefit
> > > from
> > > > > the
> > > > > >> > new
> > > > > >> > >>>>> Kafka
> > > > > >> > >>>>> >> Spout  (storm-kafka-client lib) with a consumer which
> > has
> > > > no
> > > > > >> more
> > > > > >> > >>>>> >> dependency on Zookeeper.
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> After upgrade, we had several issues with kafka
> > > > consumption.
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> We saw that several JIRAs were opened and resolved on
> > > > Apache
> > > > > >> Storm
> > > > > >> > >>>>> 1.1.1.
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> So we decided to upgrade to the latest available
> Apache
> > > > Storm
> > > > > >> > 1.1.x
> > > > > >> > >>>>> code
> > > > > >> > >>>>> >> built from source (2017-06-26 <20%2017%2006%2026>)
> but
> > > we
> > > > > >> still
> > > > > >> > >>>>> have issues :
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> 1. The kafka Lag is increasing constantly and this
> > leads
> > > to
> > > > > the
> > > > > >> > >>>>> overload
> > > > > >> > >>>>> >> of
> > > > > >> > >>>>> >> the storm worker running the kafka spout. At the end,
> > the
> > > > > >> worker
> > > > > >> > >>>>> crashes
> > > > > >> > >>>>> >> and it is automatically restarted by Storm.
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> With old kafka spout version, we had a lag most of
> the
> > > > times
> > > > > >> > bellow
> > > > > >> > >>>>> 10000.
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> With the new one, we are starting with Kafka lag
> about
> > > > 30000
> > > > > >> and
> > > > > >> > >>>>> >> increasing
> > > > > >> > >>>>> >> until crash.
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> 2. With the new Kafka Spout, we have faced this
> > exception
> > > > > many
> > > > > >> > >>>>> times:
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> org.apache.kafka.clients.consumer.
> > CommitFailedException:
> > > > > >> Commit
> > > > > >> > >>>>> cannot be
> > > > > >> > >>>>> >> completed since the group has already rebalanced and
> > > > assigned
> > > > > >> the
> > > > > >> > >>>>> >> partitions to another member. This means that the
> time
> > > > > between
> > > > > >> > >>>>> subsequent
> > > > > >> > >>>>> >> calls to poll() was longer than the configured
> > > > > >> > max.poll.interval.ms
> > > > > >> > >>>>> ,
> > > > > >> > >>>>> >> which
> > > > > >> > >>>>> >> typically implies that the poll loop is spending too
> > much
> > > > > time
> > > > > >> > >>>>> message
> > > > > >> > >>>>> >> processing. You can address this either by increasing
> > the
> > > > > >> session
> > > > > >> > >>>>> timeout
> > > > > >> > >>>>> >> or by reducing the maximum size of batches returned
> in
> > > > poll()
> > > > > >> with
> > > > > >> > >>>>> >> max.poll.records. at
> > > > > >> > >>>>> >> org.apache.kafka.clients.consumer.internals.
> > > > ConsumerCoordina
> > > > > >> > >>>>> >> tor.sendOffsetCommitRequest(
> > > ConsumerCoordinator.java:702)
> > > > > >> > >>>>> >> at
> > > > > >> > >>>>> >> org.apache.kafka.clients.consumer.internals.
> > > > ConsumerCoordina
> > > > > >> > >>>>> >> tor.commitOffsetsSync(ConsumerCoordinator.java:581)
> > > > > >> > >>>>> >> at
> > > > > >> > >>>>> >> org.apache.kafka.clients.consumer.KafkaConsumer.
> > > > commitSync(
> > > > > >> > >>>>> >> KafkaConsumer.java:1124)
> > > > > >> > >>>>> >> at
> > > > > >> > >>>>> >> org.apache.storm.kafka.spout.KafkaSpout.
> > > > commitOffsetsForAcke
> > > > > >> > >>>>> >> dTuples(KafkaSpout.java:384)
> > > > > >> > >>>>> >> at org.apache.storm.kafka.spout.K
> > > > > >> afkaSpout.nextTuple(KafkaSpout
> > > > > >> > >>>>> .java:220)
> > > > > >> > >>>>> >> at
> > > > > >> > >>>>> >> org.apache.storm.daemon.
> executor$fn__10780$fn__10795$
> > > fn__
> > > > > >> > >>>>> >> 10826.invoke(executor.clj:646)
> > > > > >> > >>>>> >> at org.apache.storm.util$async_lo
> > > > > >> op$fn__555.invoke(util.clj:484)
> > > > > >> > at
> > > > > >> > >>>>> >> clojure.lang.AFn.run(AFn.java:22) at
> > > > > >> > java.lang.Thread.run(Thread.ja
> > > > > >> > >>>>> >> va:748)
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> We are using the following configuration for Kafka
> > Spout
> > > :
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> poll.timeout.ms 200.
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> offset.commit.period.ms  30000          (30
> seconds).
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> max.uncommitted.offsets 10000000  (ten million).
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> The Kafka consumer strategy is set to EARLY. We also
> > set
> > > > the
> > > > > >> > >>>>> following
> > > > > >> > >>>>> >> Kafka consumer parameter :
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> session.timeout.ms  120000
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> Are there any traces/options which we could turn on
> on
> > > > Storm
> > > > > >> or on
> > > > > >> > >>>>> Kafka
> > > > > >> > >>>>> >> Broker that might help understanding how to stabilize
> > our
> > > > > >> > >>>>> topologies with
> > > > > >> > >>>>> >> this new branch?
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> Thanks!
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> Alexandre Vermeerbergen
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >
> > > > > >> > >>>>> >
> > > > > >> > >>>>>
> > > > > >> > >>>>
> > > > > >> > >>>>
> > > > > >> > >>>
> > > > > >> > >>
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to