[ https://issues.apache.org/jira/browse/FLINK-26351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

qiunan updated FLINK-26351:
---------------------------
    Description: 
In the code, the Flink web UI graph data comes from AdaptiveScheduler.requestJob():
{code:java}
@Override
public ExecutionGraphInfo requestJob() {
    return new ExecutionGraphInfo(state.getJob(), exceptionHistory.toArrayList());
}
{code}
This ExecutionGraphInfo is built when the task restarts and is then stored in the state.

See AdaptiveScheduler.createExecutionGraphWithAvailableResourcesAsync().

There you can see that the JobGraph parallelism is recalculated:
{code:java}
vertexParallelism = determineParallelism(slotAllocator);
JobGraph adjustedJobGraph = jobInformation.copyJobGraph();

for (JobVertex vertex : adjustedJobGraph.getVertices()) {
    JobVertexID id = vertex.getID();

    // use the determined "available parallelism" to use
    // the resources we have access to
    vertex.setParallelism(vertexParallelism.getParallelism(id));
}{code}
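
The copy-then-adjust pattern in the snippet above can be illustrated with a minimal, self-contained sketch. It does not use Flink classes: `Vertex`, `copyGraph`, and `adjust` are hypothetical stand-ins for `JobVertex`, `jobInformation.copyJobGraph()`, and the loop above, and the parallelism values are made up. It only shows that the recalculated parallelism lands on the copy while the originally submitted graph keeps its first value; whether that is the cause of the web UI symptom is not established here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in model; none of these types are Flink APIs.
class AdjustedParallelismSketch {

    // Simplified placeholder for a job vertex: an id plus a parallelism.
    static final class Vertex {
        final String id;
        int parallelism;

        Vertex(String id, int parallelism) {
            this.id = id;
            this.parallelism = parallelism;
        }
    }

    // Deep copy, so changes to the copy never touch the submitted graph.
    static List<Vertex> copyGraph(List<Vertex> original) {
        List<Vertex> copy = new ArrayList<>();
        for (Vertex v : original) {
            copy.add(new Vertex(v.id, v.parallelism));
        }
        return copy;
    }

    // Mirrors the loop in the issue: copy first, then set the determined
    // parallelism on every vertex of the copy.
    static List<Vertex> adjust(List<Vertex> submitted, Map<String, Integer> determined) {
        List<Vertex> adjusted = copyGraph(submitted);
        for (Vertex v : adjusted) {
            // Keep the old value if nothing was determined for this vertex.
            v.parallelism = determined.getOrDefault(v.id, v.parallelism);
        }
        return adjusted;
    }

    public static void main(String[] args) {
        // Made-up values: submitted with parallelism 4, rescaled to 2.
        List<Vertex> submitted = new ArrayList<>();
        submitted.add(new Vertex("source", 4));
        submitted.add(new Vertex("sink", 4));

        Map<String, Integer> determined = new HashMap<>();
        determined.put("source", 2);
        determined.put("sink", 2);

        List<Vertex> adjusted = adjust(submitted, determined);
        for (int i = 0; i < submitted.size(); i++) {
            System.out.println(submitted.get(i).id
                    + ": submitted=" + submitted.get(i).parallelism
                    + ", adjusted=" + adjusted.get(i).parallelism);
        }
    }
}
```

Running `main` prints both values for each vertex, so the difference between the submitted graph and the adjusted copy is visible side by side.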
 

 

  was:
In the code, the Flink web UI graph data comes from AdaptiveScheduler.requestJob():
{code:java}
@Override
public ExecutionGraphInfo requestJob() {
    return new ExecutionGraphInfo(state.getJob(), exceptionHistory.toArrayList());
}
{code}
This ExecutionGraphInfo is built when the task restarts and is then stored in the state.

See AdaptiveScheduler.createExecutionGraphWithAvailableResourcesAsync().

There you can see that the JobGraph parallelism is recalculated:


 
{code:java}
vertexParallelism = determineParallelism(slotAllocator);
JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
for (JobVertex vertex : adjustedJobGraph.getVertices()) {
    JobVertexID id = vertex.getID();
    // use the determined "available parallelism" to use
    // the resources we have access to
    vertex.setParallelism(vertexParallelism.getParallelism(id));
}{code}
 

 


> After scaling a flink task running on k8s, the flink web ui graph always 
> shows the parallelism of the first deployment.
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26351
>                 URL: https://issues.apache.org/jira/browse/FLINK-26351
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.15.0
>            Reporter: qiunan
>            Priority: Major
>             Fix For: 1.15.0
>
>
> In the code, the Flink web UI graph data comes from AdaptiveScheduler.requestJob():
> {code:java}
> @Override
> public ExecutionGraphInfo requestJob() {
>     return new ExecutionGraphInfo(state.getJob(), exceptionHistory.toArrayList());
> }
> {code}
> This ExecutionGraphInfo is built when the task restarts and is then stored in the state.
> See AdaptiveScheduler.createExecutionGraphWithAvailableResourcesAsync().
> There you can see that the JobGraph parallelism is recalculated:
> {code:java}
> vertexParallelism = determineParallelism(slotAllocator);
> JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
> for (JobVertex vertex : adjustedJobGraph.getVertices()) {
>     JobVertexID id = vertex.getID();
>     // use the determined "available parallelism" to use
>     // the resources we have access to
>     vertex.setParallelism(vertexParallelism.getParallelism(id));
> }{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
