linliu-code opened a new pull request, #17846:
URL: https://github.com/apache/hudi/pull/17846
### Change Logs
This PR fixes the unnecessary scanning of the target table in MERGE INTO
statement in Spark by only using the source table as the input if the target
table has a record key (before this change the source and target table is left
joined, causing all partitions in the target table to be scanned unnecessarily,
if the source table only contains data targeting a handful of partitions). For
primary-keyless table the logic is not changed, i.e., the source and target
table is left joined to get the meta column values for prepped upsert in MERGE
INTO.
Spark SQL for testing:
create database MIT_partition_pruning5;
use MIT_partition_pruning5;
CREATE TABLE merge_source (
ts BIGINT,
uuid STRING,
fare DOUBLE,
city STRING
) USING PARQUET;
INSERT INTO merge_source
VALUES
(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330',19.10,'san_francisco');
CREATE TABLE hudi_table (
ts BIGINT,
uuid STRING,
rider STRING,
driver STRING,
fare DOUBLE,
city STRING
) USING HUDI
PARTITIONED BY (city)
OPTIONS (
primaryKey 'uuid',
hoodie.datasource.write.operation 'upsert',
hoodie.datasource.write.precombine.field 'ts',
hoodie.datasource.write.recordkey.field 'uuid',
hoodie.table.name 'MIT_partition_pruning'
);
INSERT INTO hudi_table
VALUES
(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70
,'san_francisco'),
(1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90
,'san_francisco'),
(1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco');
INSERT INTO hudi_table
SELECT
1695115999911 AS timestamp, -- Creating unique timestamps based on the
counter
uuid() AS uuid,
CONCAT('rider-', CAST(65 + (counter % 26) AS STRING)) AS rider,
CONCAT('driver-', CAST(75 + (counter % 26) AS STRING)) AS driver,
ROUND(rand() * (100 - 10) + 10, 2) AS amount, -- Random fare between 10
and 100
concat('p', CAST((counter % 300) AS STRING)) AS city
FROM (SELECT explode(sequence(1, 1000000)) AS counter) A;
MERGE INTO hudi_table AS target
USING merge_source AS source
ON target.uuid = source.uuid
and target.city = source.city
WHEN MATCHED THEN UPDATE SET target.fare = target.fare
;
### Impact
Improves performance of MERGE INTO on Spark
### Risk level
low. Performance improvement only. Existing tests have guarded the
correctness of MERGE INTO statement.
### Documentation Update
<!-- Describe any necessary documentation update if there is any new
feature, config, or user-facing change. If not, put "none".
- The config description must be updated if new configs are added or the
default value of the configs are changed.
- Any new feature or user-facing change requires updating the Hudi website.
Please follow the
[instruction](https://hudi.apache.org/contribute/developer-setup#website)
to make changes to the website. -->
### Contributor's checklist
- [ ] Read through [contributor's
guide](https://hudi.apache.org/contribute/how-to-contribute)
- [ ] Enough context is provided in the sections above
- [ ] Adequate tests were added if applicable
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]