[ 
https://issues.apache.org/jira/browse/FLINK-30942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Wang updated FLINK-30942:
-------------------------------
    Description: 
Currently, when using the adaptive batch scheduler, the vertex parallelism 
decided by the forward group may be larger than the global max parallelism (which 
is configured by the option {{parallelism.default}} or 
{{execution.batch.adaptive.auto-parallelism.max-parallelism}}; see FLINK-30686 
for details), which causes the following exception:

{code:java}
Caused by: java.lang.IllegalArgumentException: Vertex's parallelism should be smaller than or equal to vertex's max parallelism.
        at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo.setParallelism(DefaultVertexParallelismInfo.java:95)
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.setParallelism(ExecutionJobVertex.java:317)
        at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.changeJobVertexParallelism(AdaptiveBatchScheduler.java:385)
        at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.initializeVerticesIfPossible(AdaptiveBatchScheduler.java:284)
        at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.onTaskFinished(AdaptiveBatchScheduler.java:183)
        at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:745)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479)
        ... 30 more
{code}

The following code reproduces the above exception:

{code:java}
final Configuration configuration = new Configuration();
configuration.setString(RestOptions.BIND_PORT, "0");
configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L);
configuration.setInteger(
        BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM, 2);
configuration.set(
        BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK,
        MemorySize.parse("150kb"));
configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4kb"));
configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, 1);

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(configuration);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setParallelism(4);

final DataStream<Long> source =
        env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                .setParallelism(4)
                .name("source")
                .slotSharingGroup("group1");

source.forward().map(new NumberCounter()).name("map").slotSharingGroup("group2");
env.execute();
{code}
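
The failure mode can be illustrated without a Flink cluster. The sketch below is a simplified stand-in for the precondition named in the stack trace, not Flink's actual DefaultVertexParallelismInfo; the class name VertexParallelismSketch and its methods are hypothetical. It assumes, based on the description above, that the "map" vertex receives the forward group's parallelism of 4 while its max parallelism is capped at the configured global value of 2.

```java
// Simplified, hypothetical model of the check that fails in the stack trace.
// This is NOT Flink code; it only mimics the "parallelism <= max parallelism" precondition.
final class VertexParallelismSketch {
    private final int maxParallelism;
    private int parallelism = -1; // -1 means "not decided yet"

    VertexParallelismSketch(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    // Rejects any decided parallelism above the vertex's max parallelism.
    void setParallelism(int decided) {
        if (decided > maxParallelism) {
            throw new IllegalArgumentException(
                    "Vertex's parallelism should be smaller than or equal to vertex's max parallelism.");
        }
        this.parallelism = decided;
    }

    int getParallelism() {
        return parallelism;
    }

    public static void main(String[] args) {
        int globalMaxParallelism = 2;    // assumed cap, taken from the reproduction's max-parallelism setting
        int forwardGroupParallelism = 4; // assumed value, taken from the source's setParallelism(4)

        VertexParallelismSketch map = new VertexParallelismSketch(globalMaxParallelism);
        try {
            // The forward group's decision is applied without being bounded by the cap.
            map.setParallelism(forwardGroupParallelism);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Under these assumptions, any parallelism decided for a forward group has to be reconciled with the global max parallelism before it is applied to a vertex; how the actual fix achieves that is not described in this message.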



  was:
Currently, when using the adaptive batch scheduler, the vertex parallelism 
decided by the forward group may be larger than the global max parallelism (which 
is configured by the option {{parallelism.default}} or 
{{execution.batch.adaptive.auto-parallelism.max-parallelism}}; see FLINK-30686 
for details), which causes the following exception:

{code:java}
Caused by: java.lang.IllegalArgumentException: Vertex's parallelism should be smaller than or equal to vertex's max parallelism.
        at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo.setParallelism(DefaultVertexParallelismInfo.java:95)
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.setParallelism(ExecutionJobVertex.java:317)
        at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.changeJobVertexParallelism(AdaptiveBatchScheduler.java:385)
        at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.initializeVerticesIfPossible(AdaptiveBatchScheduler.java:284)
        at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.onTaskFinished(AdaptiveBatchScheduler.java:183)
        at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:745)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479)
        ... 30 more
{code}




> Fix the bug that the decided parallelism by adaptive batch scheduler may be 
> larger than the max parallelism
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30942
>                 URL: https://issues.apache.org/jira/browse/FLINK-30942
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>            Reporter: Lijie Wang
>            Assignee: Lijie Wang
>            Priority: Major
>             Fix For: 1.17.0
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
