bowenliang123 opened a new pull request, #7932:
URL: https://github.com/apache/iceberg/pull/7932

   - Use Reblance instead of Repartition for distribution in Spark 3.2 and 3.3, 
to avoid small partitioned files cased with AQE decided partition numbers.
   
   Before: 
   Having `REPARTITION_BY_NUM` in `+- Exchange hashpartitioning(lab_numr#270, 
busi_date#271, 200), REPARTITION_BY_NUM,`
   
   ```
   +----------------------------------------------------+
   |                        plan                        |
   +----------------------------------------------------+
   | == Parsed Logical Plan ==
   'InsertIntoStatement 'UnresolvedRelation [gfpersonas_platform, 
t_ptr_label_ice_bowen], [], false, false, false
   +- 'Project [*]
      +- 'UnresolvedRelation [gfpersonas_platform, t_ptr_label_ice], [], false
   
   == Analyzed Logical Plan ==
   AppendData RelationV2[obj_id#46, lab_val#47, lab_numr#48, busi_date#49] 
spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, false
   +- Project [obj_id#42, lab_val#43, lab_numr#44, busi_date#45]
      +- SubqueryAlias spark_catalog.gfpersonas_platform.t_ptr_label_ice
         +- RelationV2[obj_id#42, lab_val#43, lab_numr#44, busi_date#45] 
spark_catalog.gfpersonas_platform.t_ptr_label_ice
   
   == Optimized Logical Plan ==
   AppendData RelationV2[obj_id#46, lab_val#47, lab_numr#48, busi_date#49] 
spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, false, 
IcebergWrite(table=spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, 
format=PARQUET)
   +- Sort [lab_numr#44 ASC NULLS FIRST, busi_date#45 ASC NULLS FIRST], false
      +- RebalancePartitions [lab_numr#44, busi_date#45]
         +- RelationV2[obj_id#42, lab_val#43, lab_numr#44, busi_date#45] 
spark_catalog.gfpersonas_platform.t_ptr_label_ice
   
   == Physical Plan ==
   AppendData 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3367/659079940@629fd732,
 IcebergWrite(table=spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, 
format=PARQUET)
   +- AdaptiveSparkPlan isFinalPlan=false
      +- Sort [lab_numr#44 ASC NULLS FIRST, busi_date#45 ASC NULLS FIRST], 
false, 0
         +- Exchange hashpartitioning(lab_numr#44, busi_date#45, 200), 
REBALANCE_PARTITIONS_BY_COL, [plan_id=49]
            +- BatchScan[obj_id#42, lab_val#43, lab_numr#44, busi_date#45] 
spark_catalog.gfpersonas_platform.t_ptr_label_ice (branch=null) [filters=, 
groupedBy=] RuntimeFilters: []
    |
   +----------------------------------------------------+
   ```
   
   After: 
   Having `REBALANCE_PARTITIONS_BY_COL` in `Exchange 
hashpartitioning(lab_numr#44, busi_date#45, 200), REBALANCE_PARTITIONS_BY_COL, `
   
   ```
   +----------------------------------------------------+
   |                        plan                        |
   +----------------------------------------------------+
   | == Parsed Logical Plan ==
   'InsertIntoStatement 'UnresolvedRelation [gfpersonas_platform, 
t_ptr_label_ice_bowen], [], false, false, false
   +- 'Project [*]
      +- 'UnresolvedRelation [gfpersonas_platform, t_ptr_label_ice], [], false
   
   == Analyzed Logical Plan ==
   AppendData RelationV2[obj_id#46, lab_val#47, lab_numr#48, busi_date#49] 
spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, false
   +- Project [obj_id#42, lab_val#43, lab_numr#44, busi_date#45]
      +- SubqueryAlias spark_catalog.gfpersonas_platform.t_ptr_label_ice
         +- RelationV2[obj_id#42, lab_val#43, lab_numr#44, busi_date#45] 
spark_catalog.gfpersonas_platform.t_ptr_label_ice
   
   == Optimized Logical Plan ==
   AppendData RelationV2[obj_id#46, lab_val#47, lab_numr#48, busi_date#49] 
spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, false, 
IcebergWrite(table=spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, 
format=PARQUET)
   +- Sort [lab_numr#44 ASC NULLS FIRST, busi_date#45 ASC NULLS FIRST], false
      +- RebalancePartitions [lab_numr#44, busi_date#45]
         +- RelationV2[obj_id#42, lab_val#43, lab_numr#44, busi_date#45] 
spark_catalog.gfpersonas_platform.t_ptr_label_ice
   
   == Physical Plan ==
   AppendData 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3367/659079940@629fd732,
 IcebergWrite(table=spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, 
format=PARQUET)
   +- AdaptiveSparkPlan isFinalPlan=false
      +- Sort [lab_numr#44 ASC NULLS FIRST, busi_date#45 ASC NULLS FIRST], 
false, 0
         +- Exchange hashpartitioning(lab_numr#44, busi_date#45, 200), 
REBALANCE_PARTITIONS_BY_COL, [plan_id=49]
            +- BatchScan[obj_id#42, lab_val#43, lab_numr#44, busi_date#45] 
spark_catalog.gfpersonas_platform.t_ptr_label_ice (branch=null) [filters=, 
groupedBy=] RuntimeFilters: []
    |
   +----------------------------------------------------+
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to