zhuzhurk commented on a change in pull request #15310:
URL: https://github.com/apache/flink/pull/15310#discussion_r603082091
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -96,7 +101,7 @@ private void init() {
public void startScheduling() {
final Set<SchedulingPipelinedRegion> sourceRegions =
IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
- .filter(region ->
!region.getConsumedResults().iterator().hasNext())
+ .filter(region ->
!region.getConsumedPartitionGroups().iterator().hasNext())
Review comment:
outdated.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -175,9 +184,24 @@ private void maybeScheduleRegion(final
SchedulingPipelinedRegion region) {
schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
}
- private boolean areRegionInputsAllConsumable(final
SchedulingPipelinedRegion region) {
- for (SchedulingResultPartition partition :
region.getConsumedResults()) {
- if (partition.getState() != ResultPartitionState.CONSUMABLE) {
+ private boolean areRegionInputsAllConsumable(
+ final SchedulingPipelinedRegion region,
+ final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
+ for (ConsumedPartitionGroup consumedPartitionGroup :
region.getConsumedPartitionGroups()) {
+ if (!consumableStatusCache.computeIfAbsent(
+ consumedPartitionGroup,
this::isConsumedPartitionGroupConsumable)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isConsumedPartitionGroupConsumable(
+ final ConsumedPartitionGroup consumedPartitionGroup) {
+ // For grouped pipelined result partitions, they may not be consumable at the same time
Review comment:
outdated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org