[
https://issues.apache.org/jira/browse/FLINK-26330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17504890#comment-17504890
]
Hi [~nsemmler] , I will try to answer the two questions:
-> I am not sure how exactly output partitions are distributed to downstream
tasks. I think this uses a round-robin mechanism.
Yes, it is round-robin, and the subpartitions consumed by the same Reducer are
consecutive. For example, 16 subpartitions evenly distributed across 8 Reducers
are assigned as 0,1 | 2,3 | 4,5 | 6,7 | 8,9 | 10,11 | 12,13 | 14,15 (see the
sketch below).
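A minimal sketch of this assignment, assuming consecutive, near-even ranges;
subpartitionRange is a hypothetical helper for illustration, not Flink's actual API:
{code:java}
// Hypothetical sketch: assign each consumer a consecutive range of subpartitions.
// For 16 subpartitions and 8 Reducers this yields 0,1 | 2,3 | ... | 14,15.
static int[] subpartitionRange(int numSubpartitions, int numConsumers, int consumerIndex) {
    int start = consumerIndex * numSubpartitions / numConsumers;     // inclusive
    int end = (consumerIndex + 1) * numSubpartitions / numConsumers; // exclusive
    return new int[] {start, end};
}
{code}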
-> I am not sure how this differs when you use multiple Mappers. Will this even
out the distribution, because each Mapper distributes its volume differently? I
would have to look into this further.
Currently, subpartitions with the same index across different Mappers are
consumed by the same Reducer. For example, with 2 Mappers, both subpartitions
[0,1] of Mapper1 and subpartitions [0,1] of Mapper2 are consumed by Reducer1.
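Put together, a Reducer reads its subpartition range from every Mapper. A small
self-contained illustration (again hypothetical, with 0-based indices):
{code:java}
// Hypothetical illustration: Reducer r consumes the SAME consecutive subpartition
// range from every Mapper. With 2 Mappers, 4 subpartitions each, and 2 Reducers,
// Reducer 0 reads subpartitions 0,1 of both Mapper 0 and Mapper 1.
static void printConsumedSubpartitions(int numMappers, int numSubpartitions, int numReducers) {
    for (int r = 0; r < numReducers; r++) {
        int start = r * numSubpartitions / numReducers;
        int end = (r + 1) * numSubpartitions / numReducers;
        for (int m = 0; m < numMappers; m++) {
            for (int s = start; s < end; s++) {
                System.out.println("Reducer " + r + " <- Mapper " + m + ", subpartition " + s);
            }
        }
    }
}
{code}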
> Test Adaptive Batch Scheduler manually
> --------------------------------------
>
> Key: FLINK-26330
> URL: https://issues.apache.org/jira/browse/FLINK-26330
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Reporter: Lijie Wang
> Assignee: Niklas Semmler
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.15.0
>
>
> Documentation: [https://github.com/apache/flink/pull/18757]
> Run DataStream / SQL batch jobs with the Adaptive Batch Scheduler and verify:
> 1. Whether the automatically decided parallelism is correct
> 2. Whether the job result is correct
>
> *For example:*
> {code:java}
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.JobManagerOptions;
> import org.apache.flink.configuration.MemorySize;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
>
> final Configuration configuration = new Configuration();
> // Use the adaptive batch scheduler.
> configuration.set(
>         JobManagerOptions.SCHEDULER,
>         JobManagerOptions.SchedulerType.AdaptiveBatch);
> // Cap the automatically decided parallelism at 4.
> configuration.setInteger(
>         JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM, 4);
> // Target roughly 8 KB of consumed data per task.
> configuration.set(
>         JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK,
>         MemorySize.parse("8kb"));
> // -1 leaves the vertex parallelism to the scheduler.
> configuration.setInteger("parallelism.default", -1);
>
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironment(configuration);
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
> env.fromSequence(0, 1000).setParallelism(1)
>         .keyBy(num -> num % 10)
>         .sum(0)
>         .addSink(new PrintSinkFunction<>());
> env.execute();
> {code}
> You can run the above job and check:
>
> 1. The parallelism of "Keyed Aggregation -> Sink: Unnamed" should be 3. The
> JobManager log should contain a line like:
> {code:java}
> Parallelism of JobVertex: Keyed Aggregation -> Sink: Unnamed
> (20ba6b65f97481d5570070de90e4e791) is decided to be 3. {code}
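> As a rough mental model (an assumption for illustration, not the exact
> decision logic), the decided parallelism is the total data consumed by the
> vertex divided by ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK, capped by
> the configured max. Any shuffle volume between roughly 16 KB and 24 KB would
> explain the decided value of 3 here:
> {code:java}
> // Simplified model (assumption): ceil(totalBytes / bytesPerTask), capped at the max.
> long totalConsumedBytes = 20 * 1024; // hypothetical shuffle volume of this job
> long bytesPerTask = 8 * 1024;        // ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK
> long maxParallelism = 4;             // ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM
> long decided = Math.min(maxParallelism,
>         (totalConsumedBytes + bytesPerTask - 1) / bytesPerTask);
> System.out.println(decided); // 3
> {code}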
> 2. The job result should be:
> {code:java}
> 50500
> 49600
> 49700
> 49800
> 49900
> 50000
> 50100
> 50200
> 50300
> 50400 {code}
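> These figures can be sanity-checked without Flink: key k receives k, k+10,
> k+20, ..., so key 1 sums to 100 * 1 + 10 * (0 + 1 + ... + 99) = 49600, and
> key 0 additionally includes 1000, giving 50500:
> {code:java}
> // Recompute the expected per-key sums for fromSequence(0, 1000) keyed by num % 10.
> for (long k = 0; k < 10; k++) {
>     long sum = 0;
>     for (long n = k; n <= 1000; n += 10) {
>         sum += n;
>     }
>     System.out.println(sum); // 50500 for key 0, 49600 for key 1, ...
> }
> {code}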
>
> You can change the amount of data produced by the source and the config
> options of the adaptive batch scheduler as you wish.