[
https://issues.apache.org/jira/browse/FLINK-26351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
qiunan updated FLINK-26351:
---------------------------
Description:
In the code,flink web ui graph data from AdaptiveScheduler.requestJob()
{code:java}
@Override
public ExecutionGraphInfo requestJob()
{ // no exception history support is added for now (see FLINK-21439) return new
ExecutionGraphInfo(state.getJob()); } {code}
This executionGraphInfo is task restart build.
{code:java}
private
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
createExecutionGraphWithAvailableResourcesAsync() { final
VertexParallelism vertexParallelism; final VertexParallelismStore
adjustedParallelismStore;
try { vertexParallelism =
determineParallelism(slotAllocator); JobGraph adjustedJobGraph =
jobInformation.copyJobGraph();
for (JobVertex vertex : adjustedJobGraph.getVertices()) {
JobVertexID id = vertex.getID();
// use the determined "available parallelism" to use
// the resources we have access to
vertex.setParallelism(vertexParallelism.getParallelism(id)); }
// use the originally configured max parallelism // as
the default for consistent runs adjustedParallelismStore =
computeVertexParallelismStoreForExecution(
adjustedJobGraph, executionMode,
(vertex) -> {
VertexParallelismInformation vertexParallelismInfo =
initialParallelismStore.getParallelismInfo(vertex.getID());
return vertexParallelismInfo.getMaxParallelism();
}); } catch (Exception exception) {
return FutureUtils.completedExceptionally(exception); }
return
createExecutionGraphAndRestoreStateAsync(adjustedParallelismStore)
.thenApply( executionGraph ->
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
executionGraph, vertexParallelism)); } {code}
was:
In the code,flink web ui graph data from `AdaptiveScheduler.requestJob()`
```
@Override
public ExecutionGraphInfo requestJob() {
// no exception history support is added for now (see FLINK-21439)
return new ExecutionGraphInfo(state.getJob());
}
```
> After scaling a flink task running on k8s, the flink web ui graph always
> shows the parallelism of the first deployment.
> -----------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-26351
> URL: https://issues.apache.org/jira/browse/FLINK-26351
> Project: Flink
> Issue Type: Bug
> Components: API / Core
> Affects Versions: 1.15.0
> Reporter: qiunan
> Priority: Major
> Fix For: 1.15.0
>
>
> In the code,flink web ui graph data from AdaptiveScheduler.requestJob()
> {code:java}
> @Override
> public ExecutionGraphInfo requestJob()
> { // no exception history support is added for now (see FLINK-21439) return
> new ExecutionGraphInfo(state.getJob()); } {code}
> This executionGraphInfo is task restart build.
> {code:java}
> private
> CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
> createExecutionGraphWithAvailableResourcesAsync() { final
> VertexParallelism vertexParallelism; final VertexParallelismStore
> adjustedParallelismStore;
> try { vertexParallelism =
> determineParallelism(slotAllocator); JobGraph adjustedJobGraph =
> jobInformation.copyJobGraph();
> for (JobVertex vertex : adjustedJobGraph.getVertices()) {
> JobVertexID id = vertex.getID();
> // use the determined "available parallelism" to use
> // the resources we have access to
> vertex.setParallelism(vertexParallelism.getParallelism(id)); }
> // use the originally configured max parallelism // as
> the default for consistent runs adjustedParallelismStore =
> computeVertexParallelismStoreForExecution(
> adjustedJobGraph, executionMode,
> (vertex) -> {
> VertexParallelismInformation vertexParallelismInfo =
> initialParallelismStore.getParallelismInfo(vertex.getID());
> return vertexParallelismInfo.getMaxParallelism();
> }); } catch (Exception exception) {
> return FutureUtils.completedExceptionally(exception); }
> return
> createExecutionGraphAndRestoreStateAsync(adjustedParallelismStore)
> .thenApply( executionGraph ->
> CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
> executionGraph, vertexParallelism));
> } {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)