[ 
https://issues.apache.org/jira/browse/FLINK-26351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

qiunan updated FLINK-26351:
---------------------------
    Description: 
In the code,flink web ui graph data from under method.

AdaptiveScheduler.requestJob()
{code:java}
 @Override
    public ExecutionGraphInfo requestJob() {
        return new ExecutionGraphInfo(state.getJob(), 
exceptionHistory.toArrayList());
   } {code}
This executionGraphInfo is task restart build and restore to state.

You can see the code, the parallelism recalculate and copy jobGraph to reset.

AdaptiveScheduler.createExecutionGraphWithAvailableResourcesAsync().
{code:java}
vertexParallelism = determineParallelism(slotAllocator);
JobGraph adjustedJobGraph = jobInformation.copyJobGraph();

for (JobVertex vertex : adjustedJobGraph.getVertices()) {
    JobVertexID id = vertex.getID();

    // use the determined "available parallelism" to use
    // the resources we have access to
    vertex.setParallelism(vertexParallelism.getParallelism(id));
}{code}
But in the restoreState copy jobGraph again, so the jobGraph parallelism always 
deployed for the first time.

AdaptiveScheduler.createExecutionGraphAndRestoreState(VertexParallelismStore 
adjustedParallelismStore)  
{code:java}
private ExecutionGraph createExecutionGraphAndRestoreState(
        VertexParallelismStore adjustedParallelismStore) throws Exception {
    return executionGraphFactory.createAndRestoreExecutionGraph(
            jobInformation.copyJobGraph(),
            completedCheckpointStore,
            checkpointsCleaner,
            checkpointIdCounter,
            
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN,
            initializationTimestamp,
            vertexAttemptNumberStore,
            adjustedParallelismStore,
            deploymentTimeMetrics,
            LOG);
} {code}

  was:
In the code,flink web ui graph data from under method.

AdaptiveScheduler.requestJob()
{code:java}
 @Override
    public ExecutionGraphInfo requestJob() {
        return new ExecutionGraphInfo(state.getJob(), 
exceptionHistory.toArrayList());
   } {code}
This executionGraphInfo is task restart build and restore to state.

You can see the code, the jobGraph parallelism recalculate and copy jobGraph to 
reset.

AdaptiveScheduler.createExecutionGraphWithAvailableResourcesAsync().
{code:java}
vertexParallelism = determineParallelism(slotAllocator);
JobGraph adjustedJobGraph = jobInformation.copyJobGraph();

for (JobVertex vertex : adjustedJobGraph.getVertices()) {
    JobVertexID id = vertex.getID();

    // use the determined "available parallelism" to use
    // the resources we have access to
    vertex.setParallelism(vertexParallelism.getParallelism(id));
}{code}
But in the restoreState copy jobGraph again, so the jobGraph parallelism always 
deployed for the first time.

AdaptiveScheduler.createExecutionGraphAndRestoreState(VertexParallelismStore 
adjustedParallelismStore)  
{code:java}
private ExecutionGraph createExecutionGraphAndRestoreState(
        VertexParallelismStore adjustedParallelismStore) throws Exception {
    return executionGraphFactory.createAndRestoreExecutionGraph(
            jobInformation.copyJobGraph(),
            completedCheckpointStore,
            checkpointsCleaner,
            checkpointIdCounter,
            
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN,
            initializationTimestamp,
            vertexAttemptNumberStore,
            adjustedParallelismStore,
            deploymentTimeMetrics,
            LOG);
} {code}


> After scaling a flink task running on k8s, the flink web ui graph always 
> shows the parallelism of the first deployment.
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26351
>                 URL: https://issues.apache.org/jira/browse/FLINK-26351
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.15.0
>            Reporter: qiunan
>            Priority: Major
>             Fix For: 1.15.0
>
>
> In the code,flink web ui graph data from under method.
> AdaptiveScheduler.requestJob()
> {code:java}
>  @Override
>     public ExecutionGraphInfo requestJob() {
>         return new ExecutionGraphInfo(state.getJob(), 
> exceptionHistory.toArrayList());
>    } {code}
> This executionGraphInfo is task restart build and restore to state.
> You can see the code, the parallelism recalculate and copy jobGraph to reset.
> AdaptiveScheduler.createExecutionGraphWithAvailableResourcesAsync().
> {code:java}
> vertexParallelism = determineParallelism(slotAllocator);
> JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
> for (JobVertex vertex : adjustedJobGraph.getVertices()) {
>     JobVertexID id = vertex.getID();
>     // use the determined "available parallelism" to use
>     // the resources we have access to
>     vertex.setParallelism(vertexParallelism.getParallelism(id));
> }{code}
> But in the restoreState copy jobGraph again, so the jobGraph parallelism 
> always deployed for the first time.
> AdaptiveScheduler.createExecutionGraphAndRestoreState(VertexParallelismStore 
> adjustedParallelismStore)  
> {code:java}
> private ExecutionGraph createExecutionGraphAndRestoreState(
>         VertexParallelismStore adjustedParallelismStore) throws Exception {
>     return executionGraphFactory.createAndRestoreExecutionGraph(
>             jobInformation.copyJobGraph(),
>             completedCheckpointStore,
>             checkpointsCleaner,
>             checkpointIdCounter,
>             
> TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN,
>             initializationTimestamp,
>             vertexAttemptNumberStore,
>             adjustedParallelismStore,
>             deploymentTimeMetrics,
>             LOG);
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to