Thesharing commented on a change in pull request #16687:
URL: https://github.com/apache/flink/pull/16687#discussion_r684919002
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -214,87 +245,87 @@ private ExecutionSlotSharingGroup
tryFindAvailableProducerExecutionSlotSharingGr
final ExecutionVertexID executionVertexId =
executionVertex.getId();
- for (SchedulingResultPartition partition :
executionVertex.getConsumedResults()) {
- final ExecutionVertexID producerVertexId =
partition.getProducer().getId();
- if (!inSameLogicalSlotSharingGroup(producerVertexId,
executionVertexId)) {
- continue;
- }
-
- final ExecutionSlotSharingGroup producerGroup =
- executionSlotSharingGroupMap.get(producerVertexId);
-
- checkState(producerGroup != null);
- if (isGroupAvailableForVertex(producerGroup,
executionVertexId)) {
- return producerGroup;
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ executionVertex.getConsumedPartitionGroups()) {
+
+ Iterator<ExecutionSlotSharingGroup> availableGroupIterator =
+ getAvailableGroupsForConsumedPartitionGroup(
+ executionVertexId.getJobVertexId(),
consumedPartitionGroup)
+ .iterator();
+ if (availableGroupIterator.hasNext()) {
+ ExecutionSlotSharingGroup nextAvailableGroup =
availableGroupIterator.next();
+ availableGroupIterator.remove();
+ return nextAvailableGroup;
}
}
return null;
}
private boolean inSameLogicalSlotSharingGroup(
- final ExecutionVertexID executionVertexId1,
- final ExecutionVertexID executionVertexId2) {
+ final JobVertexID jobVertexId1, final JobVertexID
jobVertexId2) {
return Objects.equals(
-
getSlotSharingGroup(executionVertexId1).getSlotSharingGroupId(),
-
getSlotSharingGroup(executionVertexId2).getSlotSharingGroupId());
+
checkNotNull(slotSharingGroupMap.get(jobVertexId1)).getSlotSharingGroupId(),
+
checkNotNull(slotSharingGroupMap.get(jobVertexId2)).getSlotSharingGroupId());
}
private SlotSharingGroup getSlotSharingGroup(final ExecutionVertexID
executionVertexId) {
// slot sharing group of a vertex would never be null in production
return
checkNotNull(slotSharingGroupMap.get(executionVertexId.getJobVertexId()));
}
- private boolean isGroupAvailableForVertex(
- final ExecutionSlotSharingGroup executionSlotSharingGroup,
- final ExecutionVertexID executionVertexId) {
-
- final Set<JobVertexID> assignedVertices =
-
assignedJobVerticesForGroups.get(executionSlotSharingGroup);
- return assignedVertices == null
- ||
!assignedVertices.contains(executionVertexId.getJobVertexId());
- }
-
private void addVertexToExecutionSlotSharingGroup(
final SchedulingExecutionVertex vertex, final
ExecutionSlotSharingGroup group) {
- group.addVertex(vertex.getId());
- executionSlotSharingGroupMap.put(vertex.getId(), group);
- assignedJobVerticesForGroups
- .computeIfAbsent(group, k -> new HashSet<>())
- .add(vertex.getId().getJobVertexId());
+ ExecutionVertexID executionVertexId = vertex.getId();
+ group.addVertex(executionVertexId);
+ executionSlotSharingGroupMap.put(executionVertexId, group);
+
+
getAvailableGroupsForJobVertex(executionVertexId.getJobVertexId()).remove(group);
}
private void findAvailableOrCreateNewExecutionSlotSharingGroupFor(
final List<SchedulingExecutionVertex> executionVertices) {
for (SchedulingExecutionVertex executionVertex :
executionVertices) {
- final SlotSharingGroup slotSharingGroup =
- getSlotSharingGroup(executionVertex.getId());
- final List<ExecutionSlotSharingGroup> groups =
- executionSlotSharingGroups.computeIfAbsent(
- slotSharingGroup.getSlotSharingGroupId(), k ->
new ArrayList<>());
ExecutionSlotSharingGroup group = null;
- for (ExecutionSlotSharingGroup executionSlotSharingGroup :
groups) {
- if (isGroupAvailableForVertex(
- executionSlotSharingGroup,
executionVertex.getId())) {
- group = executionSlotSharingGroup;
- break;
- }
+ for (ExecutionSlotSharingGroup executionSlotSharingGroup :
+
getAvailableGroupsForJobVertex(executionVertex.getId().getJobVertexId())) {
+ group = executionSlotSharingGroup;
+ break;
}
if (group == null) {
- group = new ExecutionSlotSharingGroup();
-
group.setResourceProfile(slotSharingGroup.getResourceProfile());
- groups.add(group);
+ group =
createNewExecutionSlotSharingGroup(executionVertex.getId());
}
addVertexToExecutionSlotSharingGroup(executionVertex, group);
}
}
+ private ExecutionSlotSharingGroup createNewExecutionSlotSharingGroup(
+ ExecutionVertexID executionVertexId) {
+ final SlotSharingGroup slotSharingGroup =
getSlotSharingGroup(executionVertexId);
+ final List<ExecutionSlotSharingGroup> correspondingGroups =
+ executionSlotSharingGroups.computeIfAbsent(
+ slotSharingGroup.getSlotSharingGroupId(), k -> new
ArrayList<>());
+
+ final ExecutionSlotSharingGroup newGroup = new
ExecutionSlotSharingGroup();
+ newGroup.setResourceProfile(slotSharingGroup.getResourceProfile());
+
+ correspondingGroups.add(newGroup);
+
+ // Once a new ExecutionSlotSharingGroup is created, it's available
for all JobVertices
+ // in this SlotSharingGroup
Review comment:
Remove one space here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org