Repository: flink Updated Branches: refs/heads/master fba72d073 -> 76e3156d0
[FLINK-7928] [runtime] extend the resources in ResourceProfile for precisely calculating the resource of task manager Summary: ResourceProfile denotes the resource requirements of a task. It should contain: 1. The resource for the operators: the resources in ResourceSpec (please refer to jira-7878). 2. The resource for the task to communicate with its upstream tasks. 3. The resource for the task to communicate with its downstream tasks. Currently, ResourceProfile only contains the first part; this change adds the last two parts. Test Plan: UnitTests Reviewers: haitao.w Differential Revision: https://aone.alibaba-inc.com/code/D330364 This closes #4991. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5643d156 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5643d156 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5643d156 Branch: refs/heads/master Commit: 5643d156cea72314c2240119b30aa32a65a0aeb7 Parents: fba72d0 Author: shuai.xus <[email protected]> Authored: Thu Oct 26 17:38:04 2017 +0800 Committer: Till Rohrmann <[email protected]> Committed: Thu Dec 14 15:51:18 2017 +0100 ---------------------------------------------------------------------- .../api/common/operators/ResourceSpec.java | 22 ++- .../clusterframework/types/ResourceProfile.java | 146 +++++++++++++++---- .../types/ResourceProfileTest.java | 100 ++++++++++++- .../flink/yarn/YarnResourceManagerTest.java | 3 +- 4 files changed, 234 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5643d156/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java index 4554b54..e82c738 100644 
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java @@ -54,7 +54,7 @@ public class ResourceSpec implements Serializable { public static final ResourceSpec DEFAULT = new ResourceSpec(0, 0, 0, 0, 0); - private static final String GPU_NAME = "GPU"; + public static final String GPU_NAME = "GPU"; /** How many cpu cores are needed, use double so we can specify cpu like 0.1. */ private final double cpuCores; @@ -146,7 +146,7 @@ public class ResourceSpec implements Serializable { public double getGPUResource() { Resource gpuResource = extendedResources.get(GPU_NAME); if (gpuResource != null) { - return gpuResource.value; + return gpuResource.getValue(); } return 0.0; } @@ -249,8 +249,8 @@ public class ResourceSpec implements Serializable { */ public static class Builder { - public double cpuCores; - public int heapMemoryInMB; + private double cpuCores; + private int heapMemoryInMB; private int directMemoryInMB; private int nativeMemoryInMB; private int stateSizeInMB; @@ -300,7 +300,7 @@ public class ResourceSpec implements Serializable { /** * Base class for additional resources one can specify. */ - protected abstract static class Resource implements Serializable { + public abstract static class Resource implements Serializable { private static final long serialVersionUID = 1L; @@ -372,6 +372,18 @@ public class ResourceSpec implements Serializable { return result; } + public String getName() { + return this.name; + } + + public ResourceAggregateType getAggregateType() { + return this.type; + } + + public double getValue() { + return this.value; + } + /** * Create a resource of the same resource type. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/5643d156/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java index faa93e5..fc0bf15 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java @@ -18,19 +18,30 @@ package org.apache.flink.runtime.clusterframework.types; +import org.apache.flink.api.common.operators.ResourceSpec; + import javax.annotation.Nonnull; + import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; /** - * Describe the resource profile of the slot, either when requiring or offering it. The profile can be + * Describe the immutable resource profile of the slot, either when requiring or offering it. The profile can be * checked whether it can match another profile's requirement, and furthermore we may calculate a matching * score to decide which profile we should choose when we have lots of candidate slots. + * It should be generated from {@link ResourceSpec} with the input and output memory calculated in JobMaster. * * <p>Resource Profiles have a total ordering, defined by comparing these fields in sequence: * <ol> * <li>Memory Size</li> * <li>CPU cores</li> + * <li>Extended resources</li> * </ol> + * The extended resources are compared ordered by the resource names. 
*/ public class ResourceProfile implements Serializable, Comparable<ResourceProfile> { @@ -52,6 +63,9 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile /** How many native memory in mb are needed */ private final int nativeMemoryInMB; + /** A extensible field for user specified resources from {@link ResourceSpec}. */ + private final Map<String, ResourceSpec.Resource> extendedResources = new HashMap<>(1); + // ------------------------------------------------------------------------ /** @@ -61,16 +75,21 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile * @param heapMemoryInMB The size of the heap memory, in megabytes. * @param directMemoryInMB The size of the direct memory, in megabytes. * @param nativeMemoryInMB The size of the native memory, in megabytes. + * @param extendedResources The extendiable resources such as GPU and FPGA */ public ResourceProfile( double cpuCores, int heapMemoryInMB, int directMemoryInMB, - int nativeMemoryInMB) { + int nativeMemoryInMB, + Map<String, ResourceSpec.Resource> extendedResources) { this.cpuCores = cpuCores; this.heapMemoryInMB = heapMemoryInMB; this.directMemoryInMB = directMemoryInMB; this.nativeMemoryInMB = nativeMemoryInMB; + if (extendedResources != null) { + this.extendedResources.putAll(extendedResources); + } } /** @@ -80,10 +99,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile * @param heapMemoryInMB The size of the heap memory, in megabytes. */ public ResourceProfile(double cpuCores, int heapMemoryInMB) { - this.cpuCores = cpuCores; - this.heapMemoryInMB = heapMemoryInMB; - this.directMemoryInMB = 0; - this.nativeMemoryInMB = 0; + this(cpuCores, heapMemoryInMB, 0, 0, Collections.EMPTY_MAP); } /** @@ -92,16 +108,14 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile * @param other The ResourceProfile to copy. 
*/ public ResourceProfile(ResourceProfile other) { - this.cpuCores = other.cpuCores; - this.heapMemoryInMB = other.heapMemoryInMB; - this.directMemoryInMB = other.directMemoryInMB; - this.nativeMemoryInMB = other.nativeMemoryInMB; + this(other.cpuCores, other.heapMemoryInMB, other.directMemoryInMB, other.nativeMemoryInMB, other.extendedResources); } // ------------------------------------------------------------------------ /** - * Get the cpu cores needed + * Get the cpu cores needed. + * * @return The cpu cores, 1.0 means a full cpu thread */ public double getCpuCores() { @@ -109,15 +123,17 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile } /** - * Get the heap memory needed in MB + * Get the heap memory needed in MB. + * * @return The heap memory in MB */ - public long getHeapMemoryInMB() { + public int getHeapMemoryInMB() { return heapMemoryInMB; } /** - * Get the direct memory needed in MB + * Get the direct memory needed in MB. + * * @return The direct memory in MB */ public int getDirectMemoryInMB() { @@ -125,7 +141,8 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile } /** - * Get the native memory needed in MB + * Get the native memory needed in MB. + * * @return The native memory in MB */ public int getNativeMemoryInMB() { @@ -133,7 +150,8 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile } /** - * Get the total memory needed in MB + * Get the total memory needed in MB. + * * @return The total memory in MB */ public int getMemoryInMB() { @@ -141,23 +159,76 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile } /** - * Check whether required resource profile can be matched + * Get the memory the operators needed in MB. + * + * @return The operator memory in MB + */ + public int getOperatorsMemoryInMB() { + return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB; + } + + /** + * Get the extended resources. 
+ * + * @return The extended resources + */ + public Map<String, ResourceSpec.Resource> getExtendedResources() { + return Collections.unmodifiableMap(extendedResources); + } + + /** + * Check whether required resource profile can be matched. * * @param required the required resource profile * @return true if the requirement is matched, otherwise false */ public boolean isMatching(ResourceProfile required) { - return cpuCores >= required.getCpuCores() && + if (cpuCores >= required.getCpuCores() && heapMemoryInMB >= required.getHeapMemoryInMB() && directMemoryInMB >= required.getDirectMemoryInMB() && - nativeMemoryInMB >= required.getNativeMemoryInMB(); + nativeMemoryInMB >= required.getNativeMemoryInMB()) { + for (Map.Entry<String, ResourceSpec.Resource> resource : required.extendedResources.entrySet()) { + if (!extendedResources.containsKey(resource.getKey()) || + !extendedResources.get(resource.getKey()).getAggregateType().equals(resource.getValue().getAggregateType()) || + extendedResources.get(resource.getKey()).getValue() < resource.getValue().getValue()) { + return false; + } + } + return true; + } + return false; } @Override public int compareTo(@Nonnull ResourceProfile other) { - int cmp1 = Integer.compare(this.getMemoryInMB(), other.getMemoryInMB()); - int cmp2 = Double.compare(this.cpuCores, other.cpuCores); - return (cmp1 != 0) ? 
cmp1 : cmp2; + int cmp = Integer.compare(this.getMemoryInMB(), other.getMemoryInMB()); + if (cmp == 0) { + cmp = Double.compare(this.cpuCores, other.cpuCores); + } + if (cmp == 0) { + Iterator<Map.Entry<String, ResourceSpec.Resource>> thisIterator = extendedResources.entrySet().iterator(); + Iterator<Map.Entry<String, ResourceSpec.Resource>> otherIterator = other.extendedResources.entrySet().iterator(); + while (thisIterator.hasNext() && otherIterator.hasNext()) { + Map.Entry<String, ResourceSpec.Resource> thisResource = thisIterator.next(); + Map.Entry<String, ResourceSpec.Resource> otherResource = otherIterator.next(); + if ((cmp = otherResource.getKey().compareTo(thisResource.getKey())) != 0) { + return cmp; + } + if (!otherResource.getValue().getAggregateType().equals(thisResource.getValue().getAggregateType())) { + return 1; + } + if ((cmp = Double.compare(thisResource.getValue().getValue(), otherResource.getValue().getValue())) != 0) { + return cmp; + } + } + if (thisIterator.hasNext()) { + return 1; + } + if (otherIterator.hasNext()) { + return -1; + } + } + return cmp; } // ------------------------------------------------------------------------ @@ -169,6 +240,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile result = 31 * result + heapMemoryInMB; result = 31 * result + directMemoryInMB; result = 31 * result + nativeMemoryInMB; + result = 31 * result + extendedResources.hashCode(); return result; } @@ -181,20 +253,40 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile ResourceProfile that = (ResourceProfile) obj; return this.cpuCores == that.cpuCores && this.heapMemoryInMB == that.heapMemoryInMB && - this.directMemoryInMB == that.directMemoryInMB; - } - else { - return false; + this.directMemoryInMB == that.directMemoryInMB && + Objects.equals(extendedResources, that.extendedResources); } + return false; } @Override public String toString() { + String resourceStr = ""; + for 
(Map.Entry<String, ResourceSpec.Resource> resource : extendedResources.entrySet()) { + resourceStr += ", " + resource.getKey() + "=" + resource.getValue(); + } return "ResourceProfile{" + "cpuCores=" + cpuCores + ", heapMemoryInMB=" + heapMemoryInMB + ", directMemoryInMB=" + directMemoryInMB + - ", nativeMemoryInMB=" + nativeMemoryInMB + + ", nativeMemoryInMB=" + nativeMemoryInMB + resourceStr + '}'; } + + public static ResourceProfile fromResourceSpec( + ResourceSpec resourceSpec) { + Map<String, ResourceSpec.Resource> extendResource = new HashMap<>(1); + double gpu = resourceSpec.getGPUResource(); + if (gpu > 0) { + extendResource.put(ResourceSpec.GPU_NAME, new ResourceSpec.GPUResource(gpu)); + } + ResourceProfile resourceProfile = new ResourceProfile( + resourceSpec.getCpuCores(), + resourceSpec.getHeapMemory(), + resourceSpec.getDirectMemory(), + resourceSpec.getNativeMemory(), + extendResource + ); + return resourceProfile; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/5643d156/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java index aacdcfa..25cb5fb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java @@ -18,8 +18,12 @@ package org.apache.flink.runtime.clusterframework.types; +import org.apache.flink.api.common.operators.ResourceSpec; import org.junit.Test; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -27,10 +31,10 @@ public class 
ResourceProfileTest { @Test public void testMatchRequirement() throws Exception { - ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100); - ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200); - ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100); - ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200); + ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, Collections.EMPTY_MAP); + ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, Collections.EMPTY_MAP); + ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, Collections.EMPTY_MAP); + ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, Collections.EMPTY_MAP); assertFalse(rp1.isMatching(rp2)); assertTrue(rp2.isMatching(rp1)); @@ -45,10 +49,98 @@ public class ResourceProfileTest { assertTrue(rp4.isMatching(rp2)); assertTrue(rp4.isMatching(rp3)); assertTrue(rp4.isMatching(rp4)); + + ResourceSpec rs1 = ResourceSpec.newBuilder(). + setCpuCores(1.0). + setHeapMemoryInMB(100). + setGPUResource(2.2). + build(); + ResourceSpec rs2 = ResourceSpec.newBuilder(). + setCpuCores(1.0). + setHeapMemoryInMB(100). + setGPUResource(1.1). + build(); + + + assertFalse(rp1.isMatching(ResourceProfile.fromResourceSpec(rs1))); + assertTrue(ResourceProfile.fromResourceSpec(rs1).isMatching(ResourceProfile.fromResourceSpec(rs2))); + assertFalse(ResourceProfile.fromResourceSpec(rs2).isMatching(ResourceProfile.fromResourceSpec(rs1))); } @Test public void testUnknownMatchesUnknown() { assertTrue(ResourceProfile.UNKNOWN.isMatching(ResourceProfile.UNKNOWN)); } + + @Test + public void testEquals() throws Exception { + ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build(); + ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build(); + assertTrue(ResourceProfile.fromResourceSpec(rs1).equals(ResourceProfile.fromResourceSpec(rs2))); + + ResourceSpec rs3 = ResourceSpec.newBuilder(). + setCpuCores(1.0). 
+ setHeapMemoryInMB(100). + setGPUResource(2.2). + build(); + ResourceSpec rs4 = ResourceSpec.newBuilder(). + setCpuCores(1.0). + setHeapMemoryInMB(100). + setGPUResource(1.1). + build(); + assertFalse(ResourceProfile.fromResourceSpec(rs3).equals(ResourceProfile.fromResourceSpec(rs4))); + + ResourceSpec rs5 = ResourceSpec.newBuilder(). + setCpuCores(1.0). + setHeapMemoryInMB(100). + setGPUResource(2.2). + build(); + assertTrue(ResourceProfile.fromResourceSpec(rs3).equals(ResourceProfile.fromResourceSpec(rs5))); + } + + @Test + public void testCompareTo() throws Exception { + ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build(); + ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build(); + assertEquals(0, ResourceProfile.fromResourceSpec(rs1).compareTo(ResourceProfile.fromResourceSpec(rs2))); + + ResourceSpec rs3 = ResourceSpec.newBuilder(). + setCpuCores(1.0). + setHeapMemoryInMB(100). + setGPUResource(2.2). + build(); + assertEquals(-1, ResourceProfile.fromResourceSpec(rs1).compareTo(ResourceProfile.fromResourceSpec(rs3))); + assertEquals(1, ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs1))); + + ResourceSpec rs4 = ResourceSpec.newBuilder(). + setCpuCores(1.0). + setHeapMemoryInMB(100). + setGPUResource(1.1). + build(); + assertEquals(1, ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs4))); + assertEquals(-1, ResourceProfile.fromResourceSpec(rs4).compareTo(ResourceProfile.fromResourceSpec(rs3))); + + + ResourceSpec rs5 = ResourceSpec.newBuilder(). + setCpuCores(1.0). + setHeapMemoryInMB(100). + setGPUResource(2.2). + build(); + assertEquals(0, ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs5))); + } + + @Test + public void testGet() throws Exception { + ResourceSpec rs = ResourceSpec.newBuilder(). + setCpuCores(1.0). + setHeapMemoryInMB(100). + setGPUResource(1.6). 
+ build(); + ResourceProfile rp = ResourceProfile.fromResourceSpec(rs); + + assertEquals(1.0, rp.getCpuCores(), 0.000001); + assertEquals(100, rp.getMemoryInMB()); + assertEquals(100, rp.getOperatorsMemoryInMB()); + assertEquals(1.6, rp.getExtendedResources().get(ResourceSpec.GPU_NAME).getValue(), 0.000001); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/5643d156/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index 252b3a8..cd08fb9 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -77,6 +77,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.File; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -346,7 +347,7 @@ public class YarnResourceManagerTest extends TestLogger { final SlotReport slotReport = new SlotReport( new SlotStatus( new SlotID(taskManagerResourceId, 1), - new ResourceProfile(10, 1, 1, 1))); + new ResourceProfile(10, 1, 1, 1, Collections.emptyMap()))); CompletableFuture<Integer> numberRegisteredSlotsFuture = rmGateway .registerTaskExecutor(
