Alexandre,

There have been quite a few JIRAs and discussions around this recently.
The default behavior for storm-kafka-client is to use the 'subscribe' API,
which causes the immediate lag you see: with n spouts, a rebalance will
happen n-1 times just from the spouts spinning up.

There is a Builder for ManualPartitionNamedSubscription and the
RoundRobinManualPartitioner (which use the 'assign' Kafka consumer API), but
they don't work at all.  I hope to have a PR in today to fix these on
1.x-branch.
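
For anyone unfamiliar with the difference: with 'assign', each spout task
picks its own partitions instead of letting the group coordinator hand them
out, so spouts starting up do not trigger consumer-group rebalances.  Below is
a minimal sketch of the round-robin idea in plain Java, with no Kafka or Storm
dependency.  The class and method names are hypothetical and this is not the
actual RoundRobinManualPartitioner code; it only assumes that each task knows
its own index and the total number of tasks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of storm-kafka-client.
public class RoundRobinAssignmentSketch {

    /**
     * Returns the partition indexes owned by the task at taskIndex when
     * partitions 0..partitionCount-1 are dealt out round-robin across
     * totalTasks tasks.
     */
    public static List<Integer> partitionsForTask(int partitionCount, int taskIndex, int totalTasks) {
        if (totalTasks <= 0 || taskIndex < 0 || taskIndex >= totalTasks) {
            throw new IllegalArgumentException("invalid task index or task count");
        }
        List<Integer> owned = new ArrayList<>();
        // Start at this task's index and step by the number of tasks.
        for (int p = taskIndex; p < partitionCount; p += totalTasks) {
            owned.add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        int partitions = 8;
        int tasks = 3;
        for (int t = 0; t < tasks; t++) {
            System.out.println("task " + t + " -> " + partitionsForTask(partitions, t, tasks));
        }
    }
}
```

With 8 partitions and 3 tasks, task 0 gets [0, 3, 6], task 1 gets [1, 4, 7]
and task 2 gets [2, 5].  In a real spout the resulting partitions would be
wrapped as TopicPartition objects and passed to KafkaConsumer.assign().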

The other JIRAs I mentioned are for a redesign of this spout or other more
drastic changes.  My goal is a bug fix for a version of the spout that
doesn't produce unnecessary duplicates.

Kris

On Tue, Jun 27, 2017 at 8:00 AM, Alexandre Vermeerbergen <
[email protected]> wrote:

> Hello All,
>
> For a while now we have been running our real-time supervision application,
> based on Apache Storm 1.0.3 with Storm Kafka Spouts (old consumer:
> storm-kafka), against a Kafka broker cluster based on Apache Kafka
> 0.10.1.0.
>
> Backpressure is activated with default parameters.
>
> Key                                     Value
> backpressure.disruptor.high.watermark   0.9
> backpressure.disruptor.low.watermark    0.4
> task.backpressure.poll.secs             30
> topology.backpressure.enable            true
>
>
>
> We decided to upgrade to Apache Storm 1.1.0 to benefit from the new Kafka
> Spout (storm-kafka-client lib), whose consumer no longer depends on
> ZooKeeper.
>
> After the upgrade, we had several issues with Kafka consumption.
>
>
> We saw that several JIRAs were opened and resolved on Apache Storm 1.1.1.
>
> So we decided to upgrade to the latest available Apache Storm 1.1.x code
> built from source (2017-06-26), but we still have issues:
>
>
>
> 1. The Kafka lag increases constantly, which overloads the Storm worker
> running the Kafka spout. In the end, the worker crashes and is
> automatically restarted by Storm.
>
>
>
> With the old Kafka spout, the lag was below 10000 most of the time.
>
> With the new one, we start with a Kafka lag of about 30000, which keeps
> increasing until the crash.
>
>
>
> 2. With the new Kafka Spout, we have faced this exception many times:
>
>
>
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced and assigned the
> partitions to another member. This means that the time between subsequent
> calls to poll() was longer than the configured max.poll.interval.ms, which
> typically implies that the poll loop is spending too much time message
> processing. You can address this either by increasing the session timeout
> or by reducing the maximum size of batches returned in poll() with
> max.poll.records.
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
>     at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
>     at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
>     at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
>     at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
>     at clojure.lang.AFn.run(AFn.java:22)
>     at java.lang.Thread.run(Thread.java:748)
>
>
>
> We are using the following configuration for the Kafka Spout:
>
> poll.timeout.ms           200
> offset.commit.period.ms   30000      (30 seconds)
> max.uncommitted.offsets   10000000   (ten million)
>
>
>
> The Kafka consumer strategy is set to EARLY. We also set the following
> Kafka consumer parameter:
>
> session.timeout.ms  120000
>
> Are there any traces/options which we could turn on in Storm or on the
> Kafka brokers that might help us understand how to stabilize our
> topologies with this new branch?
>
> Thanks!
>
> Alexandre Vermeerbergen
>
