Hello All,

We have been running our real-time supervision application for a while now, based on Apache Storm 1.0.3 with the Storm Kafka spouts (old consumer: storm-kafka) against a Kafka broker cluster running Apache Kafka 0.10.1.0.
Backpressure is activated with the default parameters:

    Key                                       Value
    backpressure.disruptor.high.watermark     0.9
    backpressure.disruptor.low.watermark      0.4
    task.backpressure.poll.secs               30
    topology.backpressure.enable              true

We decided to upgrade to Apache Storm 1.1.0 to benefit from the new Kafka spout (the storm-kafka-client library), whose consumer no longer depends on Zookeeper. After the upgrade we had several issues with Kafka consumption. We saw that several JIRAs had been opened and resolved against Apache Storm 1.1.1, so we upgraded to the latest available Apache Storm 1.1.x code built from source (2017-06-26), but we still have issues:

1. The Kafka lag increases constantly, which overloads the Storm worker running the Kafka spout. Eventually the worker crashes and is automatically restarted by Storm. With the old Kafka spout we had a lag below 10000 most of the time; with the new one we start with a Kafka lag of about 30000, and it keeps increasing until the crash.

2. With the new Kafka spout, we have faced this exception many times:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
        at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
        at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
        at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
        at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
        at clojure.lang.AFn.run(AFn.java:22)
        at java.lang.Thread.run(Thread.java:748)

We are using the following configuration for the Kafka spout (a simplified sketch of how we build it is included as a PS below):

    poll.timeout.ms            200
    offset.commit.period.ms    30000 (30 seconds)
    max.uncommitted.offsets    10000000 (ten million)

The first poll offset strategy is set to EARLIEST. We also set the following Kafka consumer parameter:

    session.timeout.ms         120000

Are there any traces/options we could turn on, either in Storm or on the Kafka brokers, that might help us understand how to stabilize our topologies with this new branch?

Thanks!
Alexandre Vermeerbergen
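
PS: For reference, here is roughly how we build the new Kafka spout with the settings listed above. This is only a simplified sketch of our setup: the broker list, topic name, and consumer group id are placeholders, and the session.timeout.ms override is applied through the underlying Kafka consumer properties rather than shown explicitly.

    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
    import org.apache.storm.topology.TopologyBuilder;

    public class SupervisionTopology {
        public static void main(String[] args) {
            // Placeholder broker list, topic, and group id; the real values are site-specific.
            KafkaSpoutConfig<String, String> spoutConfig =
                KafkaSpoutConfig.builder("kafkabroker1:9092,kafkabroker2:9092", "supervision-events")
                    .setGroupId("supervision-topology")
                    .setPollTimeoutMs(200)                 // poll.timeout.ms
                    .setOffsetCommitPeriodMs(30000)        // offset.commit.period.ms (30 seconds)
                    .setMaxUncommittedOffsets(10000000)    // max.uncommitted.offsets (ten million)
                    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
                    // session.timeout.ms=120000 is passed via the Kafka consumer properties (omitted here)
                    .build();

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("kafka-spout", new KafkaSpout<>(spoutConfig), 1);
            // ... bolts and topology submission omitted ...
        }
    }

The numeric values in the sketch are exactly the ones listed above in this mail.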
