Repository: flink
Updated Branches:
  refs/heads/master fba72d073 -> 76e3156d0


[FLINK-7928] [runtime] extend the resources in ResourceProfile for precisely 
calculating the resource of task manager

Summary:
ResourceProfile denotes the resource requirements of a task. It should contain:
1. The resources for the operators: the resources in ResourceSpec (please refer 
to jira-7878).
2. The resources for the task to communicate with its upstream tasks.
3. The resources for the task to communicate with its downstream tasks.
Currently, ResourceProfile contains only the first part. This change adds the last two parts.

Test Plan: UnitTests

Reviewers: haitao.w

Differential Revision: https://aone.alibaba-inc.com/code/D330364

This closes #4991.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5643d156
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5643d156
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5643d156

Branch: refs/heads/master
Commit: 5643d156cea72314c2240119b30aa32a65a0aeb7
Parents: fba72d0
Author: shuai.xus <[email protected]>
Authored: Thu Oct 26 17:38:04 2017 +0800
Committer: Till Rohrmann <[email protected]>
Committed: Thu Dec 14 15:51:18 2017 +0100

----------------------------------------------------------------------
 .../api/common/operators/ResourceSpec.java      |  22 ++-
 .../clusterframework/types/ResourceProfile.java | 146 +++++++++++++++----
 .../types/ResourceProfileTest.java              | 100 ++++++++++++-
 .../flink/yarn/YarnResourceManagerTest.java     |   3 +-
 4 files changed, 234 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5643d156/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index 4554b54..e82c738 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -54,7 +54,7 @@ public class ResourceSpec implements Serializable {
 
        public static final ResourceSpec DEFAULT = new ResourceSpec(0, 0, 0, 0, 
0);
 
-       private static final String GPU_NAME = "GPU";
+       public static final String GPU_NAME = "GPU";
 
        /** How many cpu cores are needed, use double so we can specify cpu 
like 0.1. */
        private final double cpuCores;
@@ -146,7 +146,7 @@ public class ResourceSpec implements Serializable {
        public double getGPUResource() {
                Resource gpuResource = extendedResources.get(GPU_NAME);
                if (gpuResource != null) {
-                       return gpuResource.value;
+                       return gpuResource.getValue();
                }
                return 0.0;
        }
@@ -249,8 +249,8 @@ public class ResourceSpec implements Serializable {
         */
        public static class Builder {
 
-               public double cpuCores;
-               public int heapMemoryInMB;
+               private double cpuCores;
+               private int heapMemoryInMB;
                private int directMemoryInMB;
                private int nativeMemoryInMB;
                private int stateSizeInMB;
@@ -300,7 +300,7 @@ public class ResourceSpec implements Serializable {
        /**
         * Base class for additional resources one can specify.
         */
-       protected abstract static class Resource implements Serializable {
+       public abstract static class Resource implements Serializable {
 
                private static final long serialVersionUID = 1L;
 
@@ -372,6 +372,18 @@ public class ResourceSpec implements Serializable {
                        return result;
                }
 
+               public String getName() {
+                       return this.name;
+               }
+
+               public ResourceAggregateType getAggregateType() {
+                       return this.type;
+               }
+
+               public double getValue() {
+                       return this.value;
+               }
+
                /**
                 * Create a resource of the same resource type.
                 *

http://git-wip-us.apache.org/repos/asf/flink/blob/5643d156/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index faa93e5..fc0bf15 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -18,19 +18,30 @@
 
 package org.apache.flink.runtime.clusterframework.types;
 
+import org.apache.flink.api.common.operators.ResourceSpec;
+
 import javax.annotation.Nonnull;
+
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
 
 /**
- * Describe the resource profile of the slot, either when requiring or 
offering it. The profile can be
+ * Describe the immutable resource profile of the slot, either when requiring 
or offering it. The profile can be
  * checked whether it can match another profile's requirement, and furthermore 
we may calculate a matching
  * score to decide which profile we should choose when we have lots of 
candidate slots.
+ * It should be generated from {@link ResourceSpec} with the input and output 
memory calculated in JobMaster.
  * 
  * <p>Resource Profiles have a total ordering, defined by comparing these 
fields in sequence:
  * <ol>
  *     <li>Memory Size</li>
  *     <li>CPU cores</li>
+ *     <li>Extended resources</li>
  * </ol>
+ * The extended resources are compared ordered by the resource names.
  */
 public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile> {
 
@@ -52,6 +63,9 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
        /** How many native memory in mb are needed */
        private final int nativeMemoryInMB;
 
+       /** A extensible field for user specified resources from {@link 
ResourceSpec}. */
+       private final Map<String, ResourceSpec.Resource> extendedResources = 
new HashMap<>(1);
+
        // 
------------------------------------------------------------------------
 
        /**
@@ -61,16 +75,21 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
         * @param heapMemoryInMB The size of the heap memory, in megabytes.
         * @param directMemoryInMB The size of the direct memory, in megabytes.
         * @param nativeMemoryInMB The size of the native memory, in megabytes.
+        * @param extendedResources The extendiable resources such as GPU and 
FPGA
         */
        public ResourceProfile(
                        double cpuCores,
                        int heapMemoryInMB,
                        int directMemoryInMB,
-                       int nativeMemoryInMB) {
+                       int nativeMemoryInMB,
+                       Map<String, ResourceSpec.Resource> extendedResources) {
                this.cpuCores = cpuCores;
                this.heapMemoryInMB = heapMemoryInMB;
                this.directMemoryInMB = directMemoryInMB;
                this.nativeMemoryInMB = nativeMemoryInMB;
+               if (extendedResources != null) {
+                       this.extendedResources.putAll(extendedResources);
+               }
        }
 
        /**
@@ -80,10 +99,7 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
         * @param heapMemoryInMB The size of the heap memory, in megabytes.
         */
        public ResourceProfile(double cpuCores, int heapMemoryInMB) {
-               this.cpuCores = cpuCores;
-               this.heapMemoryInMB = heapMemoryInMB;
-               this.directMemoryInMB = 0;
-               this.nativeMemoryInMB = 0;
+               this(cpuCores, heapMemoryInMB, 0, 0, Collections.EMPTY_MAP);
        }
 
        /**
@@ -92,16 +108,14 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
         * @param other The ResourceProfile to copy. 
         */
        public ResourceProfile(ResourceProfile other) {
-               this.cpuCores = other.cpuCores;
-               this.heapMemoryInMB = other.heapMemoryInMB;
-               this.directMemoryInMB = other.directMemoryInMB;
-               this.nativeMemoryInMB = other.nativeMemoryInMB;
+               this(other.cpuCores, other.heapMemoryInMB, 
other.directMemoryInMB, other.nativeMemoryInMB, other.extendedResources);
        }
 
        // 
------------------------------------------------------------------------
 
        /**
-        * Get the cpu cores needed
+        * Get the cpu cores needed.
+        *
         * @return The cpu cores, 1.0 means a full cpu thread
         */
        public double getCpuCores() {
@@ -109,15 +123,17 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
        }
 
        /**
-        * Get the heap memory needed in MB
+        * Get the heap memory needed in MB.
+        *
         * @return The heap memory in MB
         */
-       public long getHeapMemoryInMB() {
+       public int getHeapMemoryInMB() {
                return heapMemoryInMB;
        }
 
        /**
-        * Get the direct memory needed in MB
+        * Get the direct memory needed in MB.
+        *
         * @return The direct memory in MB
         */
        public int getDirectMemoryInMB() {
@@ -125,7 +141,8 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
        }
 
        /**
-        * Get the native memory needed in MB
+        * Get the native memory needed in MB.
+        *
         * @return The native memory in MB
         */
        public int getNativeMemoryInMB() {
@@ -133,7 +150,8 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
        }
 
        /**
-        * Get the total memory needed in MB
+        * Get the total memory needed in MB.
+        *
         * @return The total memory in MB
         */
        public int getMemoryInMB() {
@@ -141,23 +159,76 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
        }
 
        /**
-        * Check whether required resource profile can be matched
+        * Get the memory the operators needed in MB.
+        *
+        * @return The operator memory in MB
+        */
+       public int getOperatorsMemoryInMB() {
+               return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB;
+       }
+
+       /**
+        * Get the extended resources.
+        *
+        * @return The extended resources
+        */
+       public Map<String, ResourceSpec.Resource> getExtendedResources() {
+               return Collections.unmodifiableMap(extendedResources);
+       }
+
+       /**
+        * Check whether required resource profile can be matched.
         *
         * @param required the required resource profile
         * @return true if the requirement is matched, otherwise false
         */
        public boolean isMatching(ResourceProfile required) {
-               return cpuCores >= required.getCpuCores() &&
+               if (cpuCores >= required.getCpuCores() &&
                                heapMemoryInMB >= required.getHeapMemoryInMB() 
&&
                                directMemoryInMB >= 
required.getDirectMemoryInMB() &&
-                               nativeMemoryInMB >= 
required.getNativeMemoryInMB();
+                               nativeMemoryInMB >= 
required.getNativeMemoryInMB()) {
+                       for (Map.Entry<String, ResourceSpec.Resource> resource 
: required.extendedResources.entrySet()) {
+                               if 
(!extendedResources.containsKey(resource.getKey()) ||
+                                               
!extendedResources.get(resource.getKey()).getAggregateType().equals(resource.getValue().getAggregateType())
 ||
+                                               
extendedResources.get(resource.getKey()).getValue() < 
resource.getValue().getValue()) {
+                                       return false;
+                               }
+                       }
+                       return true;
+               }
+               return false;
        }
 
        @Override
        public int compareTo(@Nonnull ResourceProfile other) {
-               int cmp1 = Integer.compare(this.getMemoryInMB(), 
other.getMemoryInMB());
-               int cmp2 = Double.compare(this.cpuCores, other.cpuCores);
-               return (cmp1 != 0) ? cmp1 : cmp2;
+               int cmp = Integer.compare(this.getMemoryInMB(), 
other.getMemoryInMB());
+               if (cmp == 0) {
+                       cmp = Double.compare(this.cpuCores, other.cpuCores);
+               }
+               if (cmp == 0) {
+                       Iterator<Map.Entry<String, ResourceSpec.Resource>> 
thisIterator = extendedResources.entrySet().iterator();
+                       Iterator<Map.Entry<String, ResourceSpec.Resource>> 
otherIterator = other.extendedResources.entrySet().iterator();
+                       while (thisIterator.hasNext() && 
otherIterator.hasNext()) {
+                               Map.Entry<String, ResourceSpec.Resource> 
thisResource = thisIterator.next();
+                               Map.Entry<String, ResourceSpec.Resource> 
otherResource = otherIterator.next();
+                               if ((cmp = 
otherResource.getKey().compareTo(thisResource.getKey())) != 0) {
+                                       return cmp;
+                               }
+                               if 
(!otherResource.getValue().getAggregateType().equals(thisResource.getValue().getAggregateType()))
 {
+                                       return 1;
+                               }
+                               if ((cmp = 
Double.compare(thisResource.getValue().getValue(), 
otherResource.getValue().getValue())) != 0) {
+                                       return cmp;
+                               }
+                       }
+                       if (thisIterator.hasNext()) {
+                               return 1;
+                       }
+                       if (otherIterator.hasNext()) {
+                               return -1;
+                       }
+               }
+               return cmp;
        }
 
        // 
------------------------------------------------------------------------
@@ -169,6 +240,7 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
                result = 31 * result + heapMemoryInMB;
                result = 31 * result + directMemoryInMB;
                result = 31 * result + nativeMemoryInMB;
+               result = 31 * result + extendedResources.hashCode();
                return result;
        }
 
@@ -181,20 +253,40 @@ public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile
                        ResourceProfile that = (ResourceProfile) obj;
                        return this.cpuCores == that.cpuCores &&
                                        this.heapMemoryInMB == 
that.heapMemoryInMB &&
-                                       this.directMemoryInMB == 
that.directMemoryInMB;
-               }
-               else {
-                       return false;
+                                       this.directMemoryInMB == 
that.directMemoryInMB &&
+                                       Objects.equals(extendedResources, 
that.extendedResources);
                }
+               return false;
        }
 
        @Override
        public String toString() {
+               String resourceStr = "";
+               for (Map.Entry<String, ResourceSpec.Resource> resource : 
extendedResources.entrySet()) {
+                       resourceStr += ", " + resource.getKey() + "=" + 
resource.getValue();
+               }
                return "ResourceProfile{" +
                        "cpuCores=" + cpuCores +
                        ", heapMemoryInMB=" + heapMemoryInMB +
                        ", directMemoryInMB=" + directMemoryInMB +
-                       ", nativeMemoryInMB=" + nativeMemoryInMB +
+                       ", nativeMemoryInMB=" + nativeMemoryInMB + resourceStr +
                        '}';
        }
+
+       public static ResourceProfile fromResourceSpec(
+                       ResourceSpec resourceSpec) {
+               Map<String, ResourceSpec.Resource> extendResource = new 
HashMap<>(1);
+               double gpu = resourceSpec.getGPUResource();
+               if (gpu > 0) {
+                       extendResource.put(ResourceSpec.GPU_NAME, new 
ResourceSpec.GPUResource(gpu));
+               }
+               ResourceProfile resourceProfile = new ResourceProfile(
+                               resourceSpec.getCpuCores(),
+                               resourceSpec.getHeapMemory(),
+                               resourceSpec.getDirectMemory(),
+                               resourceSpec.getNativeMemory(),
+                               extendResource
+               );
+               return resourceProfile;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5643d156/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index aacdcfa..25cb5fb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -18,8 +18,12 @@
 
 package org.apache.flink.runtime.clusterframework.types;
 
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.junit.Test;
 
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -27,10 +31,10 @@ public class ResourceProfileTest {
 
        @Test
        public void testMatchRequirement() throws Exception {
-               ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100);
-               ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200);
-               ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100);
-               ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200);
+               ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 
Collections.EMPTY_MAP);
+               ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, 
Collections.EMPTY_MAP);
+               ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, 
Collections.EMPTY_MAP);
+               ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, 
Collections.EMPTY_MAP);
 
                assertFalse(rp1.isMatching(rp2));
                assertTrue(rp2.isMatching(rp1));
@@ -45,10 +49,98 @@ public class ResourceProfileTest {
                assertTrue(rp4.isMatching(rp2));
                assertTrue(rp4.isMatching(rp3));
                assertTrue(rp4.isMatching(rp4));
+
+               ResourceSpec rs1 = ResourceSpec.newBuilder().
+                               setCpuCores(1.0).
+                               setHeapMemoryInMB(100).
+                               setGPUResource(2.2).
+                               build();
+               ResourceSpec rs2 = ResourceSpec.newBuilder().
+                               setCpuCores(1.0).
+                               setHeapMemoryInMB(100).
+                               setGPUResource(1.1).
+                               build();
+
+
+               
assertFalse(rp1.isMatching(ResourceProfile.fromResourceSpec(rs1)));
+               
assertTrue(ResourceProfile.fromResourceSpec(rs1).isMatching(ResourceProfile.fromResourceSpec(rs2)));
+               
assertFalse(ResourceProfile.fromResourceSpec(rs2).isMatching(ResourceProfile.fromResourceSpec(rs1)));
        }
 
        @Test
        public void testUnknownMatchesUnknown() {
                
assertTrue(ResourceProfile.UNKNOWN.isMatching(ResourceProfile.UNKNOWN));
        }
+
+       @Test
+       public void testEquals() throws Exception {
+               ResourceSpec rs1 = 
ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
+               ResourceSpec rs2 = 
ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
+               
assertTrue(ResourceProfile.fromResourceSpec(rs1).equals(ResourceProfile.fromResourceSpec(rs2)));
+
+               ResourceSpec rs3 = ResourceSpec.newBuilder().
+                               setCpuCores(1.0).
+                               setHeapMemoryInMB(100).
+                               setGPUResource(2.2).
+                               build();
+               ResourceSpec rs4 = ResourceSpec.newBuilder().
+                               setCpuCores(1.0).
+                               setHeapMemoryInMB(100).
+                               setGPUResource(1.1).
+                               build();
+               
assertFalse(ResourceProfile.fromResourceSpec(rs3).equals(ResourceProfile.fromResourceSpec(rs4)));
+
+               ResourceSpec rs5 = ResourceSpec.newBuilder().
+                               setCpuCores(1.0).
+                               setHeapMemoryInMB(100).
+                               setGPUResource(2.2).
+                               build();
+               
assertTrue(ResourceProfile.fromResourceSpec(rs3).equals(ResourceProfile.fromResourceSpec(rs5)));
+       }
+
+       @Test
+       public void testCompareTo() throws Exception {
+               ResourceSpec rs1 = 
ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
+               ResourceSpec rs2 = 
ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
+               assertEquals(0, 
ResourceProfile.fromResourceSpec(rs1).compareTo(ResourceProfile.fromResourceSpec(rs2)));
+
+               ResourceSpec rs3 = ResourceSpec.newBuilder().
+                               setCpuCores(1.0).
+                               setHeapMemoryInMB(100).
+                               setGPUResource(2.2).
+                               build();
+               assertEquals(-1, 
ResourceProfile.fromResourceSpec(rs1).compareTo(ResourceProfile.fromResourceSpec(rs3)));
+               assertEquals(1, 
ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs1)));
+
+               ResourceSpec rs4 = ResourceSpec.newBuilder().
+                               setCpuCores(1.0).
+                               setHeapMemoryInMB(100).
+                               setGPUResource(1.1).
+                               build();
+               assertEquals(1, 
ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs4)));
+               assertEquals(-1, 
ResourceProfile.fromResourceSpec(rs4).compareTo(ResourceProfile.fromResourceSpec(rs3)));
+
+
+               ResourceSpec rs5 = ResourceSpec.newBuilder().
+                               setCpuCores(1.0).
+                               setHeapMemoryInMB(100).
+                               setGPUResource(2.2).
+                               build();
+               assertEquals(0, 
ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs5)));
+       }
+
+       @Test
+       public void testGet() throws Exception {
+               ResourceSpec rs = ResourceSpec.newBuilder().
+                               setCpuCores(1.0).
+                               setHeapMemoryInMB(100).
+                               setGPUResource(1.6).
+                               build();
+               ResourceProfile rp = ResourceProfile.fromResourceSpec(rs);
+
+               assertEquals(1.0, rp.getCpuCores(), 0.000001);
+               assertEquals(100, rp.getMemoryInMB());
+               assertEquals(100, rp.getOperatorsMemoryInMB());
+               assertEquals(1.6, 
rp.getExtendedResources().get(ResourceSpec.GPU_NAME).getValue(), 0.000001);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5643d156/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 252b3a8..cd08fb9 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -77,6 +77,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.File;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -346,7 +347,7 @@ public class YarnResourceManagerTest extends TestLogger {
                        final SlotReport slotReport = new SlotReport(
                                new SlotStatus(
                                        new SlotID(taskManagerResourceId, 1),
-                                       new ResourceProfile(10, 1, 1, 1)));
+                                       new ResourceProfile(10, 1, 1, 1, 
Collections.emptyMap())));
 
                        CompletableFuture<Integer> numberRegisteredSlotsFuture 
= rmGateway
                                .registerTaskExecutor(

Reply via email to