xintongsong commented on a change in pull request #15112:
URL: https://github.com/apache/flink/pull/15112#discussion_r594812450



##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java
##########
@@ -30,7 +30,8 @@
 
 /** Base class for resources one can specify. */
 @Internal
-public abstract class Resource implements Serializable, Comparable<Resource> {
+public abstract class Resource<T extends Resource<T>>

Review comment:
       nit: Cast `other` to `T` rather than `Resource` in `equals()`.
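
A minimal sketch of what this nit could look like, assuming a simplified `Resource` with only a name and a value. The wrapper class, the accessors, and `CpuResource` are hypothetical stand-ins for illustration, not the actual Flink code. Note that private fields are not accessible through a reference of type `T`, so the sketch reads them through getters.

```java
import java.math.BigDecimal;
import java.util.Objects;

// Hypothetical, simplified stand-ins; not Flink's actual Resource class.
final class ResourceEqualsSketch {

    abstract static class Resource<T extends Resource<T>> {
        private final String name;
        private final BigDecimal value;

        protected Resource(String name, BigDecimal value) {
            this.name = Objects.requireNonNull(name);
            this.value = Objects.requireNonNull(value);
        }

        public String getName() {
            return name;
        }

        public BigDecimal getValue() {
            return value;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            // After the getClass() check, o has the same runtime class as this,
            // so the unchecked cast to T is safe for well-formed subclasses.
            @SuppressWarnings("unchecked")
            final T other = (T) o;
            return name.equals(other.getName()) && value.equals(other.getValue());
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, value);
        }
    }

    // Example subclass that binds T to itself.
    static final class CpuResource extends Resource<CpuResource> {
        CpuResource(double cores) {
            super("CPU", BigDecimal.valueOf(cores));
        }
    }
}
```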

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessSpec.java
##########
@@ -97,7 +105,8 @@ public TaskExecutorProcessSpec(
             MemorySize networkMemSize,
             MemorySize managedMemorySize,
             MemorySize jvmMetaspaceSize,
-            MemorySize jvmOverheadSize) {
+            MemorySize jvmOverheadSize,
+            Map<String, ExternalResource> extendedResources) {

Review comment:
       I'd suggest replacing `Map<String, ExternalResource>` with 
`Collection<ExternalResource>`.
   The problem with the current implementation is that we rely on the caller to 
guarantee that each entry's key equals the name of its external resource value.
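
A rough sketch of the idea, assuming the spec still wants a name-keyed map internally. `ProcessSpecSketch` and the nested `ExternalResource` (with a plain `long` amount) are hypothetical stand-ins, not the real Flink classes. Because the constructor derives each key from the resource itself, the key and the name cannot disagree.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a spec class that carries external resources.
final class ProcessSpecSketch {

    // Simplified stand-in for ExternalResource: a name plus an amount.
    static final class ExternalResource {
        private final String name;
        private final long amount;

        ExternalResource(String name, long amount) {
            this.name = name;
            this.amount = amount;
        }

        String getName() {
            return name;
        }

        long getAmount() {
            return amount;
        }
    }

    private final Map<String, ExternalResource> extendedResources;

    // Accepts a collection; the map key is always taken from the resource's own name.
    ProcessSpecSketch(Collection<ExternalResource> extendedResources) {
        final Map<String, ExternalResource> byName = new HashMap<>();
        for (ExternalResource resource : extendedResources) {
            byName.put(resource.getName(), resource);
        }
        this.extendedResources = Collections.unmodifiableMap(byName);
    }

    Map<String, ExternalResource> getExtendedResources() {
        return extendedResources;
    }
}
```

In this sketch a later element silently replaces an earlier one with the same name; whether that is the desired behavior is a separate question.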

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java
##########
@@ -197,14 +226,30 @@ public Builder setNumSlots(int numSlots) {
             return this;
         }
 
+        public Builder addExtendedResource(ExternalResource extendedResource) {
+            this.extendedResources.put(extendedResource.getName(), extendedResource);
+            return this;
+        }
+
+        public Builder addExtendedResources(Collection<ExternalResource> extendedResources) {
+            if (extendedResources != null) {
+                extendedResources.forEach(
+                        extendedResource ->
+                                this.extendedResources.put(
+                                        extendedResource.getName(), extendedResource));
+            }
+            return this;
+        }

Review comment:
       Same here: what happens if an external resource with the same name 
already exists?

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
##########
@@ -368,14 +373,24 @@ public Builder setManagedMemoryMB(int managedMemoryMB) {
             return this;
         }
 
-        public Builder setGPUResource(double gpus) {
-            this.gpuResource = new GPUResource(gpus);
+        public Builder addExtendedResource(ExternalResource extendedResource) {
+            this.extendedResources.put(extendedResource.getName(), extendedResource);
+            return this;
+        }
+
+        public Builder addExtendedResources(Collection<ExternalResource> extendedResources) {
+            if (extendedResources != null) {
+                extendedResources.forEach(
+                        extendedResource ->
+                                this.extendedResources.put(
+                                        extendedResource.getName(), extendedResource));
+            }

Review comment:
       Same for the `ResourceProfile.Builder`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -157,6 +159,29 @@ private ExternalResourceUtils() {
         return externalResourceAmountMap;
     }
 
+    /** Get the map of resource name and Resources of all of enabled external resources. */
+    public static Map<String, ExternalResource> getExternalResourcesMap(Configuration config) {
+        return getExternalResourceAmountMap(config).entrySet().stream()
+                .collect(
+                        Collectors.toMap(
+                                Map.Entry::getKey,
+                                entry -> new ExternalResource(entry.getKey(), entry.getValue())));
+    }
+
+    /** Generate the string expression of the given external resources. */
+    public static String generateExternalResourcesString(
+            Map<String, ExternalResource> extendedResources) {
+        final StringBuilder extendedResourceStr = new StringBuilder(extendedResources.size() * 10);
+        for (Map.Entry<String, ExternalResource> resource : extendedResources.entrySet()) {
+            extendedResourceStr
+                    .append(", ")
+                    .append(resource.getKey())
+                    .append('=')
+                    .append(resource.getValue().getValue());
+        }
+        return extendedResourceStr.toString();
+    }

Review comment:
       It's counterintuitive that the external resources string starts with 
`, `.
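
One possible way to avoid the leading separator, sketched with `Collectors.joining`. To stay self-contained, the sketch takes a plain `Map<String, Long>` of resource name to amount instead of Flink's `ExternalResource`; the class name and that signature are assumptions made for illustration.

```java
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper; uses name -> amount instead of ExternalResource values.
final class ExternalResourceStringSketch {

    // Joins "name=amount" pairs with ", " so the result never starts with a separator.
    static String generateExternalResourcesString(Map<String, Long> amounts) {
        return amounts.entrySet().stream()
                .map(entry -> entry.getKey() + "=" + entry.getValue())
                .collect(Collectors.joining(", "));
    }
}
```

A caller that embeds this into a longer string would then prepend `, ` itself, and only when the result is non-empty.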

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessSpec.java
##########
@@ -97,7 +105,8 @@ public TaskExecutorProcessSpec(
             MemorySize networkMemSize,
             MemorySize managedMemorySize,
             MemorySize jvmMetaspaceSize,
-            MemorySize jvmOverheadSize) {
+            MemorySize jvmOverheadSize,
+            Map<String, ExternalResource> extendedResources) {

Review comment:
       Same for `WorkerResourceSpec` and `TaskExecutorResourceSpec`.

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
##########
@@ -368,14 +373,24 @@ public Builder setManagedMemoryMB(int managedMemoryMB) {
             return this;
         }
 
-        public Builder setGPUResource(double gpus) {
-            this.gpuResource = new GPUResource(gpus);
+        public Builder addExtendedResource(ExternalResource extendedResource) {
+            this.extendedResources.put(extendedResource.getName(), extendedResource);
+            return this;
+        }
+
+        public Builder addExtendedResources(Collection<ExternalResource> extendedResources) {
+            if (extendedResources != null) {
+                extendedResources.forEach(
+                        extendedResource ->
+                                this.extendedResources.put(
+                                        extendedResource.getName(), extendedResource));
+            }

Review comment:
       It's unclear to me what the expected behavior is, by contract, if an 
external resource with the same name as the one being added already exists.
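
To make the question concrete, here is a sketch of one possible contract: fail fast on a duplicate name. This is only an illustration of the option, not necessarily what the PR should adopt; overwriting or merging amounts would be equally valid contracts as long as they are documented. `ExtendedResourcesBuilderSketch` and the nested `ExternalResource` are hypothetical stand-ins.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical builder illustrating an explicit "no duplicate names" contract.
final class ExtendedResourcesBuilderSketch {

    // Simplified stand-in for ExternalResource: a name plus an amount.
    static final class ExternalResource {
        private final String name;
        private final long amount;

        ExternalResource(String name, long amount) {
            this.name = name;
            this.amount = amount;
        }

        String getName() {
            return name;
        }

        long getAmount() {
            return amount;
        }
    }

    private final Map<String, ExternalResource> extendedResources = new HashMap<>();

    /** Adds a resource; throws IllegalArgumentException if the name was already added. */
    ExtendedResourcesBuilderSketch addExtendedResource(ExternalResource resource) {
        final ExternalResource previous =
                extendedResources.putIfAbsent(resource.getName(), resource);
        if (previous != null) {
            throw new IllegalArgumentException(
                    "External resource '" + resource.getName() + "' was already added.");
        }
        return this;
    }

    /** Adds all given resources under the same contract; a null collection is a no-op. */
    ExtendedResourcesBuilderSketch addExtendedResources(Collection<ExternalResource> resources) {
        if (resources != null) {
            resources.forEach(this::addExtendedResource);
        }
        return this;
    }

    // Returns a copy so later builder calls do not affect the result.
    Map<String, ExternalResource> build() {
        return new HashMap<>(extendedResources);
    }
}
```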





