Alexandre,

There are quite a few JIRAs and discussions around this recently. The default behavior for storm-kafka-client is the 'subscribe' API, which causes the immediate lag you see, since a rebalance will happen spout(n)-1 times just from the spouts spinning up.
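As a rough illustration of what a round-robin manual partitioner does once the 'assign' API is used (a sketch in plain Java; the task and partition counts are made-up examples, and this is not the actual RoundRobinManualPartitioner code):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: with the 'assign' API each spout task picks its own partitions
// directly, so no consumer-group rebalance happens as tasks spin up.
// A simple round-robin scheme gives task t every partition p where
// p % numTasks == t.
public class RoundRobinSketch {
    static List<Integer> partitionsForTask(int taskIndex, int numTasks, int numPartitions) {
        List<Integer> mine = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % numTasks == taskIndex) {
                mine.add(p);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        // Hypothetical topology: 3 spout tasks reading an 8-partition topic.
        for (int t = 0; t < 3; t++) {
            System.out.println("task " + t + " -> " + partitionsForTask(t, 3, 8));
        }
    }
}
```

In a real topology the spout's subscription builder would do this mapping against live topic metadata; the point is only that the assignment is deterministic per task, so starting another task never forces the group to rebalance.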
There is a Builder for ManualPartitionNamedSubscription and the RoundRobinManualPartitioner (which use the 'assign' Kafka consumer API), but they don't work at all. I hope to have a PR in today to fix these on 1.x-branch. The other JIRAs I mentioned are for a redesign of this spout or other more drastic changes. My goal is a bug fix for a version of the spout that doesn't produce unnecessary duplicates.

Kris

On Tue, Jun 27, 2017 at 8:00 AM, Alexandre Vermeerbergen <[email protected]> wrote:

> Hello All,
>
> We have been running our real-time supervision application for a while,
> based on Apache Storm 1.0.3 with Storm Kafka spouts (old consumer:
> storm-kafka) and with our Kafka broker cluster based on Apache Kafka
> 0.10.1.0.
>
> Backpressure is activated with the default parameters:
>
> Key                                     Value
> backpressure.disruptor.high.watermark   0.9
> backpressure.disruptor.low.watermark    0.4
> task.backpressure.poll.secs             30
> topology.backpressure.enable            true
>
> We decided to upgrade to Apache Storm 1.1.0 to benefit from the new Kafka
> spout (storm-kafka-client lib), whose consumer no longer depends on
> Zookeeper.
>
> After the upgrade, we had several issues with Kafka consumption.
>
> We saw that several JIRAs were opened and resolved in Apache Storm 1.1.1,
> so we upgraded to the latest available Apache Storm 1.1.x code built from
> source (2017-06-26), but we still have issues:
>
> 1. The Kafka lag is increasing constantly, which leads to the overload of
> the Storm worker running the Kafka spout. In the end the worker crashes
> and is automatically restarted by Storm.
>
> With the old Kafka spout, the lag was below 10000 most of the time.
> With the new one, we start with a Kafka lag of about 30000, and it keeps
> increasing until the crash.
>
> 2.
> With the new Kafka spout, we have faced this exception many times:
>
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced and assigned the
> partitions to another member. This means that the time between subsequent
> calls to poll() was longer than the configured max.poll.interval.ms, which
> typically implies that the poll loop is spending too much time message
> processing. You can address this either by increasing the session timeout
> or by reducing the maximum size of batches returned in poll() with
> max.poll.records.
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
>   at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
>   at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
>   at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
>   at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
>   at clojure.lang.AFn.run(AFn.java:22)
>   at java.lang.Thread.run(Thread.java:748)
>
> We are using the following configuration for the Kafka spout:
>
> poll.timeout.ms            200
> offset.commit.period.ms    30000 (30 seconds)
> max.uncommitted.offsets    10000000 (ten million)
>
> The Kafka consumer strategy is set to EARLY. We also set the following
> Kafka consumer parameter:
>
> session.timeout.ms         120000
>
> Are there any traces/options we could turn on, in Storm or on the Kafka
> broker, that might help us understand how to stabilize our topologies
> with this new branch?
>
> Thanks!
>
> Alexandre Vermeerbergen
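Separately from the spout bugs above, the CommitFailedException message itself suggests two consumer-side mitigations: reduce the batch size returned by poll() (max.poll.records), or allow more time between poll() calls (max.poll.interval.ms). A minimal sketch of such a property set, using plain java.util.Properties (the key strings are standard Kafka consumer config names; the values beyond session.timeout.ms are illustrative assumptions, not tested recommendations):

```java
import java.util.Properties;

public class ConsumerTuningSketch {
    static Properties tunedProps() {
        Properties props = new Properties();
        // Value from Alexandre's mail:
        props.put("session.timeout.ms", "120000");
        // What the CommitFailedException message suggests trying:
        // fewer records per poll(), so each poll loop finishes sooner...
        props.put("max.poll.records", "500");        // illustrative value
        // ...and/or more time allowed between consecutive poll() calls.
        props.put("max.poll.interval.ms", "300000"); // illustrative value
        return props;
    }

    public static void main(String[] args) {
        System.out.println(tunedProps());
    }
}
```

In a real topology these properties would be handed to the KafkaSpoutConfig builder (and from there to the underlying KafkaConsumer); whether they help here depends on whether the commit failures come from slow tuple processing or from the rebalance bug Kris describes.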
