zhuzhurk commented on a change in pull request #11691: [FLINK-14234][runtime] Notifies all kinds of consumable partitions to SchedulingStrategy
URL: https://github.com/apache/flink/pull/11691#discussion_r406622356
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -527,6 +531,26 @@ private boolean isNotifiable(
 		return false;
 	}
+	private void maybeNotifyBlockingPartitionsConsumable(final ExecutionVertexID executionVertexId) {
+		final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
+
+		if (executionVertex.getExecutionState() != ExecutionState.FINISHED) {
+			return;
+		}
+
+		final Set<IntermediateResultPartitionID> consumableResultPartitions = new HashSet<>();
+		for (IntermediateResultPartition resultPartition : executionVertex.getProducedPartitions().values()) {
+			if (resultPartition.getResultType().isBlocking() && resultPartition.isConsumable()) {
Review comment:
1. I think a blocking partition can be consumed as soon as it is finished. In fact, we already refined it this way in Blink.
Waiting for the entire intermediate result to finish is not required. I suspect it was done this way because Flink originally intended to schedule batch jobs stage by stage, i.e. finish one JobVertex and then schedule its consumers.
2. If we later redefine blocking partition consumability, I think it's better to do it in SchedulerNG rather than in the SchedulingStrategy. This would make the concepts of PIPELINED and BLOCKING much clearer: a PIPELINED result partition can be consumed once any data has been produced in it, and a BLOCKING result partition can be consumed after all of its data has been produced. This way, the LazyFromSourcesSchedulingStrategy would not break.
The LazyFromSourcesSchedulingStrategy tests are really complex at the moment, and some of them behave strangely. For example, all pipelined partitions are consumable initially while none of the blocking partitions are. It has been painful every time I had to touch them, so I hope we can simplify this early on.
3. Each time a partition becomes consumable, PipelinedRegionSchedulingStrategy finds all of its consumer regions and schedules those whose inputs are all consumable.
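A minimal sketch of the model described in points 1 to 3, assuming a simplified, hypothetical set of types (`PartitionType`, `Partition`, `Region`, `RegionScheduler`) that are NOT Flink's real `IntermediateResultPartition` or `SchedulingStrategy` APIs. Consumability is a per-partition property decided outside the strategy (PIPELINED: any data produced; BLOCKING: its own producer finished, without waiting for the rest of the intermediate result), and the strategy only reacts to "partition became consumable" events.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model for illustration only; not Flink's real classes.
enum PartitionType { PIPELINED, BLOCKING }

final class Partition {
    final String id;
    final PartitionType type;
    boolean hasProducedData;  // producer has emitted at least some data
    boolean producerFinished; // producing task has reached FINISHED

    Partition(String id, PartitionType type) {
        this.id = id;
        this.type = type;
    }

    // Decided per partition, not per intermediate result: PIPELINED as soon as
    // any data exists, BLOCKING once its own producer has finished, regardless
    // of the sibling partitions of the same result.
    boolean isConsumable() {
        return type == PartitionType.PIPELINED ? hasProducedData : producerFinished;
    }
}

final class Region {
    final String name;
    final List<Partition> inputs;

    Region(String name, List<Partition> inputs) {
        this.name = name;
        this.inputs = inputs;
    }

    boolean allInputsConsumable() {
        for (Partition p : inputs) {
            if (!p.isConsumable()) {
                return false;
            }
        }
        return true;
    }
}

final class RegionScheduler {
    // Maps a partition id to the regions that consume it.
    private final Map<String, List<Region>> consumersByPartition = new HashMap<>();
    // Names of the regions scheduled so far, in scheduling order.
    private final Set<String> scheduled = new LinkedHashSet<>();

    void addRegion(Region region) {
        for (Partition p : region.inputs) {
            consumersByPartition.computeIfAbsent(p.id, k -> new ArrayList<>()).add(region);
        }
    }

    // Called each time a partition becomes consumable: look up its consumer
    // regions and schedule those whose inputs are now all consumable.
    void onPartitionConsumable(Partition partition) {
        List<Region> consumers =
                consumersByPartition.getOrDefault(partition.id, Collections.emptyList());
        for (Region region : consumers) {
            if (!scheduled.contains(region.name) && region.allInputsConsumable()) {
                scheduled.add(region.name);
            }
        }
    }

    Set<String> scheduledRegions() {
        return Collections.unmodifiableSet(scheduled);
    }
}
```

For example, if region R1 reads only blocking partition A1 and region R2 reads blocking partition A2 plus pipelined partition P, then R1 is scheduled as soon as A1's producer finishes, even while A2's producer is still running; R2 is scheduled only once P has data and A2's producer has finished.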
----------------------------------------------------------------
This is an automated message from the Apache Git Service.