MikeBuh opened a new issue #3751:
URL: https://github.com/apache/hudi/issues/3751
Hello,
We would like your assistance in understanding how to tune our current setup for better performance. Our scenario is as follows:
We are building a data lake with Hudi so that we can keep records updated with near-real-time availability. The source data resides on S3 and consists of many small Avro files; in essence, there is one file per Kafka message sent to us by an external source. To process the data and persist it to Hudi, we run a Spark application on EMR that consumes the data via Structured Streaming, applies some basic filtering and conversions, and then performs an UPSERT.
When reading the data into the application, the schema is obtained through the Confluent Schema Registry and AvroDeserializer:
```
val inputDF = spark.readStream
  .format("avro")
  .schema(schema)
  .load(s"$dataSource/$topicName/")
```
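For context, the schema lookup looks roughly like this; the registry URL, cache size, and subject naming below are illustrative placeholders, not our exact code:

```scala
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// Illustrative values; the real URL and subject come from our configuration.
val registryClient = new CachedSchemaRegistryClient("http://schema-registry:8081", 128)
val avroSchemaStr  = registryClient.getLatestSchemaMetadata(s"$topicName-value").getSchema
val avroSchema     = new Schema.Parser().parse(avroSchemaStr)

// Convert the Avro schema to the Spark StructType passed to .schema(...) above.
val schema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
```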
After this, some basic processing is performed to remove a few fields and to compute the key, calendardate, and eventtime columns from the input message.
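As a rough illustration of that step (the dropped field names and the key derivation are placeholders, not our exact logic):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat_ws, to_date}

// Placeholder transformation: the actual dropped fields and key logic differ.
def prepare(df: DataFrame): DataFrame =
  df.drop("internalField1", "internalField2")                    // remove unneeded fields
    .withColumn("key", concat_ws("-", col("id"), col("source"))) // record key
    .withColumn("calendardate", to_date(col("eventtime")))       // partition column
```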
Eventually data is written to Hudi in the following way:
```
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.streaming.Trigger
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}

val hudiOptions: Map[String, String] = Map(
  HoodieWriteConfig.TABLE_NAME -> s"hudi_$topicName",
  DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
  DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "key",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "calendardate",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "eventtime",
  HoodieWriteConfig.UPSERT_PARALLELISM -> "10",
  HoodieWriteConfig.INSERT_PARALLELISM -> "10"
)

val hudiCompactOptions: Map[String, String] = Map(
  HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT_BYTES -> "10485760",
  HoodieCompactionConfig.INLINE_COMPACT_PROP -> "true",
  HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP -> "15",
  HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP -> "12",
  HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP -> "15",
  HoodieCompactionConfig.MAX_COMMITS_TO_KEEP_PROP -> "18"
)

val processBatch: (DataFrame, Long) => Unit = (df, _) => {
  df.write
    .format("org.apache.hudi")
    .options(hudiOptions)
    .options(hudiCompactOptions)
    .mode(SaveMode.Append)
    .save(s"$destination/$topicName")
}

// Stream from the source DataFrame (processed upstream), not the batch-local df.
inputDF.writeStream
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch(processBatch)
  .start()
```
Execution Performance
> Data Input: 40 GB across 650K files
> Hardware: 1 x Master 4C 32G | 4 x Core 4C 32G
> Allocated Resources: driver-memory 8g | executor-memory 4g | executor-cores 2 | num-executors 2
> Execution Time: 2hrs+
Given the above, we have reason to believe that higher performance could be achieved by tuning various parameters to better suit the input data.
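For example, one knob that seems relevant to the many-small-files input is bounding how many source files each micro-batch picks up, via Spark's `maxFilesPerTrigger` option for file stream sources; the value below is purely illustrative:

```scala
// Illustrative: cap the number of source files consumed per micro-batch.
val inputDF = spark.readStream
  .format("avro")
  .schema(schema)
  .option("maxFilesPerTrigger", 50000) // hypothetical value, to be tuned
  .load(s"$dataSource/$topicName/")
```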
Last but not least, an instance of this application will run for each topic we have; the heavier topics receive around 6 GB of data per day, split across 95K files.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]