Hello All,

For some time we have been running our real-time supervision application
on Apache Storm 1.0.3 with the Storm Kafka spout (old consumer:
storm-kafka), against a Kafka broker cluster running Apache Kafka
0.10.1.0.

Backpressure is activated with default parameters.

Key                                     Value
backpressure.disruptor.high.watermark   0.9
backpressure.disruptor.low.watermark    0.4
task.backpressure.poll.secs             30
topology.backpressure.enable            true
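
To make the watermark values concrete: as we understand it, throttling starts once a receive queue fills past the high watermark and is released once it drains below the low watermark. Below is a rough sketch of the resulting thresholds. The queue capacity of 1024 entries is an assumption for illustration, not a value checked on our cluster, and the class and method names are hypothetical (this is not Storm's own code, and Storm's exact rounding may differ).

```java
// Illustrative only: names are hypothetical and this is not Storm's own code.
class WatermarkSketch {

    // Number of queued entries that corresponds to a watermark fraction,
    // truncated to a whole entry.
    static int threshold(int queueCapacity, double watermark) {
        return (int) (queueCapacity * watermark);
    }

    public static void main(String[] args) {
        int capacity = 1024; // assumed queue capacity, not taken from our setup
        System.out.println("high watermark entries: " + threshold(capacity, 0.9));
        System.out.println("low watermark entries:  " + threshold(capacity, 0.4));
    }
}
```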



We decided to upgrade to Apache Storm 1.1.0 to benefit from the new Kafka
spout (storm-kafka-client library), whose consumer no longer depends on
ZooKeeper.

After the upgrade, we ran into several issues with Kafka consumption.


We saw that several JIRAs had been opened and resolved for Apache Storm 1.1.1.

So we decided to upgrade to the latest available Apache Storm 1.1.x code
built from source (2017-06-26), but we still have issues:



1. The Kafka lag increases constantly, which eventually overloads the
Storm worker running the Kafka spout. In the end, the worker crashes and
is automatically restarted by Storm.



With the old Kafka spout, the lag stayed below 10000 most of the time.

With the new one, the Kafka lag starts at about 30000 and keeps increasing
until the crash.



2. With the new Kafka Spout, we have faced this exception many times:



org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms, which
typically implies that the poll loop is spending too much time message
processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned in poll() with
max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
    at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
    at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
    at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:748)



We are using the following configuration for the Kafka spout:

poll.timeout.ms          200
offset.commit.period.ms  30000     (30 seconds)
max.uncommitted.offsets  10000000  (ten million)



The Kafka consumer strategy is set to EARLY. We also set the following
Kafka consumer parameter:

session.timeout.ms  120000
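
For reference, here is a minimal sketch of how the consumer properties named in the exception message could be collected in one place before being passed to the consumer. Only session.timeout.ms = 120000 is our actual setting. The max.poll.interval.ms and max.poll.records values are hypothetical placeholders that we have not tested, and the class and method names are made up for illustration.

```java
import java.util.Properties;

// Illustrative only: class and method names are hypothetical.
class ConsumerSettingsSketch {

    static Properties build() {
        Properties props = new Properties();
        // Value actually used in our topology.
        props.setProperty("session.timeout.ms", "120000");
        // Assumed values, NOT from our configuration: the two settings the
        // CommitFailedException message points at.
        props.setProperty("max.poll.interval.ms", "300000");
        props.setProperty("max.poll.records", "200");
        return props;
    }

    public static void main(String[] args) {
        // Print the collected settings (iteration order is not guaranteed).
        build().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```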

Are there any traces or options that we could turn on in Storm or on the
Kafka brokers that might help us understand how to stabilize our topologies
with this new branch?

Thanks!

Alexandre Vermeerbergen
