Repository: helix Updated Branches: refs/heads/master ebbd6ba2e -> 4c7661017
[HELIX-717] Add api for get / set quota type, ratio and participant capacity Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4c766101 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4c766101 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4c766101 Branch: refs/heads/master Commit: 4c7661017e856c42b69356665b908444f589fe2c Parents: ebbd6ba Author: Harry Zhang <[email protected]> Authored: Mon Jul 9 14:07:56 2018 -0700 Committer: Harry Zhang <[email protected]> Committed: Mon Jul 9 14:13:25 2018 -0700 ---------------------------------------------------------------------- .../org/apache/helix/model/ClusterConfig.java | 52 +++++++++++++++++--- .../org/apache/helix/model/LiveInstance.java | 33 ++++++++++++- .../java/org/apache/helix/task/TaskConfig.java | 27 ++++++++-- 3 files changed, 99 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/4c766101/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java index 704ff5d..faa8da5 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -20,11 +20,11 @@ package org.apache.helix.model; */ import com.google.common.collect.Maps; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; - import java.util.Map; import org.apache.helix.HelixProperty; import org.apache.helix.ZNRecord; @@ -65,7 +65,7 @@ public class ClusterConfig extends HelixProperty { TARGET_EXTERNALVIEW_ENABLED, @Deprecated // ERROR_OR_RECOVERY_PARTITION_THRESHOLD_FOR_LOAD_BALANCE will take // precedence if it is set - ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE, // Controller won't execute load balance state + ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE, // Controller won't execute load balance state // transition if the number of partitons that need // recovery exceeds this limitation ERROR_OR_RECOVERY_PARTITION_THRESHOLD_FOR_LOAD_BALANCE, // Controller won't execute load balance @@ -73,10 +73,9 @@ public class ClusterConfig extends HelixProperty { // partitons that need recovery or in // error exceeds this limitation DISABLED_INSTANCES, - VIEW_CLUSTER, // Set to "true" to indicate this is a view cluster - VIEW_CLUSTER_SOURCES, // Map field, key is the name of source cluster, value is - // ViewClusterSourceConfig JSON string - VIEW_CLUSTER_REFRESH_PERIOD, // In second + + // Specifies job types and used for quota allocation + QUOTA_TYPES } private final static int DEFAULT_MAX_CONCURRENT_TASK_PER_INSTANCE = 40; @@ -89,6 +88,8 @@ public class ClusterConfig extends HelixProperty { private static final String IDEAL_STATE_RULE_PREFIX = "IdealStateRule!"; private final static int DEFAULT_VIEW_CLUSTER_REFRESH_PERIOD = 30; + public final static String TASK_QUOTA_RATIO_NOT_SET = "-1"; + /** * Instantiate for a specific cluster * @param cluster the cluster identifier @@ -99,6 +100,7 @@ public class ClusterConfig extends HelixProperty { /** * Instantiate with a pre-populated record + * * @param record a ZNRecord corresponding to a cluster configuration */ public ClusterConfig(ZNRecord record) { @@ -106,6 +108,42 @@ public class ClusterConfig extends HelixProperty { } /** + * Set task quota type with the ratio of this quota + * @param quotaType + * @param quotaRatio + */ + public void setTaskQuotaRatio(String quotaType, String quotaRatio) { + if (_record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()) == null) { + _record.setMapField(ClusterConfigProperty.QUOTA_TYPES.name(), new HashMap<String, String>()); + } + _record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()) + .put(quotaType, quotaRatio); + } + + /** + * Given quota type, return ratio of the quota. If quota type does not exist, return "0" + * @param quotaType quota type + * @return ratio of quota type + */ + public String getTaskQuotaRatio(String quotaType) { + if (_record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()) == null + || _record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()).get(quotaType) == null) { + return TASK_QUOTA_RATIO_NOT_SET; + } + + return _record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()).get(quotaType); + } + + /** + * Get all task quota and their ratios + * + * @return a task quota -> quota ratio mapping + */ + public Map<String, String> getTaskQuotaRatioMap() { + return _record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()); + } + + /** * Whether to persist best possible assignment in a resource's idealstate. * * @return @@ -600,4 +638,4 @@ public class ClusterConfig extends HelixProperty { public String getClusterName() { return _record.getId(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/helix/blob/4c766101/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java index c024117..c078e59 100644 --- a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java +++ b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java @@ -19,6 +19,7 @@ package org.apache.helix.model; * under the License. */ +import java.util.Map; import org.apache.helix.HelixProperty; import org.apache.helix.ZNRecord; import org.slf4j.Logger; @@ -35,7 +36,15 @@ public class LiveInstance extends HelixProperty { SESSION_ID, HELIX_VERSION, LIVE_INSTANCE, - ZKPROPERTYTRANSFERURL + ZKPROPERTYTRANSFERURL, + RESOURCE_CAPACITY + } + + /** + * Resource this instance can provide, i.e. thread, memory heap size, CPU cores, etc + */ + public enum InstanceResourceType { + TASK_EXEC_THREAD } private static final Logger _logger = LoggerFactory.getLogger(LiveInstance.class.getName()); @@ -113,6 +122,28 @@ public class LiveInstance extends HelixProperty { } /** + * Get resource quota map of the live instance. Note that this resource name + * refers to compute / storage / network resource that this liveinstance + * has, i.e. thread count, CPU cores, heap size, etc. + * @return resource quota map: key=resourceName, value=quota + */ + public Map<String, String> getResourceCapacityMap() { + return _record.getMapField(LiveInstanceProperty.RESOURCE_CAPACITY.name()); + } + + /** + * Add a resource quota map to this LiveInstance. For resource quota map, key=resourceName; + * value=quota of that resource. We assume that value can be casted into integers. Note that + * this resourceName refers to compute / storage / network resource that this liveinstance + * has, i.e. thread count, CPU cores, heap size, etc. + * + * @param resourceQuotaMap resourceQuotaMap + */ + public void setResourceCapacityMap(Map<String, String> resourceQuotaMap) { + _record.setMapField(LiveInstanceProperty.RESOURCE_CAPACITY.name(), resourceQuotaMap); + } + + /** * Get the last modified time of this live instance * @return UNIX timestamp */ http://git-wip-us.apache.org/repos/asf/helix/blob/4c766101/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java index ad1fc73..a447929 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java @@ -19,16 +19,14 @@ package org.apache.helix.task; * under the License. */ +import com.google.common.collect.Maps; import java.io.IOException; import java.util.Map; import java.util.UUID; - import org.apache.helix.task.beans.TaskBean; +import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.codehaus.jackson.map.ObjectMapper; - -import com.google.common.collect.Maps; /** * Configuration for an individual task to be run as part of a job. @@ -39,10 +37,12 @@ public class TaskConfig { TASK_COMMAND, @Deprecated TASK_SUCCESS_OPTIONAL, - TASK_TARGET_PARTITION + TASK_TARGET_PARTITION, + TASK_QUOTA_TYPE } private static final Logger LOG = LoggerFactory.getLogger(TaskConfig.class); + public static final String QUOTA_TYPE_NOT_SET = "QUOTA_TYPE_NOT_SET"; private final Map<String, String> _configMap; @@ -120,6 +120,23 @@ public class TaskConfig { } /** + * Set the quota type of this task + * @param quotaType + */ + public void setQuotaType(String quotaType) { + _configMap.put(TaskConfigProperty.TASK_QUOTA_TYPE.name(), quotaType); + } + + /** + * Return the quota type of this task + * @return + */ + public String getQuotaType() { + return _configMap.containsKey(TaskConfigProperty.TASK_QUOTA_TYPE.name()) ? + _configMap.get(TaskConfigProperty.TASK_QUOTA_TYPE.name()) : QUOTA_TYPE_NOT_SET; + } + + /** * Check if this task must succeed for a job to succeed * This field has been ignored by Helix * @return true if success is optional, false otherwise
