[ https://issues.apache.org/jira/browse/FLINK-31079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17691941#comment-17691941 ]

dalongliu commented on FLINK-31079:
-----------------------------------

I have finished the verification work, and it works well. Looks good to me. The 
verification steps are as follows:

1. Create a source table with the datagen source in the SQL Client. It produces 
100 rows, 50 of which share the same key, creating a data-skew case.
{code:java}
CREATE TABLE source (
    f0 INT,
    f1 BIGINT,
    f2 DOUBLE,
    f3 VARCHAR(30),
    f4 INT,
    f5 BIGINT,
    f6 FLOAT, 
    f7 DOUBLE,
    f8 DECIMAL(10, 5),
    f9 DECIMAL(38, 18),
    f10 DATE,
    f11 TIMESTAMP)
WITH (
    'connector' = 'datagen',
    'number-of-rows' = '100',
    'rows-per-second' = '100',
    'duplicated-field-num' = '1',
    'sample-rate' = '0.5') {code}
 

2. Set the option 
`execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task: 2k` in 
flink-conf.yaml.
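For intuition, the effect of this option can be roughly sketched in Python. This is an illustration only, not Flink's actual decider: the function name and the min/max bounds are hypothetical, and the real scheduler additionally normalizes the chosen parallelism (e.g., within configured min/max limits).

```python
import math

def decide_parallelism(total_bytes, avg_bytes_per_task,
                       min_parallelism=1, max_parallelism=128):
    """Rough sketch: derive a vertex's parallelism from the amount of
    data it consumes, clamped to [min, max]. Illustration only."""
    raw = math.ceil(total_bytes / avg_bytes_per_task)
    return max(min_parallelism, min(raw, max_parallelism))

# With avg-data-volume-per-task = 2k and, say, 10 KiB of shuffle data,
# roughly ceil(10240 / 2048) = 5 subtasks would be chosen.
print(decide_parallelism(10 * 1024, 2 * 1024))
```

So with a tiny per-task volume such as 2k, even the small 100-row datagen result forces several downstream subtasks, which is what makes the skewed-vs-balanced assignment visible in the UI.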

 

3. Run an aggregation query whose hash key is field `f0`:
{code:java}
create table sink with('connector' = 'print') as select f0, sum(f4), sum(f6) 
from source group by f0; {code}
4. Watch the Flink UI. We can see that task 1 only consumes the skewed 
subpartition, which contains 50 rows, while the other subpartitions are 
processed by the other tasks:
!image-2023-02-22-14-00-13-646.png!

> Release Testing: Verify FLINK-29663 Further improvements of adaptive batch 
> scheduler
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-31079
>                 URL: https://issues.apache.org/jira/browse/FLINK-31079
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>            Reporter: Lijie Wang
>            Assignee: miamiaoxyz
>            Priority: Blocker
>             Fix For: 1.17.0
>
>         Attachments: image-2023-02-22-14-00-13-646.png
>
>
> This task aims to verify FLINK-29663 which improves the adaptive batch 
> scheduler.
> Before the change of FLINK-29663, the adaptive batch scheduler would distribute 
> subpartitions according to the number of subpartitions, making different 
> downstream subtasks consume roughly the same number of subpartitions. This 
> leads to imbalanced loads across downstream tasks when the 
> subpartitions contain different amounts of data.
> To solve this problem, in FLINK-29663, we let the adaptive batch scheduler 
> distribute subpartitions according to the amount of data, so that different 
> downstream subtasks consume roughly the same amount of data. Note that 
> currently it only takes effect for All-To-All edges.
> The documentation of adaptive scheduler can be found 
> [here|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#adaptive-batch-scheduler]
> One can verify it by creating intended data skew on All-To-All edges.
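The difference between the old count-based and the new volume-based distribution described above can be sketched in Python. The helper names are hypothetical and the logic is a simplification; the real scheduler works on subpartition byte counts reported by upstream tasks:

```python
import math

def split_by_count(subpartition_bytes, num_tasks):
    """Pre-FLINK-29663 style (sketch): give each downstream task
    roughly the same *number* of subpartitions."""
    n = len(subpartition_bytes)
    per = math.ceil(n / num_tasks)
    return [list(range(i, min(i + per, n))) for i in range(0, n, per)]

def split_by_volume(subpartition_bytes, num_tasks):
    """FLINK-29663 style (sketch): assign contiguous subpartition
    ranges so each task consumes roughly the same *amount of data*."""
    total = sum(subpartition_bytes)
    target = total / num_tasks
    groups, cur, acc = [], [], 0
    for i, b in enumerate(subpartition_bytes):
        cur.append(i)
        acc += b
        if acc >= target and len(groups) < num_tasks - 1:
            groups.append(cur)
            cur, acc = [], 0
    groups.append(cur)
    return groups

# One skewed subpartition (50 rows) among five small ones (10 rows each):
skewed = [50, 10, 10, 10, 10, 10]
print(split_by_count(skewed, 3))   # skewed subpartition shares a task
print(split_by_volume(skewed, 3))  # skewed subpartition gets a task alone
```

This mirrors the observation in the verification: with volume-based distribution, the task reading the skewed subpartition handles it alone, instead of being paired with extra subpartitions as in the count-based scheme.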



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
