Thesharing commented on a change in pull request #16436:
URL: https://github.com/apache/flink/pull/16436#discussion_r668403994
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
##########
@@ -88,6 +88,7 @@ private static void connectAllToAll(
Arrays.stream(intermediateResult.getPartitions())
.map(IntermediateResultPartition::getPartitionId)
.collect(Collectors.toList()));
+        registerConsumedPartitionGroupToEdgeManager(intermediateResult, consumedPartitions);
Review comment:
I think varargs are not applicable here: the consumed partition IDs have already been collected into an ArrayList. So I created two overloaded methods with different parameter types instead.
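For illustration, the overload pair could look roughly like this — a minimal sketch with placeholder names and `String` ids standing in for `IntermediateResultPartitionID`, not Flink's actual signatures:

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the two overloads: one for the all-to-all case
// (the ids were already collected into a List, so varargs would add nothing)
// and a single-partition convenience overload for the pointwise case.
class EdgeManagerSketch {

    // All-to-all: accept the already-built List directly.
    static int register(List<String> partitionIds) {
        return partitionIds.size();
    }

    // Pointwise: single-partition overload that delegates to the List-based one.
    static int register(String partitionId) {
        return register(Collections.singletonList(partitionId));
    }

    public static void main(String[] args) {
        System.out.println(register(List.of("p1", "p2"))); // group of two partitions
        System.out.println(register("p1"));                // single-partition group
    }
}
```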
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
##########
@@ -72,6 +75,13 @@ public ResultPartitionType getResultType() {
         return getEdgeManager().getConsumerVertexGroupsForPartition(partitionId);
     }
+    public List<ConsumedPartitionGroup> getConsumedPartitionGroups() {
+        if (consumedPartitionGroups == null) {
+            consumedPartitionGroups =
+                    getEdgeManager().getConsumedPartitionGroupsById(partitionId);
Review comment:
Agreed.
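The lazy-caching pattern in this hunk can be sketched in isolation like so (placeholder types; `lookupFromEdgeManager` is a hypothetical stand-in for the real edge-manager call):

```java
import java.util.Collections;
import java.util.List;

// Illustrative sketch of the lazy caching above: the partition caches the
// group list on first access, so repeated calls don't go back to the
// (hypothetical) edge manager. Names are placeholders, not Flink's types.
class LazyGroupsSketch {
    int lookups = 0; // counts how often the expensive lookup actually runs

    private List<String> consumedPartitionGroups;

    List<String> getConsumedPartitionGroups() {
        if (consumedPartitionGroups == null) {
            consumedPartitionGroups = lookupFromEdgeManager();
        }
        return consumedPartitionGroups;
    }

    private List<String> lookupFromEdgeManager() {
        lookups++;
        return Collections.singletonList("group-1");
    }

    public static void main(String[] args) {
        LazyGroupsSketch partition = new LazyGroupsSketch();
        partition.getConsumedPartitionGroups();
        partition.getConsumedPartitionGroups();
        System.out.println(partition.lookups); // the lookup ran only once
    }
}
```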
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java
##########
@@ -485,4 +491,39 @@ public void testMoreThanOneConsumerForIntermediateResult() {
fail(e.getMessage());
}
}
+
+ @Test
+    public void testRegisterConsumedPartitionGroupToEdgeManager() throws Exception {
+ JobVertex v1 = new JobVertex("source");
+ JobVertex v2 = new JobVertex("sink");
+
+ v1.setParallelism(2);
+ v2.setParallelism(2);
+
+        v2.connectNewDataSetAsInput(
+                v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+ List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));
+ ExecutionGraph eg = createDefaultExecutionGraph(ordered);
+ eg.attachJobGraph(ordered);
+
+        IntermediateResult result =
+                Objects.requireNonNull(eg.getJobVertex(v1.getID())).getProducedDataSets()[0];
+
+ IntermediateResultPartition partition1 = result.getPartitions()[0];
+ IntermediateResultPartition partition2 = result.getPartitions()[1];
+
+ assertEquals(
+ partition1.getConsumedPartitionGroups().get(0),
+ partition2.getConsumedPartitionGroups().get(0));
+
+ ConsumedPartitionGroup consumedPartitionGroup =
+ partition1.getConsumedPartitionGroups().get(0);
+ Set<IntermediateResultPartitionID> partitionIds = new HashSet<>();
+        for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
+ partitionIds.add(partitionId);
+ }
+ assertTrue(partitionIds.contains(partition1.getPartitionId()));
Review comment:
Agreed. I've replaced it with `org.hamcrest.MatcherAssert.assertThat`, since `org.junit.Assert.assertThat` has been deprecated.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java
##########
@@ -34,30 +46,71 @@
/** Unit tests for {@link DefaultResultPartition}. */
public class DefaultResultPartitionTest extends TestLogger {
-    private static final TestResultPartitionStateSupplier resultPartitionState =
-            new TestResultPartitionStateSupplier();
-
-    private final IntermediateResultPartitionID resultPartitionId =
-            new IntermediateResultPartitionID();
-    private final IntermediateDataSetID intermediateResultId = new IntermediateDataSetID();
+ @Test
+ public void testGetPartitionState() {
+ final TestResultPartitionStateSupplier resultPartitionState =
+ new TestResultPartitionStateSupplier();
- private DefaultResultPartition resultPartition;
+        final IntermediateResultPartitionID resultPartitionId = new IntermediateResultPartitionID();
+        final IntermediateDataSetID intermediateResultId = new IntermediateDataSetID();
- @Before
- public void setUp() {
- resultPartition =
+        final DefaultResultPartition resultPartition =
                 new DefaultResultPartition(
                         resultPartitionId, intermediateResultId, BLOCKING, resultPartitionState);
- }
- @Test
- public void testGetPartitionState() {
for (ResultPartitionState state : ResultPartitionState.values()) {
resultPartitionState.setResultPartitionState(state);
assertEquals(state, resultPartition.getState());
}
}
+ @Test
+ public void testGetConsumedPartitionGroup() throws Exception {
Review comment:
Moved.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
##########
@@ -94,6 +94,9 @@ void addConsumedPartition(TestingSchedulingResultPartition consumedPartition) {
ConsumedPartitionGroup.fromSinglePartition(consumedPartition.getId());
consumedPartition.registerConsumedPartitionGroup(consumedPartitionGroup);
+ if (consumedPartition.getState() == ResultPartitionState.CONSUMABLE) {
Review comment:
Yes, thank you for pointing this out. This should be simplified.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java
##########
@@ -174,10 +174,12 @@ public void testFinishedBlockingResultPartitionProducerDoNotScheduleNonCreatedRe
final PipelinedRegionSchedulingStrategy schedulingStrategy =
startScheduling(topology);
+ // non-CREATED regions should not be re-scheduled
consumer.get(0).setState(ExecutionState.SCHEDULED);
Review comment:
I'll remove this case, since the changes from FLINK-21707 are being removed as part of this pull request.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java
##########
@@ -58,4 +64,21 @@ public boolean isEmpty() {
public IntermediateResultPartitionID getFirst() {
return iterator().next();
}
+
+ public int partitionUnfinished() {
+ return unfinishedPartitions.incrementAndGet();
+ }
+
+ public int partitionFinished() {
+ return unfinishedPartitions.decrementAndGet();
+ }
+
+ @VisibleForTesting
+ public int getUnfinishedPartition() {
+ return unfinishedPartitions.get();
+ }
+
+ public boolean isConsumable() {
Review comment:
Agreed. Thank you for pointing this out.
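The reference-counting idea in this hunk can be sketched with a simplified stand-in (not Flink's actual `ConsumedPartitionGroup`; here only the blocking case is modeled, where the group becomes consumable once every partition has finished):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch: the group tracks how many of its partitions are still
// unfinished; the counter goes up when a partition becomes unfinished and
// down when one finishes.
class GroupProgressSketch {
    private final AtomicInteger unfinishedPartitions = new AtomicInteger();

    int partitionUnfinished() {
        return unfinishedPartitions.incrementAndGet();
    }

    int partitionFinished() {
        return unfinishedPartitions.decrementAndGet();
    }

    // Simplification: in the real class this would also depend on the
    // result partition type; here, consumable means all partitions finished.
    boolean isConsumable() {
        return unfinishedPartitions.get() == 0;
    }

    public static void main(String[] args) {
        GroupProgressSketch group = new GroupProgressSketch();
        group.partitionUnfinished();
        group.partitionUnfinished();
        group.partitionFinished();
        System.out.println(group.isConsumable()); // false: one partition still unfinished
        group.partitionFinished();
        System.out.println(group.isConsumable()); // true: all partitions finished
    }
}
```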
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -201,13 +182,11 @@ public void onExecutionStateChange(
                         .filter(
                                 partition ->
                                         partition.getState() == ResultPartitionState.CONSUMABLE)
-                        .flatMap(
-                                partition ->
-                                        correlatedResultPartitionGroups
-                                                .getOrDefault(
-                                                        partition.getResultId(),
-                                                        Collections.emptySet())
-                                                .stream())
+                        .flatMap(partition -> partition.getConsumedPartitionGroups().stream())
+                        .filter(
+                                group ->
+                                        crossRegionConsumedPartitionGroups.contains(group)
+                                                || group.isConsumable())
                         .collect(Collectors.toSet());
         // for POINTWISE consumers of a BLOCKING partition, it's possible that some of the
Review comment:
Yes. Now each POINTWISE blocking partition triggers scheduling for itself rather than for all partitions belonging to the same IntermediateResult, so this filter is no longer needed. Should we remove the sanity check in `maybeScheduleRegion`?
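The shape of the new stream pipeline can be sketched with simplified stand-ins (plain records instead of Flink's scheduling types; the predicate mirrors the `crossRegionConsumedPartitionGroups.contains(group) || group.isConsumable()` filter above):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-ins: partitions expose their own consumed-partition
// groups, and a group is kept if it is cross-region or already consumable.
record Group(String name, boolean consumable) {}
record Partition(boolean consumable, List<Group> groups) {}

class SchedulingSketch {
    static Set<Group> schedulableGroups(
            List<Partition> partitions, Set<Group> crossRegionGroups) {
        return partitions.stream()
                .filter(Partition::consumable)
                .flatMap(partition -> partition.groups().stream())
                .filter(group -> crossRegionGroups.contains(group) || group.consumable())
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Group crossRegion = new Group("cross", false);
        Group done = new Group("done", true);
        Group pending = new Group("pending", false);
        Partition p = new Partition(true, List.of(crossRegion, done, pending));
        Set<Group> result = schedulableGroups(List.of(p), Set.of(crossRegion));
        System.out.println(result.size()); // "pending" is filtered out
    }
}
```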
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
##########
@@ -120,17 +118,8 @@ boolean markFinished() {
hasDataProduced = true;
Review comment:
Thank you for pointing this out. I've added a check to make sure that
`hasDataProduced == false` when `markFinished` is called.
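A fail-fast check of that kind could look like this (a sketch with a placeholder class; `checkState` here is a local helper mimicking the behavior of Flink's `Preconditions.checkState`):

```java
// Sketch of the precondition: markFinished should fail fast if data was
// already marked as produced. Simplified stand-in for the real
// IntermediateResultPartition.
class PartitionFinishSketch {
    private boolean hasDataProduced = false;

    void markFinished() {
        checkState(!hasDataProduced, "Partition has already produced data");
        hasDataProduced = true;
    }

    private static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    public static void main(String[] args) {
        PartitionFinishSketch partition = new PartitionFinishSketch();
        partition.markFinished(); // first call succeeds
        try {
            partition.markFinished(); // second call violates the precondition
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```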
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManager.java
##########
@@ -89,4 +92,17 @@ public void connectVertexWithConsumedPartitionGroup(
return Collections.unmodifiableList(
getConsumedPartitionGroupsForVertexInternal(executionVertexId));
}
+
+ public void registerConsumedPartitionGroup(ConsumedPartitionGroup group) {
+ for (IntermediateResultPartitionID partitionId : group) {
+ consumedPartitionsById
+ .computeIfAbsent(partitionId, ignore -> new ArrayList<>())
+ .add(group);
+ }
+ }
+
+ public List<ConsumedPartitionGroup> getConsumedPartitionGroupsById(
+ IntermediateResultPartitionID id) {
+ return Collections.unmodifiableList(consumedPartitionsById.get(id));
Review comment:
Agreed. We don't throw any exception here.
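For illustration, the registration plus a null-safe lookup could be sketched like this (placeholder names and `String` ids instead of the Flink types; `getOrDefault` is one way to avoid the `NullPointerException` that `Collections.unmodifiableList(map.get(id))` would throw for an unknown id):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: every partition id in a group maps to the list of groups that
// consume it; lookups for unknown ids return an empty list instead of
// throwing. Simplified stand-in, not the real EdgeManager.
class RegistrySketch {
    private final Map<String, List<String>> groupsByPartitionId = new HashMap<>();

    void registerGroup(String groupName, List<String> partitionIds) {
        for (String partitionId : partitionIds) {
            groupsByPartitionId
                    .computeIfAbsent(partitionId, ignore -> new ArrayList<>())
                    .add(groupName);
        }
    }

    List<String> getGroupsById(String partitionId) {
        return Collections.unmodifiableList(
                groupsByPartitionId.getOrDefault(partitionId, Collections.emptyList()));
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.registerGroup("g1", List.of("p1", "p2"));
        System.out.println(registry.getGroupsById("p1"));      // [g1]
        System.out.println(registry.getGroupsById("unknown")); // [] instead of an NPE
    }
}
```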
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java
##########
@@ -58,4 +64,21 @@ public boolean isEmpty() {
public IntermediateResultPartitionID getFirst() {
return iterator().next();
}
+
+ public int partitionUnfinished() {
+ return unfinishedPartitions.incrementAndGet();
+ }
+
+ public int partitionFinished() {
+ return unfinishedPartitions.decrementAndGet();
+ }
+
+ @VisibleForTesting
+ public int getUnfinishedPartition() {
Review comment:
Agreed and done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]