[
https://issues.apache.org/jira/browse/HUDI-4384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Teng Huo updated HUDI-4384:
---------------------------
Attachment: Screenshot 2022-07-12 at 16.39.58.png
> Hive-style partitioning does not work and record key loses its prefix when using ComplexKeyGenerator in
> bulk_insert
> ----------------------------------------------------------------------------------------
>
> Key: HUDI-4384
> URL: https://issues.apache.org/jira/browse/HUDI-4384
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Teng Huo
> Priority: Major
> Attachments: Screenshot 2022-07-12 at 16.39.58.png
>
>
> When using bulk_insert in 0.11.1,
> "hoodie.datasource.write.hive_style_partitioning" has no effect.
> Also, when using "org.apache.hudi.keygen.ComplexKeyGenerator", the values in
> column "_hoodie_record_key" are missing the field-name prefix (e.g. "key:").
> This bug was also reported as a GitHub issue: https://github.com/apache/hudi/issues/6070
> It can be reproduced with the following code:
> {code:java}
> def main(args: Array[String]): Unit = {
> val avroSchema = new Schema.Parser().parse(new
> File("~/hudi/docker/demo/config/schema.avsc"))
> val schema =
> SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
> val df =
> spark.read.schema(schema).json("file://~/hudi/docker/demo/data/batch_1.json")
> val options = Map (
> "hoodie.datasource.write.keygenerator.class" ->
> "org.apache.hudi.keygen.ComplexKeyGenerator",
> "hoodie.bulkinsert.sort.mode" -> "GLOBAL_SORT",
> "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
> "hoodie.datasource.write.precombine.field" -> "ts",
> "hoodie.datasource.write.recordkey.field" -> "key",
> "hoodie.datasource.write.partitionpath.field" -> "year",
> "hoodie.datasource.write.hive_style_partitioning" -> "true",
> "hoodie.datasource.hive_sync.enable" -> "false",
> "hoodie.datasource.hive_sync.partition_fields" -> "year",
> "hoodie.datasource.hive_sync.partition_extractor_class" ->
> "org.apache.hudi.hive.MultiPartKeysValueExtractor"
> )
> bulkInsert(df, options)
> insert(df, options)
> }
> def bulkInsert(df: DataFrame, options: Map[String, String]): Unit = {
> val allOptions: Map[String, String] = options ++ Map (
> "hoodie.datasource.write.operation" -> "bulk_insert",
> "hoodie.table.name" -> "test_hudi_bulk_table"
> )
> df.write.format("hudi")
> .options(allOptions)
> .mode(SaveMode.Overwrite)
> .save("file://~/test_hudi_bulk_table")
> }
> def insert(df: DataFrame, options: Map[String, String]): Unit = {
> val allOptions: Map[String, String] = options ++ Map (
> "hoodie.datasource.write.operation" -> "insert",
> "hoodie.table.name" -> "test_hudi_insert_table"
> )
> df.write.format("hudi")
> .options(allOptions)
> .mode(SaveMode.Overwrite)
> .save("file://~/test_hudi_insert_table")
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)