This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 4181bb4 [FLINK-17917][yarn] Ignore the external resource with a value
of 0 in constructing InternalContainerResource
4181bb4 is described below
commit 4181bb4ddb435d457a9c1c61074c131da2a96238
Author: Yangze Guo <[email protected]>
AuthorDate: Mon May 25 14:24:59 2020 +0800
[FLINK-17917][yarn] Ignore the external resource with a value of 0 in
constructing InternalContainerResource
This closes #12315.
---
.../yarn/WorkerSpecContainerResourceAdapter.java | 12 +++++++++---
.../WorkerSpecContainerResourceAdapterTest.java | 21 +++++++++++++++++++++
2 files changed, 30 insertions(+), 3 deletions(-)
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
index ad48a47..d563d92 100644
---
a/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
+++
b/flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
@@ -18,6 +18,7 @@
package org.apache.flink.yarn;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
@@ -166,15 +167,20 @@ class WorkerSpecContainerResourceAdapter {
* This class is for {@link WorkerSpecContainerResourceAdapter}
internal usages only, to overcome the problem that
* hash codes are calculated inconsistently across different {@link
Resource} implementations.
*/
- private static final class InternalContainerResource {
+ @VisibleForTesting
+ static final class InternalContainerResource {
private final int memory;
private final int vcores;
private final Map<String, Long> externalResources;
- private InternalContainerResource(final int memory, final int
vcores, final Map<String, Long> externalResources) {
+ @VisibleForTesting
+ InternalContainerResource(final int memory, final int vcores,
final Map<String, Long> externalResources) {
this.memory = memory;
this.vcores = vcores;
- this.externalResources = externalResources;
+ this.externalResources = externalResources.entrySet()
+ .stream()
+ .filter(entry ->
!entry.getValue().equals(0L))
+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
private InternalContainerResource(final Resource resource) {
diff --git
a/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
b/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
index 771165a..1f466f4 100644
---
a/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
+++
b/flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
@@ -29,12 +29,15 @@ import
org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.junit.Test;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Optional;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -238,6 +241,24 @@ public class WorkerSpecContainerResourceAdapterTest
extends TestLogger {
assertThat(adapter.getWorkerSpecs(resourceImpl2, strategy),
contains(workerSpec));
}
+ @Test
+ public void
testMatchInternalContainerResourceIgnoresZeroValueExternalResources() {
+ final Map<String, Long> externalResources1 = new HashMap<>();
+ final Map<String, Long> externalResources2 = new HashMap<>();
+
+ externalResources1.put("foo", 0L);
+ externalResources1.put("bar", 1L);
+ externalResources2.put("zoo", 0L);
+ externalResources2.put("bar", 1L);
+
+ final
WorkerSpecContainerResourceAdapter.InternalContainerResource
internalContainerResource1 =
+ new
WorkerSpecContainerResourceAdapter.InternalContainerResource(1024, 1,
externalResources1);
+ final
WorkerSpecContainerResourceAdapter.InternalContainerResource
internalContainerResource2 =
+ new
WorkerSpecContainerResourceAdapter.InternalContainerResource(1024, 1,
externalResources2);
+
+ assertEquals(internalContainerResource1,
internalContainerResource2);
+ }
+
private Configuration getConfigProcessSpecEqualsWorkerSpec() {
final Configuration config = new Configuration();
config.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY,
MemorySize.ZERO);