conanxjp opened a new issue #3324:
URL: https://github.com/apache/hudi/issues/3324
**Describe the problem you faced**
We are trying to POC Hudi with our existing streaming job, which consumes from an AWS Kinesis Data Stream and delivers Parquet files to S3. The job runs continuously on an AWS EMR cluster. With this cluster size and an input rate of 70 records/s, the streaming job should be able to process over 200 records/s, but when outputting as Hudi it can only process ~15 records/s.
What I have tried:
* turning off compaction with `"hoodie.parquet.small.file.limit" -> "0"`
* using `insert` instead of `upsert`
* using the simple index with `"hoodie.index.type" -> "SIMPLE"`

None of these settings had any noticeable effect, so I went back to my desired options (listed at the end) and did some investigation to isolate the operation in my code that causes the delay.
I am using Structured Streaming with a batch function (1-minute trigger interval); the batch function does the following:
```scala
df.withColumn("parsed_data", from_json(col("data"), schema))
  .select("parsed_data.*")
  .withColumn("date", col("timestamp").cast(DateType))
  .write
  .format("org.apache.hudi")
  .options(hudiOptions)
  .options(compactOptions)
  .mode(Append)
  .save(outputPath)
```
The `parsed_data` struct contains ~1000 columns, and the `select("parsed_data.*")` just moves all of its fields up one level.
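To make the shape of that projection concrete, here is a plain-Scala analogy (not Spark code; the field names are illustrative only): `select("parsed_data.*")` keeps just the struct's fields, each promoted to its own top-level column, so the writer sees ~1000 individual columns instead of one struct.

```scala
// Plain-Scala analogy of select("parsed_data.*"): a row modeled as a Map,
// the struct column as a nested Map. Field names are made up for illustration.
val row: Map[String, Any] = Map(
  "data"        -> """{"uuid":"abc-123","timestamp":1627000000}""",
  "parsed_data" -> Map("uuid" -> "abc-123", "timestamp" -> 1627000000L)
)

// The projection keeps only the struct's fields, promoted up a level;
// the original "data" column is dropped from the result.
val flat: Map[String, Any] = row("parsed_data").asInstanceOf[Map[String, Any]]
```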
In my tests, however, the number of columns projected is clearly the bottleneck behind the slow performance. For example, if I do this instead, I get over 200 records/s:
```scala
df.withColumn("parsed_data", from_json(col("data"), schema))
  .withColumn("date", col("parsed_data.timestamp").cast(DateType))
  .select("parsed_data.uuid", "parsed_data.timestamp", "date")
  .write
  .format("org.apache.hudi")
  .options(hudiOptions)
  .options(compactOptions)
  .mode(Append)
  .save(outputPath)
```
If I do an explicit `select` and control the number of columns selected, I can clearly observe the difference in performance:
* 500 columns: ~37 records/s
* 250 columns: ~68 records/s
* 128 columns: ~104 records/s
* 64 columns: ~160 records/s

This is roughly O(n) in the number of columns selected (the deviation from an ideal linear correlation presumably comes from the constant testing input rate and fixed per-record overhead).
This behavior does not look normal, so I also tried this:
```scala
df.withColumn("parsed_data", from_json(col("data"), schema))
  .withColumn("date", col("parsed_data.timestamp").cast(DateType))
  .select("parsed_data.uuid", "parsed_data.timestamp", "date", "parsed_data")
  .write
  .format("org.apache.hudi")
  .options(hudiOptions)
  .options(compactOptions)
  .mode(Append)
  .save(outputPath)
```
This performs well too, even though all the columns are still present, nested inside the `parsed_data` struct column. The existing version of the job without Hudi also does the full `select` without any performance issue.
Here is the DAG of the slow stage (stage 5): [screenshot]

And of the slow job (job 13): [screenshot]
I have gone through most of the performance-related issues and tried the suggestions there, but so far nothing has fixed this. Could you give me some advice on what the reason for this performance issue could be?
Thank you!
Here are some configs:
```scala
val hudiOptions: Map[String, String] = Map(
  TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
  RECORDKEY_FIELD_OPT_KEY -> "uuid",
  PARTITIONPATH_FIELD_OPT_KEY -> "date",
  PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
  OPERATION_OPT_KEY -> "upsert",
  TABLE_NAME -> "hudi_test",
  "hoodie.upsert.shuffle.parallelism" -> "6",
  "hoodie.insert.shuffle.parallelism" -> "6",
  "hoodie.bulkinsert.shuffle.parallelism" -> "6",
  "hoodie.parquet.small.file.limit" -> "104857600",
  "hoodie.index.type" -> "BLOOM"
)
```
```scala
val compactOptions: Map[String, String] = Map(
  "hoodie.compact.inline.max.delta.commits" -> "5",
  "hoodie.cleaner.commits.retained" -> "4",
  "hoodie.cleaner.fileversions.retained" -> "4",
  "hoodie.keep.min.commits" -> "5",
  "hoodie.keep.max.commits" -> "6",
  "hoodie.parquet.compression.codec" -> "snappy"
)
```
```
--deploy-mode cluster \
--num-executors 7 \
--executor-cores 1 \
--executor-memory 2g \
--conf spark.memory.fraction=0.1 \
--conf spark.executor.memoryOverhead=1024 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.executorAllocationRatio=1 \
--conf spark.dynamicAllocation.executorIdleTimeout=200s \
--conf spark.dynamicAllocation.cachedExecutorIdleTimeout=200s \
--conf spark.sql.hive.convertMetastoreParquet=false \
```
**Environment Description**
* Hudi version : 0.6.0 or 0.7.0
* Spark version : 3.0.1 or 3.1.1
* Hive version : 3.1.2
* Hadoop version : 3.2.1
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no
--
This is an automated message from the Apache Git Service.