[
https://issues.apache.org/jira/browse/HUDI-298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16951117#comment-16951117
]
Vinoth Chandar commented on HUDI-298:
-------------------------------------
>I believe this bug has been undiscovered so far, because in the demo datasets
>the partition columns are always at the end.
Also, typically, at least for data ingestion, a data schema managed by a schema
registry won't allow this kind of reordering, so we have been assuming this. But
the issue seems legit to me. Longer term, we need to bring the same kind of Avro
schema evolution rules into DataFrame writes to fence these out.
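To illustrate (a hypothetical, self-contained sketch, not Hudi's actual code
path): positional copying mis-assigns every field after the reordered column,
while Avro-style by-name resolution stays correct:

{code:java}
// Hypothetical field names; sketch of positional vs. by-name value mapping.
// Table schema order (partition column last):
val tableFields = Seq("event_id", "event_name", "event_ts", "event_type")

// Incoming batch with the partition column moved to the middle:
val incomingFields = Seq("event_id", "event_type", "event_name", "event_ts")
val incomingValues = Seq("104", "type1", "event_name_changed", "2015-01-01T12:15:00.512679Z")

// Positional copy: every field after the reordered one gets the wrong value.
val byPosition = tableFields.zip(incomingValues).toMap
// byPosition("event_name") == "type1"  -- wrong

// By-name resolution (what Avro schema evolution rules do):
val lookup = incomingFields.zip(incomingValues).toMap
val byName = tableFields.map(f => f -> lookup(f)).toMap
// byName("event_name") == "event_name_changed"  -- correct
{code}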
> get the actual Hive Schema and form replaceValue based on that schema so that
> the values are in the same order as originalValue.
Is the Hive Schema gettable without making RPC calls?
> Upsert MOR table but got a NULL value
> -------------------------------------
>
> Key: HUDI-298
> URL: https://issues.apache.org/jira/browse/HUDI-298
> Project: Apache Hudi (incubating)
> Issue Type: Bug
> Reporter: Wenning Ding
> Assignee: Udit Mehrotra
> Priority: Major
>
> I ran into this issue when I tried to update a record in a MOR table.
> I found that this issue depends on the order of columns. If the partition key
> is the last column, then it will work normally. But if the partition key is
> not the last column, then it will return NULL for the updated field.
> Here is a code example:
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.spark.sql.SaveMode
>
> val df = Seq(
>   ("100", "event_name_900", "2015-01-01T13:51:39.340396Z", "type1"),
>   ("101", "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
>   ("104", "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
>   ("105", "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
> ).toDF("event_id", "event_name", "event_ts", "event_type")
> var tableName = "hudi_events_mor_1"
> var tablePath = "s3://emr-users/wenningd/hudi/tables/events/" + tableName
> // write hudi dataset
> df.write.format("org.apache.hudi")
>   .option(HoodieWriteConfig.TABLE_NAME, tableName)
>   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
>   .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
>   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
>   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>   .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>   .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
>   .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>   .mode(SaveMode.Overwrite)
>   .save(tablePath)
> // update a record with event_name "event_name_123" => "event_name_changed"
> val df1 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
> val df2 = df1.filter($"event_id" === "104")
> val df3 = df2.withColumn("event_name", lit("event_name_changed"))
> // update hudi dataset
> df3.write.format("org.apache.hudi")
>   .option(HoodieWriteConfig.TABLE_NAME, tableName)
>   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>   .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
>   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
>   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>   .option("hoodie.compact.inline", "false")
>   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>   .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>   .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
>   .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>   .mode(SaveMode.Append)
>   .save(tablePath)
> {code}
>
> Then I looked into the Hive table; the event name is updated in the _rt table
> as expected:
> {code:java}
> hive> select * from hudi_events_mor_1 where event_id="104";
> OK
> 20191008184508 20191008184508_1_4 104 type1
> ec3761e6-22eb-47f2-b87d-d47484ff8d5f-0_1-4-8_20191008184508.parquet 104
> event_name_123 2015-01-01T12:15:00.512679Z type1
> hive> select * from hudi_events_mor_1_rt where event_id="104";
> OK
> 20191008184558 20191008184558_0_1 104 type1
> ec3761e6-22eb-47f2-b87d-d47484ff8d5f-0 104 event_name_changed
> 2015-01-01T12:15:00.512679Z type1
> {code}
>
> But if I do the exact same thing for another dataframe with different column
> order:
> {code:java}
> val df = Seq(
>   ("100", "type1", "event_name_900", "2015-01-01T13:51:39.340396Z"),
>   ("101", "type2", "event_name_546", "2015-01-01T12:14:58.597216Z"),
>   ("104", "type1", "event_name_123", "2015-01-01T12:15:00.512679Z"),
>   ("105", "type2", "event_name_678", "2015-01-01T13:51:42.248818Z")
> ).toDF("event_id", "event_type", "event_name", "event_ts")
> {code}
> I also selected event_type as the partition key (this time the partition key
> column is not the last one).
> Then I looked into the hive table:
> {code:java}
> hive> select * from hudi_events_mor_2 where event_id="104";
> OK
> 20191008185116 20191008185116_1_2 104 type1
> 23e3c091-f414-4fce-a51f-70cc012d7af2-0_1-4-13_20191008185116.parquet 104
> event_name_123 2015-01-01T12:15:00.512679Z type1
> hive> select * from hudi_events_mor_2_rt where event_id="104";
> OK
> 20191008185240 20191008185240_0_1 104 type1
> 23e3c091-f414-4fce-a51f-70cc012d7af2-0 104 NULL event_name_changed
> type1
> {code}
> You can see the *NULL* value in the _rt table, and the column order is messed
> up as well.
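> A possible workaround (an untested sketch, not the fix itself): before the
> upsert, reorder the update batch's columns by name to match the order the
> table was originally written with:
> {code:java}
> import org.apache.spark.sql.functions.col
>
> // Column order the table was first written with (partition key last):
> val tableOrder = Seq("event_id", "event_name", "event_ts", "event_type")
> // select() by name restores the order and drops the _hoodie_* meta columns:
> val df3Fixed = df3.select(tableOrder.map(col): _*)
> // ... then write df3Fixed with the same upsert options as above
> {code}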
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)