Repository: flink Updated Branches: refs/heads/master c1734f4bf -> 37b4e2cef
[FLINK-8266] Add network memory to ResourceProfile for input/output memory of a task This closes #5170. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47e6069d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47e6069d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47e6069d Branch: refs/heads/master Commit: 47e6069d7a299c02a81f062a7acb6a792b71c146 Parents: a4ecc7f Author: shuai.xus <[email protected]> Authored: Fri Dec 15 18:43:27 2017 +0800 Committer: Till Rohrmann <[email protected]> Committed: Thu Jan 25 15:33:29 2018 +0100 ---------------------------------------------------------------------- .../clusterframework/types/ResourceProfile.java | 36 +++++++++++++++--- .../types/ResourceProfileTest.java | 40 ++++++++++---------- .../flink/yarn/YarnResourceManagerTest.java | 2 +- 3 files changed, 52 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/47e6069d/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java index 6eb9af4..87d6fc5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java @@ -64,6 +64,9 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile /** How many native memory in mb are needed. */ private final int nativeMemoryInMB; + /** Memory used for the task in the slot to communicate with its upstreams. Set by job master. */ + private final int networkMemoryInMB; + /** A extensible field for user specified resources from {@link ResourceSpec}. */ private final Map<String, Resource> extendedResources = new HashMap<>(1); @@ -76,6 +79,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile * @param heapMemoryInMB The size of the heap memory, in megabytes. * @param directMemoryInMB The size of the direct memory, in megabytes. * @param nativeMemoryInMB The size of the native memory, in megabytes. + * @param networkMemoryInMB The size of the memory for input and output, in megabytes. * @param extendedResources The extended resources such as GPU and FPGA */ public ResourceProfile( @@ -83,11 +87,13 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile int heapMemoryInMB, int directMemoryInMB, int nativeMemoryInMB, + int networkMemoryInMB, Map<String, Resource> extendedResources) { this.cpuCores = cpuCores; this.heapMemoryInMB = heapMemoryInMB; this.directMemoryInMB = directMemoryInMB; this.nativeMemoryInMB = nativeMemoryInMB; + this.networkMemoryInMB = networkMemoryInMB; if (extendedResources != null) { this.extendedResources.putAll(extendedResources); } @@ -100,7 +106,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile * @param heapMemoryInMB The size of the heap memory, in megabytes. */ public ResourceProfile(double cpuCores, int heapMemoryInMB) { - this(cpuCores, heapMemoryInMB, 0, 0, Collections.EMPTY_MAP); + this(cpuCores, heapMemoryInMB, 0, 0, 0, Collections.EMPTY_MAP); } /** @@ -109,7 +115,12 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile * @param other The ResourceProfile to copy. */ public ResourceProfile(ResourceProfile other) { - this(other.cpuCores, other.heapMemoryInMB, other.directMemoryInMB, other.nativeMemoryInMB, other.extendedResources); + this(other.cpuCores, + other.heapMemoryInMB, + other.directMemoryInMB, + other.nativeMemoryInMB, + other.networkMemoryInMB, + other.extendedResources); } // ------------------------------------------------------------------------ @@ -151,12 +162,20 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile } /** + * Get the memory needed for task to communicate with its upstreams and downstreams in MB. + * @return The network memory in MB + */ + public int getNetworkMemoryInMB() { + return networkMemoryInMB; + } + + /** * Get the total memory needed in MB. * * @return The total memory in MB */ public int getMemoryInMB() { - return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB; + return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB + networkMemoryInMB; } /** @@ -187,7 +206,8 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile if (cpuCores >= required.getCpuCores() && heapMemoryInMB >= required.getHeapMemoryInMB() && directMemoryInMB >= required.getDirectMemoryInMB() && - nativeMemoryInMB >= required.getNativeMemoryInMB()) { + nativeMemoryInMB >= required.getNativeMemoryInMB() && + networkMemoryInMB >= required.getNetworkMemoryInMB()) { for (Map.Entry<String, Resource> resource : required.extendedResources.entrySet()) { if (!extendedResources.containsKey(resource.getKey()) || !extendedResources.get(resource.getKey()).getResourceAggregateType().equals(resource.getValue().getResourceAggregateType()) || @@ -241,6 +261,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile result = 31 * result + heapMemoryInMB; result = 31 * result + directMemoryInMB; result = 31 * result + nativeMemoryInMB; + result = 31 * result + networkMemoryInMB; result = 31 * result + extendedResources.hashCode(); return result; } @@ -255,6 +276,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile return this.cpuCores == that.cpuCores && this.heapMemoryInMB == that.heapMemoryInMB && this.directMemoryInMB == that.directMemoryInMB && + this.networkMemoryInMB == that.networkMemoryInMB && Objects.equals(extendedResources, that.extendedResources); } return false; @@ -270,11 +292,12 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile "cpuCores=" + cpuCores + ", heapMemoryInMB=" + heapMemoryInMB + ", directMemoryInMB=" + directMemoryInMB + - ", nativeMemoryInMB=" + nativeMemoryInMB + resources + + ", nativeMemoryInMB=" + nativeMemoryInMB + + ", networkMemoryInMB=" + networkMemoryInMB + resources + '}'; } - static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec) { + static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec, int networkMemory) { Map<String, Resource> copiedExtendedResources = new HashMap<>(resourceSpec.getExtendedResources()); return new ResourceProfile( @@ -282,6 +305,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile resourceSpec.getHeapMemory(), resourceSpec.getDirectMemory(), resourceSpec.getNativeMemory(), + networkMemory, copiedExtendedResources); } } http://git-wip-us.apache.org/repos/asf/flink/blob/47e6069d/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java index 25cb5fb..7ed688a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java @@ -31,10 +31,10 @@ public class ResourceProfileTest { @Test public void testMatchRequirement() throws Exception { - ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, Collections.EMPTY_MAP); - ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, Collections.EMPTY_MAP); - ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, Collections.EMPTY_MAP); - ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, Collections.EMPTY_MAP); + ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 0, Collections.EMPTY_MAP); + ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, 0, Collections.EMPTY_MAP); + ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, 0, Collections.EMPTY_MAP); + ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, 0, Collections.EMPTY_MAP); assertFalse(rp1.isMatching(rp2)); assertTrue(rp2.isMatching(rp1)); @@ -50,6 +50,9 @@ public class ResourceProfileTest { assertTrue(rp4.isMatching(rp3)); assertTrue(rp4.isMatching(rp4)); + ResourceProfile rp5 = new ResourceProfile(2.0, 100, 100, 100, 100, null); + assertFalse(rp4.isMatching(rp5)); + ResourceSpec rs1 = ResourceSpec.newBuilder(). setCpuCores(1.0). setHeapMemoryInMB(100). @@ -61,10 +64,9 @@ public class ResourceProfileTest { setGPUResource(1.1). build(); - - assertFalse(rp1.isMatching(ResourceProfile.fromResourceSpec(rs1))); - assertTrue(ResourceProfile.fromResourceSpec(rs1).isMatching(ResourceProfile.fromResourceSpec(rs2))); - assertFalse(ResourceProfile.fromResourceSpec(rs2).isMatching(ResourceProfile.fromResourceSpec(rs1))); + assertFalse(rp1.isMatching(ResourceProfile.fromResourceSpec(rs1, 0))); + assertTrue(ResourceProfile.fromResourceSpec(rs1, 0).isMatching(ResourceProfile.fromResourceSpec(rs2, 0))); + assertFalse(ResourceProfile.fromResourceSpec(rs2, 0).isMatching(ResourceProfile.fromResourceSpec(rs1, 0))); } @Test @@ -76,7 +78,7 @@ public class ResourceProfileTest { public void testEquals() throws Exception { ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build(); ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build(); - assertTrue(ResourceProfile.fromResourceSpec(rs1).equals(ResourceProfile.fromResourceSpec(rs2))); + assertTrue(ResourceProfile.fromResourceSpec(rs1, 0).equals(ResourceProfile.fromResourceSpec(rs2, 0))); ResourceSpec rs3 = ResourceSpec.newBuilder(). setCpuCores(1.0). @@ -88,37 +90,37 @@ public class ResourceProfileTest { setHeapMemoryInMB(100). setGPUResource(1.1). build(); - assertFalse(ResourceProfile.fromResourceSpec(rs3).equals(ResourceProfile.fromResourceSpec(rs4))); + assertFalse(ResourceProfile.fromResourceSpec(rs3, 0).equals(ResourceProfile.fromResourceSpec(rs4, 0))); ResourceSpec rs5 = ResourceSpec.newBuilder(). setCpuCores(1.0). setHeapMemoryInMB(100). setGPUResource(2.2). build(); - assertTrue(ResourceProfile.fromResourceSpec(rs3).equals(ResourceProfile.fromResourceSpec(rs5))); + assertTrue(ResourceProfile.fromResourceSpec(rs3, 100).equals(ResourceProfile.fromResourceSpec(rs5, 100))); } @Test public void testCompareTo() throws Exception { ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build(); ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build(); - assertEquals(0, ResourceProfile.fromResourceSpec(rs1).compareTo(ResourceProfile.fromResourceSpec(rs2))); + assertEquals(0, ResourceProfile.fromResourceSpec(rs1, 0).compareTo(ResourceProfile.fromResourceSpec(rs2, 0))); ResourceSpec rs3 = ResourceSpec.newBuilder(). setCpuCores(1.0). setHeapMemoryInMB(100). setGPUResource(2.2). build(); - assertEquals(-1, ResourceProfile.fromResourceSpec(rs1).compareTo(ResourceProfile.fromResourceSpec(rs3))); - assertEquals(1, ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs1))); + assertEquals(-1, ResourceProfile.fromResourceSpec(rs1, 0).compareTo(ResourceProfile.fromResourceSpec(rs3, 0))); + assertEquals(1, ResourceProfile.fromResourceSpec(rs3, 0).compareTo(ResourceProfile.fromResourceSpec(rs1, 0))); ResourceSpec rs4 = ResourceSpec.newBuilder(). setCpuCores(1.0). setHeapMemoryInMB(100). setGPUResource(1.1). build(); - assertEquals(1, ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs4))); - assertEquals(-1, ResourceProfile.fromResourceSpec(rs4).compareTo(ResourceProfile.fromResourceSpec(rs3))); + assertEquals(1, ResourceProfile.fromResourceSpec(rs3, 0).compareTo(ResourceProfile.fromResourceSpec(rs4, 0))); + assertEquals(-1, ResourceProfile.fromResourceSpec(rs4, 0).compareTo(ResourceProfile.fromResourceSpec(rs3, 0))); ResourceSpec rs5 = ResourceSpec.newBuilder(). @@ -126,7 +128,7 @@ public class ResourceProfileTest { setHeapMemoryInMB(100). setGPUResource(2.2). build(); - assertEquals(0, ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs5))); + assertEquals(0, ResourceProfile.fromResourceSpec(rs3, 0).compareTo(ResourceProfile.fromResourceSpec(rs5, 0))); } @Test @@ -136,10 +138,10 @@ public class ResourceProfileTest { setHeapMemoryInMB(100). setGPUResource(1.6). build(); - ResourceProfile rp = ResourceProfile.fromResourceSpec(rs); + ResourceProfile rp = ResourceProfile.fromResourceSpec(rs, 50); assertEquals(1.0, rp.getCpuCores(), 0.000001); - assertEquals(100, rp.getMemoryInMB()); + assertEquals(150, rp.getMemoryInMB()); assertEquals(100, rp.getOperatorsMemoryInMB()); assertEquals(1.6, rp.getExtendedResources().get(ResourceSpec.GPU_NAME).getValue(), 0.000001); } http://git-wip-us.apache.org/repos/asf/flink/blob/47e6069d/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index cd08fb9..1e70169 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -347,7 +347,7 @@ public class YarnResourceManagerTest extends TestLogger { final SlotReport slotReport = new SlotReport( new SlotStatus( new SlotID(taskManagerResourceId, 1), - new ResourceProfile(10, 1, 1, 1, Collections.emptyMap()))); + new ResourceProfile(10, 1, 1, 1, 0, Collections.emptyMap()))); CompletableFuture<Integer> numberRegisteredSlotsFuture = rmGateway .registerTaskExecutor(
