Thesharing commented on a change in pull request #16687:
URL: https://github.com/apache/flink/pull/16687#discussion_r688926551



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -214,87 +260,116 @@ private ExecutionSlotSharingGroup 
tryFindAvailableProducerExecutionSlotSharingGr
 
             final ExecutionVertexID executionVertexId = 
executionVertex.getId();
 
-            for (SchedulingResultPartition partition : 
executionVertex.getConsumedResults()) {
-                final ExecutionVertexID producerVertexId = 
partition.getProducer().getId();
-                if (!inSameLogicalSlotSharingGroup(producerVertexId, 
executionVertexId)) {
-                    continue;
-                }
-
-                final ExecutionSlotSharingGroup producerGroup =
-                        executionSlotSharingGroupMap.get(producerVertexId);
-
-                checkState(producerGroup != null);
-                if (isGroupAvailableForVertex(producerGroup, 
executionVertexId)) {
-                    return producerGroup;
+            Set<ExecutionSlotSharingGroup> availableGroupsForCurrentVertex =
+                    
availableGroupsForJobVertex.get(executionVertexId.getJobVertexId());
+
+            for (ConsumedPartitionGroup consumedPartitionGroup :
+                    executionVertex.getConsumedPartitionGroups()) {
+
+                Set<ExecutionSlotSharingGroup> candidateGroups =
+                        
candidateGroupsForConsumedPartitionGroup.computeIfAbsent(
+                                consumedPartitionGroup,
+                                group ->
+                                        
computeAllCandidateGroupsForConsumedPartitionGroup(
+                                                
executionVertexId.getJobVertexId(), group));
+
+                Iterator<ExecutionSlotSharingGroup> candidateIterator = 
candidateGroups.iterator();
+
+                while (candidateIterator.hasNext()) {
+                    ExecutionSlotSharingGroup candidateGroup = 
candidateIterator.next();
+                    // There are two cases for this candidate group:
+                    // 1. The group is available for this vertex, and it will 
be assigned to this
+                    // vertex
+                    // 2. The group is not available for this vertex, because 
it's assigned to
+                    // another vertex
+                    // No matter what case it is, the candidate group is no 
longer a candidate and
+                    // should be removed
+                    candidateIterator.remove();
+                    if (availableGroupsForCurrentVertex != null

Review comment:
       Thank you for providing a better way. Resolved.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -214,87 +260,116 @@ private ExecutionSlotSharingGroup 
tryFindAvailableProducerExecutionSlotSharingGr
 
             final ExecutionVertexID executionVertexId = 
executionVertex.getId();
 
-            for (SchedulingResultPartition partition : 
executionVertex.getConsumedResults()) {
-                final ExecutionVertexID producerVertexId = 
partition.getProducer().getId();
-                if (!inSameLogicalSlotSharingGroup(producerVertexId, 
executionVertexId)) {
-                    continue;
-                }
-
-                final ExecutionSlotSharingGroup producerGroup =
-                        executionSlotSharingGroupMap.get(producerVertexId);
-
-                checkState(producerGroup != null);
-                if (isGroupAvailableForVertex(producerGroup, 
executionVertexId)) {
-                    return producerGroup;
+            Set<ExecutionSlotSharingGroup> availableGroupsForCurrentVertex =
+                    
availableGroupsForJobVertex.get(executionVertexId.getJobVertexId());
+
+            for (ConsumedPartitionGroup consumedPartitionGroup :
+                    executionVertex.getConsumedPartitionGroups()) {
+
+                Set<ExecutionSlotSharingGroup> candidateGroups =
+                        
candidateGroupsForConsumedPartitionGroup.computeIfAbsent(
+                                consumedPartitionGroup,
+                                group ->
+                                        
computeAllCandidateGroupsForConsumedPartitionGroup(
+                                                
executionVertexId.getJobVertexId(), group));
+
+                Iterator<ExecutionSlotSharingGroup> candidateIterator = 
candidateGroups.iterator();
+
+                while (candidateIterator.hasNext()) {
+                    ExecutionSlotSharingGroup candidateGroup = 
candidateIterator.next();
+                    // There are two cases for this candidate group:
+                    // 1. The group is available for this vertex, and it will 
be assigned to this
+                    // vertex
+                    // 2. The group is not available for this vertex, because 
it's assigned to
+                    // another vertex

Review comment:
       Resolved.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -94,12 +97,50 @@ public LocalInputPreferredSlotSharingStrategy create(
         private final Map<ExecutionVertexID, ExecutionSlotSharingGroup>
                 executionSlotSharingGroupMap;
 
-        final Map<CoLocationConstraint, ExecutionSlotSharingGroup>
+        private final Map<CoLocationConstraint, ExecutionSlotSharingGroup>
                 constraintToExecutionSlotSharingGroupMap;
 
-        final Map<SlotSharingGroupId, List<ExecutionSlotSharingGroup>> 
executionSlotSharingGroups;
+        private final Map<SlotSharingGroupId, List<ExecutionSlotSharingGroup>>
+                executionSlotSharingGroups;
 
-        private final Map<ExecutionSlotSharingGroup, Set<JobVertexID>> 
assignedJobVerticesForGroups;
+        /**
+         * A JobVertex only belongs to a {@link SlotSharingGroup}. A 
SlotSharingGroup is

Review comment:
       Resolved.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -214,87 +260,116 @@ private ExecutionSlotSharingGroup 
tryFindAvailableProducerExecutionSlotSharingGr
 
             final ExecutionVertexID executionVertexId = 
executionVertex.getId();
 
-            for (SchedulingResultPartition partition : 
executionVertex.getConsumedResults()) {
-                final ExecutionVertexID producerVertexId = 
partition.getProducer().getId();
-                if (!inSameLogicalSlotSharingGroup(producerVertexId, 
executionVertexId)) {
-                    continue;
-                }
-
-                final ExecutionSlotSharingGroup producerGroup =
-                        executionSlotSharingGroupMap.get(producerVertexId);
-
-                checkState(producerGroup != null);
-                if (isGroupAvailableForVertex(producerGroup, 
executionVertexId)) {
-                    return producerGroup;
+            Set<ExecutionSlotSharingGroup> availableGroupsForCurrentVertex =
+                    
availableGroupsForJobVertex.get(executionVertexId.getJobVertexId());
+
+            for (ConsumedPartitionGroup consumedPartitionGroup :
+                    executionVertex.getConsumedPartitionGroups()) {
+
+                Set<ExecutionSlotSharingGroup> candidateGroups =
+                        
candidateGroupsForConsumedPartitionGroup.computeIfAbsent(
+                                consumedPartitionGroup,
+                                group ->
+                                        
computeAllCandidateGroupsForConsumedPartitionGroup(
+                                                
executionVertexId.getJobVertexId(), group));
+
+                Iterator<ExecutionSlotSharingGroup> candidateIterator = 
candidateGroups.iterator();
+
+                while (candidateIterator.hasNext()) {
+                    ExecutionSlotSharingGroup candidateGroup = 
candidateIterator.next();
+                    // There are two cases for this candidate group:
+                    // 1. The group is available for this vertex, and it will 
be assigned to this
+                    // vertex
+                    // 2. The group is not available for this vertex, because 
it's assigned to
+                    // another vertex
+                    // No matter what case it is, the candidate group is no 
longer a candidate and
+                    // should be removed
+                    candidateIterator.remove();
+                    if (availableGroupsForCurrentVertex != null
+                            && 
availableGroupsForCurrentVertex.contains(candidateGroup)) {
+                        return candidateGroup;
+                    }
                 }
             }
 
             return null;
         }
 
         private boolean inSameLogicalSlotSharingGroup(
-                final ExecutionVertexID executionVertexId1,
-                final ExecutionVertexID executionVertexId2) {
+                final JobVertexID jobVertexId1, final JobVertexID 
jobVertexId2) {
 
             return Objects.equals(
-                    
getSlotSharingGroup(executionVertexId1).getSlotSharingGroupId(),
-                    
getSlotSharingGroup(executionVertexId2).getSlotSharingGroupId());
+                    getSlotSharingGroup(jobVertexId1).getSlotSharingGroupId(),
+                    getSlotSharingGroup(jobVertexId2).getSlotSharingGroupId());
         }
 
-        private SlotSharingGroup getSlotSharingGroup(final ExecutionVertexID 
executionVertexId) {
+        private SlotSharingGroup getSlotSharingGroup(final JobVertexID 
jobVertexId) {
             // slot sharing group of a vertex would never be null in production
-            return 
checkNotNull(slotSharingGroupMap.get(executionVertexId.getJobVertexId()));
-        }
-
-        private boolean isGroupAvailableForVertex(
-                final ExecutionSlotSharingGroup executionSlotSharingGroup,
-                final ExecutionVertexID executionVertexId) {
-
-            final Set<JobVertexID> assignedVertices =
-                    
assignedJobVerticesForGroups.get(executionSlotSharingGroup);
-            return assignedVertices == null
-                    || 
!assignedVertices.contains(executionVertexId.getJobVertexId());
+            return checkNotNull(slotSharingGroupMap.get(jobVertexId));
         }
 
         private void addVertexToExecutionSlotSharingGroup(
                 final SchedulingExecutionVertex vertex, final 
ExecutionSlotSharingGroup group) {
 
-            group.addVertex(vertex.getId());
-            executionSlotSharingGroupMap.put(vertex.getId(), group);
-            assignedJobVerticesForGroups
-                    .computeIfAbsent(group, k -> new HashSet<>())
-                    .add(vertex.getId().getJobVertexId());
+            ExecutionVertexID executionVertexId = vertex.getId();
+            group.addVertex(executionVertexId);
+            executionSlotSharingGroupMap.put(executionVertexId, group);
+
+            // The ExecutionSlotSharingGroup is no longer available for the 
JobVertex
+            Set<ExecutionSlotSharingGroup> availableExecutionSlotSharingGroups 
=
+                    
availableGroupsForJobVertex.get(executionVertexId.getJobVertexId());
+            if (availableExecutionSlotSharingGroups != null) {
+                availableExecutionSlotSharingGroups.remove(group);
+            }
         }
 
         private void findAvailableOrCreateNewExecutionSlotSharingGroupFor(
                 final List<SchedulingExecutionVertex> executionVertices) {
 
             for (SchedulingExecutionVertex executionVertex : 
executionVertices) {
-                final SlotSharingGroup slotSharingGroup =
-                        getSlotSharingGroup(executionVertex.getId());
-                final List<ExecutionSlotSharingGroup> groups =
-                        executionSlotSharingGroups.computeIfAbsent(
-                                slotSharingGroup.getSlotSharingGroupId(), k -> 
new ArrayList<>());
 
                 ExecutionSlotSharingGroup group = null;
-                for (ExecutionSlotSharingGroup executionSlotSharingGroup : 
groups) {
-                    if (isGroupAvailableForVertex(
-                            executionSlotSharingGroup, 
executionVertex.getId())) {
-                        group = executionSlotSharingGroup;
-                        break;
-                    }
+
+                Set<ExecutionSlotSharingGroup> availableGroupsForCurrentVertex 
=

Review comment:
       Thank you for proposing a better way. Resolved.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to