Hello Kristopher,

We built Storm 1.1.1-latest using yesterday's (2017-06-26) artifacts downloaded from https://github.com/apache/storm/tree/1.x-branch.
Is your latest PR supposed to be in what we downloaded and built, or do we need to upgrade in some way (and if so, how)? Should we change anything in our settings?

Please note that I mistakenly wrote that our "Kafka consumer strategy is set to EARLY" whereas it's actually "Kafka consumer strategy is set to LATEST", if that matters.

Best regards,
Alexandre Vermeerbergen

2017-06-27 16:37 GMT+02:00 Kristopher Kane <[email protected]>:
> Correction: https://github.com/apache/storm/pull/2174 has all of what I
> was doing and more.
>
> On Tue, Jun 27, 2017 at 9:33 AM, Kristopher Kane <[email protected]>
> wrote:
>
> > Alexandre,
> >
> > There are quite a few JIRAs and discussions around this recently. The
> > default behavior for storm-kafka-client is the 'subscribe' API, which
> > causes the immediate lag you see, since a rebalance will happen
> > spout(n)-1 times just from the spouts spinning up.
> >
> > There is a Builder for ManualPartitionNamedSubscription and the
> > RoundRobinManualPartitioner (which use the 'assign' Kafka consumer
> > API), but they don't work at all. I hope to have a PR in today to fix
> > these on 1.x-branch.
> >
> > The other JIRAs I mentioned are for a redesign of this spout or other
> > more drastic changes. My goal is a bug fix for a version of the spout
> > that doesn't produce unnecessary duplicates.
> >
> > Kris
> >
> > On Tue, Jun 27, 2017 at 8:00 AM, Alexandre Vermeerbergen <
> > [email protected]> wrote:
> >
> >> Hello All,
> >>
> >> We have been running our real-time supervision application for a
> >> while, based on Apache Storm 1.0.3 with Storm Kafka spouts (old
> >> consumer: storm-kafka) and with our Kafka broker cluster based on
> >> Apache Kafka 0.10.1.0.
> >>
> >> Backpressure is activated with default parameters:
> >>
> >> Key                                    Value
> >> backpressure.disruptor.high.watermark  0.9
> >> backpressure.disruptor.low.watermark   0.4
> >> task.backpressure.poll.secs            30
> >> topology.backpressure.enable           true
> >>
> >> We decided to upgrade to Apache Storm 1.1.0 to benefit from the new
> >> Kafka spout (storm-kafka-client lib), whose consumer no longer
> >> depends on Zookeeper.
> >>
> >> After the upgrade, we had several issues with Kafka consumption.
> >>
> >> We saw that several JIRAs were opened and resolved on Apache Storm
> >> 1.1.1, so we decided to upgrade to the latest available Apache Storm
> >> 1.1.x code built from source (2017-06-26), but we still have issues:
> >>
> >> 1. The Kafka lag is increasing constantly, and this leads to the
> >> overload of the Storm worker running the Kafka spout. In the end, the
> >> worker crashes and is automatically restarted by Storm.
> >>
> >> With the old Kafka spout version, we had a lag below 10000 most of
> >> the time. With the new one, we start with a Kafka lag of about 30000,
> >> increasing until the crash.
> >>
> >> 2. With the new Kafka spout, we have faced this exception many times:
> >>
> >> org.apache.kafka.clients.consumer.CommitFailedException: Commit
> >> cannot be completed since the group has already rebalanced and
> >> assigned the partitions to another member. This means that the time
> >> between subsequent calls to poll() was longer than the configured
> >> max.poll.interval.ms, which typically implies that the poll loop is
> >> spending too much time message processing. You can address this
> >> either by increasing the session timeout or by reducing the maximum
> >> size of batches returned in poll() with max.poll.records.
> >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
> >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
> >> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> >> at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
> >> at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
> >> at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
> >> at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
> >> at clojure.lang.AFn.run(AFn.java:22)
> >> at java.lang.Thread.run(Thread.java:748)
> >>
> >> We are using the following configuration for the Kafka spout:
> >>
> >> poll.timeout.ms          200
> >> offset.commit.period.ms  30000 (30 seconds)
> >> max.uncommitted.offsets  10000000 (ten million)
> >>
> >> The Kafka consumer strategy is set to EARLY. We also set the
> >> following Kafka consumer parameter:
> >>
> >> session.timeout.ms 120000
> >>
> >> Are there any traces/options which we could turn on in Storm or on
> >> the Kafka broker that might help us understand how to stabilize our
> >> topologies with this new branch?
> >>
> >> Thanks!
> >>
> >> Alexandre Vermeerbergen
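[Editorial note appended to the thread] The CommitFailedException quoted above says the fix is to keep the time between poll() calls under max.poll.interval.ms, either by raising the timeout or by lowering max.poll.records. The sketch below is a back-of-envelope check of that budget, not code from Storm or Kafka: the per-record processing cost (2 ms) and the safety factor are illustrative assumptions, while 300000 ms (max.poll.interval.ms) and 500 (max.poll.records) are the Kafka 0.10.1 defaults.

```java
// Back-of-envelope check for the CommitFailedException above: the consumer
// must call poll() again before max.poll.interval.ms elapses, so the batch
// size times the per-record processing cost must fit in that budget.
// Per-record cost and safety factor are assumed, not measured.
public class PollIntervalCheck {

    // Worst-case time spent processing one batch between two poll() calls, in ms.
    static long worstCaseLoopMs(int maxPollRecords, double perRecordMs) {
        return (long) Math.ceil(maxPollRecords * perRecordMs);
    }

    // Largest max.poll.records that keeps the loop under the interval,
    // leaving headroom via a safety factor (0.5 = use half the budget).
    static int maxSafePollRecords(long maxPollIntervalMs, double perRecordMs, double safety) {
        return (int) Math.floor(maxPollIntervalMs * safety / perRecordMs);
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000; // Kafka 0.10.1 default (5 minutes)
        double perRecordMs = 2.0;         // assumed processing cost per record

        // Kafka 0.10.1's default batch cap of 500 records fits easily:
        System.out.println(worstCaseLoopMs(500, perRecordMs));     // 1000 ms

        // Draining a 30000-record lag in one batch would not:
        System.out.println(worstCaseLoopMs(300_000, perRecordMs)); // 600000 ms > budget

        // Largest batch that stays within half the budget:
        System.out.println(maxSafePollRecords(maxPollIntervalMs, perRecordMs, 0.5));
    }
}
```

If the real per-record cost is measured from the topology's metrics, the same arithmetic gives a concrete max.poll.records to set alongside the session.timeout.ms increase already applied in the thread.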
