Hello Kristopher,

We built Storm 1.1.1-latest using yesterday's (2017-06-26)  artifacts
downloaded from https://github.com/apache/storm/tree/1.x-branch.
<https://github.com/apache/storm/tree/1.x-branch>

Is your latest PR supposed to be in what we downloaded & built, or do we
need to upgrade in some way (which?)

Should we change anything to our settings?

Please note that I mistakenly wrote that our "Kafka consumer strategy is
set to EARLY" whereas it's "Kafka consumer strategy is set to LATEST", if
that matters.

Best regards,
Alexandre Vermeerbergen

2017-06-27 16:37 GMT+02:00 Kristopher Kane <[email protected]>:

> Correction: https://github.com/apache/storm/pull/2174 has all of what I
> was
> doing and more.
>
> On Tue, Jun 27, 2017 at 9:33 AM, Kristopher Kane <[email protected]>
> wrote:
>
> > Alexandre,
> >
> > There are quite a few JIRAs and discussions around this recently.  The
> > default behavior for storm-kafka-client is the 'subscribe' API which
> causes
> > the immediate lag you see since rebalance will happen spout(n)-1 times
> > just from the spouts spinning up.
> >
> > There is a Builder for ManualPartitionNamedSubscription and the
> > RoundRobinManualPartitioner (which use the 'assign' Kafka consumer API)
> but
> > they don't work at all.  I hope to have a PR in today
> > to fix these on 1.x-branch
> >
> > The other JIRAs I mentioned are for a redesign of this spout or other
> more
> > drastic changes.  My goal is a bug fix for a version of the spout that
> > doesn't provide unnecessary duplicates.
> >
> > Kris
> >
> > On Tue, Jun 27, 2017 at 8:00 AM, Alexandre Vermeerbergen <
> > [email protected]> wrote:
> >
> >> Hello All,
> >>
> >> We have been running for a while our real-time supervision application
> >> based on Apache Storm 1.0.3 with Storm Kafka Spouts (old consumer:
> >> storm-kafka) and with our Kafka Broker cluster based on Apache Kafka
> >> 0.10.1.0.
> >>
> >> Backpressure is activated with default parameters.
> >>
> >>  Key
> >> Value
> >>
> >> backpressure.disruptor.high.watermark              0.9
> >>
> >> backpressure.disruptor.low.watermark               0.4
> >>
> >> task.backpressure.poll.secs                                     30
> >>
> >> topology.backpressure.enable                               true
> >>
> >>
> >>
> >> We decided to upgrade to Apache Storm 1.1.0 to benefit from the new
> Kafka
> >> Spout  (storm-kafka-client lib) with a consumer which has no more
> >> dependency on Zookeeper.
> >>
> >> After upgrade, we had several issues with kafka consumption.
> >>
> >>
> >> We saw that several JIRAs were opened and resolved on Apache Storm
> 1.1.1.
> >>
> >> So we decided to upgrade to the latest available Apache Storm 1.1.x code
> >> built from source (2017-06-26) but  we still have issues :
> >>
> >>
> >>
> >> 1. The kafka Lag is increasing constantly and this leads to the overload
> >> of
> >> the storm worker running the kafka spout. At the end, the worker crashes
> >> and it is automatically restarted by Storm.
> >>
> >>
> >>
> >> With old kafka spout version, we had a lag most of the times bellow
> 10000.
> >>
> >> With the new one, we are starting with Kafka lag about 30000 and
> >> increasing
> >> until crash.
> >>
> >>
> >>
> >> 2. With the new Kafka Spout, we have faced this exception many times:
> >>
> >>
> >>
> >> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
> be
> >> completed since the group has already rebalanced and assigned the
> >> partitions to another member. This means that the time between
> subsequent
> >> calls to poll() was longer than the configured max.poll.interval.ms,
> >> which
> >> typically implies that the poll loop is spending too much time message
> >> processing. You can address this either by increasing the session
> timeout
> >> or by reducing the maximum size of batches returned in poll() with
> >> max.poll.records. at
> >> org.apache.kafka.clients.consumer.internals.ConsumerCoordina
> >> tor.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
> >> at
> >> org.apache.kafka.clients.consumer.internals.ConsumerCoordina
> >> tor.commitOffsetsSync(ConsumerCoordinator.java:581)
> >> at
> >> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(
> >> KafkaConsumer.java:1124)
> >> at
> >> org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAcke
> >> dTuples(KafkaSpout.java:384)
> >> at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(
> KafkaSpout.java:220)
> >> at
> >> org.apache.storm.daemon.executor$fn__10780$fn__10795$fn__
> >> 10826.invoke(executor.clj:646)
> >> at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484) at
> >> clojure.lang.AFn.run(AFn.java:22) at java.lang.Thread.run(Thread.ja
> >> va:748)
> >>
> >>
> >>
> >> We are using the following configuration for Kafka Spout :
> >>
> >> poll.timeout.ms 200.
> >>
> >> offset.commit.period.ms  30000          (30 seconds).
> >>
> >> max.uncommitted.offsets 10000000  (ten million).
> >>
> >>
> >>
> >> The Kafka consumer strategy is set to EARLY. We also set the following
> >> Kafka consumer parameter :
> >>
> >> session.timeout.ms  120000
> >>
> >> Are there any traces/options which we could turn on on Storm or on Kafka
> >> Broker that might help understanding how to stabilize our topologies
> with
> >> this new branch?
> >>
> >> Thanks!
> >>
> >> Alexandre Vermeerbergen
> >>
> >
> >
>

Reply via email to