linliu-code opened a new pull request, #17846:
URL: https://github.com/apache/hudi/pull/17846

   ### Change Logs
   This PR fixes unnecessary scanning of the target table in Spark's MERGE INTO statement: when the target table has a record key, only the source table is used as the input. Before this change, the source and target tables were left-joined, so every partition of the target table was scanned even when the source table only contained data targeting a handful of partitions. For primary-keyless tables the logic is unchanged: the source and target tables are still left-joined to fetch the meta column values for the prepped upsert in MERGE INTO.
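   
   For intuition, a rough sketch of the logical input in each case, using the tables from the test script below (illustrative SQL only, not the actual planner output; `_hoodie_record_key` stands in here for the meta columns the join fetches):
   
   ```sql
   -- Before (and still the behavior for primary-keyless tables): the MERGE input is
   -- roughly a left join of source onto target, which scans every target partition.
   SELECT s.*, t._hoodie_record_key
   FROM merge_source s
   LEFT JOIN hudi_table t
     ON s.uuid = t.uuid AND s.city = t.city;
   
   -- After, for a table with a record key: only the source is read, and matching
   -- against existing records is delegated to the keyed upsert path.
   SELECT s.* FROM merge_source s;
   ```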
   
   Spark SQL for testing:
   
   ```sql
   CREATE DATABASE MIT_partition_pruning5;
   USE MIT_partition_pruning5;
   CREATE TABLE merge_source (
       ts BIGINT,
       uuid STRING,
       fare DOUBLE,
       city STRING
   ) USING PARQUET;
   
   INSERT INTO merge_source
   VALUES
   (1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330',19.10,'san_francisco');
   
   CREATE TABLE hudi_table (
       ts BIGINT,
       uuid STRING,
       rider STRING,
       driver STRING,
       fare DOUBLE,
       city STRING
   ) USING HUDI
   PARTITIONED BY (city)
   OPTIONS ( 
     primaryKey 'uuid', 
     hoodie.datasource.write.operation 'upsert', 
     hoodie.datasource.write.precombine.field 'ts', 
     hoodie.datasource.write.recordkey.field 'uuid',
     hoodie.table.name 'MIT_partition_pruning'
     );
   
   INSERT INTO hudi_table
   VALUES
     (1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
     (1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70,'san_francisco'),
     (1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90,'san_francisco'),
     (1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco');
   
   
   INSERT INTO hudi_table
   SELECT 
       1695115999911 AS ts,                                   -- constant timestamp (same for every generated row)
       uuid() AS uuid,
       CONCAT('rider-', CAST(65 + (counter % 26) AS STRING)) AS rider,
       CONCAT('driver-', CAST(75 + (counter % 26) AS STRING)) AS driver,
       ROUND(rand() * (100 - 10) + 10, 2) AS fare,            -- random fare between 10 and 100
       CONCAT('p', CAST((counter % 300) AS STRING)) AS city   -- spreads the rows across 300 partitions
   FROM (SELECT explode(sequence(1, 1000000)) AS counter) A;
   
   MERGE INTO hudi_table AS target
   USING merge_source AS source
   ON target.uuid = source.uuid
   AND target.city = source.city
   WHEN MATCHED THEN UPDATE SET target.fare = target.fare;
   ```
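   
   One way to sanity-check the new behavior (my suggestion, not part of this PR) is to inspect the plan; depending on how Hudi plans the command, EXPLAIN may only show the top-level MERGE node, in which case the SQL tab of the Spark UI shows how many target partitions were actually read:
   
   ```sql
   EXPLAIN
   MERGE INTO hudi_table AS target
   USING merge_source AS source
   ON target.uuid = source.uuid
   AND target.city = source.city
   WHEN MATCHED THEN UPDATE SET target.fare = target.fare;
   ```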
   
   ### Impact
   Improves the performance of MERGE INTO on Spark by avoiding full scans of the target table for tables with a record key.
   
   ### Risk level
   Low. Performance improvement only; existing tests guard the correctness of the MERGE INTO statement.
   
   ### Documentation Update
   
   <!-- Describe any necessary documentation update if there is any new 
feature, config, or user-facing change. If not, put "none".
   
   - The config description must be updated if new configs are added or the 
default value of the configs are changed.
   - Any new feature or user-facing change requires updating the Hudi website. 
Please follow the 
     [instruction](https://hudi.apache.org/contribute/developer-setup#website) 
to make changes to the website. -->
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's 
guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Enough context is provided in the sections above
   - [ ] Adequate tests were added if applicable
   

