[ 
https://issues.apache.org/jira/browse/FLINK-37576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-37576:
-------------------------------
    Description: 
When we submit a job via JobGraph directly, it will raise if we have a 
broadcast edge. IMO, We should correctly set the \{{isBroadcast}} field in ctr 
of 
{{{}AllToAllBlockingResultInfo{}}}.
For example:
{code:java}
public class TestJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // set min parallelism great then 1
        
conf.setString("execution.batch.adaptive.auto-parallelism.min-parallelism", 
"2");
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // env.disableOperatorChaining();
        // set to batch mode
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.fromSequence(0, 10).broadcast().sinkTo(new DiscardingSink<>());
        JobGraphRunningUtil.execute(env.getStreamGraph().getJobGraph(), conf, 
2, 10);
    }
} {code}
 
{code:java}
Caused by: java.lang.IllegalArgumentException: 
java.lang.IllegalArgumentException
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
    at 
org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeConsumedSubpartitionRange(VertexInputInfoComputationUtils.java:235)
    at 
org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfoForAllToAll(VertexInputInfoComputationUtils.java:185)
    at 
org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfos(VertexInputInfoComputationUtils.java:82)
    at 
org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer.computeJobVertexInputInfosForInputsWithInterKeysCorrelation(AllToAllVertexInputInfoComputer.java:195)
    at 
org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer.compute(AllToAllVertexInputInfoComputer.java:126)
    at 
org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForNonSource(DefaultVertexParallelismAndInputInfosDecider.java:237)
    at 
org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForVertex(DefaultVertexParallelismAndInputInfosDecider.java:156)
    at 
org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.tryDecideParallelismAndInputInfos(AdaptiveBatchScheduler.java:740)
    at 
org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.initializeVerticesIfPossible(AdaptiveBatchScheduler.java:698)
    at 
org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.lambda$onTaskFinished$4(AdaptiveBatchScheduler.java:417)
    at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
    ... 29 more {code}

  was:
When we submit a job via JobGraph directly, it will raise if we have a 
broadcast edge. 

For example:
{code:java}
public class TestJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // set min parallelism great then 1
        
conf.setString("execution.batch.adaptive.auto-parallelism.min-parallelism", 
"2");
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // env.disableOperatorChaining();
        // set to batch mode
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.fromSequence(0, 10).broadcast().sinkTo(new DiscardingSink<>());
        JobGraphRunningUtil.execute(env.getStreamGraph().getJobGraph(), conf, 
2, 10);
    }
} {code}
 
{code:java}
Caused by: java.lang.IllegalArgumentException: 
java.lang.IllegalArgumentException
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
    at 
org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeConsumedSubpartitionRange(VertexInputInfoComputationUtils.java:235)
    at 
org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfoForAllToAll(VertexInputInfoComputationUtils.java:185)
    at 
org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfos(VertexInputInfoComputationUtils.java:82)
    at 
org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer.computeJobVertexInputInfosForInputsWithInterKeysCorrelation(AllToAllVertexInputInfoComputer.java:195)
    at 
org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer.compute(AllToAllVertexInputInfoComputer.java:126)
    at 
org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForNonSource(DefaultVertexParallelismAndInputInfosDecider.java:237)
    at 
org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForVertex(DefaultVertexParallelismAndInputInfosDecider.java:156)
    at 
org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.tryDecideParallelismAndInputInfos(AdaptiveBatchScheduler.java:740)
    at 
org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.initializeVerticesIfPossible(AdaptiveBatchScheduler.java:698)
    at 
org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.lambda$onTaskFinished$4(AdaptiveBatchScheduler.java:417)
    at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
    ... 29 more {code}


> Batch job failed when submit JobGraph contains broadcast edge
> -------------------------------------------------------------
>
>                 Key: FLINK-37576
>                 URL: https://issues.apache.org/jira/browse/FLINK-37576
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 2.0.0
>            Reporter: Weijie Guo
>            Priority: Major
>
> When we submit a job via JobGraph directly, it will raise if we have a 
> broadcast edge. IMO, We should correctly set the \{{isBroadcast}} field in 
> ctr of 
> {{{}AllToAllBlockingResultInfo{}}}.
> For example:
> {code:java}
> public class TestJob {
>     public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         // set min parallelism great then 1
>         
> conf.setString("execution.batch.adaptive.auto-parallelism.min-parallelism", 
> "2");
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(conf);
>         // env.disableOperatorChaining();
>         // set to batch mode
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>         env.fromSequence(0, 10).broadcast().sinkTo(new DiscardingSink<>());
>         JobGraphRunningUtil.execute(env.getStreamGraph().getJobGraph(), conf, 
> 2, 10);
>     }
> } {code}
>  
> {code:java}
> Caused by: java.lang.IllegalArgumentException: 
> java.lang.IllegalArgumentException
>     at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
>     at 
> org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeConsumedSubpartitionRange(VertexInputInfoComputationUtils.java:235)
>     at 
> org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfoForAllToAll(VertexInputInfoComputationUtils.java:185)
>     at 
> org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfos(VertexInputInfoComputationUtils.java:82)
>     at 
> org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer.computeJobVertexInputInfosForInputsWithInterKeysCorrelation(AllToAllVertexInputInfoComputer.java:195)
>     at 
> org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer.compute(AllToAllVertexInputInfoComputer.java:126)
>     at 
> org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForNonSource(DefaultVertexParallelismAndInputInfosDecider.java:237)
>     at 
> org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForVertex(DefaultVertexParallelismAndInputInfosDecider.java:156)
>     at 
> org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.tryDecideParallelismAndInputInfos(AdaptiveBatchScheduler.java:740)
>     at 
> org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.initializeVerticesIfPossible(AdaptiveBatchScheduler.java:698)
>     at 
> org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.lambda$onTaskFinished$4(AdaptiveBatchScheduler.java:417)
>     at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
>     ... 29 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to