davehagman edited a comment on issue #3733:
URL: https://github.com/apache/hudi/issues/3733#issuecomment-933538622


   More details on this issue. The root cause is that a single batch of records can result in a large number of partitions being scanned during the index lookup (for record de-duplication).
   
   For example, our data is partitioned by `year/month/day/hour`. Normally an entire batch of records "matches" only 1-8 partitions, depending on the timestamps of the events (they are usually all very recent and fit into a handful of hourly partitions).
   
   The issue here is that a process was kicked off that loaded very old records into the Kafka stream, which resulted in many more partitions being involved in the batch (hundreds if not thousands of partitions in a single batch of records).
   
   Example ideal case within a single batch:
   ```
   Record 1: year=2021/month=10/day=04/hour=11
   Record 2: year=2021/month=10/day=04/hour=11
   Record 3: year=2021/month=10/day=04/hour=10
   Record 4: year=2021/month=10/day=04/hour=10
   Record 5: year=2021/month=10/day=04/hour=10
   ...
   ``` 
   As you can see, this is ideal because very few partitions are evaluated within this batch.
   
   Here is an example of a much worse dataset, where recent data is mixed in with a lot of old data:
   ```
   Record 1: year=2020/month=1/day=04/hour=04
   Record 2: year=2021/month=10/day=13/hour=11
   Record 3: year=2020/month=06/day=20/hour=22
   Record 4: year=2019/month=07/day=19/hour=05
   Record 5: year=2018/month=02/day=23/hour=18
   ...
   ```
   Our max record batch size is 20 million, so this quickly explodes to many partitions, which results in very slow index comparison times (the step in the insert process that checks the bloom filter based on the record key).
   
   My main question at the moment is: should I expect very poor performance when the incoming data contains many different partition paths? The code that actually evaluates the records against the bloom filter lives here:
   
https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java#L78
   
   The reason it can be slow is that we iterate over each record serially (`while (inputItr.hasNext())`) and check the bloom filter. This happens within a pre-sorted RDD where the records are ordered by record key. Would a batched/async operation be possible here (at least within a given partition)? I could try to PR something to speed up this part of the process.
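   As a rough sketch of what "batched" might mean here, one option is to group candidate record keys by the file they need to be checked against, load each file's bloom filter once, and probe it for all of that file's keys in a tight loop, instead of resolving state record by record. This is illustrative only; `BloomFilter`, `candidateKeysByFile`, and `possibleMatches` below are stand-ins I made up, not Hudi's actual classes or methods:

   ```java
   import java.util.*;
   import java.util.function.Function;

   // Illustrative sketch only (not Hudi's API): batch bloom filter probes
   // per file instead of interleaving lookups record by record.
   public class BatchedBloomCheck {
       // Stand-in for a loaded bloom filter.
       interface BloomFilter { boolean mightContain(String key); }

       // Group record keys by the candidate file they must be checked against.
       static Map<String, List<String>> candidateKeysByFile(
               Map<String, String> keyToFile) {
           Map<String, List<String>> byFile = new HashMap<>();
           keyToFile.forEach((key, file) ->
               byFile.computeIfAbsent(file, f -> new ArrayList<>()).add(key));
           return byFile;
       }

       // Load each file's filter once, then probe all of its keys together.
       static List<String> possibleMatches(
               Map<String, String> keyToFile,
               Function<String, BloomFilter> loadFilter) {
           List<String> hits = new ArrayList<>();
           candidateKeysByFile(keyToFile).forEach((file, keys) -> {
               BloomFilter bf = loadFilter.apply(file);
               for (String key : keys) {
                   if (bf.mightContain(key)) hits.add(key);
               }
           });
           return hits;
       }

       public static void main(String[] args) {
           Map<String, String> keyToFile = new LinkedHashMap<>();
           keyToFile.put("k1", "file-a");
           keyToFile.put("k2", "file-a");
           keyToFile.put("k3", "file-b");
           // Toy filter: pretend only keys ending in "1" might be present.
           Function<String, BloomFilter> load = f -> key -> key.endsWith("1");
           System.out.println(possibleMatches(keyToFile, load)); // [k1]
       }
   }
   ```

   The win, if any, would come from amortizing the per-file filter access across many keys within a Spark partition; whether that helps in practice depends on how the existing code already caches filters.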

