Lujun-WC commented on issue #8391:
URL: https://github.com/apache/hudi/issues/8391#issuecomment-1499897104
// Imports restored for context (assumed: Hudi's DataSourceWriteOptions / HoodieWriteConfig keys and Spark SQL/streaming APIs)
import org.apache.hudi.DataSourceWriteOptions.{OPERATION, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
import org.apache.hudi.common.model.DefaultHoodieRecordPayload
import org.apache.hudi.config.HoodieWriteConfig.{TBL_NAME, WRITE_PAYLOAD_CLASS_NAME}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}

object DwdFoo6Order {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .config("spark.debug.maxToStringFields", "500")
      .config("spark.sql.debug.maxToStringFields", "500")
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .config("hive.exec.dynamic.partition", true)
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    // topic, queryName, checkpointLocation, hudiTable, basePath and the
    // HudiTable / HudiWriteOpts constants are defined elsewhere in the job.
    val foo6KafkaDF: Dataset[Row] = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.10.10.1:9092")
      .option("subscribe", topic)
      .option("startingOffsets", "latest")
      .option("maxOffsetsPerTrigger", 1500000L)
      .option("failOnDataLoss", value = false)
      .load()

    val query = foo6KafkaDF
      .writeStream
      .queryName(queryName)
      .option("checkpointLocation", checkpointLocation)
      .trigger(Trigger.ProcessingTime("300 seconds"))
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        // DataFrame processing
        val df = ...

        df.write.format("org.apache.hudi")
          .option(TBL_NAME.key(), hudiTable)
          .option(TABLE_TYPE.key(), HudiTable.COW)
          .option(OPERATION.key(), HudiWriteOpts.UPSERT)
          .option(RECORDKEY_FIELD.key(), "order_id")
          .option(PRECOMBINE_FIELD.key(), "sort_key")
          .option(PARTITIONPATH_FIELD.key(), "cdt,data_source")
          .option(WRITE_PAYLOAD_CLASS_NAME.key(), classOf[DefaultHoodieRecordPayload].getName)
          .option("hoodie.write.markers.type", "direct")
          .option("hoodie.index.type", "BLOOM")
          .option("hoodie.datasource.write.hive_style_partitioning", "true")
          .option("hoodie.insert.shuffle.parallelism", "150")
          .option("hoodie.bulkinsert.shuffle.parallelism", "150")
          .option("hoodie.upsert.shuffle.parallelism", "150")
          .option("hoodie.delete.shuffle.parallelism", "150")
          .mode(SaveMode.Append)
          .save(basePath)

        batchDF.unpersist()
        ()
      }
      .start()

    query.awaitTermination()
  }
}
spark submit:
spark-submit --class com.example.DwdFoo6Order \
  --master yarn --deploy-mode cluster \
  --num-executors 4 --executor-cores 2 --executor-memory 8G \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,org.apache.kafka:kafka-clients:2.8.0,com.alibaba:druid:1.2.6,mysql:mysql-connector-java:5.1.34 \
  ./hudi-test.jar
Existing data size per partition (about 10.8 million records in total):
cdt           data_source      count(1)
2023-03-16    foo6_standard           2
2023-03-17    foo6_standard           2
2023-03-18    foo6_standard           3
2023-03-19    foo6_standard           3
2023-03-20    foo6_standard           1
2023-03-23    foo6_standard           3
2023-03-24    foo6_standard           2
2023-03-25    foo6_standard           1
2023-03-26    foo6_standard          12
2023-03-27    foo6_standard          12
2023-03-28    foo6_standard          41
2023-03-29    foo6_standard          70
2023-03-30    foo6_standard          88
2023-03-31    foo6_standard         301
2023-04-01    foo6_standard        1613
2023-04-02    foo6_standard     1828818
2023-04-03    foo6_standard     1567815
2023-04-04    foo6_standard     2269541
2023-04-05    foo6_standard     2884449
2023-04-06    foo6_standard     1933243
2023-04-07    foo6_standard      304502
The RECORDKEY_FIELD is order_id, an auto-incrementing long ID. cdt is the order's creation date, and it never changes for a given order_id. Each micro-batch contains fewer than 100,000 records; roughly 90% of them belong to the current day's partition, while the remaining 10% update older partitions.
The same data takes just 0.1 minutes to write to Hive, but writing to Hudi is unexpectedly slow. What could be the reason for this?
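For context, the Hive write used as that 0.1-minute baseline is not shown above; a minimal sketch of what such a dynamic-partition Hive insert typically looks like is below (the target table name dwd.foo6_order is hypothetical, and it relies on the same spark.sql.sources.partitionOverwriteMode=dynamic setting as the session above).

// Minimal sketch of the assumed Hive baseline write (not taken from the original post).
// With partitionOverwriteMode=dynamic, only the partitions present in `df` are
// overwritten; the target table name is hypothetical.
df.write
  .mode(SaveMode.Overwrite)
  .insertInto("dwd.foo6_order")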