xintongsong commented on a change in pull request #15112:
URL: https://github.com/apache/flink/pull/15112#discussion_r595681050



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
##########
@@ -640,15 +642,18 @@ public Builder setNetworkMemoryMB(int networkMemoryMB) {
             return this;
         }
 
-        public Builder addExtendedResource(String name, Resource 
extendedResource) {
-            this.extendedResources.put(name, extendedResource);
+        // Add the given extended resource, the old value with the same 
resource name will be
+        // override if present.
+        public Builder setExtendedResource(Resource extendedResource) {
+            this.extendedResources.put(extendedResource.getName(), 
extendedResource);
             return this;
         }
 
-        public Builder addExtendedResources(Map<String, Resource> 
extendedResources) {
-            if (extendedResources != null) {
-                this.extendedResources.putAll(extendedResources);
-            }
+        // Add the given extended resources, this will override all the 
previous records.

Review comment:
       ```
   Add the given extended resources. This will discard all the previous added 
extended resources.
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessSpec.java
##########
@@ -109,18 +120,27 @@ public TaskExecutorProcessSpec(
                         networkMemSize,
                         managedMemorySize),
                 new JvmMetaspaceAndOverhead(jvmMetaspaceSize, jvmOverheadSize),
-                1);
+                1,
+                extendedResources);
     }
 
     protected TaskExecutorProcessSpec(
             CPUResource cpuCores,
             TaskExecutorFlinkMemory flinkMemory,
             JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead,
-            int numSlots) {
+            int numSlots,
+            Collection<ExternalResource> extendedResources) {
 
         super(flinkMemory, jvmMetaspaceAndOverhead);
         this.cpuCores = cpuCores;
         this.numSlots = numSlots;
+        this.extendedResources =
+                Preconditions.checkNotNull(extendedResources).stream()
+                        .filter(resource -> !resource.isZero())
+                        .collect(Collectors.toMap(ExternalResource::getName, 
Function.identity()));
+        Preconditions.checkState(

Review comment:
       Same for `WorkerResourceSpec` and `TaskExecutorResourceSpec`.

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java
##########
@@ -51,50 +52,51 @@ protected Resource(String name, BigDecimal value) {
         this.value = value;
     }
 
-    public Resource merge(Resource other) {
+    public T merge(T other) {
         checkNotNull(other, "Cannot merge with null resources");
         checkArgument(getClass() == other.getClass(), "Merge with different 
resource type");
-        checkArgument(name.equals(other.name), "Merge with different resource 
name");
+        checkArgument(name.equals(other.getName()), "Merge with different 
resource name");
 
-        return create(value.add(other.value));
+        return create(value.add(other.getValue()));
     }
 
-    public Resource subtract(Resource other) {
+    public T subtract(T other) {
         checkNotNull(other, "Cannot subtract null resources");
         checkArgument(getClass() == other.getClass(), "Minus with different 
resource type");
-        checkArgument(name.equals(other.name), "Minus with different resource 
name");
+        checkArgument(name.equals(other.getName()), "Minus with different 
resource name");
         checkArgument(
-                value.compareTo(other.value) >= 0,
+                value.compareTo(other.getValue()) >= 0,
                 "Try to subtract a larger resource from this one.");
 
-        return create(value.subtract(other.value));
+        return create(value.subtract(other.getValue()));
     }
 
-    public Resource multiply(BigDecimal multiplier) {
+    public T multiply(BigDecimal multiplier) {
         return create(value.multiply(multiplier));
     }
 
-    public Resource multiply(int multiplier) {
+    public T multiply(int multiplier) {
         return multiply(BigDecimal.valueOf(multiplier));
     }
 
-    public Resource divide(BigDecimal by) {
+    public T divide(BigDecimal by) {
         return create(value.divide(by, 16, RoundingMode.DOWN));
     }
 
-    public Resource divide(int by) {
+    public T divide(int by) {
         return divide(BigDecimal.valueOf(by));
     }
 
     @Override
+    @SuppressWarnings("unchecked")

Review comment:
       It would be nice to keep the scope of `@SuppressWarnings` as small as 
possible. In this case, the single statement `T other = (T) o;` should be 
enough.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessSpec.java
##########
@@ -109,18 +120,27 @@ public TaskExecutorProcessSpec(
                         networkMemSize,
                         managedMemorySize),
                 new JvmMetaspaceAndOverhead(jvmMetaspaceSize, jvmOverheadSize),
-                1);
+                1,
+                extendedResources);
     }
 
     protected TaskExecutorProcessSpec(
             CPUResource cpuCores,
             TaskExecutorFlinkMemory flinkMemory,
             JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead,
-            int numSlots) {
+            int numSlots,
+            Collection<ExternalResource> extendedResources) {
 
         super(flinkMemory, jvmMetaspaceAndOverhead);
         this.cpuCores = cpuCores;
         this.numSlots = numSlots;
+        this.extendedResources =
+                Preconditions.checkNotNull(extendedResources).stream()
+                        .filter(resource -> !resource.isZero())
+                        .collect(Collectors.toMap(ExternalResource::getName, 
Function.identity()));
+        Preconditions.checkState(

Review comment:
       `checkArgument` is preferred.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
##########
@@ -640,15 +642,18 @@ public Builder setNetworkMemoryMB(int networkMemoryMB) {
             return this;
         }
 
-        public Builder addExtendedResource(String name, Resource 
extendedResource) {
-            this.extendedResources.put(name, extendedResource);
+        // Add the given extended resource, the old value with the same 
resource name will be
+        // override if present.
+        public Builder setExtendedResource(Resource extendedResource) {
+            this.extendedResources.put(extendedResource.getName(), 
extendedResource);
             return this;
         }
 
-        public Builder addExtendedResources(Map<String, Resource> 
extendedResources) {
-            if (extendedResources != null) {
-                this.extendedResources.putAll(extendedResources);
-            }
+        // Add the given extended resources, this will override all the 
previous records.
+        public Builder setExtendedResources(Collection<Resource> 
extendedResources) {
+            this.extendedResources =
+                    extendedResources.stream()
+                            .collect(Collectors.toMap(Resource::getName, 
Function.identity()));

Review comment:
       JavaDoc style is preferred. It hints the compiler/IDE that there's a 
binding between the docs and the following class/field/method.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
##########
@@ -485,15 +486,11 @@ public String toString() {
             return "ResourceProfile{ANY}";
         }
 
-        final StringBuilder extendedResourceStr = new 
StringBuilder(extendedResources.size() * 10);
-        for (Map.Entry<String, ExternalResource> resource : 
extendedResources.entrySet()) {
-            extendedResourceStr
-                    .append(", ")
-                    .append(resource.getKey())
-                    .append('=')
-                    .append(resource.getValue().getValue());
-        }
-        return "ResourceProfile{" + getResourceString() + extendedResourceStr 
+ '}';
+        return "ResourceProfile{"
+                + getResourceString()
+                + ", "
+                + 
ExternalResourceUtils.generateExternalResourcesString(extendedResources)

Review comment:
       What if there's no external resource? We will have something like 
`ResourceProfile{a=b, c=d, }`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
##########
@@ -485,15 +486,11 @@ public String toString() {
             return "ResourceProfile{ANY}";
         }
 
-        final StringBuilder extendedResourceStr = new 
StringBuilder(extendedResources.size() * 10);
-        for (Map.Entry<String, ExternalResource> resource : 
extendedResources.entrySet()) {
-            extendedResourceStr
-                    .append(", ")
-                    .append(resource.getKey())
-                    .append('=')
-                    .append(resource.getValue().getValue());
-        }
-        return "ResourceProfile{" + getResourceString() + extendedResourceStr 
+ '}';
+        return "ResourceProfile{"
+                + getResourceString()
+                + ", "
+                + 
ExternalResourceUtils.generateExternalResourcesString(extendedResources)

Review comment:
       Same for `TaskExecutorProcessSpec` and `WorkerResourceSpec`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -46,12 +49,19 @@
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ExternalResourceUtils.class);
 
+    // Indicate an empty external resources list
+    public static final String EMPTY = "empty-list";

Review comment:
       I'd suggest to:
   - Use the phrase `none`
   - Add the constant in `ExternalResourceOptions`
   - Mention in the description of `EXTERNAL_RESOURCE_LIST` that the keyword is 
preserved

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -157,6 +167,30 @@ private ExternalResourceUtils() {
         return externalResourceAmountMap;
     }
 
+    /** Get the collection of all enabled external resources. */
+    public static Collection<ExternalResource> getExternalResourcesCollection(
+            Configuration config) {
+        return getExternalResourceAmountMap(config).entrySet().stream()
+                .map(entry -> new ExternalResource(entry.getKey(), 
entry.getValue()))
+                .collect(Collectors.toList());
+    }
+
+    /** Generate the string expression of the given external resources. */
+    public static String generateExternalResourcesString(
+            Map<String, ExternalResource> extendedResources) {
+        final StringBuilder extendedResourceStr = new 
StringBuilder(extendedResources.size() * 10);
+        for (Map.Entry<String, ExternalResource> resource : 
extendedResources.entrySet()) {
+            if (extendedResourceStr.length() > 0) {
+                extendedResourceStr.append(", ");
+            }
+            extendedResourceStr
+                    .append(resource.getKey())
+                    .append('=')
+                    .append(resource.getValue().getValue());
+        }

Review comment:
       This can be simplified as follows:
   ```
   return extendedResources.entrySet().stream()
           .map(e -> e.getKey() + "=" + e.getValue().getValue())
           .collect(Collectors.joining(", "));
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
##########
@@ -640,15 +642,18 @@ public Builder setNetworkMemoryMB(int networkMemoryMB) {
             return this;
         }
 
-        public Builder addExtendedResource(String name, Resource 
extendedResource) {
-            this.extendedResources.put(name, extendedResource);
+        // Add the given extended resource, the old value with the same 
resource name will be
+        // override if present.

Review comment:
       ```
   Add the given extended resource. The old value with the same resource name 
will be replaced if present.
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -157,6 +167,30 @@ private ExternalResourceUtils() {
         return externalResourceAmountMap;
     }
 
+    /** Get the collection of all enabled external resources. */
+    public static Collection<ExternalResource> getExternalResourcesCollection(
+            Configuration config) {
+        return getExternalResourceAmountMap(config).entrySet().stream()
+                .map(entry -> new ExternalResource(entry.getKey(), 
entry.getValue()))
+                .collect(Collectors.toList());
+    }
+
+    /** Generate the string expression of the given external resources. */
+    public static String generateExternalResourcesString(
+            Map<String, ExternalResource> extendedResources) {

Review comment:
       `Map<String, ExternalResource>` -> `Collection<ExternalResource>`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
##########
@@ -640,15 +642,18 @@ public Builder setNetworkMemoryMB(int networkMemoryMB) {
             return this;
         }
 
-        public Builder addExtendedResource(String name, Resource 
extendedResource) {
-            this.extendedResources.put(name, extendedResource);
+        // Add the given extended resource, the old value with the same 
resource name will be
+        // override if present.
+        public Builder setExtendedResource(Resource extendedResource) {
+            this.extendedResources.put(extendedResource.getName(), 
extendedResource);
             return this;
         }
 
-        public Builder addExtendedResources(Map<String, Resource> 
extendedResources) {
-            if (extendedResources != null) {
-                this.extendedResources.putAll(extendedResources);
-            }
+        // Add the given extended resources, this will override all the 
previous records.
+        public Builder setExtendedResources(Collection<Resource> 
extendedResources) {
+            this.extendedResources =
+                    extendedResources.stream()
+                            .collect(Collectors.toMap(Resource::getName, 
Function.identity()));

Review comment:
       Same for `ResourceSpec.Builder` and `WorkerResourceSpec.Builder`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to