I know there are a number of questions in this thread, but I have some specific comments on dealing with your failed tuples in the indexing topology. For starters, I can almost guarantee that your problem is going to be with tuning ES and not HDFS. Not to say you won't need to tune HDFS, but look at the # emitted tuples, capacity, and # acked in Storm for both the hdfsIndexingBolt and the elasticsearchIndexingBolt. See which one is falling behind and start there. Turn off HDFS or ES and see what the difference is. That at least helps with narrowing focus in the short term. Having been through this a couple times now, I'd like to tell you there's a fast and easy silver bullet, but there isn't one. There are a lot of moving parts and you need to be careful of making small incremental changes towards a stable stream.
If you're not seeing exceptions in the logs, e.g. OOM issues, but you're seeing a lot of failed tuples it sounds like your kafka spout speed might be ok. So let's focus on the ES indexing portion for now. Check out the workflow I've outlined below and you'll see my thoughts on one potential ordering of steps to perf tuning the indexing topology. - Look at the # emitted tuples, capacity, and # acked in Storm for both the hdfsIndexingBolt and the elasticsearchIndexingBolt. Either of these way ahead or way behind? - Try disabling HDFS indexing (enabled: false in the indexing configs). HDFS is probably not your bottleneck, but if it is, consider disabling ES across all sensors for indexing instead. Push the config changes and restart your topologies. I don't think you technically need to restart the topologies for ZK config changes here, but it makes debugging a lot simpler. If you find changing it on the fly works well, then I'm sure myself and the community will be happy to hear the results! - Adjust your # of workers up along with # ackers. When we were testing just HDFS (no ES) we had 4 workers and 24 ackers for a million records/second. Restart the topology. Any difference? - Now increasing ES indexing bolt parallelism. Restart topology. - Increase batch sizes for ES - try 5, then 50, then 100. Restart after each test. - Once you've increased parallelism and removed variables like HDFS from the equation, you can narrow in on the real culprit. One method I found extremely effective for getting stability is to adjust the *indexing_topology_max_spout_pending* in Ambari, under Indexing. By default, the Kafka spout will suck data as fast as the spout can take it. This in turn can cause timeouts when ES is not tuned/able to keep up. Basically, the indexing topology will get further and further behind, regardless of how much parallelism (Workers and Executors) you throw at it because it's unable to get data into ES at the rate matching what's coming from your spouts. You might even think to up the number of partitions in your indexing Kafka topic, which helps overall consumption rate, but that will only firehouse the downstream bottleneck, eventually resulting in the failed tuples you're seeing. You need to first find a stable rate that ES can handle given its current tuning configuration and then work up from there. I just went through this the other day and I started by lowering the setting to a conservative 1000. - From this point, you can work on tuning ES or HDFS, whichever the culprit is. Realistically, you're going to need to do a few tuning cycles to tune that end point, increase the upstream throughput, tune again, and repeat until you get the rate of throughput necessary to handle your live feeds. The other recommendation I have that goes hand in hand with the approach above is to look at the lag offsets for your indexing topic [1]. This is invaluable, and it's really, really easy to use from the CLI. There are 2 ways to approach this problem: 1. live sensor data streaming with the indexing topology keeping pace e2e. e.g. bro sensor to bro topic to enrichment topic to indexing topic. You would expect to see your kafka partition consumer offsets maintaining a fairly consistent lag after a few minutes of activity and stabilizing. 2. reading a large quantity of data already landed in the indexing topic (start indexing topology with Kafka offset = EARLIEST). The goal would be to see your offset lag catch up eventually. This allows you to stress test your indexing topology different from a normal streaming experience. One, it simplifies the moving parts (no parsers, enrichments are live at this point - you're replaying data already in the topic). But two, you can easily tune the levers outlined above, iterate with each change in rapid succession, and record your results. 1. https://github.com/apache/metron/blob/master/metron-platform/Performance-tuning-guide.md Sample command without Kerberos enabled (see link [1] for more detail with Kerberos): watch -n 10 -d ${KAFKA_HOME}/bin/kafka-consumer-groups.sh \ --describe \ --group indexing \ --bootstrap-server $BROKERLIST \ --new-consumer Hope this helps. Cheers, Michael Miklavcic On Sun, Dec 10, 2017 at 5:38 AM, Ali Nazemian <alinazem...@gmail.com> wrote: > This seems not the same as our observations. Whenever there are some > messages in the indexing or enrichments backlog, the new configurations (at > least related to the batch size) won't be applied to the new messages. It > will remain as the previous state until it processes all the old messages. > This scenario can be produced very easily. > > Create a feed with an inefficient batch size to create a backlog on > indexing topic. Then change the batch size to an effective value and wait > to see how long it will take to process the backlog. Based on our > observations, it takes a while to process messages in a back-log even if > you fix the batch size. It feels batch size changes are not synchronised > instantly. > > On Thu, Dec 7, 2017 at 11:45 PM, Otto Fowler <ottobackwa...@gmail.com> > wrote: > > > We use TreeCache > > <https://curator.apache.org/apidocs/org/apache/curator/ > framework/recipes/cache/TreeCache.html> > > . > > > > When the configuration is updated in zookeeper, the configuration object > > in the bolt is updated. This configuration is read on each message, so I > > think from what I see new configurations should get picked up for the > next > > message. > > > > I could be wrong though. > > > > > > > > > > On December 7, 2017 at 06:47:15, Ali Nazemian (alinazem...@gmail.com) > > wrote: > > > > Thank you very much. Unfortunately, reproducing all the situations are > > very costly for us at this moment. We are kind of avoiding to hit that > > issue by using the same batch size for all the feeds. Hopefully, with the > > new PR Casey provided for the segregation of ES and HDFS, it will be very > > much clear to tune them. > > > > Do you know how the synchronization of indexing config will happen with > > the topology? Does the topology gets synchronised by pulling the last > > configs from ZK based on some background mechanism or it is based on an > > update trigger? As I mentioned, based on our observation it looks like > the > > synchronization doesn't work until all the old messages in Kafka queue > get > > processed based on the old indexing configs. > > > > Regards, > > Ali > > > > On Thu, Dec 7, 2017 at 12:33 AM, Otto Fowler <ottobackwa...@gmail.com> > > wrote: > > > >> Sorry, > >> We flush for timeouts on every storm ‘tick’ message, not on every > message. > >> > >> > >> > >> On December 6, 2017 at 08:29:51, Otto Fowler (ottobackwa...@gmail.com) > >> wrote: > >> > >> I have looked at it. > >> > >> We maintain batch lists for each sensor which gather messages to index. > >> When we get a message that puts it over the batch size the messages are > >> flushed and written to the target. > >> There is also a timeout component, where the batch would be flushed > based > >> on timeout. > >> > >> While batch size checking occurs on a per sensor-message receipt basis, > >> each message, regardless of sensor will trigger a check of the batch > >> timeout for all the lists. > >> > >> At least that is what I think I see. > >> > >> Without understanding what the failures are for it is hard to see what > >> the issue is. > >> > >> Do we have timing issues where all the lists are timing out all the time > >> causing some kind of cascading failure for example? > >> Does the number of sensors matter? For example if only one sensor > >> topology is running with batch setup X, is everything fine? Do failures > >> start after adding Nth additional sensor? > >> > >> Hopefully someone else on the list may have an idea. > >> That code does not have any logging to speak of… well debug / trace > >> logging that would help here either. > >> > >> > >> > >> On December 6, 2017 at 08:18:01, Ali Nazemian (alinazem...@gmail.com) > >> wrote: > >> > >> Everything looks normal except the high number of failed tuples. Do you > >> know how the indexing batch size works? Based on our observations it > seems > >> it doesn't update the messages that are in enrichments and indexing > topics. > >> > >> On Thu, Dec 7, 2017 at 12:13 AM, Otto Fowler <ottobackwa...@gmail.com> > >> wrote: > >> > >>> What do you see in the storm ui for the indexing topology? > >>> > >>> > >>> On December 6, 2017 at 07:10:17, Ali Nazemian (alinazem...@gmail.com) > >>> wrote: > >>> > >>> Both hdfs and Elasticsearch batch sizes. There is no error in the logs. > >>> It mpacts topology error rate and cause almost 90% error rate on > indexing > >>> tuples. > >>> > >>> On 6 Dec. 2017 00:20, "Otto Fowler" <ottobackwa...@gmail.com> wrote: > >>> > >>> Where are you seeing the errors? Screenshot? > >>> > >>> > >>> On December 5, 2017 at 08:03:46, Otto Fowler (ottobackwa...@gmail.com) > >>> wrote: > >>> > >>> Which of the indexing options are you changing the batch size for? > >>> HDFS? Elasticsearch? Both? > >>> > >>> Can you give an example? > >>> > >>> > >>> > >>> On December 5, 2017 at 02:09:29, Ali Nazemian (alinazem...@gmail.com) > >>> wrote: > >>> > >>> No specific error in the logs. I haven't enabled debug/trace, though. > >>> > >>> On Tue, Dec 5, 2017 at 11:54 AM, Otto Fowler <ottobackwa...@gmail.com> > >>> wrote: > >>> > >>>> My first thought is what are the errors when you get a high error > rate? > >>>> > >>>> > >>>> On December 4, 2017 at 19:34:29, Ali Nazemian (alinazem...@gmail.com) > >>>> wrote: > >>>> > >>>> Any thoughts? > >>>> > >>>> On Sun, Dec 3, 2017 at 11:27 PM, Ali Nazemian <alinazem...@gmail.com> > >>>> wrote: > >>>> > >>>> > Hi, > >>>> > > >>>> > We have noticed recently that no matter what batch size we use for > >>>> Metron > >>>> > indexing feeds, as long as we start using different batch size for > >>>> > different Metron feeds, indexing topology throughput will start > >>>> dropping > >>>> > due to the high error rate! So I was wondering whether based on the > >>>> current > >>>> > indexing topology design, we have to choose the same batch size for > >>>> all the > >>>> > feeds or not. Otherwise, throughout will be dropped. I assume since > >>>> it is > >>>> > acceptable to use different batch sizes for different feeds, it is > not > >>>> > expected by design. > >>>> > > >>>> > Moreover, I have noticed in practice that even if we change the > batch > >>>> > size, it will not affect the messages that are already in > enrichments > >>>> or > >>>> > indexing topics, and it will only affect the new messages that are > >>>> coming > >>>> > to the parser. Therefore, we need to let all the messages pass the > >>>> indexing > >>>> > topology so that we can change the batch size! > >>>> > > >>>> > It would be great if we can have more details regarding the design > of > >>>> this > >>>> > section so we can understand our observations are based on the > design > >>>> or > >>>> > some kind of bug. > >>>> > > >>>> > Regards, > >>>> > Ali > >>>> > > >>>> > >>>> > >>>> > >>>> -- > >>>> A.Nazemian > >>>> > >>>> > >>> > >>> > >>> -- > >>> A.Nazemian > >>> > >>> > >>> > >> > >> > >> -- > >> A.Nazemian > >> > >> > > > > > > -- > > A.Nazemian > > > > > > > -- > A.Nazemian >