[
https://issues.apache.org/jira/browse/FLINK-26330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17502314#comment-17502314
]
Lijie Wang commented on FLINK-26330:
------------------------------------
About 0.75~1.5:
As a simple example, assume that the adjusted parallelism is 8 (M = 3, i.e.
2^M = 8). The original parallelism must then have been in the range 6~12,
because 4~5 is closer to 4 and 13~16 is closer to 16. When the parallelism is
adjusted from 6 to 8, the average amount of data processed per task becomes
0.75 times the original. When the parallelism is adjusted from 12 to 8, the
average amount of data processed per task becomes 1.5 times the original. So
the range is 0.75~1.5. The same holds for any other value of M.
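The arithmetic can be checked with a small standalone sketch. It assumes, as in
the example, that the adjusted parallelism is 2^M and that the original
parallelism lies between 0.75 * 2^M and 1.5 * 2^M inclusive. The class and
method names are made up for illustration and are not part of Flink.

```java
class ParallelismRatioSketch {

    // Factor by which the average data per task changes when the parallelism
    // is adjusted: the same total data is split across `adjusted` tasks
    // instead of `original` tasks.
    static double dataRatio(int original, int adjusted) {
        return (double) original / adjusted;
    }

    public static void main(String[] args) {
        for (int m = 2; m <= 5; m++) {
            int adjusted = 1 << m;           // 2^M
            int lowest = adjusted * 3 / 4;   // assumed lower bound, e.g. 6 for 8
            int highest = adjusted * 3 / 2;  // assumed upper bound, e.g. 12 for 8
            System.out.printf(
                    "M=%d adjusted=%d original=%d~%d ratio=%.2f~%.2f%n",
                    m, adjusted, lowest, highest,
                    dataRatio(lowest, adjusted), dataRatio(highest, adjusted));
        }
    }
}
```

The two bounds do not depend on M, since (0.75 * 2^M) / 2^M = 0.75 and
(1.5 * 2^M) / 2^M = 1.5.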
Finally, thank you again for pointing out the uneven distribution problem. It's
really valuable :)
> Test Adaptive Batch Scheduler manually
> --------------------------------------
>
> Key: FLINK-26330
> URL: https://issues.apache.org/jira/browse/FLINK-26330
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Reporter: Lijie Wang
> Assignee: Niklas Semmler
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.15.0
>
>
> Documentation: [https://github.com/apache/flink/pull/18757]
> Run DataStream / SQL batch jobs with Adaptive Batch Scheduler and verify:
> 1. Whether the automatically decided parallelism is correct
> 2. Whether the job result is correct
>
> *For example:*
> {code:java}
> final Configuration configuration = new Configuration();
> configuration.set(
>         JobManagerOptions.SCHEDULER,
>         JobManagerOptions.SchedulerType.AdaptiveBatch);
> configuration.setInteger(
>         JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM, 4);
> configuration.set(
>         JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK,
>         MemorySize.parse("8kb"));
> configuration.setInteger("parallelism.default", -1);
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironment(configuration);
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> env.fromSequence(0, 1000).setParallelism(1)
>         .keyBy(num -> num % 10)
>         .sum(0)
>         .addSink(new PrintSinkFunction<>());
> env.execute(); {code}
> You can run the above job and check:
>
> 1. The parallelism of "Keyed Aggregation -> Sink: Unnamed" should be 3.
> The JobManager log should contain the following line:
> {code:java}
> Parallelism of JobVertex: Keyed Aggregation -> Sink: Unnamed
> (20ba6b65f97481d5570070de90e4e791) is decided to be 3. {code}
> 2. The job result should be:
> {code:java}
> 50500
> 49600
> 49700
> 49800
> 49900
> 50000
> 50100
> 50200
> 50300
> 50400 {code}
>
> You can change the amount of data produced by the source and the config
> options of the adaptive batch scheduler as you wish.
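The expected sums listed in the description can be reproduced without Flink.
The sketch below assumes that fromSequence(0, 1000) includes both endpoints and
that the job emits one final sum per key (num % 10); the class and method names
are hypothetical. It only checks the values, so the order in which the real job
prints them may differ.

```java
import java.util.Map;
import java.util.TreeMap;

class ExpectedKeyedSums {

    // Sums all numbers in [from, to] grouped by (number % buckets).
    static Map<Long, Long> sumsByKey(long from, long to, long buckets) {
        Map<Long, Long> sums = new TreeMap<>();
        for (long n = from; n <= to; n++) {
            sums.merge(n % buckets, n, Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Prints one "key -> sum" line per key, in ascending key order.
        sumsByKey(0, 1000, 10)
                .forEach((key, sum) -> System.out.println(key + " -> " + sum));
    }
}
```

Key 0 covers 0, 10, ..., 1000 and sums to 50500. Each key k from 1 to 9 covers
100 numbers and sums to 49500 + 100 * k, which gives 49600 through 50400.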
--
This message was sent by Atlassian Jira
(v8.20.1#820001)