[ https://issues.apache.org/jira/browse/HUDI-8628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17902400#comment-17902400 ]

Jonathan Vexler commented on HUDI-8628:
---------------------------------------

I added inline clustering to the above test, and while the commit metadata 
contained the extra column, the parquet file itself did not. Additionally, on 
the next write, both the commit metadata and the parquet schema were correct. I 
tested with a second base file for the clustering, and that did not expose any 
bugs.
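As a quick cross-check, the base file schema can be read back directly (a 
sketch; it assumes the spark session and $basePath from the test below, and a 
non-partitioned table so the base files sit directly under the base path):

// Sketch: inspect the schema actually written to the parquet base files.
// The "*.parquet" glob is an assumption for a non-partitioned table layout.
val baseFileSchema = spark.read.format("parquet").load(s"$basePath/*.parquet").schema
baseFileSchema.printTreeString() // the extra column should be absent here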

To add clustering, I added:

spark.sql(s"set ${HoodieClusteringConfig.INLINE_CLUSTERING.key()} = true")
spark.sql(s"set ${HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key()} = 2")
spark.sql(s"set ${HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key()} = id,price")
 
To test multiple base files, I added:

spark.sql(s"set hoodie.parquet.small.file.limit = 0")
spark.sql(s"insert into $tableName values (4, 'a4', 10, 1000, 'a4: desc4')," +
  "(5, 'a5', 20, 1200, 'a5: desc5'), (6, 'a6', 30, 1250, 'a6: desc6')")

and changed the inline clustering max commits from 2 to 3, as shown below.
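For completeness, the adjusted setting (same config key as above, only the 
value changes):

spark.sql(s"set ${HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key()} = 3")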

> Merge Into is pulling in additional fields which are not set as per the condition
> ----------------------------------------------------------------------------------
>
>                 Key: HUDI-8628
>                 URL: https://issues.apache.org/jira/browse/HUDI-8628
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: spark-sql
>            Reporter: sivabalan narayanan
>            Priority: Blocker
>             Fix For: 1.0.0
>
>         Attachments: image-2024-12-02-03-58-48-178.png
>
>
> spark.sql(s"set 
> ${HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
> spark.sql(s"set 
> ${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
> spark.sql(s"set ${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key} = 
> $logDataBlockFormat")
> spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = false")
> spark.sql(
>   s"""
>      |create table $tableName (
>      |  id int,
>      |  name string,
>      |  price long,
>      |  _ts long,
>      |  description string
>      |) using hudi
>      |tblproperties(
>      |  type = '$tableType',
>      |  primaryKey = 'id',
>      |  preCombineField = '_ts'
>      |)
>      |location '$basePath'
>      """.stripMargin)
> spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1')," +
>   "(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30.0, 1250, 'a3: desc3')")
>  
>  
> Merge Into:
> // Partial updates using MERGE INTO statement with changed fields: "price" and "_ts"
> spark.sql(
>   s"""
>      |merge into $tableName t0
>      |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as _ts
>      |union select 3 as id, 'a3' as name, 25 as price, 1260 as _ts) s0
>      |on t0.id = s0.id
>      |when matched then update set price = s0.price, _ts = s0._ts
>      |""".stripMargin)
>  
> The schema for this merge into command, by the time we reach 
> HoodieSparkSqlWriter.deduceWriterSchema, is shown below, i.e. at 
> val writerSchema = HoodieSchemaUtils.deduceWriterSchema(sourceSchema, 
>   latestTableSchemaOpt, internalSchemaOpt, parameters)
>  
> !image-2024-12-02-03-58-48-178.png!
>  
> The merge into command only instructs to update price and _ts, right? So why 
> are other fields (e.g. name) also picked up from the source?
> You can check out the test in TestPartialUpdateForMergeInto: "Test partial 
> update with MOR and Avro log format".
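>  
> For illustration, the writer schema one would expect here under partial 
> updates (a hedged sketch using Avro's SchemaBuilder; the field set is an 
> assumption derived from the update clause, ignoring Hudi meta fields):
> import org.apache.avro.SchemaBuilder
> // Expected: only the record key, the precombine field, and the columns named
> // in "update set" should survive schema deduction.
> val expectedPartialSchema = SchemaBuilder.record("expected").fields()
>   .requiredInt("id")     // record key
>   .requiredLong("price") // changed field
>   .requiredLong("_ts")   // changed field / precombine
>   .endRecord()
> // Observed instead: extra source fields such as "name" also appear in the
> // deduced schema (see the attached screenshot).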
>  
> Note: this concerns partial update support with MERGE INTO, not a regular 
> MERGE INTO.
>  
>  


