xintongsong commented on a change in pull request #15112:
URL: https://github.com/apache/flink/pull/15112#discussion_r590131518
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
##########
@@ -93,7 +92,7 @@ private ResourceSpec(
final MemorySize taskHeapMemory,
final MemorySize taskOffHeapMemory,
final MemorySize managedMemory,
- final Resource... extendedResources) {
+ final Map<String, Resource> extendedResources) {
Review comment:
It would be nice to enforce `ExternalResource` rather than `Resource`
here.
IIUC, what prevent us from doing so is that, return types of
`Resource#merge/subtract/multiply/divide/create` are hard coded to `Resource`.
We might consider to solve this with generics. That should also help replacing
`Resource` with `CPUResource` wherever needed.
```
abstract class Resource<T extends Resource<T>> {
T merge(T other);
}
class ExternalResource extends Resource<ExternalResource> {}
```
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
##########
@@ -369,14 +373,21 @@ public Builder setManagedMemoryMB(int managedMemoryMB) {
return this;
}
- public Builder setGPUResource(double gpus) {
- this.gpuResource = new GPUResource(gpus);
+ public Builder addExtendedResource(String name, Resource
extendedResource) {
Review comment:
I would be fine with providing a shortcut setter for a commonly used
external resource like GPU. That would also save us from touching all the test
cases.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java
##########
@@ -118,13 +141,22 @@ public boolean equals(Object obj) {
&& Objects.equals(this.taskHeapSize, that.taskHeapSize)
&& Objects.equals(this.taskOffHeapSize,
that.taskOffHeapSize)
&& Objects.equals(this.networkMemSize, that.networkMemSize)
- && Objects.equals(this.managedMemSize,
that.managedMemSize);
+ && Objects.equals(this.managedMemSize, that.managedMemSize)
+ && Objects.equals(this.extendedResources,
that.extendedResources);
}
return false;
}
@Override
public String toString() {
+ final StringBuilder extendedResourceStr = new
StringBuilder(extendedResources.size() * 10);
+ for (Map.Entry<String, Resource> resource :
extendedResources.entrySet()) {
+ extendedResourceStr
+ .append(", ")
+ .append(resource.getKey())
+ .append('=')
+ .append(resource.getValue().getValue());
+ }
Review comment:
Consider deduplicating this string generation with a util method. We can
put it in `ExternalResourceUtils`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerUtils.java
##########
@@ -29,14 +29,24 @@
*/
public static ResourceProfile generateDefaultSlotResourceProfile(
WorkerResourceSpec workerResourceSpec, int numSlotsPerWorker) {
- return ResourceProfile.newBuilder()
-
.setCpuCores(workerResourceSpec.getCpuCores().divide(numSlotsPerWorker))
-
.setTaskHeapMemory(workerResourceSpec.getTaskHeapSize().divide(numSlotsPerWorker))
- .setTaskOffHeapMemory(
-
workerResourceSpec.getTaskOffHeapSize().divide(numSlotsPerWorker))
-
.setManagedMemory(workerResourceSpec.getManagedMemSize().divide(numSlotsPerWorker))
-
.setNetworkMemory(workerResourceSpec.getNetworkMemSize().divide(numSlotsPerWorker))
- .build();
+ final ResourceProfile.Builder resourceProfileBuilder =
Review comment:
I would suggest to add external resources to `WorkerResourceSpec` and
`TaskExecutorResourceSpec` in one commit, so that this util is always
consistent with `TaskExecutorResourceUtils` in the git history.
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
##########
@@ -369,14 +373,21 @@ public Builder setManagedMemoryMB(int managedMemoryMB) {
return this;
}
- public Builder setGPUResource(double gpus) {
- this.gpuResource = new GPUResource(gpus);
+ public Builder addExtendedResource(String name, Resource
extendedResource) {
+ this.extendedResources.put(name, extendedResource);
+ return this;
+ }
+
+ public Builder addExtendedResources(Map<String, Resource>
extendedResources) {
Review comment:
We should not need the map keys for the argument. Same for
`ResourceProfile.Builder`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java
##########
@@ -174,9 +208,26 @@ public Builder setManagedMemoryMB(int managedMemoryMB) {
return this;
}
+ public Builder addExtendedResource(String name, Resource
extendedResource) {
+ this.extendedResources.put(name, extendedResource);
+ return this;
+ }
+
+ public Builder addExtendedResources(Map<String, Resource>
extendedResources) {
+ if (extendedResources != null) {
+ this.extendedResources.putAll(extendedResources);
+ }
+ return this;
+ }
Review comment:
Same here:
* `Resource` -> `ExternalResource`
* Should not need the names.
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
##########
@@ -369,14 +373,21 @@ public Builder setManagedMemoryMB(int managedMemoryMB) {
return this;
}
- public Builder setGPUResource(double gpus) {
- this.gpuResource = new GPUResource(gpus);
+ public Builder addExtendedResource(String name, Resource
extendedResource) {
Review comment:
We should not need the argument `name`. Same for
`ResourceProfile.Builder`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]