[ 
https://issues.apache.org/jira/browse/HUDI-298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-298:
--------------------------------
    Labels: pull-request-available  (was: )

> Upsert MOR table but got a NULL value
> -------------------------------------
>
>                 Key: HUDI-298
>                 URL: https://issues.apache.org/jira/browse/HUDI-298
>             Project: Apache Hudi (incubating)
>          Issue Type: Bug
>            Reporter: Wenning Ding
>            Assignee: Udit Mehrotra
>            Priority: Major
>              Labels: pull-request-available
>
> I ran into this issue when I tried to update a record in a MOR table.
> I found that this issue depends on the order of columns. If the partition key 
> is the last column, then it will work normally. But if the partition key is 
> not the last column, then it will return NULL for the updated field.
> Here is a code example:
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.spark.sql.SaveMode
> val df = Seq(
>   ("100", "event_name_900", "2015-01-01T13:51:39.340396Z", "type1"),
>   ("101", "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
>   ("104", "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
>   ("105", "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
>   ).toDF("event_id", "event_name", "event_ts", "event_type")
> var tableName = "hudi_events_mor_1"
> var tablePath = "s3://emr-users/wenningd/hudi/tables/events/" + tableName
> // write hudi dataset
> df.write.format("org.apache.hudi")
>   .option(HoodieWriteConfig.TABLE_NAME, tableName)
>   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
>   .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, 
> DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
>   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") 
>   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>   .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>   .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
>   .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, 
> "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>   .mode(SaveMode.Overwrite)
>   .save(tablePath)
> // update a record with event_name "event_name_123" => "event_name_changed"
> val df1 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
> val df2 = df1.filter($"event_id" === "104")
> val df3 = df2.withColumn("event_name", lit("event_name_changed"))
> // update hudi dataset
> df3.write.format("org.apache.hudi")
>    .option(HoodieWriteConfig.TABLE_NAME, tableName)
>    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>    .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, 
> DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
>    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") 
>    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>    .option("hoodie.compact.inline", "false")
>    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>    .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
>    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, 
> "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>    .mode(SaveMode.Append)
>    .save(tablePath)
> {code}
>  
> Then I looked into the hive table, the event name is updated in the _rt table 
> as expected:
> {code:java}
> hive> select * from hudi_events_mor_1 where event_id="104";
> OK
> 20191008184508        20191008184508_1_4      104     type1   
> ec3761e6-22eb-47f2-b87d-d47484ff8d5f-0_1-4-8_20191008184508.parquet     104   
>   event_name_123  2015-01-01T12:15:00.512679Z     type1
> hive> select * from hudi_events_mor_1_rt where event_id="104";
> OK
> 20191008184558        20191008184558_0_1      104     type1   
> ec3761e6-22eb-47f2-b87d-d47484ff8d5f-0  104     event_name_changed      
> 2015-01-01T12:15:00.512679Z     type1
> {code}
>  
> But if I do the exact same thing for another dataframe with different column 
> order:
> {code:java}
> val df = Seq(
>   ("100", "type1", "event_name_900", "2015-01-01T13:51:39.340396Z"),
>   ("101", "type2", "event_name_546", "2015-01-01T12:14:58.597216Z"),
>   ("104", "type1", "event_name_123", "2015-01-01T12:15:00.512679Z"),
>   ("105", "type2", "event_name_678", "2015-01-01T13:51:42.248818Z")
>   ).toDF("event_id", "event_type", "event_name", "event_ts"){code}
> And I also selected event_type as my partition key (this time the partition 
> key column is not the last column)
> Then I looked into the hive table:
> {code:java}
> hive> select * from hudi_events_mor_2 where event_id="104";
> OK
> 20191008185116        20191008185116_1_2      104     type1   
> 23e3c091-f414-4fce-a51f-70cc012d7af2-0_1-4-13_20191008185116.parquet    104   
>   event_name_123  2015-01-01T12:15:00.512679Z     type1
> hive> select * from hudi_events_mor_2_rt where event_id="104";
> OK
> 20191008185240        20191008185240_0_1      104     type1   
> 23e3c091-f414-4fce-a51f-70cc012d7af2-0  104     NULL    event_name_changed    
>   type1
> {code}
> You can see the *NULL* value in the _rt table and also the order of columns 
> is messed up.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to