KarmaGYZ commented on a change in pull request #11916:
URL: https://github.com/apache/flink/pull/11916#discussion_r415511862



##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##########
@@ -71,52 +72,65 @@
        }
 
        Optional<Resource> tryComputeContainerResource(final WorkerResourceSpec 
workerResourceSpec) {
-               return 
Optional.ofNullable(workerSpecToContainerResource.computeIfAbsent(
+               final InternalContainerResource internalContainerResource = 
workerSpecToContainerResource.computeIfAbsent(
                        Preconditions.checkNotNull(workerResourceSpec),
-                       this::createAndMapContainerResource));
+                       this::createAndMapContainerResource);
+               if (internalContainerResource != null) {
+                       return 
Optional.of(internalContainerResource.toResource());
+               } else {
+                       return Optional.empty();
+               }
        }
 
        Set<WorkerResourceSpec> getWorkerSpecs(final Resource 
containerResource, final MatchingStrategy matchingStrategy) {
-               return getEquivalentContainerResource(containerResource, 
matchingStrategy).stream()
+               final InternalContainerResource internalContainerResource = new 
InternalContainerResource(containerResource);
+               return 
getEquivalentInternalContainerResource(internalContainerResource, 
matchingStrategy).stream()
                        .flatMap(resource -> 
containerResourceToWorkerSpecs.getOrDefault(resource, 
Collections.emptySet()).stream())
                        .collect(Collectors.toSet());
        }
 
        Set<Resource> getEquivalentContainerResource(final Resource 
containerResource, final MatchingStrategy matchingStrategy) {
+               final InternalContainerResource internalContainerResource = new 
InternalContainerResource(containerResource);
+               return 
getEquivalentInternalContainerResource(internalContainerResource, 
matchingStrategy).stream()
+                       .map(InternalContainerResource::toResource)
+                       .collect(Collectors.toSet());
+       }
+
+       private Set<InternalContainerResource> 
getEquivalentInternalContainerResource(final InternalContainerResource 
internalContainerResource, final MatchingStrategy matchingStrategy) {

Review comment:
       I would keep only `Set<Resource> 
getEquivalentContainerResource(final Resource containerResource, final 
MatchingStrategy matchingStrategy)`, to avoid duplicating the code `final 
InternalContainerResource internalContainerResource = new 
InternalContainerResource(containerResource);`.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##########
@@ -71,52 +72,65 @@
        }
 
        Optional<Resource> tryComputeContainerResource(final WorkerResourceSpec 
workerResourceSpec) {
-               return 
Optional.ofNullable(workerSpecToContainerResource.computeIfAbsent(
+               final InternalContainerResource internalContainerResource = 
workerSpecToContainerResource.computeIfAbsent(
                        Preconditions.checkNotNull(workerResourceSpec),
-                       this::createAndMapContainerResource));
+                       this::createAndMapContainerResource);
+               if (internalContainerResource != null) {
+                       return 
Optional.of(internalContainerResource.toResource());
+               } else {
+                       return Optional.empty();
+               }
        }
 
        Set<WorkerResourceSpec> getWorkerSpecs(final Resource 
containerResource, final MatchingStrategy matchingStrategy) {
-               return getEquivalentContainerResource(containerResource, 
matchingStrategy).stream()
+               final InternalContainerResource internalContainerResource = new 
InternalContainerResource(containerResource);
+               return 
getEquivalentInternalContainerResource(internalContainerResource, 
matchingStrategy).stream()

Review comment:
       Seems equivalent to `getEquivalentContainerResource(final Resource 
containerResource, final MatchingStrategy matchingStrategy)`.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##########
@@ -71,52 +72,65 @@
        }
 
        Optional<Resource> tryComputeContainerResource(final WorkerResourceSpec 
workerResourceSpec) {
-               return 
Optional.ofNullable(workerSpecToContainerResource.computeIfAbsent(
+               final InternalContainerResource internalContainerResource = 
workerSpecToContainerResource.computeIfAbsent(
                        Preconditions.checkNotNull(workerResourceSpec),
-                       this::createAndMapContainerResource));
+                       this::createAndMapContainerResource);
+               if (internalContainerResource != null) {
+                       return 
Optional.of(internalContainerResource.toResource());
+               } else {
+                       return Optional.empty();
+               }
        }
 
        Set<WorkerResourceSpec> getWorkerSpecs(final Resource 
containerResource, final MatchingStrategy matchingStrategy) {
-               return getEquivalentContainerResource(containerResource, 
matchingStrategy).stream()
+               final InternalContainerResource internalContainerResource = new 
InternalContainerResource(containerResource);
+               return 
getEquivalentInternalContainerResource(internalContainerResource, 
matchingStrategy).stream()
                        .flatMap(resource -> 
containerResourceToWorkerSpecs.getOrDefault(resource, 
Collections.emptySet()).stream())
                        .collect(Collectors.toSet());
        }
 
        Set<Resource> getEquivalentContainerResource(final Resource 
containerResource, final MatchingStrategy matchingStrategy) {
+               final InternalContainerResource internalContainerResource = new 
InternalContainerResource(containerResource);
+               return 
getEquivalentInternalContainerResource(internalContainerResource, 
matchingStrategy).stream()
+                       .map(InternalContainerResource::toResource)
+                       .collect(Collectors.toSet());
+       }
+
+       private Set<InternalContainerResource> 
getEquivalentInternalContainerResource(final InternalContainerResource 
internalContainerResource, final MatchingStrategy matchingStrategy) {
                // Yarn might ignore the requested vcores, depending on its 
configurations.
                // In such cases, we should also not matching vcores.
-               final Set<Resource> equivalentContainerResources;
+               final Set<InternalContainerResource> 
equivalentInternalContainerResources;
                switch (matchingStrategy) {
                        case MATCH_VCORE:
-                               equivalentContainerResources = 
Collections.singleton(containerResource);
+                               equivalentInternalContainerResources = 
Collections.singleton(internalContainerResource);
                                break;
                        case IGNORE_VCORE:
                        default:
-                               equivalentContainerResources = 
containerMemoryToContainerResource
-                                       
.getOrDefault(containerResource.getMemory(), Collections.emptySet());
+                               equivalentInternalContainerResources = 
containerMemoryToContainerResource
+                                       
.getOrDefault(internalContainerResource.memory, Collections.emptySet());
                                break;
                }
-               return equivalentContainerResources;
+               return equivalentInternalContainerResources;
        }
 
        @Nullable
-       private Resource createAndMapContainerResource(final WorkerResourceSpec 
workerResourceSpec) {
+       private InternalContainerResource createAndMapContainerResource(final 
WorkerResourceSpec workerResourceSpec) {

Review comment:
       I think it makes sense to still return `Resource` here. And we could use 
`Optional.ofNullable` again.

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
##########
@@ -194,6 +198,41 @@ public void testMaxLimit() {
                
assertFalse(adapter.tryComputeContainerResource(workerSpec2).isPresent());
        }
 
+       @Test
+       public void testMatchResourceWithDifferentImplementation() {
+               final WorkerSpecContainerResourceAdapter.MatchingStrategy 
strategy =
+                       
WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
+               final int minMemMB = 1;
+               final int minVcore = 1;
+
+               final WorkerSpecContainerResourceAdapter adapter =
+                       new WorkerSpecContainerResourceAdapter(
+                               getConfigProcessSpecEqualsWorkerSpec(),
+                               minMemMB,
+                               minVcore,
+                               Integer.MAX_VALUE,
+                               Integer.MAX_VALUE);
+
+               final WorkerResourceSpec workerSpec = new 
WorkerResourceSpec.Builder()
+                       .setCpuCores(1.0)
+                       .setTaskHeapMemoryMB(100)
+                       .setTaskOffHeapMemoryMB(200)
+                       .setNetworkMemoryMB(300)
+                       .setManagedMemoryMB(400)
+                       .build();
+
+               Optional<Resource> resourceOpt = 
adapter.tryComputeContainerResource(workerSpec);
+               assertTrue(resourceOpt.isPresent());
+               Resource resourceImpl1 = resourceOpt.get();
+
+               Resource resourceImpl2 = new TestingResourceImpl(
+                       resourceImpl1.getMemory(),
+                       resourceImpl1.getVirtualCores() + 1);

Review comment:
       Is this related to `testMatchResourceWithDifferentImplementation`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to