linliu-code opened a new pull request, #17868: URL: https://github.com/apache/hudi/pull/17868
### Change Logs Cherry-picked: https://github.com/apache/hudi/pull/12934 This PR fixes the unnecessary scanning of the target table in the MERGE INTO statement in Spark by only using the source table as the input if the target table has a record key (before this change, the source and target tables were left joined, causing all partitions in the target table to be scanned unnecessarily, even if the source table only contains data targeting a handful of partitions). For primary-keyless tables, the logic is not changed, i.e., the source and target tables are still left joined to get the meta column values for prepped upsert in MERGE INTO. Spark SQL for testing: create database MIT_partition_pruning5; use MIT_partition_pruning5; CREATE TABLE merge_source ( ts BIGINT, uuid STRING, fare DOUBLE, city STRING ) USING PARQUET; INSERT INTO merge_source VALUES (1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330',19.10,'san_francisco'); CREATE TABLE hudi_table ( ts BIGINT, uuid STRING, rider STRING, driver STRING, fare DOUBLE, city STRING ) USING HUDI PARTITIONED BY (city) OPTIONS ( primaryKey 'uuid', hoodie.datasource.write.operation 'upsert', hoodie.datasource.write.precombine.field 'ts', hoodie.datasource.write.recordkey.field 'uuid', hoodie.table.name 'MIT_partition_pruning' ); INSERT INTO hudi_table VALUES (1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'), (1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco'), (1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco'), (1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'); INSERT INTO hudi_table SELECT 1695115999911 AS timestamp, /* Creating unique timestamps based on the counter */ uuid() AS uuid, CONCAT('rider-', CAST(65 + (counter % 26) AS STRING)) AS rider, CONCAT('driver-', CAST(75 + (counter % 26) AS STRING)) AS driver, ROUND(rand() * (100 - 10) + 10, 2) AS amount, 
/* Random fare between 10 and 100 */ concat('p', CAST((counter % 300) AS STRING)) AS city FROM (SELECT explode(sequence(1, 1000000)) AS counter) A; MERGE INTO hudi_table AS target USING merge_source AS source ON target.uuid = source.uuid and target.city = source.city WHEN MATCHED THEN UPDATE SET target.fare = target.fare ; ### Impact Improves performance of MERGE INTO on Spark ### Risk level low. Performance improvement only. Existing tests guard the correctness of the MERGE INTO statement. ### Documentation Update <!-- Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none". - The config description must be updated if new configs are added or the default values of the configs are changed. - Any new feature or user-facing change requires updating the Hudi website. Please follow the [instruction](https://hudi.apache.org/contribute/developer-setup#website) to make changes to the website. --> ### Contributor's checklist - [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute) - [ ] Enough context is provided in the sections above - [ ] Adequate tests were added if applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
