[ 
https://issues.apache.org/jira/browse/HUDI-8629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17913027#comment-17913027
 ] 

Y Ethan Guo commented on HUDI-8629:
-----------------------------------

The issue is not related to partial updates.  A basic MERGE INTO statement fails 
if the field name is different in the source table:
{code:java}
TestMergeIntoTable#Test MergeInto Basic
// The second Merge Into has _ts in the source table and ts in the target
// table, with the assignment statement as ts = s0._ts

spark.sql("set hoodie.payload.combined.schema.validate = false")
 val tableName = generateTableName
 // Create table
 spark.sql(
   s"""
      |create table $tableName (
      |  id int,
      |  name string,
      |  price double,
      |  ts long
      |) using hudi
      | location '${tmp.getCanonicalPath}'
      | tblproperties (
      |  primaryKey ='id',
      |  preCombineField = 'ts'
      | )
""".stripMargin)

 // test with optimized sql merge enabled / disabled.
 spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")

 // First merge with an extra input field 'flag' (insert a new record)
 spark.sql(
   s"""
      | merge into $tableName
      | using (
      |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as flag
      | ) s0
      | on s0.id = $tableName.id
      | when matched and flag = '1' then update set
      | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
      | when not matched and flag = '1' then insert *
""".stripMargin)
 checkAnswer(s"select id, name, price, ts from $tableName")(
   Seq(1, "a1", 10.0, 1000)
 )

 // Second merge (update the record)
 spark.sql(
   s"""
      | merge into $tableName
      | using (
      |  select 1 as id, 'a1' as name, 10 as price, 1001 as _ts
      | ) s0
      | on s0.id = $tableName.id
      | when matched then update set
      | id = s0.id, name = s0.name, price = s0.price + $tableName.price, ts = 
s0._ts
      | when not matched then insert *
""".stripMargin)
 checkAnswer(s"select id, name, price, ts from $tableName")(
   Seq(1, "a1", 20.0, 1001)
 ) {code}

> MergeInto w/ Partial updates pulls in fields from source not in assignment 
> clause
> ---------------------------------------------------------------------------------
>
>                 Key: HUDI-8629
>                 URL: https://issues.apache.org/jira/browse/HUDI-8629
>             Project: Apache Hudi
>          Issue Type: Sub-task
>            Reporter: sivabalan narayanan
>            Assignee: Y Ethan Guo
>            Priority: Blocker
>             Fix For: 1.0.1
>
>         Attachments: image-2024-12-02-04-07-54-483.png
>
>
> TestPartialUpdateForMergeInto.Test partial update with MOR and Avro log 
> format  w/ some slight changes. 
>  
> spark.sql(s"set 
> ${HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
> spark.sql(s"set 
> ${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
> spark.sql(s"set ${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key} = 
> $logDataBlockFormat")
> spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = false")
> // Create a table with five data fields
> spark.sql(
> s"""
> |create table $tableName (
> | id int,
> | name string,
> | price long,
> | _ts long,
> | description string
> |) using hudi
> |tblproperties(
> | type ='$tableType',
> | primaryKey = 'id',
> | preCombineField = '_ts'
> |)
> |location '$basePath'
> """.stripMargin)
> spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1')," +
> "(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30.0, 1250, 'a3: desc3')")
>  
>  
> spark.sql(
> s"""
> |merge into $tableName t0
> |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as ts
> |union select 3 as id, 'a3' as name, 25 as price, 1260 as ts) s0
> |on t0.id = s0.id
> |when matched then update set price = s0.price, _ts = s0.ts
> |""".stripMargin)
>  
>  
> While executing this MergeInto statement, we modify the schema to be as 
> follows. 
> !image-2024-12-02-04-07-54-483.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to