[
https://issues.apache.org/jira/browse/HUDI-8629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17913027#comment-17913027
]
Y Ethan Guo commented on HUDI-8629:
-----------------------------------
The issue is not related to partial updates. A basic MERGE INTO statement fails
if the field name is different in the source table:
{code:java}
TestMergeIntoTable#Test MergeInto Basic
// The second Merge Into has _ts in the source table and ts in the target
// table, with the assignment statement as ts = s0._ts
spark.sql("set hoodie.payload.combined.schema.validate = false")
val tableName = generateTableName
// Create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}'
| tblproperties (
| primaryKey ='id',
| preCombineField = 'ts'
| )
""".stripMargin)
// test with optimized sql merge enabled / disabled.
spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
// First merge with an extra input field 'flag' (insert a new record)
spark.sql(
s"""
| merge into $tableName
| using (
| select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as flag
| ) s0
| on s0.id = $tableName.id
| when matched and flag = '1' then update set
| id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
| when not matched and flag = '1' then insert *
""".stripMargin)
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 10.0, 1000)
)
// Second merge (update the record)
spark.sql(
s"""
| merge into $tableName
| using (
| select 1 as id, 'a1' as name, 10 as price, 1001 as _ts
| ) s0
| on s0.id = $tableName.id
| when matched then update set
| id = s0.id, name = s0.name, price = s0.price + $tableName.price, ts =
s0._ts
| when not matched then insert *
""".stripMargin)
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 20.0, 1001)
) {code}
> MergeInto w/ Partial updates pulls in fields from source not in assignment
> clause
> ---------------------------------------------------------------------------------
>
> Key: HUDI-8629
> URL: https://issues.apache.org/jira/browse/HUDI-8629
> Project: Apache Hudi
> Issue Type: Sub-task
> Reporter: sivabalan narayanan
> Assignee: Y Ethan Guo
> Priority: Blocker
> Fix For: 1.0.1
>
> Attachments: image-2024-12-02-04-07-54-483.png
>
>
> TestPartialUpdateForMergeInto.Test partial update with MOR and Avro log
> format w/ some slight changes.
>
> spark.sql(s"set
> ${HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
> spark.sql(s"set
> ${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
> spark.sql(s"set ${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key} =
> $logDataBlockFormat")
> spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = false")
> // Create a table with five data fields
> spark.sql(
> s"""
> |create table $tableName (
> | id int,
> | name string,
> | price long,
> | _ts long,
> | description string
> |) using hudi
> |tblproperties(
> | type ='$tableType',
> | primaryKey = 'id',
> | preCombineField = '_ts'
> |)
> |location '$basePath'
> """.stripMargin)
> spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1')," +
> "(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30.0, 1250, 'a3: desc3')")
>
>
> spark.sql(
> s"""
> |merge into $tableName t0
> |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as ts
> |union select 3 as id, 'a3' as name, 25 as price, 1260 as ts) s0
> |on t0.id = s0.id
> |when matched then update set price = s0.price, _ts = s0.ts
> |""".stripMargin)
>
>
> While executing this MergeInto statement, we modify the schema to be as
> follows.
> !image-2024-12-02-04-07-54-483.png!
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)