This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e739b3853a60b8c7584510b0becd87265306140e Author: Weijie Guo <[email protected]> AuthorDate: Mon Oct 31 15:14:13 2022 +0800 [FLINK-29767] Introduce InputConsumableDecider and using it to decide are input all consumable for VertexwiseSchedulingStrategy. --- .../strategy/DefaultInputConsumableDecider.java | 107 ++++++++++++ .../scheduler/strategy/InputConsumableDecider.java | 50 ++++++ .../strategy/VertexwiseSchedulingStrategy.java | 45 ++--- .../DefaultInputConsumableDeciderTest.java | 189 +++++++++++++++++++++ .../strategy/TestingInputConsumableDecider.java | 56 ++++++ .../strategy/VertexwiseSchedulingStrategyTest.java | 55 ++++-- 6 files changed, 457 insertions(+), 45 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java new file mode 100644 index 00000000000..1308568db7f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + +/** + * Default implementation of {@link InputConsumableDecider}. This decider will judge whether the + * executionVertex's inputs are consumable as follows: + * + * <p>For blocking consumed partition group: Whether all result partitions in the group are + * finished. + * + * <p>For canBePipelined consumed partition group: whether all result partitions in the group are + * scheduled. + */ +class DefaultInputConsumableDecider implements InputConsumableDecider { + private final Function<IntermediateResultPartitionID, SchedulingResultPartition> + resultPartitionRetriever; + + private final Function<ExecutionVertexID, Boolean> scheduledVertexRetriever; + + DefaultInputConsumableDecider( + Function<ExecutionVertexID, Boolean> scheduledVertexRetriever, + Function<IntermediateResultPartitionID, SchedulingResultPartition> + resultPartitionRetriever) { + this.scheduledVertexRetriever = scheduledVertexRetriever; + this.resultPartitionRetriever = resultPartitionRetriever; + } + + @Override + public boolean isInputConsumable( + SchedulingExecutionVertex executionVertex, + Set<ExecutionVertexID> verticesToSchedule, + Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) { + for (ConsumedPartitionGroup consumedPartitionGroup : + executionVertex.getConsumedPartitionGroups()) { + + if (!consumableStatusCache.computeIfAbsent( + consumedPartitionGroup, + (group) -> isConsumedPartitionGroupConsumable(group, verticesToSchedule))) { + return false; + } + } + return true; + } + + private boolean isConsumedPartitionGroupConsumable( + final ConsumedPartitionGroup consumedPartitionGroup, + final Set<ExecutionVertexID> verticesToSchedule) { + if (consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) { + for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) { + ExecutionVertexID producerVertex = + resultPartitionRetriever.apply(partitionId).getProducer().getId(); + if (!verticesToSchedule.contains(producerVertex) + && !scheduledVertexRetriever.apply(producerVertex)) { + return false; + } + } + } else { + for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) { + if (resultPartitionRetriever.apply(partitionId).getState() + != ResultPartitionState.CONSUMABLE) { + return false; + } + } + } + return true; + } + + /** Factory for {@link DefaultInputConsumableDecider}. */ + public static class Factory implements InputConsumableDecider.Factory { + + public static final InputConsumableDecider.Factory INSTANCE = new Factory(); + + // disable public instantiation. + private Factory() {} + + @Override + public InputConsumableDecider createInstance( + SchedulingTopology schedulingTopology, + Function<ExecutionVertexID, Boolean> scheduledVertexRetriever) { + return new DefaultInputConsumableDecider( + scheduledVertexRetriever, schedulingTopology::getResultPartition); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java new file mode 100644 index 00000000000..e34cb06bc4e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + +/** + * {@link InputConsumableDecider} is responsible for determining whether the input of an + * executionVertex is consumable. + */ +public interface InputConsumableDecider { + /** + * Determining whether the input of an execution vertex is consumable. + * + * @param executionVertex to be determined whether it's input is consumable. + * @param verticesToSchedule vertices that are not yet scheduled but already decided to be + * scheduled. + * @param consumableStatusCache a cache for {@link ConsumedPartitionGroup} consumable status. + * This is to avoid repetitive computation. + */ + boolean isInputConsumable( + SchedulingExecutionVertex executionVertex, + Set<ExecutionVertexID> verticesToSchedule, + Map<ConsumedPartitionGroup, Boolean> consumableStatusCache); + + /** Factory for {@link InputConsumableDecider}. */ + interface Factory { + InputConsumableDecider createInstance( + SchedulingTopology schedulingTopology, + Function<ExecutionVertexID, Boolean> scheduledVertexRetriever); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java index 4d217a9eb97..74c47234a3a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java @@ -51,12 +51,20 @@ public class VertexwiseSchedulingStrategy private final Set<ExecutionVertexID> newVertices = new HashSet<>(); + private final Set<ExecutionVertexID> scheduledVertices = new HashSet<>(); + + private final InputConsumableDecider inputConsumableDecider; + public VertexwiseSchedulingStrategy( final SchedulerOperations schedulerOperations, - final SchedulingTopology schedulingTopology) { + final SchedulingTopology schedulingTopology, + final InputConsumableDecider.Factory inputConsumableDeciderFactory) { this.schedulerOperations = checkNotNull(schedulerOperations); this.schedulingTopology = checkNotNull(schedulingTopology); + this.inputConsumableDecider = + inputConsumableDeciderFactory.createInstance( + schedulingTopology, scheduledVertices::contains); schedulingTopology.registerSchedulingTopologyListener(this); } @@ -73,6 +81,7 @@ public class VertexwiseSchedulingStrategy @Override public void restartTasks(Set<ExecutionVertexID> verticesToRestart) { + scheduledVertices.removeAll(verticesToRestart); maybeScheduleVertices(verticesToRestart); } @@ -123,12 +132,13 @@ public class VertexwiseSchedulingStrategy SchedulingExecutionVertex vertex = schedulingTopology.getVertex(vertexId); checkState(vertex.getState() == ExecutionState.CREATED); - return areVertexInputsAllConsumable( - vertex, consumableStatusCache); + return inputConsumableDecider.isInputConsumable( + vertex, Collections.emptySet(), consumableStatusCache); }) .collect(Collectors.toSet()); scheduleVerticesOneByOne(verticesToDeploy); + scheduledVertices.addAll(verticesToDeploy); } private void scheduleVerticesOneByOne(final Set<ExecutionVertexID> verticesToDeploy) { @@ -143,37 +153,16 @@ public class VertexwiseSchedulingStrategy id -> schedulerOperations.allocateSlotsAndDeploy(Collections.singletonList(id))); } - private boolean areVertexInputsAllConsumable( - SchedulingExecutionVertex vertex, - Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) { - for (ConsumedPartitionGroup consumedPartitionGroup : vertex.getConsumedPartitionGroups()) { - - if (!consumableStatusCache.computeIfAbsent( - consumedPartitionGroup, this::isConsumedPartitionGroupConsumable)) { - return false; - } - } - return true; - } - - private boolean isConsumedPartitionGroupConsumable( - final ConsumedPartitionGroup consumedPartitionGroup) { - for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) { - if (schedulingTopology.getResultPartition(partitionId).getState() - != ResultPartitionState.CONSUMABLE) { - return false; - } - } - return true; - } - /** The factory for creating {@link VertexwiseSchedulingStrategy}. */ public static class Factory implements SchedulingStrategyFactory { @Override public SchedulingStrategy createInstance( final SchedulerOperations schedulerOperations, final SchedulingTopology schedulingTopology) { - return new VertexwiseSchedulingStrategy(schedulerOperations, schedulingTopology); + return new VertexwiseSchedulingStrategy( + schedulerOperations, + schedulingTopology, + DefaultInputConsumableDecider.Factory.INSTANCE); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java new file mode 100644 index 00000000000..6b373e3c09d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link DefaultInputConsumableDecider}. */ +class DefaultInputConsumableDeciderTest { + @Test + void testNotFinishedBlockingInput() { + final TestingSchedulingTopology topology = new TestingSchedulingTopology(); + + final List<TestingSchedulingExecutionVertex> producers = + topology.addExecutionVertices().withParallelism(2).finish(); + + final List<TestingSchedulingExecutionVertex> consumer = + topology.addExecutionVertices().withParallelism(2).finish(); + + topology.connectAllToAll(producers, consumer) + .withResultPartitionState(ResultPartitionState.CREATED) + .withResultPartitionType(ResultPartitionType.BLOCKING) + .finish(); + + DefaultInputConsumableDecider inputConsumableDecider = + createDefaultInputConsumableDecider(Collections.emptySet(), topology); + + assertThat( + inputConsumableDecider.isInputConsumable( + consumer.get(0), Collections.emptySet(), new HashMap<>())) + .isFalse(); + assertThat( + inputConsumableDecider.isInputConsumable( + consumer.get(1), Collections.emptySet(), new HashMap<>())) + .isFalse(); + } + + @Test + void testAllFinishedBlockingInput() { + final TestingSchedulingTopology topology = new TestingSchedulingTopology(); + + final List<TestingSchedulingExecutionVertex> producers = + topology.addExecutionVertices().withParallelism(2).finish(); + + final List<TestingSchedulingExecutionVertex> consumer = + topology.addExecutionVertices().withParallelism(2).finish(); + + topology.connectAllToAll(producers, consumer) + .withResultPartitionState(ResultPartitionState.CONSUMABLE) + .withResultPartitionType(ResultPartitionType.BLOCKING) + .finish(); + + DefaultInputConsumableDecider inputConsumableDecider = + createDefaultInputConsumableDecider(Collections.emptySet(), topology); + + assertThat( + inputConsumableDecider.isInputConsumable( + consumer.get(0), Collections.emptySet(), new HashMap<>())) + .isTrue(); + assertThat( + inputConsumableDecider.isInputConsumable( + consumer.get(1), Collections.emptySet(), new HashMap<>())) + .isTrue(); + } + + @Test + void testUpstreamNotScheduledHybridInput() { + final TestingSchedulingTopology topology = new TestingSchedulingTopology(); + + final List<TestingSchedulingExecutionVertex> producers = + topology.addExecutionVertices().withParallelism(2).finish(); + + final List<TestingSchedulingExecutionVertex> consumer = + topology.addExecutionVertices().withParallelism(2).finish(); + + topology.connectAllToAll(producers, consumer) + .withResultPartitionState(ResultPartitionState.CREATED) + .withResultPartitionType(ResultPartitionType.HYBRID_FULL) + .finish(); + + DefaultInputConsumableDecider inputConsumableDecider = + createDefaultInputConsumableDecider(Collections.emptySet(), topology); + + assertThat( + inputConsumableDecider.isInputConsumable( + consumer.get(0), Collections.emptySet(), new HashMap<>())) + .isFalse(); + assertThat( + inputConsumableDecider.isInputConsumable( + consumer.get(1), Collections.emptySet(), new HashMap<>())) + .isFalse(); + } + + @Test + void testUpstreamAllScheduledHybridInput() { + final TestingSchedulingTopology topology = new TestingSchedulingTopology(); + + final List<TestingSchedulingExecutionVertex> producers = + topology.addExecutionVertices().withParallelism(2).finish(); + + final List<TestingSchedulingExecutionVertex> consumer = + topology.addExecutionVertices().withParallelism(2).finish(); + + topology.connectAllToAll(producers, consumer) + .withResultPartitionState(ResultPartitionState.CREATED) + .withResultPartitionType(ResultPartitionType.HYBRID_FULL) + .finish(); + + HashSet<ExecutionVertexID> scheduledVertices = new HashSet<>(); + DefaultInputConsumableDecider inputConsumableDecider = + createDefaultInputConsumableDecider(scheduledVertices, topology); + scheduledVertices.add(producers.get(0).getId()); + HashSet<ExecutionVertexID> vertexToDeploy = new HashSet<>(); + vertexToDeploy.add(producers.get(1).getId()); + + assertThat( + inputConsumableDecider.isInputConsumable( + consumer.get(0), vertexToDeploy, new HashMap<>())) + .isTrue(); + assertThat( + inputConsumableDecider.isInputConsumable( + consumer.get(1), vertexToDeploy, new HashMap<>())) + .isTrue(); + } + + @Test + void testHybridAndBlockingInputButBlockingInputNotFinished() { + final TestingSchedulingTopology topology = new TestingSchedulingTopology(); + + final List<TestingSchedulingExecutionVertex> producers1 = + topology.addExecutionVertices().withParallelism(1).finish(); + + final List<TestingSchedulingExecutionVertex> producers2 = + topology.addExecutionVertices().withParallelism(1).finish(); + + final List<TestingSchedulingExecutionVertex> consumer = + topology.addExecutionVertices().withParallelism(1).finish(); + + topology.connectAllToAll(producers1, consumer) + .withResultPartitionState(ResultPartitionState.CREATED) + .withResultPartitionType(ResultPartitionType.BLOCKING) + .finish(); + + topology.connectAllToAll(producers2, consumer) + .withResultPartitionState(ResultPartitionState.CREATED) + .withResultPartitionType(ResultPartitionType.HYBRID_FULL) + .finish(); + + DefaultInputConsumableDecider inputConsumableDecider = + createDefaultInputConsumableDecider( + Collections.singleton(producers2.get(0).getId()), topology); + + assertThat( + inputConsumableDecider.isInputConsumable( + consumer.get(0), Collections.emptySet(), new HashMap<>())) + .isFalse(); + } + + private DefaultInputConsumableDecider createDefaultInputConsumableDecider( + Set<ExecutionVertexID> scheduledVertices, SchedulingTopology schedulingTopology) { + return new DefaultInputConsumableDecider( + scheduledVertices::contains, schedulingTopology::getResultPartition); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingInputConsumableDecider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingInputConsumableDecider.java new file mode 100644 index 00000000000..da2aa8241ac --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingInputConsumableDecider.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** Mock {@link InputConsumableDecider} for testing. */ +public class TestingInputConsumableDecider implements InputConsumableDecider { + + private final Set<SchedulingExecutionVertex> inputConsumableExecutionVertices = new HashSet<>(); + + private final Set<SchedulingExecutionVertex> sourceVertices = new HashSet<>(); + + private SchedulingExecutionVertex lastExecutionToDecideInputConsumable; + + @Override + public boolean isInputConsumable( + SchedulingExecutionVertex executionVertex, + Set<ExecutionVertexID> verticesToDeploy, + Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) { + lastExecutionToDecideInputConsumable = executionVertex; + return sourceVertices.contains(executionVertex) + || inputConsumableExecutionVertices.contains(executionVertex); + } + + public void setInputConsumable(SchedulingExecutionVertex executionVertex) { + inputConsumableExecutionVertices.add(executionVertex); + } + + public void addSourceVertices(Collection<SchedulingExecutionVertex> sourceVertices) { + this.sourceVertices.addAll(sourceVertices); + } + + public SchedulingExecutionVertex getLastExecutionToDecideInputConsumable() { + return lastExecutionToDecideInputConsumable; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java index 86cdd6a4f21..6b679b73e29 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.java @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -44,6 +45,8 @@ class VertexwiseSchedulingStrategyTest { private TestingSchedulingTopology testingSchedulingTopology; + private TestingInputConsumableDecider inputConsumableDecider; + private List<TestingSchedulingExecutionVertex> source; private List<TestingSchedulingExecutionVertex> map; @@ -53,7 +56,7 @@ class VertexwiseSchedulingStrategyTest { @BeforeEach void setUp() { testingSchedulerOperation = new TestingSchedulerOperations(); - + inputConsumableDecider = new TestingInputConsumableDecider(); buildTopology(); } @@ -90,13 +93,17 @@ class VertexwiseSchedulingStrategyTest { @Test void testStartScheduling() { - startScheduling(testingSchedulingTopology); + VertexwiseSchedulingStrategy schedulingStrategy = + createSchedulingStrategy(testingSchedulingTopology); final List<List<TestingSchedulingExecutionVertex>> expectedScheduledVertices = new ArrayList<>(); expectedScheduledVertices.add(Collections.singletonList(source.get(0))); expectedScheduledVertices.add(Collections.singletonList(source.get(1))); + inputConsumableDecider.addSourceVertices(new HashSet<>(source)); + + schedulingStrategy.startScheduling(); assertLatestScheduledVerticesAreEqualTo( expectedScheduledVertices, testingSchedulerOperation); } @@ -104,7 +111,9 @@ class VertexwiseSchedulingStrategyTest { @Test void testRestartTasks() { final VertexwiseSchedulingStrategy schedulingStrategy = - startScheduling(testingSchedulingTopology); + createSchedulingStrategy(testingSchedulingTopology); + + inputConsumableDecider.addSourceVertices(new HashSet<>(source)); final Set<ExecutionVertexID> verticesToRestart = Stream.of(source, map, sink) @@ -126,30 +135,40 @@ class VertexwiseSchedulingStrategyTest { void testOnExecutionStateChangeToFinished() { // trigger source1, source2 scheduled. final VertexwiseSchedulingStrategy schedulingStrategy = - startScheduling(testingSchedulingTopology); + createSchedulingStrategy(testingSchedulingTopology); + + inputConsumableDecider.addSourceVertices(new HashSet<>(source)); + + schedulingStrategy.startScheduling(); assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2); // trigger map1 scheduled final TestingSchedulingExecutionVertex source1 = source.get(0); - source1.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE); + inputConsumableDecider.setInputConsumable(map.get(0)); schedulingStrategy.onExecutionStateChange(source1.getId(), ExecutionState.FINISHED); assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(3); // trigger map2 scheduled final TestingSchedulingExecutionVertex source2 = source.get(1); - source2.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE); + inputConsumableDecider.setInputConsumable(map.get(1)); schedulingStrategy.onExecutionStateChange(source2.getId(), ExecutionState.FINISHED); assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(4); - // sinks' inputs are not all consumable yet so they are not scheduled + // sinks' inputs are not consumable yet so they are not scheduled final TestingSchedulingExecutionVertex map1 = map.get(0); - map1.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE); schedulingStrategy.onExecutionStateChange(map1.getId(), ExecutionState.FINISHED); + assertThat( + inputConsumableDecider + .getLastExecutionToDecideInputConsumable() + .getId() + .getJobVertexId()) + .isEqualTo(sink.get(0).getId().getJobVertexId()); assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(4); // trigger sink1, sink2 scheduled final TestingSchedulingExecutionVertex map2 = map.get(1); - map2.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE); + inputConsumableDecider.setInputConsumable(sink.get(0)); + inputConsumableDecider.setInputConsumable(sink.get(1)); schedulingStrategy.onExecutionStateChange(map2.getId(), ExecutionState.FINISHED); assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(6); @@ -181,7 +200,9 @@ class VertexwiseSchedulingStrategyTest { final List<TestingSchedulingExecutionVertex> producers = topology.addExecutionVertices().withParallelism(2).finish(); - final VertexwiseSchedulingStrategy schedulingStrategy = startScheduling(topology); + final VertexwiseSchedulingStrategy schedulingStrategy = createSchedulingStrategy(topology); + inputConsumableDecider.addSourceVertices(new HashSet<>(producers)); + schedulingStrategy.startScheduling(); final List<TestingSchedulingExecutionVertex> consumers = topology.addExecutionVertices().withParallelism(2).finish(); @@ -193,12 +214,10 @@ class VertexwiseSchedulingStrategyTest { // add consumers to scheduling strategy. if (allToAll) { topology.connectAllToAll(producers, consumers) - .withResultPartitionState(ResultPartitionState.CONSUMABLE) .withResultPartitionType(ResultPartitionType.BLOCKING) .finish(); } else { topology.connectPointwise(producers, consumers) - .withResultPartitionState(ResultPartitionState.CONSUMABLE) .withResultPartitionType(ResultPartitionType.BLOCKING) .finish(); } @@ -209,6 +228,8 @@ class VertexwiseSchedulingStrategyTest { consumers.stream() .map(TestingSchedulingExecutionVertex::getId) .collect(Collectors.toList())); + inputConsumableDecider.setInputConsumable(consumers.get(0)); + inputConsumableDecider.setInputConsumable(consumers.get(1)); schedulingStrategy.onExecutionStateChange( producers.get(1).getId(), ExecutionState.FINISHED); @@ -223,10 +244,10 @@ class VertexwiseSchedulingStrategyTest { testingSchedulerOperation); } - VertexwiseSchedulingStrategy startScheduling(SchedulingTopology schedulingTopology) { - final VertexwiseSchedulingStrategy schedulingStrategy = - new VertexwiseSchedulingStrategy(testingSchedulerOperation, schedulingTopology); - schedulingStrategy.startScheduling(); - return schedulingStrategy; + VertexwiseSchedulingStrategy createSchedulingStrategy(SchedulingTopology schedulingTopology) { + return new VertexwiseSchedulingStrategy( + testingSchedulerOperation, + schedulingTopology, + (ignore1, ignore2) -> inputConsumableDecider); } }
