Hello Stig,

Thank you very much for your detailed explanations of the storm-kafka-client behavior; they help us a lot.
We have topologies for which at-least-once matters, so we're OK with converging them to this default behavior of storm-kafka-client.

However, our main topology is our "realtime alerting" one. It evaluates incoming "metrics" (our system is a Supervision system) against "triggers" (same meaning as in Zabbix), and here we prefer to drop anything that we couldn't evaluate quickly enough. Indeed, for "real-time alerting" we know that if we skip a few metrics, then shortly after there will be newer ones that suffice to give a "near real time enough" view of the current state of our supervised devices. This is the reason why we recently turned on "autocommit" in storm-kafka-client, to overcome the growing lag or the OOMs we eventually got. Now our topology works like a charm. Except that with autocommit turned on, no acks are sent from the storm-kafka-client spouts, so Storm UI isn't displaying any statistics about our topology, which is a big annoyance for us.

Questions:
* Is our approach of using autocommit OK, or given our scenario, do you recommend otherwise?
* If it's OK to use autocommit, how can we configure the storm-kafka-client spout to "blindly" ACK all tuples it sends?

Best regards,
Alexandre Vermeerbergen

2017-07-05 17:22 GMT+02:00 Stig Døssing <[email protected]>:

> Regarding the offsetManagers map, I think the only thing it contains that
> should take up any space is uncommitted offsets. Those should be cleared
> out when the spout commits offsets to Kafka. There have been at least a few
> issues related to that recently, e.g.
> https://github.com/apache/storm/pull/2153, which may be what you're
> running into. The spout should be able to limit how many uncommitted
> offsets it will accept before slowing down, but this functionality is
> still buggy in some cases (see https://github.com/apache/storm/pull/2156,
> https://github.com/apache/storm/pull/2152).
>
> The storm-kafka-client spout doesn't share code with storm-kafka; it's an
> implementation based on a different Kafka client (this one:
> https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html),
> whereas the storm-kafka spout used the now-deprecated
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example.
> There are still some bugs on the new spout we're working out.
>
> Auto commit is not enabled in storm-kafka, because the SimpleConsumer
> doesn't have that concept. The old spout kept track of processed offsets
> by storing how far it had read in Zookeeper. The new client allows us to
> mark how far the spout has read in Kafka, which can either be done via the
> commit methods on the KafkaConsumer (auto commit disabled), or
> periodically by a background thread in the consumer (auto commit enabled).
> Normally when the spout emits a message into the topology, we want to only
> mark that message as "done" once Storm tells the spout to ack the tuple.
> This is the behavior when auto commit is disabled; the spout emits some
> tuples, receives some acks, and periodically uses the commitSync method to
> mark the acked tuples as "done" in Kafka. If auto commit is enabled, the
> KafkaConsumer will instead periodically commit the tuples it has returned
> from polls. The effect is that Storm can't control when an offset is
> considered "done", which means that some messages can be lost. This is
> fine for some use cases where you don't necessarily need to process all
> messages.
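(To make sure we read this correctly, here is a rough sketch of the two commit modes using the plain KafkaConsumer API, outside Storm. The broker address, group id, topic name and the process() placeholder are made up for illustration; this is not the spout's actual code.)

============================================================
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitModes {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");               // made-up broker
        props.put("group.id", "metrics-consumers");                   // made-up group id
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // Mode 1: auto commit disabled -- the caller decides when offsets are "done".
        // This mirrors the spout's default, where commitSync() is only issued for acked tuples.
        props.put("enable.auto.commit", "false");

        // Mode 2: auto commit enabled -- a background thread periodically commits whatever
        // poll() has returned, whether or not downstream processing has happened yet.
        // props.put("enable.auto.commit", "true");
        // props.put("auto.commit.interval.ms", "5000");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("metrics")); // made-up topic
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(200);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    process(record);                                  // e.g. emit as a tuple
                }
                // Only meaningful in mode 1: mark everything returned so far as processed.
                consumer.commitSync();
            }
        }
    }

    private static void process(ConsumerRecord<byte[], byte[]> record) {
        // placeholder for real processing
    }
}
============================================================

In the second mode the periodic commit happens whether or not process() ever ran successfully, which matches the "some messages can be lost" trade-off described above.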
> Leaving auto commit disabled is the closest match to how the storm-kafka
> spout handled messages. We disable auto commit by default in the new spout
> because at-least-once processing guarantees should be the default, not
> something people have to opt in to.
>
> 2017-07-05 15:22 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
>
> > Hello,
> >
> > To continue on this thread:
> >
> > Since we are facing some OOMEs and Kafka lag peaks with our topologies
> > using Storm 1.1.0 and Kafka spouts (storm-kafka-client based on Kafka
> > 0.10.2.0 libs), we decided to test a much simpler Storm topology.
> >
> > This test topology is very basic and is composed of:
> >
> > - one instance of the Kafka spout consuming a topic composed of 4
> > partitions, with a throughput of about 3000 messages per second and a
> > message size of about 1500 bytes; the consumption strategy is set to
> > "LATEST", and the other Kafka spout parameters are the default ones
> >
> > - one basic bolt which just counts received tuples and logs statistics
> > every minute (min, max, avg for the number of tuples received per
> > minute), acking every tuple
> >
> > Back pressure is activated with default watermark parameters.
> >
> > topology.message.timeout.secs is not changed (default value is 30).
> >
> > Messages are encrypted (using our own encryption method) serialized Java
> > objects, but we are just using the provided ByteArrayDeserializer for
> > this test since we just want to count tuples without any custom
> > deserialization interference.
> >
> > This topology is running alone on a single VM (8 vCPU) with one worker
> > configured with 2 GB of RAM for the Xmx JVM option.
> >
> > Please also note that our cluster of Kafka brokers is based on Apache
> > Kafka 0.10.1.0.
> >
> > We are tracking the Kafka lag for the topic we are consuming.
> >
> > Kafka lag is 10000 at topology start, but it decreased slowly to 2000
> > after 24h of execution.
> >
> > We did not face any OOM error this time, so we suspect that the previous
> > OOMEs were due to some lag in the much more complex topology we were
> > running and to Storm backpressure slowing down the spout consumption.
> >
> > In fact, it seems that the following map in the
> > org.apache.storm.kafka.spout.KafkaSpout class was the root cause of the
> > errors:
> >
> > =====================================================================
> > // Tuples that were successfully acked/emitted. These tuples will be
> > // committed periodically when the commit timer expires, or after a
> > // consumer rebalance, or during close/deactivate
> > private transient Map<TopicPartition, OffsetManager> offsetManagers;
> > =====================================================================
> >
> > Please note however that these errors did not occur with the "old"
> > version of the Kafka spout (storm-kafka lib).
> >
> > We ran another test with the same test topology but with
> > enable.auto.commit set to true in the Kafka consumer properties; the lag
> > was starting at about 2000 and staying constant for at least 24h.
> >
> > Our understanding is that auto commit is enabled by default in Kafka
> > consumers and also in the old version of the Kafka spout (storm-kafka),
> > but that it is disabled in the new Kafka spout version
> > (storm-kafka-client) in order to manage tuple acking. Is that correct?
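(For reference, the counting bolt in the test topology described above is essentially the following; the class name is made up and the per-minute min/max/avg logging is left out to keep the sketch short.)

============================================================
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

// Counts incoming tuples and acks every one of them; a timer thread (not shown)
// would read the counter once a minute to log min/max/avg and reset it.
public class CountingBolt extends BaseRichBolt {
    private transient OutputCollector collector;
    private transient AtomicLong counter;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counter = new AtomicLong();
    }

    @Override
    public void execute(Tuple tuple) {
        counter.incrementAndGet();
        collector.ack(tuple);   // ack every tuple so the spout can commit its offset
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt, no output streams
    }
}
============================================================

Every tuple is acked, so with auto commit left at its default (disabled) the spout should be able to commit the corresponding offsets and clear them from offsetManagers.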
> > > > What are the consequences of activating auto commit for consumers with > the > > new Kafka Spout then ? > > > > Thank you for your feedbacks, > > > > Alexandre Vermeerbergen > > > > 2017-06-29 12:09 GMT+02:00 Alexandre Vermeerbergen < > > [email protected] > > >: > > > > > Hello All, > > > > > > Thank you very much for your feedbacks on our experience with > > > storm-kafka-client, we're going to take your suggestions into account > to > > > dig into our issues and we'll feedback as soon as we have more to > share. > > > > > > Best regards, > > > Alexandre Vermeerbergen > > > > > > 2017-06-29 1:46 GMT+02:00 Jungtaek Lim <[email protected]>: > > > > > >> Hi Alexandre, > > >> > > >> I don't know much of storm-kafka-client, but at a glimpse, I can't > find > > >> misuse of HashMap in KafkaSpout so I'd rather suspect that > OffsetManager > > >> being really huge. If you are willing to dig more on KafkaSpout OOME > > >> issue, > > >> you can get more information of KafkaSpout for tracking with changing > > log > > >> level to DEBUG or even TRACE. > > >> > > >> Thanks, > > >> Jungtaek Lim (HeartSaVioR) > > >> > > >> 2017년 6월 29일 (목) 오전 4:58, Stig Døssing <[email protected]>님이 > 작성: > > >> > > >> > Hi Alexandre, > > >> > > > >> > About issue 1: > > >> > This issue is not by design. It is a side effect of the spout > > internally > > >> > using the KafkaConsumer's subscribe API instead of the assign API. > > >> Support > > >> > for using the assign API was added a while back, but has a bug that > is > > >> > preventing the spout from starting when configured to use that API. > We > > >> are > > >> > working on fixing the issues with that implementation in these PRs > > >> > https://github.com/apache/storm/pull/2150, > https://github.com/apache/ > > >> > storm/pull/2174 <https://github.com/apache/storm/pull/2174>. I > think > > it > > >> > is very likely that we will remove support for > > >> > the subscribe API at some point as well, making the assign API the > > >> default, > > >> > since several users have had issues with the subscribe API's > behavior. > > >> > > > >> > Once the assign API support is fixed, you can switch to using it via > > >> this > > >> > KafkaSpoutConfig Builder constructor https://github.com/apache/ > > >> > storm/blob/master/external/storm-kafka-client/src/main/ > > >> > java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L136 and > this > > >> > Subscription implementation https://github.com/srdo/storm/blob/ > > >> > f524868baa5929f29b69258f0da2948d0f6e152b/external/storm- > > >> > kafka-client/src/main/java/org/apache/storm/kafka/spout/ > > >> > ManualPartitionSubscription.java > > >> > > > >> > If you'd like to try out the code from the PR branch, you can check > it > > >> out > > >> > with some of the steps described here > > >> > > > >> > https://help.github.com/articles/checking-out-pull-requests- > > >> locally/#modifying-an-inactive-pull-request-locally > > >> > . > > >> > > > >> > Note that the PRs for fixing the manual partitioning option are > > against > > >> the > > >> > master branch right now, which is targeting Storm version 2.0.0, > but I > > >> > believe you may be able to use the 2.0.0 storm-kafka-client jar > with a > > >> > 1.1.0 cluster. > > >> > > > >> > Switching to the assign API should solve remove the instability as > the > > >> > spouts start. > > >> > > > >> > About issue 2: > > >> > The spout lag shouldn't have an effect on memory use. 
I'm wondering > if > > >> your > > >> > spout instances are not progressing at all, which might explain the > > lag? > > >> > You should be able to check this using the kafka-consumer-groups.sh > > >> script > > >> > in the Kafka /bin directory. Once you've started the spout, you can > > use > > >> the > > >> > script to inspect which offsets the consumer group have committed. > Try > > >> > checking if the offsets are moving once the spouts have started > > running. > > >> > > > >> > I can't spout any suspicious use of HashMap in the 1.x KafkaSpout. > > Your > > >> > attachment didn't make it through, could you post it somewhere? > > >> > > > >> > About issue 3: > > >> > The CommitException you are getting is likely because we use the > > >> subscribe > > >> > API. When using the subscribe API Kafka is in control of partition > > >> > assigment, and will reassign partitions if a consumer doesn't call > > poll > > >> on > > >> > the consumer often enough. The default is that the consumer must be > > >> polled > > >> > at least every 5 minutes. See max.poll.interval.ms in the Kafka > > >> consumer > > >> > configuration. The assign API doesn't require this, and won't shut > > down > > >> > spouts because they are too slow. > > >> > > > >> > There's likely still another underlying issue, because it seems > > strange > > >> > that your spout instances are not polling at least once per 5 > minutes, > > >> at > > >> > least if you didn't set a high max.poll.records. It's almost > certainly > > >> > related to issue 2. Do you have acking enabled, and what is your > > >> > topology.message.timeout.secs? > > >> > > > >> > I'm not sure I understand why you would want to write your own spout > > >> from > > >> > scratch, rather than contributing fixes to the storm-kafka-client > > >> spout. It > > >> > seems likely to be more effort than fixing the problems with > > >> > storm-kafka-client. > > >> > > > >> > 2017-06-28 14:25 GMT+02:00 Alexandre Vermeerbergen < > > >> > [email protected] > > >> > >: > > >> > > > >> > > More about this thread: we noticed that with StormKafkaClient > 1.1.x > > >> > > latest, we get OutOfMemoryError after ~2hours of running our > simple > > >> test > > >> > > topology. > > >> > > > > >> > > We reproduce it everytime, so we decided to generate a heap dump > > >> before > > >> > > the OutOfMemoryError, and viewed the result using EclipseMAT. > > >> > > > > >> > > The results tends to show that there's a memory leak in > > >> KafkaSpoutClient: > > >> > > ============================================================ > > ========= > > >> > > > > >> > > One instance of *"org.apache.storm.kafka.spout.KafkaSpout"* > loaded > > by > > >> > *"sun.misc.Launcher$AppClassLoader > > >> > > @ 0x80023d98"* occupies *1,662,509,664 (93.87%)* bytes. The memory > > is > > >> > > accumulated in one instance of *"java.util.HashMap$Node[]"* loaded > > by > > >> > *"<system > > >> > > class loader>"*. 
> > >> > > > > >> > > *Keywords* > > >> > > sun.misc.Launcher$AppClassLoader @ 0x80023d98 > > >> > > java.util.HashMap$Node[] > > >> > > org.apache.storm.kafka.spout.KafkaSpout > > >> > > > > >> > > ============================================================ > > ========= > > >> > > > > >> > > See attached screenshots of EclipseMAT session showing graphical > > >> > > representation of memory usage > > >> > > > > >> > > FYI we tried to follow instructions from > > >> https://docs.hortonworks.com/ > > >> > > HDPDocuments/HDP2/HDP-2.5.5/bk_storm-component-guide/ > > >> > > content/storm-kafkaspout-perf.html to avoid the use of too much > > >> memory, > > >> > > but still after 2 hours the memory fills up and the process > hosting > > >> our > > >> > > spout is killed by Supervisor... > > >> > > > > >> > > Any clue of what we may have missed? > > >> > > > > >> > > Best regards, > > >> > > Alexandre Vermeerbergen > > >> > > > > >> > > > > >> > > > > >> > > 2017-06-28 <20%2017%2006%2028> 9:17 GMT+02:00 Alexandre > > Vermeerbergen > > >> < > > >> > > [email protected]>: > > >> > > > > >> > >> Oops, sent my last mail too fast, let me continue it: > > >> > >> > > >> > >> Hello, > > >> > >> > > >> > >> Coming back to my original post in this list, we have 3 issues > with > > >> > >> latest 1.1.x StormKafkaClient spout with our setup: > > >> > >> > > >> > >> Issue#1: > > >> > >> Initial lag (which we hadn't using the classic Storm Kafka > spout) > > >> > >> For this issue, my understanding of Kristopher's answer is > that > > >> this > > >> > >> is "by design" of the StormKafkaClient spout, which instances > > >> > progressively > > >> > >> joins Kafka consumers group, which causes consumers rebalancing. > > This > > >> > >> rebalancing is "slow", which means that until all spout instances > > are > > >> > >> started, the topology starts with an "initial Kafka Lag" > > >> > >> => Is my understanding correct? > > >> > >> => Why don't we have such behavior with the old Storm Kafka > > spout > > >> ? > > >> > >> => Is this annoying initial lag tracked by a JIRA ? > > >> > >> > > >> > >> Issue#2: > > >> > >> The kafka Lag is increasing constantly and this leads to the > > >> > overload > > >> > >> of the storm worker running the kafka spout. At the end, the > worker > > >> > crashes > > >> > >> and it is automatically restarted by Storm. > > >> > >> => This is unlike what we observe with the old Storm Kafka > > spout > > >> > >> => What is the recommended way to analyze this issue? > > >> > >> > > >> > >> Issue3: > > >> > >> With the new Kafka Spout, we have faced this exception many > > times: > > >> > >> > > >> > >> org.apache.kafka.clients.consumer.CommitFailedException: Commit > > >> cannot > > >> > >> be completed since the group has already rebalanced and assigned > > the > > >> > >> partitions to another member. This means that the time between > > >> > subsequent > > >> > >> calls to poll() was longer than the configured > > max.poll.interval.ms, > > >> > >> which typically implies that the poll loop is spending too much > > time > > >> > >> message processing. You can address this either by increasing the > > >> > session > > >> > >> timeout or by reducing the maximum size of batches returned in > > poll() > > >> > with > > >> > >> max.poll.records. 
at org.apache.kafka.clients.consu > > >> > >> > > >> > mer.internals.ConsumerCoordinator.sendOffsetCommitRequest(Co > > >> nsumerCoordinator.java:702) > > >> > >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordina > > >> > >> tor.commitOffsetsSync(ConsumerCoordinator.java:581) at > > >> > >> > > >> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync( > > >> KafkaConsumer.java:1124) > > >> > >> at > > >> > org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAcke > > >> dTuples(KafkaSpout.java:384) > > >> > >> at > > >> > org.apache.storm.kafka.spout.KafkaSpout.nextTuple( > > KafkaSpout.java:220) > > >> > >> at > > >> > org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__ > > >> 10826.invoke(executor.clj:646) > > >> > >> at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484) > > at > > >> > >> clojure.lang.AFn.run(AFn.java:22) at > > java.lang.Thread.run(Thread.ja > > >> > >> va:748) > > >> > >> > > >> > >> > > >> > >> => Are we the only ones experiencing such issues with Storm > > >> > 1.1.0/1.1.x > > >> > >> latest ? > > >> > >> > > >> > >> Note: We are considering writing our own Kafka Spout, as we're > > >> > time-bound > > >> > >> to move to Kafka 0.10.x consumers & producers (to prepare our > next > > >> step > > >> > >> with Kafka security, which isn't available with Kafka 0.9.x). We > > will > > >> > miss > > >> > >> the integration of Kafka lag in StormUI, but currently we do not > > >> > understand > > >> > >> how to solve the regressions we observe with latest Storm Kafka > > >> client > > >> > >> spout. > > >> > >> > > >> > >> Are there other Storm developers/users who jumped into this > > >> alternative? > > >> > >> > > >> > >> Best regards, > > >> > >> > > >> > >> Alexandre Vermeerbergen > > >> > >> > > >> > >> > > >> > >> > > >> > >> > > >> > >> 2017-06-28 <20%2017%2006%2028> 9:09 GMT+02:00 Alexandre > > >> Vermeerbergen < > > >> > >> [email protected]>: > > >> > >> > > >> > >>> Hello, > > >> > >>> > > >> > >>> Coming back to my original post in this list, we have two issues > > >> with > > >> > >>> latest 1.1.x StormKafkaClient spout with our setup: > > >> > >>> > > >> > >>> Issue#1: > > >> > >>> Initial lag (which we hadn't using the classic Storm Kafka > spout) > > >> > >>> For this issue, my understanding of Kristopher's answer is > that > > >> this > > >> > >>> is "by design" of the StormKafkaClient spout, which instances > > >> > progressively > > >> > >>> joins Kafka consumers group, which causes consumers rebalancing. > > >> This > > >> > >>> rebalancing is "slow", which means that until all spout > instances > > >> are > > >> > >>> started, the topology starts with an "initial Kafka Lag" > > >> > >>> => Is my understanding correct? > > >> > >>> => Why don't we have such behavior with the old Storm Kafka > > >> spout ? > > >> > >>> => Is this annoying initial lag tracked by a JIRA ? > > >> > >>> > > >> > >>> > > >> > >>> > > >> > >>> 2017-06-27 17:15 GMT+02:00 Alexandre Vermeerbergen < > > >> > >>> [email protected]>: > > >> > >>> > > >> > >>>> Hello Kristopher, > > >> > >>>> > > >> > >>>> We built Storm 1.1.1-latest using yesterday's (2017-06-26 > > >> > >>>> <20%2017%2006%2026>) artifacts downloaded from > > >> > >>>> https://github.com/apache/storm/tree/1.x-branch. > > >> > >>>> <https://github.com/apache/storm/tree/1.x-branch> > > >> > >>>> > > >> > >>>> Is your latest PR supposed to be in what we downloaded & built, > > or > > >> do > > >> > >>>> we need to upgrade in some way (which?) 
> > >> > >>>> > > >> > >>>> Should we change anything to our settings? > > >> > >>>> > > >> > >>>> Please note that I mistakenly wrote that our "Kafka consumer > > >> strategy > > >> > >>>> is set to EARLY" whereas it's "Kafka consumer strategy is set > to > > >> > LATEST", > > >> > >>>> if that matters. > > >> > >>>> > > >> > >>>> Best regards, > > >> > >>>> Alexandre Vermeerbergen > > >> > >>>> > > >> > >>>> 2017-06-27 16:37 GMT+02:00 Kristopher Kane < > [email protected] > > >: > > >> > >>>> > > >> > >>>>> Correction: https://github.com/apache/storm/pull/2174 has all > > of > > >> > what > > >> > >>>>> I was > > >> > >>>>> doing and more. > > >> > >>>>> > > >> > >>>>> On Tue, Jun 27, 2017 at 9:33 AM, Kristopher Kane < > > >> > [email protected] > > >> > >>>>> > > > >> > >>>>> wrote: > > >> > >>>>> > > >> > >>>>> > Alexandre, > > >> > >>>>> > > > >> > >>>>> > There are quite a few JIRAs and discussions around this > > >> recently. > > >> > >>>>> The > > >> > >>>>> > default behavior for storm-kafka-client is the 'subscribe' > API > > >> > which > > >> > >>>>> causes > > >> > >>>>> > the immediate lag you see since rebalance will happen > > spout(n)-1 > > >> > >>>>> times > > >> > >>>>> > just from the spouts spinning up. > > >> > >>>>> > > > >> > >>>>> > There is a Builder for ManualPartitionNamedSubscription and > > the > > >> > >>>>> > RoundRobinManualPartitioner (which use the 'assign' Kafka > > >> consumer > > >> > >>>>> API) but > > >> > >>>>> > they don't work at all. I hope to have a PR in today > > >> > >>>>> > to fix these on 1.x-branch > > >> > >>>>> > > > >> > >>>>> > The other JIRAs I mentioned are for a redesign of this spout > > or > > >> > >>>>> other more > > >> > >>>>> > drastic changes. My goal is a bug fix for a version of the > > >> spout > > >> > >>>>> that > > >> > >>>>> > doesn't provide unnecessary duplicates. > > >> > >>>>> > > > >> > >>>>> > Kris > > >> > >>>>> > > > >> > >>>>> > On Tue, Jun 27, 2017 at 8:00 AM, Alexandre Vermeerbergen < > > >> > >>>>> > [email protected]> wrote: > > >> > >>>>> > > > >> > >>>>> >> Hello All, > > >> > >>>>> >> > > >> > >>>>> >> We have been running for a while our real-time supervision > > >> > >>>>> application > > >> > >>>>> >> based on Apache Storm 1.0.3 with Storm Kafka Spouts (old > > >> consumer: > > >> > >>>>> >> storm-kafka) and with our Kafka Broker cluster based on > > Apache > > >> > Kafka > > >> > >>>>> >> 0.10.1.0. > > >> > >>>>> >> > > >> > >>>>> >> Backpressure is activated with default parameters. > > >> > >>>>> >> > > >> > >>>>> >> Key > > >> > >>>>> >> Value > > >> > >>>>> >> > > >> > >>>>> >> backpressure.disruptor.high.watermark 0.9 > > >> > >>>>> >> > > >> > >>>>> >> backpressure.disruptor.low.watermark 0.4 > > >> > >>>>> >> > > >> > >>>>> >> task.backpressure.poll.secs > > >> 30 > > >> > >>>>> >> > > >> > >>>>> >> topology.backpressure.enable > > true > > >> > >>>>> >> > > >> > >>>>> >> > > >> > >>>>> >> > > >> > >>>>> >> We decided to upgrade to Apache Storm 1.1.0 to benefit from > > the > > >> > new > > >> > >>>>> Kafka > > >> > >>>>> >> Spout (storm-kafka-client lib) with a consumer which has > no > > >> more > > >> > >>>>> >> dependency on Zookeeper. > > >> > >>>>> >> > > >> > >>>>> >> After upgrade, we had several issues with kafka > consumption. > > >> > >>>>> >> > > >> > >>>>> >> > > >> > >>>>> >> We saw that several JIRAs were opened and resolved on > Apache > > >> Storm > > >> > >>>>> 1.1.1. 
> > >> > >>>>> >> > > >> > >>>>> >> So we decided to upgrade to the latest available Apache > Storm > > >> > 1.1.x > > >> > >>>>> code > > >> > >>>>> >> built from source (2017-06-26 <20%2017%2006%2026>) but we > > >> still > > >> > >>>>> have issues : > > >> > >>>>> >> > > >> > >>>>> >> > > >> > >>>>> >> > > >> > >>>>> >> 1. The kafka Lag is increasing constantly and this leads to > > the > > >> > >>>>> overload > > >> > >>>>> >> of > > >> > >>>>> >> the storm worker running the kafka spout. At the end, the > > >> worker > > >> > >>>>> crashes > > >> > >>>>> >> and it is automatically restarted by Storm. > > >> > >>>>> >> > > >> > >>>>> >> > > >> > >>>>> >> > > >> > >>>>> >> With old kafka spout version, we had a lag most of the > times > > >> > bellow > > >> > >>>>> 10000. > > >> > >>>>> >> > > >> > >>>>> >> With the new one, we are starting with Kafka lag about > 30000 > > >> and > > >> > >>>>> >> increasing > > >> > >>>>> >> until crash. > > >> > >>>>> >> > > >> > >>>>> >> > > >> > >>>>> >> > > >> > >>>>> >> 2. With the new Kafka Spout, we have faced this exception > > many > > >> > >>>>> times: > > >> > >>>>> >> > > >> > >>>>> >> > > >> > >>>>> >> > > >> > >>>>> >> org.apache.kafka.clients.consumer.CommitFailedException: > > >> Commit > > >> > >>>>> cannot be > > >> > >>>>> >> completed since the group has already rebalanced and > assigned > > >> the > > >> > >>>>> >> partitions to another member. This means that the time > > between > > >> > >>>>> subsequent > > >> > >>>>> >> calls to poll() was longer than the configured > > >> > max.poll.interval.ms > > >> > >>>>> , > > >> > >>>>> >> which > > >> > >>>>> >> typically implies that the poll loop is spending too much > > time > > >> > >>>>> message > > >> > >>>>> >> processing. You can address this either by increasing the > > >> session > > >> > >>>>> timeout > > >> > >>>>> >> or by reducing the maximum size of batches returned in > poll() > > >> with > > >> > >>>>> >> max.poll.records. at > > >> > >>>>> >> org.apache.kafka.clients.consumer.internals. > ConsumerCoordina > > >> > >>>>> >> tor.sendOffsetCommitRequest(ConsumerCoordinator.java:702) > > >> > >>>>> >> at > > >> > >>>>> >> org.apache.kafka.clients.consumer.internals. > ConsumerCoordina > > >> > >>>>> >> tor.commitOffsetsSync(ConsumerCoordinator.java:581) > > >> > >>>>> >> at > > >> > >>>>> >> org.apache.kafka.clients.consumer.KafkaConsumer. > commitSync( > > >> > >>>>> >> KafkaConsumer.java:1124) > > >> > >>>>> >> at > > >> > >>>>> >> org.apache.storm.kafka.spout.KafkaSpout. > commitOffsetsForAcke > > >> > >>>>> >> dTuples(KafkaSpout.java:384) > > >> > >>>>> >> at org.apache.storm.kafka.spout.K > > >> afkaSpout.nextTuple(KafkaSpout > > >> > >>>>> .java:220) > > >> > >>>>> >> at > > >> > >>>>> >> org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__ > > >> > >>>>> >> 10826.invoke(executor.clj:646) > > >> > >>>>> >> at org.apache.storm.util$async_lo > > >> op$fn__555.invoke(util.clj:484) > > >> > at > > >> > >>>>> >> clojure.lang.AFn.run(AFn.java:22) at > > >> > java.lang.Thread.run(Thread.ja > > >> > >>>>> >> va:748) > > >> > >>>>> >> > > >> > >>>>> >> > > >> > >>>>> >> > > >> > >>>>> >> We are using the following configuration for Kafka Spout : > > >> > >>>>> >> > > >> > >>>>> >> poll.timeout.ms 200. > > >> > >>>>> >> > > >> > >>>>> >> offset.commit.period.ms 30000 (30 seconds). > > >> > >>>>> >> > > >> > >>>>> >> max.uncommitted.offsets 10000000 (ten million). 
> > >> > >>>>> >> > > >> > >>>>> >> > > >> > >>>>> >> > > >> > >>>>> >> The Kafka consumer strategy is set to EARLY. We also set > the > > >> > >>>>> following > > >> > >>>>> >> Kafka consumer parameter : > > >> > >>>>> >> > > >> > >>>>> >> session.timeout.ms 120000 > > >> > >>>>> >> > > >> > >>>>> >> Are there any traces/options which we could turn on on > Storm > > >> or on > > >> > >>>>> Kafka > > >> > >>>>> >> Broker that might help understanding how to stabilize our > > >> > >>>>> topologies with > > >> > >>>>> >> this new branch? > > >> > >>>>> >> > > >> > >>>>> >> Thanks! > > >> > >>>>> >> > > >> > >>>>> >> Alexandre Vermeerbergen > > >> > >>>>> >> > > >> > >>>>> > > > >> > >>>>> > > > >> > >>>>> > > >> > >>>> > > >> > >>>> > > >> > >>> > > >> > >> > > >> > > > > >> > > > >> > > > > > > > > >
