[ 
https://issues.apache.org/jira/browse/FLINK-26351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

qiunan updated FLINK-26351:
---------------------------
    Description: 
In the code,flink web ui graph data from AdaptiveScheduler.requestJob()
{code:java}
 @Override
    public ExecutionGraphInfo requestJob() {
        return new ExecutionGraphInfo(state.getJob(), 
exceptionHistory.toArrayList());
   } {code}
This executionGraphInfo is task restart build and restore to state.

AdaptiveScheduler.createExecutionGraphWithAvailableResourcesAsync().

You can see the jobGraph parallelism recalculate.
private 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
            createExecutionGraphWithAvailableResourcesAsync() {
        final VertexParallelism vertexParallelism;
        final VertexParallelismStore adjustedParallelismStore;
​
        try {
            vertexParallelism = determineParallelism(slotAllocator);
            JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
​
            for (JobVertex vertex : adjustedJobGraph.getVertices()) {
                JobVertexID id = vertex.getID();
​
                // use the determined "available parallelism" to use
                // the resources we have access to
                vertex.setParallelism(vertexParallelism.getParallelism(id));
            }
​
            // use the originally configured max parallelism
            // as the default for consistent runs
            adjustedParallelismStore =
                    computeVertexParallelismStoreForExecution(
                            adjustedJobGraph,
                            executionMode,
                            (vertex) -> {
                                VertexParallelismInformation 
vertexParallelismInfo =
                                        
initialParallelismStore.getParallelismInfo(vertex.getID());
                                return 
vertexParallelismInfo.getMaxParallelism();
                            });
        } catch (Exception exception) {
            return FutureUtils.completedExceptionally(exception);
        }
​
        return 
createExecutionGraphAndRestoreStateAsync(adjustedParallelismStore)
                .thenApply(
                        executionGraph ->
                                
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
                                        executionGraph, vertexParallelism));
    }

  was:
In the code,flink web ui graph data from AdaptiveScheduler.requestJob()
   @Override
    public ExecutionGraphInfo requestJob() {
        return new ExecutionGraphInfo(state.getJob(), 
exceptionHistory.toArrayList());
    }
This executionGraphInfo is task restart build and restore to state.

AdaptiveScheduler.createExecutionGraphWithAvailableResourcesAsync().

You can see the jobGraph parallelism recalculate.
private 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
            createExecutionGraphWithAvailableResourcesAsync() {
        final VertexParallelism vertexParallelism;
        final VertexParallelismStore adjustedParallelismStore;
​
        try {
            vertexParallelism = determineParallelism(slotAllocator);
            JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
​
            for (JobVertex vertex : adjustedJobGraph.getVertices()) {
                JobVertexID id = vertex.getID();
​
                // use the determined "available parallelism" to use
                // the resources we have access to
                vertex.setParallelism(vertexParallelism.getParallelism(id));
            }
​
            // use the originally configured max parallelism
            // as the default for consistent runs
            adjustedParallelismStore =
                    computeVertexParallelismStoreForExecution(
                            adjustedJobGraph,
                            executionMode,
                            (vertex) -> {
                                VertexParallelismInformation 
vertexParallelismInfo =
                                        
initialParallelismStore.getParallelismInfo(vertex.getID());
                                return 
vertexParallelismInfo.getMaxParallelism();
                            });
        } catch (Exception exception) {
            return FutureUtils.completedExceptionally(exception);
        }
​
        return 
createExecutionGraphAndRestoreStateAsync(adjustedParallelismStore)
                .thenApply(
                        executionGraph ->
                                
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
                                        executionGraph, vertexParallelism));
    }


> After scaling a flink task running on k8s, the flink web ui graph always 
> shows the parallelism of the first deployment.
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26351
>                 URL: https://issues.apache.org/jira/browse/FLINK-26351
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.15.0
>            Reporter: qiunan
>            Priority: Major
>             Fix For: 1.15.0
>
>
> In the code,flink web ui graph data from AdaptiveScheduler.requestJob()
> {code:java}
>  @Override
>     public ExecutionGraphInfo requestJob() {
>         return new ExecutionGraphInfo(state.getJob(), 
> exceptionHistory.toArrayList());
>    } {code}
> This executionGraphInfo is task restart build and restore to state.
> AdaptiveScheduler.createExecutionGraphWithAvailableResourcesAsync().
> You can see the jobGraph parallelism recalculate.
> private 
> CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
>             createExecutionGraphWithAvailableResourcesAsync() {
>         final VertexParallelism vertexParallelism;
>         final VertexParallelismStore adjustedParallelismStore;
> ​
>         try {
>             vertexParallelism = determineParallelism(slotAllocator);
>             JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
> ​
>             for (JobVertex vertex : adjustedJobGraph.getVertices()) {
>                 JobVertexID id = vertex.getID();
> ​
>                 // use the determined "available parallelism" to use
>                 // the resources we have access to
>                 vertex.setParallelism(vertexParallelism.getParallelism(id));
>             }
> ​
>             // use the originally configured max parallelism
>             // as the default for consistent runs
>             adjustedParallelismStore =
>                     computeVertexParallelismStoreForExecution(
>                             adjustedJobGraph,
>                             executionMode,
>                             (vertex) -> {
>                                 VertexParallelismInformation 
> vertexParallelismInfo =
>                                         
> initialParallelismStore.getParallelismInfo(vertex.getID());
>                                 return 
> vertexParallelismInfo.getMaxParallelism();
>                             });
>         } catch (Exception exception) {
>             return FutureUtils.completedExceptionally(exception);
>         }
> ​
>         return 
> createExecutionGraphAndRestoreStateAsync(adjustedParallelismStore)
>                 .thenApply(
>                         executionGraph ->
>                                 
> CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
>                                         executionGraph, vertexParallelism));
>     }



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to