KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1383243917
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -95,7 +96,7 @@ public DefaultResourceAllocationStrategy(
SlotManagerUtils.generateDefaultSlotResourceProfile(
totalResourceProfile, numSlotsPerWorker);
this.availableResourceMatchingStrategy =
- evenlySpreadOutSlots
+ taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS
Review Comment:
What about tasks?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java:
##########
@@ -31,22 +34,36 @@ class ExecutionSlotSharingGroup {
private final Set<ExecutionVertexID> executionVertexIds;
- private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
+ private @Nonnull SlotSharingGroup slotSharingGroup;
+ /** @deprecated Only for test classes. */
+ @Deprecated
Review Comment:
Please use `@VisibleForTesting` here instead of `@Deprecated`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java:
##########
@@ -260,10 +261,10 @@ public static SlotManagerConfiguration fromConfiguration(
configuration.getBoolean(
ResourceManagerOptions.TASK_MANAGER_RELEASE_WHEN_RESULT_CONSUMED);
- boolean evenlySpreadOutSlots =
-
configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
+ TaskManagerLoadBalanceMode taskManagerLoadBalanceMode =
+
TaskManagerLoadBalanceMode.loadFromConfiguration(configuration);
final SlotMatchingStrategy slotMatchingStrategy =
- evenlySpreadOutSlots
+ taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS
Review Comment:
Ditto: what about tasks?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java:
##########
@@ -42,69 +43,23 @@
import java.util.Set;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
/**
* This strategy tries to reduce remote data exchanges. Execution vertices,
which are connected and
* belong to the same SlotSharingGroup, tend to be put in the same
ExecutionSlotSharingGroup.
* Co-location constraints will be respected.
*/
-class LocalInputPreferredSlotSharingStrategy
+class LocalInputPreferredSlotSharingStrategy extends
AbstractSlotSharingStrategy
implements SlotSharingStrategy, SchedulingTopologyListener {
Review Comment:
No need to implement `SlotSharingStrategy` and `SchedulingTopologyListener` again.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java:
##########
@@ -42,69 +43,23 @@
import java.util.Set;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
/**
* This strategy tries to reduce remote data exchanges. Execution vertices,
which are connected and
* belong to the same SlotSharingGroup, tend to be put in the same
ExecutionSlotSharingGroup.
* Co-location constraints will be respected.
*/
-class LocalInputPreferredSlotSharingStrategy
+class LocalInputPreferredSlotSharingStrategy extends
AbstractSlotSharingStrategy
implements SlotSharingStrategy, SchedulingTopologyListener {
- private final Map<ExecutionVertexID, ExecutionSlotSharingGroup>
executionSlotSharingGroupMap;
-
- private final Set<SlotSharingGroup> logicalSlotSharingGroups;
-
- private final Set<CoLocationGroup> coLocationGroups;
+ public static final Logger LOG =
+
LoggerFactory.getLogger(LocalInputPreferredSlotSharingStrategy.class);
Review Comment:
Where do we use this `LOG`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java:
##########
@@ -46,6 +47,12 @@ public class SlotSharingGroup implements
java.io.Serializable {
//
--------------------------------------------------------------------------------------------
+ public SlotSharingGroup() {}
+
+ public SlotSharingGroup(ResourceProfile resourceProfile) {
Review Comment:
It seems we don't need this constructor. Just instantiate a SlotSharingGroup and set
the resource profile.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java:
##########
@@ -46,6 +47,12 @@ public class SlotSharingGroup implements
java.io.Serializable {
//
--------------------------------------------------------------------------------------------
+ public SlotSharingGroup() {}
+
+ public SlotSharingGroup(ResourceProfile resourceProfile) {
Review Comment:
Is this only visible for testing?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java:
##########
@@ -31,22 +34,36 @@ class ExecutionSlotSharingGroup {
private final Set<ExecutionVertexID> executionVertexIds;
- private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
+ private @Nonnull SlotSharingGroup slotSharingGroup;
+ /** @deprecated Only for test classes. */
+ @Deprecated
ExecutionSlotSharingGroup() {
this.executionVertexIds = new HashSet<>();
+ this.slotSharingGroup = new SlotSharingGroup();
+ }
+
+ ExecutionSlotSharingGroup(@Nonnull SlotSharingGroup slotSharingGroup) {
+ this.slotSharingGroup = Preconditions.checkNotNull(slotSharingGroup);
+ this.executionVertexIds = new HashSet<>();
}
void addVertex(final ExecutionVertexID executionVertexId) {
executionVertexIds.add(executionVertexId);
}
- void setResourceProfile(ResourceProfile resourceProfile) {
- this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
+ void setSlotSharingGroup(SlotSharingGroup slotSharingGroup) {
+ this.slotSharingGroup = Preconditions.checkNotNull(slotSharingGroup);
+ }
+
+ @Nonnull
+ SlotSharingGroup getSlotSharingGroup() {
+ return slotSharingGroup;
Review Comment:
They are all visible only for testing.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java:
##########
@@ -31,22 +34,36 @@ class ExecutionSlotSharingGroup {
private final Set<ExecutionVertexID> executionVertexIds;
- private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
+ private @Nonnull SlotSharingGroup slotSharingGroup;
+ /** @deprecated Only for test classes. */
+ @Deprecated
Review Comment:
BTW, I think there is no need to keep it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]