[
https://issues.apache.org/jira/browse/FLINK-26330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500837#comment-17500837
]
Niklas Semmler commented on FLINK-26330:
----------------------------------------
I've run the test. Indeed, the results are correct and the
AdaptiveBatchScheduler scales according to the input data.
However, the number of bytes received by a downstream task can exceed the
data volume set by ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.
Looking at the example above, I found that the three aggregators received
18 B, 4.12 KB and 16.6 KB of data, or 0, 200 and 801 records respectively.
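For completeness, the expected per-key sums can be cross-checked independently
of Flink (a quick sketch):
{code:java}
// Cross-check of the expected job result: sum of 0..1000, grouped by num % 10.
// Key 0 collects 0 + 10 + ... + 1000 = 50500; keys 1..9 collect 49600 .. 50400.
for (long key = 0; key < 10; key++) {
    long sum = 0;
    for (long num = 0; num <= 1000; num++) {
        if (num % 10 == key) {
            sum += num;
        }
    }
    System.out.println(key + " -> " + sum);
}
{code}
The decided parallelism of 3 is also consistent with the measured volumes above,
assuming the scheduler targets roughly the configured data volume per task:
about 20.7 KB of shuffled data divided by 8 KB per task gives 3 tasks after
rounding up.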
To test this further, I replaced the keyBy->sum with a simple mapper:
{code:java}
env.fromSequence(0, 1000).setParallelism(1)
        .disableChaining()
        .map(x -> x + 1)
        .print();
{code}
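For reference, the full setup for this variant kept the configuration from the
issue description and only replaced the pipeline; spelled out as a
self-contained sketch (the class name is arbitrary):
{code:java}
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AdaptiveBatchMapTest {
    public static void main(String[] args) throws Exception {
        // Same adaptive batch scheduler configuration as in the issue description.
        final Configuration configuration = new Configuration();
        configuration.set(
                JobManagerOptions.SCHEDULER,
                JobManagerOptions.SchedulerType.AdaptiveBatch);
        configuration.setInteger(
                JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM, 4);
        configuration.set(
                JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK,
                MemorySize.parse("8kb"));
        configuration.setInteger("parallelism.default", -1);

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(configuration);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Single-parallelism source feeding a stand-alone map task whose
        // parallelism is decided by the adaptive batch scheduler.
        env.fromSequence(0, 1000).setParallelism(1)
                .disableChaining()
                .map(x -> x + 1)
                .print();

        env.execute();
    }
}
{code}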
Unfortunately, even in this scenario the records are distributed unevenly: the
tasks received 5.17 KB, 5.15 KB and 10.3 KB, or 251, 250 and 500 records
respectively. Adding a shuffle or a rebalance before the map did not solve the
problem, nor did increasing the input to 100,000 numbers.
I also tried the same experiments with a fixed parallelism (both variants are
sketched below). It did not help for the keyed stream, but it did resolve the
map scenario.
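Concretely, the variants looked roughly like this (a sketch, reusing the
configuration and environment from the sketch above; the fixed parallelism
value of 3 is just an example):
{code:java}
// Variant: explicit redistribution before the map.
env.fromSequence(0, 1000).setParallelism(1)
        .rebalance()            // round-robin; or .shuffle() for random redistribution
        .map(x -> x + 1)
        .print();

// Variant: fixed parallelism instead of letting the scheduler decide.
configuration.setInteger("parallelism.default", 3);
{code}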
I think it would be good to describe this behavior better and maybe give some
hints on how the data volume should be set. Otherwise this risks producing an
OOM exception for users who assume that the data volume per task can be set
roughly equal to the available RAM of the TaskManager.
> Test Adaptive Batch Scheduler manually
> --------------------------------------
>
> Key: FLINK-26330
> URL: https://issues.apache.org/jira/browse/FLINK-26330
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Reporter: Lijie Wang
> Assignee: Niklas Semmler
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.15.0
>
>
> Documentation: [https://github.com/apache/flink/pull/18757]
> Run DataStream / SQL batch jobs with the Adaptive Batch Scheduler and verify:
> 1. Whether the automatically decided parallelism is correct
> 2. Whether the job result is correct
>
> *For example:*
> {code:java}
> final Configuration configuration = new Configuration();
> configuration.set(
>         JobManagerOptions.SCHEDULER,
>         JobManagerOptions.SchedulerType.AdaptiveBatch);
> configuration.setInteger(
>         JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM, 4);
> configuration.set(
>         JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK,
>         MemorySize.parse("8kb"));
> configuration.setInteger("parallelism.default", -1);
>
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironment(configuration);
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
> env.fromSequence(0, 1000).setParallelism(1)
>         .keyBy(num -> num % 10)
>         .sum(0)
>         .addSink(new PrintSinkFunction<>());
> env.execute();
> {code}
> You can run the above job and check:
>
> 1. The parallelism of "Keyed Aggregation -> Sink: Unnamed" should be 3. The
> JobManager log should contain the following line:
> {code:java}
> Parallelism of JobVertex: Keyed Aggregation -> Sink: Unnamed
> (20ba6b65f97481d5570070de90e4e791) is decided to be 3. {code}
> 2. The job result should be:
> {code:java}
> 50500
> 49600
> 49700
> 49800
> 49900
> 50000
> 50100
> 50200
> 50300
> 50400 {code}
>
> You can change the amount of data produced by the source and the config
> options of the adaptive batch scheduler as you wish.