[ 
https://issues.apache.org/jira/browse/FLINK-31060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17691366#comment-17691366
 ] 

dalongliu commented on FLINK-31060:
-----------------------------------

Verify step, I create a source table using datagen:

 
{code:java}
CREATE TABLE source (
    f0 INT,
    f1 BIGINT,
    f2 DOUBLE,
    f3 VARCHAR(30),
    f4 INT,
    f5 BIGINT,
    f6 FLOAT,
    f7 DOUBLE,
    f8 DECIMAL(10, 5),
    f9 DECIMAL(38, 18),
    f10 DATE,
    f11 TIMESTAMP,
    f12 ARRAY<BIGINT>)
WITH (
    'connector' = 'datagen',
     'number-of-rows' = '1000',
     'rows-per-second' = '100000'); {code}
 

 

I have verified the following case:
 # test the lower aggregation degree of group by key `f0` by the query `select 
f0, sum(f4), sum(f6) from source group by f0`, I saw the local hash agg is 
skipped via TM log, it meets the expectation
 # test the higher aggregation degree of group by key 'f0' by the query  
`select f0, sum(f4), sum(f6) from source group by f0`, I saw all the rows are 
aggregated in local hash agg phase via TM log, it meets  the expectation
 # If the query contains a function that doesn't support adaptive local hash 
agg such as `select f0, sum(f4), sum(f6), STDDEV_POP(f7) from source group by 
f0`, the adaptive local hash agg is skipped by the planner in compile phase, it 
meets the expectation. But the generated code by the planner contains the 
variable `localAggSuppressed`, this shouldn't be caused when the adaptive local 
hash agg is disabled by planner. It would be better if we could improve the 
code generate logic.
 # Set the option `table.exec.local-hash-agg.adaptive.enabled = false`, 
adaptive local hash agg is disabled by the planner, it meets the expectation
 #  

> Release Testing: Verify FLINK-30542 Support adaptive local hash aggregate in 
> runtime
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-31060
>                 URL: https://issues.apache.org/jira/browse/FLINK-31060
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Runtime
>    Affects Versions: 1.17.0
>            Reporter: Yunhong Zheng
>            Assignee: dalongliu
>            Priority: Major
>             Fix For: 1.17.0
>
>
> This issue aims to verify FLINK-30542: Support adaptive local hash aggregate 
> in runtime.
> Adaptive local hash aggregation is an optimization of local hash aggregation, 
> which can adaptively determine whether to continue to do local hash 
> aggregation according to the distinct value rate of sampling data. If 
> distinct value rate bigger than defined threshold (see parameter: 
> 'table.exec.local-hash-agg.adaptive.distinct-value-rate-threshold'), we will 
> stop aggregating and just send the input data to the downstream after a 
> simple projection. Otherwise, we will continue to do aggregation.
> We can verify it in SQL client after we build the flink-dist package.
>  # Create a source table firstly. (Note: the source table need have different 
> degree of aggregation, means the distinct count can be controlled by source 
> connector, we recommend to modify dataGen table source to produce different 
> data with different distinct row number).
>  # Verify the result with different distinct value rate. (See: 
> table.exec.local-hash-agg.adaptive.distinct-value-rate-threshold)
>  # Check the log in 'TM' to see whether the adaptive local hash aggregate 
> works.
> If you meet any problems, it's welcome to ping me directly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to