Repository: helix Updated Branches: refs/heads/master 7ec5313bc -> b235c4ee5
[HELIX-775] consolidate user content related apis for task driver Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b235c4ee Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b235c4ee Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b235c4ee Branch: refs/heads/master Commit: b235c4ee5a82c5970d29e839317ea242813a58bc Parents: 7ec5313 Author: Harry Zhang <hrzh...@linkedin.com> Authored: Thu Oct 4 11:25:08 2018 -0700 Committer: Harry Zhang <hrzh...@linkedin.com> Committed: Wed Oct 31 14:03:37 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/task/TaskDriver.java | 64 +++++++++++++------- .../java/org/apache/helix/task/TaskUtil.java | 18 +++--- .../helix/task/TestGetSetUserContentStore.java | 53 +++++++++------- 3 files changed, 83 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/b235c4ee/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index 25a4fe4..e675c86 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -1022,7 +1022,12 @@ public class TaskDriver { * @param taskName name of task. Optional if scope is WORKFLOW or JOB * @return null if key-value pair not found or this content store does not exist. 
Otherwise, * return a String + * + * @deprecated use the following equivalents: {@link #getWorkflowUserContentMap(String)}, + * {@link #getJobUserContentMap(String, String)}, + * {@link #getTaskUserContentMap(String, String, String)} */ + @Deprecated public String getUserContent(String key, UserContentStore.Scope scope, String workflowName, String jobName, String taskName) { return TaskUtil.getUserContent(_propertyStore, key, scope, workflowName, jobName, taskName); @@ -1055,36 +1060,53 @@ public class TaskDriver { * @param taskPartitionId task partition id * @return user content map */ - public Map<String, String> getTaskContentMap(String workflowName, String jobName, String taskPartitionId) { + public Map<String, String> getTaskUserContentMap(String workflowName, String jobName, + String taskPartitionId) { String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, jobName); String namespacedTaskName = TaskUtil.getNamespacedTaskName(namespacedJobName, taskPartitionId); return TaskUtil.getTaskUserContentMap(_propertyStore, namespacedJobName, namespacedTaskName); } + /** + * Add or update workflow user content with the given map - new keys will be added, and old + * keys will be updated + * @param workflowName workflow name + * @param contentToAddOrUpdate map containing items to add or update + */ + public void addOrUpdateWorkflowUserContentMap(String workflowName, + final Map<String, String> contentToAddOrUpdate) { + TaskUtil + .addOrUpdateWorkflowJobUserContentMap(_propertyStore, workflowName, contentToAddOrUpdate); + } + /** + * Add or update job user content with the given map - new keys will be added, and old keys will + * be updated + * @param workflowName workflow name + * @param jobName Un-namespaced job name + * @param contentToAddOrUpdate map containing items to add or update + */ + public void addOrUpdateJobUserContentMap(String workflowName, String jobName, + final Map<String, String> contentToAddOrUpdate) { + String namespacedJobName = 
TaskUtil.getNamespacedJobName(workflowName, jobName); + TaskUtil.addOrUpdateWorkflowJobUserContentMap(_propertyStore, namespacedJobName, + contentToAddOrUpdate); + } /** - * Set user content defined by the given key and string - * @param key content key - * @param value content value - * @param workflowName name of the workflow - must provide when scope is WORKFLOW - * @param jobName name of the job - must provide when scope is JOB or TASK - * @param taskName name of the task - must provide when scope is TASK - * @param scope scope of the content + * Add or update task user content with the given map - new keys will be added, and old keys + * will be updated + * @param workflowName workflow name + * @param jobName Un-namespaced job name + * @param taskPartitionId task partition id + * @param contentToAddOrUpdate map containing items to add or update */ - public void addUserContent(String key, String value, String workflowName, String jobName, String taskName, - UserContentStore.Scope scope) { - switch (scope) { - case WORKFLOW: - TaskUtil.addWorkflowJobUserContent(_propertyStore, workflowName, key, value); - break; - case JOB: - TaskUtil.addWorkflowJobUserContent(_propertyStore, jobName, key, value); - break; - default: - TaskUtil.addTaskUserContent(_propertyStore, jobName, taskName, key, value); - break; - } + public void addOrUpdateTaskUserContentMap(String workflowName, String jobName, + String taskPartitionId, final Map<String, String> contentToAddOrUpdate) { + String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, jobName); + String namespacedTaskName = TaskUtil.getNamespacedTaskName(namespacedJobName, taskPartitionId); + TaskUtil.addOrUpdateTaskUserContentMap(_propertyStore, namespacedJobName, namespacedTaskName, + contentToAddOrUpdate); } /** http://git-wip-us.apache.org/repos/asf/helix/blob/b235c4ee/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java index 5581b6f..379026d 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java @@ -346,12 +346,13 @@ public class TaskUtil { */ protected static void addWorkflowJobUserContent(final HelixManager manager, String workflowJobResource, final String key, final String value) { - addWorkflowJobUserContent(manager.getHelixPropertyStore(), workflowJobResource, key, value); + addOrUpdateWorkflowJobUserContentMap(manager.getHelixPropertyStore(), workflowJobResource, + Collections.singletonMap(key, value)); } /* package */ - static void addWorkflowJobUserContent(final HelixPropertyStore<ZNRecord> propertyStore, - String workflowJobResource, final String key, final String value) { + static void addOrUpdateWorkflowJobUserContentMap(final HelixPropertyStore<ZNRecord> propertyStore, + String workflowJobResource, final Map<String, String> contentToAddOrUpdate) { if (workflowJobResource == null) { throw new IllegalArgumentException("workflowJobResource must be not null when adding workflow / job user content"); } @@ -361,7 +362,7 @@ public class TaskUtil { propertyStore.update(path, new DataUpdater<ZNRecord>() { @Override public ZNRecord update(ZNRecord znRecord) { - znRecord.setSimpleField(key, value); + znRecord.getSimpleFields().putAll(contentToAddOrUpdate); return znRecord; } }, AccessOption.PERSISTENT); @@ -407,12 +408,13 @@ public class TaskUtil { */ protected static void addTaskUserContent(final HelixManager manager, String job, final String task, final String key, final String value) { - addTaskUserContent(manager.getHelixPropertyStore(), job, task, key, value); + addOrUpdateTaskUserContentMap(manager.getHelixPropertyStore(), job, task, + Collections.singletonMap(key, value)); } /* package */ - static void addTaskUserContent(final HelixPropertyStore<ZNRecord> 
propertyStore, - final String job, final String task, final String key, final String value) { + static void addOrUpdateTaskUserContentMap(final HelixPropertyStore<ZNRecord> propertyStore, + final String job, final String task, final Map<String, String> contentToAddOrUpdate) { if (job == null || task == null) { throw new IllegalArgumentException("job and task must be not null when adding task user content"); } @@ -425,7 +427,7 @@ public class TaskUtil { if (znRecord.getMapField(task) == null) { znRecord.setMapField(task, new HashMap<String, String>()); } - znRecord.getMapField(task).put(key, value); + znRecord.getMapField(task).putAll(contentToAddOrUpdate); return znRecord; } }, AccessOption.PERSISTENT); http://git-wip-us.apache.org/repos/asf/helix/blob/b235c4ee/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java b/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java index d4ba29a..65f6b07 100644 --- a/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java +++ b/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java @@ -20,6 +20,7 @@ package org.apache.helix.task; */ import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -54,12 +55,12 @@ public class TestGetSetUserContentStore extends TaskTestBase { private class TaskRecord { String workflowName; String jobName; - String taskName; + String taskPartitionId; public TaskRecord(String workflow, String job, String task) { workflowName = workflow; jobName = job; - taskName = task; + taskPartitionId = task; } } @@ -131,11 +132,10 @@ public class TestGetSetUserContentStore extends TaskTestBase { taskConfigs.add(new TaskConfig("WriteTask", new HashMap<String, String>())); JobConfig.Builder 
jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND) .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap); - String jobSuffix = "JOB" + i; - String jobName = workflowName + "_" + jobSuffix; - String taskName = jobName + "_0"; - workflowBuilder.addJob("JOB" + i, jobConfigBulider); - recordMap.put(jobName, new TaskRecord(workflowName, jobName, taskName)); + String jobName = "JOB" + i; + String taskPartitionId = "0"; + workflowBuilder.addJob(jobName, jobConfigBulider); + recordMap.put(jobName, new TaskRecord(workflowName, jobName, taskPartitionId)); } // Start the workflow and wait for all tasks started @@ -143,33 +143,40 @@ public class TestGetSetUserContentStore extends TaskTestBase { allTasksReady.await(); // add "workflow":"workflow" to the workflow's user content - _driver.addUserContent(workflowName, workflowName, workflowName, null, null, UserContentStore.Scope.WORKFLOW); + _driver.addOrUpdateWorkflowUserContentMap(workflowName, + Collections.singletonMap(workflowName, workflowName)); for (TaskRecord rec : recordMap.values()) { // add "job":"job" to the job's user content - _driver.addUserContent(rec.jobName, rec.jobName, null, rec.jobName, null, UserContentStore.Scope.JOB); - // String taskId = _driver.getJobContext(rec.jobName).getTaskIdForPartition(0); - + String namespacedJobName = TaskUtil.getNamespacedJobName(rec.workflowName, rec.jobName); + _driver.addOrUpdateJobUserContentMap(rec.workflowName, rec.jobName, + Collections.singletonMap(namespacedJobName, namespacedJobName)); + String namespacedTaskName = + TaskUtil.getNamespacedTaskName(namespacedJobName, rec.taskPartitionId); // add "taskId":"taskId" to the task's user content - _driver.addUserContent(rec.taskName, rec.taskName, null, rec.jobName, rec.taskName, UserContentStore.Scope.TASK); + _driver.addOrUpdateTaskUserContentMap(rec.workflowName, rec.jobName, rec.taskPartitionId, + Collections.singletonMap(namespacedTaskName, namespacedTaskName)); } adminReady.countDown(); 
_driver.pollForWorkflowState(workflowName, TaskState.COMPLETED); // Aggregate key-value mappings in UserContentStore for (TaskRecord rec : recordMap.values()) { - Assert.assertEquals(_driver - .getUserContent(TaskDumpResultKey.WorkflowContent.name(), UserContentStore.Scope.WORKFLOW, - rec.workflowName, rec.jobName, rec.taskName), + Assert.assertEquals(_driver.getWorkflowUserContentMap(rec.workflowName) + .get(TaskDumpResultKey.WorkflowContent.name()), constructContentStoreResultString(rec.workflowName, rec.workflowName)); - Assert.assertEquals(_driver - .getUserContent(TaskDumpResultKey.JobContent.name(), UserContentStore.Scope.JOB, - rec.workflowName, rec.jobName, rec.taskName), - constructContentStoreResultString(rec.jobName, rec.jobName)); - Assert.assertEquals(_driver - .getUserContent(TaskDumpResultKey.TaskContent.name(), UserContentStore.Scope.TASK, - rec.workflowName, rec.jobName, rec.taskName), - constructContentStoreResultString(rec.taskName, rec.taskName)); + + String namespacedJobName = TaskUtil.getNamespacedJobName(rec.workflowName, rec.jobName); + Assert.assertEquals(_driver.getJobUserContentMap(rec.workflowName, rec.jobName) + .get(TaskDumpResultKey.JobContent.name()), + constructContentStoreResultString(namespacedJobName, namespacedJobName)); + + String namespacedTaskName = + TaskUtil.getNamespacedTaskName(namespacedJobName, rec.taskPartitionId); + Assert.assertEquals( + _driver.getTaskUserContentMap(rec.workflowName, rec.jobName, rec.taskPartitionId) + .get(TaskDumpResultKey.TaskContent.name()), + constructContentStoreResultString(namespacedTaskName, namespacedTaskName)); } }