[
https://issues.apache.org/jira/browse/IMPALA-2945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17912637#comment-17912637
]
Riza Suminto edited comment on IMPALA-2945 at 1/13/25 8:43 PM:
---------------------------------------------------------------
We have a discussion in IMPALA-13644 on how to apply the IMPALA-2945 probability
model correctly.
[https://gerrit.cloudera.org/c/22320/4/fe/src/main/java/org/apache/impala/planner/AggregationNode.java#347]
To clarify, there are two methods that we are comparing here to estimate the overall
preagg output cardinality over N fragment instances:
{*}Method 1{*}, the old way, is PlanFragment.getPerInstanceNdvForCpuCosting().
For each grouping expression expr in exprs, apply the probability model to find
perInstanceNdv just for that expr.
Then combine all the perInstanceNdv values by multiplying them to get perInstanceNdv
for the whole grouping expression list (exprs).
If this combined perInstanceNdv is greater than inputCardinality, return
inputCardinality instead.
{*}Method 2{*}, the new way, is AggregationNode.estimatePreaggCardinality().
A globalNdv (across all fragment instances) for the whole grouping exprs is
precomputed by AggregationNode.estimateNumGroups(); that globalNdv has already been
subject to various bounding against inputCardinality and to tuple-based analysis.
We then apply the probability model over this single globalNdv number,
effectively treating the grouping expression list as if it were just one grouping
expression over a composite grouping column.
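As a rough, self-contained sketch of the two methods (not the actual Impala code;
the helper names are mine, and the exact form of the probability model is my
assumption, chosen because it reproduces the probValExistInInst/perInstNdv table
further below):
{code:java}
// Sketch only. Assumed probability model:
//   probValExistInInst = 1 - (1 - 1/globalNdv)^perInstanceInputCardinality
//   perInstanceNdv     = globalNdv * probValExistInInst
// i.e. the expected number of distinct values that show up among the rows routed
// to a single fragment instance, if rows are distributed randomly.
class PreaggNdvSketch {
  static double perInstanceNdv(double globalNdv, double perInstanceRows) {
    if (globalNdv <= 0) return 1.0;  // mirrors the c_login row below (globalNdv 0 -> perInstNdv 1)
    double probValExistInInst = 1.0 - Math.pow(1.0 - 1.0 / globalNdv, perInstanceRows);
    return globalNdv * probValExistInInst;
  }

  // Method 1 (roughly PlanFragment.getPerInstanceNdvForCpuCosting): apply the model
  // per grouping expression, multiply the results, then cap at inputCardinality.
  static double method1(double[] perExprGlobalNdv, double inputCardinality, int numInstances) {
    double perInstanceRows = inputCardinality / numInstances;
    double combined = 1.0;
    for (double ndv : perExprGlobalNdv) {
      combined *= perInstanceNdv(ndv, perInstanceRows);
    }
    return Math.min(combined, inputCardinality);
  }

  // Method 2 (roughly AggregationNode.estimatePreaggCardinality): apply the model
  // once over the precomputed composite globalNdv.
  static double method2(double compositeGlobalNdv, double inputCardinality, int numInstances) {
    double perInstanceRows = inputCardinality / numInstances;
    return perInstanceNdv(compositeGlobalNdv, perInstanceRows);
  }
}
{code}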
I believe Method 1 is flawed in the way it combines the perInstanceNdv values of the
grouping expressions. Simple multiplication will almost always exceed
inputCardinality, so the whole probability model does not matter.
Let's use TPC-DS Q4 at 3000 scale as an example and focus on 13:AGGREGATE
[STREAMING]:
{noformat}
| 13:AGGREGATE [STREAMING]
| | output: sum(((ss_ext_list_price - ss_ext_wholesale_cost - ss_ext_discount_amt) + ss_ext_sales_price) / CAST(2 AS DECIMAL(3,0)))
| | group by: c_customer_id, c_first_name, c_last_name, c_preferred_cust_flag, c_birth_country, c_login, c_email_address, d_year
| | mem-estimate=1.71GB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
| | tuple-ids=17 row-size=169B cardinality=1.31G cost=8141744291
| | in pipelines: 09(GETNEXT)
{noformat}
[https://gerrit.cloudera.org/c/22047/15/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-q04.test#1225]
In this test case:
{noformat}
NumInstances = 120
InputCardinality = 1,621,900,725
PerInstanceInputCardinality = 1,621,900,725 / 120 = 13,515,839
TableCardinality(customer) = 30,000,000
{noformat}
Given the table statistics for the customer table described at
[https://github.com/apache/impala/blob/master/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/stats-3TB.json#L136]
we will have:
{noformat}
+--------------------------+------------+--------------------+------------+
| Grouping Column | globalNdv | probValExistInInst | perInstNdv |
+--------------------------+------------+--------------------+------------+
| c_customer_id | 30,000,000 | 0.3627084185 | 10,881,253 |
| c_first_name | 5,042 | 1 | 5,042 |
| c_last_name | 4,963 | 1 | 4,963 |
| c_preferred_cust_flag | 2 | 1 | 2 |
| c_birth_country | 206 | 1 | 206 |
| c_login | 0 | 0 | 1 |
| c_email_address | 28,790,839 | 0.3746534572 | 10,786,587 |
| d_year | 1 | 1 | 1 |
| (composite_all_of_above) | 30,000,000 | 0.3627084185 | 10,881,253 |
+--------------------------+------------+--------------------+------------+
{noformat}
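As a quick numeric check, the two non-trivial rows of this table can be reproduced
from the assumed probability model in the sketch above (again, only a sketch; the
constants are the ones already quoted in this comment):
{code:java}
// Reproduce the c_customer_id and c_email_address rows, assuming
// probValExistInInst = 1 - (1 - 1/globalNdv)^perInstanceInputCardinality.
public class PreaggNdvTableCheck {
  public static void main(String[] args) {
    double perInstanceRows = 1_621_900_725.0 / 120;  // PerInstanceInputCardinality
    double[] ndvs = {30_000_000.0, 28_790_839.0};    // c_customer_id, c_email_address
    for (double ndv : ndvs) {
      double prob = 1.0 - Math.pow(1.0 - 1.0 / ndv, perInstanceRows);
      // Prints values matching the probValExistInInst and perInstNdv columns above
      // (roughly 0.3627 / 10.88M and 0.3747 / 10.79M).
      System.out.printf("globalNdv=%,.0f prob=%.10f perInstNdv=%,.0f%n", ndv, prob, ndv * prob);
    }
  }
}
{code}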
Note that the d_year expression only has globalNdv = 1 because of an equality
predicate below 13:AGGREGATE:
{noformat}
| | 10:SCAN HDFS [tpcds_partitioned_parquet_snap.date_dim, RANDOM]
| | HDFS partitions=1/1 files=1 size=2.17MB
| | predicates: tpcds_partitioned_parquet_snap.date_dim.d_year = CAST(2000 AS INT)
{noformat}
The customer table is full-scanned with some runtime filters applied.
With {*}Method 1{*}, we will have combined perInstanceNdv = (10,881,253 * 5,042 *
4,963 * 2 * 206 * 1 * 10,786,587 * 1) = 1,210,061,068,421,320,000,000,000.
PlanFragment.getPerInstanceNdvForCpuCosting() will cap it at inputCardinality and
return 1,621,900,725 instead.
If we multiply again by NumInstances, we'll get 194,628,087,000 as the preagg
output cardinality.
With {*}Method 2{*}, we will have globalNdv(c_customer_id, c_first_name,
c_last_name, c_preferred_cust_flag, c_birth_country, c_login, c_email_address,
d_year) = 30,000,000.00, which is equal to TableCardinality(customer).
AggregationNode.estimatePreaggCardinality() will estimate perInstanceNdv =
10,881,253.
If we multiply again by NumInstances, we'll get 1,305,750,307 as the preagg output
cardinality.
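Putting the arithmetic above together (again only a sketch; the constants are the
numbers already quoted in this comment):
{code:java}
// End-to-end check of the two preagg output cardinality estimates quoted above.
public class PreaggEstimateCheck {
  public static void main(String[] args) {
    double inputCardinality = 1_621_900_725.0;
    int numInstances = 120;

    // Method 1: the multiplied per-expression NDVs (~1.21e24) far exceed
    // inputCardinality, so the cap returns inputCardinality per instance.
    double method1PerInstance = Math.min(1.21006106842132e24, inputCardinality);
    System.out.printf("Method 1: %,.0f%n", method1PerInstance * numInstances);  // prints 194,628,087,000

    // Method 2: composite globalNdv = 30,000,000 pushed through the probability model.
    double perInstanceRows = inputCardinality / numInstances;
    double method2PerInstance =
        30_000_000.0 * (1.0 - Math.pow(1.0 - 1.0 / 30_000_000.0, perInstanceRows));
    System.out.printf("Method 2: %,.0f%n", method2PerInstance * numInstances);  // prints ~1.31B, matching the 1,305,750,307 above
  }
}
{code}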
I ran Q4 on a real cluster with the same cluster size and data scale. Today, the
Impala planner estimates the output cardinality of 13:AGGREGATE as 298.65M (full
profile at [^tpcds_q4_baseline.txt]). The actual output cardinality, however, is
29.79M.
{noformat}
Operator               #Hosts  #Inst   Avg Time   Max Time   #Rows  Est. #Rows  Peak Mem  Est. Peak Mem  Detail
----------------------------------------------------------------------------------------------------------------
...
52:AGGREGATE               10    120  222.067ms  396.001ms  29.79M       1.62B  72.11 MB        2.28 GB  FINALIZE
51:EXCHANGE                10    120   12.833ms   95.999ms  29.79M       1.62B   5.92 MB       30.29 MB  HASH(c_customer_id,c_first_name,c_last_name,c_preferred_cust_flag,c_birth_country,c_login,c_email_address,d_year)
F03:EXCHANGE SENDER        10    120  686.468ms  928.004ms                       9.13 MB       81.16 MB
13:AGGREGATE               10    120    3s781ms    5s624ms  29.79M       1.62B  72.13 MB        1.71 GB  STREAMING
12:HASH JOIN               10    120    1s309ms    2s896ms   1.62B       1.62B  33.12 KB              0  INNER JOIN, PARTITIONED
{noformat}
This shows that the number from Method 2 is still an overestimate, but it is closer
to the actual output cardinality.
> Pre-aggregation cardinality estimates do not take into account data
> distribution
> --------------------------------------------------------------------------------
>
> Key: IMPALA-2945
> URL: https://issues.apache.org/jira/browse/IMPALA-2945
> Project: IMPALA
> Issue Type: Improvement
> Components: Frontend
> Affects Versions: Impala 2.0, Impala 2.3.0, Impala 2.5.0, Impala 2.4.0,
> Impala 2.6.0, Impala 2.7.0, Impala 2.8.0, Impala 2.9.0, Impala 2.10.0, Impala
> 2.11.0, Impala 2.12.0
> Reporter: Mostafa Mokhtar
> Assignee: Riza Suminto
> Priority: Major
> Labels: planner, resource-management
> Attachments: ESTIMATE_DUPLICATE_IN_PREAGG_FALSE.txt,
> ESTIMATE_DUPLICATE_IN_PREAGG_TRUE.txt, tpcds_q4_baseline.txt
>
>
> When computing the per-host memory estimate for local aggregations, the
> planner does not take into account that data is randomly distributed across
> nodes leading to significant underestimation in some cases. The suggested fix
> is to use min(agg input cardinality, NDV * #hosts) as the per-node
> cardinality used for the per-node memory estimate.
> Impact: In the query below, the planner significantly underestimates the
> per-node memory of agg node 03 to be 3.28GB but the actual is 24.77GB.
> Query
> {code}
> select sum(l_extendedprice) / 7.0 as avg_yearly
> from
> lineitem,
> part
> where
> p_partkey = l_partkey
> and p_brand = 'Brand#23'
> and p_container = 'MED BOX'
> and l_quantity < (
> select
> 0.2 * avg(l_quantity)
> from
> lineitem
> where
> l_partkey = p_partkey
> )
> {code}
> Plan
> {code}
> 12:AGGREGATE [FINALIZE]
> | output: sum:merge(l_extendedprice)
> | hosts=20 per-host-mem=unavailable
> | tuple-ids=6 row-size=16B cardinality=1
> |
> 11:EXCHANGE [UNPARTITIONED]
> | hosts=20 per-host-mem=unavailable
> | tuple-ids=6 row-size=16B cardinality=1
> |
> 06:AGGREGATE
> | output: sum(l_extendedprice)
> | hosts=20 per-host-mem=10.00MB
> | tuple-ids=6 row-size=16B cardinality=1
> |
> 05:HASH JOIN [RIGHT SEMI JOIN, PARTITIONED]
> | hash predicates: l_partkey = p_partkey
> | other join predicates: l_quantity < 0.2 * avg(l_quantity)
> | hosts=20 per-host-mem=125.18MB
> | tuple-ids=0,1 row-size=80B cardinality=29992141
> |
> |--10:EXCHANGE [HASH(p_partkey)]
> | | hosts=20 per-host-mem=0B
> | | tuple-ids=0,1 row-size=80B cardinality=29992141
> | |
> | 04:HASH JOIN [INNER JOIN, BROADCAST]
> | | hash predicates: l_partkey = p_partkey
> | | hosts=20 per-host-mem=58.30MB
> | | tuple-ids=0,1 row-size=80B cardinality=29992141
> | |
> | |--09:EXCHANGE [BROADCAST]
> | | | hosts=20 per-host-mem=0B
> | | | tuple-ids=1 row-size=56B cardinality=1000000
> | | |
> | | 01:SCAN HDFS [tpch_1000_decimal_parquet.part, RANDOM]
> | | partitions=1/1 files=40 size=6.38GB
> | | predicates: p_brand = 'Brand#23', p_container = 'MED BOX'
> | | table stats: 200000000 rows total
> | | column stats: all
> | | hosts=20 per-host-mem=264.00MB
> | | tuple-ids=1 row-size=56B cardinality=1000000
> | |
> | 00:SCAN HDFS [tpch_1000_decimal_parquet.lineitem, RANDOM]
> | partitions=1/1 files=880 size=216.61GB
> | table stats: 5999989709 rows total
> | column stats: all
> | hosts=20 per-host-mem=264.00MB
> | tuple-ids=0 row-size=24B cardinality=5999989709
> |
> 08:AGGREGATE [FINALIZE]
> | output: avg:merge(l_quantity)
> | group by: l_partkey
> | hosts=20 per-host-mem=167.89MB
> | tuple-ids=4 row-size=16B cardinality=200052064
> |
> 07:EXCHANGE [HASH(l_partkey)]
> | hosts=20 per-host-mem=0B
> | tuple-ids=3 row-size=16B cardinality=200052064
> |
> 03:AGGREGATE
> | output: avg(l_quantity)
> | group by: l_partkey
> | hosts=20 per-host-mem=3.28GB
> | tuple-ids=3 row-size=16B cardinality=200052064
> |
> 02:SCAN HDFS [tpch_1000_decimal_parquet.lineitem, RANDOM]
> partitions=1/1 files=880 size=216.61GB
> table stats: 5999989709 rows total
> column stats: all
> hosts=20 per-host-mem=176.00MB
> tuple-ids=2 row-size=16B cardinality=5999989709
> {code}
> Summary
> ||Operator||#Hosts||Avg Time||Max Time||#Rows||Est. #Rows||Peak Mem||Est. Peak Mem||Detail||
> |12:AGGREGATE |1 |256.620ms| 256.620ms| 1| 1| 92.00 KB| -1.00 B| FINALIZE |
> |11:EXCHANGE |1 |184.430us| 184.430us| 20| 1| 0| -1.00 B| UNPARTITIONED |
> |06:AGGREGATE |20 |364.045ms| 1s508ms| 20| 1| 9.37 MB| 10.00 MB| |
> |05:HASH JOIN |20 |279.175ms| 304.600ms| 523.09K| 29.99M| 155.04 MB| 125.18 MB| RIGHT SEMI JOIN, PARTITIONED |
> |I--10:EXCHANGE |20 |22.448ms| 32.954ms| 5.98M| 29.99M| 0| 0| HASH(p_partkey) |
> |I 04:HASH JOIN |20 |25s417ms| 35s579ms| 5.98M| 29.99M| 146.02 MB| 58.30 MB| INNER JOIN, BROADCAST |
> |I I--09:EXCHANGE |20 |16.270ms| 35.329ms| 199.30K| 1.00M| 0| 0| BROADCAST |
> |I I 01:SCAN HDFS |20 |218.505ms| 331.299ms| 199.30K| 1.00M| 173.43 MB| 264.00 MB| tpch_1000_decimal_parquet.part|
> |I 00:SCAN HDFS |20 |1s365ms| 1s822ms| 6.00B| 6.00B| 1.92 GB| 264.00 MB| tpch_1000_decimal_parquet.l... |
> |08:AGGREGATE |20 |29s706ms| 35s917ms| 200.00M| 200.05M| 1.64 GB| 167.89 MB| FINALIZE|
> |07:EXCHANGE |20 |5s081ms| 8s410ms| 3.11B| 200.05M| 0| 0| HASH(l_partkey)|
> |03:AGGREGATE |20 |4m10s| 5m12s| 3.11B| 200.05M| 24.77 GB| 3.28 GB| |
> |02:SCAN HDFS |20 |1s544ms| 2s517ms| 6.00B| 6.00B| 838.85 MB| 176.00 MB| tpch_1000_decimal_parquet.l... |