KarmaGYZ commented on a change in pull request #11916:
URL: https://github.com/apache/flink/pull/11916#discussion_r415511862
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##########
@@ -71,52 +72,65 @@
}
Optional<Resource> tryComputeContainerResource(final WorkerResourceSpec
workerResourceSpec) {
- return
Optional.ofNullable(workerSpecToContainerResource.computeIfAbsent(
+ final InternalContainerResource internalContainerResource =
workerSpecToContainerResource.computeIfAbsent(
Preconditions.checkNotNull(workerResourceSpec),
- this::createAndMapContainerResource));
+ this::createAndMapContainerResource);
+ if (internalContainerResource != null) {
+ return
Optional.of(internalContainerResource.toResource());
+ } else {
+ return Optional.empty();
+ }
}
Set<WorkerResourceSpec> getWorkerSpecs(final Resource
containerResource, final MatchingStrategy matchingStrategy) {
- return getEquivalentContainerResource(containerResource,
matchingStrategy).stream()
+ final InternalContainerResource internalContainerResource = new
InternalContainerResource(containerResource);
+ return
getEquivalentInternalContainerResource(internalContainerResource,
matchingStrategy).stream()
.flatMap(resource ->
containerResourceToWorkerSpecs.getOrDefault(resource,
Collections.emptySet()).stream())
.collect(Collectors.toSet());
}
Set<Resource> getEquivalentContainerResource(final Resource
containerResource, final MatchingStrategy matchingStrategy) {
+ final InternalContainerResource internalContainerResource = new
InternalContainerResource(containerResource);
+ return
getEquivalentInternalContainerResource(internalContainerResource,
matchingStrategy).stream()
+ .map(InternalContainerResource::toResource)
+ .collect(Collectors.toSet());
+ }
+
+ private Set<InternalContainerResource>
getEquivalentInternalContainerResource(final InternalContainerResource
internalContainerResource, final MatchingStrategy matchingStrategy) {
Review comment:
I would only keep the `Set<Resource>
getEquivalentContainerResource(final Resource containerResource, final
MatchingStrategy matchingStrategy)`. That avoids duplicating the code `final
InternalContainerResource internalContainerResource = new
InternalContainerResource(containerResource);`.
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##########
@@ -71,52 +72,65 @@
}
Optional<Resource> tryComputeContainerResource(final WorkerResourceSpec
workerResourceSpec) {
- return
Optional.ofNullable(workerSpecToContainerResource.computeIfAbsent(
+ final InternalContainerResource internalContainerResource =
workerSpecToContainerResource.computeIfAbsent(
Preconditions.checkNotNull(workerResourceSpec),
- this::createAndMapContainerResource));
+ this::createAndMapContainerResource);
+ if (internalContainerResource != null) {
+ return
Optional.of(internalContainerResource.toResource());
+ } else {
+ return Optional.empty();
+ }
}
Set<WorkerResourceSpec> getWorkerSpecs(final Resource
containerResource, final MatchingStrategy matchingStrategy) {
- return getEquivalentContainerResource(containerResource,
matchingStrategy).stream()
+ final InternalContainerResource internalContainerResource = new
InternalContainerResource(containerResource);
+ return
getEquivalentInternalContainerResource(internalContainerResource,
matchingStrategy).stream()
Review comment:
Seems equivalent to `getEquivalentContainerResource(final Resource
containerResource, final MatchingStrategy matchingStrategy)`.
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##########
@@ -71,52 +72,65 @@
}
Optional<Resource> tryComputeContainerResource(final WorkerResourceSpec
workerResourceSpec) {
- return
Optional.ofNullable(workerSpecToContainerResource.computeIfAbsent(
+ final InternalContainerResource internalContainerResource =
workerSpecToContainerResource.computeIfAbsent(
Preconditions.checkNotNull(workerResourceSpec),
- this::createAndMapContainerResource));
+ this::createAndMapContainerResource);
+ if (internalContainerResource != null) {
+ return
Optional.of(internalContainerResource.toResource());
+ } else {
+ return Optional.empty();
+ }
}
Set<WorkerResourceSpec> getWorkerSpecs(final Resource
containerResource, final MatchingStrategy matchingStrategy) {
- return getEquivalentContainerResource(containerResource,
matchingStrategy).stream()
+ final InternalContainerResource internalContainerResource = new
InternalContainerResource(containerResource);
+ return
getEquivalentInternalContainerResource(internalContainerResource,
matchingStrategy).stream()
.flatMap(resource ->
containerResourceToWorkerSpecs.getOrDefault(resource,
Collections.emptySet()).stream())
.collect(Collectors.toSet());
}
Set<Resource> getEquivalentContainerResource(final Resource
containerResource, final MatchingStrategy matchingStrategy) {
+ final InternalContainerResource internalContainerResource = new
InternalContainerResource(containerResource);
+ return
getEquivalentInternalContainerResource(internalContainerResource,
matchingStrategy).stream()
+ .map(InternalContainerResource::toResource)
+ .collect(Collectors.toSet());
+ }
+
+ private Set<InternalContainerResource>
getEquivalentInternalContainerResource(final InternalContainerResource
internalContainerResource, final MatchingStrategy matchingStrategy) {
// Yarn might ignore the requested vcores, depending on its
configurations.
// In such cases, we should also not matching vcores.
- final Set<Resource> equivalentContainerResources;
+ final Set<InternalContainerResource>
equivalentInternalContainerResources;
switch (matchingStrategy) {
case MATCH_VCORE:
- equivalentContainerResources =
Collections.singleton(containerResource);
+ equivalentInternalContainerResources =
Collections.singleton(internalContainerResource);
break;
case IGNORE_VCORE:
default:
- equivalentContainerResources =
containerMemoryToContainerResource
-
.getOrDefault(containerResource.getMemory(), Collections.emptySet());
+ equivalentInternalContainerResources =
containerMemoryToContainerResource
+
.getOrDefault(internalContainerResource.memory, Collections.emptySet());
break;
}
- return equivalentContainerResources;
+ return equivalentInternalContainerResources;
}
@Nullable
- private Resource createAndMapContainerResource(final WorkerResourceSpec
workerResourceSpec) {
+ private InternalContainerResource createAndMapContainerResource(final
WorkerResourceSpec workerResourceSpec) {
Review comment:
I think it makes sense to still return `Resource` here. And we could use
`Optional.ofNullable` again.
##########
File path:
flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
##########
@@ -194,6 +198,41 @@ public void testMaxLimit() {
assertFalse(adapter.tryComputeContainerResource(workerSpec2).isPresent());
}
+ @Test
+ public void testMatchResourceWithDifferentImplementation() {
+ final WorkerSpecContainerResourceAdapter.MatchingStrategy
strategy =
+
WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
+ final int minMemMB = 1;
+ final int minVcore = 1;
+
+ final WorkerSpecContainerResourceAdapter adapter =
+ new WorkerSpecContainerResourceAdapter(
+ getConfigProcessSpecEqualsWorkerSpec(),
+ minMemMB,
+ minVcore,
+ Integer.MAX_VALUE,
+ Integer.MAX_VALUE);
+
+ final WorkerResourceSpec workerSpec = new
WorkerResourceSpec.Builder()
+ .setCpuCores(1.0)
+ .setTaskHeapMemoryMB(100)
+ .setTaskOffHeapMemoryMB(200)
+ .setNetworkMemoryMB(300)
+ .setManagedMemoryMB(400)
+ .build();
+
+ Optional<Resource> resourceOpt =
adapter.tryComputeContainerResource(workerSpec);
+ assertTrue(resourceOpt.isPresent());
+ Resource resourceImpl1 = resourceOpt.get();
+
+ Resource resourceImpl2 = new TestingResourceImpl(
+ resourceImpl1.getMemory(),
+ resourceImpl1.getVirtualCores() + 1);
Review comment:
Is it related to `testMatchResourceWithDifferentImplementation`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]