[ https://issues.apache.org/jira/browse/FLINK-37576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Weijie Guo updated FLINK-37576: ------------------------------- Description: When we submit a job via JobGraph directly, it will raise if we have a broadcast edge. IMO, We should correctly set the \{{isBroadcast}} field in ctr of {{{}AllToAllBlockingResultInfo{}}}. For example: {code:java} public class TestJob { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); // set min parallelism great then 1 conf.setString("execution.batch.adaptive.auto-parallelism.min-parallelism", "2"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); // env.disableOperatorChaining(); // set to batch mode env.setRuntimeMode(RuntimeExecutionMode.BATCH); env.fromSequence(0, 10).broadcast().sinkTo(new DiscardingSink<>()); JobGraphRunningUtil.execute(env.getStreamGraph().getJobGraph(), conf, 2, 10); } } {code} {code:java} Caused by: java.lang.IllegalArgumentException: java.lang.IllegalArgumentException at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122) at org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeConsumedSubpartitionRange(VertexInputInfoComputationUtils.java:235) at org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfoForAllToAll(VertexInputInfoComputationUtils.java:185) at org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfos(VertexInputInfoComputationUtils.java:82) at org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer.computeJobVertexInputInfosForInputsWithInterKeysCorrelation(AllToAllVertexInputInfoComputer.java:195) at org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer.compute(AllToAllVertexInputInfoComputer.java:126) at org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForNonSource(DefaultVertexParallelismAndInputInfosDecider.java:237) at org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForVertex(DefaultVertexParallelismAndInputInfosDecider.java:156) at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.tryDecideParallelismAndInputInfos(AdaptiveBatchScheduler.java:740) at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.initializeVerticesIfPossible(AdaptiveBatchScheduler.java:698) at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.lambda$onTaskFinished$4(AdaptiveBatchScheduler.java:417) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ... 29 more {code} was: When we submit a job via JobGraph directly, it will raise if we have a broadcast edge. For example: {code:java} public class TestJob { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); // set min parallelism great then 1 conf.setString("execution.batch.adaptive.auto-parallelism.min-parallelism", "2"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); // env.disableOperatorChaining(); // set to batch mode env.setRuntimeMode(RuntimeExecutionMode.BATCH); env.fromSequence(0, 10).broadcast().sinkTo(new DiscardingSink<>()); JobGraphRunningUtil.execute(env.getStreamGraph().getJobGraph(), conf, 2, 10); } } {code} {code:java} Caused by: java.lang.IllegalArgumentException: java.lang.IllegalArgumentException at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122) at org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeConsumedSubpartitionRange(VertexInputInfoComputationUtils.java:235) at org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfoForAllToAll(VertexInputInfoComputationUtils.java:185) at org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfos(VertexInputInfoComputationUtils.java:82) at org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer.computeJobVertexInputInfosForInputsWithInterKeysCorrelation(AllToAllVertexInputInfoComputer.java:195) at org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer.compute(AllToAllVertexInputInfoComputer.java:126) at org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForNonSource(DefaultVertexParallelismAndInputInfosDecider.java:237) at org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForVertex(DefaultVertexParallelismAndInputInfosDecider.java:156) at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.tryDecideParallelismAndInputInfos(AdaptiveBatchScheduler.java:740) at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.initializeVerticesIfPossible(AdaptiveBatchScheduler.java:698) at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.lambda$onTaskFinished$4(AdaptiveBatchScheduler.java:417) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ... 29 more {code} > Batch job failed when submit JobGraph contains broadcast edge > ------------------------------------------------------------- > > Key: FLINK-37576 > URL: https://issues.apache.org/jira/browse/FLINK-37576 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination > Affects Versions: 2.0.0 > Reporter: Weijie Guo > Priority: Major > > When we submit a job via JobGraph directly, it will raise if we have a > broadcast edge. IMO, We should correctly set the \{{isBroadcast}} field in > ctr of > {{{}AllToAllBlockingResultInfo{}}}. > For example: > {code:java} > public class TestJob { > public static void main(String[] args) throws Exception { > Configuration conf = new Configuration(); > // set min parallelism great then 1 > > conf.setString("execution.batch.adaptive.auto-parallelism.min-parallelism", > "2"); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(conf); > // env.disableOperatorChaining(); > // set to batch mode > env.setRuntimeMode(RuntimeExecutionMode.BATCH); > env.fromSequence(0, 10).broadcast().sinkTo(new DiscardingSink<>()); > JobGraphRunningUtil.execute(env.getStreamGraph().getJobGraph(), conf, > 2, 10); > } > } {code} > > {code:java} > Caused by: java.lang.IllegalArgumentException: > java.lang.IllegalArgumentException > at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122) > at > org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeConsumedSubpartitionRange(VertexInputInfoComputationUtils.java:235) > at > org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfoForAllToAll(VertexInputInfoComputationUtils.java:185) > at > org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfos(VertexInputInfoComputationUtils.java:82) > at > org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer.computeJobVertexInputInfosForInputsWithInterKeysCorrelation(AllToAllVertexInputInfoComputer.java:195) > at > org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer.compute(AllToAllVertexInputInfoComputer.java:126) > at > org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForNonSource(DefaultVertexParallelismAndInputInfosDecider.java:237) > at > org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForVertex(DefaultVertexParallelismAndInputInfosDecider.java:156) > at > org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.tryDecideParallelismAndInputInfos(AdaptiveBatchScheduler.java:740) > at > org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.initializeVerticesIfPossible(AdaptiveBatchScheduler.java:698) > at > org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.lambda$onTaskFinished$4(AdaptiveBatchScheduler.java:417) > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) > ... 29 more {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)