KarmaGYZ commented on a change in pull request #14955:
URL: https://github.com/apache/flink/pull/14955#discussion_r578172700



##########
File path: flink-core/src/test/java/org/apache/flink/api/common/resources/ResourceTest.java
##########
@@ -152,6 +152,12 @@ public void testMutiplyNegativeInteger() {
         resource.multiply(by);
     }
 
+    @Test
+    public void testIsZero() {
+        final Resource resource = new TestResource(0.0);

Review comment:
       nit: we could also add a negative case for it, e.g.
   ```
   final Resource resource = new TestResource(1.0);
   assertFalse(resource.isZero());
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
##########
@@ -453,10 +456,15 @@ public ResourceProfile multiply(final int multiplier) {
             return UNKNOWN;
         }
 
-        Map<String, Resource> resultExtendedResource = new HashMap<>(extendedResources.size());
-        for (Map.Entry<String, Resource> entry : extendedResources.entrySet()) {
-            resultExtendedResource.put(entry.getKey(), entry.getValue().multiply(multiplier));
-        }
+        Map<String, Resource> resultExtendedResource =
+                extendedResources.entrySet().stream()
+                        .map(
+                                entry ->
+                                        Tuple2.of(
+                                                entry.getKey(),
+                                                entry.getValue().multiply(multiplier)))
+                        .filter(tuple -> tuple.f1 != null && !tuple.f1.isZero())

Review comment:
       Ditto: `tuple.f1` should be asserted non-null rather than filtered out.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
##########
@@ -143,9 +145,11 @@ private ResourceProfile(
         this.taskOffHeapMemory = checkNotNull(taskOffHeapMemory);
         this.managedMemory = checkNotNull(managedMemory);
         this.networkMemory = checkNotNull(networkMemory);
-        if (extendedResources != null) {
-            this.extendedResources.putAll(extendedResources);
-        }
+
+        this.extendedResources =
+                checkNotNull(extendedResources).entrySet().stream()
+                        .filter(entry -> entry.getValue() != null && !entry.getValue().isZero())

Review comment:
       Same here, maybe `!checkNotNull(entry.getValue()).isZero()`?
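
To illustrate the suggestion, here is a minimal, self-contained sketch of filtering out zero entries while failing fast on a null value. It is not the PR's code: `Res` is a hypothetical stand-in for Flink's `Resource`, `withoutZeros` is a made-up helper name, and `Objects.requireNonNull` stands in for the `checkNotNull` used in the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

final class ExtendedResourceFilterSketch {

    /** Hypothetical stand-in for Flink's Resource; only the zero check is modelled. */
    static final class Res {
        final double value;

        Res(double value) {
            this.value = value;
        }

        boolean isZero() {
            return value == 0.0;
        }
    }

    /**
     * Drops zero-valued entries. A null map or a null value raises a
     * NullPointerException instead of being silently skipped.
     */
    static Map<String, Res> withoutZeros(Map<String, Res> extendedResources) {
        return Objects.requireNonNull(extendedResources).entrySet().stream()
                // requireNonNull plays the role of checkNotNull in the review suggestion
                .filter(entry -> !Objects.requireNonNull(entry.getValue()).isZero())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, Res> in = new HashMap<>();
        in.put("gpu", new Res(1.0));
        in.put("fpga", new Res(0.0));
        // Only the non-zero entry survives the filter.
        System.out.println(withoutZeros(in).keySet());
    }
}
```

With this shape, a null value is surfaced as a bug at construction time rather than being treated like a zero resource.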

##########
File path: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
##########
@@ -101,11 +103,10 @@ private ResourceSpec(
         this.taskOffHeapMemory = checkNotNull(taskOffHeapMemory);
         this.managedMemory = checkNotNull(managedMemory);
 
-        for (Resource resource : extendedResources) {
-            if (resource != null) {
-                this.extendedResources.put(resource.getName(), resource);
-            }
-        }
+        this.extendedResources =
+                Arrays.stream(extendedResources)
+                        .filter(resource -> resource != null && !resource.isZero())

Review comment:
       IIUC, we should assert that `resource` is non-null instead of filtering it out.
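
As a rough sketch of what asserting instead of filtering could look like for the array case: `NamedRes` and `indexNonZero` are hypothetical names for illustration, and `Objects.requireNonNull` is used in place of Flink's `checkNotNull` so that the snippet is self-contained.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

final class ResourceSpecFilterSketch {

    /** Hypothetical stand-in for Flink's Resource with just a name and a value. */
    static final class NamedRes {
        final String name;
        final double value;

        NamedRes(String name, double value) {
            this.name = name;
            this.value = value;
        }

        String getName() {
            return name;
        }

        boolean isZero() {
            return value == 0.0;
        }
    }

    /** Indexes non-zero resources by name; a null element throws NullPointerException. */
    static Map<String, NamedRes> indexNonZero(NamedRes... extendedResources) {
        return Arrays.stream(extendedResources)
                // Assert non-null first, then drop zero-valued resources.
                .filter(resource -> !Objects.requireNonNull(resource, "resource").isZero())
                .collect(Collectors.toMap(NamedRes::getName, Function.identity()));
    }
}
```

One behavioural difference worth keeping in mind: `Collectors.toMap` without a merge function throws `IllegalStateException` on duplicate keys, whereas the removed `put` loop silently overwrote earlier entries with the same name.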




