baisui1981 opened a new issue, #5465: URL: https://github.com/apache/hudi/issues/5465
**Describe the problem you faced**

I built a Hudi Hive table sink using the Flink Stream API, following the demo [HoodieFlinkStreamer.java](https://github.com/apache/hudi/blob/799c78e6888ef6a375c6779c3cfe7067756d4be9/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java), but found that the final Hive table's partition column name is wrong. It seems the `hiveSyncPartitionFields` value set on the `FlinkStreamerConfig` instance is ignored.

**To Reproduce**

Steps to reproduce the behavior:

Build an instance of `FlinkStreamerConfig` as below:

```java
cfg.sourceAvroSchemaPath = "hdfs://namenode/user/admin/default/20220429173545/base/meta/schema.avsc";
cfg.targetBasePath = "hdfs://namenode/user/admin/default/20220429173545/base/hudi";
cfg.targetTableName = "base";
cfg.tableType = "COPY_ON_WRITE";
cfg.preCombine = true;
cfg.sourceOrderingField = "update_date";
cfg.recordKeyField = "base_id";
cfg.keygenType = "TIMESTAMP";
cfg.partitionPathField = "start_time";
cfg.hiveSyncPartitionFields = "pt";
cfg.hiveSyncPartitionExtractorClass = "org.apache.hudi.hive.SlashEncodedHourPartitionValueExtractor";
cfg.setString("hoodie.deltastreamer.keygen.timebased.timestamp.type", "EPOCHMILLISECONDS");
cfg.setString("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyy/MM/dd/HH");
cfg.setString("hoodie.deltastreamer.keygen.timebased.timezone", "Asia/Shanghai");
cfg.writeRateLimit = 200L;
cfg.hiveSyncEnabled = true;
cfg.hiveSyncDb = "default";
cfg.hiveSyncTable = "base";
cfg.hiveSyncMode = "hms";
cfg.hiveSyncMetastoreUri = "thrift://192.168.28.201:9083";
cfg.payloadClassName = "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload";
cfg.compactionTargetIo = 512000;
cfg.compactionTriggerStrategy = "num_commits";
cfg.compactionDeltaCommits = 5;
cfg.compactionDeltaSeconds = 3600;
cfg.cleanAsyncEnabled = true;
cfg.cleanRetainCommits = 10;
cfg.archiveMinCommits = 20;
cfg.archiveMaxCommits = 30;
```

The key config property is
`cfg.hiveSyncPartitionFields = "pt"`.

After launching the Flink job, once a checkpoint completed, the Hive table structure (from `desc base`) was as below:

```
+--------------------------+-----------------------+-----------------------+--+
|         col_name         |       data_type       |        comment        |
+--------------------------+-----------------------+-----------------------+--+
| _hoodie_commit_time      | string                |                       |
| _hoodie_commit_seqno     | string                |                       |
| _hoodie_record_key       | string                |                       |
| _hoodie_partition_path   | string                |                       |
| _hoodie_file_name        | string                |                       |
| base_id                  | int                   |                       |
| start_time               | bigint                |                       |
| update_date              | date                  |                       |
| update_time              | bigint                |                       |
| price                    | decimal(5,2)          |                       |
| json_content             | string                |                       |
| col_blob                 | binary                |                       |
| col_text                 | string                |                       |
| start_time               | string                |                       |
|                          | NULL                  | NULL                  |
| # Partition Information  | NULL                  | NULL                  |
| # col_name               | data_type             | comment               |
|                          | NULL                  | NULL                  |
| start_time               | string                |                       |
+--------------------------+-----------------------+-----------------------+--+
```

The create-table DDL of the source table `base`:

```sql
CREATE TABLE `base` (
  `base_id` int(11) NOT NULL,
  `start_time` datetime DEFAULT NULL,
  `update_date` date DEFAULT NULL,
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `price` decimal(5,2) DEFAULT NULL,
  `json_content` json DEFAULT NULL,
  `col_blob` blob,
  `col_text` text,
  PRIMARY KEY (`base_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
```

**Expected behavior**

My expectation is that the partition column of the Hive table should be named `pt`, as shown below:

```
+--------------------------+-----------------------+-----------------------+--+
|         col_name         |       data_type       |        comment        |
+--------------------------+-----------------------+-----------------------+--+
| _hoodie_commit_time      | string                |                       |
| _hoodie_commit_seqno     | string                |                       |
| _hoodie_record_key       | string                |                       |
| _hoodie_partition_path   | string                |                       |
| _hoodie_file_name        | string                |                       |
| base_id                  | int                   |                       |
| start_time               | bigint                |                       |
| update_date              | date                  |                       |
| update_time              | bigint                |                       |
| price                    | decimal(5,2)          |                       |
| json_content             | string                |                       |
| col_blob                 | binary                |                       |
| col_text                 | string                |                       |
| pt                       | string                |                       |
|                          | NULL                  | NULL                  |
| # Partition Information  | NULL                  | NULL                  |
| # col_name               | data_type             | comment               |
|                          | NULL                  | NULL                  |
| pt                       | string                |                       |
+--------------------------+-----------------------+-----------------------+--+
```

I checked the Hudi source code and found the key point here: https://github.com/apache/hudi/blob/eef3f9c74acfe0ebec77694044b416696cfc7c2d/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java#L86 It seems that assigning `hiveSyncConfig.partitionFields` from `FilePathUtils.extractPartitionKeys(conf)` is a mistake. After changing the assignment to:

```java
hiveSyncConfig.partitionFields = Arrays.asList(
    org.apache.hadoop.util.StringUtils.split(conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS)));
```

repacking the relevant jar and relaunching the Flink job, the change seems to take effect.

**Environment Description**

* Hudi version : 0.10.1
* Spark version :
* Hive version : 2.3.1
* Hadoop version : 2.7.3
* Storage (HDFS/S3/GCS..) : HDFS
* Running on Docker? (yes/no) : yes
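To illustrate what the proposed assignment should produce, here is a minimal, self-contained sketch. It is not Hudi code: plain `String.split` stands in for `org.apache.hadoop.util.StringUtils.split`, and a plain string argument stands in for the `conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS)` lookup.

```java
import java.util.Arrays;
import java.util.List;

public class HiveSyncPartitionFieldsSketch {
    // Stand-in for conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS):
    // the user-configured value, e.g. "pt" (comma-separated if multiple).
    static List<String> partitionFields(String configured) {
        return Arrays.asList(configured.split(","));
    }

    public static void main(String[] args) {
        // With hiveSyncPartitionFields = "pt", Hive sync should register a
        // partition column named "pt", not the physical field "start_time".
        System.out.println(partitionFields("pt"));    // prints [pt]
        System.out.println(partitionFields("dt,hh")); // prints [dt, hh]
    }
}
```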

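For context on where the slash-encoded partition values come from, here is a minimal, hypothetical sketch (plain `java.time`, not Hudi's actual `TimestampBasedKeyGenerator`) of how the keygen settings above map an epoch-millis `start_time` to a `yyyy/MM/dd/HH` partition path in `Asia/Shanghai`:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class PartitionPathSketch {
    // Mirrors the keygen settings in the reproduction above:
    // EPOCHMILLISECONDS input, "yyyy/MM/dd/HH" output, Asia/Shanghai timezone.
    static String partitionPath(long epochMillis) {
        return DateTimeFormatter.ofPattern("yyyy/MM/dd/HH")
                .withZone(ZoneId.of("Asia/Shanghai"))
                .format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // 2022-04-29T17:35:45+08:00 in Asia/Shanghai
        System.out.println(partitionPath(1651224945000L)); // prints 2022/04/29/17
    }
}
```

This is why the Hive partition *values* look like `2022/04/29/17` (matching `SlashEncodedHourPartitionValueExtractor`), while the partition *column name* should come from `hiveSyncPartitionFields` (`pt`), not from the physical path field `start_time`.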