This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 4181bb4  [FLINK-17917][yarn] Ignore the external resource with a value of 0 in constructing InternalContainerResource
4181bb4 is described below

commit 4181bb4ddb435d457a9c1c61074c131da2a96238
Author: Yangze Guo <[email protected]>
AuthorDate: Mon May 25 14:24:59 2020 +0800

    [FLINK-17917][yarn] Ignore the external resource with a value of 0 in constructing InternalContainerResource
    
    This closes #12315.
---
 .../yarn/WorkerSpecContainerResourceAdapter.java    | 12 +++++++++---
 .../WorkerSpecContainerResourceAdapterTest.java     | 21 +++++++++++++++++++++
 2 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java b/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
index ad48a47..d563d92 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
@@ -166,15 +167,20 @@ class WorkerSpecContainerResourceAdapter {
         * This class is for {@link WorkerSpecContainerResourceAdapter} internal usages only, to overcome the problem that
         * hash codes are calculated inconsistently across different {@link Resource} implementations.
         */
-       private static final class InternalContainerResource {
+       @VisibleForTesting
+       static final class InternalContainerResource {
                private final int memory;
                private final int vcores;
                private final Map<String, Long> externalResources;
 
-               private InternalContainerResource(final int memory, final int vcores, final Map<String, Long> externalResources) {
+               @VisibleForTesting
+               InternalContainerResource(final int memory, final int vcores, final Map<String, Long> externalResources) {
                        this.memory = memory;
                        this.vcores = vcores;
-                       this.externalResources = externalResources;
+                       this.externalResources = externalResources.entrySet()
+                                               .stream()
+                                               .filter(entry -> !entry.getValue().equals(0L))
+                                               .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
                }
 
                private InternalContainerResource(final Resource resource) {
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
index 771165a..1f466f4 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
@@ -29,12 +29,15 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -238,6 +241,24 @@ public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
                 assertThat(adapter.getWorkerSpecs(resourceImpl2, strategy), contains(workerSpec));
        }
 
+       @Test
+       public void testMatchInternalContainerResourceIgnoresZeroValueExternalResources() {
+               final Map<String, Long> externalResources1 = new HashMap<>();
+               final Map<String, Long> externalResources2 = new HashMap<>();
+
+               externalResources1.put("foo", 0L);
+               externalResources1.put("bar", 1L);
+               externalResources2.put("zoo", 0L);
+               externalResources2.put("bar", 1L);
+
+               final WorkerSpecContainerResourceAdapter.InternalContainerResource internalContainerResource1 =
+                       new WorkerSpecContainerResourceAdapter.InternalContainerResource(1024, 1, externalResources1);
+               final WorkerSpecContainerResourceAdapter.InternalContainerResource internalContainerResource2 =
+                       new WorkerSpecContainerResourceAdapter.InternalContainerResource(1024, 1, externalResources2);
+
+               assertEquals(internalContainerResource1, internalContainerResource2);
+       }
+
        private Configuration getConfigProcessSpecEqualsWorkerSpec() {
                final Configuration config = new Configuration();
                 config.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, MemorySize.ZERO);

Reply via email to