[ 
https://issues.apache.org/jira/browse/FLINK-26330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17501166#comment-17501166
 ] 

Lijie Wang commented on FLINK-26330:
------------------------------------

Thanks for your efforts and feedback, [~nsemmler]

 

-> However, the number of bytes received by the downstream tasks can exceed the 
data volume set by ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.

Yes, it does. Because {{ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK}} 
actually represents the *average* amount of data processed by each task (rather 
than the maximum amount of data), there may be cases where the amount of data 
actually processed by a task will exceed this value (especially with data 
skew). And I agree to make this clear in the documentation, otherwise it will 
confuse users.

 

-> Looking at the example above, I found that the three aggregators received 
the following data volume of records: 18 B, 4.12 KB and 16.6 KB or in records 
0, 200 and 801.

The adaptive batch scheduler currently cannot handle the data-skew produced by 
{{keyBy}} (which can also occur with the default scheduler). But it's in our 
future plan(See 
[FLIP-187|https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler#FLIP187:AdaptiveBatchJobScheduler-Auto-rebalancingofworkloads]
 for details).

 

-> Unfortunately even in this scenario the records are unevenly distributed.

In this case, the upstream task produces 4 subpartitions (250 records per 
subpartition), which are consumed by 3 downstream task, so that there will be 
one task consuming 2 subpartitions(500 records). This can be improved by 
increasing   {{ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM,}} when increased to 
40, records processed by different tasks will be: 325 325 351

 

I'm going to add a paragraph to the documentation to describe this behavior and 
clarify the meaning of  {{{}ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK{}}}: 
the *average* amount of data processed per task.

> Test Adaptive Batch Scheduler manually
> --------------------------------------
>
>                 Key: FLINK-26330
>                 URL: https://issues.apache.org/jira/browse/FLINK-26330
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>            Reporter: Lijie Wang
>            Assignee: Niklas Semmler
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.15.0
>
>
> Documentation: [https://github.com/apache/flink/pull/18757]
> Run DataStream / SQL batch jobs with Adaptive Batch Scheduler and verifiy:
> 1. Whether the automatically decided parallelism is correct
> 2. Whether the job result is correct
>  
> *For example:*
> {code:java}
> final Configuration configuration = new Configuration();
> configuration.set(
>         JobManagerOptions.SCHEDULER, 
> JobManagerOptions.SchedulerType.AdaptiveBatch);
> configuration.setInteger(
>         JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM, 4);
> configuration.set(
>         JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK,
>         MemorySize.parse("8kb"));
> configuration.setInteger("parallelism.default", -1);
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironment(configuration);
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> env.fromSequence(0, 1000).setParallelism(1)
>         .keyBy(num -> num % 10)
>         .sum(0)
>         .addSink(new PrintSinkFunction<>());
> env.execute(); {code}
> You can run above job and check:
>  
> 1. The parallelism of "Keyed Aggregation -> Sink: Unnamed" should be 3. 
> Jobmanager logs show following logs:
> {code:java}
> Parallelism of JobVertex: Keyed Aggregation -> Sink: Unnamed 
> (20ba6b65f97481d5570070de90e4e791) is decided to be 3. {code}
> 2. The job result should be:
> {code:java}
> 50500
> 49600
> 49700
> 49800
> 49900
> 50000
> 50100
> 50200
> 50300
> 50400 {code}
>  
> You can change the amout of data produced by source and config options of 
> adaptive batch scheduler according your wishes.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to