[
https://issues.apache.org/jira/browse/FLINK-31079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17691941#comment-17691941
]
dalongliu commented on FLINK-31079:
-----------------------------------
I have finished the verification work; it works as expected. Looks good to me.
The verification steps are as follows:
1. Create a source table with the datagen connector in the SQL Client. It
produces 100 rows, 50 of which share the same key, creating a data-skew case.
{code:java}
CREATE TABLE source (
  f0 INT,
  f1 BIGINT,
  f2 DOUBLE,
  f3 VARCHAR(30),
  f4 INT,
  f5 BIGINT,
  f6 FLOAT,
  f7 DOUBLE,
  f8 DECIMAL(10, 5),
  f9 DECIMAL(38, 18),
  f10 DATE,
  f11 TIMESTAMP)
WITH (
  'connector' = 'datagen',
  'number-of-rows' = '100',
  'rows-per-second' = '100',
  'duplicated-field-num' = '1',
  'sample-rate' = '0.5') {code}
2. Set the option
`execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task: 2k` in
flink-conf.yaml.
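For reference, the step above corresponds to this flink-conf.yaml fragment (only the single entry used in this test; the deliberately small `2k` value pushes the scheduler to spread the 100 generated rows across several downstream subtasks):

```yaml
# Target average data volume each downstream task should consume.
# Set very low here so even ~100 rows produce multiple downstream subtasks.
execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task: 2k
```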
3. Run an aggregation query whose hash key is field `f0`:
{code:java}
create table sink with('connector' = 'print') as select f0, sum(f4), sum(f6)
from source group by f0; {code}
4. Watch the Flink UI: task 1 consumes only the skewed subpartition, which
contains 50 rows, while the other subpartitions are distributed across the
remaining tasks, as shown in the attached screenshot:
!image-2023-02-22-14-00-13-646.png!
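The behavior observed in step 4 matches the data-volume-based distribution described in FLINK-29663. A minimal sketch of the idea (a hypothetical helper, not Flink's actual implementation): consecutive subpartitions are packed into downstream tasks until a per-task data-volume target is reached, so a single skewed subpartition naturally ends up isolated in its own task:

```python
def distribute_by_data_volume(subpartition_bytes, avg_volume_per_task):
    """Greedy sketch: pack consecutive subpartitions into downstream tasks,
    closing a task once adding the next subpartition would exceed the
    per-task data-volume target."""
    tasks, current, total = [], [], 0
    for i, size in enumerate(subpartition_bytes):
        if current and total + size > avg_volume_per_task:
            tasks.append(current)
            current, total = [], 0
        current.append(i)
        total += size
    if current:
        tasks.append(current)
    return tasks

# 10 subpartitions; subpartition 0 is heavily skewed (50 of the 100 rows).
sizes = [5000] + [100] * 9
print(distribute_by_data_volume(sizes, 1000))  # → [[0], [1, 2, 3, 4, 5, 6, 7, 8, 9]]
```

With subpartition-count-based distribution, by contrast, each task would get roughly the same number of subpartitions regardless of size, leaving the task that owns the skewed subpartition overloaded.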
> Release Testing: Verify FLINK-29663 Further improvements of adaptive batch
> scheduler
> ------------------------------------------------------------------------------------
>
> Key: FLINK-31079
> URL: https://issues.apache.org/jira/browse/FLINK-31079
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Reporter: Lijie Wang
> Assignee: miamiaoxyz
> Priority: Blocker
> Fix For: 1.17.0
>
> Attachments: image-2023-02-22-14-00-13-646.png
>
>
> This task aims to verify FLINK-29663 which improves the adaptive batch
> scheduler.
> Before the change of FLINK-29663, the adaptive batch scheduler distributed
> subpartitions according to the number of subpartitions, making different
> downstream subtasks consume roughly the same number of subpartitions. This
> leads to imbalanced loads across downstream tasks when the subpartitions
> contain different amounts of data.
> To solve this problem, in FLINK-29663 we let the adaptive batch scheduler
> distribute subpartitions according to the amount of data, so that different
> downstream subtasks consume roughly the same amount of data. Note that
> currently this only takes effect for All-To-All edges.
> The documentation of the adaptive batch scheduler can be found
> [here|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#adaptive-batch-scheduler]
> One can verify it by creating intentional data skew on All-To-All edges.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)