2010YOUY01 commented on issue #18341:
URL: https://github.com/apache/datafusion/issues/18341#issuecomment-3466350575

   I think this might be related to this issue. There is a Discord discussion about slow TPC-H Q1:
   https://discord.com/channels/885562378132000778/1290751484807352412/1432863136612089959
   
   From the plan, there is only one hash repartition operator and no round-robin repartition. I believe round-robin repartitioning is necessary when the data source is not very small (and, as explained in this issue, it has to be pushed further down in the plan; otherwise it's useless).
   The reason is that the pushed-down filter `l_shipdate <= date '1998-09-02'` may filter out all or most of the rows in some of the parquet scan partitions.
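To make the round-robin argument concrete, here is a minimal, self-contained Rust model of batch-level round-robin distribution, in the spirit of DataFusion's `Partitioning::RoundRobinBatch`. It is a simplified sketch, not DataFusion's `RepartitionExec` implementation: batches are modeled as plain row counts, and `round_robin` is a hypothetical helper. It shows that even when only one input partition still produces batches after the filter, the downstream partitions each receive a share of the work.

```rust
/// Distribute input batches (modeled as row counts) across `n_out` output
/// partitions, assigning each batch to the next output partition in turn.
/// Hypothetical helper: a simplified model, not DataFusion's RepartitionExec.
fn round_robin(inputs: &[Vec<usize>], n_out: usize) -> Vec<Vec<usize>> {
    assert!(n_out > 0, "need at least one output partition");
    let mut outputs: Vec<Vec<usize>> = vec![Vec::new(); n_out];
    let mut next = 0;
    // Drain the input partitions in order; a real operator polls them concurrently.
    for partition in inputs {
        for &batch_rows in partition {
            outputs[next].push(batch_rows);
            next = (next + 1) % n_out;
        }
    }
    outputs
}

fn main() {
    // Assumed scenario: after the filter, only input partition 0 still produces batches.
    let inputs = vec![vec![8192; 8], vec![], vec![], vec![]];
    let outputs = round_robin(&inputs, 4);
    for (i, out) in outputs.iter().enumerate() {
        let rows: usize = out.iter().sum();
        println!("output partition {i}: {} batches, {rows} rows", out.len());
    }
}
```

Balancing happens at batch granularity, so it only helps the operators placed above the repartition; this is why the repartition has to sit low enough in the plan to matter.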
   
   (Note: the case below is hypothetical. I haven't verified that it's the actual reason Q1 is slow, but I think it's possible, so it's necessary to fix the `RepartitionExec` issue.)
   
   For example, let's say `datafusion.execution.target_partitions=4`, so there are 4 parallel parquet scanners and also 4 parallel aggregate operators upstream.
   
   partition 1: l_shipdate has range [1990, 2000]
   partition 2: l_shipdate has range [2000, 2005]
   partition 3: l_shipdate has range [2005, 2010]
   partition 4: l_shipdate has range [2010, 2020]
   
   As a result, partitions 2, 3, and 4 produce no output from the parquet reader (every row in them has an `l_shipdate` after 1998-09-02), so only one partition is busy and the available CPUs can't be fully utilized.
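The hypothetical example above can be modeled in a few lines of Rust. This sketch assumes equal row counts per partition, `l_shipdate` values spread uniformly over each partition's range, and dates expressed as fractional years (so `1998.67` roughly stands for `1998-09-02`); `ScanPartition` and `surviving_rows` are illustrative names, not DataFusion APIs. Without a repartition between the scan and the operators above it, each partition's survivors stay on that partition.

```rust
/// A hypothetical scan partition whose l_shipdate values (as fractional years)
/// are assumed to be spread evenly over [min_year, max_year].
struct ScanPartition {
    min_year: f64,
    max_year: f64,
    rows: usize,
}

impl ScanPartition {
    /// Rows surviving `l_shipdate <= cutoff`, assuming a uniform distribution.
    fn surviving_rows(&self, cutoff: f64) -> usize {
        if cutoff < self.min_year {
            0
        } else if cutoff >= self.max_year {
            self.rows
        } else {
            let frac = (cutoff - self.min_year) / (self.max_year - self.min_year);
            (frac * self.rows as f64).round() as usize
        }
    }
}

fn main() {
    // Approximation of date '1998-09-02' as a fractional year.
    let cutoff = 1998.67;
    let partitions = [
        ScanPartition { min_year: 1990.0, max_year: 2000.0, rows: 1_000_000 },
        ScanPartition { min_year: 2000.0, max_year: 2005.0, rows: 1_000_000 },
        ScanPartition { min_year: 2005.0, max_year: 2010.0, rows: 1_000_000 },
        ScanPartition { min_year: 2010.0, max_year: 2020.0, rows: 1_000_000 },
    ];
    let survivors: Vec<usize> = partitions.iter().map(|p| p.surviving_rows(cutoff)).collect();
    let total: usize = survivors.iter().sum();
    let busiest = survivors.iter().copied().max().unwrap_or(0);
    for (i, n) in survivors.iter().enumerate() {
        println!("partition {}: {n} rows after filter", i + 1);
    }
    // Without repartitioning, the downstream work per partition mirrors these counts.
    println!("busy partitions: {}", survivors.iter().filter(|&&n| n > 0).count());
    println!("share of work on busiest partition: {:.0}%", 100.0 * busiest as f64 / total as f64);
}
```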
   
   
   
   <details>
   <summary> TPCH Q1 plan </summary>
   
   ```
   > CREATE EXTERNAL TABLE IF NOT EXISTS lineitem
   STORED AS parquet
   LOCATION '/Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem';
   
   > explain select
       l_returnflag,
       l_linestatus,
       sum(l_quantity) as sum_qty,
       sum(l_extendedprice) as sum_base_price,
       sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
       sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
       avg(l_quantity) as avg_qty,
       avg(l_extendedprice) as avg_price,
       avg(l_discount) as avg_disc,
       count(*) as count_order
   from
       lineitem
   where
           l_shipdate <= date '1998-09-02'
   group by
       l_returnflag,
       l_linestatus
   order by
       l_returnflag,
       l_linestatus;
   +---------------+-------------------------------+
   | plan_type     | plan                          |
   +---------------+-------------------------------+
   | physical_plan | ┌───────────────────────────┐ |
   |               | │  SortPreservingMergeExec  │ |
   |               | │    --------------------   │ |
   |               | │   l_returnflag ASC NULLS  │ |
   |               | │     LAST, l_linestatus    │ |
   |               | │       ASC NULLS LAST      │ |
   |               | └─────────────┬─────────────┘ |
   |               | ┌─────────────┴─────────────┐ |
   |               | │          SortExec         │ |
   |               | │    --------------------   │ |
   |               | │  l_returnflag@0 ASC NULLS │ |
   |               | │    LAST, l_linestatus@1   │ |
   |               | │       ASC NULLS LAST      │ |
   |               | └─────────────┬─────────────┘ |
   |               | ┌─────────────┴─────────────┐ |
   |               | │       ProjectionExec      │ |
   |               | │    --------------------   │ |
   |               | │         avg_disc:         │ |
   |               | │  avg(lineitem.l_discount) │ |
   |               | │                           │ |
   |               | │         avg_price:        │ |
   |               | │        avg(lineitem       │ |
   |               | │        .l_extendedp       │ |
   |               | │           rice)           │ |
   |               | │                           │ |
   |               | │          avg_qty:         │ |
   |               | │  avg(lineitem.l_quantity) │ |
   |               | │                           │ |
   |               | │        count_order:       │ |
   |               | │      count(Int64(1))      │ |
   |               | │                           │ |
   |               | │       l_linestatus:       │ |
   |               | │        l_linestatus       │ |
   |               | │                           │ |
   |               | │       l_returnflag:       │ |
   |               | │        l_returnflag       │ |
   |               | │                           │ |
   |               | │      sum_base_price:      │ |
   |               | │        sum(lineitem       │ |
   |               | │        .l_extendedp       │ |
   |               | │           rice)           │ |
   |               | │                           │ |
   |               | │        sum_charge:        │ |
   |               | │        sum(lineitem       │ |
   |               | │        .l_extendedp       │ |
   |               | │ rice * Int64(1) - lineitem│ |
   |               | │            ...            │ |
   |               | └─────────────┬─────────────┘ |
   |               | ┌─────────────┴─────────────┐ |
   |               | │       AggregateExec       │ |
   |               | │    --------------------   │ |
   |               | │           aggr:           │ |
   |               | │ sum(lineitem.l_quantity), │ |
   |               | │        sum(lineitem       │ |
   |               | │      .l_extendedpric      │ |
   |               | │      e), sum(lineitem     │ |
   |               | │      .l_extendedprice     │ |
   |               | │    * Int64(1) - lineitem  │ |
   |               | │     .l_discount), sum     │ |
   |               | │         (lineitem         │ |
   |               | │       .l_extendedpri      │ |
   |               | │  ce * Int64(1) - lineitem │ |
   |               | │  .l_discount * Int64(1)   │ |
   |               | │   + lineitem.l_tax), avg  │ |
   |               | │   (lineitem.l_quantity),  │ |
   |               | │        avg(lineitem       │ |
   |               | │       .l_extendedpri      │ |
   |               | │     ce), avg(lineitem     │ |
   |               | │       .l_discount),       │ |
   |               | │          count(1)         │ |
   |               | │                           │ |
   |               | │         group_by:         │ |
   |               | │ l_returnflag, l_linestatus│ |
   |               | │                           │ |
   |               | │           mode:           │ |
   |               | │      FinalPartitioned     │ |
   |               | └─────────────┬─────────────┘ |
   |               | ┌─────────────┴─────────────┐ |
   |               | │    CoalesceBatchesExec    │ |
   |               | │    --------------------   │ |
   |               | │     target_batch_size:    │ |
   |               | │            8192           │ |
   |               | └─────────────┬─────────────┘ |
   |               | ┌─────────────┴─────────────┐ |
   |               | │      RepartitionExec      │ |
   |               | │    --------------------   │ |
   |               | │ partition_count(in->out): │ |
   |               | │          14 -> 14         │ |
   |               | │                           │ |
   |               | │    partitioning_scheme:   │ |
   |               | │   Hash([l_returnflag@0,   │ |
   |               | │    l_linestatus@1], 14)   │ |
   |               | └─────────────┬─────────────┘ |
   |               | ┌─────────────┴─────────────┐ |
   |               | │       AggregateExec       │ |
   |               | │    --------------------   │ |
   |               | │           aggr:           │ |
   |               | │ sum(lineitem.l_quantity), │ |
   |               | │        sum(lineitem       │ |
   |               | │      .l_extendedpric      │ |
   |               | │      e), sum(lineitem     │ |
   |               | │      .l_extendedprice     │ |
   |               | │    * Int64(1) - lineitem  │ |
   |               | │     .l_discount), sum     │ |
   |               | │         (lineitem         │ |
   |               | │       .l_extendedpri      │ |
   |               | │  ce * Int64(1) - lineitem │ |
   |               | │  .l_discount * Int64(1)   │ |
   |               | │   + lineitem.l_tax), avg  │ |
   |               | │   (lineitem.l_quantity),  │ |
   |               | │        avg(lineitem       │ |
   |               | │       .l_extendedpri      │ |
   |               | │     ce), avg(lineitem     │ |
   |               | │       .l_discount),       │ |
   |               | │          count(1)         │ |
   |               | │                           │ |
   |               | │         group_by:         │ |
   |               | │ l_returnflag, l_linestatus│ |
   |               | │                           │ |
   |               | │       mode: Partial       │ |
   |               | └─────────────┬─────────────┘ |
   |               | ┌─────────────┴─────────────┐ |
   |               | │       ProjectionExec      │ |
   |               | │    --------------------   │ |
   |               | │      __common_expr_1:     │ |
   |               | │ l_extendedprice * (Some(1)│ |
   |               | │    ,20,0 - l_discount)    │ |
   |               | │                           │ |
   |               | │        l_discount:        │ |
   |               | │         l_discount        │ |
   |               | │                           │ |
   |               | │      l_extendedprice:     │ |
   |               | │      l_extendedprice      │ |
   |               | │                           │ |
   |               | │       l_linestatus:       │ |
   |               | │        l_linestatus       │ |
   |               | │                           │ |
   |               | │        l_quantity:        │ |
   |               | │         l_quantity        │ |
   |               | │                           │ |
   |               | │       l_returnflag:       │ |
   |               | │        l_returnflag       │ |
   |               | │                           │ |
   |               | │        l_tax: l_tax       │ |
   |               | └─────────────┬─────────────┘ |
   |               | ┌─────────────┴─────────────┐ |
   |               | │    CoalesceBatchesExec    │ |
   |               | │    --------------------   │ |
   |               | │     target_batch_size:    │ |
   |               | │            8192           │ |
   |               | └─────────────┬─────────────┘ |
   |               | ┌─────────────┴─────────────┐ |
   |               | │         FilterExec        │ |
   |               | │    --------------------   │ |
   |               | │         predicate:        │ |
   |               | │  l_shipdate <= 1998-09-02 │ |
   |               | └─────────────┬─────────────┘ |
   |               | ┌─────────────┴─────────────┐ |
   |               | │       DataSourceExec      │ |
   |               | │    --------------------   │ |
   |               | │         files: 21         │ |
   |               | │      format: parquet      │ |
   |               | │                           │ |
   |               | │         predicate:        │ |
   |               | │  l_shipdate <= 1998-09-02 │ |
   |               | └───────────────────────────┘ |
   |               |                               |
   +---------------+-------------------------------+
   1 row(s) fetched.
   Elapsed 0.040 seconds.
   ```
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.