xintongsong commented on a change in pull request #15112:
URL: https://github.com/apache/flink/pull/15112#discussion_r595681050
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
##########
@@ -640,15 +642,18 @@ public Builder setNetworkMemoryMB(int networkMemoryMB) {
return this;
}
- public Builder addExtendedResource(String name, Resource
extendedResource) {
- this.extendedResources.put(name, extendedResource);
+ // Add the given extended resource, the old value with the same
resource name will be
+ // override if present.
+ public Builder setExtendedResource(Resource extendedResource) {
+ this.extendedResources.put(extendedResource.getName(),
extendedResource);
return this;
}
- public Builder addExtendedResources(Map<String, Resource>
extendedResources) {
- if (extendedResources != null) {
- this.extendedResources.putAll(extendedResources);
- }
+ // Add the given extended resources, this will override all the
previous records.
Review comment:
```
Add the given extended resources. This will discard all the previous added
extended resources.
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessSpec.java
##########
@@ -109,18 +120,27 @@ public TaskExecutorProcessSpec(
networkMemSize,
managedMemorySize),
new JvmMetaspaceAndOverhead(jvmMetaspaceSize, jvmOverheadSize),
- 1);
+ 1,
+ extendedResources);
}
protected TaskExecutorProcessSpec(
CPUResource cpuCores,
TaskExecutorFlinkMemory flinkMemory,
JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead,
- int numSlots) {
+ int numSlots,
+ Collection<ExternalResource> extendedResources) {
super(flinkMemory, jvmMetaspaceAndOverhead);
this.cpuCores = cpuCores;
this.numSlots = numSlots;
+ this.extendedResources =
+ Preconditions.checkNotNull(extendedResources).stream()
+ .filter(resource -> !resource.isZero())
+ .collect(Collectors.toMap(ExternalResource::getName,
Function.identity()));
+ Preconditions.checkState(
Review comment:
Same for `WorkerResourceSpec` and `TaskExecutorResourceSpec`.
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java
##########
@@ -51,50 +52,51 @@ protected Resource(String name, BigDecimal value) {
this.value = value;
}
- public Resource merge(Resource other) {
+ public T merge(T other) {
checkNotNull(other, "Cannot merge with null resources");
checkArgument(getClass() == other.getClass(), "Merge with different
resource type");
- checkArgument(name.equals(other.name), "Merge with different resource
name");
+ checkArgument(name.equals(other.getName()), "Merge with different
resource name");
- return create(value.add(other.value));
+ return create(value.add(other.getValue()));
}
- public Resource subtract(Resource other) {
+ public T subtract(T other) {
checkNotNull(other, "Cannot subtract null resources");
checkArgument(getClass() == other.getClass(), "Minus with different
resource type");
- checkArgument(name.equals(other.name), "Minus with different resource
name");
+ checkArgument(name.equals(other.getName()), "Minus with different
resource name");
checkArgument(
- value.compareTo(other.value) >= 0,
+ value.compareTo(other.getValue()) >= 0,
"Try to subtract a larger resource from this one.");
- return create(value.subtract(other.value));
+ return create(value.subtract(other.getValue()));
}
- public Resource multiply(BigDecimal multiplier) {
+ public T multiply(BigDecimal multiplier) {
return create(value.multiply(multiplier));
}
- public Resource multiply(int multiplier) {
+ public T multiply(int multiplier) {
return multiply(BigDecimal.valueOf(multiplier));
}
- public Resource divide(BigDecimal by) {
+ public T divide(BigDecimal by) {
return create(value.divide(by, 16, RoundingMode.DOWN));
}
- public Resource divide(int by) {
+ public T divide(int by) {
return divide(BigDecimal.valueOf(by));
}
@Override
+ @SuppressWarnings("unchecked")
Review comment:
It would be nice to keep the scope of `@SuppressWarnings` as small as
possible. In this case, the single statement `T other = (T) o;` should be
enough.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessSpec.java
##########
@@ -109,18 +120,27 @@ public TaskExecutorProcessSpec(
networkMemSize,
managedMemorySize),
new JvmMetaspaceAndOverhead(jvmMetaspaceSize, jvmOverheadSize),
- 1);
+ 1,
+ extendedResources);
}
protected TaskExecutorProcessSpec(
CPUResource cpuCores,
TaskExecutorFlinkMemory flinkMemory,
JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead,
- int numSlots) {
+ int numSlots,
+ Collection<ExternalResource> extendedResources) {
super(flinkMemory, jvmMetaspaceAndOverhead);
this.cpuCores = cpuCores;
this.numSlots = numSlots;
+ this.extendedResources =
+ Preconditions.checkNotNull(extendedResources).stream()
+ .filter(resource -> !resource.isZero())
+ .collect(Collectors.toMap(ExternalResource::getName,
Function.identity()));
+ Preconditions.checkState(
Review comment:
`checkArgument` is preferred.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
##########
@@ -640,15 +642,18 @@ public Builder setNetworkMemoryMB(int networkMemoryMB) {
return this;
}
- public Builder addExtendedResource(String name, Resource
extendedResource) {
- this.extendedResources.put(name, extendedResource);
+ // Add the given extended resource, the old value with the same
resource name will be
+ // override if present.
+ public Builder setExtendedResource(Resource extendedResource) {
+ this.extendedResources.put(extendedResource.getName(),
extendedResource);
return this;
}
- public Builder addExtendedResources(Map<String, Resource>
extendedResources) {
- if (extendedResources != null) {
- this.extendedResources.putAll(extendedResources);
- }
+ // Add the given extended resources, this will override all the
previous records.
+ public Builder setExtendedResources(Collection<Resource>
extendedResources) {
+ this.extendedResources =
+ extendedResources.stream()
+ .collect(Collectors.toMap(Resource::getName,
Function.identity()));
Review comment:
JavaDoc style is preferred. It hints the compiler/IDE that there's a
binding between the docs and the following class/field/method.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
##########
@@ -485,15 +486,11 @@ public String toString() {
return "ResourceProfile{ANY}";
}
- final StringBuilder extendedResourceStr = new
StringBuilder(extendedResources.size() * 10);
- for (Map.Entry<String, ExternalResource> resource :
extendedResources.entrySet()) {
- extendedResourceStr
- .append(", ")
- .append(resource.getKey())
- .append('=')
- .append(resource.getValue().getValue());
- }
- return "ResourceProfile{" + getResourceString() + extendedResourceStr
+ '}';
+ return "ResourceProfile{"
+ + getResourceString()
+ + ", "
+ +
ExternalResourceUtils.generateExternalResourcesString(extendedResources)
Review comment:
What if there's no external resource? We will have something like
`ResourceProfile{a=b, c=d, }`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
##########
@@ -485,15 +486,11 @@ public String toString() {
return "ResourceProfile{ANY}";
}
- final StringBuilder extendedResourceStr = new
StringBuilder(extendedResources.size() * 10);
- for (Map.Entry<String, ExternalResource> resource :
extendedResources.entrySet()) {
- extendedResourceStr
- .append(", ")
- .append(resource.getKey())
- .append('=')
- .append(resource.getValue().getValue());
- }
- return "ResourceProfile{" + getResourceString() + extendedResourceStr
+ '}';
+ return "ResourceProfile{"
+ + getResourceString()
+ + ", "
+ +
ExternalResourceUtils.generateExternalResourcesString(extendedResources)
Review comment:
Same for `TaskExecutorProcessSpec` and `WorkerResourceSpec`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -46,12 +49,19 @@
private static final Logger LOG =
LoggerFactory.getLogger(ExternalResourceUtils.class);
+ // Indicate an empty external resources list
+ public static final String EMPTY = "empty-list";
Review comment:
I'd suggest to:
- Use the phrase `none`
- Add the constant in `ExternalResourceOptions`
- Mention in the description of `EXTERNAL_RESOURCE_LIST` that the keyword is
preserved
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -157,6 +167,30 @@ private ExternalResourceUtils() {
return externalResourceAmountMap;
}
+ /** Get the collection of all enabled external resources. */
+ public static Collection<ExternalResource> getExternalResourcesCollection(
+ Configuration config) {
+ return getExternalResourceAmountMap(config).entrySet().stream()
+ .map(entry -> new ExternalResource(entry.getKey(),
entry.getValue()))
+ .collect(Collectors.toList());
+ }
+
+ /** Generate the string expression of the given external resources. */
+ public static String generateExternalResourcesString(
+ Map<String, ExternalResource> extendedResources) {
+ final StringBuilder extendedResourceStr = new
StringBuilder(extendedResources.size() * 10);
+ for (Map.Entry<String, ExternalResource> resource :
extendedResources.entrySet()) {
+ if (extendedResourceStr.length() > 0) {
+ extendedResourceStr.append(", ");
+ }
+ extendedResourceStr
+ .append(resource.getKey())
+ .append('=')
+ .append(resource.getValue().getValue());
+ }
Review comment:
This can be simplified as follows:
```
return extendedResources.entrySet().stream()
.map(e -> e.getKey() + "=" + e.getValue().getValue())
.collect(Collectors.joining(", "));
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
##########
@@ -640,15 +642,18 @@ public Builder setNetworkMemoryMB(int networkMemoryMB) {
return this;
}
- public Builder addExtendedResource(String name, Resource
extendedResource) {
- this.extendedResources.put(name, extendedResource);
+ // Add the given extended resource, the old value with the same
resource name will be
+ // override if present.
Review comment:
```
Add the given extended resource. The old value with the same resource name
will be replaced if present.
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -157,6 +167,30 @@ private ExternalResourceUtils() {
return externalResourceAmountMap;
}
+ /** Get the collection of all enabled external resources. */
+ public static Collection<ExternalResource> getExternalResourcesCollection(
+ Configuration config) {
+ return getExternalResourceAmountMap(config).entrySet().stream()
+ .map(entry -> new ExternalResource(entry.getKey(),
entry.getValue()))
+ .collect(Collectors.toList());
+ }
+
+ /** Generate the string expression of the given external resources. */
+ public static String generateExternalResourcesString(
+ Map<String, ExternalResource> extendedResources) {
Review comment:
`Map<String, ExternalResource>` -> `Collection<ExternalResource>`
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
##########
@@ -640,15 +642,18 @@ public Builder setNetworkMemoryMB(int networkMemoryMB) {
return this;
}
- public Builder addExtendedResource(String name, Resource
extendedResource) {
- this.extendedResources.put(name, extendedResource);
+ // Add the given extended resource, the old value with the same
resource name will be
+ // override if present.
+ public Builder setExtendedResource(Resource extendedResource) {
+ this.extendedResources.put(extendedResource.getName(),
extendedResource);
return this;
}
- public Builder addExtendedResources(Map<String, Resource>
extendedResources) {
- if (extendedResources != null) {
- this.extendedResources.putAll(extendedResources);
- }
+ // Add the given extended resources, this will override all the
previous records.
+ public Builder setExtendedResources(Collection<Resource>
extendedResources) {
+ this.extendedResources =
+ extendedResources.stream()
+ .collect(Collectors.toMap(Resource::getName,
Function.identity()));
Review comment:
Same for `ResourceSpec.Builder` and `WorkerResourceSpec.Builder`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]