spyzzz opened a new issue #2193: URL: https://github.com/apache/hudi/issues/2193
**_Tips before filing an issue_**

Hello, here is my use case: I have to read millions of messages from Kafka and write them into a Hudi table on HDFS. I use Structured Streaming to do it, with the option maxOffsetsPerTrigger set to 500000. 500K records represent about 50 MB.

<img width="1395" alt="Capture d’écran 2020-10-21 à 14 16 22" src="https://user-images.githubusercontent.com/5584892/96718272-0e488800-13a8-11eb-9974-d837c9106acb.png">

The Kafka topic has 6 partitions, so I use 6 executor cores to read in parallel. On every micro-batch (I set the interval to 120s for now, but it can be changed) Hudi writes 5 files of approximately 5 MB each. I tried multiple configurations to avoid writing too many small files, but I have not managed to make it work so far.

The problem is that with every batch the processing time gets longer and longer, I think because Hudi has to read and parse the data that was already saved. Every batch takes 50s to pull data from Kafka (this is stable), and then the Hudi work keeps growing, especially in the step: Obtain key ranges for file slices (range pruning=on)

<img width="1413" alt="Capture d’écran 2020-10-20 à 10 10 45" src="https://user-images.githubusercontent.com/5584892/96718126-d7727200-13a7-11eb-83d0-1f899ecc5141.png">

Here is my Hudi MOR configuration:

```
TABLE_TYPE_OPT_KEY -> "MERGE_ON_READ",
PRECOMBINE_FIELD_OPT_KEY -> "ts_ms",
RECORDKEY_FIELD_OPT_KEY -> table.pk,
OPERATION_OPT_KEY -> "upsert",
KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
TABLE_NAME_OPT_KEY -> ("hudi_" + table.name),
"hoodie.table.name" -> ("hudi_" + table.name),
"hoodie.upsert.shuffle.parallelism" -> "6",
"hoodie.insert.shuffle.parallelism" -> "6",
"hoodie.bulkinsert.shuffle.parallelism" -> "6",
//"hoodie.parquet.small.file.limit" -> "4194304",
//"hoodie.index.bloom.num_entries" -> "1200000",
"hoodie.bulkinsert.sort.mode" -> "NONE",
"hoodie.compact.inline" -> "true",
"hoodie.compact.inline.max.delta.commits" -> "10",
"hoodie.cleaner.commits.retained" -> "10",
"hoodie.cleaner.fileversions.retained" -> "10",
"hoodie.keep.min.commits" -> "12",
"hoodie.keep.max.commits" -> "13"
//"hoodie.clean.async" -> "false",
//"hoodie.clean.automatic" -> "true",
//"hoodie.parquet.compression.codec" -> "snappy"
```

Thanks for reading.

**Environment Description**

* Hudi version : 0.6.0
* Spark version : 2.4.6
* Hive version : 1.2
* Hadoop version : 2.7
* Storage (HDFS/S3/GCS..) : HDFS
* Running on Docker? (yes/no) : no

----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
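
For reference, the pipeline described in this issue can be sketched roughly as follows. This is a minimal sketch, not the reporter's actual job: the broker address, topic name, HDFS paths, table name, and the `selectExpr` parsing step are all placeholders assumed for illustration, and only a subset of the options from the configuration above is repeated. It shows where `maxOffsetsPerTrigger`, the 120s trigger, and the Hudi options plug into a Structured Streaming write.

```scala
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object KafkaToHudiSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-to-hudi-sketch").getOrCreate()

    // Placeholders: none of these values come from the issue.
    val brokers    = "broker1:9092"
    val topic      = "my_topic"
    val basePath   = "hdfs:///tmp/hudi/hudi_my_table"
    val checkpoint = "hdfs:///tmp/checkpoints/hudi_my_table"

    // Read from Kafka, capping each micro-batch at 500K records as in the issue.
    val source = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topic)
      .option("maxOffsetsPerTrigger", "500000")
      .load()

    // Hypothetical parsing step: the real job derives the record key and ts_ms
    // from the message payload. Assumes Kafka keys are non-null.
    val records = source.selectExpr(
      "CAST(key AS STRING) AS id",
      "CAST(value AS STRING) AS payload",
      "CAST(timestamp AS LONG) AS ts_ms") // stand-in ordering field

    // Subset of the options listed in the issue; "id" replaces table.pk.
    val hudiOptions = Map(
      TABLE_TYPE_OPT_KEY -> "MERGE_ON_READ",
      PRECOMBINE_FIELD_OPT_KEY -> "ts_ms",
      RECORDKEY_FIELD_OPT_KEY -> "id",
      OPERATION_OPT_KEY -> "upsert",
      KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
      "hoodie.table.name" -> "hudi_my_table",
      "hoodie.upsert.shuffle.parallelism" -> "6",
      "hoodie.compact.inline" -> "true",
      "hoodie.compact.inline.max.delta.commits" -> "10"
    )

    // One upsert into the Hudi table per 120-second micro-batch.
    records.writeStream
      .format("hudi")
      .options(hudiOptions)
      .option("checkpointLocation", checkpoint)
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("120 seconds"))
      .start(basePath)
      .awaitTermination()
  }
}
```

Running it requires the Hudi Spark bundle and the Spark Kafka source on the classpath, with versions matching the environment listed above.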
