xintongsong commented on a change in pull request #14860:
URL: https://github.com/apache/flink/pull/14860#discussion_r569980985
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -158,6 +159,9 @@
private boolean shouldExecuteInBatchMode;
+ // Records the slot sharing groups and their corresponding ResourceProfile
+ private Map<String, ResourceProfile> slotSharingGroupResources = new
HashMap<>();
Review comment:
`final`
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
##########
@@ -47,6 +48,9 @@
*/
private ResourceSpec resourceSpec = ResourceSpec.ZERO;
Review comment:
It could be confusing what's the differences between the original
`resourceSpec` and the new introduced `resourceProfile`. I would suggest to
mark this field and its getter `@Deprecated`.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -259,6 +263,20 @@ public StreamGraphGenerator setJobName(String jobName) {
return this;
}
+ /**
+ * Specify fine-grained resource requirements for slot sharing groups.
+ *
+ * <p>Note that a slot sharing group hints the scheduler that the grouped
operators CAN be
+ * deployed into a shared slot. There's no guarantee that the scheduler
always deploy the
+ * grouped operator together. In cases grouped operators are deployed into
separate slots, the
+ * slot resources will be derived from the specified group requirement.
Review comment:
`group requirement` -> `group requirements`
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -291,6 +309,7 @@ private void configureStreamGraph(final StreamGraph graph) {
graph.setTimeCharacteristic(timeCharacteristic);
graph.setJobName(jobName);
graph.setJobType(shouldExecuteInBatchMode ? JobType.BATCH :
JobType.STREAMING);
+
graph.setSlotSharingGroupResource(Collections.unmodifiableMap(slotSharingGroupResources));
Review comment:
Not sure if this is necessary to make it unmodifiable. I would not
expect the generator to maintain certain states and protect them from being
accidentally modified.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -259,6 +263,20 @@ public StreamGraphGenerator setJobName(String jobName) {
return this;
}
+ /**
+ * Specify fine-grained resource requirements for slot sharing groups.
+ *
+ * <p>Note that a slot sharing group hints the scheduler that the grouped
operators CAN be
+ * deployed into a shared slot. There's no guarantee that the scheduler
always deploy the
+ * grouped operator together. In cases grouped operators are deployed into
separate slots, the
Review comment:
`grouped operator` -> `grouped operators`
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
##########
@@ -47,6 +48,9 @@
*/
private ResourceSpec resourceSpec = ResourceSpec.ZERO;
+ // Represents resources of all tasks in the group. Default to be null.
Review comment:
`Default to be UNKNOWN`.
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
##########
@@ -1195,6 +1197,55 @@ public void
testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultDisabled(
assertDistinctSharingGroups(source1Vertex, source2Vertex, map2Vertex);
}
+ @Test
+ public void testSlotSharingResourceConfiguration() {
+ final String slotSharingGroup1 = "slot-a";
+ final String slotSharingGroup2 = "slot-b";
+ final ResourceProfile resourceProfile1 =
ResourceProfile.fromResources(1, 10);
+ final ResourceProfile resourceProfile2 =
ResourceProfile.fromResources(2, 20);
+ final ResourceProfile resourceProfile3 =
ResourceProfile.fromResources(3, 30);
+ final Map<String, ResourceProfile> slotSharingGroupResource = new
HashMap<>();
+ slotSharingGroupResource.put(slotSharingGroup1, resourceProfile1);
+ slotSharingGroupResource.put(slotSharingGroup2, resourceProfile2);
+ slotSharingGroupResource.put(
+ StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
resourceProfile3);
+
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.fromElements(1, 2, 3)
+ .name(slotSharingGroup1)
+ .slotSharingGroup(slotSharingGroup1)
+ .map(x -> x + 1)
+ .name(slotSharingGroup2)
+ .slotSharingGroup(slotSharingGroup2)
+ .map(x -> x * x)
+ .name(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)
+
.slotSharingGroup(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP);
Review comment:
I think we need another test case for an operator that does not specify
SSG (thus the default group) while the resource profile of the default group is
specified.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]