spyzzz opened a new issue #2193:
URL: https://github.com/apache/hudi/issues/2193


   
   Hello, here is my use case: I have to read millions of messages from Kafka and 
then write them into a Hudi table on HDFS.
   I use Structured Streaming to do it, with the option maxOffsetsPerTrigger set to 
500000. 
   500K records represent 50MB.
   
   <img width="1395" alt="Capture d’écran 2020-10-21 à 14 16 22" 
src="https://user-images.githubusercontent.com/5584892/96718272-0e488800-13a8-11eb-9974-d837c9106acb.png">
   
   
   The Kafka topic has 6 partitions, so I use 6 executor cores to read them in parallel.
   
   Every micro-batch (I set the trigger to 120s for now, but it can be changed), Hudi 
writes 5 files of approximately 5MB each. 
   I tried multiple configs to avoid writing too many small files, but I haven't 
managed to make it work so far.
   
   ![Capture d’écran 2020-10-21 à 14 12 
23](https://user-images.githubusercontent.com/5584892/96717867-75197180-13a7-11eb-99e1-cf554a5bfd15.png)
   
   The thing is, with every batch the processing time gets longer and longer, 
I think because Hudi has to read and parse the data it has already saved. Every 
batch takes 50s to pull data from Kafka (this is stable), and then the Hudi work 
keeps growing, especially in the step: Obtain key ranges for file slices (range 
pruning=on)
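
To put the growth in numbers, here is a rough back-of-the-envelope sketch in plain Scala. It uses only the figures reported above (500K records, 50MB, 5 files per batch, 120s trigger) and assumes, as a simplification, that every batch keeps adding 5 new files and that none of them get merged into larger ones. The object and method names are made up for illustration.

```scala
// Rough estimate of file growth, based only on the numbers reported above.
// Assumption: each micro-batch adds 5 new files and none are merged later.
object FileGrowthEstimate {
  val recordsPerBatch: Long = 500000L        // maxOffsetsPerTrigger
  val batchBytes: Long = 50L * 1000 * 1000   // ~50MB of input per batch (decimal MB)
  val filesPerBatch: Int = 5                 // files observed per micro-batch
  val triggerSeconds: Int = 120              // micro-batch interval

  // Average size of one input record, in bytes.
  val bytesPerRecord: Long = batchBytes / recordsPerBatch

  // Number of micro-batches triggered per hour.
  val batchesPerHour: Int = 3600 / triggerSeconds

  // Files accumulated after the given number of hours, under the assumption above.
  def filesAfterHours(hours: Int): Int = hours * batchesPerHour * filesPerBatch

  def main(args: Array[String]): Unit = {
    println(s"average input record size: $bytesPerRecord bytes")
    println(s"new files per hour: ${filesAfterHours(1)}")
    println(s"new files per day: ${filesAfterHours(24)}")
  }
}
```

Under these assumptions the table gains 150 files per hour and 3600 per day, so any step that has to look at every existing file would do more work on each batch.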
   
   <img width="1413" alt="Capture d’écran 2020-10-20 à 10 10 45" 
src="https://user-images.githubusercontent.com/5584892/96718126-d7727200-13a7-11eb-83d0-1f899ecc5141.png">
   
   Here is my Hudi MOR configuration:
   
   ```
        TABLE_TYPE_OPT_KEY -> "MERGE_ON_READ",
         PRECOMBINE_FIELD_OPT_KEY -> "ts_ms",
         RECORDKEY_FIELD_OPT_KEY -> table.pk,
         OPERATION_OPT_KEY -> "upsert",
         KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
         TABLE_NAME_OPT_KEY -> ("hudi_" + table.name),
         "hoodie.table.name" -> ("hudi_" + table.name),
         "hoodie.upsert.shuffle.parallelism"->  "6",
         "hoodie.insert.shuffle.parallelism"-> "6",
         "hoodie.bulkinsert.shuffle.parallelism"-> "6",
         //"hoodie.parquet.small.file.limit" -> "4194304",
         //"hoodie.index.bloom.num_entries" -> "1200000",
         "hoodie.bulkinsert.sort.mode" -> "NONE",
         "hoodie.compact.inline" -> "true",
         "hoodie.compact.inline.max.delta.commits" -> "10",
         "hoodie.cleaner.commits.retained" -> "10",
         "hoodie.cleaner.fileversions.retained" -> "10",
         "hoodie.keep.min.commits" -> "12",
         "hoodie.keep.max.commits" -> "13"
         //"hoodie.clean.async" -> "false",
         //"hoodie.clean.automatic" ->"true",
         //"hoodie.parquet.compression.codec" -> "snappy"
   ```
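
For anyone who wants to reproduce this without the `DataSourceWriteOptions` constants, here is a minimal sketch of the same active options as a plain Scala `Map` with literal keys. The string keys for the `*_OPT_KEY` constants are what I believe they resolve to in Hudi 0.6.0, so please treat them as an assumption and check them against your version. `tableName` and `recordKey` are hypothetical stand-ins for `table.name` and `table.pk`, and the commented-out options are left out.

```scala
// Sketch: the active options from the configuration above, as literal string keys.
// Assumption: the *_OPT_KEY constants map to these keys in Hudi 0.6.0.
object HudiWriteOptions {
  // tableName / recordKey are hypothetical stand-ins for table.name / table.pk.
  def build(tableName: String, recordKey: String): Map[String, String] = Map(
    "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
    "hoodie.datasource.write.precombine.field" -> "ts_ms",
    "hoodie.datasource.write.recordkey.field" -> recordKey,
    "hoodie.datasource.write.operation" -> "upsert",
    "hoodie.datasource.write.keygenerator.class" ->
      "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
    // TABLE_NAME_OPT_KEY and "hoodie.table.name" are the same key, so it appears once.
    "hoodie.table.name" -> ("hudi_" + tableName),
    "hoodie.upsert.shuffle.parallelism" -> "6",
    "hoodie.insert.shuffle.parallelism" -> "6",
    "hoodie.bulkinsert.shuffle.parallelism" -> "6",
    "hoodie.bulkinsert.sort.mode" -> "NONE",
    "hoodie.compact.inline" -> "true",
    "hoodie.compact.inline.max.delta.commits" -> "10",
    "hoodie.cleaner.commits.retained" -> "10",
    "hoodie.cleaner.fileversions.retained" -> "10",
    "hoodie.keep.min.commits" -> "12",
    "hoodie.keep.max.commits" -> "13"
  )

  def main(args: Array[String]): Unit = {
    // "orders" and "id" are example values only.
    build("orders", "id").toSeq.sortBy(_._1).foreach { case (k, v) => println(s"$k=$v") }
  }
}
```

A map like this can be passed to a Spark writer through `.options(...)`.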
   
   Thanks for reading.
   
   
   **Environment Description**
   
   * Hudi version : 0.6.0
   
   * Spark version : 2.4.6
   
   * Hive version : 1.2
   
   * Hadoop version : 2.7
   
   * Storage (HDFS/S3/GCS..) : HDFS
   
   * Running on Docker? (yes/no) : no
   
   



