[ 
https://issues.apache.org/jira/browse/FLINK-37576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-37576:
-------------------------------
    Priority: Blocker  (was: Major)

> Batch job failed when submit JobGraph contains broadcast edge
> -------------------------------------------------------------
>
>                 Key: FLINK-37576
>                 URL: https://issues.apache.org/jira/browse/FLINK-37576
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 2.0.0
>            Reporter: Weijie Guo
>            Priority: Blocker
>
> When we submit a job via JobGraph directly, it will raise if we have a 
> broadcast edge. IMO, We should correctly set the {{isBroadcast}} field in 
> constructor of 
> {{{}AllToAllBlockingResultInfo{}}}.
> For example:
> {code:java}
> public class TestJob {
>     public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         // set min parallelism great then 1
>         
> conf.setString("execution.batch.adaptive.auto-parallelism.min-parallelism", 
> "2");
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(conf);
>         // env.disableOperatorChaining();
>         // set to batch mode
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>         env.fromSequence(0, 10).broadcast().sinkTo(new DiscardingSink<>());
>         JobGraphRunningUtil.execute(env.getStreamGraph().getJobGraph(), conf, 
> 2, 10);
>     }
> } {code}
>  
> {code:java}
> Caused by: java.lang.IllegalArgumentException: 
> java.lang.IllegalArgumentException
>     at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
>     at 
> org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeConsumedSubpartitionRange(VertexInputInfoComputationUtils.java:235)
>     at 
> org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfoForAllToAll(VertexInputInfoComputationUtils.java:185)
>     at 
> org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfos(VertexInputInfoComputationUtils.java:82)
>     at 
> org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer.computeJobVertexInputInfosForInputsWithInterKeysCorrelation(AllToAllVertexInputInfoComputer.java:195)
>     at 
> org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer.compute(AllToAllVertexInputInfoComputer.java:126)
>     at 
> org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForNonSource(DefaultVertexParallelismAndInputInfosDecider.java:237)
>     at 
> org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForVertex(DefaultVertexParallelismAndInputInfosDecider.java:156)
>     at 
> org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.tryDecideParallelismAndInputInfos(AdaptiveBatchScheduler.java:740)
>     at 
> org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.initializeVerticesIfPossible(AdaptiveBatchScheduler.java:698)
>     at 
> org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.lambda$onTaskFinished$4(AdaptiveBatchScheduler.java:417)
>     at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
>     ... 29 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to