I know there are a number of questions in this thread, but I have some
specific comments on dealing with your failed tuples in the indexing
topology. For starters, I can almost guarantee that your problem is going
to be with tuning ES and not HDFS. Not to say you won't need to tune HDFS,
but look at the # emitted tuples, capacity, and # acked in Storm for both
the hdfsIndexingBolt and the elasticsearchIndexingBolt. See which one is
falling behind and start there. Turn off HDFS or ES and see what the
difference is. That at least helps with narrowing focus in the short term.
Having been through this a couple times now, I'd like to tell you there's a
fast and easy silver bullet, but there isn't one. There are a lot of moving
parts and you need to be careful of making small incremental changes
towards a stable stream.

If you're not seeing exceptions in the logs, e.g. OOM issues, but you're
seeing a lot of failed tuples it sounds like your kafka spout speed might
be ok. So let's focus on the ES indexing portion for now. Check out the
workflow I've outlined below and you'll see my thoughts on one potential
ordering of steps to perf tuning the indexing topology.

   - Look at the # emitted tuples, capacity, and # acked in Storm for both
   the hdfsIndexingBolt and the elasticsearchIndexingBolt. Either of these way
   ahead or way behind?
   - Try disabling HDFS indexing (enabled: false in the indexing configs).
   HDFS is probably not your bottleneck, but if it is, consider disabling ES
   across all sensors for indexing instead. Push the config changes and
   restart your topologies. I don't think you technically need to restart the
   topologies for ZK config changes here, but it makes debugging a lot
   simpler. If you find changing it on the fly works well, then I'm sure
   myself and the community will be happy to hear the results!
   - Adjust your # of workers up along with # ackers. When we were testing
   just HDFS (no ES) we had 4 workers and 24 ackers for a million
   records/second. Restart the topology. Any difference?
   - Now increasing ES indexing bolt parallelism. Restart topology.
   - Increase batch sizes for ES - try 5, then 50, then 100. Restart after
   each test.
   - Once you've increased parallelism and removed variables like HDFS from
   the equation, you can narrow in on the real culprit. One method I found
   extremely effective for getting stability is to adjust the
   *indexing_topology_max_spout_pending* in Ambari, under Indexing. By
   default, the Kafka spout will suck data as fast as the spout can take it.
   This in turn can cause timeouts when ES is not tuned/able to keep up.
   Basically, the indexing topology will get further and further behind,
   regardless of how much parallelism (Workers and Executors) you throw at it
   because it's unable to get data into ES at the rate matching what's coming
   from your spouts. You might even think to up the number of partitions in
   your indexing Kafka topic, which helps overall consumption rate, but that
   will only firehouse the downstream bottleneck, eventually resulting in the
   failed tuples you're seeing. You need to first find a stable rate that ES
   can handle given its current tuning configuration and then work up from
   there. I just went through this the other day and I started by lowering the
   setting to a conservative 1000.
   - From this point, you can work on tuning ES or HDFS, whichever the
   culprit is. Realistically, you're going to need to do a few tuning cycles
   to tune that end point, increase the upstream throughput, tune again, and
   repeat until you get the rate of throughput necessary to handle your live
   feeds.

The other recommendation I have that goes hand in hand with the approach
above is to look at the lag offsets for your indexing topic [1]. This is
invaluable, and it's really, really easy to use from the CLI. There are 2
ways to approach this problem:

   1. live sensor data streaming with the indexing topology keeping pace
   e2e. e.g. bro sensor to bro topic to enrichment topic to indexing topic.
   You would expect to see your kafka partition consumer offsets maintaining a
   fairly consistent lag after a few minutes of activity and stabilizing.
   2. reading a large quantity of data already landed in the indexing topic
   (start indexing topology with Kafka offset = EARLIEST). The goal would be
   to see your offset lag catch up eventually. This allows you to stress test
   your indexing topology different from a normal streaming experience. One,
   it simplifies the moving parts (no parsers, enrichments are live at this
   point - you're replaying data already in the topic). But two, you can
   easily tune the levers outlined above, iterate with each change in rapid
   succession, and record your results.

1.
https://github.com/apache/metron/blob/master/metron-platform/Performance-tuning-guide.md

Sample command without Kerberos enabled (see link [1] for more detail with
Kerberos):

watch -n 10 -d ${KAFKA_HOME}/bin/kafka-consumer-groups.sh \
    --describe \
    --group indexing \
    --bootstrap-server $BROKERLIST \
    --new-consumer

Hope this helps.

Cheers,
Michael Miklavcic


On Sun, Dec 10, 2017 at 5:38 AM, Ali Nazemian <alinazem...@gmail.com> wrote:

> This seems not the same as our observations. Whenever there are some
> messages in the indexing or enrichments backlog, the new configurations (at
> least related to the batch size) won't be applied to the new messages. It
> will remain as the previous state until it processes all the old messages.
> This scenario can be produced very easily.
>
> Create a feed with an inefficient batch size to create a backlog on
> indexing topic. Then change the batch size to an effective value and wait
> to see how long it will take to process the backlog. Based on our
> observations, it takes a while to process messages in a back-log even if
> you fix the batch size. It feels batch size changes are not synchronised
> instantly.
>
> On Thu, Dec 7, 2017 at 11:45 PM, Otto Fowler <ottobackwa...@gmail.com>
> wrote:
>
> > We use TreeCache
> > <https://curator.apache.org/apidocs/org/apache/curator/
> framework/recipes/cache/TreeCache.html>
> > .
> >
> > When the configuration is updated in zookeeper, the configuration object
> > in the bolt is updated. This configuration is read on each message, so I
> > think from what I see new configurations should get picked up for the
> next
> > message.
> >
> > I could be wrong though.
> >
> >
> >
> >
> > On December 7, 2017 at 06:47:15, Ali Nazemian (alinazem...@gmail.com)
> > wrote:
> >
> > Thank you very much. Unfortunately, reproducing all the situations are
> > very costly for us at this moment. We are kind of avoiding to hit that
> > issue by using the same batch size for all the feeds. Hopefully, with the
> > new PR Casey provided for the segregation of ES and HDFS, it will be very
> > much clear to tune them.
> >
> > Do you know how the synchronization of indexing config will happen with
> > the topology? Does the topology gets synchronised by pulling the last
> > configs from ZK based on some background mechanism or it is based on an
> > update trigger? As I mentioned, based on our observation it looks like
> the
> > synchronization doesn't work until all the old messages in Kafka queue
> get
> > processed based on the old indexing configs.
> >
> > Regards,
> > Ali
> >
> > On Thu, Dec 7, 2017 at 12:33 AM, Otto Fowler <ottobackwa...@gmail.com>
> > wrote:
> >
> >> Sorry,
> >> We flush for timeouts on every storm ‘tick’ message, not on every
> message.
> >>
> >>
> >>
> >> On December 6, 2017 at 08:29:51, Otto Fowler (ottobackwa...@gmail.com)
> >> wrote:
> >>
> >> I have looked at it.
> >>
> >> We maintain batch lists for each sensor which gather messages to index.
> >> When we get a message that puts it over the batch size the messages are
> >> flushed and written to the target.
> >> There is also a timeout component, where the batch would be flushed
> based
> >> on timeout.
> >>
> >> While batch size checking occurs on a per sensor-message receipt basis,
> >> each message, regardless of sensor will trigger a check of the batch
> >> timeout for all the lists.
> >>
> >> At least that is what I think I see.
> >>
> >> Without understanding what the failures are for it is hard to see what
> >> the issue is.
> >>
> >> Do we have timing issues where all the lists are timing out all the time
> >> causing some kind of cascading failure for example?
> >> Does the number of sensors matter?  For example if only one sensor
> >> topology is running with batch setup X, is everything fine?  Do failures
> >> start after adding Nth additional sensor?
> >>
> >> Hopefully someone else on the list may have an idea.
> >> That code does not have any logging to speak of… well debug / trace
> >> logging that would help here either.
> >>
> >>
> >>
> >> On December 6, 2017 at 08:18:01, Ali Nazemian (alinazem...@gmail.com)
> >> wrote:
> >>
> >> Everything looks normal except the high number of failed tuples. Do you
> >> know how the indexing batch size works? Based on our observations it
> seems
> >> it doesn't update the messages that are in enrichments and indexing
> topics.
> >>
> >> On Thu, Dec 7, 2017 at 12:13 AM, Otto Fowler <ottobackwa...@gmail.com>
> >> wrote:
> >>
> >>> What do you see in the storm ui for the indexing topology?
> >>>
> >>>
> >>> On December 6, 2017 at 07:10:17, Ali Nazemian (alinazem...@gmail.com)
> >>> wrote:
> >>>
> >>> Both hdfs and Elasticsearch batch sizes. There is no error in the logs.
> >>> It mpacts topology error rate and cause almost 90% error rate on
> indexing
> >>> tuples.
> >>>
> >>> On 6 Dec. 2017 00:20, "Otto Fowler" <ottobackwa...@gmail.com> wrote:
> >>>
> >>> Where are you seeing the errors?  Screenshot?
> >>>
> >>>
> >>> On December 5, 2017 at 08:03:46, Otto Fowler (ottobackwa...@gmail.com)
> >>> wrote:
> >>>
> >>> Which of the indexing options are you changing the batch size for?
> >>> HDFS?  Elasticsearch?  Both?
> >>>
> >>> Can you give an example?
> >>>
> >>>
> >>>
> >>> On December 5, 2017 at 02:09:29, Ali Nazemian (alinazem...@gmail.com)
> >>> wrote:
> >>>
> >>> No specific error in the logs. I haven't enabled debug/trace, though.
> >>>
> >>> On Tue, Dec 5, 2017 at 11:54 AM, Otto Fowler <ottobackwa...@gmail.com>
> >>> wrote:
> >>>
> >>>> My first thought is what are the errors when you get a high error
> rate?
> >>>>
> >>>>
> >>>> On December 4, 2017 at 19:34:29, Ali Nazemian (alinazem...@gmail.com)
> >>>> wrote:
> >>>>
> >>>> Any thoughts?
> >>>>
> >>>> On Sun, Dec 3, 2017 at 11:27 PM, Ali Nazemian <alinazem...@gmail.com>
> >>>> wrote:
> >>>>
> >>>> > Hi,
> >>>> >
> >>>> > We have noticed recently that no matter what batch size we use for
> >>>> Metron
> >>>> > indexing feeds, as long as we start using different batch size for
> >>>> > different Metron feeds, indexing topology throughput will start
> >>>> dropping
> >>>> > due to the high error rate! So I was wondering whether based on the
> >>>> current
> >>>> > indexing topology design, we have to choose the same batch size for
> >>>> all the
> >>>> > feeds or not. Otherwise, throughout will be dropped. I assume since
> >>>> it is
> >>>> > acceptable to use different batch sizes for different feeds, it is
> not
> >>>> > expected by design.
> >>>> >
> >>>> > Moreover, I have noticed in practice that even if we change the
> batch
> >>>> > size, it will not affect the messages that are already in
> enrichments
> >>>> or
> >>>> > indexing topics, and it will only affect the new messages that are
> >>>> coming
> >>>> > to the parser. Therefore, we need to let all the messages pass the
> >>>> indexing
> >>>> > topology so that we can change the batch size!
> >>>> >
> >>>> > It would be great if we can have more details regarding the design
> of
> >>>> this
> >>>> > section so we can understand our observations are based on the
> design
> >>>> or
> >>>> > some kind of bug.
> >>>> >
> >>>> > Regards,
> >>>> > Ali
> >>>> >
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> A.Nazemian
> >>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> A.Nazemian
> >>>
> >>>
> >>>
> >>
> >>
> >> --
> >> A.Nazemian
> >>
> >>
> >
> >
> > --
> > A.Nazemian
> >
> >
>
>
> --
> A.Nazemian
>

Reply via email to