zentol commented on a change in pull request #10079: [FLINK-14594] Fix matching 
logics of ResourceSpec/ResourceProfile/Resource considering double values
URL: https://github.com/apache/flink/pull/10079#discussion_r346804468
 
 

 ##########
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java
 ##########
 @@ -33,71 +34,34 @@
 
        private static final long serialVersionUID = 1L;
 
-       /**
-        * Enum defining how resources are aggregated.
-        */
-       public enum ResourceAggregateType {
-               /**
-                * Denotes keeping the sum of the values with same name when 
merging two resource specs for operator chaining.
-                */
-               AGGREGATE_TYPE_SUM,
-
-               /**
-                * Denotes keeping the max of the values with same name when 
merging two resource specs for operator chaining.
-                */
-               AGGREGATE_TYPE_MAX
-       }
-
        private final String name;
 
-       private final double value;
+       private final BigDecimal value;
 
-       private final ResourceAggregateType resourceAggregateType;
+       protected Resource(String name, double value) {
+               this(name, BigDecimal.valueOf(value));
+       }
 
-       protected Resource(String name, double value, ResourceAggregateType 
type) {
+       protected Resource(String name, BigDecimal value) {
                this.name = checkNotNull(name);
-               this.value = value;
-               this.resourceAggregateType = checkNotNull(type);
+               this.value = checkNotNull(value);
        }
 
        public Resource merge(Resource other) {
-               Preconditions.checkArgument(getClass() == other.getClass(), 
"Merge with different resource type");
-               Preconditions.checkArgument(name.equals(other.name), "Merge 
with different resource name");
-               Preconditions.checkArgument(resourceAggregateType == 
other.resourceAggregateType, "Merge with different aggregate 
resourceAggregateType");
-
-               final double aggregatedValue;
-               switch (resourceAggregateType) {
-                       case AGGREGATE_TYPE_MAX :
-                               aggregatedValue = Math.max(value, other.value);
-                               break;
-
-                       case AGGREGATE_TYPE_SUM:
-                       default:
-                               aggregatedValue = value + other.value;
-               }
+               checkNotNull(other, "Cannot merge with null resources");
+               checkArgument(getClass() == other.getClass(), "Merge with 
different resource type");
+               checkArgument(name.equals(other.name), "Merge with different 
resource name");
 
-               return create(aggregatedValue, resourceAggregateType);
+               return create(value.add(other.value));
        }
 
        public Resource subtract(Resource other) {
-               Preconditions.checkArgument(getClass() == other.getClass(), 
"Minus with different resource type");
-               Preconditions.checkArgument(name.equals(other.name), "Minus 
with different resource name");
-               Preconditions.checkArgument(resourceAggregateType == 
other.resourceAggregateType, "Minus with different aggregate 
resourceAggregateType");
-               Preconditions.checkArgument(value >= other.value, "Try to 
subtract a larger resource from this one.");
-
-               final double subtractedValue;
-               switch (resourceAggregateType) {
-                       case AGGREGATE_TYPE_MAX :
-                               // TODO: For max, should check if the latest 
max item is removed and change accordingly.
-                               subtractedValue = value;
-                               break;
-
-                       case AGGREGATE_TYPE_SUM:
-                       default:
-                               subtractedValue = value - other.value;
-               }
+               checkNotNull(other, "Cannot subtract null resources");
+               checkArgument(getClass() == other.getClass(), "Minus with 
different resource type");
+               checkArgument(name.equals(other.name), "Minus with different 
resource name");
+               checkArgument(value.compareTo(other.value) >= 0, "Try to 
subtract a larger resource from this one.");
 
-               return create(subtractedValue, resourceAggregateType);
+               return create(value.subtract(other.value));
 
 Review comment:
   What are the semantics of this returning a negative value?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to