zhuzhurk commented on a change in pull request #16688: URL: https://github.com/apache/flink/pull/16688#discussion_r684015715
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyBuildUtil.java ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.scheduler.adapter; + +import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalPipelinedRegion; +import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology; + +import java.util.Arrays; + +/** Build utils for {@link DefaultExecutionTopology}. */ +public class DefaultExecutionTopologyBuildUtil { Review comment: I would suggest naming it `DefaultExecutionTopologyBuildTestingUtils` to make it easier to identify as a test class. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyBuildUtil.java ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.scheduler.adapter; + +import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalPipelinedRegion; +import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology; + +import java.util.Arrays; + +/** Build utils for {@link DefaultExecutionTopology}. */ +public class DefaultExecutionTopologyBuildUtil { + public static DefaultExecutionTopology fromJobVertices(JobVertex... vertices) throws Exception { + return fromExecutionGraph( + ExecutionGraphTestUtils.createSimpleTestGraph(vertices), vertices); + } + + public static DefaultExecutionTopology fromExecutionGraph( Review comment: Same as above, I prefer to rename it to `createDefaultExecutionTopology(...)`. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/topology/LogicalVertex.java ########## @@ -18,10 +18,19 @@ package org.apache.flink.runtime.jobgraph.topology; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobEdge; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.topology.Vertex; /** Represents a vertex in {@link LogicalTopology}, i.e. {@link JobVertex}. */ public interface LogicalVertex - extends Vertex<JobVertexID, IntermediateDataSetID, LogicalVertex, LogicalResult> {} + extends Vertex<JobVertexID, IntermediateDataSetID, LogicalVertex, LogicalResult> { + + /** + * Get the input {@link JobEdge}s of the {@link JobVertex}. + * + * @return the input {@link JobEdge}s + */ + Iterable<JobEdge> getInputs(); Review comment: `JobEdge` is too heavy to be in an interface. Can we introduce a `LogicalEdge` interface which provides `getDistributionPattern()` and `getProducerVertexId()`? 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java ########## @@ -243,13 +275,36 @@ private static DefaultExecutionVertex generateSchedulingExecutionVertex( } private static IndexedPipelinedRegions computePipelinedRegions( - Iterable<DefaultExecutionVertex> topologicallySortedVertexes, + Iterable<DefaultLogicalPipelinedRegion> logicalPipelinedRegions, + Function<DefaultLogicalPipelinedRegion, List<DefaultExecutionVertex>> + sortedExecutionVerticesInPipelinedRegion, + Function<ExecutionVertexID, DefaultExecutionVertex> executionVertexRetriever, Function<IntermediateResultPartitionID, DefaultResultPartition> resultPartitionRetriever) { + long buildRegionsStartTime = System.nanoTime(); Set<Set<SchedulingExecutionVertex>> rawPipelinedRegions = - PipelinedRegionComputeUtil.computePipelinedRegions(topologicallySortedVertexes); + Collections.newSetFromMap(new IdentityHashMap<>()); + + for (DefaultLogicalPipelinedRegion logicalPipelinedRegion : logicalPipelinedRegions) { + + List<DefaultExecutionVertex> schedulingExecutionVertices = + sortedExecutionVerticesInPipelinedRegion.apply(logicalPipelinedRegion); + + if (containsIntraRegionAllToAllEdge(logicalPipelinedRegion)) { + // If the logical pipelined region contains any intra-region all-to-all edge, + // convert the entire logical pipelined region to the sole scheduling pipelined + // region + rawPipelinedRegions.add(new HashSet<>(schedulingExecutionVertices)); + } else { + rawPipelinedRegions.addAll( Review comment: Let's add some explanation for why we are computing scheduling pipelined regions out from vertices of a logical pipelined region. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyBuildUtil.java ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.scheduler.adapter; + +import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalPipelinedRegion; +import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology; + +import java.util.Arrays; + +/** Build utils for {@link DefaultExecutionTopology}. */ +public class DefaultExecutionTopologyBuildUtil { + public static DefaultExecutionTopology fromJobVertices(JobVertex... vertices) throws Exception { Review comment: I think it's better to name it `createDefaultExecutionTopology(...)`; otherwise one cannot tell what this method is for. 
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/topology/DefaultLogicalTopologyTest.java ########## @@ -75,9 +74,11 @@ public void testGetVertices() { @Test public void testGetLogicalPipelinedRegions() { - final Set<DefaultLogicalPipelinedRegion> regions = - logicalTopology.getLogicalPipelinedRegions(); - assertEquals(2, regions.size()); + int count = 0; + for (DefaultLogicalPipelinedRegion ignored : logicalTopology.getAllPipelinedRegions()) { + count++; + } + assertEquals(2, count); Review comment: nit: can be simplified as `assertEquals(2, IterableUtils.toStream(logicalTopology.getAllPipelinedRegions()).count());` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java ########## @@ -815,7 +816,16 @@ public void attachJobGraph(List<JobVertex> topologicallySorted) throws JobExcept registerExecutionVerticesAndResultPartitions(this.verticesInCreationOrder); // the topology assigning should happen before notifying new vertices to failoverStrategy - executionTopology = DefaultExecutionTopology.fromExecutionGraph(this); + List<JobVertex> allOrderedJobVertices = Review comment: I prefer to use `sorted vertices` instead of `ordered vertices` in all places. Otherwise it may cause confusion because `sorted vertices` is already used in many places. Optionally, we can also name it as `jobVerticesInCreationOrder`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java ########## @@ -132,7 +139,8 @@ public EdgeManager getEdgeManager() { } public static DefaultExecutionTopology fromExecutionGraph( - DefaultExecutionGraph executionGraph) { + DefaultExecutionGraph executionGraph, + Iterable<DefaultLogicalPipelinedRegion> logicalPipelinedRegions) { Review comment: I think we can get job vertices from `executionGraph` via `executionGraph.getVerticesTopologically()` and do not need to add param `logicalPipelinedRegions`. 
In this way, we can also ensure that the job vertices are really job vertices of the execution graph. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java ########## @@ -163,11 +174,23 @@ public static DefaultExecutionTopology fromExecutionGraph( private static ExecutionGraphIndex computeExecutionGraphIndex( Iterable<ExecutionVertex> executionVertices, int vertexNumber, + Iterable<DefaultLogicalPipelinedRegion> logicalPipelinedRegions, EdgeManager edgeManager) { Map<ExecutionVertexID, DefaultExecutionVertex> executionVerticesById = new HashMap<>(); List<DefaultExecutionVertex> executionVerticesList = new ArrayList<>(vertexNumber); Map<IntermediateResultPartitionID, DefaultResultPartition> resultPartitionsById = new HashMap<>(); + Map<DefaultLogicalPipelinedRegion, List<DefaultExecutionVertex>> + sortedExecutionVerticesInPipelinedRegion = new IdentityHashMap<>(); + + Map<JobVertexID, DefaultLogicalPipelinedRegion> pipelinedRegionByJobVertexId = Review comment: `pipelinedRegionByJobVertexId` -> `logicalPipelinedRegionByJobVertexId ` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyBuildUtil.java ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.scheduler.adapter; + +import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalPipelinedRegion; +import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology; + +import java.util.Arrays; + +/** Build utils for {@link DefaultExecutionTopology}. */ +public class DefaultExecutionTopologyBuildUtil { + public static DefaultExecutionTopology fromJobVertices(JobVertex... vertices) throws Exception { + return fromExecutionGraph( + ExecutionGraphTestUtils.createSimpleTestGraph(vertices), vertices); + } + + public static DefaultExecutionTopology fromExecutionGraph( + DefaultExecutionGraph executionGraph, JobVertex... vertices) throws Exception { Review comment: The param `vertices` is not needed because it can be obtained via `executionGraph.getVerticesTopologically()`. This also ensures that the job vertices and the execution graph match. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java ########## @@ -243,13 +275,36 @@ private static DefaultExecutionVertex generateSchedulingExecutionVertex( } private static IndexedPipelinedRegions computePipelinedRegions( - Iterable<DefaultExecutionVertex> topologicallySortedVertexes, + Iterable<DefaultLogicalPipelinedRegion> logicalPipelinedRegions, + Function<DefaultLogicalPipelinedRegion, List<DefaultExecutionVertex>> + sortedExecutionVerticesInPipelinedRegion, + Function<ExecutionVertexID, DefaultExecutionVertex> executionVertexRetriever, Function<IntermediateResultPartitionID, DefaultResultPartition> resultPartitionRetriever) { + long buildRegionsStartTime = System.nanoTime(); Set<Set<SchedulingExecutionVertex>> rawPipelinedRegions = - PipelinedRegionComputeUtil.computePipelinedRegions(topologicallySortedVertexes); + Collections.newSetFromMap(new IdentityHashMap<>()); + + for (DefaultLogicalPipelinedRegion logicalPipelinedRegion : logicalPipelinedRegions) { + + List<DefaultExecutionVertex> schedulingExecutionVertices = + sortedExecutionVerticesInPipelinedRegion.apply(logicalPipelinedRegion); + + if (containsIntraRegionAllToAllEdge(logicalPipelinedRegion)) { + // If the logical pipelined region contains any intra-region all-to-all edge, + // convert the entire logical pipelined region to the sole scheduling pipelined + // region + rawPipelinedRegions.add(new HashSet<>(schedulingExecutionVertices)); Review comment: Let's add some explanation for why we have to make these vertices one region. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/topology/DefaultLogicalTopology.java ########## @@ -45,22 +45,32 @@ private final Map<IntermediateDataSetID, DefaultLogicalResult> idToResultMap; - public DefaultLogicalTopology(final JobGraph jobGraph) { - checkNotNull(jobGraph); + private DefaultLogicalTopology(final List<JobVertex> jobVertices) { + checkNotNull(jobVertices); - this.verticesSorted = new ArrayList<>(jobGraph.getNumberOfVertices()); + this.verticesSorted = new ArrayList<>(jobVertices.size()); this.idToVertexMap = new HashMap<>(); this.idToResultMap = new HashMap<>(); - buildVerticesAndResults(jobGraph); + buildVerticesAndResults(jobVertices); + } + + public static DefaultLogicalTopology fromJobGraph(final JobGraph jobGraph) { + checkNotNull(jobGraph); + + return fromOrderedJobVertices(jobGraph.getVerticesSortedTopologicallyFromSources()); + } + + public static DefaultLogicalTopology fromOrderedJobVertices(final List<JobVertex> jobVertices) { Review comment: `Ordered` -> `topologicallySorted` `jobVertices` -> `topologicallySortedJobVertices ` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
