haohuaijin commented on issue #16919:
URL: https://github.com/apache/datafusion/issues/16919#issuecomment-3124127867

   By default, datafusion will use the number of cpu cores to execute queries 
concurrently.
   For example, if the cpu core is 10 and the parquet files are 15, datafusion 
will divide the 15 files into 10 groups and then execute them concurrently.
   So for above issue, because there are 15 files but only 10 cpu cores, 
datafusion groups the files, which results in the data in one group not being 
sorted by `col_1` and `col_2`.
   Before create table set the target_partitions equal to file number, will 
generate the except plan.
   For example
   ```sql
   SET datafusion.execution.target_partitions = 15;
   
   CREATE EXTERNAL TABLE example (
       col_1 VARCHAR(50) NOT NULL,
       col_2 BIGINT NOT NULL,
       col_3 VARCHAR(50),
       col_4 VARCHAR(50),
       col_5 VARCHAR(50),
       col_6 VARCHAR(100) NOT NULL,
       col_7 VARCHAR(50),
       col_8 DOUBLE
   ) 
   WITH ORDER (col_1 ASC, col_2 ASC) 
   STORED AS PARQUET 
   LOCATION '/tmp/redacted/*.parquet';
   
   EXPLAIN FORMAT INDENT COPY (
       SELECT 
           col_1,
           col_2,
           col_3,
           col_4,
           col_5,
           col_6,
           first_value(col_7) AS col_7,
           first_value(col_8) AS col_8
       FROM 
           example 
       GROUP BY 
           col_1, col_2, col_3, col_4, col_5, col_6 
       ORDER BY 
           col_1 ASC, col_2 ASC
   ) 
   TO '/tmp/result.parquet' 
   STORED AS PARQUET 
   OPTIONS (compression 'zstd(1)');
   
   ```
   ```
   

   | plan_type     | plan                                                       
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                     |
   

   | logical_plan  | CopyTo: format=parquet output_url=/tmp/result.parquet 
options: (format.compression zstd(1))                                           
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                          |
   |               |   Sort: example.col_1 ASC NULLS LAST, example.col_2 ASC 
NULLS LAST                                                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                        |
   |               |     Projection: example.col_1, example.col_2, 
example.col_3, example.col_4, example.col_5, example.col_6, 
first_value(example.col_7) AS col_7, first_value(example.col_8) AS col_8        
                                                                                
                                                                                
                                                                                
                                                                                
                                                      |
   |               |       Aggregate: groupBy=[[example.col_1, example.col_2, 
example.col_3, example.col_4, example.col_5, example.col_6]], 
aggr=[[first_value(example.col_7), first_value(example.col_8)]]                 
                                                                                
                                                                                
                                                                                
                                                                                
                                         |
   |               |         TableScan: example projection=[col_1, col_2, 
col_3, col_4, col_5, col_6, col_7, col_8]                                       
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                           |
   | physical_plan | DataSinkExec: sink=ParquetSink(file_groups=[])             
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                     |
   |               |   SortPreservingMergeExec: [col_1@0 ASC NULLS LAST, 
col_2@1 ASC NULLS LAST]                                                         
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                            |
   |               |     ProjectionExec: expr=[col_1@0 as col_1, col_2@1 as 
col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as col_5, col_6@5 as col_6, 
first_value(example.col_7)@6 as col_7, first_value(example.col_8)@7 as col_8]   
                                                                                
                                                                                
                                                                                
                                                                                
                          |
   |               |       AggregateExec: mode=FinalPartitioned, gby=[col_1@0 
as col_1, col_2@1 as col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as 
col_5, col_6@5 as col_6], aggr=[first_value(example.col_7), 
first_value(example.col_8)], ordering_mode=PartiallySorted([0, 1])              
                                                                                
                                                                                
                                                                                
                                                |
   |               |         SortExec: expr=[col_1@0 ASC NULLS LAST, col_2@1 
ASC NULLS LAST], preserve_partitioning=[true]                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                        |
   |               |           CoalesceBatchesExec: target_batch_size=8192      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                     |
   |               |             RepartitionExec: partitioning=Hash([col_1@0, 
col_2@1, col_3@2, col_4@3, col_5@4, col_6@5], 15), input_partitions=15          
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                       |
   |               |               AggregateExec: mode=Partial, gby=[col_1@0 as 
col_1, col_2@1 as col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as col_5, 
col_6@5 as col_6], aggr=[first_value(example.col_7), 
first_value(example.col_8)], ordering_mode=PartiallySorted([0, 1])              
                                                                                
                                                                                
                                                                                
                                                 |
   |               |                 DataSourceExec: file_groups={15 groups: 
[[Users/huaijinhao/Downloads/datav4-15/reproducible_data_0.parquet], 
[Users/huaijinhao/Downloads/datav4-15/reproducible_data_1.parquet], 
[Users/huaijinhao/Downloads/datav4-15/reproducible_data_10.parquet], 
[Users/huaijinhao/Downloads/datav4-15/reproducible_data_11.parquet], 
[Users/huaijinhao/Downloads/datav4-15/reproducible_data_12.parquet], ...]}, 
projection=[col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8], 
output_ordering=[col_1@0 ASC NULLS LAST, col_2@1 ASC NULLS LAST], 
file_type=parquet |
   |               |                                                            
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                     |
   

   ```
   
   another way is enable `split_file_groups_by_statistics` to make file in each 
group not overlap on sort key. But the parquet file generated by python dot not 
have min/max statistics, so i not able to verify this.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to