xintongsong commented on a change in pull request #15112:
URL: https://github.com/apache/flink/pull/15112#discussion_r590131518



##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
##########
@@ -93,7 +92,7 @@ private ResourceSpec(
             final MemorySize taskHeapMemory,
             final MemorySize taskOffHeapMemory,
             final MemorySize managedMemory,
-            final Resource... extendedResources) {
+            final Map<String, Resource> extendedResources) {

Review comment:
       It would be nice to enforce `ExternalResource` rather than `Resource` 
here.
   
   IIUC, what prevents us from doing so is that the return types of 
`Resource#merge/subtract/multiply/divide/create` are hard-coded to `Resource`. 
We might consider solving this with generics. That should also help with 
replacing `Resource` with `CPUResource` wherever needed.
   
   ```
   abstract class Resource<T extends Resource<T>> {
       abstract T merge(T other);
   }
   
   class ExternalResource extends Resource<ExternalResource> {}
   ```
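
A slightly fuller, compilable sketch of the same idea, in case it helps the discussion. All classes below are simplified stand-ins and not Flink's actual `Resource` hierarchy; the name check in `merge` and the `BigDecimal` value are assumptions made for illustration only.

```java
import java.math.BigDecimal;

// Simplified stand-ins for illustration; not Flink's actual Resource classes.
final class GenericResourceSketch {

    /** Self-bounded base type: T is the concrete subclass itself. */
    abstract static class Resource<T extends Resource<T>> {
        private final String name;
        private final BigDecimal value;

        Resource(String name, BigDecimal value) {
            this.name = name;
            this.value = value;
        }

        String getName() {
            return name;
        }

        BigDecimal getValue() {
            return value;
        }

        /** Adds the two values and returns the concrete subtype, not the base type. */
        T merge(T other) {
            if (!name.equals(other.getName())) {
                throw new IllegalArgumentException("Cannot merge resources with different names");
            }
            return create(value.add(other.getValue()));
        }

        /** Each subclass builds an instance of its own type. */
        protected abstract T create(BigDecimal value);
    }

    static final class ExternalResource extends Resource<ExternalResource> {
        ExternalResource(String name, double value) {
            this(name, BigDecimal.valueOf(value));
        }

        private ExternalResource(String name, BigDecimal value) {
            super(name, value);
        }

        @Override
        protected ExternalResource create(BigDecimal value) {
            return new ExternalResource(getName(), value);
        }
    }

    public static void main(String[] args) {
        // No cast needed: merge() is statically typed as ExternalResource.
        ExternalResource merged =
                new ExternalResource("gpu", 1.0).merge(new ExternalResource("gpu", 0.5));
        System.out.println(merged.getName() + "=" + merged.getValue());
    }
}
```

With this shape, a method that accepts `ExternalResource` can consume the result of `merge` directly, which is what would let the constructor above take `ExternalResource` instead of `Resource`.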

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
##########
@@ -369,14 +373,21 @@ public Builder setManagedMemoryMB(int managedMemoryMB) {
             return this;
         }
 
-        public Builder setGPUResource(double gpus) {
-            this.gpuResource = new GPUResource(gpus);
+        public Builder addExtendedResource(String name, Resource extendedResource) {

Review comment:
       I would be fine with providing a shortcut setter for a commonly used 
external resource like GPU. That would also save us from touching all the test 
cases.
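
A rough sketch of what such a shortcut could look like, keeping the `addExtendedResource(String, Resource)` signature from this hunk. The `Resource` class and the resource name `"gpu"` are placeholders assumed for illustration, not the PR's actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for illustration; shapes and names are assumptions.
final class GpuShortcutSketch {

    static final class Resource {
        private final double value;

        Resource(double value) {
            this.value = value;
        }

        double getValue() {
            return value;
        }
    }

    static final class Builder {
        // Assumed resource name; the real key may differ.
        static final String GPU_NAME = "gpu";

        private final Map<String, Resource> extendedResources = new HashMap<>();

        /** General-purpose setter, mirroring the signature in the diff. */
        Builder addExtendedResource(String name, Resource extendedResource) {
            this.extendedResources.put(name, extendedResource);
            return this;
        }

        /** Shortcut kept for the common GPU case; delegates to the general setter. */
        Builder setGPUResource(double gpus) {
            return addExtendedResource(GPU_NAME, new Resource(gpus));
        }

        Map<String, Resource> build() {
            return new HashMap<>(extendedResources);
        }
    }

    public static void main(String[] args) {
        // Existing callers of setGPUResource(...) would keep compiling unchanged.
        Map<String, Resource> resources = new Builder().setGPUResource(2.0).build();
        System.out.println(resources.get(Builder.GPU_NAME).getValue());
    }
}
```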

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java
##########
@@ -118,13 +141,22 @@ public boolean equals(Object obj) {
                     && Objects.equals(this.taskHeapSize, that.taskHeapSize)
                     && Objects.equals(this.taskOffHeapSize, that.taskOffHeapSize)
                     && Objects.equals(this.networkMemSize, that.networkMemSize)
-                    && Objects.equals(this.managedMemSize, that.managedMemSize);
+                    && Objects.equals(this.managedMemSize, that.managedMemSize)
+                    && Objects.equals(this.extendedResources, that.extendedResources);
         }
         return false;
     }
 
     @Override
     public String toString() {
+        final StringBuilder extendedResourceStr = new StringBuilder(extendedResources.size() * 10);
+        for (Map.Entry<String, Resource> resource : extendedResources.entrySet()) {
+            extendedResourceStr
+                    .append(", ")
+                    .append(resource.getKey())
+                    .append('=')
+                    .append(resource.getValue().getValue());
+        }

Review comment:
       Consider deduplicating this string generation with a util method. We can 
put it in `ExternalResourceUtils`.
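
For illustration, the extracted helper could look roughly like the sketch below. The method name `generateExternalResourcesString` and the minimal `Resource` stand-in are hypothetical; only the loop body is taken from the diff above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: the helper name and the Resource stand-in are hypothetical.
final class ExternalResourceStringSketch {

    /** Minimal stand-in exposing only what the loop in the diff uses. */
    static final class Resource {
        private final double value;

        Resource(double value) {
            this.value = value;
        }

        double getValue() {
            return value;
        }
    }

    /** Renders each entry as ", name=value", ready to append inside toString(). */
    static String generateExternalResourcesString(Map<String, Resource> extendedResources) {
        final StringBuilder sb = new StringBuilder(extendedResources.size() * 10);
        for (Map.Entry<String, Resource> entry : extendedResources.entrySet()) {
            sb.append(", ")
                    .append(entry.getKey())
                    .append('=')
                    .append(entry.getValue().getValue());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order so the demo output is stable.
        Map<String, Resource> resources = new LinkedHashMap<>();
        resources.put("gpu", new Resource(1.0));
        resources.put("fpga", new Resource(2.0));
        System.out.println("{cpuCores=1.0" + generateExternalResourcesString(resources) + "}");
    }
}
```

Each `toString()` that currently builds this string inline could then call the helper once instead of repeating the loop.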

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerUtils.java
##########
@@ -29,14 +29,24 @@
      */
     public static ResourceProfile generateDefaultSlotResourceProfile(
             WorkerResourceSpec workerResourceSpec, int numSlotsPerWorker) {
-        return ResourceProfile.newBuilder()
-                .setCpuCores(workerResourceSpec.getCpuCores().divide(numSlotsPerWorker))
-                .setTaskHeapMemory(workerResourceSpec.getTaskHeapSize().divide(numSlotsPerWorker))
-                .setTaskOffHeapMemory(
-                        workerResourceSpec.getTaskOffHeapSize().divide(numSlotsPerWorker))
-                .setManagedMemory(workerResourceSpec.getManagedMemSize().divide(numSlotsPerWorker))
-                .setNetworkMemory(workerResourceSpec.getNetworkMemSize().divide(numSlotsPerWorker))
-                .build();
+        final ResourceProfile.Builder resourceProfileBuilder =

Review comment:
       I would suggest adding external resources to `WorkerResourceSpec` and 
`TaskExecutorResourceSpec` in one commit, so that this util is always 
consistent with `TaskExecutorResourceUtils` in the git history.

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
##########
@@ -369,14 +373,21 @@ public Builder setManagedMemoryMB(int managedMemoryMB) {
             return this;
         }
 
-        public Builder setGPUResource(double gpus) {
-            this.gpuResource = new GPUResource(gpus);
+        public Builder addExtendedResource(String name, Resource extendedResource) {
+            this.extendedResources.put(name, extendedResource);
+            return this;
+        }
+
+        public Builder addExtendedResources(Map<String, Resource> extendedResources) {

Review comment:
       We should not need the map keys for the argument. Same for 
`ResourceProfile.Builder`.
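
To make the suggestion concrete, here is a minimal sketch of a builder that takes a plain collection and derives the keys itself. It assumes each resource exposes its own name via `getName()`; the `ExternalResource` stand-in and the method names are illustrative, not the PR's final API.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Sketch under the assumption that every resource carries its own name.
final class NamelessBuilderSketch {

    static final class ExternalResource {
        private final String name;
        private final double value;

        ExternalResource(String name, double value) {
            this.name = name;
            this.value = value;
        }

        String getName() {
            return name;
        }

        double getValue() {
            return value;
        }
    }

    static final class Builder {
        private final Map<String, ExternalResource> extendedResources = new HashMap<>();

        /** The key comes from the resource, so callers cannot pass a mismatching name. */
        Builder setExtendedResource(ExternalResource resource) {
            this.extendedResources.put(resource.getName(), resource);
            return this;
        }

        /** Bulk variant: a collection is enough, no map keys required. */
        Builder setExtendedResources(Collection<ExternalResource> resources) {
            resources.forEach(this::setExtendedResource);
            return this;
        }

        Map<String, ExternalResource> build() {
            return new HashMap<>(extendedResources);
        }
    }

    public static void main(String[] args) {
        Map<String, ExternalResource> built =
                new Builder()
                        .setExtendedResources(
                                Arrays.asList(
                                        new ExternalResource("gpu", 1.0),
                                        new ExternalResource("fpga", 2.0)))
                        .build();
        System.out.println(built.keySet());
    }
}
```

The internal storage can stay a map for lookups; only the public arguments lose the redundant keys.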

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java
##########
@@ -174,9 +208,26 @@ public Builder setManagedMemoryMB(int managedMemoryMB) {
             return this;
         }
 
+        public Builder addExtendedResource(String name, Resource extendedResource) {
+            this.extendedResources.put(name, extendedResource);
+            return this;
+        }
+
+        public Builder addExtendedResources(Map<String, Resource> extendedResources) {
+            if (extendedResources != null) {
+                this.extendedResources.putAll(extendedResources);
+            }
+            return this;
+        }

Review comment:
       Same here:
   * `Resource` -> `ExternalResource`
   * Should not need the names.

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
##########
@@ -369,14 +373,21 @@ public Builder setManagedMemoryMB(int managedMemoryMB) {
             return this;
         }
 
-        public Builder setGPUResource(double gpus) {
-            this.gpuResource = new GPUResource(gpus);
+        public Builder addExtendedResource(String name, Resource extendedResource) {

Review comment:
       We should not need the argument `name`. Same for 
`ResourceProfile.Builder`.



