[
https://issues.apache.org/jira/browse/HUDI-8628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17913437#comment-17913437
]
Davis Zhang commented on HUDI-8628:
-----------------------------------
The issue I'm able to repro happens because inline clustering uses a writer
schema that is different from the schema of the record payload.
{code:java}
spark.sql(s"set ${HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
spark.sql(s"set ${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
spark.sql(s"set ${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key} = parquet")
spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = false")
spark.sql(s"set ${HoodieClusteringConfig.INLINE_CLUSTERING.key()} = true")
spark.sql(s"set ${HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key()} = 2")
spark.sql(s"set ${HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key()} = id,price")
spark.sql(
  s"""
     | create table $tableName (
     |   id int,
     |   name string,
     |   price long,
     |   ts long,
     |   description string
     | ) using hudi
     | tblproperties(
     |   type = '$tableType',
     |   primaryKey = 'id',
     |   preCombineField = 'ts'
     | )
     | location '$basePath'
     """.stripMargin)
spark.sql(s"insert into $tableName values " +
  "(1, 'a1', 10, 1000, 'a1: desc1'), " +
  "(2, 'a2', 20, 1200, 'a2: desc2'), " +
  "(3, 'a3', 30.0, 1250, 'a3: desc3')")
// Partial updates using MERGE INTO statement with changed fields: "price" and "ts"
spark.sql(
  s"""
     | merge into $tableName t0
     | using (
     |   select 1 as id, 'a1' as name1, 12 as price, 1001 as _ts
     |   union
     |   select 3 as id, 'a3' as name1, 25 as price, 1260 as _ts
     | ) s0
     | on t0.id = s0.id
     | when matched then update set price = s0.price, ts = s0._ts
     """.stripMargin){code}
Clustering uses the table schema, which contains all columns. The payload
schema contains only the columns involved in the partial update.
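Avro's binary encoding is untagged: the bytes carry field values only, in writer-schema order, so decoding them with a different schema misreads every field. A minimal plain-Java sketch of this failure mode (using a simplified fixed-width encoding, not Avro's actual varint format; class and field names are hypothetical):
{code:java}
import java.io.*;

public class SchemaMismatchSketch {

    // "Writer schema" of the partial-update payload: [price: long, ts: long].
    // Like Avro, the output contains only the field values, in schema order.
    static byte[] encodePartial(long price, long ts) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeLong(price);
        out.writeLong(ts);
        return bos.toByteArray();
    }

    // Decoding with the matching partial schema recovers the values.
    static long[] decodePartial(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        return new long[] { in.readLong(), in.readLong() };
    }

    // Decoding with the full table schema [id: int, name, price, ts, description]
    // misaligns everything: the first 4 bytes of `price` are consumed as `id`.
    static int decodeFullSchemaFirstField(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        return in.readInt(); // reads half of `price`, not a real id
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = encodePartial(12L, 1001L);
        long[] ok = decodePartial(payload);
        System.out.println("partial-schema decode: price=" + ok[0] + " ts=" + ok[1]);
        System.out.println("full-schema decode of first field: id=" + decodeFullSchemaFirstField(payload));
    }
}{code}
Decoding with the partial schema yields price=12, ts=1001; decoding the same bytes as if they started with an int `id` yields garbage, which is analogous to what happens when clustering hands the full table schema to the payload decode below.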
======
By the time the clustering happens, the MIT partial update has already finished:
{code:java}
drwxr-xr-x@ 2 zhanyeha staff   64 Jan 15 11:10 history
-rw-r--r--@ 1 zhanyeha staff    0 Jan 15 11:11 20250115191100610.deltacommit.requested
-rw-r--r--@ 1 zhanyeha staff 2444 Jan 15 11:11 20250115191100610.deltacommit.inflight
-rw-r--r--@ 1 zhanyeha staff 2920 Jan 15 11:11 20250115191100610_20250115191105653.deltacommit
-rw-r--r--@ 1 zhanyeha staff    0 Jan 15 11:11 20250115191136341.deltacommit.requested
-rw-r--r--@ 1 zhanyeha staff 2536 Jan 15 11:13 20250115191136341.deltacommit.inflight
-rw-r--r--@ 1 zhanyeha staff 3051 Jan 15 11:14 20250115191136341_20250115191414196.deltacommit
-rw-r--r--@ 1 zhanyeha staff 3277 Jan 15 11:14 20250115191414256.clustering.requested
-rw-r--r--@ 1 zhanyeha staff    0 Jan 15 11:14 20250115191414256.clustering.inflight {code}
The MIT commit metadata says the schema equals the table schema:
{code:java}
➜ timeline avrocat 20250115191136341_20250115191414196.deltacommit | jq
...
"extraMetadata": {
"map": {
"schema":
"{\"type\":\"record\",\"name\":\"h1_record\",\"namespace\":\"hoodie.h1\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"price\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"ts\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"description\",\"type\":[\"null\",\"string\"],\"default\":null}]}",
"_hoodie.spark.sql.merge.into.prepped": "false"
}
},{code}
There is only one parquet file from the initial insert and one log file from the MIT:
{code:java}
➜ h1 ls -lrta . | grep -v crc
total 896
drwxr-xr-x@  3 zhanyeha staff     96 Jan 15 11:10 ..
drwxr-xr-x@ 10 zhanyeha staff    320 Jan 15 11:11 .hoodie
-rw-r--r--@  1 zhanyeha staff     96 Jan 15 11:11 .hoodie_partition_metadata
-rw-r--r--@  1 zhanyeha staff 435270 Jan 15 11:11 66235a71-ed30-4274-9986-325b28b36399-0_0-26-42_20250115191100610.parquet
drwxr-xr-x@  9 zhanyeha staff    288 Jan 15 11:14 .
-rw-r--r--@  1 zhanyeha staff   3508 Jan 15 11:14 .66235a71-ed30-4274-9986-325b28b36399-0_20250115191136341.log.1_0-61-95 {code}
Base file content:
{code:java}
{"_hoodie_commit_time": "20250115191100610", "_hoodie_commit_seqno": "20250115191100610_0_0", "_hoodie_record_key": "1", "_hoodie_partition_path": "", "_hoodie_file_name": "66235a71-ed30-4274-9986-325b28b36399-0_0-26-42_20250115191100610.parquet", "id": 1, "name": "a1", "price": 10, "ts": 1000, "description": "a1: desc1"}
{"_hoodie_commit_time": "20250115191100610", "_hoodie_commit_seqno": "20250115191100610_0_1", "_hoodie_record_key": "3", "_hoodie_partition_path": "", "_hoodie_file_name": "66235a71-ed30-4274-9986-325b28b36399-0_0-26-42_20250115191100610.parquet", "id": 3, "name": "a3", "price": 30, "ts": 1250, "description": "a3: desc3"}
{"_hoodie_commit_time": "20250115191100610", "_hoodie_commit_seqno": "20250115191100610_0_2", "_hoodie_record_key": "2", "_hoodie_partition_path": "", "_hoodie_file_name": "66235a71-ed30-4274-9986-325b28b36399-0_0-26-42_20250115191100610.parquet", "id": 2, "name": "a2", "price": 20, "ts": 1200, "description": "a2: desc2"}
{code}
Log file content (I could not avrocat it, but according to the code it should be
what is shown below).
At the time org.apache.hudi.common.model.DefaultHoodieRecordPayload is
constructed, the underlying schema of the Avro record contains only the columns
involved in the previous partial update:
{code:java}
{"type":"record","name":"h1_record","namespace":"hoodie.h1","fields":[
{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},
{"name":"price","type":["null","long"],"default":null},{"name":"ts","type":["null","long"],"default":null}]}{code}
Actual record payload:
{code:java}
{"_hoodie_commit_time": "20250115191136341", "_hoodie_commit_seqno": "20250115191136341_0_1", "_hoodie_record_key": "1", "_hoodie_partition_path": "", "_hoodie_file_name": "66235a71-ed30-4274-9986-325b28b36399-0", "price": 12, "ts": 1001}
[2, 34, 50, 48, 50, 53, 48, 49, 49, 53, 49, 57, 49, 49, 51, 54, 51, 52, 49, 2, 42, 50, 48, 50, 53, 48, 49, 49, 53, 49, 57, 49, 49, 51, 54, 51, 52, 49, 95, 48, 95, 49, 2, 2, 49, 2, 0, 2, 76, 54, 54, 50, 51, 53, 97, 55, 49, 45, 101, 100, 51, 48, 45, 52, 50, 55, 52, 45, 57, 57, 56, 54, 45, 51, 50, 53, 98, 50, 56, 98, 51, 54, 51, 57, 57, 45, 48, 2, 24, 2, -46, 15]
{"_hoodie_commit_time": "20250115191136341", "_hoodie_commit_seqno": "20250115191136341_0_2", "_hoodie_record_key": "3", "_hoodie_partition_path": "", "_hoodie_file_name": "66235a71-ed30-4274-9986-325b28b36399-0", "price": 25, "ts": 1260}
[2, 34, 50, 48, 50, 53, 48, 49, 49, 53, 49, 57, 49, 49, 51, 54, 51, 52, 49, 2, 42, 50, 48, 50, 53, 48, 49, 49, 53, 49, 57, 49, 49, 51, 54, 51, 52, 49, 95, 48, 95, 50, 2, 2, 51, 2, 0, 2, 76, 54, 54, 50, 51, 53, 97, 55, 49, 45, 101, 100, 51, 48, 45, 52, 50, 55, 52, 45, 57, 57, 56, 54, 45, 51, 50, 53, 98, 50, 56, 98, 51, 54, 51, 57, 57, 45, 48, 2, 50, 2, -40, 19] {code}
But when the record merge happens in
org.apache.hudi.common.model.DefaultHoodieRecordPayload#combineAndGetUpdateValue,
it uses the table schema, not the payload schema above:
{code:java}
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
  if (recordBytes.length == 0) {
    return Option.empty();
  }
  // `schema` here is the full table schema, but `recordBytes` were written
  // with the partial payload schema.
  GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
  ...
}
[2, 34, 50, 48, 50, 53, 48, 49, 49, 53, 49, 57, 49, 49, 51, 54, 51, 52, 49, 2, 42, 50, 48, 50, 53, 48, 49, 49, 53, 49, 57, 49, 49, 51, 54, 51, 52, 49, 95, 48, 95, 49, 2, 2, 49, 2, 0, 2, 76, 54, 54, 50, 51, 53, 97, 55, 49, 45, 101, 100, 51, 48, 45, 52, 50, 55, 52, 45, 57, 57, 56, 54, 45, 51, 50, 53, 98, 50, 56, 98, 51, 54, 51, 57, 57, 45, 48, 2, 24, 2, -46, 15]
{"type":"record","name":"h1_record","namespace":"hoodie.h1","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"id","type":["null","int"],"default":null},{"name":"name","type":["null","string"],"default":null},{"name":"price","type":["null","long"],"default":null},{"name":"ts","type":["null","long"],"default":null},{"name":"description","type":["null","string"],"default":null}]}
{code}
We are probably using this schema because, when the clustering client is
configured, the writer schema it picks up is the full table schema rather than
the payload's partial schema.
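For context, the intended partial-update merge semantics are: decode the payload with its own (partial) schema, then overlay only its fields onto the current record. A conceptual sketch using plain maps (an illustration of the semantics, not Hudi's actual implementation; the class name is hypothetical):
{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

public class PartialMergeSketch {

    // Overlay a partial-update payload onto the current full record:
    // fields present in the payload win, all other fields are carried over.
    static Map<String, Object> merge(Map<String, Object> current, Map<String, Object> partial) {
        Map<String, Object> merged = new LinkedHashMap<>(current);
        merged.putAll(partial);
        return merged;
    }

    public static void main(String[] args) {
        // Current record from the base file (full table schema).
        Map<String, Object> current = new LinkedHashMap<>();
        current.put("id", 1);
        current.put("name", "a1");
        current.put("price", 10L);
        current.put("ts", 1000L);
        current.put("description", "a1: desc1");

        // Partial payload from the MIT log block: only the changed fields.
        Map<String, Object> partial = new LinkedHashMap<>();
        partial.put("price", 12L);
        partial.put("ts", 1001L);

        // price and ts are updated; id, name, description are preserved.
        System.out.println(merge(current, partial));
    }
}{code}
This overlay only works if the payload is first decoded with the schema it was actually written with, which is exactly what breaks when clustering supplies the full table schema instead.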
> Merge Into is pulling in additional fields which are not set as per the condition
> ----------------------------------------------------------------------------------
>
> Key: HUDI-8628
> URL: https://issues.apache.org/jira/browse/HUDI-8628
> Project: Apache Hudi
> Issue Type: Sub-task
> Components: spark-sql
> Reporter: sivabalan narayanan
> Assignee: Davis Zhang
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.0.1
>
> Attachments: image-2024-12-02-03-58-48-178.png
>
>
> spark.sql(s"set ${HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
> spark.sql(s"set ${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
> spark.sql(s"set ${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat")
> spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = false")
> spark.sql(
>   s"""
>      |create table $tableName (
>      |  id int,
>      |  name string,
>      |  price long,
>      |  ts long,
>      |  description string
>      |) using hudi
>      |tblproperties(
>      |  type = '$tableType',
>      |  primaryKey = 'id',
>      |  preCombineField = 'ts'
>      |)
>      |location '$basePath'
>      """.stripMargin)
> spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1')," +
>   "(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30.0, 1250, 'a3: desc3')")
>
>
> Merge Into:
> // Partial updates using MERGE INTO statement with changed fields: "price" and "_ts"
> spark.sql(
>   s"""
>      |merge into $tableName t0
>      |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as _ts
>      |union select 3 as id, 'a3' as name, 25 as price, 1260 as _ts) s0
>      |on t0.id = s0.id
>      |when matched then update set price = s0.price, ts = s0._ts
>      |""".stripMargin)
>
> The schema for this merge into command when we reach
> HoodieSparkSqlWriter.deduceWriterSchema is given below, i.e.
> val writerSchema = HoodieSchemaUtils.deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, parameters)
>
> !image-2024-12-02-03-58-48-178.png!
>
> The merge into command only instructs to update price and _ts, right? So why
> are other fields (e.g. name) also picked up from the source?
> You can check out the test in TestPartialUpdateForMergeInto, "Test partial
> update with MOR and Avro log format".
>
> Note: This is partial update support w/ MergeInto btw, not a regular
> MergeInto.
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)