Thesharing commented on a change in pull request #16687:
URL: https://github.com/apache/flink/pull/16687#discussion_r688926551
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -214,87 +260,116 @@ private ExecutionSlotSharingGroup
tryFindAvailableProducerExecutionSlotSharingGr
final ExecutionVertexID executionVertexId =
executionVertex.getId();
- for (SchedulingResultPartition partition :
executionVertex.getConsumedResults()) {
- final ExecutionVertexID producerVertexId =
partition.getProducer().getId();
- if (!inSameLogicalSlotSharingGroup(producerVertexId,
executionVertexId)) {
- continue;
- }
-
- final ExecutionSlotSharingGroup producerGroup =
- executionSlotSharingGroupMap.get(producerVertexId);
-
- checkState(producerGroup != null);
- if (isGroupAvailableForVertex(producerGroup,
executionVertexId)) {
- return producerGroup;
+ Set<ExecutionSlotSharingGroup> availableGroupsForCurrentVertex =
+
availableGroupsForJobVertex.get(executionVertexId.getJobVertexId());
+
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ executionVertex.getConsumedPartitionGroups()) {
+
+ Set<ExecutionSlotSharingGroup> candidateGroups =
+
candidateGroupsForConsumedPartitionGroup.computeIfAbsent(
+ consumedPartitionGroup,
+ group ->
+
computeAllCandidateGroupsForConsumedPartitionGroup(
+
executionVertexId.getJobVertexId(), group));
+
+ Iterator<ExecutionSlotSharingGroup> candidateIterator =
candidateGroups.iterator();
+
+ while (candidateIterator.hasNext()) {
+ ExecutionSlotSharingGroup candidateGroup =
candidateIterator.next();
+ // There are two cases for this candidate group:
+ // 1. The group is available for this vertex, and it will
be assigned to this
+ // vertex
+ // 2. The group is not available for this vertex, because
it's assigned to
+ // another vertex
+ // No matter what case it is, the candidate group is no
longer a candidate and
+ // should be removed
+ candidateIterator.remove();
+ if (availableGroupsForCurrentVertex != null
Review comment:
Thank you for providing a better way. Resolved.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -214,87 +260,116 @@ private ExecutionSlotSharingGroup
tryFindAvailableProducerExecutionSlotSharingGr
final ExecutionVertexID executionVertexId =
executionVertex.getId();
- for (SchedulingResultPartition partition :
executionVertex.getConsumedResults()) {
- final ExecutionVertexID producerVertexId =
partition.getProducer().getId();
- if (!inSameLogicalSlotSharingGroup(producerVertexId,
executionVertexId)) {
- continue;
- }
-
- final ExecutionSlotSharingGroup producerGroup =
- executionSlotSharingGroupMap.get(producerVertexId);
-
- checkState(producerGroup != null);
- if (isGroupAvailableForVertex(producerGroup,
executionVertexId)) {
- return producerGroup;
+ Set<ExecutionSlotSharingGroup> availableGroupsForCurrentVertex =
+
availableGroupsForJobVertex.get(executionVertexId.getJobVertexId());
+
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ executionVertex.getConsumedPartitionGroups()) {
+
+ Set<ExecutionSlotSharingGroup> candidateGroups =
+
candidateGroupsForConsumedPartitionGroup.computeIfAbsent(
+ consumedPartitionGroup,
+ group ->
+
computeAllCandidateGroupsForConsumedPartitionGroup(
+
executionVertexId.getJobVertexId(), group));
+
+ Iterator<ExecutionSlotSharingGroup> candidateIterator =
candidateGroups.iterator();
+
+ while (candidateIterator.hasNext()) {
+ ExecutionSlotSharingGroup candidateGroup =
candidateIterator.next();
+ // There are two cases for this candidate group:
+ // 1. The group is available for this vertex, and it will
be assigned to this
+ // vertex
+ // 2. The group is not available for this vertex, because
it's assigned to
+ // another vertex
Review comment:
Resolved.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -94,12 +97,50 @@ public LocalInputPreferredSlotSharingStrategy create(
private final Map<ExecutionVertexID, ExecutionSlotSharingGroup>
executionSlotSharingGroupMap;
- final Map<CoLocationConstraint, ExecutionSlotSharingGroup>
+ private final Map<CoLocationConstraint, ExecutionSlotSharingGroup>
constraintToExecutionSlotSharingGroupMap;
- final Map<SlotSharingGroupId, List<ExecutionSlotSharingGroup>>
executionSlotSharingGroups;
+ private final Map<SlotSharingGroupId, List<ExecutionSlotSharingGroup>>
+ executionSlotSharingGroups;
- private final Map<ExecutionSlotSharingGroup, Set<JobVertexID>>
assignedJobVerticesForGroups;
+ /**
+ * A JobVertex only belongs to a {@link SlotSharingGroup}. A
SlotSharingGroup is
Review comment:
Resolved.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -214,87 +260,116 @@ private ExecutionSlotSharingGroup
tryFindAvailableProducerExecutionSlotSharingGr
final ExecutionVertexID executionVertexId =
executionVertex.getId();
- for (SchedulingResultPartition partition :
executionVertex.getConsumedResults()) {
- final ExecutionVertexID producerVertexId =
partition.getProducer().getId();
- if (!inSameLogicalSlotSharingGroup(producerVertexId,
executionVertexId)) {
- continue;
- }
-
- final ExecutionSlotSharingGroup producerGroup =
- executionSlotSharingGroupMap.get(producerVertexId);
-
- checkState(producerGroup != null);
- if (isGroupAvailableForVertex(producerGroup,
executionVertexId)) {
- return producerGroup;
+ Set<ExecutionSlotSharingGroup> availableGroupsForCurrentVertex =
+
availableGroupsForJobVertex.get(executionVertexId.getJobVertexId());
+
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ executionVertex.getConsumedPartitionGroups()) {
+
+ Set<ExecutionSlotSharingGroup> candidateGroups =
+
candidateGroupsForConsumedPartitionGroup.computeIfAbsent(
+ consumedPartitionGroup,
+ group ->
+
computeAllCandidateGroupsForConsumedPartitionGroup(
+
executionVertexId.getJobVertexId(), group));
+
+ Iterator<ExecutionSlotSharingGroup> candidateIterator =
candidateGroups.iterator();
+
+ while (candidateIterator.hasNext()) {
+ ExecutionSlotSharingGroup candidateGroup =
candidateIterator.next();
+ // There are two cases for this candidate group:
+ // 1. The group is available for this vertex, and it will
be assigned to this
+ // vertex
+ // 2. The group is not available for this vertex, because
it's assigned to
+ // another vertex
+ // No matter what case it is, the candidate group is no
longer a candidate and
+ // should be removed
+ candidateIterator.remove();
+ if (availableGroupsForCurrentVertex != null
+ &&
availableGroupsForCurrentVertex.contains(candidateGroup)) {
+ return candidateGroup;
+ }
}
}
return null;
}
private boolean inSameLogicalSlotSharingGroup(
- final ExecutionVertexID executionVertexId1,
- final ExecutionVertexID executionVertexId2) {
+ final JobVertexID jobVertexId1, final JobVertexID
jobVertexId2) {
return Objects.equals(
-
getSlotSharingGroup(executionVertexId1).getSlotSharingGroupId(),
-
getSlotSharingGroup(executionVertexId2).getSlotSharingGroupId());
+ getSlotSharingGroup(jobVertexId1).getSlotSharingGroupId(),
+ getSlotSharingGroup(jobVertexId2).getSlotSharingGroupId());
}
- private SlotSharingGroup getSlotSharingGroup(final ExecutionVertexID
executionVertexId) {
+ private SlotSharingGroup getSlotSharingGroup(final JobVertexID
jobVertexId) {
// slot sharing group of a vertex would never be null in production
- return
checkNotNull(slotSharingGroupMap.get(executionVertexId.getJobVertexId()));
- }
-
- private boolean isGroupAvailableForVertex(
- final ExecutionSlotSharingGroup executionSlotSharingGroup,
- final ExecutionVertexID executionVertexId) {
-
- final Set<JobVertexID> assignedVertices =
-
assignedJobVerticesForGroups.get(executionSlotSharingGroup);
- return assignedVertices == null
- ||
!assignedVertices.contains(executionVertexId.getJobVertexId());
+ return checkNotNull(slotSharingGroupMap.get(jobVertexId));
}
private void addVertexToExecutionSlotSharingGroup(
final SchedulingExecutionVertex vertex, final
ExecutionSlotSharingGroup group) {
- group.addVertex(vertex.getId());
- executionSlotSharingGroupMap.put(vertex.getId(), group);
- assignedJobVerticesForGroups
- .computeIfAbsent(group, k -> new HashSet<>())
- .add(vertex.getId().getJobVertexId());
+ ExecutionVertexID executionVertexId = vertex.getId();
+ group.addVertex(executionVertexId);
+ executionSlotSharingGroupMap.put(executionVertexId, group);
+
+ // The ExecutionSlotSharingGroup is no longer available for the
JobVertex
+ Set<ExecutionSlotSharingGroup> availableExecutionSlotSharingGroups
=
+
availableGroupsForJobVertex.get(executionVertexId.getJobVertexId());
+ if (availableExecutionSlotSharingGroups != null) {
+ availableExecutionSlotSharingGroups.remove(group);
+ }
}
private void findAvailableOrCreateNewExecutionSlotSharingGroupFor(
final List<SchedulingExecutionVertex> executionVertices) {
for (SchedulingExecutionVertex executionVertex :
executionVertices) {
- final SlotSharingGroup slotSharingGroup =
- getSlotSharingGroup(executionVertex.getId());
- final List<ExecutionSlotSharingGroup> groups =
- executionSlotSharingGroups.computeIfAbsent(
- slotSharingGroup.getSlotSharingGroupId(), k ->
new ArrayList<>());
ExecutionSlotSharingGroup group = null;
- for (ExecutionSlotSharingGroup executionSlotSharingGroup :
groups) {
- if (isGroupAvailableForVertex(
- executionSlotSharingGroup,
executionVertex.getId())) {
- group = executionSlotSharingGroup;
- break;
- }
+
+ Set<ExecutionSlotSharingGroup> availableGroupsForCurrentVertex
=
Review comment:
Thank you for proposing a better way. Resolved.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]