xintongsong commented on a change in pull request #11916:
URL: https://github.com/apache/flink/pull/11916#discussion_r415523696



##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##########
@@ -71,52 +72,65 @@
        }
 
        Optional<Resource> tryComputeContainerResource(final WorkerResourceSpec 
workerResourceSpec) {
-               return 
Optional.ofNullable(workerSpecToContainerResource.computeIfAbsent(
+               final InternalContainerResource internalContainerResource = 
workerSpecToContainerResource.computeIfAbsent(
                        Preconditions.checkNotNull(workerResourceSpec),
-                       this::createAndMapContainerResource));
+                       this::createAndMapContainerResource);
+               if (internalContainerResource != null) {
+                       return 
Optional.of(internalContainerResource.toResource());
+               } else {
+                       return Optional.empty();
+               }
        }
 
        Set<WorkerResourceSpec> getWorkerSpecs(final Resource 
containerResource, final MatchingStrategy matchingStrategy) {
-               return getEquivalentContainerResource(containerResource, 
matchingStrategy).stream()
+               final InternalContainerResource internalContainerResource = new 
InternalContainerResource(containerResource);
+               return 
getEquivalentInternalContainerResource(internalContainerResource, 
matchingStrategy).stream()
                        .flatMap(resource -> 
containerResourceToWorkerSpecs.getOrDefault(resource, 
Collections.emptySet()).stream())
                        .collect(Collectors.toSet());
        }
 
        Set<Resource> getEquivalentContainerResource(final Resource 
containerResource, final MatchingStrategy matchingStrategy) {
+               final InternalContainerResource internalContainerResource = new 
InternalContainerResource(containerResource);
+               return 
getEquivalentInternalContainerResource(internalContainerResource, 
matchingStrategy).stream()
+                       .map(InternalContainerResource::toResource)
+                       .collect(Collectors.toSet());
+       }
+
+       private Set<InternalContainerResource> 
getEquivalentInternalContainerResource(final InternalContainerResource 
internalContainerResource, final MatchingStrategy matchingStrategy) {
                // Yarn might ignore the requested vcores, depending on its 
configurations.
                // In such cases, we should also not matching vcores.
-               final Set<Resource> equivalentContainerResources;
+               final Set<InternalContainerResource> 
equivalentInternalContainerResources;
                switch (matchingStrategy) {
                        case MATCH_VCORE:
-                               equivalentContainerResources = 
Collections.singleton(containerResource);
+                               equivalentInternalContainerResources = 
Collections.singleton(internalContainerResource);
                                break;
                        case IGNORE_VCORE:
                        default:
-                               equivalentContainerResources = 
containerMemoryToContainerResource
-                                       
.getOrDefault(containerResource.getMemory(), Collections.emptySet());
+                               equivalentInternalContainerResources = 
containerMemoryToContainerResource
+                                       
.getOrDefault(internalContainerResource.memory, Collections.emptySet());
                                break;
                }
-               return equivalentContainerResources;
+               return equivalentInternalContainerResources;
        }
 
        @Nullable
-       private Resource createAndMapContainerResource(final WorkerResourceSpec 
workerResourceSpec) {
+       private InternalContainerResource createAndMapContainerResource(final 
WorkerResourceSpec workerResourceSpec) {

Review comment:
       We cannot do that, because the value we want to put into 
`workerSpecToContainerResource` is an `InternalContainerResource`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to