[ https://issues.apache.org/jira/browse/FLINK-37576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Weijie Guo updated FLINK-37576: ------------------------------- Priority: Blocker (was: Major) > Batch job failed when submit JobGraph contains broadcast edge > ------------------------------------------------------------- > > Key: FLINK-37576 > URL: https://issues.apache.org/jira/browse/FLINK-37576 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination > Affects Versions: 2.0.0 > Reporter: Weijie Guo > Priority: Blocker > > When we submit a job via JobGraph directly, it will raise if we have a > broadcast edge. IMO, We should correctly set the {{isBroadcast}} field in > constructor of > {{{}AllToAllBlockingResultInfo{}}}. > For example: > {code:java} > public class TestJob { > public static void main(String[] args) throws Exception { > Configuration conf = new Configuration(); > // set min parallelism great then 1 > > conf.setString("execution.batch.adaptive.auto-parallelism.min-parallelism", > "2"); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(conf); > // env.disableOperatorChaining(); > // set to batch mode > env.setRuntimeMode(RuntimeExecutionMode.BATCH); > env.fromSequence(0, 10).broadcast().sinkTo(new DiscardingSink<>()); > JobGraphRunningUtil.execute(env.getStreamGraph().getJobGraph(), conf, > 2, 10); > } > } {code} > > {code:java} > Caused by: java.lang.IllegalArgumentException: > java.lang.IllegalArgumentException > at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122) > at > org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeConsumedSubpartitionRange(VertexInputInfoComputationUtils.java:235) > at > org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfoForAllToAll(VertexInputInfoComputationUtils.java:185) > at > org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfos(VertexInputInfoComputationUtils.java:82) > at > org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer.computeJobVertexInputInfosForInputsWithInterKeysCorrelation(AllToAllVertexInputInfoComputer.java:195) > at > org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer.compute(AllToAllVertexInputInfoComputer.java:126) > at > org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForNonSource(DefaultVertexParallelismAndInputInfosDecider.java:237) > at > org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForVertex(DefaultVertexParallelismAndInputInfosDecider.java:156) > at > org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.tryDecideParallelismAndInputInfos(AdaptiveBatchScheduler.java:740) > at > org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.initializeVerticesIfPossible(AdaptiveBatchScheduler.java:698) > at > org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.lambda$onTaskFinished$4(AdaptiveBatchScheduler.java:417) > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) > ... 29 more {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)