xintongsong commented on a change in pull request #14860: URL: https://github.com/apache/flink/pull/14860#discussion_r569980985
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -158,6 +159,9 @@
     private boolean shouldExecuteInBatchMode;
+    // Records the slot sharing groups and their corresponding ResourceProfile
+    private Map<String, ResourceProfile> slotSharingGroupResources = new HashMap<>();

Review comment:
       `final`

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
##########
@@ -47,6 +48,9 @@
      */
     private ResourceSpec resourceSpec = ResourceSpec.ZERO;

Review comment:
       The difference between the original `resourceSpec` and the newly introduced `resourceProfile` could be confusing. I would suggest marking this field and its getter `@Deprecated`.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -259,6 +263,20 @@ public StreamGraphGenerator setJobName(String jobName) {
         return this;
     }
+    /**
+     * Specify fine-grained resource requirements for slot sharing groups.
+     *
+     * <p>Note that a slot sharing group hints the scheduler that the grouped operators CAN be
+     * deployed into a shared slot. There's no guarantee that the scheduler always deploy the
+     * grouped operator together. In cases grouped operators are deployed into separate slots, the
+     * slot resources will be derived from the specified group requirement.

Review comment:
       `group requirement` -> `group requirements`

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -291,6 +309,7 @@ private void configureStreamGraph(final StreamGraph graph) {
         graph.setTimeCharacteristic(timeCharacteristic);
         graph.setJobName(jobName);
         graph.setJobType(shouldExecuteInBatchMode ? JobType.BATCH : JobType.STREAMING);
+        graph.setSlotSharingGroupResource(Collections.unmodifiableMap(slotSharingGroupResources));

Review comment:
       I am not sure it is necessary to make this unmodifiable. I would not expect the generator to maintain state and protect it from accidental modification.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -259,6 +263,20 @@ public StreamGraphGenerator setJobName(String jobName) {
         return this;
     }
+    /**
+     * Specify fine-grained resource requirements for slot sharing groups.
+     *
+     * <p>Note that a slot sharing group hints the scheduler that the grouped operators CAN be
+     * deployed into a shared slot. There's no guarantee that the scheduler always deploy the
+     * grouped operator together. In cases grouped operators are deployed into separate slots, the

Review comment:
       `grouped operator` -> `grouped operators`

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
##########
@@ -47,6 +48,9 @@
      */
     private ResourceSpec resourceSpec = ResourceSpec.ZERO;
+    // Represents resources of all tasks in the group. Default to be null.

Review comment:
       `Default to be UNKNOWN`.
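
For context on the `final` and `Collections.unmodifiableMap` comments on `StreamGraphGenerator`, here is a minimal, self-contained sketch of what each mechanism does and does not protect. It does not use Flink: the class `UnmodifiableViewSketch`, its methods, and the use of `String` in place of `ResourceProfile` are all hypothetical stand-ins chosen for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a generator holding per-group resources (not Flink code).
public class UnmodifiableViewSketch {

    // `final` only prevents reassigning the field; the map contents stay mutable.
    private final Map<String, String> slotSharingGroupResources = new HashMap<>();

    // Hypothetical setter: records a resource description for a group name.
    public UnmodifiableViewSketch put(String group, String profile) {
        slotSharingGroupResources.put(group, profile);
        return this;
    }

    // Returns a read-only VIEW of the backing map, not a copy:
    // callers cannot write through it, but later changes to the
    // backing map remain visible through it.
    public Map<String, String> readOnlyView() {
        return Collections.unmodifiableMap(slotSharingGroupResources);
    }

    public static void main(String[] args) {
        UnmodifiableViewSketch generator = new UnmodifiableViewSketch();
        generator.put("slot-a", "cpu=1,mem=10");

        Map<String, String> view = generator.readOnlyView();
        generator.put("slot-b", "cpu=2,mem=20"); // visible through the existing view

        System.out.println("entries seen through view: " + view.size());
        try {
            view.put("slot-c", "cpu=3,mem=30");
        } catch (UnsupportedOperationException e) {
            System.out.println("writes through the view are rejected");
        }
    }
}
```

The wrapper guards only against writes by whoever receives the view. Whether that guard is worth having here depends on whether anything actually mutates the map after the graph is configured, which is the question the review comment raises.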
##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
##########
@@ -1195,6 +1197,55 @@ public void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultDisabled(
         assertDistinctSharingGroups(source1Vertex, source2Vertex, map2Vertex);
     }
+    @Test
+    public void testSlotSharingResourceConfiguration() {
+        final String slotSharingGroup1 = "slot-a";
+        final String slotSharingGroup2 = "slot-b";
+        final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10);
+        final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2, 20);
+        final ResourceProfile resourceProfile3 = ResourceProfile.fromResources(3, 30);
+        final Map<String, ResourceProfile> slotSharingGroupResource = new HashMap<>();
+        slotSharingGroupResource.put(slotSharingGroup1, resourceProfile1);
+        slotSharingGroupResource.put(slotSharingGroup2, resourceProfile2);
+        slotSharingGroupResource.put(
+                StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, resourceProfile3);
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.fromElements(1, 2, 3)
+                .name(slotSharingGroup1)
+                .slotSharingGroup(slotSharingGroup1)
+                .map(x -> x + 1)
+                .name(slotSharingGroup2)
+                .slotSharingGroup(slotSharingGroup2)
+                .map(x -> x * x)
+                .name(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)
+                .slotSharingGroup(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP);

Review comment:
       I think we need another test case for an operator that does not specify an SSG (and thus falls into the default group) while the resource profile of the default group is specified.