Thesharing commented on a change in pull request #16688:
URL: https://github.com/apache/flink/pull/16688#discussion_r685939491



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/SchedulingPipelinedRegionComputeUtil.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.function.Function;
+
+import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.buildRawRegions;
+import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.mergeRegions;
+import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.uniqueRegions;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Utils for computing {@link SchedulingPipelinedRegion}s. */
+public final class SchedulingPipelinedRegionComputeUtil {
+
+    public static Set<Set<SchedulingExecutionVertex>> computePipelinedRegions(
+            final Iterable<? extends SchedulingExecutionVertex> 
topologicallySortedVertices,
+            final Function<ExecutionVertexID, ? extends 
SchedulingExecutionVertex>
+                    executionVertexRetriever,
+            final Function<IntermediateResultPartitionID, ? extends 
SchedulingResultPartition>
+                    resultPartitionRetriever) {
+
+        final Map<SchedulingExecutionVertex, Set<SchedulingExecutionVertex>> 
vertexToRegion =
+                buildRawRegions(
+                        topologicallySortedVertices,
+                        (vertex) -> getReconnectableResults(vertex, 
resultPartitionRetriever));

Review comment:
       Sorry for being careless. Resolved.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/SchedulingPipelinedRegionComputeUtil.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.function.Function;
+
+import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.buildRawRegions;
+import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.mergeRegions;
+import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.uniqueRegions;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Utils for computing {@link SchedulingPipelinedRegion}s. */
+public final class SchedulingPipelinedRegionComputeUtil {
+
+    public static Set<Set<SchedulingExecutionVertex>> computePipelinedRegions(
+            final Iterable<? extends SchedulingExecutionVertex> 
topologicallySortedVertices,
+            final Function<ExecutionVertexID, ? extends 
SchedulingExecutionVertex>
+                    executionVertexRetriever,
+            final Function<IntermediateResultPartitionID, ? extends 
SchedulingResultPartition>
+                    resultPartitionRetriever) {
+
+        final Map<SchedulingExecutionVertex, Set<SchedulingExecutionVertex>> 
vertexToRegion =
+                buildRawRegions(
+                        topologicallySortedVertices,
+                        (vertex) -> getReconnectableResults(vertex, 
resultPartitionRetriever));
+
+        return mergeRegionsOnCycles(vertexToRegion, executionVertexRetriever);
+    }
+
+    /**
+     * Merge the regions base on <a
+     * 
href="https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm";>
+     * Tarjan's strongly connected components algorithm</a>. For more details 
please see <a
+     * 
href="https://issues.apache.org/jira/browse/FLINK-17330";>FLINK-17330</a>.
+     */
+    private static Set<Set<SchedulingExecutionVertex>> mergeRegionsOnCycles(
+            final Map<SchedulingExecutionVertex, 
Set<SchedulingExecutionVertex>> vertexToRegion,
+            final Function<ExecutionVertexID, ? extends 
SchedulingExecutionVertex>
+                    executionVertexRetriever) {
+
+        final List<Set<SchedulingExecutionVertex>> regionList =
+                new ArrayList<>(uniqueRegions(vertexToRegion));
+        final List<List<Integer>> outEdges =
+                buildOutEdgesDesc(vertexToRegion, regionList, 
executionVertexRetriever);
+        final Set<Set<Integer>> sccs =
+                
StronglyConnectedComponentsComputeUtils.computeStronglyConnectedComponents(
+                        outEdges.size(), outEdges);
+
+        final Set<Set<SchedulingExecutionVertex>> mergedRegions =
+                Collections.newSetFromMap(new IdentityHashMap<>());
+        for (Set<Integer> scc : sccs) {
+            checkState(scc.size() > 0);
+
+            Set<SchedulingExecutionVertex> mergedRegion = new HashSet<>();
+            for (int regionIndex : scc) {
+                mergedRegion =
+                        mergeRegions(mergedRegion, 
regionList.get(regionIndex), vertexToRegion);
+            }
+            mergedRegions.add(mergedRegion);
+        }
+
+        return mergedRegions;
+    }
+
+    private static List<List<Integer>> buildOutEdgesDesc(
+            final Map<SchedulingExecutionVertex, 
Set<SchedulingExecutionVertex>> vertexToRegion,
+            final List<Set<SchedulingExecutionVertex>> regionList,
+            final Function<ExecutionVertexID, ? extends 
SchedulingExecutionVertex>
+                    executionVertexRetriever) {
+
+        final Map<Set<SchedulingExecutionVertex>, Integer> regionIndices = new 
IdentityHashMap<>();
+        for (int i = 0; i < regionList.size(); i++) {
+            regionIndices.put(regionList.get(i), i);
+        }
+
+        final List<List<Integer>> outEdges = new 
ArrayList<>(regionList.size());
+        for (Set<SchedulingExecutionVertex> currentRegion : regionList) {
+            final List<Integer> currentRegionOutEdges = new ArrayList<>();
+            for (SchedulingExecutionVertex vertex : currentRegion) {
+                for (SchedulingResultPartition producedResult : 
vertex.getProducedResults()) {
+                    if (producedResult.getResultType().isPipelined()) {
+                        continue;
+                    }
+                    for (ConsumerVertexGroup consumerVertexGroup :
+                            producedResult.getConsumerVertexGroups()) {
+                        for (ExecutionVertexID consumerVertexId : 
consumerVertexGroup) {
+                            SchedulingExecutionVertex consumerVertex =
+                                    
executionVertexRetriever.apply(consumerVertexId);
+                            // Since we are merging SchedulingPipelinedRegions 
inside one
+                            // LogicalPipelinedRegion, if any vertex of the 
ConsumerVertexGroup
+                            // doesn't belong to this LogicalPipelinedRegion, 
we can just skip the
+                            // remaining vertices in this group. This can 
decrease the computation
+                            // complexity.

Review comment:
       I tried to rewrite this comment.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
##########
@@ -27,7 +28,7 @@
  * intermediate result partition to a job vertex. An edge is parametrized with 
its {@link
  * DistributionPattern}.
  */
-public class JobEdge implements java.io.Serializable {
+public class JobEdge implements LogicalEdge, java.io.Serializable {

Review comment:
       Thank you for proposing this idea. Resolved.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/LogicalPipelinedRegionComputeUtil.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.jobgraph.topology.LogicalPipelinedRegion;
+import org.apache.flink.runtime.jobgraph.topology.LogicalResult;
+import org.apache.flink.runtime.jobgraph.topology.LogicalVertex;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.buildRawRegions;
+import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.uniqueRegions;
+
+/** Utils for computing {@link LogicalPipelinedRegion}s. */
+public final class LogicalPipelinedRegionComputeUtil {

Review comment:
       Sure. I've added several unit tests for 
`LogicalPipelinedRegionComputeUtil`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/SchedulingPipelinedRegionComputeUtil.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.function.Function;
+
+import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.buildRawRegions;
+import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.mergeRegions;
+import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.uniqueRegions;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Utils for computing {@link SchedulingPipelinedRegion}s. */
+public final class SchedulingPipelinedRegionComputeUtil {
+
+    public static Set<Set<SchedulingExecutionVertex>> computePipelinedRegions(
+            final Iterable<? extends SchedulingExecutionVertex> 
topologicallySortedVertices,
+            final Function<ExecutionVertexID, ? extends 
SchedulingExecutionVertex>
+                    executionVertexRetriever,
+            final Function<IntermediateResultPartitionID, ? extends 
SchedulingResultPartition>
+                    resultPartitionRetriever) {
+
+        final Map<SchedulingExecutionVertex, Set<SchedulingExecutionVertex>> 
vertexToRegion =
+                buildRawRegions(
+                        topologicallySortedVertices,
+                        (vertex) -> getReconnectableResults(vertex, 
resultPartitionRetriever));
+
+        return mergeRegionsOnCycles(vertexToRegion, executionVertexRetriever);
+    }
+
+    /**
+     * Merge the regions base on <a
+     * 
href="https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm";>
+     * Tarjan's strongly connected components algorithm</a>. For more details 
please see <a
+     * 
href="https://issues.apache.org/jira/browse/FLINK-17330";>FLINK-17330</a>.
+     */
+    private static Set<Set<SchedulingExecutionVertex>> mergeRegionsOnCycles(
+            final Map<SchedulingExecutionVertex, 
Set<SchedulingExecutionVertex>> vertexToRegion,
+            final Function<ExecutionVertexID, ? extends 
SchedulingExecutionVertex>
+                    executionVertexRetriever) {
+
+        final List<Set<SchedulingExecutionVertex>> regionList =
+                new ArrayList<>(uniqueRegions(vertexToRegion));
+        final List<List<Integer>> outEdges =
+                buildOutEdgesDesc(vertexToRegion, regionList, 
executionVertexRetriever);
+        final Set<Set<Integer>> sccs =
+                
StronglyConnectedComponentsComputeUtils.computeStronglyConnectedComponents(
+                        outEdges.size(), outEdges);
+
+        final Set<Set<SchedulingExecutionVertex>> mergedRegions =
+                Collections.newSetFromMap(new IdentityHashMap<>());
+        for (Set<Integer> scc : sccs) {
+            checkState(scc.size() > 0);
+
+            Set<SchedulingExecutionVertex> mergedRegion = new HashSet<>();
+            for (int regionIndex : scc) {
+                mergedRegion =
+                        mergeRegions(mergedRegion, 
regionList.get(regionIndex), vertexToRegion);
+            }
+            mergedRegions.add(mergedRegion);
+        }
+
+        return mergedRegions;
+    }
+
+    private static List<List<Integer>> buildOutEdgesDesc(
+            final Map<SchedulingExecutionVertex, 
Set<SchedulingExecutionVertex>> vertexToRegion,
+            final List<Set<SchedulingExecutionVertex>> regionList,
+            final Function<ExecutionVertexID, ? extends 
SchedulingExecutionVertex>
+                    executionVertexRetriever) {
+
+        final Map<Set<SchedulingExecutionVertex>, Integer> regionIndices = new 
IdentityHashMap<>();
+        for (int i = 0; i < regionList.size(); i++) {
+            regionIndices.put(regionList.get(i), i);
+        }
+
+        final List<List<Integer>> outEdges = new 
ArrayList<>(regionList.size());
+        for (Set<SchedulingExecutionVertex> currentRegion : regionList) {
+            final List<Integer> currentRegionOutEdges = new ArrayList<>();
+            for (SchedulingExecutionVertex vertex : currentRegion) {
+                for (SchedulingResultPartition producedResult : 
vertex.getProducedResults()) {
+                    if (producedResult.getResultType().isPipelined()) {
+                        continue;
+                    }
+                    for (ConsumerVertexGroup consumerVertexGroup :
+                            producedResult.getConsumerVertexGroups()) {
+                        for (ExecutionVertexID consumerVertexId : 
consumerVertexGroup) {
+                            SchedulingExecutionVertex consumerVertex =
+                                    
executionVertexRetriever.apply(consumerVertexId);
+                            // Since we are merging SchedulingPipelinedRegions 
inside one
+                            // LogicalPipelinedRegion, if any vertex of the 
ConsumerVertexGroup
+                            // doesn't belong to this LogicalPipelinedRegion, 
we can just skip the
+                            // remaining vertices in this group. This can 
decrease the computation
+                            // complexity.
+                            if (!vertexToRegion.containsKey(consumerVertex)) {
+                                break;
+                            }
+                            if (!currentRegion.contains(consumerVertex)) {
+                                currentRegionOutEdges.add(
+                                        
regionIndices.get(vertexToRegion.get(consumerVertex)));
+                            }
+                        }
+                    }
+                }
+            }
+            outEdges.add(currentRegionOutEdges);
+        }
+
+        return outEdges;
+    }
+
+    private static Iterable<SchedulingResultPartition> getReconnectableResults(

Review comment:
       Sorry, I made a mistake here. I forgot to check what this means.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/LogicalPipelinedRegionComputeUtil.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.jobgraph.topology.LogicalPipelinedRegion;
+import org.apache.flink.runtime.jobgraph.topology.LogicalResult;
+import org.apache.flink.runtime.jobgraph.topology.LogicalVertex;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.buildRawRegions;
+import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.uniqueRegions;
+
+/** Utils for computing {@link LogicalPipelinedRegion}s. */
+public final class LogicalPipelinedRegionComputeUtil {
+
+    public static Set<Set<LogicalVertex>> computePipelinedRegions(
+            final Iterable<? extends LogicalVertex> 
topologicallySortedVertices) {
+
+        final Map<LogicalVertex, Set<LogicalVertex>> vertexToRegion =
+                buildRawRegions(
+                        topologicallySortedVertices,
+                        
LogicalPipelinedRegionComputeUtil::getReconnectableResults);
+
+        // Since LogicalTopology is a DAG, it cannot be cyclic. Thus, we don't 
need to use Tarjan's
+        // strongly connected components algorithm here.

Review comment:
       Thank you for proposing a better description here.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
##########
@@ -60,39 +45,27 @@
             currentRegion.add(vertex);
             vertexToRegion.put(vertex, currentRegion);
 
-            for (R consumedResult : vertex.getConsumedResults()) {
+            for (R consumedResult : getReconnectableResults.apply(vertex)) {
                 // Similar to the BLOCKING ResultPartitionType, each vertex 
connected through

Review comment:
       Moved.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/SchedulingPipelinedRegionComputeUtil.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.function.Function;
+
+import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.buildRawRegions;
+import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.mergeRegions;
+import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.uniqueRegions;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Utils for computing {@link SchedulingPipelinedRegion}s. */
+public final class SchedulingPipelinedRegionComputeUtil {
+
+    public static Set<Set<SchedulingExecutionVertex>> computePipelinedRegions(
+            final Iterable<? extends SchedulingExecutionVertex> 
topologicallySortedVertices,
+            final Function<ExecutionVertexID, ? extends 
SchedulingExecutionVertex>
+                    executionVertexRetriever,
+            final Function<IntermediateResultPartitionID, ? extends 
SchedulingResultPartition>
+                    resultPartitionRetriever) {
+
+        final Map<SchedulingExecutionVertex, Set<SchedulingExecutionVertex>> 
vertexToRegion =
+                buildRawRegions(
+                        topologicallySortedVertices,
+                        (vertex) -> getReconnectableResults(vertex, 
resultPartitionRetriever));
+
+        return mergeRegionsOnCycles(vertexToRegion, executionVertexRetriever);
+    }
+
+    /**
+     * Merge the regions base on <a
+     * 
href="https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm";>
+     * Tarjan's strongly connected components algorithm</a>. For more details 
please see <a
+     * 
href="https://issues.apache.org/jira/browse/FLINK-17330";>FLINK-17330</a>.
+     */
+    private static Set<Set<SchedulingExecutionVertex>> mergeRegionsOnCycles(
+            final Map<SchedulingExecutionVertex, 
Set<SchedulingExecutionVertex>> vertexToRegion,
+            final Function<ExecutionVertexID, ? extends 
SchedulingExecutionVertex>
+                    executionVertexRetriever) {
+
+        final List<Set<SchedulingExecutionVertex>> regionList =
+                new ArrayList<>(uniqueRegions(vertexToRegion));
+        final List<List<Integer>> outEdges =
+                buildOutEdgesDesc(vertexToRegion, regionList, 
executionVertexRetriever);
+        final Set<Set<Integer>> sccs =
+                
StronglyConnectedComponentsComputeUtils.computeStronglyConnectedComponents(
+                        outEdges.size(), outEdges);
+
+        final Set<Set<SchedulingExecutionVertex>> mergedRegions =
+                Collections.newSetFromMap(new IdentityHashMap<>());
+        for (Set<Integer> scc : sccs) {
+            checkState(scc.size() > 0);
+
+            Set<SchedulingExecutionVertex> mergedRegion = new HashSet<>();
+            for (int regionIndex : scc) {
+                mergedRegion =
+                        mergeRegions(mergedRegion, 
regionList.get(regionIndex), vertexToRegion);
+            }
+            mergedRegions.add(mergedRegion);
+        }
+
+        return mergedRegions;
+    }
+
+    private static List<List<Integer>> buildOutEdgesDesc(
+            final Map<SchedulingExecutionVertex, 
Set<SchedulingExecutionVertex>> vertexToRegion,
+            final List<Set<SchedulingExecutionVertex>> regionList,
+            final Function<ExecutionVertexID, ? extends 
SchedulingExecutionVertex>
+                    executionVertexRetriever) {
+
+        final Map<Set<SchedulingExecutionVertex>, Integer> regionIndices = new 
IdentityHashMap<>();
+        for (int i = 0; i < regionList.size(); i++) {
+            regionIndices.put(regionList.get(i), i);
+        }
+
+        final List<List<Integer>> outEdges = new 
ArrayList<>(regionList.size());
+        for (Set<SchedulingExecutionVertex> currentRegion : regionList) {
+            final List<Integer> currentRegionOutEdges = new ArrayList<>();
+            for (SchedulingExecutionVertex vertex : currentRegion) {
+                for (SchedulingResultPartition producedResult : 
vertex.getProducedResults()) {
+                    if (producedResult.getResultType().isPipelined()) {
+                        continue;
+                    }
+                    for (ConsumerVertexGroup consumerVertexGroup :
+                            producedResult.getConsumerVertexGroups()) {
+                        for (ExecutionVertexID consumerVertexId : 
consumerVertexGroup) {
+                            SchedulingExecutionVertex consumerVertex =
+                                    
executionVertexRetriever.apply(consumerVertexId);
+                            // Since we are merging SchedulingPipelinedRegions 
inside one
+                            // LogicalPipelinedRegion, if any vertex of the 
ConsumerVertexGroup
+                            // doesn't belong to this LogicalPipelinedRegion, 
we can just skip the
+                            // remaining vertices in this group. This can 
decrease the computation
+                            // complexity.
+                            if (!vertexToRegion.containsKey(consumerVertex)) {
+                                break;
+                            }
+                            if (!currentRegion.contains(consumerVertex)) {
+                                currentRegionOutEdges.add(
+                                        
regionIndices.get(vertexToRegion.get(consumerVertex)));
+                            }
+                        }
+                    }
+                }
+            }
+            outEdges.add(currentRegionOutEdges);
+        }
+
+        return outEdges;
+    }
+
+    private static Iterable<SchedulingResultPartition> getReconnectableResults(
+            SchedulingExecutionVertex vertex,
+            Function<IntermediateResultPartitionID, ? extends 
SchedulingResultPartition>
+                    resultPartitionRetriever) {
+        return () ->

Review comment:
       I tried creating an `ArrayList` and inserting all matching results 
into the list. At a parallelism of 8k, this increases the total time of 
`BuildTopologyBenchmark` from nearly 500ms to 2s. Thus, I think an 
iterator would be better.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/SchedulingPipelinedRegionComputeUtil.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.function.Function;
+
+import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.buildRawRegions;
+import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.mergeRegions;
+import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.uniqueRegions;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Utils for computing {@link SchedulingPipelinedRegion}s. */
+public final class SchedulingPipelinedRegionComputeUtil {
+
+    public static Set<Set<SchedulingExecutionVertex>> computePipelinedRegions(
+            final Iterable<? extends SchedulingExecutionVertex> 
topologicallySortedVertices,
+            final Function<ExecutionVertexID, ? extends 
SchedulingExecutionVertex>
+                    executionVertexRetriever,
+            final Function<IntermediateResultPartitionID, ? extends 
SchedulingResultPartition>
+                    resultPartitionRetriever) {
+
+        final Map<SchedulingExecutionVertex, Set<SchedulingExecutionVertex>> 
vertexToRegion =
+                buildRawRegions(
+                        topologicallySortedVertices,
+                        (vertex) -> getReconnectableResults(vertex, 
resultPartitionRetriever));
+
+        return mergeRegionsOnCycles(vertexToRegion, executionVertexRetriever);
+    }
+
+    /**
+     * Merge the regions base on <a
+     * 
href="https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm";>
+     * Tarjan's strongly connected components algorithm</a>. For more details 
please see <a
+     * 
href="https://issues.apache.org/jira/browse/FLINK-17330";>FLINK-17330</a>.
+     */
+    private static Set<Set<SchedulingExecutionVertex>> mergeRegionsOnCycles(
+            final Map<SchedulingExecutionVertex, 
Set<SchedulingExecutionVertex>> vertexToRegion,
+            final Function<ExecutionVertexID, ? extends 
SchedulingExecutionVertex>
+                    executionVertexRetriever) {
+
+        final List<Set<SchedulingExecutionVertex>> regionList =
+                new ArrayList<>(uniqueRegions(vertexToRegion));
+        final List<List<Integer>> outEdges =
+                buildOutEdgesDesc(vertexToRegion, regionList, 
executionVertexRetriever);
+        final Set<Set<Integer>> sccs =
+                
StronglyConnectedComponentsComputeUtils.computeStronglyConnectedComponents(
+                        outEdges.size(), outEdges);
+
+        final Set<Set<SchedulingExecutionVertex>> mergedRegions =
+                Collections.newSetFromMap(new IdentityHashMap<>());
+        for (Set<Integer> scc : sccs) {
+            checkState(scc.size() > 0);
+
+            Set<SchedulingExecutionVertex> mergedRegion = new HashSet<>();
+            for (int regionIndex : scc) {
+                mergedRegion =
+                        mergeRegions(mergedRegion, 
regionList.get(regionIndex), vertexToRegion);
+            }
+            mergedRegions.add(mergedRegion);
+        }
+
+        return mergedRegions;
+    }
+
+    private static List<List<Integer>> buildOutEdgesDesc(
+            final Map<SchedulingExecutionVertex, 
Set<SchedulingExecutionVertex>> vertexToRegion,
+            final List<Set<SchedulingExecutionVertex>> regionList,
+            final Function<ExecutionVertexID, ? extends 
SchedulingExecutionVertex>
+                    executionVertexRetriever) {
+
+        final Map<Set<SchedulingExecutionVertex>, Integer> regionIndices = new 
IdentityHashMap<>();
+        for (int i = 0; i < regionList.size(); i++) {
+            regionIndices.put(regionList.get(i), i);
+        }
+
+        final List<List<Integer>> outEdges = new 
ArrayList<>(regionList.size());
+        for (Set<SchedulingExecutionVertex> currentRegion : regionList) {
+            final List<Integer> currentRegionOutEdges = new ArrayList<>();
+            for (SchedulingExecutionVertex vertex : currentRegion) {
+                for (SchedulingResultPartition producedResult : 
vertex.getProducedResults()) {
+                    if (producedResult.getResultType().isPipelined()) {
+                        continue;
+                    }
+                    for (ConsumerVertexGroup consumerVertexGroup :
+                            producedResult.getConsumerVertexGroups()) {
+                        for (ExecutionVertexID consumerVertexId : 
consumerVertexGroup) {
+                            SchedulingExecutionVertex consumerVertex =
+                                    
executionVertexRetriever.apply(consumerVertexId);
+                            // Since we are merging SchedulingPipelinedRegions 
inside one
+                            // LogicalPipelinedRegion, if any vertex of the 
ConsumerVertexGroup
+                            // doesn't belong to this LogicalPipelinedRegion, 
we can just skip the
+                            // remaining vertices in this group. This can 
decrease the computation
+                            // complexity.
+                            if (!vertexToRegion.containsKey(consumerVertex)) {
+                                break;
+                            }
+                            if (!currentRegion.contains(consumerVertex)) {
+                                currentRegionOutEdges.add(
+                                        
regionIndices.get(vertexToRegion.get(consumerVertex)));
+                            }
+                        }
+                    }
+                }
+            }
+            outEdges.add(currentRegionOutEdges);
+        }
+
+        return outEdges;
+    }
+
+    private static Iterable<SchedulingResultPartition> getReconnectableResults(
+            SchedulingExecutionVertex vertex,
+            Function<IntermediateResultPartitionID, ? extends 
SchedulingResultPartition>
+                    resultPartitionRetriever) {
+        return () ->

Review comment:
       In `LogicalPipelinedRegionComputeUtil` the computational complexity is 
not a concern, so it simply returns a list.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
##########
@@ -22,35 +22,20 @@
 import org.apache.flink.runtime.topology.Result;
 import org.apache.flink.runtime.topology.Vertex;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkState;
+import java.util.function.Function;
 
-/** Utility for computing pipelined regions. */
+/** Common utils for computing pipelined regions. */
 public final class PipelinedRegionComputeUtil {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(PipelinedRegionComputeUtil.class);
-
-    public static <V extends Vertex<?, ?, V, R>, R extends Result<?, ?, V, R>>
-            Set<Set<V>> computePipelinedRegions(
-                    final Iterable<? extends V> topologicallySortedVertexes) {
-        final Map<V, Set<V>> vertexToRegion = 
buildRawRegions(topologicallySortedVertexes);
-        return mergeRegionsOnCycles(vertexToRegion);
-    }
-
-    private static <V extends Vertex<?, ?, V, R>, R extends Result<?, ?, V, R>>
+    static <V extends Vertex<?, ?, V, R>, R extends Result<?, ?, V, R>>
             Map<V, Set<V>> buildRawRegions(
-                    final Iterable<? extends V> topologicallySortedVertexes) {
+                    final Iterable<? extends V> topologicallySortedVertexes,

Review comment:
       Truly sorry for being careless again.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to