conanxjp opened a new issue #3324:
URL: https://github.com/apache/hudi/issues/3324


   **Describe the problem you faced**
   We are trying to POC Hudi in our existing streaming job, which consumes from an 
AWS Kinesis Data Stream and delivers parquet files to S3. The job runs 
continuously on an AWS EMR cluster. Given the EMR cluster size and an input rate 
of 70 record/s, the streaming job should be able to process over 200 record/s, 
but when writing output as Hudi it only processes ~15 record/s.
   
   What I have tried:
   
   - turning off small-file merging with `"hoodie.parquet.small.file.limit" -> "0"`
   - using `insert` instead of `upsert`
   - using the simple index with `"hoodie.index.type" -> "SIMPLE"`
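   For reference, the overrides above were applied roughly like this (a sketch; the 
string keys are the standard Hudi option names, merged on top of my base options 
before the write):
   ```scala
   // Hypothetical override map used for these experiments:
   val tryOptions: Map[String, String] = Map(
     "hoodie.parquet.small.file.limit"   -> "0",       // disable small-file merging
     "hoodie.datasource.write.operation" -> "insert",  // instead of "upsert"
     "hoodie.index.type"                 -> "SIMPLE"   // instead of "BLOOM"
   )
   // merged as hudiOptions ++ tryOptions before .options(...)
   ```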
   
   None of the above settings had any noticeable effect, so I went back to my 
desired options (listed at the end) and did some investigation to isolate the 
operation causing the delay in my code.
   
   I am using Structured Streaming with a batch function (1-minute trigger 
interval); the batch function does the following:
   ```scala
   df.withColumn("parsed_data", from_json(col("data"), schema))
     .select("parsed_data.*")
     .withColumn("date", col("timestamp").cast(DateType))
     .write
     .format("org.apache.hudi")
     .options(hudiOptions)
     .options(compactOptions)
     .mode(Append)
     .save(outputPath)
   ```
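   For context, the batch function is wired into the stream roughly like this (a 
sketch; the source variable and wrapper name are assumptions based on the 
1-minute interval mentioned above):
   ```scala
   import org.apache.spark.sql.DataFrame
   import org.apache.spark.sql.streaming.Trigger
   
   // `kinesisStream` is the DataFrame read from Kinesis (assumed name)
   val query = kinesisStream.writeStream
     .trigger(Trigger.ProcessingTime("1 minute"))
     .foreachBatch { (df: DataFrame, batchId: Long) =>
       // the batch function shown above
       writeBatchToHudi(df)  // hypothetical wrapper around the Hudi write
     }
     .start()
   ```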
   The `parsed_data` struct contains ~1000 columns, and the `select("parsed_data.*")` 
simply moves all of them up one level.
   
   In my tests, however, the number of columns projected is clearly the bottleneck 
behind the slow performance. For example, with the following I get over 200 
record/s:
   ```scala
   df.withColumn("parsed_data", from_json(col("data"), schema))
     .withColumn("date", col("parsed_data.timestamp").cast(DateType))
     .select("parsed_data.uuid", "parsed_data.timestamp", "date")
     .write
     .format("org.apache.hudi")
     .options(hudiOptions)
     .options(compactOptions)
     .mode(Append)
     .save(outputPath)
   ```
   
   If I do an explicit select and control the number of columns selected, I can 
clearly observe the difference in performance:
   
   | Columns selected | Throughput |
   | --- | --- |
   | 500 | ~37 record/s |
   | 250 | ~68 record/s |
   | 128 | ~104 record/s |
   | 64 | ~160 record/s |
   
   The per-record processing time grows roughly linearly (O(n)) with the number of 
columns selected. (The deviation from an ideal linear correlation is presumably 
due to the constant testing input rate and fixed per-record overhead.)
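   As a quick sanity check on the numbers above, a least-squares fit of per-record 
time t(n) = a + b·n against the four measurements (plain Scala; the resulting 
coefficients are only indicative):
   ```scala
   // Observed (columns, record/s) pairs from the measurements above
   val obs = Seq(500 -> 37.0, 250 -> 68.0, 128 -> 104.0, 64 -> 160.0)
   val xs  = obs.map(_._1.toDouble)
   val ys  = obs.map { case (_, rate) => 1.0 / rate }  // seconds per record
   val mx  = xs.sum / xs.size
   val my  = ys.sum / ys.size
   // least-squares slope and intercept of t(n) = a + b * n
   val b = xs.zip(ys).map { case (x, y) => (x - mx) * (y - my) }.sum /
           xs.map(x => (x - mx) * (x - mx)).sum
   val a = my - b * mx
   // b ≈ 4.7e-5 s per column per record, a ≈ 3.3 ms fixed cost per record,
   // consistent with a per-column cost dominating the write path
   ```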
   
   This behavior does not seem normal. I also tried this:
   ```scala
   df.withColumn("parsed_data", from_json(col("data"), schema))
     .withColumn("date", col("parsed_data.timestamp").cast(DateType))
     .select("parsed_data.uuid", "parsed_data.timestamp", "date", "parsed_data")
     .write
     .format("org.apache.hudi")
     .options(hudiOptions)
     .options(compactOptions)
     .mode(Append)
     .save(outputPath)
   ```
   This also performs well, even though all the columns are still nested inside the 
`parsed_data` column. And the existing version of the job without Hudi does the 
`select *` with no performance issue either.
   
   Here is the DAG of the slow stage (5):
   
   ![Screen Shot 2021-07-21 at 6 57 39 
PM](https://user-images.githubusercontent.com/25044067/126570361-86369ed3-3b4e-4a60-9a8c-c21df4bd4e31.png)
   
   And the slow job (13):
   ![Screen Shot 2021-07-21 at 3 32 44 
PM](https://user-images.githubusercontent.com/25044067/126570380-2b09a13a-0fc7-4a1f-8d21-c0dcd5f3927f.png)
   
   I have gone through most of the performance-related issues and tried their 
suggestions, but so far nothing has fixed this.
   Please advise on what could be causing this performance issue.
   
   Thank you!
   
   
   Here are some configs:
   
   ```scala
   val hudiOptions: Map[String, String] = Map(
     TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
     RECORDKEY_FIELD_OPT_KEY -> "uuid",
     PARTITIONPATH_FIELD_OPT_KEY -> "date",
     PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
     OPERATION_OPT_KEY -> "upsert",
     TABLE_NAME -> "hudi_test",
     "hoodie.upsert.shuffle.parallelism"->  "6",
     "hoodie.insert.shuffle.parallelism"-> "6",
     "hoodie.bulkinsert.shuffle.parallelism"-> "6",
     "hoodie.parquet.small.file.limit" -> "104857600",
     "hoodie.index.type" -> "BLOOM"
   )
   ```
   
   ```scala
   val compactOptions: Map[String, String] = Map(
     "hoodie.compact.inline.max.delta.commits" -> "5",
     "hoodie.cleaner.commits.retained" -> "4",
     "hoodie.cleaner.fileversions.retained" -> "4",
     "hoodie.keep.min.commits" -> "5",
     "hoodie.keep.max.commits" -> "6",
     "hoodie.parquet.compression.codec" -> "snappy"
   )
   ```
   
   ```sh
   --deploy-mode cluster \
   --num-executors 7 \
   --executor-cores 1 \
   --executor-memory 2g \
   --conf spark.memory.fraction=0.1 \
   --conf spark.executor.memoryOverhead=1024 \
   --conf spark.dynamicAllocation.enabled=true \
   --conf spark.dynamicAllocation.executorAllocationRatio=1 \
   --conf spark.dynamicAllocation.executorIdleTimeout=200s \
   --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=200s \
   --conf spark.sql.hive.convertMetastoreParquet=false \
   ```
   
   
   **Environment Description**
   
   * Hudi version : 0.6.0 or 0.7.0
   
   * Spark version : 3.0.1 or 3.1.1
   
   * Hive version : 3.1.2
   
   * Hadoop version : 3.2.1
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   

