Oops, sent my last mail too fast, let me continue it:

Hello,

Coming back to my original post on this list, we have three issues with the
latest 1.1.x StormKafkaClient spout in our setup:

Issue#1:
 Initial lag (which we did not have with the classic Storm Kafka spout)
   For this issue, my understanding of Kristopher's answer is that this is
"by design" of the StormKafkaClient spout, whose instances progressively
join the Kafka consumer group, which triggers consumer rebalancing. This
rebalancing is "slow", which means that until all spout instances are
started, the topology starts with an "initial Kafka lag".
   => Is my understanding correct?
   => Why don't we have such behavior with the old Storm Kafka spout ?
   => Is this annoying initial lag tracked by a JIRA ?
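
To make the startup behavior described above concrete, here is a minimal toy
model in Java. It does not use the Kafka or Storm APIs; the class and method
names are hypothetical, and it assumes the worst case where every spout
instance joins the consumer group separately, so that each join after the
first one triggers one rebalance (the "spout(n)-1" count from Kristopher's
answer).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: hypothetical names, no Kafka or Storm classes involved.
final class RebalanceSketch {

    // Assumption: instances join one at a time, and every join into a
    // non-empty group forces the existing members to rebalance once.
    static int rebalancesDuringStartup(int spoutInstances) {
        List<Integer> group = new ArrayList<>();
        int rebalances = 0;
        for (int id = 0; id < spoutInstances; id++) {
            if (!group.isEmpty()) {
                rebalances++; // existing members give up and re-acquire partitions
            }
            group.add(id);
        }
        return rebalances;
    }

    public static void main(String[] args) {
        for (int n : new int[] {1, 4, 16}) {
            System.out.println(n + " spout instances: "
                    + rebalancesDuringStartup(n) + " rebalances during startup");
        }
    }
}
```

If several instances happen to join within the same rebalance round, the real
count would be lower, so this is an upper bound rather than a prediction.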

Issue#2:
    The Kafka lag increases constantly, which overloads the Storm worker
running the Kafka spout. Eventually the worker crashes and is automatically
restarted by Storm.
    => This is unlike what we observe with the old Storm Kafka spout
    => What is the recommended way to analyze this issue?
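
As one possible starting point for such an analysis, lag can be sampled per
partition over time to see whether it grows everywhere or only on some
partitions. The sketch below only shows the arithmetic (lag = log end offset
minus committed offset); the class name, method names and offset values are
hypothetical, and in a real probe the offsets would have to come from the
Kafka consumer API or from the consumer group tooling.

```java
import java.util.HashMap;
import java.util.Map;

// Arithmetic sketch only: hypothetical names and hard-coded sample offsets.
final class LagSketch {

    // Lag of one partition: log end offset minus committed offset, floored at 0.
    static long partitionLag(long logEndOffset, long committedOffset) {
        return Math.max(0L, logEndOffset - committedOffset);
    }

    // Sums the lag over all partitions (keyed by partition number).
    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committed) {
        long total = 0L;
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            // Assumption: a partition with no commit yet is counted from offset 0.
            long committedOffset = committed.getOrDefault(e.getKey(), 0L);
            total += partitionLag(e.getValue(), committedOffset);
        }
        return total;
    }

    public static void main(String[] args) {
        Map<Integer, Long> endOffsets = new HashMap<>();
        Map<Integer, Long> committed = new HashMap<>();
        endOffsets.put(0, 15_000L);
        committed.put(0, 5_000L);
        endOffsets.put(1, 25_000L);
        committed.put(1, 5_000L);
        System.out.println("total lag = " + totalLag(endOffsets, committed));
    }
}
```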

Issue#3:
  With the new Kafka Spout, we have faced this exception many times:

 org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms, which
typically implies that the poll loop is spending too much time message
processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned in poll() with
max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
    at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
    at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
    at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:748)


  => Are we the only ones experiencing such issues with Storm 1.1.0/1.1.x
latest ?
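
The exception message itself names the consumer settings involved:
max.poll.interval.ms, max.poll.records and the session timeout. A minimal
sketch of how these could be collected as consumer overrides, together with
a rough budget check, is shown below. The class name, method names and the
values for max.poll.records and max.poll.interval.ms are illustrative
assumptions, not a recommendation; only session.timeout.ms = 120000 comes
from our setup. How these properties are handed to the spout depends on the
storm-kafka-client version and is not shown.

```java
import java.util.Properties;

// Sketch only: hypothetical class; values are illustrative unless noted.
final class ConsumerTuningSketch {

    static Properties consumerOverrides() {
        Properties props = new Properties();
        props.setProperty("max.poll.records", "200");        // assumption: smaller batches
        props.setProperty("max.poll.interval.ms", "300000"); // assumption: 5 minutes
        props.setProperty("session.timeout.ms", "120000");   // value from our setup
        return props;
    }

    // Rough check: does handling one full batch, at the given worst-case
    // time per record, fit between two poll() calls?
    static boolean batchFitsPollInterval(Properties props, long perRecordMillis) {
        long maxRecords = Long.parseLong(props.getProperty("max.poll.records"));
        long intervalMs = Long.parseLong(props.getProperty("max.poll.interval.ms"));
        return maxRecords * perRecordMillis <= intervalMs;
    }

    public static void main(String[] args) {
        Properties props = consumerOverrides();
        System.out.println("fits at 1000 ms/record: " + batchFitsPollInterval(props, 1000L));
        System.out.println("fits at 2000 ms/record: " + batchFitsPollInterval(props, 2000L));
    }
}
```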

Note: We are considering writing our own Kafka Spout, as we're time-bound
to move to Kafka 0.10.x consumers & producers (to prepare our next step
with Kafka security, which isn't available with Kafka 0.9.x). We will miss
the integration of Kafka lag in StormUI, but currently we do not understand
how to solve the regressions we observe with latest Storm Kafka client
spout.

Are there other Storm developers/users who jumped into this alternative?

Best regards,

Alexandre Vermeerbergen




2017-06-28 9:09 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:

> Hello,
>
> Coming back to my original post in this list, we have two issues with
> latest 1.1.x StormKafkaClient spout with our setup:
>
> Issue#1:
>  Initial lag (which we hadn't using the classic Storm Kafka spout)
>    For this issue, my understanding of Kristopher's answer is that this is
> "by design" of the StormKafkaClient spout, which instances progressively
> joins Kafka consumers group, which causes consumers rebalancing. This
> rebalancing is "slow", which means that until all spout instances are
> started, the topology starts with an "initial Kafka Lag"
>    => Is my understanding correct?
>    => Why don't we have such behavior with the old Storm Kafka spout ?
>    => Is this annoying initial lag tracked by a JIRA ?
>
>
>
> 2017-06-27 17:15 GMT+02:00 Alexandre Vermeerbergen <
> [email protected]>:
>
>> Hello Kristopher,
>>
>> We built Storm 1.1.1-latest using yesterday's (2017-06-26)  artifacts
>> downloaded from https://github.com/apache/storm/tree/1.x-branch.
>>
>> Is your latest PR supposed to be in what we downloaded & built, or do we
>> need to upgrade in some way (and if so, how)?
>>
>> Should we change anything to our settings?
>>
>> Please note that I mistakenly wrote that our "Kafka consumer strategy is
>> set to EARLY" whereas it's "Kafka consumer strategy is set to LATEST", if
>> that matters.
>>
>> Best regards,
>> Alexandre Vermeerbergen
>>
>> 2017-06-27 16:37 GMT+02:00 Kristopher Kane <[email protected]>:
>>
>>> Correction: https://github.com/apache/storm/pull/2174 has all of what I
>>> was doing and more.
>>>
>>> On Tue, Jun 27, 2017 at 9:33 AM, Kristopher Kane <[email protected]>
>>> wrote:
>>>
>>> > Alexandre,
>>> >
>>> > There are quite a few JIRAs and discussions around this recently.  The
>>> > default behavior for storm-kafka-client is the 'subscribe' API which
>>> > causes the immediate lag you see since rebalance will happen
>>> > spout(n)-1 times just from the spouts spinning up.
>>> >
>>> > There is a Builder for ManualPartitionNamedSubscription and the
>>> > RoundRobinManualPartitioner (which use the 'assign' Kafka consumer
>>> > API) but they don't work at all.  I hope to have a PR in today
>>> > to fix these on 1.x-branch
>>> >
>>> > The other JIRAs I mentioned are for a redesign of this spout or other
>>> > more drastic changes.  My goal is a bug fix for a version of the spout
>>> > that doesn't provide unnecessary duplicates.
>>> >
>>> > Kris
>>> >
>>> > On Tue, Jun 27, 2017 at 8:00 AM, Alexandre Vermeerbergen <
>>> > [email protected]> wrote:
>>> >
>>> >> Hello All,
>>> >>
>>> >> We have been running for a while our real-time supervision application
>>> >> based on Apache Storm 1.0.3 with Storm Kafka Spouts (old consumer:
>>> >> storm-kafka) and with our Kafka Broker cluster based on Apache Kafka
>>> >> 0.10.1.0.
>>> >>
>>> >> Backpressure is activated with default parameters.
>>> >>
>>> >> Key                                      Value
>>> >> backpressure.disruptor.high.watermark    0.9
>>> >> backpressure.disruptor.low.watermark     0.4
>>> >> task.backpressure.poll.secs              30
>>> >> topology.backpressure.enable             true
>>> >>
>>> >>
>>> >>
>>> >> We decided to upgrade to Apache Storm 1.1.0 to benefit from the new
>>> >> Kafka spout (storm-kafka-client lib), whose consumer no longer depends
>>> >> on ZooKeeper.
>>> >>
>>> >> After the upgrade, we had several issues with Kafka consumption.
>>> >>
>>> >>
>>> >> We saw that several JIRAs were opened and resolved for Apache Storm
>>> >> 1.1.1.
>>> >>
>>> >> So we decided to upgrade to the latest available Apache Storm 1.1.x
>>> >> code built from source (2017-06-26), but we still have issues:
>>> >>
>>> >>
>>> >>
>>> >> 1. The Kafka lag is increasing constantly, and this leads to the
>>> >> overload of the Storm worker running the Kafka spout. At the end, the
>>> >> worker crashes and it is automatically restarted by Storm.
>>> >>
>>> >>
>>> >>
>>> >> With the old Kafka spout version, our lag was below 10000 most of the
>>> >> time.
>>> >>
>>> >> With the new one, we start with a Kafka lag of about 30000, which keeps
>>> >> increasing until the crash.
>>> >>
>>> >>
>>> >>
>>> >> 2. With the new Kafka Spout, we have faced this exception many times:
>>> >>
>>> >>
>>> >>
>>> >> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
>>> >> be completed since the group has already rebalanced and assigned the
>>> >> partitions to another member. This means that the time between
>>> >> subsequent calls to poll() was longer than the configured
>>> >> max.poll.interval.ms, which typically implies that the poll loop is
>>> >> spending too much time message processing. You can address this either
>>> >> by increasing the session timeout or by reducing the maximum size of
>>> >> batches returned in poll() with max.poll.records.
>>> >>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
>>> >>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
>>> >>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
>>> >>     at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384)
>>> >>     at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:220)
>>> >>     at org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__10826.invoke(executor.clj:646)
>>> >>     at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484)
>>> >>     at clojure.lang.AFn.run(AFn.java:22)
>>> >>     at java.lang.Thread.run(Thread.java:748)
>>> >>
>>> >>
>>> >>
>>> >> We are using the following configuration for Kafka Spout :
>>> >>
>>> >> poll.timeout.ms          200
>>> >>
>>> >> offset.commit.period.ms  30000     (30 seconds)
>>> >>
>>> >> max.uncommitted.offsets  10000000  (ten million)
>>> >>
>>> >>
>>> >>
>>> >> The Kafka consumer strategy is set to EARLY. We also set the following
>>> >> Kafka consumer parameter :
>>> >>
>>> >> session.timeout.ms  120000
>>> >>
>>> >> Are there any traces/options which we could turn on in Storm or on the
>>> >> Kafka Broker that might help understanding how to stabilize our
>>> >> topologies with this new branch?
>>> >>
>>> >> Thanks!
>>> >>
>>> >> Alexandre Vermeerbergen
>>> >>
>>> >
>>> >
>>>
>>
>>
>
