Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4911#discussion_r154965613
--- Diff:
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
---
@@ -183,17 +240,124 @@ public int hashCode() {
result = 31 * result + directMemoryInMB;
result = 31 * result + nativeMemoryInMB;
result = 31 * result + stateSizeInMB;
+ result = 31 * result + extendedResources.hashCode();
return result;
}
@Override
public String toString() {
+ String extend = "";
+ for (Resource resource : extendedResources.values()) {
+ extend += ", " + resource.name + "=" + resource.value;
+ }
return "ResourceSpec{" +
"cpuCores=" + cpuCores +
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
", nativeMemoryInMB=" + nativeMemoryInMB +
- ", stateSizeInMB=" + stateSizeInMB +
+ ", stateSizeInMB=" + stateSizeInMB + extend +
'}';
}
+
+ public static abstract class Resource implements Serializable {
+ final private String name;
+
+ final private Double value;
+
+ final private ResourceAggregateType type;
+
+ public Resource(String name, double value,
ResourceAggregateType type) {
+ this.name = checkNotNull(name);
+ this.value = Double.valueOf(value);
+ this.type = checkNotNull(type);
+ }
+
+ Resource merge(Resource other) {
+ Preconditions.checkArgument(getClass() ==
other.getClass(), "Merge with different resource type");
+
Preconditions.checkArgument(this.name.equals(other.name), "Merge with different
resource name");
+
Preconditions.checkArgument(this.type.equals(other.type), "Merge with different
aggregate type");
+
+ Double value = null;
+ switch (type) {
+ case AGGREGATE_TYPE_MAX :
+ value =
other.value.compareTo(this.value) > 0 ? other.value : this.value;
+ break;
+
+ case AGGREGATE_TYPE_SUM:
+ default:
+ value = this.value + other.value;
+ }
+
+ Resource resource = create(value, type);
+ return resource;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (o != null && getClass() == o.getClass()) {
+ Resource other = (Resource) o;
+
+ return name.equals(other.name) &&
type.equals(other.type) && value.equals(other.value);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ int result = name != null ? name.hashCode() : 0;
+ result = 31 * result + type.ordinal();
+ result = 31 * result + value.hashCode();
+ return result;
+ }
+
+ /**
+ * create a resource of the same resource type
+ *
+ * @param value the value of the resource
+ * @param type the aggregate type of the resource
+ * @return a new instance of the sub resource
+ */
+ protected abstract Resource create(double value,
ResourceAggregateType type);
+ }
+
+ /**
+ * The GPU resource.
+ */
+ public static class GPUResource extends Resource {
+
+ public GPUResource(double value) {
+ this(value, ResourceAggregateType.AGGREGATE_TYPE_SUM);
+ }
+
+ public GPUResource(double value, ResourceAggregateType type) {
+ super("GPU", value, type);
+ }
+
+ @Override
+ public Resource create(double value, ResourceAggregateType
type) {
+ return new GPUResource(value, type);
+ }
+ }
+
+ /**
+ * The FPGA resource.
+ */
+ public static class FPGAResource extends Resource {
--- End diff --
I think this resource is too specific for Flink right now. Therefore I
would remove it and only keep the `GPUResource`.
---