[ https://issues.apache.org/jira/browse/FLINK-31114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lijie Wang updated FLINK-31114:
-------------------------------
Priority: Critical (was: Major)
> Batch job fails with IllegalStateException when using adaptive batch scheduler
> ------------------------------------------------------------------------------
>
> Key: FLINK-31114
> URL: https://issues.apache.org/jira/browse/FLINK-31114
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Reporter: Lijie Wang
> Assignee: Lijie Wang
> Priority: Critical
> Fix For: 1.17.0
>
>
> This is caused by FLINK-30942. Currently, if two job vertices have the same
> input and the same parallelism (even if that parallelism is -1), they share
> partitions. However, after FLINK-30942 the scheduler may change the job
> vertices' parallelism before scheduling. As a result, two job vertices can
> have the same parallelism in the compilation phase (and therefore share
> partitions) but different parallelism in the scheduling phase, which causes
> the following exception:
> {code:java}
> Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Consumers must have the same max parallelism.
>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>     ... 37 more
> Caused by: java.lang.IllegalStateException: Consumers must have the same max parallelism.
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
>     at org.apache.flink.runtime.executiongraph.IntermediateResult.getConsumersMaxParallelism(IntermediateResult.java:219)
>     at org.apache.flink.runtime.executiongraph.Execution.getPartitionMaxParallelism(Execution.java:501)
>     at org.apache.flink.runtime.executiongraph.Execution.registerProducedPartitions(Execution.java:472)
>     at org.apache.flink.runtime.executiongraph.Execution.registerProducedPartitions(Execution.java:431)
>     at org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$registerProducedPartitions$5(DefaultExecutionDeployer.java:277)
>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
>     ... 38 more
> {code}
> Adding the following test to {{AdaptiveBatchSchedulerITCase}} reproduces the
> problem:
> {code:java}
> @Test
> void testDifferentConsumerParallelism() throws Exception {
>     final Configuration configuration = createConfiguration();
>     final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.createLocalEnvironment(configuration);
>     env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>     env.setParallelism(8);
>
>     final DataStream<Long> source1 =
>             env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
>                     .setParallelism(8)
>                     .name("source1")
>                     .slotSharingGroup("group1");
>     final DataStream<Long> source2 =
>             env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
>                     .setParallelism(8)
>                     .name("source2")
>                     .slotSharingGroup("group2");
>
>     // map1 consumes source1 (forward edge) and source2 (via union).
>     source1.forward()
>             .union(source2)
>             .map(new NumberCounter())
>             .name("map1")
>             .slotSharingGroup("group3");
>
>     // map2 also consumes source2, so source2's output has two consumers.
>     source2.map(new NumberCounter()).name("map2").slotSharingGroup("group4");
>
>     env.execute();
> }
> {code}
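The failing precondition can be illustrated with a simplified, self-contained model of the mechanism described above. The classes {{ConsumerVertex}}, {{SharedResult}} and {{SharedPartitionDemo}} and their methods are hypothetical stand-ins written only for this sketch; they are not Flink's {{JobVertex}} or {{IntermediateResult}} APIs. The model assumes, as the exception suggests, that a parallelism change made by the scheduler before scheduling leaves the consumers that share a result with different max parallelism; the concrete numbers are arbitrary.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of FLINK-31114. These classes are illustrative
// stand-ins and NOT Flink's JobVertex / IntermediateResult APIs.
final class ConsumerVertex {
    int parallelism;    // -1 means "not decided yet"
    int maxParallelism; // assumption: changes together with the parallelism

    ConsumerVertex(int parallelism, int maxParallelism) {
        this.parallelism = parallelism;
        this.maxParallelism = maxParallelism;
    }
}

final class SharedResult {
    private final List<ConsumerVertex> consumers = new ArrayList<>();

    // Compilation phase: a vertex may share this result only if its parallelism
    // equals that of the existing consumers (two undecided -1 values count as equal).
    boolean tryAddConsumer(ConsumerVertex vertex) {
        if (!consumers.isEmpty() && consumers.get(0).parallelism != vertex.parallelism) {
            return false;
        }
        consumers.add(vertex);
        return true;
    }

    // Scheduling phase: mirrors the precondition that failed in the stack trace.
    int getConsumersMaxParallelism() {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("No consumers.");
        }
        int max = consumers.get(0).maxParallelism;
        for (ConsumerVertex vertex : consumers) {
            if (vertex.maxParallelism != max) {
                throw new IllegalStateException("Consumers must have the same max parallelism.");
            }
        }
        return max;
    }
}

class SharedPartitionDemo {
    public static void main(String[] args) {
        // Both consumers look identical at compilation time, so they share one result.
        ConsumerVertex map1 = new ConsumerVertex(-1, 128);
        ConsumerVertex map2 = new ConsumerVertex(-1, 128);
        SharedResult source2Output = new SharedResult();
        source2Output.tryAddConsumer(map1);
        source2Output.tryAddConsumer(map2);

        // Before scheduling, the scheduler changes map1 only (values are arbitrary).
        map1.parallelism = 8;
        map1.maxParallelism = 8;

        try {
            source2Output.getConsumersMaxParallelism();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}

Running {{SharedPartitionDemo}} prints "Consumers must have the same max parallelism.", the same message as the root cause above. In this model the sharing decision is made once, from the compile-time parallelism, and nothing re-checks it when one consumer is changed afterwards.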
--
This message was sent by Atlassian Jira
(v8.20.10#820010)