Hello Stig, Thank you very much for trying. I noticed you created this JIRA for tracking: https://issues.apache.org/jira/browse/STORM-2648 and I have added a watch on it, so I will follow up on this JIRA.
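To make the proposal below concrete, here is a rough sketch of the pattern being discussed: always emit anchored tuples so Storm UI gets acks, fails and complete latency, but short-circuit ack/fail when auto commit is enabled. This is a toy spout with made-up names (AnchoredAutoCommitSpout, autoCommitEnabled), not the real KafkaSpout code and not the actual STORM-2648 patch:

=====================================================================
import java.util.Map;
import java.util.UUID;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Toy spout illustrating the idea: tuples are always emitted with a message id
// (anchored), so Storm tracks them end-to-end; when "auto commit" is on, ack()
// and fail() simply return because offset progress is recorded elsewhere.
public class AnchoredAutoCommitSpout extends BaseRichSpout {
    private final boolean autoCommitEnabled;
    private SpoutOutputCollector collector;

    public AnchoredAutoCommitSpout(boolean autoCommitEnabled) {
        this.autoCommitEnabled = autoCommitEnabled;
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // Emitting with a message id makes Storm track acks/fails and complete latency.
        collector.emit(new Values("some-record"), UUID.randomUUID());
    }

    @Override
    public void ack(Object msgId) {
        if (autoCommitEnabled) {
            return; // offsets are committed independently of acks; nothing to do here
        }
        // at-least-once bookkeeping (record the acked offset for commit) would go here
    }

    @Override
    public void fail(Object msgId) {
        if (autoCommitEnabled) {
            return; // no replay; the record is considered handled once polled
        }
        // replay/retry logic would go here
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("value"));
    }
}
=====================================================================

With this shape, users who want the Storm UI statistics keep topology ackers non-zero, while setting ackers to 0 removes the tracking overhead, as described in the quoted reply below.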
Best regards, Alexandre 2017-07-20 12:07 GMT+02:00 Stig Rohde Døssing <[email protected]>: > Okay, I think this might be fixable then. > > I think storm-kafka-client in autocommit mode should also raise "fails" > > whenever it has Kafka consumption issues. Is it the case with current > > implementation? > > Acking and failing tuples only happens once a tuple has been emitted from > the spout, so the spout doesn't have the behavior you describe. If the > spout is having problems acquiring messages internally, it will not be > reflected in acks/fails. If the problem the spout is having causes an > exception, you should be able to see this in Storm UI. If the problem is > some sort of delay or hanging, you should be able to see this by the Kafka > lag, or the spout's emit rate dropping. > > In order to support having acks/fails and complete latency when using auto > commit, we need to make the spout always emit tuples with anchors. Since > the spout doesn't care about acks or fails when auto commit is enabled, we > should make the spout ack/fail methods return immediately if auto commit is > enabled. > > I think we don't lose much by doing it this way. Users that want the > statistics in Storm UI can enable auto commit and leave topology ackers at > some non-zero number. Users that don't care and don't want the overhead of > having Storm track the tuples can set topology ackers to 0, which should > make the spout behave a lot like it does now. The only use case I can think > of where this won't work is if someone with multiple spouts in a topology > needs one to be auto commit and one to be at-least-once, and they can't > live with the overhead of tuple tracking for the auto commit spout. If this > is a real use case, it can be worked around with an extra configuration > parameter to switch whether tuples are emitted unanchored, but I'd rather > keep it simple for now. > > If we want to do this fix I don't think we need to break the API, so it > could probably go in 1.1.1. I've only given it a cursory look though, so > take that statement with a grain of salt :) > > 2017-07-20 9:30 GMT+02:00 Alexandre Vermeerbergen < > [email protected]> > : > > > Hello Stig, > > > > Thank you very much for your detailed answer. > > > > Yes, we get the same behavior as yours in Storm UI using > storm-kafka-client > > in autocommit mode : the count of emitted & transferred tuples is > > available, but we have 0 acks, 0 failed and "complete latency" remains at > > 0. > > > > And YES, we need the complete latency to be functional while using > > storm-kafka-client with autocommit. > > > > I think storm-kafka-client in autocommit mode should also raise "fails" > > whenever it has Kafka consumption issues. Is it the case with current > > implementation? > > > > Last, would it be possible to have complete latency for > storm-kafka-client > > spouts in autocommit mode for Storm 1.1.1 release ? > > > > Best regards, > > Alexandre Vermeerbergen > > > > > > 2017-07-19 15:55 GMT+02:00 Stig Rohde Døssing <[email protected]>: > > > > > Hi Alexandre, > > > > > > Happy to hear that the spout is working better for you. I think that if > > you > > > don't need at-least-once, then using auto commit is a good option. > > > > > > When you enable auto commit, the spout emits tuples unanchored (see the > > > calls to collector.emit here > > > https://github.com/apache/storm/blob/master/external/ > > > storm-kafka-client/src/main/java/org/apache/storm/kafka/ > > > spout/KafkaSpout.java#L346). 
> > > This causes Storm to not bother tracking acks and fails for the emitted > > > tuples, which means the counters in Storm UI for acks, fails and > complete > > > latency are zeroed out. The complete latency doesn't make sense to > track > > if > > > the tuples are not acked. The rest of the statistics should still be > > there. > > > I think we implemented it this way from the reasoning that it doesn't > > make > > > sense to make Storm track and ack/fail tuples if the spout is > configured > > to > > > not care about failures. > > > > > > I tried out a small test topology on a local 2.0.0 Storm setup using > auto > > > commit, and here's what I'm seeing: http://imgur.com/a/CoJBa. As you > can > > > see the execute latency and emit/transfer counters still work. Is this > > > different from what you're experiencing? > > > > > > If you need the complete latency to be functional while using auto > > commit, > > > we'll need to update the spout so it emits the tuples with anchors, and > > > then ignores the acks/fails. > > > > > > 2017-07-19 10:12 GMT+02:00 Alexandre Vermeerbergen < > > > [email protected] > > > >: > > > > > > > Hello Stig, > > > > > > > > Thank you very much for your detailed explanations about > > > storm-kafka-client > > > > behavior, it helps us a lot. > > > > > > > > We have topologies for which at-least-one matters, so we're OK to try > > to > > > > converge them with this default behavior of storm-kafka-client. > > > > > > > > However, our main topology is our "realtime alerting" one. It > evaluates > > > > incoming "metrics" (our system is a Supervision system) against > > > "triggers" > > > > (same meaning as in Zabbix), and here we prefer to drop anything that > > we > > > > couldn't evaluate quickly enough. Indeed, to do "real-time alerting", > > we > > > > know that if we skip a few metrics, then, shortly after, there will > be > > > > newer ones that will suffice to get a "near real time enough" view of > > the > > > > "current state of our supervised devices". > > > > > > > > This is the reason why we recently turned on "autocommit" in > > > > storm-kafka-client, to overcome the growing "lag" or OOM we > eventually > > > got. > > > > > > > > Now our topology works like a charm. > > > > > > > > Except that with autocommit turned on, no acks are sent from > > > > storm-kafka-client spouts, so Storm UI isn't displaying any > statistics > > > > information about our topology, which is a big annoyance for us. > > > > > > > > Question: > > > > * Is our approach to use autocommit OK, or given our scenario, do you > > > > recommend othewise? > > > > * If it's OK to use autocommit, then how can we configure the > > > > storm-kafka-client spout to "blindly" ACK all tuples it sends? > > > > > > > > Best regards, > > > > Alexandre Vermeerbergen > > > > > > > > > > > > > > > > > > > > 2017-07-05 17:22 GMT+02:00 Stig Døssing <[email protected]>: > > > > > > > > > Regarding the offsetManagers map, I think the only thing it > contains > > > that > > > > > should take up any space is uncommitted offsets. Those should be > > > cleared > > > > > out when the spout commits offsets to Kafka. There have been at > > least a > > > > few > > > > > issues related to that recently, e.g. > > > > > https://github.com/apache/storm/pull/2153, which may be what > you're > > > > > running > > > > > into. 
The spout should be able to limit how many uncommitted > offsets > > it > > > > > will accept before slowing down, but this functionality is still > > buggy > > > in > > > > > some cases (see https://github.com/apache/storm/pull/2156, > > > > > https://github.com/apache/storm/pull/2152). > > > > > > > > > > The storm-kafka-client spout doesn't share code with storm-kafka, > > it's > > > an > > > > > implementation based on a different Kafka client (this one > > > > > https://kafka.apache.org/0100/javadoc/index.html?org/apache/ > > > > > kafka/clients/consumer/KafkaConsumer.html), > > > > > where the storm-kafka client used the now deprecated > > > > > https://cwiki.apache.org/confluence/display/KAFKA/0.8. > > > > > 0+SimpleConsumer+Example. > > > > > There are still some bugs on the new spout we're working out. > > > > > > > > > > Auto commit is not enabled in storm-kafka, because the > SimpleConsumer > > > > > doesn't have that concept. The old spout kept track of processed > > > offsets > > > > by > > > > > storing how far it had read in Zookeeper. The new client allows us > to > > > > mark > > > > > how far the spout has read in Kafka, which can either be done via > the > > > > > commit methods on the KafkaConsumer (auto commit disabled), or > > > > periodically > > > > > by a background thread in the consumer (auto commit enabled). > > Normally > > > > when > > > > > the spout emits a message into the topology, we want to only mark > > that > > > > > message as "done" once Storm tells the spout to ack the tuple. This > > is > > > > the > > > > > behavior when auto commit is disabled; the spout emits some tuples, > > > > > receives some acks, and periodically uses the commitSync method to > > mark > > > > the > > > > > acked tuples as "done" in Kafka. If auto commit is enabled, the > > > > > KafkaConsumer will instead periodically commit the tuples it has > > > returned > > > > > from polls. The effect is that Storm can't control when an offset > is > > > > > considered "done", which means that some messages can be lost. This > > is > > > > fine > > > > > for some use cases where you don't necessarily need to process all > > > > > messages. > > > > > > > > > > Leaving auto commit disabled is the closest match to how the > > > storm-kafka > > > > > spout handled messages. We disable auto commit by default in the > new > > > > spout > > > > > because at-least-once processing guarantees should be the default, > > not > > > > > something people have to opt in to. > > > > > > > > > > 2017-07-05 15:22 GMT+02:00 Alexandre Vermeerbergen < > > > > > [email protected] > > > > > >: > > > > > > > > > > > Hello, > > > > > > > > > > > > To continue on this thread: > > > > > > > > > > > > Since we are facing some OOMEs and Kafka lag peaks with our > > > topologies > > > > > > using Storm 1.1.0 and Kafka spouts (storm-kafka-client based on > > Kafka > > > > > > 0.10.2.0 libs), we decided to test a much simpler Storm topology. 
> > > > > > This test topology is very basic and is composed of:
> > > > > >
> > > > > > - one instance of Kafka spout consuming a topic composed of 4 partitions with a throughput of about 3000 messages per second and with a message size of about 1500 bytes; the consumption strategy is set to "LATEST", other Kafka Spout parameters are the default ones
> > > > > >
> > > > > > - one basic bolt which just counts received tuples and logs statistics every minute (min, max, avg number of tuples received per minute), acking every tuple
> > > > > >
> > > > > > Back pressure is activated with default watermark parameters.
> > > > > >
> > > > > > topology.message.timeout.secs is not changed (default value is 30).
> > > > > >
> > > > > > Messages are encrypted (using our own encryption method) serialized Java objects, but we are just using the provided ByteArrayDeserializer for this test since we only want to count tuples without any custom deserialization interference.
> > > > > >
> > > > > > This topology is running alone on a single VM (8 vCPU) with one worker configured with 2 GB of RAM for the Xmx JVM option.
> > > > > >
> > > > > > Please also note that our cluster of Kafka Brokers is based on Apache Kafka 0.10.1.0.
> > > > > >
> > > > > > We are tracking the Kafka lag for the topic we are consuming.
> > > > > >
> > > > > > Kafka lag is 10000 at topology start but it decreased slowly to 2000 after 24h of execution.
> > > > > >
> > > > > > We did not face any OOM error this time, so we suspect that the previous OOMEs were due to some lag in the much more complex topology we were running and to Storm backpressure slowing down the Spout consumption.
> > > > > >
> > > > > > In fact, it seems that the following Map in the org.apache.storm.kafka.spout.KafkaSpout class was the root cause of the errors:
> > > > > >
> > > > > > =====================================================================
> > > > > > // Tuples that were successfully acked/emitted. These tuples will be committed
> > > > > > // periodically when the commit timer expires, or after a consumer rebalance,
> > > > > > // or during close/deactivate
> > > > > > private transient Map<TopicPartition, OffsetManager> offsetManagers;
> > > > > > =====================================================================
> > > > > >
> > > > > > Please note however that these errors did not occur with the "old" version of the Kafka spout (storm-kafka lib).
> > > > > >
> > > > > > We ran another test with the same test topology but with enable.auto.commit set to true in the Kafka consumer properties; the lag was starting at about 2000 and staying constant for at least 24h.
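As a rough, illustrative sketch (not from the original thread): the auto-commit experiment above corresponds to standard Kafka consumer settings along these lines. The class name, broker address and group id are placeholders, and how the map is handed to the spout (for example through the KafkaSpoutConfig builder's consumer-properties map) depends on the storm-kafka-client version:

=====================================================================
import java.util.HashMap;
import java.util.Map;

public class AutoCommitConsumerProps {
    // Plain Kafka consumer properties matching the auto-commit test described above.
    public static Map<String, Object> build(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("enable.auto.commit", "true");      // the consumer commits offsets itself
        props.put("auto.commit.interval.ms", "5000"); // commit cadence (Kafka default)
        return props;
    }
}
=====================================================================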
> > > > > > > > > > > > Our understanding is that auto commit is enabled by default in > > Kafka > > > > > > consumers and also in old version of Kafka spouts (storm-kafka) > but > > > it > > > > is > > > > > > disabled in the new Kafka spout version (storm-kafka-client) to > > > manage > > > > > > tuple acking, is it correct please ? > > > > > > > > > > > > What are the consequences of activating auto commit for > consumers > > > with > > > > > the > > > > > > new Kafka Spout then ? > > > > > > > > > > > > Thank you for your feedbacks, > > > > > > > > > > > > Alexandre Vermeerbergen > > > > > > > > > > > > 2017-06-29 12:09 GMT+02:00 Alexandre Vermeerbergen < > > > > > > [email protected] > > > > > > >: > > > > > > > > > > > > > Hello All, > > > > > > > > > > > > > > Thank you very much for your feedbacks on our experience with > > > > > > > storm-kafka-client, we're going to take your suggestions into > > > account > > > > > to > > > > > > > dig into our issues and we'll feedback as soon as we have more > to > > > > > share. > > > > > > > > > > > > > > Best regards, > > > > > > > Alexandre Vermeerbergen > > > > > > > > > > > > > > 2017-06-29 1:46 GMT+02:00 Jungtaek Lim <[email protected]>: > > > > > > > > > > > > > >> Hi Alexandre, > > > > > > >> > > > > > > >> I don't know much of storm-kafka-client, but at a glimpse, I > > can't > > > > > find > > > > > > >> misuse of HashMap in KafkaSpout so I'd rather suspect that > > > > > OffsetManager > > > > > > >> being really huge. If you are willing to dig more on > KafkaSpout > > > OOME > > > > > > >> issue, > > > > > > >> you can get more information of KafkaSpout for tracking with > > > > changing > > > > > > log > > > > > > >> level to DEBUG or even TRACE. > > > > > > >> > > > > > > >> Thanks, > > > > > > >> Jungtaek Lim (HeartSaVioR) > > > > > > >> > > > > > > >> 2017년 6월 29일 (목) 오전 4:58, Stig Døssing < > > [email protected] > > > >님이 > > > > > 작성: > > > > > > >> > > > > > > >> > Hi Alexandre, > > > > > > >> > > > > > > > >> > About issue 1: > > > > > > >> > This issue is not by design. It is a side effect of the > spout > > > > > > internally > > > > > > >> > using the KafkaConsumer's subscribe API instead of the > assign > > > API. > > > > > > >> Support > > > > > > >> > for using the assign API was added a while back, but has a > bug > > > > that > > > > > is > > > > > > >> > preventing the spout from starting when configured to use > that > > > > API. > > > > > We > > > > > > >> are > > > > > > >> > working on fixing the issues with that implementation in > these > > > PRs > > > > > > >> > https://github.com/apache/storm/pull/2150, > > > > > https://github.com/apache/ > > > > > > >> > storm/pull/2174 <https://github.com/apache/storm/pull/2174 > >. > > I > > > > > think > > > > > > it > > > > > > >> > is very likely that we will remove support for > > > > > > >> > the subscribe API at some point as well, making the assign > API > > > the > > > > > > >> default, > > > > > > >> > since several users have had issues with the subscribe API's > > > > > behavior. 
> > > > > > >> > Once the assign API support is fixed, you can switch to using it via this KafkaSpoutConfig Builder constructor https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L136 and this Subscription implementation https://github.com/srdo/storm/blob/f524868baa5929f29b69258f0da2948d0f6e152b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
> > > > > > >> >
> > > > > > >> > If you'd like to try out the code from the PR branch, you can check it out with some of the steps described here https://help.github.com/articles/checking-out-pull-requests-locally/#modifying-an-inactive-pull-request-locally .
> > > > > > >> >
> > > > > > >> > Note that the PRs for fixing the manual partitioning option are against the master branch right now, which is targeting Storm version 2.0.0, but I believe you may be able to use the 2.0.0 storm-kafka-client jar with a 1.1.0 cluster.
> > > > > > >> >
> > > > > > >> > Switching to the assign API should remove the instability as the spouts start.
> > > > > > >> >
> > > > > > >> > About issue 2:
> > > > > > >> > The spout lag shouldn't have an effect on memory use. I'm wondering if your spout instances are not progressing at all, which might explain the lag? You should be able to check this using the kafka-consumer-groups.sh script in the Kafka /bin directory. Once you've started the spout, you can use the script to inspect which offsets the consumer group has committed. Try checking if the offsets are moving once the spouts have started running.
> > > > > > >> >
> > > > > > >> > I can't spot any suspicious use of HashMap in the 1.x KafkaSpout. Your attachment didn't make it through, could you post it somewhere?
> > > > > > >> >
> > > > > > >> > About issue 3:
> > > > > > >> > The CommitFailedException you are getting is likely because we use the subscribe API. When using the subscribe API Kafka is in control of partition assignment, and will reassign partitions if a consumer doesn't call poll on the consumer often enough. The default is that the consumer must be polled at least every 5 minutes. See max.poll.interval.ms in the Kafka consumer configuration. The assign API doesn't require this, and won't shut down spouts because they are too slow.
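For reference, the poll-related consumer settings discussed here (max.poll.interval.ms, and max.poll.records which comes up later in the thread) look like this when set as plain Kafka consumer properties. The class name is hypothetical and the values are only examples, not recommendations; where the map is passed to the spout depends on the spout version:

=====================================================================
import java.util.HashMap;
import java.util.Map;

public class PollIntervalTuning {
    // Example values only: settings relevant to the CommitFailedException discussed above.
    public static Map<String, Object> build() {
        Map<String, Object> props = new HashMap<>();
        // Time allowed between poll() calls before the group rebalances away from
        // this consumer (Kafka 0.10.1+ default is 300000 ms, i.e. 5 minutes).
        props.put("max.poll.interval.ms", "600000");
        // Cap the batch size returned by poll() so each batch is processed well
        // within that window.
        props.put("max.poll.records", "200");
        return props;
    }
}
=====================================================================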
> > > > > > >> > > > > > > > >> > There's likely still another underlying issue, because it > > seems > > > > > > strange > > > > > > >> > that your spout instances are not polling at least once per > 5 > > > > > minutes, > > > > > > >> at > > > > > > >> > least if you didn't set a high max.poll.records. It's almost > > > > > certainly > > > > > > >> > related to issue 2. Do you have acking enabled, and what is > > your > > > > > > >> > topology.message.timeout.secs? > > > > > > >> > > > > > > > >> > I'm not sure I understand why you would want to write your > own > > > > spout > > > > > > >> from > > > > > > >> > scratch, rather than contributing fixes to the > > > storm-kafka-client > > > > > > >> spout. It > > > > > > >> > seems likely to be more effort than fixing the problems with > > > > > > >> > storm-kafka-client. > > > > > > >> > > > > > > > >> > 2017-06-28 14:25 GMT+02:00 Alexandre Vermeerbergen < > > > > > > >> > [email protected] > > > > > > >> > >: > > > > > > >> > > > > > > > >> > > More about this thread: we noticed that with > > StormKafkaClient > > > > > 1.1.x > > > > > > >> > > latest, we get OutOfMemoryError after ~2hours of running > our > > > > > simple > > > > > > >> test > > > > > > >> > > topology. > > > > > > >> > > > > > > > > >> > > We reproduce it everytime, so we decided to generate a > heap > > > dump > > > > > > >> before > > > > > > >> > > the OutOfMemoryError, and viewed the result using > > EclipseMAT. > > > > > > >> > > > > > > > > >> > > The results tends to show that there's a memory leak in > > > > > > >> KafkaSpoutClient: > > > > > > >> > > ============================== > > ============================== > > > > > > ========= > > > > > > >> > > > > > > > > >> > > One instance of *"org.apache.storm.kafka. > spout.KafkaSpout"* > > > > > loaded > > > > > > by > > > > > > >> > *"sun.misc.Launcher$AppClassLoader > > > > > > >> > > @ 0x80023d98"* occupies *1,662,509,664 (93.87%)* bytes. > The > > > > memory > > > > > > is > > > > > > >> > > accumulated in one instance of > *"java.util.HashMap$Node[]"* > > > > loaded > > > > > > by > > > > > > >> > *"<system > > > > > > >> > > class loader>"*. > > > > > > >> > > > > > > > > >> > > *Keywords* > > > > > > >> > > sun.misc.Launcher$AppClassLoader @ 0x80023d98 > > > > > > >> > > java.util.HashMap$Node[] > > > > > > >> > > org.apache.storm.kafka.spout.KafkaSpout > > > > > > >> > > > > > > > > >> > > ============================== > > ============================== > > > > > > ========= > > > > > > >> > > > > > > > > >> > > See attached screenshots of EclipseMAT session showing > > > graphical > > > > > > >> > > representation of memory usage > > > > > > >> > > > > > > > > >> > > FYI we tried to follow instructions from > > > > > > >> https://docs.hortonworks.com/ > > > > > > >> > > HDPDocuments/HDP2/HDP-2.5.5/bk_storm-component-guide/ > > > > > > >> > > content/storm-kafkaspout-perf.html to avoid the use of > too > > > much > > > > > > >> memory, > > > > > > >> > > but still after 2 hours the memory fills up and the > process > > > > > hosting > > > > > > >> our > > > > > > >> > > spout is killed by Supervisor... > > > > > > >> > > > > > > > > >> > > Any clue of what we may have missed? 
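One way (among others) to capture the heap dump mentioned above automatically, rather than triggering it manually before the OutOfMemoryError, is to pass HotSpot flags through the worker child opts. This is a hedged sketch: the class name, Xmx value and dump path are placeholders, and the same flags can also go in worker.childopts in storm.yaml:

=====================================================================
import org.apache.storm.Config;

public class WorkerHeapDumpOpts {
    // Have each worker JVM write a heap dump on OutOfMemoryError, for later
    // analysis in Eclipse MAT as described above.
    public static Config withHeapDumpOnOome() {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS,
                "-Xmx2g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/storm-dumps");
        return conf;
    }
}
=====================================================================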
> > > > > > >> > > > > > > > > >> > > Best regards, > > > > > > >> > > Alexandre Vermeerbergen > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > 2017-06-28 <20%2017%2006%2028> 9:17 GMT+02:00 Alexandre > > > > > > Vermeerbergen > > > > > > >> < > > > > > > >> > > [email protected]>: > > > > > > >> > > > > > > > > >> > >> Oops, sent my last mail too fast, let me continue it: > > > > > > >> > >> > > > > > > >> > >> Hello, > > > > > > >> > >> > > > > > > >> > >> Coming back to my original post in this list, we have 3 > > > issues > > > > > with > > > > > > >> > >> latest 1.1.x StormKafkaClient spout with our setup: > > > > > > >> > >> > > > > > > >> > >> Issue#1: > > > > > > >> > >> Initial lag (which we hadn't using the classic Storm > Kafka > > > > > spout) > > > > > > >> > >> For this issue, my understanding of Kristopher's > answer > > is > > > > > that > > > > > > >> this > > > > > > >> > >> is "by design" of the StormKafkaClient spout, which > > instances > > > > > > >> > progressively > > > > > > >> > >> joins Kafka consumers group, which causes consumers > > > > rebalancing. > > > > > > This > > > > > > >> > >> rebalancing is "slow", which means that until all spout > > > > instances > > > > > > are > > > > > > >> > >> started, the topology starts with an "initial Kafka Lag" > > > > > > >> > >> => Is my understanding correct? > > > > > > >> > >> => Why don't we have such behavior with the old Storm > > > Kafka > > > > > > spout > > > > > > >> ? > > > > > > >> > >> => Is this annoying initial lag tracked by a JIRA ? > > > > > > >> > >> > > > > > > >> > >> Issue#2: > > > > > > >> > >> The kafka Lag is increasing constantly and this leads > > to > > > > the > > > > > > >> > overload > > > > > > >> > >> of the storm worker running the kafka spout. At the end, > > the > > > > > worker > > > > > > >> > crashes > > > > > > >> > >> and it is automatically restarted by Storm. > > > > > > >> > >> => This is unlike what we observe with the old Storm > > > Kafka > > > > > > spout > > > > > > >> > >> => What is the recommended way to analyze this issue? > > > > > > >> > >> > > > > > > >> > >> Issue3: > > > > > > >> > >> With the new Kafka Spout, we have faced this exception > > many > > > > > > times: > > > > > > >> > >> > > > > > > >> > >> org.apache.kafka.clients.consumer. > CommitFailedException: > > > > Commit > > > > > > >> cannot > > > > > > >> > >> be completed since the group has already rebalanced and > > > > assigned > > > > > > the > > > > > > >> > >> partitions to another member. This means that the time > > > between > > > > > > >> > subsequent > > > > > > >> > >> calls to poll() was longer than the configured > > > > > > max.poll.interval.ms, > > > > > > >> > >> which typically implies that the poll loop is spending > too > > > much > > > > > > time > > > > > > >> > >> message processing. You can address this either by > > increasing > > > > the > > > > > > >> > session > > > > > > >> > >> timeout or by reducing the maximum size of batches > returned > > > in > > > > > > poll() > > > > > > >> > with > > > > > > >> > >> max.poll.records. at org.apache.kafka.clients.consu > > > > > > >> > >> > > > > > > >> > mer.internals.ConsumerCoordinator. > sendOffsetCommitRequest(Co > > > > > > >> nsumerCoordinator.java:702) > > > > > > >> > >> at org.apache.kafka.clients.consumer.internals. 
> > > > ConsumerCoordina > > > > > > >> > >> tor.commitOffsetsSync(ConsumerCoordinator.java:581) at > > > > > > >> > >> > > > > > > >> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync( > > > > > > >> KafkaConsumer.java:1124) > > > > > > >> > >> at > > > > > > >> > org.apache.storm.kafka.spout.KafkaSpout. > commitOffsetsForAcke > > > > > > >> dTuples(KafkaSpout.java:384) > > > > > > >> > >> at > > > > > > >> > org.apache.storm.kafka.spout.KafkaSpout.nextTuple( > > > > > > KafkaSpout.java:220) > > > > > > >> > >> at > > > > > > >> > org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__ > > > > > > >> 10826.invoke(executor.clj:646) > > > > > > >> > >> at org.apache.storm.util$async_ > > loop$fn__555.invoke(util.clj: > > > > 484) > > > > > > at > > > > > > >> > >> clojure.lang.AFn.run(AFn.java:22) at > > > > > > java.lang.Thread.run(Thread.ja > > > > > > >> > >> va:748) > > > > > > >> > >> > > > > > > >> > >> > > > > > > >> > >> => Are we the only ones experiencing such issues with > > Storm > > > > > > >> > 1.1.0/1.1.x > > > > > > >> > >> latest ? > > > > > > >> > >> > > > > > > >> > >> Note: We are considering writing our own Kafka Spout, as > > > we're > > > > > > >> > time-bound > > > > > > >> > >> to move to Kafka 0.10.x consumers & producers (to prepare > > our > > > > > next > > > > > > >> step > > > > > > >> > >> with Kafka security, which isn't available with Kafka > > 0.9.x). > > > > We > > > > > > will > > > > > > >> > miss > > > > > > >> > >> the integration of Kafka lag in StormUI, but currently we > > do > > > > not > > > > > > >> > understand > > > > > > >> > >> how to solve the regressions we observe with latest Storm > > > Kafka > > > > > > >> client > > > > > > >> > >> spout. > > > > > > >> > >> > > > > > > >> > >> Are there other Storm developers/users who jumped into > this > > > > > > >> alternative? > > > > > > >> > >> > > > > > > >> > >> Best regards, > > > > > > >> > >> > > > > > > >> > >> Alexandre Vermeerbergen > > > > > > >> > >> > > > > > > >> > >> > > > > > > >> > >> > > > > > > >> > >> > > > > > > >> > >> 2017-06-28 <20%2017%2006%2028> 9:09 GMT+02:00 Alexandre > > > > > > >> Vermeerbergen < > > > > > > >> > >> [email protected]>: > > > > > > >> > >> > > > > > > >> > >>> Hello, > > > > > > >> > >>> > > > > > > >> > >>> Coming back to my original post in this list, we have > two > > > > issues > > > > > > >> with > > > > > > >> > >>> latest 1.1.x StormKafkaClient spout with our setup: > > > > > > >> > >>> > > > > > > >> > >>> Issue#1: > > > > > > >> > >>> Initial lag (which we hadn't using the classic Storm > > Kafka > > > > > spout) > > > > > > >> > >>> For this issue, my understanding of Kristopher's > answer > > > is > > > > > that > > > > > > >> this > > > > > > >> > >>> is "by design" of the StormKafkaClient spout, which > > > instances > > > > > > >> > progressively > > > > > > >> > >>> joins Kafka consumers group, which causes consumers > > > > rebalancing. > > > > > > >> This > > > > > > >> > >>> rebalancing is "slow", which means that until all spout > > > > > instances > > > > > > >> are > > > > > > >> > >>> started, the topology starts with an "initial Kafka Lag" > > > > > > >> > >>> => Is my understanding correct? > > > > > > >> > >>> => Why don't we have such behavior with the old Storm > > > Kafka > > > > > > >> spout ? > > > > > > >> > >>> => Is this annoying initial lag tracked by a JIRA ? 
> > > > > > >> > >>> > > > > > > >> > >>> > > > > > > >> > >>> > > > > > > >> > >>> 2017-06-27 17:15 GMT+02:00 Alexandre Vermeerbergen < > > > > > > >> > >>> [email protected]>: > > > > > > >> > >>> > > > > > > >> > >>>> Hello Kristopher, > > > > > > >> > >>>> > > > > > > >> > >>>> We built Storm 1.1.1-latest using yesterday's > > (2017-06-26 > > > > > > >> > >>>> <20%2017%2006%2026>) artifacts downloaded from > > > > > > >> > >>>> https://github.com/apache/storm/tree/1.x-branch. > > > > > > >> > >>>> <https://github.com/apache/storm/tree/1.x-branch> > > > > > > >> > >>>> > > > > > > >> > >>>> Is your latest PR supposed to be in what we downloaded > & > > > > built, > > > > > > or > > > > > > >> do > > > > > > >> > >>>> we need to upgrade in some way (which?) > > > > > > >> > >>>> > > > > > > >> > >>>> Should we change anything to our settings? > > > > > > >> > >>>> > > > > > > >> > >>>> Please note that I mistakenly wrote that our "Kafka > > > consumer > > > > > > >> strategy > > > > > > >> > >>>> is set to EARLY" whereas it's "Kafka consumer strategy > is > > > set > > > > > to > > > > > > >> > LATEST", > > > > > > >> > >>>> if that matters. > > > > > > >> > >>>> > > > > > > >> > >>>> Best regards, > > > > > > >> > >>>> Alexandre Vermeerbergen > > > > > > >> > >>>> > > > > > > >> > >>>> 2017-06-27 16:37 GMT+02:00 Kristopher Kane < > > > > > [email protected] > > > > > > >: > > > > > > >> > >>>> > > > > > > >> > >>>>> Correction: https://github.com/apache/storm/pull/2174 > > has > > > > all > > > > > > of > > > > > > >> > what > > > > > > >> > >>>>> I was > > > > > > >> > >>>>> doing and more. > > > > > > >> > >>>>> > > > > > > >> > >>>>> On Tue, Jun 27, 2017 at 9:33 AM, Kristopher Kane < > > > > > > >> > [email protected] > > > > > > >> > >>>>> > > > > > > > >> > >>>>> wrote: > > > > > > >> > >>>>> > > > > > > >> > >>>>> > Alexandre, > > > > > > >> > >>>>> > > > > > > > >> > >>>>> > There are quite a few JIRAs and discussions around > > this > > > > > > >> recently. > > > > > > >> > >>>>> The > > > > > > >> > >>>>> > default behavior for storm-kafka-client is the > > > 'subscribe' > > > > > API > > > > > > >> > which > > > > > > >> > >>>>> causes > > > > > > >> > >>>>> > the immediate lag you see since rebalance will > happen > > > > > > spout(n)-1 > > > > > > >> > >>>>> times > > > > > > >> > >>>>> > just from the spouts spinning up. > > > > > > >> > >>>>> > > > > > > > >> > >>>>> > There is a Builder for > ManualPartitionNamedSubscripti > > on > > > > and > > > > > > the > > > > > > >> > >>>>> > RoundRobinManualPartitioner (which use the 'assign' > > > Kafka > > > > > > >> consumer > > > > > > >> > >>>>> API) but > > > > > > >> > >>>>> > they don't work at all. I hope to have a PR in > today > > > > > > >> > >>>>> > to fix these on 1.x-branch > > > > > > >> > >>>>> > > > > > > > >> > >>>>> > The other JIRAs I mentioned are for a redesign of > this > > > > spout > > > > > > or > > > > > > >> > >>>>> other more > > > > > > >> > >>>>> > drastic changes. My goal is a bug fix for a version > > of > > > > the > > > > > > >> spout > > > > > > >> > >>>>> that > > > > > > >> > >>>>> > doesn't provide unnecessary duplicates. 
> > > > > > >> > >>>>> > > > > > > > >> > >>>>> > Kris > > > > > > >> > >>>>> > > > > > > > >> > >>>>> > On Tue, Jun 27, 2017 at 8:00 AM, Alexandre > > > Vermeerbergen < > > > > > > >> > >>>>> > [email protected]> wrote: > > > > > > >> > >>>>> > > > > > > > >> > >>>>> >> Hello All, > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> We have been running for a while our real-time > > > > supervision > > > > > > >> > >>>>> application > > > > > > >> > >>>>> >> based on Apache Storm 1.0.3 with Storm Kafka Spouts > > > (old > > > > > > >> consumer: > > > > > > >> > >>>>> >> storm-kafka) and with our Kafka Broker cluster > based > > on > > > > > > Apache > > > > > > >> > Kafka > > > > > > >> > >>>>> >> 0.10.1.0. > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> Backpressure is activated with default parameters. > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> Key > > > > > > >> > >>>>> >> Value > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> backpressure.disruptor.high.watermark > > 0.9 > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> backpressure.disruptor.low.watermark > > 0.4 > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> task.backpressure.poll.secs > > > > > > >> 30 > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> topology.backpressure.enable > > > > > > true > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> We decided to upgrade to Apache Storm 1.1.0 to > > benefit > > > > from > > > > > > the > > > > > > >> > new > > > > > > >> > >>>>> Kafka > > > > > > >> > >>>>> >> Spout (storm-kafka-client lib) with a consumer > which > > > has > > > > > no > > > > > > >> more > > > > > > >> > >>>>> >> dependency on Zookeeper. > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> After upgrade, we had several issues with kafka > > > > > consumption. > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> We saw that several JIRAs were opened and resolved > on > > > > > Apache > > > > > > >> Storm > > > > > > >> > >>>>> 1.1.1. > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> So we decided to upgrade to the latest available > > Apache > > > > > Storm > > > > > > >> > 1.1.x > > > > > > >> > >>>>> code > > > > > > >> > >>>>> >> built from source (2017-06-26 <20%2017%2006%2026>) > > but > > > > we > > > > > > >> still > > > > > > >> > >>>>> have issues : > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> 1. The kafka Lag is increasing constantly and this > > > leads > > > > to > > > > > > the > > > > > > >> > >>>>> overload > > > > > > >> > >>>>> >> of > > > > > > >> > >>>>> >> the storm worker running the kafka spout. At the > end, > > > the > > > > > > >> worker > > > > > > >> > >>>>> crashes > > > > > > >> > >>>>> >> and it is automatically restarted by Storm. > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> With old kafka spout version, we had a lag most of > > the > > > > > times > > > > > > >> > bellow > > > > > > >> > >>>>> 10000. > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> With the new one, we are starting with Kafka lag > > about > > > > > 30000 > > > > > > >> and > > > > > > >> > >>>>> >> increasing > > > > > > >> > >>>>> >> until crash. > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> > > > > > > >> > >>>>> >> 2. 
> > > > > >> > >>>>> >> With the new Kafka Spout, we have faced this exception many times:
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
> > > > > >> > >>>>> >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
> > > > > >> > >>>>> >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
> > > > > >> > >>>>> >> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > > > > >> > >>>>> >> at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
> > > > > >> > >>>>> >> at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
> > > > > >> > >>>>> >> at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
> > > > > >> > >>>>> >> at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
> > > > > >> > >>>>> >> at clojure.lang.AFn.run(AFn.java:22)
> > > > > >> > >>>>> >> at java.lang.Thread.run(Thread.java:748)
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> We are using the following configuration for the Kafka Spout:
> > > > > >> > >>>>> >> poll.timeout.ms          200
> > > > > >> > >>>>> >> offset.commit.period.ms  30000 (30 seconds)
> > > > > >> > >>>>> >> max.uncommitted.offsets  10000000 (ten million)
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> The Kafka consumer strategy is set to EARLY. We also set the following Kafka consumer parameter:
> > > > > >> > >>>>> >> session.timeout.ms 120000
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> Are there any traces/options which we could turn on, on Storm or on the Kafka Broker, that might help us understand how to stabilize our topologies with this new branch?
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> Thanks!
> > > > > >> > >>>>> >>
> > > > > >> > >>>>> >> Alexandre Vermeerbergen
