Hello,

Coming back to my original post in this list, we have two issues with the latest 1.1.x StormKafkaClient spout with our setup:

Issue #1: Initial lag (which we did not have with the classic Storm Kafka spout)

For this issue, my understanding of Kristopher's answer is that this is "by design" of the StormKafkaClient spout, whose instances join the Kafka consumer group progressively, which causes consumer rebalancing. This rebalancing is "slow", which means that until all spout instances are started, the topology starts with an "initial Kafka lag".

=> Is my understanding correct?
=> Why don't we have such behavior with the old Storm Kafka spout?
=> Is this annoying initial lag tracked by a JIRA?

2017-06-27 17:15 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:

> Hello Kristopher,
>
> We built Storm 1.1.1-latest using yesterday's (2017-06-26) artifacts
> downloaded from https://github.com/apache/storm/tree/1.x-branch.
>
> Is your latest PR supposed to be in what we downloaded & built, or do we
> need to upgrade in some way (which?)
>
> Should we change anything to our settings?
>
> Please note that I mistakenly wrote that our "Kafka consumer strategy is
> set to EARLY" whereas it's "Kafka consumer strategy is set to LATEST", if
> that matters.
>
> Best regards,
> Alexandre Vermeerbergen
>
> 2017-06-27 16:37 GMT+02:00 Kristopher Kane <[email protected]>:
>
>> Correction: https://github.com/apache/storm/pull/2174 has all of what I
>> was doing and more.
>>
>> On Tue, Jun 27, 2017 at 9:33 AM, Kristopher Kane <[email protected]>
>> wrote:
>>
>> > Alexandre,
>> >
>> > There are quite a few JIRAs and discussions around this recently. The
>> > default behavior for storm-kafka-client is the 'subscribe' API which
>> > causes the immediate lag you see since rebalance will happen
>> > spout(n)-1 times just from the spouts spinning up.
>> >
>> > There is a Builder for ManualPartitionNamedSubscription and the
>> > RoundRobinManualPartitioner (which use the 'assign' Kafka consumer API)
>> > but they don't work at all.
>> > I hope to have a PR in today to fix these on 1.x-branch
>> >
>> > The other JIRAs I mentioned are for a redesign of this spout or other
>> > more drastic changes. My goal is a bug fix for a version of the spout
>> > that doesn't provide unnecessary duplicates.
>> >
>> > Kris
>> >
>> > On Tue, Jun 27, 2017 at 8:00 AM, Alexandre Vermeerbergen <
>> > [email protected]> wrote:
>> >
>> >> Hello All,
>> >>
>> >> We have been running for a while our real-time supervision application
>> >> based on Apache Storm 1.0.3 with Storm Kafka Spouts (old consumer:
>> >> storm-kafka) and with our Kafka Broker cluster based on Apache Kafka
>> >> 0.10.1.0.
>> >>
>> >> Backpressure is activated with default parameters.
>> >>
>> >> Key                                     Value
>> >> backpressure.disruptor.high.watermark   0.9
>> >> backpressure.disruptor.low.watermark    0.4
>> >> task.backpressure.poll.secs             30
>> >> topology.backpressure.enable            true
>> >>
>> >> We decided to upgrade to Apache Storm 1.1.0 to benefit from the new
>> >> Kafka Spout (storm-kafka-client lib) with a consumer which has no more
>> >> dependency on Zookeeper.
>> >>
>> >> After upgrade, we had several issues with kafka consumption.
>> >>
>> >> We saw that several JIRAs were opened and resolved on Apache Storm
>> >> 1.1.1.
>> >>
>> >> So we decided to upgrade to the latest available Apache Storm 1.1.x
>> >> code built from source (2017-06-26) but we still have issues:
>> >>
>> >> 1. The kafka Lag is increasing constantly and this leads to the
>> >> overload of the storm worker running the kafka spout. At the end, the
>> >> worker crashes and it is automatically restarted by Storm.
>> >>
>> >> With old kafka spout version, we had a lag most of the time below
>> >> 10000.
>> >>
>> >> With the new one, we are starting with Kafka lag about 30000 and
>> >> increasing until crash.
>> >>
>> >> 2. With the new Kafka Spout, we have faced this exception many times:
>> >>
>> >> org.apache.kafka.clients.consumer.CommitFailedException: Commit
>> >> cannot be completed since the group has already rebalanced and
>> >> assigned the partitions to another member. This means that the time
>> >> between subsequent calls to poll() was longer than the configured
>> >> max.poll.interval.ms, which typically implies that the poll loop is
>> >> spending too much time message processing. You can address this
>> >> either by increasing the session timeout or by reducing the maximum
>> >> size of batches returned in poll() with max.poll.records.
>> >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
>> >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
>> >> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
>> >> at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
>> >> at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
>> >> at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
>> >> at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
>> >> at clojure.lang.AFn.run(AFn.java:22)
>> >> at java.lang.Thread.run(Thread.java:748)
>> >>
>> >> We are using the following configuration for Kafka Spout:
>> >>
>> >> poll.timeout.ms 200.
>> >> offset.commit.period.ms 30000 (30 seconds).
>> >> max.uncommitted.offsets 10000000 (ten million).
>> >>
>> >> The Kafka consumer strategy is set to EARLY. We also set the following
>> >> Kafka consumer parameter:
>> >>
>> >> session.timeout.ms 120000
>> >>
>> >> Are there any traces/options which we could turn on on Storm or on
>> >> Kafka Broker that might help understanding how to stabilize our
>> >> topologies with this new branch?
>> >>
>> >> Thanks!
>> >>
>> >> Alexandre Vermeerbergen
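
To make the 'subscribe' versus 'assign' distinction from the thread concrete, here is a minimal Java sketch under stated assumptions. It is not the storm-kafka-client code: the class name and both helper methods are hypothetical, and the partition and task counts are made up for illustration. It only shows why a static round-robin split needs no group coordination (each task can compute its own partitions from its index), whereas with 'subscribe' every spout that joins after the first one triggers a rebalance, which gives the spout(n)-1 figure Kristopher mentions.

```java
import java.util.ArrayList;
import java.util.List;

public class ManualAssignSketch {

    // Hypothetical helper, NOT the storm-kafka-client implementation:
    // partition p is owned by the task whose index equals p % totalTasks.
    // Each task can evaluate this on its own, so no group rebalance is needed.
    static List<Integer> partitionsForTask(int partitionCount, int taskIndex, int totalTasks) {
        List<Integer> mine = new ArrayList<>();
        for (int p = taskIndex; p < partitionCount; p += totalTasks) {
            mine.add(p);
        }
        return mine;
    }

    // Rebalances caused purely by start-up when spouts join a 'subscribe'
    // group one at a time: one per member after the first, i.e. n - 1,
    // as stated in the thread.
    static int startupRebalances(int spoutCount) {
        return Math.max(0, spoutCount - 1);
    }

    public static void main(String[] args) {
        int partitions = 8; // assumed topic size, for illustration only
        int tasks = 3;      // assumed spout parallelism, for illustration only
        for (int t = 0; t < tasks; t++) {
            System.out.println("task " + t + " -> " + partitionsForTask(partitions, t, tasks));
        }
        System.out.println("start-up rebalances with subscribe: " + startupRebalances(tasks));
    }
}
```

In a real topology the resulting partition list would be handed to the Kafka consumer's 'assign' API instead of calling 'subscribe'; how the spout wires that up depends on the storm-kafka-client version in use.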
