zhuzhurk commented on a change in pull request #18102:
URL: https://github.com/apache/flink/pull/18102#discussion_r786479084



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionGroupReleaseStrategy.java
##########
@@ -61,6 +64,8 @@ public RegionPartitionGroupReleaseStrategy(final 
SchedulingTopology schedulingTo
         this.consumerRegionGroupExecutionViewMaintainer =

Review comment:
       Can we introduce a method like 
`notifySchedulingTopologyUpdatedInternal(SchedulingTopology)` and use it to do 
the initialization, so that the logic is guaranteed to be consistent?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -44,25 +44,35 @@
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * This strategy tries to reduce remote data exchanges. Execution vertices, 
which are connected and
  * belong to the same SlotSharingGroup, tend to be put in the same 
ExecutionSlotSharingGroup.
  * Co-location constraints will be respected.
  */
-class LocalInputPreferredSlotSharingStrategy implements SlotSharingStrategy {
+class LocalInputPreferredSlotSharingStrategy
+        implements SlotSharingStrategy, SchedulingTopologyListener {
 
     private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> 
executionSlotSharingGroupMap;
 
+    private final Set<SlotSharingGroup> logicalSlotSharingGroups;
+
+    private final Set<CoLocationGroup> coLocationGroups;
+
     LocalInputPreferredSlotSharingStrategy(
             final SchedulingTopology topology,
             final Set<SlotSharingGroup> logicalSlotSharingGroups,
             final Set<CoLocationGroup> coLocationGroups) {
 
+        this.logicalSlotSharingGroups = logicalSlotSharingGroups;

Review comment:
       checkNotNull

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionGroupReleaseStrategy.java
##########
@@ -144,6 +149,54 @@ private PipelinedRegionExecutionView 
getPipelinedRegionExecutionViewForVertex(
         return releasablePartitionGroups;
     }
 
+    @VisibleForTesting
+    public boolean isRegionOfVertexFinished(final ExecutionVertexID 
executionVertexId) {
+        final PipelinedRegionExecutionView regionExecutionView =
+                getPipelinedRegionExecutionViewForVertex(executionVertexId);
+        return regionExecutionView.isFinished();
+    }
+
+    @Override
+    public void notifySchedulingTopologyUpdated(
+            SchedulingTopology schedulingTopology, List<ExecutionVertexID> 
newlyAddedVertices) {
+
+        final List<ConsumerRegionGroupExecutionView> newlyAddedRegionGroup = 
new ArrayList<>();

Review comment:
       I think it would be better if we organize the method logic to be:
   1. find all newly added regions
   2. invoke `initRegionExecutionViewByVertex()`
   3. invoke `initPartitionGroupConsumerRegions`
   4. invoke `maintainer.notifyNewRegionGroupExecutionViews()`
   
   `initRegionExecutionViewByVertex()` and `initPartitionGroupConsumerRegions()` 
may need some small changes. 

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -142,7 +175,7 @@ public LocalInputPreferredSlotSharingStrategy create(
         private final Map<ConsumedPartitionGroup, 
LinkedHashSet<ExecutionSlotSharingGroup>>
                 candidateGroupsForConsumedPartitionGroup;
 
-        private ExecutionSlotSharingGroupBuilder(

Review comment:
       the change is not needed

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategyTest.java
##########
@@ -301,6 +302,57 @@ public void 
testInputLocalityIsRespectedWithTwoEdgesBetweenTwoVertices() throws
         }
     }
 
+    @Test
+    public void testGetExecutionSlotSharingGroupOfLateAttachedVertices() {
+
+        JobVertexID jobVertexID1 = new JobVertexID();
+        JobVertexID jobVertexID2 = new JobVertexID();
+        JobVertexID jobVertexID3 = new JobVertexID();
+
+        final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
+        slotSharingGroup1.addVertexToGroup(jobVertexID1);
+        slotSharingGroup1.addVertexToGroup(jobVertexID2);
+
+        final SlotSharingGroup slotSharingGroup2 = new SlotSharingGroup();
+        slotSharingGroup2.addVertexToGroup(jobVertexID3);
+
+        TestingSchedulingTopology topology = new TestingSchedulingTopology();
+
+        TestingSchedulingExecutionVertex ev1 = 
topology.newExecutionVertex(jobVertexID1, 0);
+        TestingSchedulingExecutionVertex ev2 = 
topology.newExecutionVertex(jobVertexID2, 0);
+        topology.connect(ev1, ev2);
+
+        final LocalInputPreferredSlotSharingStrategy strategy =
+                new LocalInputPreferredSlotSharingStrategy(
+                        topology,
+                        new HashSet<>(Arrays.asList(slotSharingGroup1, 
slotSharingGroup2)),
+                        Collections.emptySet());
+
+        assertThat(strategy.getExecutionSlotSharingGroups().size(), is(1));
+        assertThat(
+                
strategy.getExecutionSlotSharingGroup(ev1.getId()).getExecutionVertexIds(),
+                containsInAnyOrder(ev1.getId(), ev2.getId()));
+        assertThat(
+                
strategy.getExecutionSlotSharingGroup(ev2.getId()).getExecutionVertexIds(),
+                containsInAnyOrder(ev1.getId(), ev2.getId()));
+
+        // add new job vertices and notify scheduling topology updated
+        TestingSchedulingExecutionVertex ev3 = 
topology.newExecutionVertex(jobVertexID3, 0);
+        topology.connect(ev2, ev3);

Review comment:
       -> topology.connect(ev2, ev3, ResultPartitionType.BLOCKING);
   
   At the moment, we do not expect pipelined downstream vertices to be attached 
dynamically.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -44,25 +44,35 @@
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * This strategy tries to reduce remote data exchanges. Execution vertices, 
which are connected and
  * belong to the same SlotSharingGroup, tend to be put in the same 
ExecutionSlotSharingGroup.
  * Co-location constraints will be respected.
  */
-class LocalInputPreferredSlotSharingStrategy implements SlotSharingStrategy {
+class LocalInputPreferredSlotSharingStrategy
+        implements SlotSharingStrategy, SchedulingTopologyListener {
 
     private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> 
executionSlotSharingGroupMap;
 
+    private final Set<SlotSharingGroup> logicalSlotSharingGroups;
+
+    private final Set<CoLocationGroup> coLocationGroups;
+
     LocalInputPreferredSlotSharingStrategy(
             final SchedulingTopology topology,
             final Set<SlotSharingGroup> logicalSlotSharingGroups,
             final Set<CoLocationGroup> coLocationGroups) {
 
+        this.logicalSlotSharingGroups = logicalSlotSharingGroups;
+        this.coLocationGroups = coLocationGroups;

Review comment:
       checkNotNull

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
##########
@@ -205,6 +216,19 @@ public void notifyExecutionGraphUpdated(
         updatePipelinedRegions(newAddedExecutionVertices);
 
         ensureCoLocatedVerticesInSameRegion(pipelinedRegions, executionGraph);
+
+        // notify update.

Review comment:
       I think this comment does not provide any additional information

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionGroupReleaseStrategy.java
##########
@@ -144,6 +149,54 @@ private PipelinedRegionExecutionView 
getPipelinedRegionExecutionViewForVertex(
         return releasablePartitionGroups;
     }
 
+    @VisibleForTesting
+    public boolean isRegionOfVertexFinished(final ExecutionVertexID 
executionVertexId) {
+        final PipelinedRegionExecutionView regionExecutionView =
+                getPipelinedRegionExecutionViewForVertex(executionVertexId);
+        return regionExecutionView.isFinished();
+    }
+
+    @Override
+    public void notifySchedulingTopologyUpdated(
+            SchedulingTopology schedulingTopology, List<ExecutionVertexID> 
newlyAddedVertices) {
+
+        final List<ConsumerRegionGroupExecutionView> newlyAddedRegionGroup = 
new ArrayList<>();

Review comment:
       And then we can extract steps 2/3/4 into a method 
`notifySchedulingTopologyUpdatedInternal()` so that it can be reused by the 
constructor.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -88,7 +121,7 @@ public LocalInputPreferredSlotSharingStrategy create(
         }
     }
 
-    private static class ExecutionSlotSharingGroupBuilder {

Review comment:
       the change is not needed

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionGroupReleaseStrategy.java
##########
@@ -61,6 +64,8 @@ public RegionPartitionGroupReleaseStrategy(final 
SchedulingTopology schedulingTo
         this.consumerRegionGroupExecutionViewMaintainer =

Review comment:
       We can change the `ConsumerRegionGroupExecutionViewMaintainer` constructor 
to be parameterless to simplify the implementation. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to