Repository: helix Updated Branches: refs/heads/master 1103fecb6 -> 7ec5313bc
[HELIX-775] add task driver support for helix rest to add/get task framework user content Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7ec5313b Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7ec5313b Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7ec5313b Branch: refs/heads/master Commit: 7ec5313bccb679014d6a0605ee5d7184063e555e Parents: 1103fec Author: Harry Zhang <hrzh...@linkedin.com> Authored: Wed Oct 31 13:55:44 2018 -0700 Committer: Harry Zhang <hrzh...@linkedin.com> Committed: Wed Oct 31 13:55:44 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/task/TaskDriver.java | 35 ++++++++++++++ .../java/org/apache/helix/task/TaskUtil.java | 50 +++++++++++++++++--- 2 files changed, 78 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/7ec5313b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index 54e3ab3..25a4fe4 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -1029,6 +1029,41 @@ public class TaskDriver { } /** + * Return the full user content map for workflow + * @param workflowName workflow name + * @return user content map + */ + public Map<String, String> getWorkflowUserContentMap(String workflowName) { + return TaskUtil.getWorkflowJobUserContentMap(_propertyStore, workflowName); + } + + /** + * Return full user content map for job + * @param workflowName workflow name + * @param jobName Un-namespaced job name + * @return user content map + */ + public Map<String, String> 
getJobUserContentMap(String workflowName, String jobName) { + String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, jobName); + return TaskUtil.getWorkflowJobUserContentMap(_propertyStore, namespacedJobName); + } + + /** + * Return full user content map for task + * @param workflowName workflow name + * @param jobName Un-namespaced job name + * @param taskPartitionId task partition id + * @return user content map + */ + public Map<String, String> getTaskContentMap(String workflowName, String jobName, String taskPartitionId) { + String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, jobName); + String namespacedTaskName = TaskUtil.getNamespacedTaskName(namespacedJobName, taskPartitionId); + return TaskUtil.getTaskUserContentMap(_propertyStore, namespacedJobName, namespacedTaskName); + } + + + + /** * Set user content defined by the given key and string * @param key content key * @param value content value http://git-wip-us.apache.org/repos/asf/helix/blob/7ec5313b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java index 3461233..5581b6f 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java @@ -26,7 +26,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; - import org.I0Itec.zkclient.DataUpdater; import org.apache.helix.AccessOption; import org.apache.helix.HelixDataAccessor; @@ -320,9 +319,22 @@ public class TaskUtil { */ protected static String getWorkflowJobUserContent(HelixPropertyStore<ZNRecord> propertyStore, String workflowJobResource, String key) { - ZNRecord r = propertyStore.get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, - workflowJobResource, USER_CONTENT_NODE), 
null, AccessOption.PERSISTENT); - return r != null ? r.getSimpleField(key) : null; + Map<String, String> userContentMap = getWorkflowJobUserContentMap(propertyStore, workflowJobResource); + return userContentMap != null ? userContentMap.get(key) : null; + } + + /** + * get workflow/job user content map + * @param propertyStore property store + * @param workflowJobResource workflow name or namespaced job name + * @return user content map + */ + protected static Map<String, String> getWorkflowJobUserContentMap( + HelixPropertyStore<ZNRecord> propertyStore, String workflowJobResource) { + ZNRecord record = propertyStore.get(Joiner.on("/") + .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE), null, + AccessOption.PERSISTENT); + return record != null ? record.getSimpleFields() : null; } /** @@ -365,10 +377,24 @@ public class TaskUtil { */ protected static String getTaskUserContent(HelixPropertyStore<ZNRecord> propertyStore, String job, String task, String key) { - ZNRecord r = propertyStore.get( - Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, USER_CONTENT_NODE), null, + Map<String, String> userContentStore = getTaskUserContentMap(propertyStore, job, task); + return userContentStore != null ? userContentStore.get(key) : null; + } + + /** + * Return full task user content map + * @param propertyStore property store + * @param namespacedJobName namespaced job name + * @param taskPartitionId task partition id + * @return + */ + protected static Map<String, String> getTaskUserContentMap( + HelixPropertyStore<ZNRecord> propertyStore, String namespacedJobName, + String taskPartitionId) { + ZNRecord record = propertyStore.get(Joiner.on("/") + .join(TaskConstants.REBALANCER_CONTEXT_ROOT, namespacedJobName, USER_CONTENT_NODE), null, AccessOption.PERSISTENT); - return r != null ? (r.getMapField(task) != null ? r.getMapField(task).get(key) : null) : null; + return record != null ? 
record.getMapField(taskPartitionId) : null; } /** @@ -449,6 +475,16 @@ public class TaskUtil { } /** + * get a task name, namespaced by its job and workflow + * @param namespacedJobName namespaced job name + * @param taskPartitionId task partition id + * @return + */ + public static String getNamespacedTaskName(String namespacedJobName, String taskPartitionId) { + return String.format("%s_%s", namespacedJobName, taskPartitionId); + } + + /** * Remove the workflow namespace from the job name * @param workflow the name of the workflow that owns the job * @param jobName the namespaced job name