baisui1981 opened a new issue, #5465:
URL: https://github.com/apache/hudi/issues/5465

   **Describe the problem you faced**
   
   i have make a hudi hive table sink using Flink StreamApi reference from demo 
[HoodieFlinkStreamer.java](https://github.com/apache/hudi/blob/799c78e6888ef6a375c6779c3cfe7067756d4be9/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java)
   
   but found the final hive table partition name assiged value is illegal, 
seems that have ignore the config value `hiveSyncPartitionFields` which is 
setted on the instance of `FlinkStreamerConfig`
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   by building instance of `FlinkStreamerConfig`  as below:
   ``` java
       cfg.sourceAvroSchemaPath = 
"hdfs://namenode/user/admin/default/20220429173545/base/meta/schema.avsc"    
       cfg.targetBasePath = 
"hdfs://namenode/user/admin/default/20220429173545/base/hudi"    
       cfg.targetTableName = "base"    
       cfg.tableType = "COPY_ON_WRITE"    
       cfg.preCombine = true    
       cfg.sourceOrderingField = "update_date"    
       cfg.recordKeyField = "base_id"    
       cfg.keygenType = "TIMESTAMP"    
       cfg.partitionPathField = "start_time"    
       cfg.hiveSyncPartitionFields = "pt"    
       cfg.hiveSyncPartitionExtractorClass = 
"org.apache.hudi.hive.SlashEncodedHourPartitionValueExtractor"    
       cfg.setString("hoodie.deltastreamer.keygen.timebased.timestamp.type" , 
"EPOCHMILLISECONDS")    
       cfg.setString("hoodie.deltastreamer.keygen.timebased.output.dateformat" 
, "yyyy/MM/dd/HH")    
       cfg.setString("hoodie.deltastreamer.keygen.timebased.timezone" , 
"Asia/Shanghai")    
       cfg.writeRateLimit = 200l    
       cfg.hiveSyncEnabled = true    
       cfg.hiveSyncDb = "default"    
       cfg.hiveSyncTable = "base"    
       cfg.hiveSyncMode = "hms"    
       cfg.hiveSyncMetastoreUri = "thrift://192.168.28.201:9083"    
       cfg.payloadClassName = 
"org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"    
       cfg.compactionTargetIo = 512000    
       cfg.compactionTriggerStrategy = "num_commits"    
       cfg.compactionDeltaCommits = 5    
       cfg.compactionDeltaSeconds = 3600    
       cfg.cleanAsyncEnabled = true    
       cfg.cleanRetainCommits = 10    
       cfg.archiveMinCommits = 20    
       cfg.archiveMaxCommits = 30    
   ```
   
   the key config property is `cfg.hiveSyncPartitionFields = "pt"  `
   
   After launch the Flink job , and after the process of checkpoint , found the 
Hive table struct ( by `desc base`) as below:
   
   ```
   
+--------------------------+-----------------------+-----------------------+--+
   |         col_name         |       data_type       |        comment        |
   
+--------------------------+-----------------------+-----------------------+--+
   | _hoodie_commit_time      | string                |                       |
   | _hoodie_commit_seqno     | string                |                       |
   | _hoodie_record_key       | string                |                       |
   | _hoodie_partition_path   | string                |                       |
   | _hoodie_file_name        | string                |                       |
   | base_id                  | int                   |                       |
   | start_time               | bigint                |                       |
   | update_date              | date                  |                       |
   | update_time              | bigint                |                       |
   | price                    | decimal(5,2)          |                       |
   | json_content             | string                |                       |
   | col_blob                 | binary                |                       |
   | col_text                 | string                |                       |
   | start_time              | string                |                       |
   |                          | NULL                  | NULL                  |
   | # Partition Information  | NULL                  | NULL                  |
   | # col_name               | data_type             | comment               |
   |                          | NULL                  | NULL                  |
   | start_time                 | string                |                       
|
   
+--------------------------+-----------------------+-----------------------+--+
   ```
   
   table `base` create table ddl:
   ``` sql
   CREATE TABLE `base` (
     `base_id` int(11) NOT NULL ,
     `start_time` datetime DEFAULT NULL,
     `update_date` date DEFAULT NULL,
     `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
     `price` decimal(5,2) DEFAULT NULL,
     `json_content` json DEFAULT NULL,
     `col_blob` blob,
     `col_text` text,
     PRIMARY KEY (`base_id`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8
   ```
   
   **Expected behavior**
   as my expection the partition name of  hive table shall be 'pt' ,show as 
below:
   ```
   
+--------------------------+-----------------------+-----------------------+--+
   |         col_name         |       data_type       |        comment        |
   
+--------------------------+-----------------------+-----------------------+--+
   | _hoodie_commit_time      | string                |                       |
   | _hoodie_commit_seqno     | string                |                       |
   | _hoodie_record_key       | string                |                       |
   | _hoodie_partition_path   | string                |                       |
   | _hoodie_file_name        | string                |                       |
   | base_id                  | int                   |                       |
   | start_time               | bigint                |                       |
   | update_date              | date                  |                       |
   | update_time              | bigint                |                       |
   | price                    | decimal(5,2)          |                       |
   | json_content             | string                |                       |
   | col_blob                 | binary                |                       |
   | col_text                 | string                |                       |
   | pt                       | string                |                       |
   |                          | NULL                  | NULL                  |
   | # Partition Information  | NULL                  | NULL                  |
   | # col_name               | data_type             | comment               |
   |                          | NULL                  | NULL                  |
   | pt                       | string                |                       |
   
+--------------------------+-----------------------+-----------------------+--+
   ```
   
   And I have check the hudi source code , found the key point is 
https://github.com/apache/hudi/blob/eef3f9c74acfe0ebec77694044b416696cfc7c2d/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java#L86
   
   it is seems that `hiveSyncConfig.partitionFields` value assigned from 
`FilePathUtils.extractPartitionKeys(conf)` is an error?
   
   then fix the  assigment to :
   
   ``` java
   hiveSyncConfig.partitionFields = 
   
Arrays.asList(org.apache.hadoop.util.StringUtils.split(conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS)));
   ```
   and repack the relevant jar package, relaunch the flink job , seems take 
effect 
   
   **Environment Description**
   
   * Hudi version : 0.10.1
   
   * Spark version :
   
   * Hive version : 2.3.1
   
   * Hadoop version : 2.7.3
   
   * Storage (HDFS/S3/GCS..) :HDFS
   
   * Running on Docker? (yes/no) :yes
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to