GJL commented on a change in pull request #11647: [FLINK-16960][runtime] Add PipelinedRegion interface
URL: https://github.com/apache/flink/pull/11647#discussion_r405625039
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/topology/Topology.java
##########
@@ -39,4 +39,26 @@
* @return whether the topology contains co-location constraints
*/
boolean containsCoLocationConstraints();
+
+ /**
+ * Returns all pipelined regions in this topology.
+ *
+ * @return Iterable over pipelined regions in this topology
+ */
+ default Iterable<PipelinedRegion<VID, RID, V, R>> getAllPipelinedRegions() {
Review comment:
I pushed an update. Here is an example of how to use the API:
```
final SchedulingPipelinedRegion<?, ?> pr = schedulingTopology.getAllPipelinedRegions().iterator().next();
// null is only a placeholder for the ExecutionVertexID of a vertex in this region
final SchedulingExecutionVertex<?, ?> vertex = pr.getVertex(null);
final Iterable<? extends SchedulingResultPartition<?, ?>> consumedResults = vertex.getConsumedResults();
final IntermediateDataSetID resultId = consumedResults.iterator().next().getResultId();
```
I am not too fond of the wildcard type parameters. One way to get rid of
them is to remove the generically typed `Topology` hierarchy. If code
duplication of the pipelined region computation is a concern, the best idea I
have at the moment is to translate the different topologies into an abstract
graph structure on which we can run a connected components algorithm. The
downsides of this approach are:
- we must maintain two translation algorithms (one for the logical level and one for the execution level)
- a performance penalty due to the additional graph traversal

See below for a PoC (untested):
```
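import com.google.common.graph.Graph;
import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph;

import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;

public final class PipelinedRegionComputeUtil2 {

    /** Translates the topology into a directed graph with one edge per pipelined producer -> consumer exchange. */
    public static Graph<SchedulingExecutionVertex2> toGraph(SchedulingTopology2 topology) {
        final MutableGraph<SchedulingExecutionVertex2> graph = GraphBuilder.directed()
            .allowsSelfLoops(false)
            .build();

        for (SchedulingExecutionVertex2 consumer : topology.getVertices()) {
            graph.addNode(consumer);
            for (SchedulingResultPartition2 resultPartition : consumer.getConsumedResults()) {
                // Blocking exchanges separate regions, so only pipelined partitions become edges.
                if (!resultPartition.getResultType().isPipelined()) {
                    continue;
                }
                // The edge runs from the partition's producer to the vertex consuming it.
                graph.putEdge(resultPartition.getProducer(), consumer);
            }
        }
        return graph;
    }

    /** Computes the weakly connected components by always merging the smaller set into the larger one. */
    public static <V> Set<Set<V>> connectedComponents(Graph<V> graph) {
        final Map<V, Set<V>> vertexToRegion = new IdentityHashMap<>();

        for (V vertex : graph.nodes()) {
            // The vertex may already belong to a region if it was seen earlier as a producer.
            Set<V> currentRegion = vertexToRegion.computeIfAbsent(vertex, PipelinedRegionComputeUtil2::newRegion);

            for (V producer : graph.predecessors(vertex)) {
                // The producer may not have been visited yet if nodes are not in topological order.
                final Set<V> producerRegion =
                    vertexToRegion.computeIfAbsent(producer, PipelinedRegionComputeUtil2::newRegion);

                if (currentRegion != producerRegion) {
                    final Set<V> smallerSet;
                    final Set<V> largerSet;
                    if (currentRegion.size() < producerRegion.size()) {
                        smallerSet = currentRegion;
                        largerSet = producerRegion;
                    } else {
                        smallerSet = producerRegion;
                        largerSet = currentRegion;
                    }
                    // Re-point every vertex of the smaller region, then absorb it into the larger one.
                    for (V v : smallerSet) {
                        vertexToRegion.put(v, largerSet);
                    }
                    largerSet.addAll(smallerSet);
                    currentRegion = largerSet;
                }
            }
        }
        return uniqueRegions(vertexToRegion);
    }

    private static <V> Set<V> newRegion(V vertex) {
        final Set<V> region = new HashSet<>();
        region.add(vertex);
        return region;
    }

    private static <V> Set<Set<V>> uniqueRegions(final Map<V, Set<V>> vertexToRegion) {
        // Regions are mutable sets shared by many vertices, so deduplicate them by identity.
        final Set<Set<V>> distinctRegions = Collections.newSetFromMap(new IdentityHashMap<>());
        distinctRegions.addAll(vertexToRegion.values());
        return distinctRegions;
    }
}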
```
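To check the merge logic of the PoC in isolation, here is a minimal standalone sketch that needs neither Guava nor Flink. It is an illustration under assumptions, not the proposed implementation: `Edge` is a hypothetical stand-in for a result partition (one producer, one consumer, a pipelined flag), vertices are plain strings, and every vertex referenced by an edge is assumed to appear in the vertex list. Instead of building a graph and walking predecessors, it iterates the pipelined edges directly and always merges the smaller region into the larger one, as `connectedComponents` does.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a result partition: a single producer -> consumer exchange. */
final class Edge {
    final String producer;
    final String consumer;
    final boolean pipelined;

    Edge(String producer, String consumer, boolean pipelined) {
        this.producer = producer;
        this.consumer = consumer;
        this.pipelined = pipelined;
    }
}

final class RegionSketch {

    /** Groups vertices into regions connected by pipelined edges; blocking edges are ignored. */
    static Set<Set<String>> computeRegions(Iterable<String> vertices, List<Edge> edges) {
        final Map<String, Set<String>> vertexToRegion = new HashMap<>();
        // Every vertex starts out in its own singleton region.
        for (String v : vertices) {
            vertexToRegion.put(v, new HashSet<>(Collections.singleton(v)));
        }
        for (Edge e : edges) {
            if (!e.pipelined) {
                continue; // a blocking exchange does not connect regions
            }
            final Set<String> a = vertexToRegion.get(e.producer);
            final Set<String> b = vertexToRegion.get(e.consumer);
            if (a == b) {
                continue; // both ends are already in the same region
            }
            // Merge the smaller region into the larger one and re-point its vertices.
            final Set<String> smaller = a.size() < b.size() ? a : b;
            final Set<String> larger = smaller == a ? b : a;
            for (String v : smaller) {
                vertexToRegion.put(v, larger);
            }
            larger.addAll(smaller);
        }
        // Many vertices share one Set instance, so deduplicate by identity.
        final Set<Set<String>> regions = Collections.newSetFromMap(new IdentityHashMap<>());
        regions.addAll(vertexToRegion.values());
        return regions;
    }
}
```

For example, with edges A→B (pipelined), B→C (blocking) and C→D (pipelined) over the vertices A to E, the sketch yields three regions: {A, B}, {C, D} and {E}.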
Non-generic topology classes:
```
/**
 * Topology of {@link SchedulingExecutionVertex2}.
 */
public interface SchedulingTopology2 {

    Iterable<SchedulingExecutionVertex2> getVertices();

    /**
     * Looks up the {@link SchedulingExecutionVertex2} for the given {@link ExecutionVertexID}.
     *
     * @param executionVertexId identifying the respective scheduling vertex
     * @return Optional containing the respective scheduling vertex or none if the vertex does not exist
     */
    Optional<SchedulingExecutionVertex2> getVertex(ExecutionVertexID executionVertexId);

    /**
     * Looks up the {@link SchedulingExecutionVertex2} for the given {@link ExecutionVertexID}.
     *
     * @param executionVertexId identifying the respective scheduling vertex
     * @return The respective scheduling vertex
     * @throws IllegalArgumentException If the vertex does not exist
     */
    default SchedulingExecutionVertex2 getVertexOrThrow(ExecutionVertexID executionVertexId) {
        return getVertex(executionVertexId).orElseThrow(
            () -> new IllegalArgumentException("can not find vertex: " + executionVertexId));
    }

    /**
     * Looks up the {@link SchedulingResultPartition2} for the given {@link IntermediateResultPartitionID}.
     *
     * @param intermediateResultPartitionId identifying the respective scheduling result partition
     * @return Optional containing the respective scheduling result partition or none if the partition does not exist
     */
    Optional<SchedulingResultPartition2> getResultPartition(IntermediateResultPartitionID intermediateResultPartitionId);

    /**
     * Looks up the {@link SchedulingResultPartition2} for the given {@link IntermediateResultPartitionID}.
     *
     * @param intermediateResultPartitionId identifying the respective scheduling result partition
     * @return The respective scheduling result partition
     * @throws IllegalArgumentException If the partition does not exist
     */
    default SchedulingResultPartition2 getResultPartitionOrThrow(IntermediateResultPartitionID intermediateResultPartitionId) {
        return getResultPartition(intermediateResultPartitionId).orElseThrow(
            () -> new IllegalArgumentException("can not find partition: " + intermediateResultPartitionId));
    }
}
```
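The `Optional` lookup paired with an `orElseThrow` default method in `SchedulingTopology2` can be exercised without Flink. The sketch below is hypothetical: `VertexLookup` and `MapVertexLookup` are illustrative names only, and vertices are modeled as strings keyed by string IDs instead of `ExecutionVertexID`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified counterpart of the getVertex / getVertexOrThrow pair. */
interface VertexLookup {

    /** Returns the vertex for the given id, or an empty Optional if it does not exist. */
    Optional<String> getVertex(String vertexId);

    /** Same lookup, but a missing vertex is reported with an IllegalArgumentException. */
    default String getVertexOrThrow(String vertexId) {
        return getVertex(vertexId).orElseThrow(
            () -> new IllegalArgumentException("can not find vertex: " + vertexId));
    }
}

/** Map-backed implementation, for illustration only. */
final class MapVertexLookup implements VertexLookup {

    private final Map<String, String> vertices = new HashMap<>();

    /** Registers a vertex and returns this instance to allow chaining. */
    MapVertexLookup put(String vertexId, String vertex) {
        vertices.put(vertexId, vertex);
        return this;
    }

    @Override
    public Optional<String> getVertex(String vertexId) {
        return Optional.ofNullable(vertices.get(vertexId));
    }
}
```

Implementations then only need to provide `getVertex`; callers that treat a missing vertex as a programming error can use `getVertexOrThrow` instead of unwrapping the `Optional` themselves.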
```
/**
 * Scheduling representation of {@link ExecutionVertex}.
 */
public interface SchedulingExecutionVertex2 {

    ExecutionVertexID getId();

    Iterable<SchedulingResultPartition2> getConsumedResults();

    Iterable<SchedulingResultPartition2> getProducedResults();

    /**
     * Gets the state of the execution vertex.
     *
     * @return state of the execution vertex
     */
    ExecutionState getState();

    /**
     * Gets the {@link InputDependencyConstraint}.
     *
     * @return input dependency constraint
     */
    InputDependencyConstraint getInputDependencyConstraint();
}
```
```
/**
 * Representation of {@link IntermediateResultPartition}.
 */
public interface SchedulingResultPartition2 {

    IntermediateResultPartitionID getId();

    ResultPartitionType getResultType();

    SchedulingExecutionVertex2 getProducer();

    Iterable<SchedulingExecutionVertex2> getConsumers();

    /**
     * Gets id of the intermediate result.
     *
     * @return id of the intermediate result
     */
    IntermediateDataSetID getResultId();

    /**
     * Gets the {@link ResultPartitionState}.
     *
     * @return result partition state
     */
    ResultPartitionState getState();
}
```