zhuzhurk commented on a change in pull request #16436:
URL: https://github.com/apache/flink/pull/16436#discussion_r668312184
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
##########
@@ -72,6 +75,13 @@ public ResultPartitionType getResultType() {
        return getEdgeManager().getConsumerVertexGroupsForPartition(partitionId);
}
+ public List<ConsumedPartitionGroup> getConsumedPartitionGroups() {
+ if (consumedPartitionGroups == null) {
+ consumedPartitionGroups = getEdgeManager().getConsumedPartitionGroupsById(partitionId);
Review comment:
I think we do not need the field `consumedPartitionGroups`; using it before it
is initialized may even cause problems. In the next commit I can see that
`getConsumedPartitionGroups()` is used instead of accessing the field directly.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
##########
@@ -88,6 +88,7 @@ private static void connectAllToAll(
Arrays.stream(intermediateResult.getPartitions())
.map(IntermediateResultPartition::getPartitionId)
.collect(Collectors.toList()));
+ registerConsumedPartitionGroupToEdgeManager(intermediateResult, consumedPartitions);
Review comment:
Maybe add a method
`createAndRegisterConsumedPartitionGroup(IntermediateResultPartition...
partitions)`, to rule out the case where a `ConsumedPartitionGroup` is created
but never registered?
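To make the intent concrete, here is a hypothetical sketch of that pattern in plain Java. The `Registry` and `Group` classes below are simplified stand-ins, not the actual Flink `EdgeManager` and `ConsumedPartitionGroup`: the point is that when the factory method is the only way to obtain a group, every created group is guaranteed to be registered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupRegistryDemo {
    /** Stand-in for ConsumedPartitionGroup: an immutable list of partition ids. */
    static final class Group {
        final List<String> partitionIds;
        Group(List<String> ids) { this.partitionIds = List.copyOf(ids); }
    }

    /** Stand-in for EdgeManager: maps each partition id to the groups consuming it. */
    static final class Registry {
        final Map<String, List<Group>> groupsByPartition = new HashMap<>();

        /** The suggested factory: creation and registration happen in one step,
         *  so a group can never exist unregistered. */
        Group createAndRegisterGroup(String... partitionIds) {
            Group group = new Group(List.of(partitionIds));
            for (String id : group.partitionIds) {
                groupsByPartition
                        .computeIfAbsent(id, ignore -> new ArrayList<>())
                        .add(group);
            }
            return group;
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        Group group = registry.createAndRegisterGroup("p1", "p2");
        // Both partitions now see the same registered group.
        System.out.println(registry.groupsByPartition.get("p1").get(0) == group);
        System.out.println(registry.groupsByPartition.get("p2").get(0) == group);
    }
}
```

A private constructor on the real `ConsumedPartitionGroup` (or keeping the factory as the single call site) would enforce this at compile time.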
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java
##########
@@ -485,4 +491,39 @@ public void testMoreThanOneConsumerForIntermediateResult() {
fail(e.getMessage());
}
}
+
+ @Test
+ public void testRegisterConsumedPartitionGroupToEdgeManager() throws Exception {
+ JobVertex v1 = new JobVertex("source");
+ JobVertex v2 = new JobVertex("sink");
+
+ v1.setParallelism(2);
+ v2.setParallelism(2);
+
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+ List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));
+ ExecutionGraph eg = createDefaultExecutionGraph(ordered);
+ eg.attachJobGraph(ordered);
+
+ IntermediateResult result = Objects.requireNonNull(eg.getJobVertex(v1.getID())).getProducedDataSets()[0];
+
+ IntermediateResultPartition partition1 = result.getPartitions()[0];
+ IntermediateResultPartition partition2 = result.getPartitions()[1];
+
+ assertEquals(
+ partition1.getConsumedPartitionGroups().get(0),
+ partition2.getConsumedPartitionGroups().get(0));
+
+ ConsumedPartitionGroup consumedPartitionGroup =
+ partition1.getConsumedPartitionGroups().get(0);
+ Set<IntermediateResultPartitionID> partitionIds = new HashSet<>();
+ for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
+ partitionIds.add(partitionId);
+ }
+ assertTrue(partitionIds.contains(partition1.getPartitionId()));
Review comment:
Can be `assertThat(partitionIds, containsInAnyOrder(partition1.getPartitionId(), partition2.getPartitionId()))`.
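As a side note, `containsInAnyOrder` is also stricter than a pair of `assertTrue(...contains(...))` checks: it fails if the collection holds any extra elements. A plain-JDK sketch of that exact-membership, order-insensitive semantics (generic `String` ids stand in for the partition ids):

```java
import java.util.Set;

public class ContainsInAnyOrderDemo {
    // Exact-membership check, order-insensitive: the same semantics that
    // containsInAnyOrder provides, expressed with plain sets.
    static boolean containsExactlyInAnyOrder(Set<String> actual, Set<String> expected) {
        return actual.equals(expected);
    }

    public static void main(String[] args) {
        Set<String> partitionIds = Set.of("partition-2", "partition-1");
        // Order does not matter...
        System.out.println(containsExactlyInAnyOrder(
                partitionIds, Set.of("partition-1", "partition-2"))); // true
        // ...but a missing or extra element fails, unlike a bare contains() check.
        System.out.println(containsExactlyInAnyOrder(
                partitionIds, Set.of("partition-1"))); // false
    }
}
```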
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManager.java
##########
@@ -89,4 +92,17 @@ public void connectVertexWithConsumedPartitionGroup(
return Collections.unmodifiableList(
getConsumedPartitionGroupsForVertexInternal(executionVertexId));
}
+
+ public void registerConsumedPartitionGroup(ConsumedPartitionGroup group) {
+ for (IntermediateResultPartitionID partitionId : group) {
+ consumedPartitionsById
+ .computeIfAbsent(partitionId, ignore -> new ArrayList<>())
+ .add(group);
+ }
+ }
+
+ public List<ConsumedPartitionGroup> getConsumedPartitionGroupsById(IntermediateResultPartitionID id) {
+ return Collections.unmodifiableList(consumedPartitionsById.get(id));
Review comment:
Maybe `consumedPartitionsById.get(id)` ->
`consumedPartitionsById.computeIfAbsent(id, ignore -> new ArrayList<>())`?
Otherwise `get(id)` returns `null` for a partition with no registered groups,
and `Collections.unmodifiableList(null)` throws a `NullPointerException`.
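A minimal plain-JDK illustration of why the `computeIfAbsent` variant is safer (a generic `Map<String, List<String>>` stands in for `consumedPartitionsById`, not the Flink types):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ComputeIfAbsentDemo {
    static final Map<String, List<String>> groupsById = new HashMap<>();

    // Unsafe variant: Map.get returns null for unknown ids, and
    // Collections.unmodifiableList(null) throws NullPointerException.
    static List<String> getUnsafe(String id) {
        return Collections.unmodifiableList(groupsById.get(id));
    }

    // Safe variant: computeIfAbsent inserts and returns an empty list
    // for ids that were never registered.
    static List<String> getSafe(String id) {
        return Collections.unmodifiableList(
                groupsById.computeIfAbsent(id, ignore -> new ArrayList<>()));
    }

    public static void main(String[] args) {
        System.out.println(getSafe("unknown-id")); // prints []
        try {
            getUnsafe("another-unknown-id");
        } catch (NullPointerException expected) {
            System.out.println("get(...) variant threw NPE as expected");
        }
    }
}
```

Note the lambda parameter is named `ignore` rather than `id`, since a parameter named `id` would shadow the enclosing method parameter and fail to compile.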
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java
##########
@@ -34,30 +46,71 @@
/** Unit tests for {@link DefaultResultPartition}. */
public class DefaultResultPartitionTest extends TestLogger {
- private static final TestResultPartitionStateSupplier resultPartitionState =
- new TestResultPartitionStateSupplier();
-
- private final IntermediateResultPartitionID resultPartitionId =
- new IntermediateResultPartitionID();
- private final IntermediateDataSetID intermediateResultId = new IntermediateDataSetID();
+ @Test
+ public void testGetPartitionState() {
+ final TestResultPartitionStateSupplier resultPartitionState =
+ new TestResultPartitionStateSupplier();
- private DefaultResultPartition resultPartition;
+ final IntermediateResultPartitionID resultPartitionId = new IntermediateResultPartitionID();
+ final IntermediateDataSetID intermediateResultId = new IntermediateDataSetID();
- @Before
- public void setUp() {
- resultPartition =
+ final DefaultResultPartition resultPartition =
 new DefaultResultPartition(resultPartitionId, intermediateResultId, BLOCKING, resultPartitionState);
- }
- @Test
- public void testGetPartitionState() {
for (ResultPartitionState state : ResultPartitionState.values()) {
resultPartitionState.setResultPartitionState(state);
assertEquals(state, resultPartition.getState());
}
}
+ @Test
+ public void testGetConsumedPartitionGroup() throws Exception {
Review comment:
I think this case is better to be placed in `EdgeManagerTest`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java
##########
@@ -58,4 +64,21 @@ public boolean isEmpty() {
public IntermediateResultPartitionID getFirst() {
return iterator().next();
}
+
+ public int partitionUnfinished() {
+ return unfinishedPartitions.incrementAndGet();
+ }
+
+ public int partitionFinished() {
+ return unfinishedPartitions.decrementAndGet();
+ }
+
+ @VisibleForTesting
+ public int getUnfinishedPartition() {
Review comment:
`getUnfinishedPartition` -> `getNumberOfUnfinishedPartitions`
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -201,13 +182,11 @@ public void onExecutionStateChange(
 .filter(partition -> partition.getState() == ResultPartitionState.CONSUMABLE)
- .flatMap(partition ->
- correlatedResultPartitionGroups
- .getOrDefault(partition.getResultId(), Collections.emptySet())
- .stream())
+ .flatMap(partition -> partition.getConsumedPartitionGroups().stream())
+ .filter(group ->
+ crossRegionConsumedPartitionGroups.contains(group)
+ || group.isConsumable())
 .collect(Collectors.toSet());
 // for POINTWISE consumers of a BLOCKING partition, it's possible that some of the
Review comment:
The following filter `areRegionVerticesAllInCreatedState` can be removed now.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
##########
@@ -94,6 +94,9 @@ void addConsumedPartition(TestingSchedulingResultPartition consumedPartition) {
ConsumedPartitionGroup.fromSinglePartition(consumedPartition.getId());
consumedPartition.registerConsumedPartitionGroup(consumedPartitionGroup);
+ if (consumedPartition.getState() == ResultPartitionState.CONSUMABLE) {
Review comment:
can we put this condition block into
`consumedPartition.registerConsumedPartitionGroup`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
##########
@@ -120,17 +118,8 @@ boolean markFinished() {
hasDataProduced = true;
Review comment:
It's better to check that this partition has not already been finished.
Failing fast here surfaces problems early and eases troubleshooting.
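A minimal sketch of the suggested fail-fast check, using a plain `IllegalStateException` (Flink's `Preconditions.checkState` would serve the same purpose; the toy `Partition` class and its field name are illustrative, not the actual `IntermediateResultPartition`):

```java
public class MarkFinishedDemo {
    /** Toy model of a result partition that may be marked finished only once. */
    static final class Partition {
        private boolean dataAllProduced = false;

        void markFinished() {
            // Fail fast if the partition was already finished: double-finishing
            // indicates a bug in the caller and should surface immediately.
            if (dataAllProduced) {
                throw new IllegalStateException("Partition is already finished.");
            }
            dataAllProduced = true;
        }

        boolean isFinished() { return dataAllProduced; }
    }

    public static void main(String[] args) {
        Partition partition = new Partition();
        partition.markFinished();
        try {
            partition.markFinished();
        } catch (IllegalStateException expected) {
            System.out.println("second markFinished() rejected as expected");
        }
    }
}
```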
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java
##########
@@ -174,10 +174,12 @@ public void testFinishedBlockingResultPartitionProducerDoNotScheduleNonCreatedRe
final PipelinedRegionSchedulingStrategy schedulingStrategy =
startScheduling(topology);
+ // non-CREATED regions should not be re-scheduled
consumer.get(0).setState(ExecutionState.SCHEDULED);
Review comment:
Now we do not expect this case to happen; an exception should be thrown if it
does.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java
##########
@@ -58,4 +64,21 @@ public boolean isEmpty() {
public IntermediateResultPartitionID getFirst() {
return iterator().next();
}
+
+ public int partitionUnfinished() {
+ return unfinishedPartitions.incrementAndGet();
+ }
+
+ public int partitionFinished() {
+ return unfinishedPartitions.decrementAndGet();
+ }
+
+ @VisibleForTesting
+ public int getUnfinishedPartition() {
+ return unfinishedPartitions.get();
+ }
+
+ public boolean isConsumable() {
Review comment:
This is not correct for pipelined partitions, so I think the name
`areAllPartitionsFinished()` would be more accurate.
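For reference, the counter protocol under discussion can be sketched with a plain `AtomicInteger` (the `Group` class below is a simplified stand-in for `ConsumedPartitionGroup`, not the actual Flink class):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class UnfinishedCounterDemo {
    /** Toy model of the group's unfinished-partition counter. */
    static final class Group {
        private final AtomicInteger unfinishedPartitions = new AtomicInteger(0);

        int partitionUnfinished() { return unfinishedPartitions.incrementAndGet(); }
        int partitionFinished() { return unfinishedPartitions.decrementAndGet(); }

        // The suggested rename: for BLOCKING results a zero counter means the
        // group is consumable, but pipelined partitions are consumable before
        // they finish, so "areAllPartitionsFinished" describes the check
        // more accurately than "isConsumable".
        boolean areAllPartitionsFinished() { return unfinishedPartitions.get() == 0; }
    }

    public static void main(String[] args) {
        Group group = new Group();
        group.partitionUnfinished();
        group.partitionUnfinished();
        System.out.println(group.areAllPartitionsFinished()); // false: 2 unfinished
        group.partitionFinished();
        group.partitionFinished();
        System.out.println(group.areAllPartitionsFinished()); // true
    }
}
```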
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]