[ 
https://issues.apache.org/jira/browse/FLINK-26351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

qiunan updated FLINK-26351:
---------------------------
    Description: 
In the code,flink web ui graph data from AdaptiveScheduler.requestJob()
{code:java}
@Override
public ExecutionGraphInfo requestJob()
{ // no exception history support is added for now (see FLINK-21439) return new 
ExecutionGraphInfo(state.getJob()); } {code}
This executionGraphInfo is task restart build.
{code:java}
  private 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>   
         createExecutionGraphWithAvailableResourcesAsync() {        final 
VertexParallelism vertexParallelism;        final VertexParallelismStore 
adjustedParallelismStore;
        try {            vertexParallelism = 
determineParallelism(slotAllocator);            JobGraph adjustedJobGraph = 
jobInformation.copyJobGraph();
            for (JobVertex vertex : adjustedJobGraph.getVertices()) {           
     JobVertexID id = vertex.getID();
                // use the determined "available parallelism" to use            
    // the resources we have access to                
vertex.setParallelism(vertexParallelism.getParallelism(id));            }
            // use the originally configured max parallelism            // as 
the default for consistent runs            adjustedParallelismStore =           
         computeVertexParallelismStoreForExecution(                            
adjustedJobGraph,                            executionMode,                     
       (vertex) -> {                                
VertexParallelismInformation vertexParallelismInfo =                            
            initialParallelismStore.getParallelismInfo(vertex.getID());         
                       return vertexParallelismInfo.getMaxParallelism();        
                    });        } catch (Exception exception) {            
return FutureUtils.completedExceptionally(exception);        }
        return 
createExecutionGraphAndRestoreStateAsync(adjustedParallelismStore)              
  .thenApply(                        executionGraph ->                          
      CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(        
                                executionGraph, vertexParallelism));    } {code}

  was:
In the code,flink web ui graph data from `AdaptiveScheduler.requestJob()`

```

@Override
public ExecutionGraphInfo requestJob() {
// no exception history support is added for now (see FLINK-21439)
return new ExecutionGraphInfo(state.getJob());
}

```


> After scaling a flink task running on k8s, the flink web ui graph always 
> shows the parallelism of the first deployment.
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26351
>                 URL: https://issues.apache.org/jira/browse/FLINK-26351
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.15.0
>            Reporter: qiunan
>            Priority: Major
>             Fix For: 1.15.0
>
>
> In the code,flink web ui graph data from AdaptiveScheduler.requestJob()
> {code:java}
> @Override
> public ExecutionGraphInfo requestJob()
> { // no exception history support is added for now (see FLINK-21439) return 
> new ExecutionGraphInfo(state.getJob()); } {code}
> This executionGraphInfo is task restart build.
> {code:java}
>   private 
> CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism> 
>            createExecutionGraphWithAvailableResourcesAsync() {        final 
> VertexParallelism vertexParallelism;        final VertexParallelismStore 
> adjustedParallelismStore;
>         try {            vertexParallelism = 
> determineParallelism(slotAllocator);            JobGraph adjustedJobGraph = 
> jobInformation.copyJobGraph();
>             for (JobVertex vertex : adjustedJobGraph.getVertices()) {         
>        JobVertexID id = vertex.getID();
>                 // use the determined "available parallelism" to use          
>       // the resources we have access to                
> vertex.setParallelism(vertexParallelism.getParallelism(id));            }
>             // use the originally configured max parallelism            // as 
> the default for consistent runs            adjustedParallelismStore =         
>            computeVertexParallelismStoreForExecution(                         
>    adjustedJobGraph,                            executionMode,                
>             (vertex) -> {                                
> VertexParallelismInformation vertexParallelismInfo =                          
>               initialParallelismStore.getParallelismInfo(vertex.getID());     
>                            return vertexParallelismInfo.getMaxParallelism();  
>                           });        } catch (Exception exception) {          
>   return FutureUtils.completedExceptionally(exception);        }
>         return 
> createExecutionGraphAndRestoreStateAsync(adjustedParallelismStore)            
>     .thenApply(                        executionGraph ->                      
>           CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(  
>                                       executionGraph, vertexParallelism));    
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to