Hello Stig,

Thank you very much for trying.
I noticed you created this JIRA for tracking:
https://issues.apache.org/jira/browse/STORM-2648. I have added a watch
on it, so I will follow up there.

Best regards,
Alexandre

2017-07-20 12:07 GMT+02:00 Stig Rohde Døssing <[email protected]>:

> Okay, I think this might be fixable then.
>
> > I think storm-kafka-client in autocommit mode should also raise "fails"
> > whenever it has Kafka consumption issues.  Is it the case with current
> > implementation?
>
> Acking and failing tuples only happens once a tuple has been emitted from
> the spout, so the spout doesn't have the behavior you describe. If the
> spout is having problems acquiring messages internally, it will not be
> reflected in acks/fails. If the problem the spout is having causes an
> exception, you should be able to see this in Storm UI. If the problem is
> some sort of delay or hanging, you should be able to see this by the Kafka
> lag, or the spout's emit rate dropping.
>
> In order to support having acks/fails and complete latency when using auto
> commit, we need to make the spout always emit tuples with anchors. Since
> the spout doesn't care about acks or fails when auto commit is enabled, we
> should make the spout ack/fail methods return immediately if auto commit is
> enabled.
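
A minimal sketch of the change described above, in plain Java so that it runs without Storm on the classpath. The Collector interface and the AutoCommitSpoutSketch class are hypothetical stand-ins, not the storm-kafka-client code. They only illustrate the idea: always emit with a message id so Storm tracks the tuple, and let ack/fail return immediately when auto commit is on.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the proposed behavior, not the real KafkaSpout.
class AutoCommitSpoutSketch {
    /** Stand-in for the part of Storm's spout collector used here. */
    interface Collector {
        void emit(List<Object> tuple, Object messageId);
    }

    private final boolean autoCommit;
    private final Collector collector;
    // Offsets acked by the topology and waiting to be committed by the spout.
    private final List<Long> ackedOffsets = new ArrayList<>();
    // Offsets that failed and would have to be emitted again.
    private final List<Long> failedOffsets = new ArrayList<>();

    AutoCommitSpoutSketch(boolean autoCommit, Collector collector) {
        this.autoCommit = autoCommit;
        this.collector = collector;
    }

    /** Always emit with a message id, so acks, fails and latency are tracked. */
    void emitRecord(long offset, String value) {
        collector.emit(Collections.<Object>singletonList(value), offset);
    }

    void ack(Object messageId) {
        if (autoCommit) {
            return; // the consumer commits on its own, nothing to record
        }
        ackedOffsets.add((Long) messageId);
    }

    void fail(Object messageId) {
        if (autoCommit) {
            return; // no replay in auto commit mode
        }
        failedOffsets.add((Long) messageId);
    }

    int pendingAcks() {
        return ackedOffsets.size();
    }

    int pendingRetries() {
        return failedOffsets.size();
    }

    public static void main(String[] args) {
        List<Object> emittedIds = new ArrayList<>();
        AutoCommitSpoutSketch spout =
            new AutoCommitSpoutSketch(true, (tuple, id) -> emittedIds.add(id));
        spout.emitRecord(42L, "payload");
        spout.ack(42L);
        spout.fail(42L);
        System.out.println("emitted=" + emittedIds.size()
            + " pendingAcks=" + spout.pendingAcks()
            + " pendingRetries=" + spout.pendingRetries());
    }
}
```

With autoCommit set to false, the same ack and fail calls would record the offsets instead of returning early.
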
>
> I think we don't lose much by doing it this way. Users that want the
> statistics in Storm UI can enable auto commit and leave topology ackers at
> some non-zero number. Users that don't care and don't want the overhead of
> having Storm track the tuples can set topology ackers to 0, which should
> make the spout behave a lot like it does now. The only use case I can think
> of where this won't work is if someone with multiple spouts in a topology
> needs one to be auto commit and one to be at-least-once, and they can't
> live with the overhead of tuple tracking for the auto commit spout. If this
> is a real use case, it can be worked around with an extra configuration
> parameter to switch whether tuples are emitted unanchored, but I'd rather
> keep it simple for now.
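
The two setups described above could be expressed roughly as follows. This is only a sketch using plain maps: the class and method names are hypothetical, the acker count of 1 is an arbitrary non-zero example, and the way the maps are handed to the spout and to the topology is left out. The keys are the Kafka consumer property "enable.auto.commit" and Storm's "topology.acker.executors" setting.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, shown only to make the two options concrete.
class AutoCommitTopologySettings {
    /** Kafka consumer properties with auto commit switched on. */
    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("enable.auto.commit", "true");
        return props;
    }

    /**
     * Topology configuration: a non-zero acker count keeps the ack/fail and
     * complete latency statistics, 0 disables tuple tracking altogether.
     */
    static Map<String, Object> topologyConf(boolean wantAckStatistics) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.acker.executors", wantAckStatistics ? 1 : 0);
        return conf;
    }
}
```
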
>
> If we want to do this fix I don't think we need to break the API, so it
> could probably go in 1.1.1. I've only given it a cursory look though, so
> take that statement with a grain of salt :)
>
> 2017-07-20 9:30 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
>
> > Hello Stig,
> >
> > Thank you very much for your detailed answer.
> >
> > Yes, we get the same behavior as yours in Storm UI using
> > storm-kafka-client in autocommit mode: the count of emitted &
> > transferred tuples is available, but we have 0 acks, 0 failed and
> > "complete latency" remains at 0.
> >
> > And YES, we need the complete latency to be functional while using
> > storm-kafka-client with autocommit.
> >
> > I think storm-kafka-client in autocommit mode should also raise "fails"
> > whenever it has Kafka consumption issues.  Is it the case with current
> > implementation?
> >
> > Last, would it be possible to have complete latency for
> > storm-kafka-client spouts in autocommit mode for Storm 1.1.1 release?
> >
> > Best regards,
> > Alexandre Vermeerbergen
> >
> >
> > 2017-07-19 15:55 GMT+02:00 Stig Rohde Døssing <[email protected]>:
> >
> > > Hi Alexandre,
> > >
> > > > Happy to hear that the spout is working better for you. I think that
> > > > if you don't need at-least-once, then using auto commit is a good
> > > > option.
> > > >
> > > > When you enable auto commit, the spout emits tuples unanchored (see the
> > > > calls to collector.emit here
> > > > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L346).
> > > > This causes Storm to not bother tracking acks and fails for the emitted
> > > > tuples, which means the counters in Storm UI for acks, fails and
> > > > complete latency are zeroed out. The complete latency doesn't make
> > > > sense to track if the tuples are not acked. The rest of the statistics
> > > > should still be there. I think we implemented it this way from the
> > > > reasoning that it doesn't make sense to make Storm track and ack/fail
> > > > tuples if the spout is configured to not care about failures.
> > >
> > > > I tried out a small test topology on a local 2.0.0 Storm setup using
> > > > auto commit, and here's what I'm seeing: http://imgur.com/a/CoJBa. As
> > > > you can see the execute latency and emit/transfer counters still work.
> > > > Is this different from what you're experiencing?
> > > >
> > > > If you need the complete latency to be functional while using auto
> > > > commit, we'll need to update the spout so it emits the tuples with
> > > > anchors, and then ignores the acks/fails.
> > >
> > > 2017-07-19 10:12 GMT+02:00 Alexandre Vermeerbergen <
> > > [email protected]
> > > >:
> > >
> > > > Hello Stig,
> > > >
> > > > > Thank you very much for your detailed explanations about
> > > > > storm-kafka-client behavior, it helps us a lot.
> > > > >
> > > > > We have topologies for which at-least-once matters, so we're OK to
> > > > > try to converge them with this default behavior of
> > > > > storm-kafka-client.
> > > > >
> > > > > However, our main topology is our "realtime alerting" one. It
> > > > > evaluates incoming "metrics" (our system is a supervision system)
> > > > > against "triggers" (same meaning as in Zabbix), and here we prefer to
> > > > > drop anything that we couldn't evaluate quickly enough. Indeed, to do
> > > > > "real-time alerting", we know that if we skip a few metrics, then,
> > > > > shortly after, there will be newer ones that will suffice to get a
> > > > > "near real time enough" view of the "current state of our supervised
> > > > > devices".
> > > > >
> > > > > This is the reason why we recently turned on "autocommit" in
> > > > > storm-kafka-client, to overcome the growing "lag" or OOM we
> > > > > eventually got.
> > > > >
> > > > > Now our topology works like a charm.
> > > > >
> > > > > Except that with autocommit turned on, no acks are sent from
> > > > > storm-kafka-client spouts, so Storm UI isn't displaying any
> > > > > statistics information about our topology, which is a big annoyance
> > > > > for us.
> > > > >
> > > > > Questions:
> > > > > * Is our approach of using autocommit OK, or given our scenario, do
> > > > > you recommend otherwise?
> > > > > * If it's OK to use autocommit, then how can we configure the
> > > > > storm-kafka-client spout to "blindly" ACK all tuples it sends?
> > > >
> > > > Best regards,
> > > > Alexandre Vermeerbergen
> > > >
> > > >
> > > >
> > > >
> > > > 2017-07-05 17:22 GMT+02:00 Stig Døssing <[email protected]>:
> > > >
> > > > > > Regarding the offsetManagers map, I think the only thing it
> > > > > > contains that should take up any space is uncommitted offsets.
> > > > > > Those should be cleared out when the spout commits offsets to
> > > > > > Kafka. There have been at least a few issues related to that
> > > > > > recently, e.g. https://github.com/apache/storm/pull/2153, which may
> > > > > > be what you're running into. The spout should be able to limit how
> > > > > > many uncommitted offsets it will accept before slowing down, but
> > > > > > this functionality is still buggy in some cases (see
> > > > > > https://github.com/apache/storm/pull/2156,
> > > > > > https://github.com/apache/storm/pull/2152).
> > > > > >
> > > > > > The storm-kafka-client spout doesn't share code with storm-kafka,
> > > > > > it's an implementation based on a different Kafka client (this one
> > > > > > https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html),
> > > > > > where the storm-kafka client used the now deprecated
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example.
> > > > > > There are still some bugs on the new spout we're working out.
> > > > >
> > > > > > Auto commit is not enabled in storm-kafka, because the
> > > > > > SimpleConsumer doesn't have that concept. The old spout kept track
> > > > > > of processed offsets by storing how far it had read in Zookeeper.
> > > > > > The new client allows us to mark how far the spout has read in
> > > > > > Kafka, which can either be done via the commit methods on the
> > > > > > KafkaConsumer (auto commit disabled), or periodically by the
> > > > > > consumer itself as part of its poll calls (auto commit enabled).
> > > > > > Normally when the spout emits a message into the topology, we want
> > > > > > to only mark that message as "done" once Storm tells the spout to
> > > > > > ack the tuple. This is the behavior when auto commit is disabled;
> > > > > > the spout emits some tuples, receives some acks, and periodically
> > > > > > uses the commitSync method to mark the acked tuples as "done" in
> > > > > > Kafka. If auto commit is enabled, the KafkaConsumer will instead
> > > > > > periodically commit the offsets of the records it has returned from
> > > > > > polls. The effect is that Storm can't control when an offset is
> > > > > > considered "done", which means that some messages can be lost. This
> > > > > > is fine for some use cases where you don't necessarily need to
> > > > > > process all messages.
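
The difference between the two modes can be pictured with a small model. This is a simplified illustration with hypothetical names, not the spout's actual bookkeeping. It assumes the usual Kafka convention that the committed position is the next offset to read.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Simplified model of where the committed position ends up in each mode.
class CommitPositionSketch {
    /** Auto commit disabled: advance only past offsets acked without a gap. */
    static long manualCommitPosition(long committed, Set<Long> acked) {
        long next = committed;
        while (acked.contains(next)) {
            next++;
        }
        return next;
    }

    /** Auto commit enabled: the position follows whatever poll() returned. */
    static long autoCommitPosition(long lastPolledOffset) {
        return lastPolledOffset + 1;
    }

    public static void main(String[] args) {
        // Offsets 0..3 were polled and emitted; offset 2 has not been acked.
        Set<Long> acked = new TreeSet<>(Arrays.asList(0L, 1L, 3L));
        System.out.println("manual: " + manualCommitPosition(0, acked));
        System.out.println("auto:   " + autoCommitPosition(3));
    }
}
```

In this example the manual position stops at 2, so offset 2 is read again after a restart, while the auto commit position is already 4 and offset 2 is lost if its tuple never completes.
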
> > > > >
> > > > > > Leaving auto commit disabled is the closest match to how the
> > > > > > storm-kafka spout handled messages. We disable auto commit by
> > > > > > default in the new spout because at-least-once processing
> > > > > > guarantees should be the default, not something people have to opt
> > > > > > in to.
> > > > >
> > > > > 2017-07-05 15:22 GMT+02:00 Alexandre Vermeerbergen <
> > > > > [email protected]
> > > > > >:
> > > > >
> > > > > > Hello,
> > > > > >
> > > > > > To continue on this thread:
> > > > > >
> > > > > > > Since we are facing some OOMEs and Kafka lag peaks with our
> > > > > > > topologies using Storm 1.1.0 and Kafka spouts (storm-kafka-client
> > > > > > > based on Kafka 0.10.2.0 libs), we decided to test a much simpler
> > > > > > > Storm topology.
> > > > > >
> > > > > > > This test topology is very basic and is composed of:
> > > > > > >
> > > > > > > - one instance of the Kafka spout consuming a topic composed of 4
> > > > > > > partitions with a throughput of about 3000 messages per second
> > > > > > > and a message size of about 1500 bytes; the consumption strategy
> > > > > > > is set to "LATEST", other Kafka spout parameters are the default
> > > > > > > ones
> > > > > > >
> > > > > > > - one basic bolt which just counts received tuples and logs
> > > > > > > statistics every minute (min, max, avg for number of tuples
> > > > > > > received per minute), acking every tuple
> > > > > > >
> > > > > > > Back pressure is activated with default watermark parameters.
> > > > > > >
> > > > > > > topology.message.timeout.secs is not changed (default value is
> > > > > > > 30).
> > > > > > >
> > > > > > > Messages are encrypted (using our own encryption method)
> > > > > > > serialized Java objects, but we are just using the provided
> > > > > > > ByteArrayDeserializer for this test since we just want to count
> > > > > > > tuples without any custom deserialization interference.
> > > > > > >
> > > > > > > This topology is running alone on a single VM (8 vCPU) with one
> > > > > > > worker configured with 2 GB of RAM for the Xmx JVM option.
> > > > > > >
> > > > > > > Please also note that our cluster of Kafka brokers is based on
> > > > > > > Apache Kafka 0.10.1.0.
> > > > > >
> > > > > >
> > > > > >
> > > > > > We are tracking the Kafka lag for the topic we are consuming.
> > > > > >
> > > > > > > Kafka lag is 10000 at topology start but it decreased slowly to
> > > > > > > 2000 after 24h of execution.
> > > > > >
> > > > > >
> > > > > >
> > > > > > > We did not face any OOM error this time so we are suspecting that
> > > > > > > the previous OOMEs were due to some lag in the much more complex
> > > > > > > topology we were running and due to Storm backpressure slowing
> > > > > > > down the spout consumption.
> > > > > > >
> > > > > > > In fact, it seems that the following Map in the
> > > > > > > org.apache.storm.kafka.spout.KafkaSpout class was the root cause
> > > > > > > of the errors:
> > > > > > >
> > > > > > > =====================================================================
> > > > > > >
> > > > > > > // Tuples that were successfully acked/emitted. These tuples will be
> > > > > > > // committed periodically when the commit timer expires,
> > > > > > > // or after a consumer rebalance, or during close/deactivate
> > > > > > > private transient Map<TopicPartition, OffsetManager> offsetManagers;
> > > > > > > =====================================================================
> > > > > >
> > > > > >
> > > > > > > Please note however that these errors did not occur with the
> > > > > > > "old" version of the Kafka spout (storm-kafka lib).
> > > > > > >
> > > > > > > We ran another test with the same test topology but with
> > > > > > > enable.auto.commit set to true in the Kafka consumer properties;
> > > > > > > lag started at about 2000 and stayed constant for at least 24h.
> > > > > > >
> > > > > > > Our understanding is that auto commit is enabled by default in
> > > > > > > Kafka consumers and also in the old version of the Kafka spout
> > > > > > > (storm-kafka), but it is disabled in the new Kafka spout version
> > > > > > > (storm-kafka-client) to manage tuple acking. Is that correct?
> > > > > > >
> > > > > > > What are the consequences of activating auto commit for consumers
> > > > > > > with the new Kafka spout then?
> > > > > > >
> > > > > > > Thank you for your feedback,
> > > > > >
> > > > > > Alexandre Vermeerbergen
> > > > > >
> > > > > > 2017-06-29 12:09 GMT+02:00 Alexandre Vermeerbergen <
> > > > > > [email protected]
> > > > > > >:
> > > > > >
> > > > > > > Hello All,
> > > > > > >
> > > > > > > > Thank you very much for your feedback on our experience with
> > > > > > > > storm-kafka-client. We're going to take your suggestions into
> > > > > > > > account to dig into our issues, and we'll report back as soon
> > > > > > > > as we have more to share.
> > > > > > >
> > > > > > > Best regards,
> > > > > > > Alexandre Vermeerbergen
> > > > > > >
> > > > > > > 2017-06-29 1:46 GMT+02:00 Jungtaek Lim <[email protected]>:
> > > > > > >
> > > > > > >> Hi Alexandre,
> > > > > > >>
> > > > > > > >> I don't know much about storm-kafka-client, but at a glance, I
> > > > > > > >> can't find any misuse of HashMap in KafkaSpout, so I'd rather
> > > > > > > >> suspect that OffsetManager is really huge. If you are willing
> > > > > > > >> to dig more into the KafkaSpout OOME issue, you can get more
> > > > > > > >> information from KafkaSpout for tracking by changing the log
> > > > > > > >> level to DEBUG or even TRACE.
> > > > > > >>
> > > > > > >> Thanks,
> > > > > > >> Jungtaek Lim (HeartSaVioR)
> > > > > > >>
> > > > > > >> 2017년 6월 29일 (목) 오전 4:58, Stig Døssing <
> > [email protected]
> > > >님이
> > > > > 작성:
> > > > > > >>
> > > > > > >> > Hi Alexandre,
> > > > > > >> >
> > > > > > >> > About issue 1:
> > > > > > > >> > This issue is not by design. It is a side effect of the spout
> > > > > > > >> > internally using the KafkaConsumer's subscribe API instead of
> > > > > > > >> > the assign API. Support for using the assign API was added a
> > > > > > > >> > while back, but has a bug that is preventing the spout from
> > > > > > > >> > starting when configured to use that API. We are working on
> > > > > > > >> > fixing the issues with that implementation in these PRs
> > > > > > > >> > https://github.com/apache/storm/pull/2150,
> > > > > > > >> > https://github.com/apache/storm/pull/2174. I think it is very
> > > > > > > >> > likely that we will remove support for the subscribe API at
> > > > > > > >> > some point as well, making the assign API the default, since
> > > > > > > >> > several users have had issues with the subscribe API's
> > > > > > > >> > behavior.
> > > > > > > >> >
> > > > > > > >> > Once the assign API support is fixed, you can switch to using
> > > > > > > >> > it via this KafkaSpoutConfig Builder constructor
> > > > > > > >> > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L136
> > > > > > > >> > and this Subscription implementation
> > > > > > > >> > https://github.com/srdo/storm/blob/f524868baa5929f29b69258f0da2948d0f6e152b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
> > > > > > > >> >
> > > > > > > >> > If you'd like to try out the code from the PR branch, you can
> > > > > > > >> > check it out with some of the steps described here
> > > > > > > >> > https://help.github.com/articles/checking-out-pull-requests-locally/#modifying-an-inactive-pull-request-locally
> > > > > > > >> >
> > > > > > > >> > Note that the PRs for fixing the manual partitioning option are
> > > > > > > >> > against the master branch right now, which is targeting Storm
> > > > > > > >> > version 2.0.0, but I believe you may be able to use the 2.0.0
> > > > > > > >> > storm-kafka-client jar with a 1.1.0 cluster.
> > > > > > > >> >
> > > > > > > >> > Switching to the assign API should remove the instability as
> > > > > > > >> > the spouts start.
> > > > > > >> >
> > > > > > >> > About issue 2:
> > > > > > > >> > The spout lag shouldn't have an effect on memory use. I'm
> > > > > > > >> > wondering if your spout instances are not progressing at all,
> > > > > > > >> > which might explain the lag? You should be able to check this
> > > > > > > >> > using the kafka-consumer-groups.sh script in the Kafka /bin
> > > > > > > >> > directory. Once you've started the spout, you can use the
> > > > > > > >> > script to inspect which offsets the consumer group has
> > > > > > > >> > committed. Try checking if the offsets are moving once the
> > > > > > > >> > spouts have started running.
> > > > > > > >> >
> > > > > > > >> > I can't spot any suspicious use of HashMap in the 1.x
> > > > > > > >> > KafkaSpout. Your attachment didn't make it through, could you
> > > > > > > >> > post it somewhere?
> > > > > > >> >
> > > > > > >> > About issue 3:
> > > > > > > >> > The CommitFailedException you are getting is likely because we
> > > > > > > >> > use the subscribe API. When using the subscribe API Kafka is in
> > > > > > > >> > control of partition assignment, and will reassign partitions
> > > > > > > >> > if a consumer doesn't call poll on the consumer often enough.
> > > > > > > >> > The default is that the consumer must be polled at least every
> > > > > > > >> > 5 minutes. See max.poll.interval.ms in the Kafka consumer
> > > > > > > >> > configuration. The assign API doesn't require this, and won't
> > > > > > > >> > shut down spouts because they are too slow.
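
For reference, the two consumer settings named in this thread could be set roughly like this. The class name and the values passed in are hypothetical, chosen only for illustration and not as a recommendation. The property keys are the ones quoted in the exception message.

```java
import java.util.Properties;

// Hypothetical helper building the two poll-related consumer properties.
class PollTuningSketch {
    static Properties consumerProps(int maxPollIntervalMs, int maxPollRecords) {
        Properties props = new Properties();
        // Longest allowed gap between two poll() calls before a rebalance.
        props.setProperty("max.poll.interval.ms", Integer.toString(maxPollIntervalMs));
        // Upper bound on the number of records returned by one poll() call.
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    public static void main(String[] args) {
        // Example values only: a 10 minute poll interval and batches of 200.
        Properties props = consumerProps(600_000, 200);
        System.out.println(props.getProperty("max.poll.interval.ms"));
        System.out.println(props.getProperty("max.poll.records"));
    }
}
```
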
> > > > > > >> >
> > > > > > > >> > There's likely still another underlying issue, because it seems
> > > > > > > >> > strange that your spout instances are not polling at least once
> > > > > > > >> > per 5 minutes, at least if you didn't set a high
> > > > > > > >> > max.poll.records. It's almost certainly related to issue 2. Do
> > > > > > > >> > you have acking enabled, and what is your
> > > > > > > >> > topology.message.timeout.secs?
> > > > > > > >> >
> > > > > > > >> > I'm not sure I understand why you would want to write your own
> > > > > > > >> > spout from scratch, rather than contributing fixes to the
> > > > > > > >> > storm-kafka-client spout. It seems likely to be more effort
> > > > > > > >> > than fixing the problems with storm-kafka-client.
> > > > > > >> >
> > > > > > >> > 2017-06-28 14:25 GMT+02:00 Alexandre Vermeerbergen <
> > > > > > >> > [email protected]
> > > > > > >> > >:
> > > > > > >> >
> > > > > > > >> > > More about this thread: we noticed that with StormKafkaClient
> > > > > > > >> > > 1.1.x latest, we get an OutOfMemoryError after ~2 hours of
> > > > > > > >> > > running our simple test topology.
> > > > > > > >> > >
> > > > > > > >> > > We reproduce it every time, so we decided to generate a heap
> > > > > > > >> > > dump before the OutOfMemoryError, and viewed the result using
> > > > > > > >> > > EclipseMAT.
> > > > > > > >> > >
> > > > > > > >> > > The results tend to show that there's a memory leak in
> > > > > > > >> > > KafkaSpoutClient:
> > > > > > > >> > > =====================================================================
> > > > > > > >> > >
> > > > > > > >> > > One instance of *"org.apache.storm.kafka.spout.KafkaSpout"*
> > > > > > > >> > > loaded by *"sun.misc.Launcher$AppClassLoader @ 0x80023d98"*
> > > > > > > >> > > occupies *1,662,509,664 (93.87%)* bytes. The memory is
> > > > > > > >> > > accumulated in one instance of *"java.util.HashMap$Node[]"*
> > > > > > > >> > > loaded by *"<system class loader>"*.
> > > > > > > >> > >
> > > > > > > >> > > *Keywords*
> > > > > > > >> > > sun.misc.Launcher$AppClassLoader @ 0x80023d98
> > > > > > > >> > > java.util.HashMap$Node[]
> > > > > > > >> > > org.apache.storm.kafka.spout.KafkaSpout
> > > > > > > >> > >
> > > > > > > >> > > =====================================================================
> > > > > > > >> > >
> > > > > > > >> > > See attached screenshots of EclipseMAT session showing
> > > > > > > >> > > graphical representation of memory usage
> > > > > > > >> > >
> > > > > > > >> > > FYI we tried to follow instructions from
> > > > > > > >> > > https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.5.5/bk_storm-component-guide/content/storm-kafkaspout-perf.html
> > > > > > > >> > > to avoid the use of too much memory, but still after 2 hours
> > > > > > > >> > > the memory fills up and the process hosting our spout is
> > > > > > > >> > > killed by Supervisor...
> > > > > > >> > >
> > > > > > >> > > Any clue of what we may have missed?
> > > > > > >> > >
> > > > > > >> > > Best regards,
> > > > > > >> > > Alexandre Vermeerbergen
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > 2017-06-28 <20%2017%2006%2028> 9:17 GMT+02:00 Alexandre
> > > > > > Vermeerbergen
> > > > > > >> <
> > > > > > >> > > [email protected]>:
> > > > > > >> > >
> > > > > > >> > >> Oops, sent my last mail too fast, let me continue it:
> > > > > > >> > >>
> > > > > > >> > >> Hello,
> > > > > > >> > >>
> > > > > > > >> > >> Coming back to my original post in this list, we have 3
> > > > > > > >> > >> issues with latest 1.1.x StormKafkaClient spout with our
> > > > > > > >> > >> setup:
> > > > > > > >> > >>
> > > > > > > >> > >> Issue#1:
> > > > > > > >> > >>  Initial lag (which we hadn't using the classic Storm Kafka
> > > > > > > >> > >> spout)
> > > > > > > >> > >>    For this issue, my understanding of Kristopher's answer is
> > > > > > > >> > >> that this is "by design" of the StormKafkaClient spout, whose
> > > > > > > >> > >> instances progressively join the Kafka consumer group, which
> > > > > > > >> > >> causes consumer rebalancing. This rebalancing is "slow",
> > > > > > > >> > >> which means that until all spout instances are started, the
> > > > > > > >> > >> topology starts with an "initial Kafka Lag"
> > > > > > > >> > >>    => Is my understanding correct?
> > > > > > > >> > >>    => Why don't we have such behavior with the old Storm
> > > > > > > >> > >> Kafka spout?
> > > > > > > >> > >>    => Is this annoying initial lag tracked by a JIRA?
> > > > > > > >> > >>
> > > > > > > >> > >> Issue#2:
> > > > > > > >> > >>     The Kafka lag is increasing constantly and this leads to
> > > > > > > >> > >> the overload of the Storm worker running the Kafka spout. At
> > > > > > > >> > >> the end, the worker crashes and it is automatically restarted
> > > > > > > >> > >> by Storm.
> > > > > > > >> > >>     => This is unlike what we observe with the old Storm
> > > > > > > >> > >> Kafka spout
> > > > > > > >> > >>     => What is the recommended way to analyze this issue?
> > > > > > >> > >>
> > > > > > > >> > >> Issue#3:
> > > > > > > >> > >>   With the new Kafka Spout, we have faced this exception many
> > > > > > > >> > >> times:
> > > > > > > >> > >>
> > > > > > > >> > >> org.apache.kafka.clients.consumer.CommitFailedException:
> > > > > > > >> > >> Commit cannot be completed since the group has already
> > > > > > > >> > >> rebalanced and assigned the partitions to another member.
> > > > > > > >> > >> This means that the time between subsequent calls to poll()
> > > > > > > >> > >> was longer than the configured max.poll.interval.ms, which
> > > > > > > >> > >> typically implies that the poll loop is spending too much
> > > > > > > >> > >> time message processing. You can address this either by
> > > > > > > >> > >> increasing the session timeout or by reducing the maximum
> > > > > > > >> > >> size of batches returned in poll() with max.poll.records.
> > > > > > > >> > >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
> > > > > > > >> > >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
> > > > > > > >> > >> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > > > > > > >> > >> at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
> > > > > > > >> > >> at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
> > > > > > > >> > >> at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
> > > > > > > >> > >> at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
> > > > > > > >> > >> at clojure.lang.AFn.run(AFn.java:22)
> > > > > > > >> > >> at java.lang.Thread.run(Thread.java:748)
> > > > > > >> > >>
> > > > > > >> > >>
> > > > > > > >> > >>   => Are we the only ones experiencing such issues with
> > > > > > > >> > >> Storm 1.1.0/1.1.x latest?
> > > > > > > >> > >>
> > > > > > > >> > >> Note: We are considering writing our own Kafka Spout, as
> > > > > > > >> > >> we're time-bound to move to Kafka 0.10.x consumers &
> > > > > > > >> > >> producers (to prepare our next step with Kafka security,
> > > > > > > >> > >> which isn't available with Kafka 0.9.x). We will miss the
> > > > > > > >> > >> integration of Kafka lag in StormUI, but currently we do not
> > > > > > > >> > >> understand how to solve the regressions we observe with
> > > > > > > >> > >> latest Storm Kafka client spout.
> > > > > > > >> > >>
> > > > > > > >> > >> Are there other Storm developers/users who jumped into this
> > > > > > > >> > >> alternative?
> > > > > > >> > >>
> > > > > > >> > >> Best regards,
> > > > > > >> > >>
> > > > > > >> > >> Alexandre Vermeerbergen
> > > > > > >> > >>
> > > > > > >> > >>
> > > > > > >> > >>
> > > > > > >> > >>
> > > > > > >> > >> 2017-06-28 <20%2017%2006%2028> 9:09 GMT+02:00 Alexandre
> > > > > > >> Vermeerbergen <
> > > > > > >> > >> [email protected]>:
> > > > > > >> > >>
> > > > > > >> > >>> Hello,
> > > > > > >> > >>>
> > > > > > > >> > >>> Coming back to my original post in this list, we have two
> > > > > > > >> > >>> issues with latest 1.1.x StormKafkaClient spout with our
> > > > > > > >> > >>> setup:
> > > > > > > >> > >>>
> > > > > > > >> > >>> Issue#1:
> > > > > > > >> > >>>  Initial lag (which we hadn't using the classic Storm Kafka
> > > > > > > >> > >>> spout)
> > > > > > > >> > >>>    For this issue, my understanding of Kristopher's answer
> > > > > > > >> > >>> is that this is "by design" of the StormKafkaClient spout,
> > > > > > > >> > >>> whose instances progressively join the Kafka consumer group,
> > > > > > > >> > >>> which causes consumer rebalancing. This rebalancing is
> > > > > > > >> > >>> "slow", which means that until all spout instances are
> > > > > > > >> > >>> started, the topology starts with an "initial Kafka Lag"
> > > > > > > >> > >>>    => Is my understanding correct?
> > > > > > > >> > >>>    => Why don't we have such behavior with the old Storm
> > > > > > > >> > >>> Kafka spout?
> > > > > > > >> > >>>    => Is this annoying initial lag tracked by a JIRA?
> > > > > > >> > >>>
> > > > > > >> > >>>
> > > > > > >> > >>>
> > > > > > >> > >>> 2017-06-27 17:15 GMT+02:00 Alexandre Vermeerbergen <
> > > > > > >> > >>> [email protected]>:
> > > > > > >> > >>>
> > > > > > >> > >>>> Hello Kristopher,
> > > > > > >> > >>>>
> > > > > > > >> > >>>> We built Storm 1.1.1-latest using yesterday's (2017-06-26)
> > > > > > > >> > >>>> artifacts downloaded from
> > > > > > > >> > >>>> https://github.com/apache/storm/tree/1.x-branch.
> > > > > > > >> > >>>>
> > > > > > > >> > >>>> Is your latest PR supposed to be in what we downloaded &
> > > > > > > >> > >>>> built, or do we need to upgrade in some way (which?)
> > > > > > > >> > >>>>
> > > > > > > >> > >>>> Should we change anything in our settings?
> > > > > > > >> > >>>>
> > > > > > > >> > >>>> Please note that I mistakenly wrote that our "Kafka
> > > > > > > >> > >>>> consumer strategy is set to EARLY" whereas it's "Kafka
> > > > > > > >> > >>>> consumer strategy is set to LATEST", if that matters.
> > > > > > >> > >>>>
> > > > > > >> > >>>> Best regards,
> > > > > > >> > >>>> Alexandre Vermeerbergen
> > > > > > >> > >>>>
> > > > > > >> > >>>> 2017-06-27 16:37 GMT+02:00 Kristopher Kane <
> > > > > [email protected]
> > > > > > >:
> > > > > > >> > >>>>
> > > > > > > >> > >>>>> Correction: https://github.com/apache/storm/pull/2174 has
> > > > > > > >> > >>>>> all of what I was doing and more.
> > > > > > >> > >>>>>
> > > > > > > >> > >>>>> On Tue, Jun 27, 2017 at 9:33 AM, Kristopher Kane <[email protected]> wrote:
> > > > > > >> > >>>>>
> > > > > > >> > >>>>> > Alexandre,
> > > > > > >> > >>>>> >
> > > > > > >> > >>>>> > There have been quite a few JIRAs and discussions around this
> > > > > > >> > >>>>> > recently. The default behavior for storm-kafka-client is to use
> > > > > > >> > >>>>> > the 'subscribe' API, which causes the immediate lag you see,
> > > > > > >> > >>>>> > since a rebalance will happen spout(n)-1 times just from the
> > > > > > >> > >>>>> > spouts spinning up.
> > > > > > >> > >>>>> >
> > > > > > >> > >>>>> > There is a Builder for ManualPartitionNamedSubscription and the
> > > > > > >> > >>>>> > RoundRobinManualPartitioner (which use the 'assign' Kafka
> > > > > > >> > >>>>> > consumer API) but they don't work at all.  I hope to have a PR
> > > > > > >> > >>>>> > in today to fix these on 1.x-branch.
> > > > > > >> > >>>>> >
> > > > > > >> > >>>>> > The other JIRAs I mentioned are for a redesign of this spout or
> > > > > > >> > >>>>> > other more drastic changes.  My goal is a bug fix for a version
> > > > > > >> > >>>>> > of the spout that doesn't provide unnecessary duplicates.
> > > > > > >> > >>>>> >
> > > > > > >> > >>>>> > Kris
> > > > > > >> > >>>>> >
> > > > > > >> > >>>>> > On Tue, Jun 27, 2017 at 8:00 AM, Alexandre Vermeerbergen
> > > > > > >> > >>>>> > <[email protected]> wrote:
> > > > > > >> > >>>>> >
> > > > > > >> > >>>>> >> Hello All,
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >> For a while, we have been running our real-time supervision
> > > > > > >> > >>>>> >> application on Apache Storm 1.0.3 with Storm Kafka Spouts (old
> > > > > > >> > >>>>> >> consumer: storm-kafka) and a Kafka Broker cluster based on
> > > > > > >> > >>>>> >> Apache Kafka 0.10.1.0.
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >> Backpressure is activated with default parameters.
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >> Key                                      Value
> > > > > > >> > >>>>> >> backpressure.disruptor.high.watermark    0.9
> > > > > > >> > >>>>> >> backpressure.disruptor.low.watermark     0.4
> > > > > > >> > >>>>> >> task.backpressure.poll.secs              30
> > > > > > >> > >>>>> >> topology.backpressure.enable             true
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >> We decided to upgrade to Apache Storm 1.1.0 to benefit from the
> > > > > > >> > >>>>> >> new Kafka Spout (storm-kafka-client lib), whose consumer no
> > > > > > >> > >>>>> >> longer depends on Zookeeper.
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >> After the upgrade, we had several issues with Kafka consumption.
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >> We saw that several JIRAs were opened and resolved on Apache
> > > > > > >> > >>>>> >> Storm 1.1.1.
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >> So we decided to upgrade to the latest available Apache Storm
> > > > > > >> > >>>>> >> 1.1.x code built from source (2017-06-26), but we still have
> > > > > > >> > >>>>> >> issues:
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >> 1. The Kafka lag is increasing constantly, and this leads to the
> > > > > > >> > >>>>> >> overload of the Storm worker running the Kafka spout. Eventually,
> > > > > > >> > >>>>> >> the worker crashes and is automatically restarted by Storm.
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >> With the old Kafka spout version, the lag was below 10000 most
> > > > > > >> > >>>>> >> of the time.
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >> With the new one, the Kafka lag starts at about 30000 and keeps
> > > > > > >> > >>>>> >> increasing until the crash.
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >> 2. With the new Kafka Spout, we have faced this exception many
> > > > > > >> > >>>>> >> times:
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >> org.apache.kafka.clients.consumer.CommitFailedException: Commit
> > > > > > >> > >>>>> >> cannot be completed since the group has already rebalanced and
> > > > > > >> > >>>>> >> assigned the partitions to another member. This means that the
> > > > > > >> > >>>>> >> time between subsequent calls to poll() was longer than the
> > > > > > >> > >>>>> >> configured max.poll.interval.ms, which typically implies that
> > > > > > >> > >>>>> >> the poll loop is spending too much time message processing. You
> > > > > > >> > >>>>> >> can address this either by increasing the session timeout or by
> > > > > > >> > >>>>> >> reducing the maximum size of batches returned in poll() with
> > > > > > >> > >>>>> >> max.poll.records.
> > > > > > >> > >>>>> >>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
> > > > > > >> > >>>>> >>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
> > > > > > >> > >>>>> >>   at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > > > > > >> > >>>>> >>   at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
> > > > > > >> > >>>>> >>   at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
> > > > > > >> > >>>>> >>   at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
> > > > > > >> > >>>>> >>   at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
> > > > > > >> > >>>>> >>   at clojure.lang.AFn.run(AFn.java:22)
> > > > > > >> > >>>>> >>   at java.lang.Thread.run(Thread.java:748)
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >> We are using the following configuration for Kafka Spout:
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >> poll.timeout.ms          200
> > > > > > >> > >>>>> >> offset.commit.period.ms  30000     (30 seconds)
> > > > > > >> > >>>>> >> max.uncommitted.offsets  10000000  (ten million)
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >> The Kafka consumer strategy is set to EARLY. We also set the
> > > > > > >> > >>>>> >> following Kafka consumer parameter:
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >> session.timeout.ms  120000
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >> Are there any traces/options that we could turn on in Storm or
> > > > > > >> > >>>>> >> on the Kafka Broker that might help us understand how to
> > > > > > >> > >>>>> >> stabilize our topologies with this new branch?
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >> Thanks!
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >> Alexandre Vermeerbergen
> > > > > > >> > >>>>> >>
> > > > > > >> > >>>>> >
> > > > > > >> > >>>>> >
> > > > > > >> > >>>>>
> > > > > > >> > >>>>
> > > > > > >> > >>>>
> > > > > > >> > >>>
> > > > > > >> > >>
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
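
A rough back-of-the-envelope check for the timing condition named in the
CommitFailedException text quoted above: the commit fails once the gap
between two poll() calls exceeds max.poll.interval.ms. The sketch below is
plain Python arithmetic, not Storm or Kafka code. The helper names, the
per-record processing time, and the 300000 ms / 500 record values are
assumptions chosen for illustration, so substitute the values from your own
consumer configuration. It also does not model rebalances triggered by other
spouts joining the consumer group, which is the cause discussed in this
thread.

```python
def poll_gap_ms(max_poll_records: int, per_record_ms: float) -> float:
    """Worst-case time between two poll() calls, assuming a full batch is
    processed record by record before polling again (hypothetical helper)."""
    return max_poll_records * per_record_ms


def max_safe_batch(max_poll_interval_ms: int, per_record_ms: float) -> int:
    """Largest max.poll.records whose worst-case processing time still fits
    inside max.poll.interval.ms (hypothetical helper)."""
    if per_record_ms <= 0:
        raise ValueError("per_record_ms must be positive")
    return int(max_poll_interval_ms // per_record_ms)


if __name__ == "__main__":
    interval_ms = 300_000   # assumed max.poll.interval.ms
    batch = 500             # assumed max.poll.records
    per_record_ms = 800.0   # assumed, deliberately slow processing

    gap = poll_gap_ms(batch, per_record_ms)
    print("worst-case gap between polls (ms):", gap)
    print("exceeds max.poll.interval.ms:", gap > interval_ms)
    print("largest safe max.poll.records:",
          max_safe_batch(interval_ms, per_record_ms))
```

With these assumed numbers the worst-case gap is longer than the interval,
which is the situation the exception message describes; shrinking the batch
or raising the interval brings it back under the limit.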
