[ https://issues.apache.org/jira/browse/IMPALA-2945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17912637#comment-17912637 ]

Riza Suminto edited comment on IMPALA-2945 at 1/13/25 8:43 PM:
---------------------------------------------------------------

We have a discussion in IMPALA-13644 on how to apply the IMPALA-2945 probability model correctly:
[https://gerrit.cloudera.org/c/22320/4/fe/src/main/java/org/apache/impala/planner/AggregationNode.java#347]
 

To clarify, there are two methods that we are comparing here to estimate the overall preagg output cardinality over N fragment instances:

{*}Method 1{*}, the old way, is PlanFragment.getPerInstanceNdvForCpuCosting().
For each grouping expression expr in exprs, apply the probability model to find perInstanceNdv just for that expr.
Then, combine all perInstanceNdv values by multiplying them to get the perInstanceNdv for the whole grouping expression list (exprs).
If this combined perInstanceNdv is greater than inputCardinality, return inputCardinality instead.
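
For concreteness, here is a minimal Java sketch of the Method 1 combination described above. The names and signature are illustrative only, not the actual PlanFragment.getPerInstanceNdvForCpuCosting() code, and the probability formula is my assumption (it reproduces the table further below):
{code:java}
// Hypothetical sketch of Method 1: apply the probability model per grouping
// expression, multiply the per-expression perInstanceNdv values, then cap the
// product at inputCardinality.
public class Method1Sketch {
  // Assumed probability that a particular distinct value of an expression shows
  // up among the rows processed by one fragment instance.
  static double probValExistInInst(double globalNdv, double perInstanceInputCard) {
    if (globalNdv <= 0) return 0.0;
    return 1.0 - Math.pow(1.0 - 1.0 / globalNdv, perInstanceInputCard);
  }

  static double perInstanceNdvMethod1(
      double[] exprNdvs, double perInstanceInputCard, double inputCardinality) {
    double combined = 1.0;
    for (double ndv : exprNdvs) {
      double perInstNdv = Math.max(1.0, ndv * probValExistInInst(ndv, perInstanceInputCard));
      combined *= perInstNdv;  // simple multiplication across grouping expressions
    }
    // The product easily exceeds inputCardinality, so this cap usually decides the result.
    return Math.min(combined, inputCardinality);
  }
}
{code}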

{*}Method 2{*}, the new way, is AggregationNode.estimatePreaggCardinality().
A globalNdv (across all fragment instances) for the whole grouping exprs is precomputed by AggregationNode.estimateNumGroups().
This globalNdv has already been subject to various bounding against inputCardinality and to tuple-based analysis.
We then apply the probability model over this single globalNdv number, effectively treating the grouping expression list as if it were one grouping expression over a composite of all grouping columns.
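
And a corresponding sketch of Method 2, again with illustrative names rather than the actual AggregationNode.estimatePreaggCardinality() code: the probability model is applied once, over the single bounded globalNdv of the composite grouping expression list.
{code:java}
// Hypothetical sketch of Method 2: treat the whole grouping list as one
// composite expression whose NDV is the precomputed, already-bounded globalNdv.
public class Method2Sketch {
  static double perInstanceNdvMethod2(double globalNdv, double perInstanceInputCard) {
    if (globalNdv <= 0) return 1.0;
    // Assumed probability that a particular composite group shows up in one instance.
    double prob = 1.0 - Math.pow(1.0 - 1.0 / globalNdv, perInstanceInputCard);
    return Math.max(1.0, globalNdv * prob);
  }

  // Overall preagg output cardinality estimate across all fragment instances.
  static double preaggCardinality(
      double globalNdv, double perInstanceInputCard, int numInstances) {
    return perInstanceNdvMethod2(globalNdv, perInstanceInputCard) * numInstances;
  }
}
{code}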

I believe Method 1 is flawed in the way it combines the perInstanceNdv of the grouping expressions. Simple multiplication will almost always exceed inputCardinality, so the probability model effectively does not matter.

 

Let's use TPC-DS Q4 at scale factor 3000 as an example and focus on 13:AGGREGATE [STREAMING]:
{noformat}
|  13:AGGREGATE [STREAMING]
|  |  output: sum(((ss_ext_list_price - ss_ext_wholesale_cost - ss_ext_discount_amt) + ss_ext_sales_price) / CAST(2 AS DECIMAL(3,0)))
|  |  group by: c_customer_id, c_first_name, c_last_name, c_preferred_cust_flag, c_birth_country, c_login, c_email_address, d_year
|  |  mem-estimate=1.71GB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
|  |  tuple-ids=17 row-size=169B cardinality=1.31G cost=8141744291
|  |  in pipelines: 09(GETNEXT)
{noformat}
[https://gerrit.cloudera.org/c/22047/15/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-q04.test#1225]

In this test case:
{noformat}
NumInstances = 120
InputCardinality = 1,621,900,725
PerInstanceInputCardinality = 1,621,900,725 / 120 = 13,515,839
TableCardinality(customer) = 30,000,000
{noformat}
Given the table statistics for the customer table described at
[https://github.com/apache/impala/blob/master/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/stats-3TB.json#L136]
we will have:
{noformat}
+--------------------------+------------+--------------------+------------+
|     Grouping Column      | globalNdv  | probValExistInInst | perInstNdv |
+--------------------------+------------+--------------------+------------+
| c_customer_id            | 30,000,000 |       0.3627084185 | 10,881,253 |
| c_first_name             |      5,042 |                  1 |      5,042 |
| c_last_name              |      4,963 |                  1 |      4,963 |
| c_preferred_cust_flag    |          2 |                  1 |          2 |
| c_birth_country          |        206 |                  1 |        206 |
| c_login                  |          0 |                  0 |          1 |
| c_email_address          | 28,790,839 |       0.3746534572 | 10,786,587 |
| d_year                   |          1 |                  1 |          1 |
| (composite_all_of_above) | 30,000,000 |       0.3627084185 | 10,881,253 |
+--------------------------+------------+--------------------+------------+
{noformat}
Note that the d_year expression has globalNdv = 1 because of the equality predicate below 13:AGGREGATE:
{noformat}
|  |  10:SCAN HDFS [tpcds_partitioned_parquet_snap.date_dim, RANDOM]
|  |     HDFS partitions=1/1 files=1 size=2.17MB
|  |     predicates: tpcds_partitioned_parquet_snap.date_dim.d_year = CAST(2000 AS INT)
{noformat}
The customer table is fully scanned, with some runtime filters applied.
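
As a sanity check, the c_customer_id row above (and the composite row, which has the same globalNdv) is reproduced by the formula prob = 1 - (1 - 1/globalNdv)^PerInstanceInputCardinality. This exact formula is my assumption, inferred from the numbers in the table; the real code is in the Gerrit change linked above:
{code:java}
// Hypothetical check of the c_customer_id row: globalNdv = 30,000,000,
// PerInstanceInputCardinality = 1,621,900,725 / 120 = 13,515,839.
public class ProbModelCheck {
  public static void main(String[] args) {
    double globalNdv = 30_000_000.0;
    double perInstanceInputCard = 13_515_839.0;
    double prob = 1.0 - Math.pow(1.0 - 1.0 / globalNdv, perInstanceInputCard);
    double perInstNdv = globalNdv * prob;
    // Prints approximately 0.3627084 and 10,881,253, matching the table row.
    System.out.printf("probValExistInInst=%.10f perInstNdv=%.0f%n", prob, perInstNdv);
    // Multiplying by 120 instances gives roughly 1.31B, in line with Method 2's
    // preagg estimate of 1,305,750,307 below.
    System.out.printf("preaggEstimate=%.0f%n", perInstNdv * 120);
  }
}
{code}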

With {*}Method 1{*}, the combined perInstanceNdv = (10,881,253 * 5,042 * 4,963 * 2 * 206 * 1 * 10,786,587 * 1) = 1,210,061,068,421,320,000,000,000. PlanFragment.getPerInstanceNdvForCpuCosting() will cap it at inputCardinality and return 1,621,900,725 instead.
If we then multiply by NumInstances, we get 194,628,087,000 as the preagg output cardinality.

With {*}Method 2{*}, globalNdv(c_customer_id, c_first_name, c_last_name, c_preferred_cust_flag, c_birth_country, c_login, c_email_address, d_year) = 30,000,000.00, which is equal to TableCardinality(customer). AggregationNode.estimatePreaggCardinality() will estimate perInstanceNdv = 10,881,253.
If we then multiply by NumInstances, we get 1,305,750,307 as the preagg output cardinality.

 

I ran Q4 on a real cluster with the same cluster size and data scale. Today, the Impala planner estimates the output cardinality of 13:AGGREGATE at 298.65M (full profile at [^tpcds_q4_baseline.txt]). The actual output cardinality, however, is 29.79M.
{noformat}
Operator                    #Hosts  #Inst   Avg Time   Max Time    #Rows  Est. #Rows   Peak Mem  Est. Peak Mem  Detail
----------------------------------------------------------------------------------------------------------------------
...
52:AGGREGATE                    10    120  222.067ms  396.001ms   29.79M       1.62B   72.11 MB        2.28 GB  FINALIZE
51:EXCHANGE                     10    120   12.833ms   95.999ms   29.79M       1.62B    5.92 MB       30.29 MB  HASH(c_customer_id,c_first_name,c_last_name,c_preferred_cust_flag,c_birth_country,c_login,c_email_address,d_year)
F03:EXCHANGE SENDER             10    120  686.468ms  928.004ms                        9.13 MB       81.16 MB
13:AGGREGATE                    10    120    3s781ms    5s624ms   29.79M       1.62B   72.13 MB        1.71 GB  STREAMING
12:HASH JOIN                    10    120    1s309ms    2s896ms    1.62B       1.62B   33.12 KB              0  INNER JOIN, PARTITIONED
{noformat}
This shows that the number from Method 2 is still an overestimate, but closer to the actual output cardinality.



> Pre-aggregation cardinality estimates do not take into account data 
> distribution
> --------------------------------------------------------------------------------
>
>                 Key: IMPALA-2945
>                 URL: https://issues.apache.org/jira/browse/IMPALA-2945
>             Project: IMPALA
>          Issue Type: Improvement
>          Components: Frontend
>    Affects Versions: Impala 2.0, Impala 2.3.0, Impala 2.5.0, Impala 2.4.0, 
> Impala 2.6.0, Impala 2.7.0, Impala 2.8.0, Impala 2.9.0, Impala 2.10.0, Impala 
> 2.11.0, Impala 2.12.0
>            Reporter: Mostafa Mokhtar
>            Assignee: Riza Suminto
>            Priority: Major
>              Labels: planner, resource-management
>         Attachments: ESTIMATE_DUPLICATE_IN_PREAGG_FALSE.txt, 
> ESTIMATE_DUPLICATE_IN_PREAGG_TRUE.txt, tpcds_q4_baseline.txt
>
>
> When computing the per-host memory estimate for local aggregations, the 
> planner does not take into account that data is randomly distributed across 
> nodes leading to significant underestimation in some cases. The suggested fix 
> is to use min(agg input cardinality, NDV * #hosts) as the per-node 
> cardinality used for the per-node memory estimate.
> Impact: In the query below, the planner significantly underestimates the per-node memory of agg node 03 to be 3.8GB but the actual is 24.77GB.
> Query
> {code}
> select sum(l_extendedprice) / 7.0 as avg_yearly
> from
>   lineitem,
>   part
> where
>   p_partkey = l_partkey
>   and p_brand = 'Brand#23'
>   and p_container = 'MED BOX'
>   and l_quantity < (
>     select
>       0.2 * avg(l_quantity)
>     from
>       lineitem
>     where
>       l_partkey = p_partkey
>   )
> {code}
> Plan
> {code}
> 12:AGGREGATE [FINALIZE]
> |  output: sum:merge(l_extendedprice)
> |  hosts=20 per-host-mem=unavailable
> |  tuple-ids=6 row-size=16B cardinality=1
> |
> 11:EXCHANGE [UNPARTITIONED]
> |  hosts=20 per-host-mem=unavailable
> |  tuple-ids=6 row-size=16B cardinality=1
> |
> 06:AGGREGATE
> |  output: sum(l_extendedprice)
> |  hosts=20 per-host-mem=10.00MB
> |  tuple-ids=6 row-size=16B cardinality=1
> |
> 05:HASH JOIN [RIGHT SEMI JOIN, PARTITIONED]
> |  hash predicates: l_partkey = p_partkey
> |  other join predicates: l_quantity < 0.2 * avg(l_quantity)
> |  hosts=20 per-host-mem=125.18MB
> |  tuple-ids=0,1 row-size=80B cardinality=29992141
> |
> |--10:EXCHANGE [HASH(p_partkey)]
> |  |  hosts=20 per-host-mem=0B
> |  |  tuple-ids=0,1 row-size=80B cardinality=29992141
> |  |
> |  04:HASH JOIN [INNER JOIN, BROADCAST]
> |  |  hash predicates: l_partkey = p_partkey
> |  |  hosts=20 per-host-mem=58.30MB
> |  |  tuple-ids=0,1 row-size=80B cardinality=29992141
> |  |
> |  |--09:EXCHANGE [BROADCAST]
> |  |  |  hosts=20 per-host-mem=0B
> |  |  |  tuple-ids=1 row-size=56B cardinality=1000000
> |  |  |
> |  |  01:SCAN HDFS [tpch_1000_decimal_parquet.part, RANDOM]
> |  |     partitions=1/1 files=40 size=6.38GB
> |  |     predicates: p_brand = 'Brand#23', p_container = 'MED BOX'
> |  |     table stats: 200000000 rows total
> |  |     column stats: all
> |  |     hosts=20 per-host-mem=264.00MB
> |  |     tuple-ids=1 row-size=56B cardinality=1000000
> |  |
> |  00:SCAN HDFS [tpch_1000_decimal_parquet.lineitem, RANDOM]
> |     partitions=1/1 files=880 size=216.61GB
> |     table stats: 5999989709 rows total
> |     column stats: all
> |     hosts=20 per-host-mem=264.00MB
> |     tuple-ids=0 row-size=24B cardinality=5999989709
> |
> 08:AGGREGATE [FINALIZE]
> |  output: avg:merge(l_quantity)
> |  group by: l_partkey
> |  hosts=20 per-host-mem=167.89MB
> |  tuple-ids=4 row-size=16B cardinality=200052064
> |
> 07:EXCHANGE [HASH(l_partkey)]
> |  hosts=20 per-host-mem=0B
> |  tuple-ids=3 row-size=16B cardinality=200052064
> |
> 03:AGGREGATE
> |  output: avg(l_quantity)
> |  group by: l_partkey
> |  hosts=20 per-host-mem=3.28GB
> |  tuple-ids=3 row-size=16B cardinality=200052064
> |
> 02:SCAN HDFS [tpch_1000_decimal_parquet.lineitem, RANDOM]
>    partitions=1/1 files=880 size=216.61GB
>    table stats: 5999989709 rows total
>    column stats: all
>    hosts=20 per-host-mem=176.00MB
>    tuple-ids=2 row-size=16B cardinality=5999989709
> {code}
> Summary
> ||Operator             ||#Hosts||   Avg Time||   Max Time||    #Rows  ||Est. #Rows||   Peak Mem  ||Est. Peak Mem  ||Detail                   ||
> |12:AGGREGATE              |1  |256.620ms|  256.620ms|        1|           1|   92.00 KB|        -1.00 B|  FINALIZE                       |
> |11:EXCHANGE               |1  |184.430us|  184.430us|       20|           1|          0|        -1.00 B|  UNPARTITIONED                  |
> |06:AGGREGATE             |20  |364.045ms|    1s508ms|       20|           1|    9.37 MB|       10.00 MB|                                 |
> |05:HASH JOIN             |20  |279.175ms|  304.600ms|  523.09K|      29.99M|  155.04 MB|      125.18 MB|  RIGHT SEMI JOIN, PARTITIONED   |
> |I--10:EXCHANGE           |20   |22.448ms|   32.954ms|    5.98M|      29.99M|          0|              0|  HASH(p_partkey)                |
> |I  04:HASH JOIN          |20   |25s417ms|   35s579ms|    5.98M|      29.99M|  146.02 MB|       58.30 MB|  INNER JOIN, BROADCAST          |
> |I  I--09:EXCHANGE        |20   |16.270ms|   35.329ms|  199.30K|       1.00M|          0|              0|  BROADCAST                      |
> |I  I  01:SCAN HDFS       |20  |218.505ms|  331.299ms|  199.30K|       1.00M|  173.43 MB|      264.00 MB|  tpch_1000_decimal_parquet.part |
> |I  00:SCAN HDFS          |20    |1s365ms|    1s822ms|    6.00B|       6.00B|    1.92 GB|      264.00 MB|  tpch_1000_decimal_parquet.l... |
> |08:AGGREGATE             |20   |29s706ms|   35s917ms|  200.00M|     200.05M|    1.64 GB|      167.89 MB|  FINALIZE                       |
> |07:EXCHANGE              |20    |5s081ms|    8s410ms|    3.11B|     200.05M|          0|              0|  HASH(l_partkey)                |
> |03:AGGREGATE             |20      |4m10s|      5m12s|    3.11B|     200.05M|   24.77 GB|        3.28 GB|                                 |
> |02:SCAN HDFS             |20    |1s544ms|    2s517ms|    6.00B|       6.00B|  838.85 MB|      176.00 MB|  tpch_1000_decimal_parquet.l... |


