[ https://issues.apache.org/jira/browse/HUDI-298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949708#comment-16949708 ]

Udit Mehrotra commented on HUDI-298:
------------------------------------

This is an issue with how the on-the-fly merge is performed between the 
_*Log*_ file and the base _*Parquet*_ file. I am working on the bug fix, hence 
assigning it to myself.

Here is the analysis:
 * This is where the new merged record is copied on-the-fly over the original 
record to produce the updated result: 
[https://github.com/apache/incubator-hudi/blob/master/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java#L112]
 * Here the *originalValue*, i.e. the original ArrayWritable, comes from the 
actual _*ParquetReader*_, which returns an ArrayWritable whose columns are 
ordered based on the _*Hive Schema*_, i.e. the order in which they appear in 
the Hive table.
 * Now, Hive's column order is not the same as the order in which the columns 
appear in the _*Parquet File Schema*_. Hive moves the partition columns to the 
end of the list. Thus in *originalValue* the partition column values appear at 
the end.
 * However, the *replaceValue* ArrayWritable, which is meant to replace 
*originalValue*, is formed based on the *writerSchema*, which comes from the 
Parquet file itself. In the *writerSchema* the partition columns appear in 
their original positions, not necessarily at the end: 
[https://github.com/apache/incubator-hudi/blob/master/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java#L104]
 * Thus the column order in *originalValue* and *replaceValue* does not match, 
so values get copied into the wrong columns, resulting in incorrect results.
 * I believe this bug has gone undiscovered so far because in the demo 
datasets the partition columns are always at the end, so the mismatched 
ordering never caused a problem. But as soon as the partition columns appear 
somewhere in the middle of the dataset, RT tables break.

I am working on a fix that fetches the actual *Hive Schema* and forms 
*replaceValue* based on that schema, so that the values are in the same order 
as in *originalValue*.
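
To make the mismatch concrete, here is a minimal, self-contained sketch using the field names from the repro below. This is illustrative only, not Hudi's actual code: a positional copy from writer-schema order into Hive-schema order misplaces values, while a name-based lookup (the essence of the proposed fix) aligns them correctly.

```scala
object SchemaOrderDemo {
  // Writer (Parquet) schema keeps the partition column in its original place
  val writerFields = Seq("event_id", "event_type", "event_name", "event_ts")
  // Hive moves the partition column (event_type) to the end
  val hiveFields = Seq("event_id", "event_name", "event_ts", "event_type")

  // Updated values, in writer-schema order (as replaceValue would be)
  val replaceVals =
    Seq("104", "type1", "event_name_changed", "2015-01-01T12:15:00.512679Z")

  // Naive positional copy: value i lands in Hive column i, so "type1"
  // ends up under event_name -- exactly the bug described above.
  def positional: Map[String, String] = hiveFields.zip(replaceVals).toMap

  // Fix sketch: look each value up by field name in the writer schema,
  // then emit the values in Hive-schema order.
  def byName: Map[String, String] = {
    val lookup = writerFields.zip(replaceVals).toMap
    hiveFields.map(f => f -> lookup(f)).toMap
  }
}
```

With positional copying, `event_name` reads `type1`; with name-based alignment it reads `event_name_changed`, matching what the `_rt` query should return.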

[~vinoth] [~vbalaji] let me know if this seems correct to you.

> Upsert MOR table but got a NULL value
> -------------------------------------
>
>                 Key: HUDI-298
>                 URL: https://issues.apache.org/jira/browse/HUDI-298
>             Project: Apache Hudi (incubating)
>          Issue Type: Bug
>            Reporter: Wenning Ding
>            Assignee: Udit Mehrotra
>            Priority: Major
>
> I ran into this issue when I tried to update a record in a MOR table.
> I found that this issue depends on the order of columns. If the partition key 
> is the last column, then it will work normally. But if the partition key is 
> not the last column, then it will return NULL for the updated field.
> Here is a code example:
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.spark.sql.SaveMode
> val df = Seq(
>   ("100", "event_name_900", "2015-01-01T13:51:39.340396Z", "type1"),
>   ("101", "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
>   ("104", "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
>   ("105", "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
>   ).toDF("event_id", "event_name", "event_ts", "event_type")
> var tableName = "hudi_events_mor_1"
> var tablePath = "s3://emr-users/wenningd/hudi/tables/events/" + tableName
> // write hudi dataset
> df.write.format("org.apache.hudi")
>   .option(HoodieWriteConfig.TABLE_NAME, tableName)
>   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
>   .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, 
> DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
>   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") 
>   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>   .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>   .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
>   .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, 
> "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>   .mode(SaveMode.Overwrite)
>   .save(tablePath)
> // update a record with event_name "event_name_123" => "event_name_changed"
> val df1 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
> val df2 = df1.filter($"event_id" === "104")
> val df3 = df2.withColumn("event_name", lit("event_name_changed"))
> // update hudi dataset
> df3.write.format("org.apache.hudi")
>    .option(HoodieWriteConfig.TABLE_NAME, tableName)
>    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>    .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, 
> DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
>    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") 
>    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>    .option("hoodie.compact.inline", "false")
>    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>    .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
>    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, 
> "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>    .mode(SaveMode.Append)
>    .save(tablePath)
> {code}
>  
> Then I looked into the hive table, the event name is updated in the _rt table 
> as expected:
> {code:java}
> hive> select * from hudi_events_mor_1 where event_id="104";
> OK
> 20191008184508        20191008184508_1_4      104     type1   
> ec3761e6-22eb-47f2-b87d-d47484ff8d5f-0_1-4-8_20191008184508.parquet     104   
>   event_name_123  2015-01-01T12:15:00.512679Z     type1
> hive> select * from hudi_events_mor_1_rt where event_id="104";
> OK
> 20191008184558        20191008184558_0_1      104     type1   
> ec3761e6-22eb-47f2-b87d-d47484ff8d5f-0  104     event_name_changed      
> 2015-01-01T12:15:00.512679Z     type1
> {code}
>  
> But if I do the exact same thing for another dataframe with different column 
> order:
> {code:java}
> val df = Seq(
>   ("100", "type1", "event_name_900", "2015-01-01T13:51:39.340396Z"),
>   ("101", "type2", "event_name_546", "2015-01-01T12:14:58.597216Z"),
>   ("104", "type1", "event_name_123", "2015-01-01T12:15:00.512679Z"),
>   ("105", "type2", "event_name_678", "2015-01-01T13:51:42.248818Z")
>   ).toDF("event_id", "event_type", "event_name", "event_ts"){code}
> And I also selected event_type as my partition key (this time the partition 
> key column is not the last column).
> Then I looked into the hive table:
> {code:java}
> hive> select * from hudi_events_mor_2 where event_id="104";
> OK
> 20191008185116        20191008185116_1_2      104     type1   
> 23e3c091-f414-4fce-a51f-70cc012d7af2-0_1-4-13_20191008185116.parquet    104   
>   event_name_123  2015-01-01T12:15:00.512679Z     type1
> hive> select * from hudi_events_mor_2_rt where event_id="104";
> OK
> 20191008185240        20191008185240_0_1      104     type1   
> 23e3c091-f414-4fce-a51f-70cc012d7af2-0  104     NULL    event_name_changed    
>   type1
> {code}
> You can see the *NULL* value in the _rt table, and the order of columns is 
> also messed up.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)