xintongsong commented on a change in pull request #14860:
URL: https://github.com/apache/flink/pull/14860#discussion_r569980985



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -158,6 +159,9 @@
 
     private boolean shouldExecuteInBatchMode;
 
+    // Records the slot sharing groups and their corresponding ResourceProfile
+    private Map<String, ResourceProfile> slotSharingGroupResources = new HashMap<>();

Review comment:
       `final`
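
A minimal sketch of what the `final` suggestion could look like. The class name, the `String` value type (a stand-in for `ResourceProfile`), and the setter body are assumptions for illustration, not code from this PR:

```java
import java.util.HashMap;
import java.util.Map;

public class FinalFieldSketch {
    // The reference is final, so it can never be reassigned; the map contents stay mutable.
    private final Map<String, String> slotSharingGroupResources = new HashMap<>();

    // Hypothetical setter: replaces the contents instead of reassigning the field.
    public FinalFieldSketch setResources(Map<String, String> resources) {
        slotSharingGroupResources.clear();
        slotSharingGroupResources.putAll(resources);
        return this;
    }

    public Map<String, String> getResources() {
        return slotSharingGroupResources;
    }

    public static void main(String[] args) {
        Map<String, String> input = new HashMap<>();
        input.put("slot-a", "profile-1");
        FinalFieldSketch generator = new FinalFieldSketch().setResources(input);
        System.out.println(generator.getResources());
    }
}
```

Copying with `putAll` also decouples the generator from later changes to the caller's map, which may or may not be what the PR wants.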

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
##########
@@ -47,6 +48,9 @@
      */
     private ResourceSpec resourceSpec = ResourceSpec.ZERO;

Review comment:
       The difference between the original `resourceSpec` and the newly 
introduced `resourceProfile` could be confusing. I would suggest marking this 
field and its getter `@Deprecated`.
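
A rough sketch of the deprecation, assuming `String` stand-ins for `ResourceSpec` and `ResourceProfile`. The class and method names below are hypothetical and only illustrate where the annotations would go:

```java
public class DeprecationSketch {

    // Hypothetical stand-in for SlotSharingGroup; String replaces the real resource types.
    static final class SlotSharingGroupSketch {
        /** @deprecated superseded by {@link #resourceProfile} in this sketch. */
        @Deprecated private String resourceSpec = "ZERO";

        private String resourceProfile = "UNKNOWN";

        /** @deprecated use {@link #getResourceProfile()} instead (in this sketch). */
        @Deprecated
        public String getResourceSpec() {
            return resourceSpec;
        }

        public String getResourceProfile() {
            return resourceProfile;
        }
    }

    // Reports whether the old getter carries @Deprecated (the annotation is retained at runtime).
    static boolean isOldGetterDeprecated() {
        try {
            return SlotSharingGroupSketch.class
                    .getMethod("getResourceSpec")
                    .isAnnotationPresent(Deprecated.class);
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isOldGetterDeprecated());
    }
}
```

With both the field and the getter annotated, callers get a compiler warning that points them at the new accessor.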

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -259,6 +263,20 @@ public StreamGraphGenerator setJobName(String jobName) {
         return this;
     }
 
+    /**
+     * Specify fine-grained resource requirements for slot sharing groups.
+     *
+     * <p>Note that a slot sharing group hints the scheduler that the grouped operators CAN be
+     * deployed into a shared slot. There's no guarantee that the scheduler always deploy the
+     * grouped operator together. In cases grouped operators are deployed into separate slots, the
+     * slot resources will be derived from the specified group requirement.

Review comment:
       `group requirement` -> `group requirements`

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -291,6 +309,7 @@ private void configureStreamGraph(final StreamGraph graph) {
         graph.setTimeCharacteristic(timeCharacteristic);
         graph.setJobName(jobName);
         graph.setJobType(shouldExecuteInBatchMode ? JobType.BATCH : JobType.STREAMING);
+        graph.setSlotSharingGroupResource(Collections.unmodifiableMap(slotSharingGroupResources));

Review comment:
       I am not sure it is necessary to make this unmodifiable. I would not 
expect the generator to maintain state of its own and protect it from being 
accidentally modified.
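
For context, a small sketch of what `Collections.unmodifiableMap` does and does not protect against. The class and helper names are made up for illustration, and `Integer` stands in for `ResourceProfile`:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableViewSketch {

    // Read-only view: writes through the view throw, but later changes to `source` stay visible.
    static Map<String, Integer> readOnlyView(Map<String, Integer> source) {
        return Collections.unmodifiableMap(source);
    }

    // Independent copy: later changes to `source` are not reflected.
    static Map<String, Integer> snapshot(Map<String, Integer> source) {
        return new HashMap<>(source);
    }

    public static void main(String[] args) {
        Map<String, Integer> resources = new HashMap<>();
        resources.put("slot-a", 1);

        Map<String, Integer> view = readOnlyView(resources);
        Map<String, Integer> copy = snapshot(resources);

        resources.put("slot-b", 2);
        System.out.println(view.size()); // 2, the view tracks the backing map
        System.out.println(copy.size()); // 1, the copy does not

        try {
            view.put("slot-c", 3);
        } catch (UnsupportedOperationException e) {
            System.out.println("view rejects writes");
        }
    }
}
```

The wrapper only stops the receiver from writing through it. Whether the graph needs that protection is the open question in the comment above.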

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -259,6 +263,20 @@ public StreamGraphGenerator setJobName(String jobName) {
         return this;
     }
 
+    /**
+     * Specify fine-grained resource requirements for slot sharing groups.
+     *
+     * <p>Note that a slot sharing group hints the scheduler that the grouped operators CAN be
+     * deployed into a shared slot. There's no guarantee that the scheduler always deploy the
+     * grouped operator together. In cases grouped operators are deployed into separate slots, the

Review comment:
       `grouped operator` -> `grouped operators`

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
##########
@@ -47,6 +48,9 @@
      */
     private ResourceSpec resourceSpec = ResourceSpec.ZERO;
 
+    // Represents resources of all tasks in the group. Default to be null.

Review comment:
       `Default to be UNKNOWN`.

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
##########
@@ -1195,6 +1197,55 @@ public void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultDisabled(
         assertDistinctSharingGroups(source1Vertex, source2Vertex, map2Vertex);
     }
 
+    @Test
+    public void testSlotSharingResourceConfiguration() {
+        final String slotSharingGroup1 = "slot-a";
+        final String slotSharingGroup2 = "slot-b";
+        final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10);
+        final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2, 20);
+        final ResourceProfile resourceProfile3 = ResourceProfile.fromResources(3, 30);
+        final Map<String, ResourceProfile> slotSharingGroupResource = new HashMap<>();
+        slotSharingGroupResource.put(slotSharingGroup1, resourceProfile1);
+        slotSharingGroupResource.put(slotSharingGroup2, resourceProfile2);
+        slotSharingGroupResource.put(
+                StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, resourceProfile3);
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.fromElements(1, 2, 3)
+                .name(slotSharingGroup1)
+                .slotSharingGroup(slotSharingGroup1)
+                .map(x -> x + 1)
+                .name(slotSharingGroup2)
+                .slotSharingGroup(slotSharingGroup2)
+                .map(x -> x * x)
+                .name(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)
+                .slotSharingGroup(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP);

Review comment:
       I think we need another test case for an operator that does not specify 
an SSG (and thus falls into the default group) while the resource profile of 
the default group is specified.



