Hi Alexandre,

Happy to hear that the spout is working better for you. I think that if you don't need at-least-once processing, then using auto commit is a good option.
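For reference, here is a minimal sketch of the Kafka consumer properties that turn auto commit on. The property keys are the standard Kafka consumer settings; the class name, method name, broker address, group id and the 5000 ms interval are placeholders chosen for illustration. How the properties are handed to the spout depends on your storm-kafka-client version, so that part is left out.

```java
import java.util.Properties;

// Hypothetical helper, not part of storm-kafka-client.
final class AutoCommitProps {

    // Builds Kafka consumer properties with auto commit turned on.
    static Properties autoCommitProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // The consumer commits offsets of polled records on its own schedule,
        // independently of whether Storm has acked the corresponding tuples.
        props.setProperty("enable.auto.commit", "true");
        // Commit interval in milliseconds (illustrative value).
        props.setProperty("auto.commit.interval.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and group id.
        Properties props = autoCommitProps("localhost:9092", "realtime-alerting");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With these settings an offset can be committed before the tuple has been fully processed, which is the trade-off to accept when dropping at-least-once.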
When you enable auto commit, the spout emits tuples unanchored, i.e. without a message ID (see the calls to collector.emit here https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L346). This causes Storm to not bother tracking acks and fails for the emitted tuples, which means the counters in Storm UI for acks, fails and complete latency are zeroed out. The complete latency doesn't make sense to track if the tuples are not acked. The rest of the statistics should still be there. I think we implemented it this way on the reasoning that it doesn't make sense to make Storm track and ack/fail tuples if the spout is configured to not care about failures.

I tried out a small test topology on a local 2.0.0 Storm setup using auto commit, and here's what I'm seeing: http://imgur.com/a/CoJBa. As you can see, the execute latency and emit/transfer counters still work. Is this different from what you're experiencing?

If you need the complete latency to be functional while using auto commit, we'll need to update the spout so it emits the tuples with anchors (message IDs), and then ignores the acks/fails.

2017-07-19 10:12 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:

> Hello Stig,
>
> Thank you very much for your detailed explanations about storm-kafka-client behavior, it helps us a lot.
>
> We have topologies for which at-least-once matters, so we're OK to try to converge them with this default behavior of storm-kafka-client.
>
> However, our main topology is our "realtime alerting" one. It evaluates incoming "metrics" (our system is a Supervision system) against "triggers" (same meaning as in Zabbix), and here we prefer to drop anything that we couldn't evaluate quickly enough. Indeed, to do "real-time alerting", we know that if we skip a few metrics, then, shortly after, there will be newer ones that will suffice to get a "near real time enough" view of the "current state of our supervised devices".
>
> This is the reason why we recently turned on "autocommit" in storm-kafka-client, to overcome the growing "lag" or OOM we eventually got.
>
> Now our topology works like a charm.
>
> Except that with autocommit turned on, no acks are sent from storm-kafka-client spouts, so Storm UI isn't displaying any statistics information about our topology, which is a big annoyance for us.
>
> Question:
> * Is our approach to use autocommit OK, or given our scenario, do you recommend otherwise?
> * If it's OK to use autocommit, then how can we configure the storm-kafka-client spout to "blindly" ACK all tuples it sends?
>
> Best regards,
> Alexandre Vermeerbergen
>
> 2017-07-05 17:22 GMT+02:00 Stig Døssing <[email protected]>:
>
> > Regarding the offsetManagers map, I think the only thing it contains that should take up any space is uncommitted offsets. Those should be cleared out when the spout commits offsets to Kafka. There have been at least a few issues related to that recently, e.g. https://github.com/apache/storm/pull/2153, which may be what you're running into. The spout should be able to limit how many uncommitted offsets it will accept before slowing down, but this functionality is still buggy in some cases (see https://github.com/apache/storm/pull/2156, https://github.com/apache/storm/pull/2152).
> >
> > The storm-kafka-client spout doesn't share code with storm-kafka; it's an implementation based on a different Kafka client (this one https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html), whereas the storm-kafka spout used the now deprecated SimpleConsumer (https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example). There are still some bugs in the new spout we're working out.
> >
> > Auto commit is not enabled in storm-kafka, because the SimpleConsumer doesn't have that concept. The old spout kept track of processed offsets by storing how far it had read in Zookeeper. The new client allows us to mark how far the spout has read in Kafka, which can either be done via the commit methods on the KafkaConsumer (auto commit disabled), or periodically by the consumer itself (auto commit enabled). Normally when the spout emits a message into the topology, we want to only mark that message as "done" once Storm tells the spout to ack the tuple. This is the behavior when auto commit is disabled; the spout emits some tuples, receives some acks, and periodically uses the commitSync method to mark the acked tuples as "done" in Kafka. If auto commit is enabled, the KafkaConsumer will instead periodically commit the offsets of the records it has returned from polls. The effect is that Storm can't control when an offset is considered "done", which means that some messages can be lost. This is fine for some use cases where you don't necessarily need to process all messages.
> >
> > Leaving auto commit disabled is the closest match to how the storm-kafka spout handled messages. We disable auto commit by default in the new spout because at-least-once processing guarantees should be the default, not something people have to opt in to.
> >
> > 2017-07-05 15:22 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
> >
> > > Hello,
> > >
> > > To continue on this thread:
> > >
> > > Since we are facing some OOMEs and Kafka lag peaks with our topologies using Storm 1.1.0 and Kafka spouts (storm-kafka-client based on Kafka 0.10.2.0 libs), we decided to test a much simpler Storm topology.
> > >
> > > This test topology is very basic and is composed of:
> > >
> > > - one instance of Kafka spout consuming a topic composed of 4 partitions with a throughput of about 3000 messages per second and with message size of about 1500 bytes; the consumption strategy is set to "LATEST", other Kafka Spout parameters are default ones
> > >
> > > - one basic bolt which is just counting received tuples and logging statistics every minute (min, max, avg for number of tuples received per min) with acking every tuple
> > >
> > > Back pressure is activated with default watermark parameters.
> > >
> > > topology.message.timeout.secs is not changed (default value is 30).
> > >
> > > Messages are encrypted (using our own encryption method) serialized Java objects, but we are just using the provided ByteArrayDeserializer for this test since we just want to count tuples without any custom deserialization interference.
> > >
> > > This topology is running alone on a single VM (8 vCPU) with one worker configured with 2 GB of RAM for the Xmx JVM option.
> > >
> > > Please also note that our cluster of Kafka Brokers is based on Apache Kafka 0.10.1.0.
> > >
> > > We are tracking the Kafka lag for the topic we are consuming.
> > >
> > > Kafka lag is 10000 at topology start but it decreased slowly to 2000 after 24h of execution.
> > >
> > > We did not face any OOM error this time, so we suspect that the previous OOMEs were due to some lag in the much more complex topology we were running and due to Storm backpressure slowing down the Spout consumption.
> > >
> > > In fact, it seems that the following Map in the org.apache.storm.kafka.spout.KafkaSpout class was the root cause of the errors:
> > >
> > > =====================================================================================================
> > > // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires,
> > > // or after a consumer rebalance, or during close/deactivate
> > > private transient Map<TopicPartition, OffsetManager> offsetManagers;
> > > =====================================================================================================
> > >
> > > Please note however that these errors did not occur with the "old" version of Kafka spout (storm-kafka lib).
> > >
> > > We ran another test with the same test topology but with enable.auto.commit set to true in Kafka consumer properties; lag was starting at about 2000 and staying constant for at least 24h.
> > >
> > > Our understanding is that auto commit is enabled by default in Kafka consumers and also in the old version of Kafka spouts (storm-kafka), but it is disabled in the new Kafka spout version (storm-kafka-client) to manage tuple acking. Is that correct?
> > >
> > > What are the consequences of activating auto commit for consumers with the new Kafka Spout then?
> > >
> > > Thank you for your feedback,
> > >
> > > Alexandre Vermeerbergen
> > >
> > > 2017-06-29 12:09 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
> > >
> > > > Hello All,
> > > >
> > > > Thank you very much for your feedback on our experience with storm-kafka-client. We're going to take your suggestions into account to dig into our issues, and we'll report back as soon as we have more to share.
> > > >
> > > > Best regards,
> > > > Alexandre Vermeerbergen
> > > >
> > > > 2017-06-29 1:46 GMT+02:00 Jungtaek Lim <[email protected]>:
> > > >
> > > >> Hi Alexandre,
> > > >>
> > > >> I don't know much about storm-kafka-client, but at a glance, I can't find misuse of HashMap in KafkaSpout, so I'd rather suspect that OffsetManager is really huge. If you are willing to dig more into the KafkaSpout OOME issue, you can get more tracking information from KafkaSpout by changing the log level to DEBUG or even TRACE.
> > > >>
> > > >> Thanks,
> > > >> Jungtaek Lim (HeartSaVioR)
> > > >>
> > > >> On Thu, Jun 29, 2017 at 4:58 AM, Stig Døssing <[email protected]> wrote:
> > > >>
> > > >> > Hi Alexandre,
> > > >> >
> > > >> > About issue 1:
> > > >> > This issue is not by design. It is a side effect of the spout internally using the KafkaConsumer's subscribe API instead of the assign API. Support for using the assign API was added a while back, but has a bug that is preventing the spout from starting when configured to use that API. We are working on fixing the issues with that implementation in these PRs https://github.com/apache/storm/pull/2150, https://github.com/apache/storm/pull/2174. I think it is very likely that we will remove support for the subscribe API at some point as well, making the assign API the default, since several users have had issues with the subscribe API's behavior.
> > > >> >
> > > >> > Once the assign API support is fixed, you can switch to using it via this KafkaSpoutConfig Builder constructor https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L136 and this Subscription implementation https://github.com/srdo/storm/blob/f524868baa5929f29b69258f0da2948d0f6e152b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
> > > >> >
> > > >> > If you'd like to try out the code from the PR branch, you can check it out with some of the steps described here https://help.github.com/articles/checking-out-pull-requests-locally/#modifying-an-inactive-pull-request-locally.
> > > >> >
> > > >> > Note that the PRs for fixing the manual partitioning option are against the master branch right now, which is targeting Storm version 2.0.0, but I believe you may be able to use the 2.0.0 storm-kafka-client jar with a 1.1.0 cluster.
> > > >> >
> > > >> > Switching to the assign API should remove the instability as the spouts start.
> > > >> >
> > > >> > About issue 2:
> > > >> > The spout lag shouldn't have an effect on memory use. I'm wondering if your spout instances are not progressing at all, which might explain the lag? You should be able to check this using the kafka-consumer-groups.sh script in the Kafka /bin directory. Once you've started the spout, you can use the script to inspect which offsets the consumer group has committed. Try checking if the offsets are moving once the spouts have started running.
> > > >> >
> > > >> > I can't spot any suspicious use of HashMap in the 1.x KafkaSpout. Your attachment didn't make it through, could you post it somewhere?
> > > >> >
> > > >> > About issue 3:
> > > >> > The CommitFailedException you are getting is likely because we use the subscribe API. When using the subscribe API, Kafka is in control of partition assignment, and will reassign partitions if a consumer doesn't call poll on the consumer often enough. The default is that the consumer must be polled at least every 5 minutes. See max.poll.interval.ms in the Kafka consumer configuration. The assign API doesn't require this, and won't shut down spouts because they are too slow.
> > > >> >
> > > >> > There's likely still another underlying issue, because it seems strange that your spout instances are not polling at least once per 5 minutes, at least if you didn't set a high max.poll.records. It's almost certainly related to issue 2. Do you have acking enabled, and what is your topology.message.timeout.secs?
> > > >> >
> > > >> > I'm not sure I understand why you would want to write your own spout from scratch, rather than contributing fixes to the storm-kafka-client spout. It seems likely to be more effort than fixing the problems with storm-kafka-client.
> > > >> >
> > > >> > 2017-06-28 14:25 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
> > > >> >
> > > >> > > More about this thread: we noticed that with StormKafkaClient 1.1.x latest, we get OutOfMemoryError after ~2 hours of running our simple test topology.
> > > >> > >
> > > >> > > We reproduce it every time, so we decided to generate a heap dump before the OutOfMemoryError, and viewed the result using EclipseMAT.
> > > >> > >
> > > >> > > The results tend to show that there's a memory leak in KafkaSpoutClient:
> > > >> > > =====================================================================
> > > >> > >
> > > >> > > One instance of *"org.apache.storm.kafka.spout.KafkaSpout"* loaded by *"sun.misc.Launcher$AppClassLoader @ 0x80023d98"* occupies *1,662,509,664 (93.87%)* bytes. The memory is accumulated in one instance of *"java.util.HashMap$Node[]"* loaded by *"<system class loader>"*.
> > > >> > >
> > > >> > > *Keywords*
> > > >> > > sun.misc.Launcher$AppClassLoader @ 0x80023d98
> > > >> > > java.util.HashMap$Node[]
> > > >> > > org.apache.storm.kafka.spout.KafkaSpout
> > > >> > >
> > > >> > > =====================================================================
> > > >> > >
> > > >> > > See attached screenshots of EclipseMAT session showing graphical representation of memory usage
> > > >> > >
> > > >> > > FYI we tried to follow instructions from https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.5.5/bk_storm-component-guide/content/storm-kafkaspout-perf.html to avoid the use of too much memory, but still after 2 hours the memory fills up and the process hosting our spout is killed by Supervisor...
> > > >> > >
> > > >> > > Any clue of what we may have missed?
> > > >> > >
> > > >> > > Best regards,
> > > >> > > Alexandre Vermeerbergen
> > > >> > >
> > > >> > > 2017-06-28 9:17 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
> > > >> > >
> > > >> > >> Oops, sent my last mail too fast, let me continue it:
> > > >> > >>
> > > >> > >> Hello,
> > > >> > >>
> > > >> > >> Coming back to my original post in this list, we have 3 issues with latest 1.1.x StormKafkaClient spout with our setup:
> > > >> > >>
> > > >> > >> Issue#1:
> > > >> > >> Initial lag (which we didn't have using the classic Storm Kafka spout)
> > > >> > >> For this issue, my understanding of Kristopher's answer is that this is "by design" of the StormKafkaClient spout, whose instances progressively join the Kafka consumer group, which causes consumer rebalancing. This rebalancing is "slow", which means that until all spout instances are started, the topology starts with an "initial Kafka Lag"
> > > >> > >> => Is my understanding correct?
> > > >> > >> => Why don't we have such behavior with the old Storm Kafka spout?
> > > >> > >> => Is this annoying initial lag tracked by a JIRA?
> > > >> > >>
> > > >> > >> Issue#2:
> > > >> > >> The Kafka lag is increasing constantly and this leads to the overload of the Storm worker running the Kafka spout. At the end, the worker crashes and it is automatically restarted by Storm.
> > > >> > >> => This is unlike what we observe with the old Storm Kafka spout
> > > >> > >> => What is the recommended way to analyze this issue?
> > > >> > >>
> > > >> > >> Issue#3:
> > > >> > >> With the new Kafka Spout, we have faced this exception many times:
> > > >> > >>
> > > >> > >> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
> > > >> > >>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
> > > >> > >>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
> > > >> > >>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > > >> > >>     at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
> > > >> > >>     at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
> > > >> > >>     at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
> > > >> > >>     at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
> > > >> > >>     at clojure.lang.AFn.run(AFn.java:22)
> > > >> > >>     at java.lang.Thread.run(Thread.java:748)
> > > >> > >>
> > > >> > >> => Are we the only ones experiencing such issues with Storm 1.1.0/1.1.x latest?
> > > >> > >>
> > > >> > >> Note: We are considering writing our own Kafka Spout, as we're time-bound to move to Kafka 0.10.x consumers & producers (to prepare our next step with Kafka security, which isn't available with Kafka 0.9.x). We will miss the integration of Kafka lag in StormUI, but currently we do not understand how to solve the regressions we observe with latest Storm Kafka client spout.
> > > >> > >>
> > > >> > >> Are there other Storm developers/users who jumped into this alternative?
> > > >> > >>
> > > >> > >> Best regards,
> > > >> > >>
> > > >> > >> Alexandre Vermeerbergen
> > > >> > >>
> > > >> > >> 2017-06-28 9:09 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
> > > >> > >>
> > > >> > >>> Hello,
> > > >> > >>>
> > > >> > >>> Coming back to my original post in this list, we have two issues with latest 1.1.x StormKafkaClient spout with our setup:
> > > >> > >>>
> > > >> > >>> Issue#1:
> > > >> > >>> Initial lag (which we didn't have using the classic Storm Kafka spout)
> > > >> > >>> For this issue, my understanding of Kristopher's answer is that this is "by design" of the StormKafkaClient spout, whose instances progressively join the Kafka consumer group, which causes consumer rebalancing. This rebalancing is "slow", which means that until all spout instances are started, the topology starts with an "initial Kafka Lag"
> > > >> > >>> => Is my understanding correct?
> > > >> > >>> => Why don't we have such behavior with the old Storm Kafka spout?
> > > >> > >>> => Is this annoying initial lag tracked by a JIRA?
> > > >> > >>>
> > > >> > >>> 2017-06-27 17:15 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
> > > >> > >>>
> > > >> > >>>> Hello Kristopher,
> > > >> > >>>>
> > > >> > >>>> We built Storm 1.1.1-latest using yesterday's (2017-06-26) artifacts downloaded from https://github.com/apache/storm/tree/1.x-branch.
> > > >> > >>>>
> > > >> > >>>> Is your latest PR supposed to be in what we downloaded & built, or do we need to upgrade in some way (which?)
> > > >> > >>>>
> > > >> > >>>> Should we change anything in our settings?
> > > >> > >>>>
> > > >> > >>>> Please note that I mistakenly wrote that our "Kafka consumer strategy is set to EARLY" whereas it's "Kafka consumer strategy is set to LATEST", if that matters.
> > > >> > >>>>
> > > >> > >>>> Best regards,
> > > >> > >>>> Alexandre Vermeerbergen
> > > >> > >>>>
> > > >> > >>>> 2017-06-27 16:37 GMT+02:00 Kristopher Kane <[email protected]>:
> > > >> > >>>>
> > > >> > >>>>> Correction: https://github.com/apache/storm/pull/2174 has all of what I was doing and more.
> > > >> > >>>>>
> > > >> > >>>>> On Tue, Jun 27, 2017 at 9:33 AM, Kristopher Kane <[email protected]> wrote:
> > > >> > >>>>>
> > > >> > >>>>> > Alexandre,
> > > >> > >>>>> >
> > > >> > >>>>> > There are quite a few JIRAs and discussions around this recently. The default behavior for storm-kafka-client is the 'subscribe' API which causes the immediate lag you see since rebalance will happen spout(n)-1 times just from the spouts spinning up.
> > > >> > >>>>> >
> > > >> > >>>>> > There is a Builder for ManualPartitionNamedSubscription and the RoundRobinManualPartitioner (which use the 'assign' Kafka consumer API) but they don't work at all. I hope to have a PR in today to fix these on 1.x-branch
> > > >> > >>>>> >
> > > >> > >>>>> > The other JIRAs I mentioned are for a redesign of this spout or other more drastic changes. My goal is a bug fix for a version of the spout that doesn't provide unnecessary duplicates.
> > > >> > >>>>> >
> > > >> > >>>>> > Kris
> > > >> > >>>>> >
> > > >> > >>>>> > On Tue, Jun 27, 2017 at 8:00 AM, Alexandre Vermeerbergen <[email protected]> wrote:
> > > >> > >>>>> >
> > > >> > >>>>> >> Hello All,
> > > >> > >>>>> >>
> > > >> > >>>>> >> We have been running for a while our real-time supervision application based on Apache Storm 1.0.3 with Storm Kafka Spouts (old consumer: storm-kafka) and with our Kafka Broker cluster based on Apache Kafka 0.10.1.0.
> > > >> > >>>>> >>
> > > >> > >>>>> >> Backpressure is activated with default parameters.
> > > >> > >>>>> >>
> > > >> > >>>>> >> Key                                     Value
> > > >> > >>>>> >> backpressure.disruptor.high.watermark   0.9
> > > >> > >>>>> >> backpressure.disruptor.low.watermark    0.4
> > > >> > >>>>> >> task.backpressure.poll.secs             30
> > > >> > >>>>> >> topology.backpressure.enable            true
> > > >> > >>>>> >>
> > > >> > >>>>> >> We decided to upgrade to Apache Storm 1.1.0 to benefit from the new Kafka Spout (storm-kafka-client lib) with a consumer which no longer depends on Zookeeper.
> > > >> > >>>>> >>
> > > >> > >>>>> >> After the upgrade, we had several issues with Kafka consumption.
> > > >> > >>>>> >>
> > > >> > >>>>> >> We saw that several JIRAs were opened and resolved on Apache Storm 1.1.1.
> > > >> > >>>>> >>
> > > >> > >>>>> >> So we decided to upgrade to the latest available Apache Storm 1.1.x code built from source (2017-06-26) but we still have issues:
> > > >> > >>>>> >>
> > > >> > >>>>> >> 1. The Kafka lag is increasing constantly and this leads to the overload of the Storm worker running the Kafka spout. At the end, the worker crashes and it is automatically restarted by Storm.
> > > >> > >>>>> >>
> > > >> > >>>>> >> With the old Kafka spout version, we had a lag most of the time below 10000.
> > > >> > >>>>> >>
> > > >> > >>>>> >> With the new one, we are starting with a Kafka lag of about 30000, increasing until the crash.
> > > >> > >>>>> >>
> > > >> > >>>>> >> 2. With the new Kafka Spout, we have faced this exception many times:
> > > >> > >>>>> >>
> > > >> > >>>>> >> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
> > > >> > >>>>> >>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
> > > >> > >>>>> >>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
> > > >> > >>>>> >>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > > >> > >>>>> >>     at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
> > > >> > >>>>> >>     at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
> > > >> > >>>>> >>     at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
> > > >> > >>>>> >>     at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
> > > >> > >>>>> >>     at clojure.lang.AFn.run(AFn.java:22)
> > > >> > >>>>> >>     at java.lang.Thread.run(Thread.java:748)
> > > >> > >>>>> >>
> > > >> > >>>>> >> We are using the following configuration for Kafka Spout:
> > > >> > >>>>> >>
> > > >> > >>>>> >> poll.timeout.ms 200.
> > > >> > >>>>> >> offset.commit.period.ms 30000 (30 seconds).
> > > >> > >>>>> >> max.uncommitted.offsets 10000000 (ten million).
> > > >> > >>>>> >>
> > > >> > >>>>> >> The Kafka consumer strategy is set to EARLY. We also set the following Kafka consumer parameter:
> > > >> > >>>>> >>
> > > >> > >>>>> >> session.timeout.ms 120000
> > > >> > >>>>> >>
> > > >> > >>>>> >> Are there any traces/options which we could turn on in Storm or on the Kafka Broker that might help us understand how to stabilize our topologies with this new branch?
> > > >> > >>>>> >>
> > > >> > >>>>> >> Thanks!
> > > >> > >>>>> >>
> > > >> > >>>>> >> Alexandre Vermeerbergen
