Thesharing commented on a change in pull request #16436:
URL: https://github.com/apache/flink/pull/16436#discussion_r668403994



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
##########
@@ -88,6 +88,7 @@ private static void connectAllToAll(
                         Arrays.stream(intermediateResult.getPartitions())
                                 
.map(IntermediateResultPartition::getPartitionId)
                                 .collect(Collectors.toList()));
+        registerConsumedPartitionGroupToEdgeManager(intermediateResult, 
consumedPartitions);

Review comment:
       I think variable arguments are not applicable here. The consumed 
partition ids have already been put into an array list, so I created two 
overloaded methods with different parameters.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
##########
@@ -72,6 +75,13 @@ public ResultPartitionType getResultType() {
         return 
getEdgeManager().getConsumerVertexGroupsForPartition(partitionId);
     }
 
+    public List<ConsumedPartitionGroup> getConsumedPartitionGroups() {
+        if (consumedPartitionGroups == null) {
+            consumedPartitionGroups = 
getEdgeManager().getConsumedPartitionGroupsById(partitionId);

Review comment:
       Agreed.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java
##########
@@ -485,4 +491,39 @@ public void testMoreThanOneConsumerForIntermediateResult() 
{
             fail(e.getMessage());
         }
     }
+
+    @Test
+    public void testRegisterConsumedPartitionGroupToEdgeManager() throws 
Exception {
+        JobVertex v1 = new JobVertex("source");
+        JobVertex v2 = new JobVertex("sink");
+
+        v1.setParallelism(2);
+        v2.setParallelism(2);
+
+        v2.connectNewDataSetAsInput(
+                v1, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
+
+        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));
+        ExecutionGraph eg = createDefaultExecutionGraph(ordered);
+        eg.attachJobGraph(ordered);
+
+        IntermediateResult result =
+                
Objects.requireNonNull(eg.getJobVertex(v1.getID())).getProducedDataSets()[0];
+
+        IntermediateResultPartition partition1 = result.getPartitions()[0];
+        IntermediateResultPartition partition2 = result.getPartitions()[1];
+
+        assertEquals(
+                partition1.getConsumedPartitionGroups().get(0),
+                partition2.getConsumedPartitionGroups().get(0));
+
+        ConsumedPartitionGroup consumedPartitionGroup =
+                partition1.getConsumedPartitionGroups().get(0);
+        Set<IntermediateResultPartitionID> partitionIds = new HashSet<>();
+        for (IntermediateResultPartitionID partitionId : 
consumedPartitionGroup) {
+            partitionIds.add(partitionId);
+        }
+        assertTrue(partitionIds.contains(partition1.getPartitionId()));

Review comment:
       Agreed. I've replaced it with `org.hamcrest.MatcherAssert.assertThat`, 
since `org.junit.Assert.assertThat` has been deprecated.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java
##########
@@ -34,30 +46,71 @@
 /** Unit tests for {@link DefaultResultPartition}. */
 public class DefaultResultPartitionTest extends TestLogger {
 
-    private static final TestResultPartitionStateSupplier resultPartitionState 
=
-            new TestResultPartitionStateSupplier();
-
-    private final IntermediateResultPartitionID resultPartitionId =
-            new IntermediateResultPartitionID();
-    private final IntermediateDataSetID intermediateResultId = new 
IntermediateDataSetID();
+    @Test
+    public void testGetPartitionState() {
+        final TestResultPartitionStateSupplier resultPartitionState =
+                new TestResultPartitionStateSupplier();
 
-    private DefaultResultPartition resultPartition;
+        final IntermediateResultPartitionID resultPartitionId = new 
IntermediateResultPartitionID();
+        final IntermediateDataSetID intermediateResultId = new 
IntermediateDataSetID();
 
-    @Before
-    public void setUp() {
-        resultPartition =
+        final DefaultResultPartition resultPartition =
                 new DefaultResultPartition(
                         resultPartitionId, intermediateResultId, BLOCKING, 
resultPartitionState);
-    }
 
-    @Test
-    public void testGetPartitionState() {
         for (ResultPartitionState state : ResultPartitionState.values()) {
             resultPartitionState.setResultPartitionState(state);
             assertEquals(state, resultPartition.getState());
         }
     }
 
+    @Test
+    public void testGetConsumedPartitionGroup() throws Exception {

Review comment:
       Moved.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
##########
@@ -94,6 +94,9 @@ void addConsumedPartition(TestingSchedulingResultPartition 
consumedPartition) {
                 
ConsumedPartitionGroup.fromSinglePartition(consumedPartition.getId());
 
         
consumedPartition.registerConsumedPartitionGroup(consumedPartitionGroup);
+        if (consumedPartition.getState() == ResultPartitionState.CONSUMABLE) {

Review comment:
       Yes, thank you for pointing this out. This should be simplified.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java
##########
@@ -174,10 +174,12 @@ public void 
testFinishedBlockingResultPartitionProducerDoNotScheduleNonCreatedRe
 
         final PipelinedRegionSchedulingStrategy schedulingStrategy = 
startScheduling(topology);
 
+        // non-CREATED regions should not be re-scheduled
         consumer.get(0).setState(ExecutionState.SCHEDULED);

Review comment:
       I'll remove this case, since the changes introduced by FLINK-21707 
should be removed as part of this pull request.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java
##########
@@ -58,4 +64,21 @@ public boolean isEmpty() {
     public IntermediateResultPartitionID getFirst() {
         return iterator().next();
     }
+
+    public int partitionUnfinished() {
+        return unfinishedPartitions.incrementAndGet();
+    }
+
+    public int partitionFinished() {
+        return unfinishedPartitions.decrementAndGet();
+    }
+
+    @VisibleForTesting
+    public int getUnfinishedPartition() {
+        return unfinishedPartitions.get();
+    }
+
+    public boolean isConsumable() {

Review comment:
       Agreed. Thank you for pointing this out.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
##########
@@ -201,13 +182,11 @@ public void onExecutionStateChange(
                             .filter(
                                     partition ->
                                             partition.getState() == 
ResultPartitionState.CONSUMABLE)
-                            .flatMap(
-                                    partition ->
-                                            correlatedResultPartitionGroups
-                                                    .getOrDefault(
-                                                            
partition.getResultId(),
-                                                            
Collections.emptySet())
-                                                    .stream())
+                            .flatMap(partition -> 
partition.getConsumedPartitionGroups().stream())
+                            .filter(
+                                    group ->
+                                            
crossRegionConsumedPartitionGroups.contains(group)
+                                                    || group.isConsumable())
                             .collect(Collectors.toSet());
 
             // for POINTWISE consumers of a BLOCKING partition, it's possible 
that some of the

Review comment:
       Yes. Now each pointwise blocking partition will schedule itself rather 
than all partitions that belong to the same IntermediateResult. This filter is 
no longer needed. Should we remove the sanity check in `maybeScheduleRegion`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
##########
@@ -120,17 +118,8 @@ boolean markFinished() {
 
         hasDataProduced = true;

Review comment:
       Thank you for pointing this out. I've added a check to make sure that 
`hasDataProduced == false` when `markFinished` is called.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManager.java
##########
@@ -89,4 +92,17 @@ public void connectVertexWithConsumedPartitionGroup(
         return Collections.unmodifiableList(
                 
getConsumedPartitionGroupsForVertexInternal(executionVertexId));
     }
+
+    public void registerConsumedPartitionGroup(ConsumedPartitionGroup group) {
+        for (IntermediateResultPartitionID partitionId : group) {
+            consumedPartitionsById
+                    .computeIfAbsent(partitionId, ignore -> new ArrayList<>())
+                    .add(group);
+        }
+    }
+
+    public List<ConsumedPartitionGroup> getConsumedPartitionGroupsById(
+            IntermediateResultPartitionID id) {
+        return Collections.unmodifiableList(consumedPartitionsById.get(id));

Review comment:
       Agree. We don't throw any exception here.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java
##########
@@ -58,4 +64,21 @@ public boolean isEmpty() {
     public IntermediateResultPartitionID getFirst() {
         return iterator().next();
     }
+
+    public int partitionUnfinished() {
+        return unfinishedPartitions.incrementAndGet();
+    }
+
+    public int partitionFinished() {
+        return unfinishedPartitions.decrementAndGet();
+    }
+
+    @VisibleForTesting
+    public int getUnfinishedPartition() {

Review comment:
       Agreed and done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to