Repository: flink
Updated Branches:
  refs/heads/master c1734f4bf -> 37b4e2cef


[FLINK-8266] Add network memory to ResourceProfile for input/output memory of a 
task

This closes #5170.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47e6069d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47e6069d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47e6069d

Branch: refs/heads/master
Commit: 47e6069d7a299c02a81f062a7acb6a792b71c146
Parents: a4ecc7f
Author: shuai.xus <[email protected]>
Authored: Fri Dec 15 18:43:27 2017 +0800
Committer: Till Rohrmann <[email protected]>
Committed: Thu Jan 25 15:33:29 2018 +0100

----------------------------------------------------------------------
 .../clusterframework/types/ResourceProfile.java | 36 +++++++++++++++---
 .../types/ResourceProfileTest.java              | 40 ++++++++++----------
 .../flink/yarn/YarnResourceManagerTest.java     |  2 +-
 3 files changed, 52 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/47e6069d/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 6eb9af4..87d6fc5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -64,6 +64,9 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
        /** How many native memory in mb are needed. */
        private final int nativeMemoryInMB;
 
+       /** Memory used for the task in the slot to communicate with its 
upstreams. Set by job master. */
+       private final int networkMemoryInMB;
+
        /** A extensible field for user specified resources from {@link 
ResourceSpec}. */
        private final Map<String, Resource> extendedResources = new 
HashMap<>(1);
 
@@ -76,6 +79,7 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
         * @param heapMemoryInMB The size of the heap memory, in megabytes.
         * @param directMemoryInMB The size of the direct memory, in megabytes.
         * @param nativeMemoryInMB The size of the native memory, in megabytes.
+        * @param networkMemoryInMB The size of the memory for input and 
output, in megabytes.
         * @param extendedResources The extended resources such as GPU and FPGA
         */
        public ResourceProfile(
@@ -83,11 +87,13 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
                        int heapMemoryInMB,
                        int directMemoryInMB,
                        int nativeMemoryInMB,
+                       int networkMemoryInMB,
                        Map<String, Resource> extendedResources) {
                this.cpuCores = cpuCores;
                this.heapMemoryInMB = heapMemoryInMB;
                this.directMemoryInMB = directMemoryInMB;
                this.nativeMemoryInMB = nativeMemoryInMB;
+               this.networkMemoryInMB = networkMemoryInMB;
                if (extendedResources != null) {
                        this.extendedResources.putAll(extendedResources);
                }
@@ -100,7 +106,7 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
         * @param heapMemoryInMB The size of the heap memory, in megabytes.
         */
        public ResourceProfile(double cpuCores, int heapMemoryInMB) {
-               this(cpuCores, heapMemoryInMB, 0, 0, Collections.EMPTY_MAP);
+               this(cpuCores, heapMemoryInMB, 0, 0, 0, Collections.EMPTY_MAP);
        }
 
        /**
@@ -109,7 +115,12 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
         * @param other The ResourceProfile to copy.
         */
        public ResourceProfile(ResourceProfile other) {
-               this(other.cpuCores, other.heapMemoryInMB, 
other.directMemoryInMB, other.nativeMemoryInMB, other.extendedResources);
+               this(other.cpuCores,
+                               other.heapMemoryInMB,
+                               other.directMemoryInMB,
+                               other.nativeMemoryInMB,
+                               other.networkMemoryInMB,
+                               other.extendedResources);
        }
 
        // 
------------------------------------------------------------------------
@@ -151,12 +162,20 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
        }
 
        /**
+        * Get the memory needed for task to communicate with its upstreams and 
downstreams in MB.
+        * @return The network memory in MB
+        */
+       public int getNetworkMemoryInMB() {
+               return networkMemoryInMB;
+       }
+
+       /**
         * Get the total memory needed in MB.
         *
         * @return The total memory in MB
         */
        public int getMemoryInMB() {
-               return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB;
+               return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB + 
networkMemoryInMB;
        }
 
        /**
@@ -187,7 +206,8 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
                if (cpuCores >= required.getCpuCores() &&
                                heapMemoryInMB >= required.getHeapMemoryInMB() 
&&
                                directMemoryInMB >= 
required.getDirectMemoryInMB() &&
-                               nativeMemoryInMB >= 
required.getNativeMemoryInMB()) {
+                               nativeMemoryInMB >= 
required.getNativeMemoryInMB() &&
+                               networkMemoryInMB >= 
required.getNetworkMemoryInMB()) {
                        for (Map.Entry<String, Resource> resource : 
required.extendedResources.entrySet()) {
                                if 
(!extendedResources.containsKey(resource.getKey()) ||
                                                
!extendedResources.get(resource.getKey()).getResourceAggregateType().equals(resource.getValue().getResourceAggregateType())
 ||
@@ -241,6 +261,7 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
                result = 31 * result + heapMemoryInMB;
                result = 31 * result + directMemoryInMB;
                result = 31 * result + nativeMemoryInMB;
+               result = 31 * result + networkMemoryInMB;
                result = 31 * result + extendedResources.hashCode();
                return result;
        }
@@ -255,6 +276,7 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
                        return this.cpuCores == that.cpuCores &&
                                        this.heapMemoryInMB == 
that.heapMemoryInMB &&
                                        this.directMemoryInMB == 
that.directMemoryInMB &&
+                                       this.networkMemoryInMB == 
that.networkMemoryInMB &&
                                        Objects.equals(extendedResources, 
that.extendedResources);
                }
                return false;
@@ -270,11 +292,12 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
                        "cpuCores=" + cpuCores +
                        ", heapMemoryInMB=" + heapMemoryInMB +
                        ", directMemoryInMB=" + directMemoryInMB +
-                       ", nativeMemoryInMB=" + nativeMemoryInMB + resources +
+                       ", nativeMemoryInMB=" + nativeMemoryInMB +
+                       ", networkMemoryInMB=" + networkMemoryInMB + resources +
                        '}';
        }
 
-       static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec) {
+       static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec, int 
networkMemory) {
                Map<String, Resource> copiedExtendedResources = new 
HashMap<>(resourceSpec.getExtendedResources());
 
                return new ResourceProfile(
@@ -282,6 +305,7 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
                                resourceSpec.getHeapMemory(),
                                resourceSpec.getDirectMemory(),
                                resourceSpec.getNativeMemory(),
+                               networkMemory,
                                copiedExtendedResources);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/47e6069d/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index 25cb5fb..7ed688a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -31,10 +31,10 @@ public class ResourceProfileTest {
 
        @Test
        public void testMatchRequirement() throws Exception {
-               ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 
Collections.EMPTY_MAP);
-               ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, 
Collections.EMPTY_MAP);
-               ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, 
Collections.EMPTY_MAP);
-               ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, 
Collections.EMPTY_MAP);
+               ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 
0, Collections.EMPTY_MAP);
+               ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, 
0, Collections.EMPTY_MAP);
+               ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, 
0, Collections.EMPTY_MAP);
+               ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, 
0, Collections.EMPTY_MAP);
 
                assertFalse(rp1.isMatching(rp2));
                assertTrue(rp2.isMatching(rp1));
@@ -50,6 +50,9 @@ public class ResourceProfileTest {
                assertTrue(rp4.isMatching(rp3));
                assertTrue(rp4.isMatching(rp4));
 
+               ResourceProfile rp5 = new ResourceProfile(2.0, 100, 100, 100, 
100, null);
+               assertFalse(rp4.isMatching(rp5));
+
                ResourceSpec rs1 = ResourceSpec.newBuilder().
                                setCpuCores(1.0).
                                setHeapMemoryInMB(100).
@@ -61,10 +64,9 @@ public class ResourceProfileTest {
                                setGPUResource(1.1).
                                build();
 
-
-               
assertFalse(rp1.isMatching(ResourceProfile.fromResourceSpec(rs1)));
-               
assertTrue(ResourceProfile.fromResourceSpec(rs1).isMatching(ResourceProfile.fromResourceSpec(rs2)));
-               
assertFalse(ResourceProfile.fromResourceSpec(rs2).isMatching(ResourceProfile.fromResourceSpec(rs1)));
+               
assertFalse(rp1.isMatching(ResourceProfile.fromResourceSpec(rs1, 0)));
+               assertTrue(ResourceProfile.fromResourceSpec(rs1, 
0).isMatching(ResourceProfile.fromResourceSpec(rs2, 0)));
+               assertFalse(ResourceProfile.fromResourceSpec(rs2, 
0).isMatching(ResourceProfile.fromResourceSpec(rs1, 0)));
        }
 
        @Test
@@ -76,7 +78,7 @@ public class ResourceProfileTest {
        public void testEquals() throws Exception {
                ResourceSpec rs1 = 
ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
                ResourceSpec rs2 = 
ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
-               
assertTrue(ResourceProfile.fromResourceSpec(rs1).equals(ResourceProfile.fromResourceSpec(rs2)));
+               assertTrue(ResourceProfile.fromResourceSpec(rs1, 
0).equals(ResourceProfile.fromResourceSpec(rs2, 0)));
 
                ResourceSpec rs3 = ResourceSpec.newBuilder().
                                setCpuCores(1.0).
@@ -88,37 +90,37 @@ public class ResourceProfileTest {
                                setHeapMemoryInMB(100).
                                setGPUResource(1.1).
                                build();
-               
assertFalse(ResourceProfile.fromResourceSpec(rs3).equals(ResourceProfile.fromResourceSpec(rs4)));
+               assertFalse(ResourceProfile.fromResourceSpec(rs3, 
0).equals(ResourceProfile.fromResourceSpec(rs4, 0)));
 
                ResourceSpec rs5 = ResourceSpec.newBuilder().
                                setCpuCores(1.0).
                                setHeapMemoryInMB(100).
                                setGPUResource(2.2).
                                build();
-               
assertTrue(ResourceProfile.fromResourceSpec(rs3).equals(ResourceProfile.fromResourceSpec(rs5)));
+               assertTrue(ResourceProfile.fromResourceSpec(rs3, 
100).equals(ResourceProfile.fromResourceSpec(rs5, 100)));
        }
 
        @Test
        public void testCompareTo() throws Exception {
                ResourceSpec rs1 = 
ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
                ResourceSpec rs2 = 
ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
-               assertEquals(0, 
ResourceProfile.fromResourceSpec(rs1).compareTo(ResourceProfile.fromResourceSpec(rs2)));
+               assertEquals(0, ResourceProfile.fromResourceSpec(rs1, 
0).compareTo(ResourceProfile.fromResourceSpec(rs2, 0)));
 
                ResourceSpec rs3 = ResourceSpec.newBuilder().
                                setCpuCores(1.0).
                                setHeapMemoryInMB(100).
                                setGPUResource(2.2).
                                build();
-               assertEquals(-1, 
ResourceProfile.fromResourceSpec(rs1).compareTo(ResourceProfile.fromResourceSpec(rs3)));
-               assertEquals(1, 
ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs1)));
+               assertEquals(-1, ResourceProfile.fromResourceSpec(rs1,  
0).compareTo(ResourceProfile.fromResourceSpec(rs3, 0)));
+               assertEquals(1, ResourceProfile.fromResourceSpec(rs3, 
0).compareTo(ResourceProfile.fromResourceSpec(rs1, 0)));
 
                ResourceSpec rs4 = ResourceSpec.newBuilder().
                                setCpuCores(1.0).
                                setHeapMemoryInMB(100).
                                setGPUResource(1.1).
                                build();
-               assertEquals(1, 
ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs4)));
-               assertEquals(-1, 
ResourceProfile.fromResourceSpec(rs4).compareTo(ResourceProfile.fromResourceSpec(rs3)));
+               assertEquals(1, ResourceProfile.fromResourceSpec(rs3, 
0).compareTo(ResourceProfile.fromResourceSpec(rs4, 0)));
+               assertEquals(-1, ResourceProfile.fromResourceSpec(rs4, 
0).compareTo(ResourceProfile.fromResourceSpec(rs3, 0)));
 
 
                ResourceSpec rs5 = ResourceSpec.newBuilder().
@@ -126,7 +128,7 @@ public class ResourceProfileTest {
                                setHeapMemoryInMB(100).
                                setGPUResource(2.2).
                                build();
-               assertEquals(0, 
ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs5)));
+               assertEquals(0, ResourceProfile.fromResourceSpec(rs3, 
0).compareTo(ResourceProfile.fromResourceSpec(rs5, 0)));
        }
 
        @Test
@@ -136,10 +138,10 @@ public class ResourceProfileTest {
                                setHeapMemoryInMB(100).
                                setGPUResource(1.6).
                                build();
-               ResourceProfile rp = ResourceProfile.fromResourceSpec(rs);
+               ResourceProfile rp = ResourceProfile.fromResourceSpec(rs, 50);
 
                assertEquals(1.0, rp.getCpuCores(), 0.000001);
-               assertEquals(100, rp.getMemoryInMB());
+               assertEquals(150, rp.getMemoryInMB());
                assertEquals(100, rp.getOperatorsMemoryInMB());
                assertEquals(1.6, 
rp.getExtendedResources().get(ResourceSpec.GPU_NAME).getValue(), 0.000001);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/47e6069d/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index cd08fb9..1e70169 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -347,7 +347,7 @@ public class YarnResourceManagerTest extends TestLogger {
                        final SlotReport slotReport = new SlotReport(
                                new SlotStatus(
                                        new SlotID(taskManagerResourceId, 1),
-                                       new ResourceProfile(10, 1, 1, 1, 
Collections.emptyMap())));
+                                       new ResourceProfile(10, 1, 1, 1, 0, 
Collections.emptyMap())));
 
                        CompletableFuture<Integer> numberRegisteredSlotsFuture 
= rmGateway
                                .registerTaskExecutor(

Reply via email to