[ 
https://issues.apache.org/jira/browse/FLINK-26351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

qiunan updated FLINK-26351:
---------------------------
    Description: 
In the code, the Flink web UI graph data comes from AdaptiveScheduler.requestJob():
{code:java}
@Override
public ExecutionGraphInfo requestJob() {
    return new ExecutionGraphInfo(state.getJob(), exceptionHistory.toArrayList());
}
{code}
The execution graph behind this ExecutionGraphInfo is rebuilt and restored into the state when the task restarts, in AdaptiveScheduler.createExecutionGraphWithAvailableResourcesAsync().

There you can see that the JobGraph parallelism is recalculated:


 
{code:java}
vertexParallelism = determineParallelism(slotAllocator);
JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
for (JobVertex vertex : adjustedJobGraph.getVertices()) {
    JobVertexID id = vertex.getID();
    // use the determined "available parallelism" to use
    // the resources we have access to
    vertex.setParallelism(vertexParallelism.getParallelism(id));
}
{code}
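
To illustrate the recalculation above, here is a minimal, self-contained sketch. It does not use Flink's classes: the job graph is modeled as a plain map from vertex name to parallelism, and ParallelismSketch, adjust and the vertex names are hypothetical stand-ins. It only shows the pattern of copying the graph and overwriting each vertex's parallelism with the determined value, so that the original configuration keeps the parallelism of the first deployment while the adjusted copy carries the rescaled one. Whether the web UI reads the stale copy is an assumption here, not a confirmed diagnosis.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in, NOT Flink code: a "job graph" is just vertex name -> parallelism.
class ParallelismSketch {

    // Copies the original graph and applies the determined ("available") parallelism
    // to every vertex of the copy. The original map is left untouched.
    public static Map<String, Integer> adjust(
            Map<String, Integer> original, Map<String, Integer> determined) {
        Map<String, Integer> adjusted = new LinkedHashMap<>(original);
        // Keep the old value for any vertex that has no determined parallelism.
        adjusted.replaceAll((id, old) -> determined.getOrDefault(id, old));
        return adjusted;
    }

    public static void main(String[] args) {
        Map<String, Integer> original = new LinkedHashMap<>();
        original.put("source", 1);
        original.put("sink", 1);

        // Assumed result of a rescale: more slots became available.
        Map<String, Integer> determined = Map.of("source", 2, "sink", 4);

        Map<String, Integer> adjusted = adjust(original, determined);

        System.out.println("original graph: " + original);
        System.out.println("adjusted graph: " + adjusted);
    }
}
```

In this sketch, anything that keeps reading the original map after a rescale still sees the first deployment's values, while the adjusted copy holds the new ones.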
 

 

  was:
In the code, the Flink web UI graph data comes from AdaptiveScheduler.requestJob():
{code:java}
@Override
public ExecutionGraphInfo requestJob() {
    return new ExecutionGraphInfo(state.getJob(), exceptionHistory.toArrayList());
}
{code}
The execution graph behind this ExecutionGraphInfo is rebuilt and restored into the state when the task restarts, in AdaptiveScheduler.createExecutionGraphWithAvailableResourcesAsync().

There you can see that the JobGraph parallelism is recalculated:
private CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
        createExecutionGraphWithAvailableResourcesAsync() {
    final VertexParallelism vertexParallelism;
    final VertexParallelismStore adjustedParallelismStore;

    try {
        vertexParallelism = determineParallelism(slotAllocator);
        JobGraph adjustedJobGraph = jobInformation.copyJobGraph();

        for (JobVertex vertex : adjustedJobGraph.getVertices()) {
            JobVertexID id = vertex.getID();

            // use the determined "available parallelism" to use
            // the resources we have access to
            vertex.setParallelism(vertexParallelism.getParallelism(id));
        }

        // use the originally configured max parallelism
        // as the default for consistent runs
        adjustedParallelismStore =
                computeVertexParallelismStoreForExecution(
                        adjustedJobGraph,
                        executionMode,
                        (vertex) -> {
                            VertexParallelismInformation vertexParallelismInfo =
                                    initialParallelismStore.getParallelismInfo(vertex.getID());
                            return vertexParallelismInfo.getMaxParallelism();
                        });
    } catch (Exception exception) {
        return FutureUtils.completedExceptionally(exception);
    }

    return createExecutionGraphAndRestoreStateAsync(adjustedParallelismStore)
            .thenApply(
                    executionGraph ->
                            CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
                                    executionGraph, vertexParallelism));
}


> After scaling a flink task running on k8s, the flink web ui graph always 
> shows the parallelism of the first deployment.
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26351
>                 URL: https://issues.apache.org/jira/browse/FLINK-26351
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.15.0
>            Reporter: qiunan
>            Priority: Major
>             Fix For: 1.15.0
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
