[
https://issues.apache.org/jira/browse/FLINK-31060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17691366#comment-17691366
]
dalongliu commented on FLINK-31060:
-----------------------------------
Verification steps: I created a source table using datagen:
{code:sql}
CREATE TABLE source (
  f0 INT,
  f1 BIGINT,
  f2 DOUBLE,
  f3 VARCHAR(30),
  f4 INT,
  f5 BIGINT,
  f6 FLOAT,
  f7 DOUBLE,
  f8 DECIMAL(10, 5),
  f9 DECIMAL(38, 18),
  f10 DATE,
  f11 TIMESTAMP,
  f12 ARRAY<BIGINT>)
WITH (
  'connector' = 'datagen',
  'number-of-rows' = '1000',
  'rows-per-second' = '100000');
{code}
I have verified the following cases:
# Tested a lower aggregation degree for the group-by key `f0` with the query
`select f0, sum(f4), sum(f6) from source group by f0`. The TM log showed that
the local hash agg was skipped, which meets the expectation.
# Tested a higher aggregation degree for the group-by key `f0` with the query
`select f0, sum(f4), sum(f6) from source group by f0`. The TM log showed that
all rows were aggregated in the local hash agg phase, which meets the
expectation.
# If the query contains a function that doesn't support adaptive local hash
agg, such as `select f0, sum(f4), sum(f6), STDDEV_POP(f7) from source group by
f0`, the adaptive local hash agg is skipped by the planner in the compile
phase, which meets the expectation. However, the code generated by the planner
still contains the variable `localAggSuppressed`, which shouldn't be generated
when the adaptive local hash agg is disabled by the planner. It would be
better to improve the code generation logic.
# With the option `table.exec.local-hash-agg.adaptive.enabled = false` set,
adaptive local hash agg is disabled by the planner, which meets the
expectation.
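For anyone repeating cases 1, 2 and 4, here is a minimal sketch of one way to control the aggregation degree of `f0` and to toggle the feature from the SQL client. It is not necessarily what was run for this comment: the table name `source_few_keys` and the bounds 1..10 are made up for illustration, and batch runtime mode is assumed because the table is bounded by 'number-of-rows'.
{code:sql}
-- Assumption: run in batch mode so the bounded datagen source is hash-aggregated.
SET 'execution.runtime-mode' = 'batch';

-- Hypothetical table: f0 is limited to the values 1..10 (illustrative bounds),
-- so many rows share a group key (higher aggregation degree).
-- Only the columns used by the test queries are declared.
CREATE TABLE source_few_keys (
  f0 INT,
  f4 INT,
  f6 FLOAT,
  f7 DOUBLE)
WITH (
  'connector' = 'datagen',
  'number-of-rows' = '1000',
  'rows-per-second' = '100000',
  'fields.f0.kind' = 'random',
  'fields.f0.min' = '1',
  'fields.f0.max' = '10');

-- Same query shape as in cases 1 and 2, against the low-cardinality table.
SELECT f0, SUM(f4), SUM(f6) FROM source_few_keys GROUP BY f0;

-- Case 4: disable adaptive local hash agg.
SET 'table.exec.local-hash-agg.adaptive.enabled' = 'false';
{code}
Without the 'fields.f0.min' / 'fields.f0.max' bounds, datagen draws `f0` from the full INT range, so nearly every row gets its own group key, which corresponds to the lower aggregation degree in case 1.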
> Release Testing: Verify FLINK-30542 Support adaptive local hash aggregate in
> runtime
> ------------------------------------------------------------------------------------
>
> Key: FLINK-31060
> URL: https://issues.apache.org/jira/browse/FLINK-31060
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Runtime
> Affects Versions: 1.17.0
> Reporter: Yunhong Zheng
> Assignee: dalongliu
> Priority: Major
> Fix For: 1.17.0
>
>
> This issue aims to verify FLINK-30542: Support adaptive local hash aggregate
> in runtime.
> Adaptive local hash aggregation is an optimization of local hash aggregation
> that adaptively decides whether to continue local hash aggregation based on
> the distinct value rate of the sampled data. If the distinct value rate is
> higher than the defined threshold (see parameter:
> 'table.exec.local-hash-agg.adaptive.distinct-value-rate-threshold'), we
> stop aggregating and just send the input data downstream after a
> simple projection. Otherwise, we continue to aggregate.
> We can verify it in the SQL client after building the flink-dist package.
> # First, create a source table. (Note: the source table needs to support
> different degrees of aggregation, meaning the distinct count can be
> controlled by the source connector; we recommend modifying the dataGen table
> source to produce data with different numbers of distinct rows.)
> # Verify the result with different distinct value rates. (See:
> table.exec.local-hash-agg.adaptive.distinct-value-rate-threshold)
> # Check the 'TM' log to see whether the adaptive local hash aggregate
> works.
> If you run into any problems, feel free to ping me directly.
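As a rough aid for step 2 of the description, the distinct value rate of a group key can be estimated directly in SQL and compared with the configured threshold. This is only a sketch: the runtime decides from sampled data rather than the whole table, datagen produces fresh random rows for each query, and the threshold value 0.5 below is an illustrative setting, not a value taken from this issue.
{code:sql}
-- Fraction of distinct f0 values among all rows of the test table `source`.
-- A value close to 1 means almost no rows share a key (local agg gains little);
-- a value close to 0 means heavy key reuse (local agg is worthwhile).
SELECT CAST(COUNT(DISTINCT f0) AS DOUBLE) / COUNT(*) AS distinct_value_rate
FROM source;

-- Illustrative threshold: local aggregation is expected to stop once the
-- sampled distinct value rate is bigger than this value.
SET 'table.exec.local-hash-agg.adaptive.distinct-value-rate-threshold' = '0.5';
{code}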
--
This message was sent by Atlassian Jira
(v8.20.10#820010)