alamb commented on issue #12088:
URL: https://github.com/apache/datafusion/issues/12088#issuecomment-2299627654

   Here is a subset of our internal content (thanks @NGA-TRAN  who originally 
wrote this)
   
   
   
   # Introduction
   
   A SQL/InfluxQL query in InfluxDB 3.0 is executed based on a `query plan`. To 
see the plan without running the query, add the keyword `EXPLAIN`
   
   ```SQL
   EXPLAIN SELECT city, min_temp, time FROM h2o ORDER BY city ASC, time DESC;
   ```
   
   The output will look like
   
   ```SQL
   
+---------------+--------------------------------------------------------------------------+
   | plan_type     | plan                                                       
              |
   
+---------------+--------------------------------------------------------------------------+
   | logical_plan  | Sort: h2o.city ASC NULLS LAST, h2o.time DESC NULLS FIRST   
              |
   |               |   TableScan: h2o projection=[city, min_temp, time]         
              |
   | physical_plan | SortPreservingMergeExec: [city@0 ASC NULLS LAST,time@2 
DESC]             |
   |               |   UnionExec                                                
              |
   |               |     SortExec: expr=[city@0 ASC NULLS LAST,time@2 DESC]     
              |
   |               |       ParquetExec: file_groups={...}, projection=[city, 
min_temp, time]  |
   |               |     SortExec: expr=[city@0 ASC NULLS LAST,time@2 DESC]     
              |
   |               |       ParquetExec: file_groups={...}, projection=[city, 
min_temp, time]  |
   |               |                                                            
              |
   
+---------------+--------------------------------------------------------------------------+
   ```
   Figure 1: A simplified output of an explain
   
   There are two major plans: logical plan and physical plan
   
   - **Logical Plan:** is a plan generated for a specific SQL (or InfluxQL) 
without the knowledge of the underlying data organization and the cluster 
configuration. Because InfluxDB 3.0 is built on top of 
[DataFusion](https://github.com/apache/arrow-datafusion), this logical plan is 
the same as if you use DataFusion with any data format or storage.
     
   - **Physical Plan:** is a plan generated from its corresponding logical plan 
plus the consideration of the cluster configurations (e.g number of CPUs) and 
the underlying data organization (e.g number of files, if the files overlap or 
not, etc). This physical plan is very specfic to your Influx cluster 
configuration and your data. If you load the same data to different clusters 
with different cofiguarions, the same query may be generate different query 
plans. Similarly, the same query on the same cluster at different time can have 
different plans depending on your data at that time.
   
   Understanding a query plan can help to explain why your query is slow. For 
example, when the plan shows your query reads many files, it signals you to 
either add more filter in the query to read less data or to modify your cluster 
configuration/design to make fewer but larger files. This document focuses on 
how to read a query plan. How to make a query run faster depends on the reason 
it is slow and beyond the scope of this document.
   
   # How to read a query plan
   
   ## Query plan is a tree
   
   A query plan is an upside down tree and we always read from bottom up. The 
physical plan in Figure 1 in tree format will look like
   
   ```
                      ┌─────────────────────────┐                  
                      │ SortPreservingMergeExec │                  
                      └─────────────────────────┘                  
                                   ▲                               
                                   │                               
                      ┌─────────────────────────┐                  
                      │        UnionExec        │                  
                      └─────────────────────────┘                  
                                   ▲                               
                ┌──────────────────┴─────────────────┐             
                │                                    │             
   ┌─────────────────────────┐          ┌─────────────────────────┐
   │        SortExec         │          │        SortExec         │
   └─────────────────────────┘          └─────────────────────────┘
                ▲                                    ▲             
                │                                    │             
                │                                    │             
   ┌─────────────────────────┐          ┌─────────────────────────┐
   │       ParquetExec       │          │       ParquetExec       │
   └─────────────────────────┘          └─────────────────────────┘
   ```
   Figure 2: The tree structure of physical plan in Figure 1
   
   Each node in the tree/plan ends with `Exec` and is sometimes also called an 
`operator` or `ExecutionPlan` where data is processed, transformed and sent up.
   
   First, data in parquet files are read in parallel through the two 
`ParquetExec`, which each outputs a stream of data to its corresponding 
`SortExec`. The `SortExc` is responsible for sorting the data in `city` 
ascendingly and `time` descendingly. The sorted outputs from the two `SortExec` 
are then unioned by the `UnionExec`  which is then (sort) merged by the 
`SortPreservingMergeExec` to return the sorted data.
   
   ## How to understand a large query plan
   
   A large query plan may look intimidating but if you follow these steps, you 
can quickly understand what the plan does.
   
   1. As always, read from bottom up, one operator at a time.
   2. Understand the job of this operator which mostly can be found from 
[DataFusion Physical Plan 
documentation](https://docs.rs/datafusion/latest/datafusion/physical_plan/index.html).
 
   3. What the input data of the operator looks like and how large/small it may 
be.
   4. How much data that operator may send out and what it would look like.
   
   If you can answer those questions, you will be able to estimate how much 
work that plan has to do. However, the `explain` just shows you the plan 
without executing it. If you want to know exactly how long a plan and each of 
its operators take, you need other tools.
   
   
   ##### Tools that show the exact runtime for each operator
   
   (INTERNAL STUFF FROM IOX)
   
   2. Run `explain analyze` to get the explain with runtime added
   
   ##### More information for debugging
   If the plan has to read too many files, not all of them will be shown in the 
explain. To see them, use `explain verbose`. Like `explain`, `explain verbose` 
does not run the query and thus you won't get runtime. What you get is all 
information that is cut off from the explain and all intermidiate physical 
plans IOx querier and DataFusion generate before returning the final physical 
plan. This is very helpful for debugging to see when an operator is added to or 
removed from a plan.
   
   ## Example of typical plan of leading edge data
   
   Let us delve into an example that covers typical operators as well as IOx 
specific ones on leadng edge data. 
   
   
   ##### Data Organization
   (EXPLAIN clickbench file here)
   
   
   ##### Query and query plan
   
   TODO change this example
   
   ```sql
   EXPLAIN SELECT city, count(1) FROM   h2o WHERE  time >= to_timestamp(200) 
AND time < to_timestamp(700) AND state = 'MA' GROUP BY city ORDER BY city ASC;
   
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Sort: h2o.city ASC NULLS LAST                              
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |   Aggregate: groupBy=[[h2o.city]], 
aggr=[[COUNT(Int64(1))]]                                                        
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                     |
   |               |     TableScan: h2o projection=[city], 
full_filters=[h2o.time >= TimestampNanosecond(200, None), h2o.time < 
TimestampNanosecond(700, None), h2o.state = Dictionary(Int32, Utf8("MA"))]      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                             |
   | physical_plan | SortPreservingMergeExec: [city@0 ASC NULLS LAST]           
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |   SortExec: expr=[city@0 ASC NULLS LAST]                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |     AggregateExec: mode=FinalPartitioned, gby=[city@0 as 
city], aggr=[COUNT(Int64(1))]                                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                               |
   |               |       CoalesceBatchesExec: target_batch_size=8192          
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |         RepartitionExec: partitioning=Hash([city@0], 4), 
input_partitions=4                                                              
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                               |
   |               |           AggregateExec: mode=Partial, gby=[city@0 as 
city], aggr=[COUNT(Int64(1))]                                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                  |
   |               |             RepartitionExec: 
partitioning=RoundRobinBatch(4), input_partitions=3                             
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                           |
   |               |               UnionExec                                    
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |                 ProjectionExec: expr=[city@0 as city]      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |                   CoalesceBatchesExec: 
target_batch_size=8192                                                          
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                 |
   |               |                     FilterExec: time@2 >= 200 AND time@2 < 
700 AND state@1 = MA                                                            
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |                       ParquetExec: file_groups={2 groups: 
[[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/243db601-f3f1-401b-afda-82160d8cc1a8.parquet],
 
[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/f5fb7c7d-16ac-49ba-a811-69578d05843f.parquet]]},
 projection=[city, state, time], output_ordering=[state@1 ASC, city@0 ASC, 
time@2 ASC], predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA, 
pruning_predicate=time_max@0 >= 200 AND time_min@1 < 700 AND state_min@2 <= MA 
AND MA <= state_max@3                                           |
   |               |                 ProjectionExec: expr=[city@1 as city]      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |                   DeduplicateExec: [state@2 ASC,city@1 
ASC,time@3 ASC]                                                                 
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                 |
   |               |                     SortPreservingMergeExec: [state@2 
ASC,city@1 ASC,time@3 ASC,__chunk_order@0 ASC]                                  
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                  |
   |               |                       UnionExec                            
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |                         SortExec: expr=[state@2 ASC,city@1 
ASC,time@3 ASC,__chunk_order@0 ASC]                                             
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |                           CoalesceBatchesExec: 
target_batch_size=8192                                                          
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                         |
   |               |                             FilterExec: time@3 >= 200 AND 
time@3 < 700 AND state@2 = MA                                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                              |
   |               |                               RecordBatchesExec: chunks=1, 
projection=[__chunk_order, city, state, time]                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |                         CoalesceBatchesExec: 
target_batch_size=8192                                                          
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                           |
   |               |                           FilterExec: time@3 >= 200 AND 
time@3 < 700 AND state@2 = MA                                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                |
   |               |                             ParquetExec: file_groups={2 
groups: 
[[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/2cbb3992-4607-494d-82e4-66c480123189.parquet],
 
[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/9255eb7f-2b51-427b-9c9b-926199c85bdf.parquet]]},
 projection=[__chunk_order, city, state, time], output_ordering=[state@2 ASC, 
city@1 ASC, time@3 ASC, __chunk_order@0 ASC], predicate=time@5 >= 200 AND 
time@5 < 700 AND state@4 = MA, pruning_predicate=time_max@0 >= 200 AND 
time_min@1 < 700 AND state_min@2 <= MA AND MA <= state_max@3 |
   |               |                                                            
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   
   ```
   Figure 3: A typical query plan of leading edge (most recent) data
   
   
   Let us begin reading bottom up. The bottom or leaf nodes are always either 
`ParquetExec` or `RecordBatchExec`. There are 3 of them in this plan and let us 
go over one by one.
   
   
   ### Three bottom leaf operators: two `ParquetExec` & one `RecordBatchesExec`
   
   **First ParqetExec**
   
   ```sql                                                                       
                                                            |
   |               |                             ParquetExec: file_groups={2 
groups: 
[[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/2cbb3992-4607-494d-82e4-66c480123189.parquet],
 
[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/9255eb7f-2b51-427b-9c9b-926199c85bdf.parquet]]},
 projection=[__chunk_order, city, state, time], output_ordering=[state@2 ASC, 
city@1 ASC, time@3 ASC, __chunk_order@0 ASC], predicate=time@5 >= 200 AND 
time@5 < 700 AND state@4 = MA, pruning_predicate=time_max@0 >= 200 AND 
time_min@1 < 700 AND state_min@2 <= MA AND MA <= state_max@3 |
   ```
   Figure 4: First `ParquetExec`
   
   - This `ParquetExec` includes 2 groups of files. Each group can contain one 
or many files but, in this example, there is one file in each group. Files in 
each group are read sequencially but groups will be executed in parallel. So 
for this example, 2 files will be read in parallel.
   - 
`1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/2cbb3992-4607-494d-82e4-66c480123189.parquet`
 this is the path of the file in S3. It is in the structure 
`namespace_id/table_id/partition_hash_id/uuid_of_the_file.parquet`. They tell 
you a lot:
      - Which namespace (aka database) and table of this query
      - Which partition this file belongs so you know how many partitions this 
query reads
      - Which file it is for you to devle into its information in the catalog 
(e.g. size, number of rows, ...) or download for local debug as needed. Here 
are intructions to download files from 
[Serverless](https://github.com/influxdata/docs.influxdata.io/blob/main/content/operations/specifications/iox_runbooks/querier/querier-diagnose-repro-an-issue.md#iii-download-files-and-reproduce-the-problem-locally)
  and 
[Dedicated](https://github.com/influxdata/docs.influxdata.io/blob/main/content/operations/specifications/iox_runbooks/querier/querier-cst-access.md#down-load-files-and-rebuild-catalog-to-reproduce-the-issue-locally)
    - `projection=[__chunk_order, city, state, time]` : there are many columns 
in this table but only these 4 columns are read. `__chunk_order` is an 
artifical column the IOx code generates to keep the chunks/files in order for 
deduplication.
    - `output_ordering=[state@2 ASC, city@1 ASC, time@3 ASC, __chunk_order@0 
ASC]` : the output of this `ParquetExec` will be sorted on `state ASC, city 
ASC, time ASC, __chunk_order ASC`. The reason they are in that order becasue 
the file is already sorted like that.
    - `predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA` : filter in 
the query that will be used for data pruning
    - `pruning_predicate=time_max@0 >= 200 AND time_min@1 < 700 AND state_min@2 
<= MA AND MA <= state_max@3` : the actual pruning predicate transformed from 
the predicate above. This is used to filter files outside that predicate. At 
this time (Dec 2023), we only filter files based on `time`. Note that this 
predicate is for pruning **files of the chosen parttions**. Before this 
physical plan is generated, there is a `partition pruning` step where 
partitions are pruned using the predicates on parititoning columns (a near 
future document about `Partitionning data for better query performance` will 
explain this in detail). 
   
   **RecordbatchesExec**
   
   ```sql
   |               |                               RecordBatchesExec: chunks=1, 
projection=[__chunk_order, city, state, time]                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   ```
   Figure 5: `RecordBacthesExec`
   
   Data from ingester can be in many chunks, in this example there is only one. 
Like `ParquetExec`, only data of 4 columns are sent to the output. The action 
of **filtering columns** is called `projection pushdown` and, thus, it is named 
`projection` here.
   
   **Second ParquetExec**
   
   ```sql
   |               |                       ParquetExec: file_groups={2 groups: 
[[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/243db601-f3f1-401b-afda-82160d8cc1a8.parquet],
 
[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/f5fb7c7d-16ac-49ba-a811-69578d05843f.parquet]]},
 projection=[city, state, time], output_ordering=[state@1 ASC, city@0 ASC, 
time@2 ASC], predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA, 
pruning_predicate=time_max@0 >= 200 AND time_min@1 < 700 AND state_min@2 <= MA 
AND MA <= state_max@3                                           |
   ```
   Figure 6: Second `ParquetExec`
   
   The readings are similar to the one above. Note that these files and the 
ones in the first `ParquetExec` belong to the same (InfluxDB) partition (e.g. 
the same day)
   
   ### Data-scanning structures
   
   So the question is why we split parquet files into different ParquetExec 
while they are in the same partition? There are many reasons but two major ones 
are:
   1. Split the non-overlaps from the overlaps so we only need to apply the 
expensive deduplication operation on the overlaps. This is the case of this 
example.
   2. Split the non-overlaps to increase parallel execution
   
   ##### When we know there are ovelaps?
   
   ```sql
   |               |                   DeduplicateExec: [state@2 ASC,city@1 
ASC,time@3 ASC]                                                                 
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                 |
   |               |                     SortPreservingMergeExec: [state@2 
ASC,city@1 ASC,time@3 ASC,__chunk_order@0 ASC]                                  
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                  |
   |               |                       UnionExec                            
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |                         SortExec: expr=[state@2 ASC,city@1 
ASC,time@3 ASC,__chunk_order@0 ASC]                                             
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |                           CoalesceBatchesExec: 
target_batch_size=8192                                                          
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                         |
   |               |                             FilterExec: time@3 >= 200 AND 
time@3 < 700 AND state@2 = MA                                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                              |
   |               |                               RecordBatchesExec: chunks=1, 
projection=[__chunk_order, city, state, time]                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |                         CoalesceBatchesExec: 
target_batch_size=8192                                                          
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                           |
   |               |                           FilterExec: time@3 >= 200 AND 
time@3 < 700 AND state@2 = MA                                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                |
   |               |                             ParquetExec: file_groups={2 
groups: 
[[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/2cbb3992-4607-494d-82e4-66c480123189.parquet],
 
[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/9255eb7f-2b51-427b-9c9b-926199c85bdf.parquet]]},
 projection=[__chunk_order, city, state, time], output_ordering=[state@2 ASC, 
city@1 ASC, time@3 ASC, __chunk_order@0 ASC], predicate=time@5 >= 200 AND 
time@5 < 700 AND state@4 = MA, pruning_predicate=time_max@0 >= 200 AND 
time_min@1 < 700 AND state_min@2 <= MA AND MA <= state_max@3 |
   
   ```
   Figure 7: `DeduplicationExec` is a signal of overlapped data
   
   This structure tells us that since there are `DeduplicationExec`, data 
underneath it overlaps. More specifically, data in 2 files overlaps or/and 
overlap with the data from the Ingesters.
   
   - `FilterExec: time@3 >= 200 AND time@3 < 700 AND state@2 = MA`: This is the 
place we filter out everything that meets the conditions `time@3 >= 200 AND 
time@3 < 700 AND state@2 = MA`. The pruning before just prune data when 
possible, it does not guarantee all of them are pruned. We need this filter to 
do the fully filtering job.
   - `CoalesceBatchesExec: target_batch_size=8192` is just a way to group 
smaller data to larger groups if possible. Refer to DF documentation for how it 
works
   - `SortExec: expr=[state@2 ASC,city@1 ASC,time@3 ASC,__chunk_order@0 ASC]` : 
this sorts data on `state ASC, city ASC, time ASC, __chunk_order ASC`. Note 
that this sort operator is only applied on data from ingesters because data 
from files is already sorted on that order.
   - `UnionExec` is simply a place to pull many streams together. It does not 
merge anything.
   - `SortPreservingMergeExec: [state@2 ASC,city@1 ASC,time@3 
ASC,__chunk_order@0 ASC]` : this merges the already sorted data. When you see 
this, you know data below must be sorted. The output data is in one sorted 
stream.
   - `DeduplicateExec: [state@2 ASC,city@1 ASC,time@3 ASC]` : this deduplicated 
sorted data coming from strictly one input stream. That is why you often see 
under `DeduplicateExec` is `SortPreservingMergeExec` but it is not a must. As 
long as the input to `DeduplicateExec` is one sorted stream of data, it will 
work correctly.
   
   ##### When we know there are no overlaps?
   
   ```sql
   |               |                 ProjectionExec: expr=[city@0 as city]      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |                   CoalesceBatchesExec: 
target_batch_size=8192                                                          
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                 |
   |               |                     FilterExec: time@2 >= 200 AND time@2 < 
700 AND state@1 = MA                                                            
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |                       ParquetExec: file_groups={2 groups: 
[[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/243db601-f3f1-401b-afda-82160d8cc1a8.parquet],
 
[1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/f5fb7c7d-16ac-49ba-a811-69578d05843f.parquet]]},
 projection=[city, state, time], output_ordering=[state@1 ASC, city@0 ASC, 
time@2 ASC], predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA, 
pruning_predicate=time_max@0 >= 200 AND time_min@1 < 700 AND state_min@2 <= MA 
AND MA <= state_max@3                                           |
   |
   ```
   Figure 8: No `DeduplicateExec` means files not overlap
   
   Deduplicate is not in this structure and above it means the files here do 
not overlap
   
   - `ProjectionExec: expr=[city@0 as city]` : this will filter column data and 
only send out data of column `city`
   
   
   ### Other operators
   
   Now let us look at the rest of the plan
   
   ```sql
   | physical_plan | SortPreservingMergeExec: [city@0 ASC NULLS LAST]           
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |   SortExec: expr=[city@0 ASC NULLS LAST]                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |     AggregateExec: mode=FinalPartitioned, gby=[city@0 as 
city], aggr=[COUNT(Int64(1))]                                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                               |
   |               |       CoalesceBatchesExec: target_batch_size=8192          
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |         RepartitionExec: partitioning=Hash([city@0], 4), 
input_partitions=4                                                              
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                               |
   |               |           AggregateExec: mode=Partial, gby=[city@0 as 
city], aggr=[COUNT(Int64(1))]                                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                  |
   |               |             RepartitionExec: 
partitioning=RoundRobinBatch(4), input_partitions=3                             
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                           |
   |               |               UnionExec 
   ```
   Figure 9: The rest of the plan structure
   
   
   - `UnionExec`: union data streams. Note that the number of output streams is 
the same is the numner of input streams. The operator above it is responsible 
to do actual merge or split streams further. This UnionExec is just here as an 
intermediate steps of the merge/split.
   - `RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3`: 
This split 3 input streams into 4 output streams in round-robin fashion. The 
reason to split is to increase the parallel execution.
   - `AggregateExec: mode=Partial, gby=[city@0 as city], 
aggr=[COUNT(Int64(1))]`:  This group data as specified in `city, count(1)`. 
Becasue there are 4 input streams, each stream is aggregated separately and 
hence the output also 4 streams which means the output data is not fully 
aggregated and the `mode=Partial` signals us that.
   - `RepartitionExec: partitioning=Hash([city@0], 4), input_partitions=4` : 
this repartitions data on hash(`city`) into 4 streams which means the same city 
will go into the same stream
   - `AggregateExec: mode=FinalPartitioned, gby=[city@0 as city], 
aggr=[COUNT(Int64(1))]`: Since  rows of same city are in the same stream, we 
only need to do the final aggregation.
   - `SortExec: expr=[city@0 ASC NULLS LAST]` : Sort 4 streams of data each on 
`city` per the query request
   - `SortPreservingMergeExec: [city@0 ASC NULLS LAST]`: (Sort) merge 4 sorted 
streams to return the final results
   
   # For your questions and references
   
   If you see the plan reads a lof of files and does deduplication on all of 
them you may want ask: "do all of them overlap or not?" The asnwer is either 
yes or no depending on the situation. There are other reasons that we 
deduplicate non-overlap files due to memory limitation but they will be topics 
for future documentation.
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to