Wenning Ding created HUDI-298:
---------------------------------

             Summary: Upsert MOR table but got a NULL value
                 Key: HUDI-298
                 URL: https://issues.apache.org/jira/browse/HUDI-298
             Project: Apache Hudi (incubating)
          Issue Type: Bug
            Reporter: Wenning Ding


I ran into this issue when I tried to update a record in a MOR table.

I found that this issue depends on the order of columns. If the partition key 
is the last column, then it will work normally. But if the partition key is not 
the last column, then it will return NULL for the updated field.

Here is a code example:
{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode

val df = Seq(
  ("100", "event_name_900", "2015-01-01T13:51:39.340396Z", "type1"),
  ("101", "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
  ("104", "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
  ("105", "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
  ).toDF("event_id", "event_name", "event_ts", "event_type")

var tableName = "hudi_events_mor_1"
var tablePath = "s3://emr-users/wenningd/hudi/tables/events/" + tableName

// write hudi dataset
df.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, 
DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") 
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, 
"org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .mode(SaveMode.Overwrite)
  .save(tablePath)

// update a record with event_name "event_name_123" => "event_name_changed"
val df1 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
val df2 = df1.filter($"event_id" === "104")
val df3 = df2.withColumn("event_name", lit("event_name_changed"))

// update hudi dataset
df3.write.format("org.apache.hudi")
   .option(HoodieWriteConfig.TABLE_NAME, tableName)
   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
   .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, 
DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") 
   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
   .option("hoodie.compact.inline", "false")
   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
   .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
   .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
   .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, 
"org.apache.hudi.hive.MultiPartKeysValueExtractor")
   .mode(SaveMode.Append)
   .save(tablePath)
{code}
 

Then I looked at the Hive tables. The event name is updated in the _rt table 
as expected:
{code:java}
hive> select * from hudi_events_mor_1 where event_id="104";
OK
20191008184508  20191008184508_1_4      104     type1   ec3761e6-22eb-47f2-b87d-d47484ff8d5f-0_1-4-8_20191008184508.parquet     104     event_name_123  2015-01-01T12:15:00.512679Z     type1

hive> select * from hudi_events_mor_1_rt where event_id="104";
OK
20191008184558  20191008184558_0_1      104     type1   ec3761e6-22eb-47f2-b87d-d47484ff8d5f-0  104     event_name_changed      2015-01-01T12:15:00.512679Z     type1
{code}
 

But if I do exactly the same thing with another DataFrame that has a different 
column order:
{code:java}
val df = Seq(
  ("100", "type1", "event_name_900", "2015-01-01T13:51:39.340396Z"),
  ("101", "type2", "event_name_546", "2015-01-01T12:14:58.597216Z"),
  ("104", "type1", "event_name_123", "2015-01-01T12:15:00.512679Z"),
  ("105", "type2", "event_name_678", "2015-01-01T13:51:42.248818Z")
  ).toDF("event_id", "event_type", "event_name", "event_ts")
{code}
I again selected event_type as the partition key (this time the partition key 
column is not the last column) and wrote the table as hudi_events_mor_2.

Then I looked into the hive table:
{code:java}
hive> select * from hudi_events_mor_2 where event_id="104";
OK
20191008185116  20191008185116_1_2      104     type1   23e3c091-f414-4fce-a51f-70cc012d7af2-0_1-4-13_20191008185116.parquet    104     event_name_123  2015-01-01T12:15:00.512679Z     type1

hive> select * from hudi_events_mor_2_rt where event_id="104";
OK
20191008185240  20191008185240_0_1      104     type1   23e3c091-f414-4fce-a51f-70cc012d7af2-0  104     NULL    event_name_changed      type1
{code}
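You can see the *NULL* value in the _rt table, and the column order is also 
messed up: event_name comes back as NULL, and the updated value 
event_name_changed appears to be shifted into the position where event_ts 
should be.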
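
Given the observation above that the upsert behaves correctly when the partition key is the last column, one possible workaround is to move the partition column to the end of the DataFrame before writing. The following is only a minimal sketch: partitionColumnLast is a hypothetical helper, not a Hudi or Spark API, and it is an assumption that reordering avoids the NULL for schemas other than the one in this report.

```scala
// Hypothetical helper (not part of Hudi or Spark): returns the column names
// with `partitionCol` moved to the end; all other columns keep their order.
def partitionColumnLast(columns: Seq[String], partitionCol: String): Seq[String] = {
  require(columns.contains(partitionCol), s"column not found: $partitionCol")
  columns.filterNot(_ == partitionCol) :+ partitionCol
}

// Column order of the second DataFrame in this report.
val reordered = partitionColumnLast(
  Seq("event_id", "event_type", "event_name", "event_ts"), "event_type")

// In spark-shell the DataFrame could then be reordered before the write, e.g.:
//   val dfReordered = df.select(reordered.map(org.apache.spark.sql.functions.col): _*)
```

With the reordered column list, the second DataFrame has the same layout as the first one, where the update showed up correctly in the _rt table.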

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
