xintongsong commented on a change in pull request #11916:
URL: https://github.com/apache/flink/pull/11916#discussion_r415523696
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##########
@@ -71,52 +72,65 @@
}
Optional<Resource> tryComputeContainerResource(final WorkerResourceSpec
workerResourceSpec) {
- return
Optional.ofNullable(workerSpecToContainerResource.computeIfAbsent(
+ final InternalContainerResource internalContainerResource =
workerSpecToContainerResource.computeIfAbsent(
Preconditions.checkNotNull(workerResourceSpec),
- this::createAndMapContainerResource));
+ this::createAndMapContainerResource);
+ if (internalContainerResource != null) {
+ return
Optional.of(internalContainerResource.toResource());
+ } else {
+ return Optional.empty();
+ }
}
Set<WorkerResourceSpec> getWorkerSpecs(final Resource
containerResource, final MatchingStrategy matchingStrategy) {
- return getEquivalentContainerResource(containerResource,
matchingStrategy).stream()
+ final InternalContainerResource internalContainerResource = new
InternalContainerResource(containerResource);
+ return
getEquivalentInternalContainerResource(internalContainerResource,
matchingStrategy).stream()
.flatMap(resource ->
containerResourceToWorkerSpecs.getOrDefault(resource,
Collections.emptySet()).stream())
.collect(Collectors.toSet());
}
Set<Resource> getEquivalentContainerResource(final Resource
containerResource, final MatchingStrategy matchingStrategy) {
+ final InternalContainerResource internalContainerResource = new
InternalContainerResource(containerResource);
+ return
getEquivalentInternalContainerResource(internalContainerResource,
matchingStrategy).stream()
+ .map(InternalContainerResource::toResource)
+ .collect(Collectors.toSet());
+ }
+
+ private Set<InternalContainerResource>
getEquivalentInternalContainerResource(final InternalContainerResource
internalContainerResource, final MatchingStrategy matchingStrategy) {
// Yarn might ignore the requested vcores, depending on its
configurations.
// In such cases, we should also not matching vcores.
- final Set<Resource> equivalentContainerResources;
+ final Set<InternalContainerResource>
equivalentInternalContainerResources;
switch (matchingStrategy) {
case MATCH_VCORE:
- equivalentContainerResources =
Collections.singleton(containerResource);
+ equivalentInternalContainerResources =
Collections.singleton(internalContainerResource);
break;
case IGNORE_VCORE:
default:
- equivalentContainerResources =
containerMemoryToContainerResource
-
.getOrDefault(containerResource.getMemory(), Collections.emptySet());
+ equivalentInternalContainerResources =
containerMemoryToContainerResource
+
.getOrDefault(internalContainerResource.memory, Collections.emptySet());
break;
}
- return equivalentContainerResources;
+ return equivalentInternalContainerResources;
}
@Nullable
- private Resource createAndMapContainerResource(final WorkerResourceSpec
workerResourceSpec) {
+ private InternalContainerResource createAndMapContainerResource(final
WorkerResourceSpec workerResourceSpec) {
Review comment:
We cannot do that, because it is the `InternalContainerResource` that we
want to put into `workerSpecToContainerResource`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]