Repository: helix
Updated Branches:
  refs/heads/master 7ec5313bc -> b235c4ee5


[HELIX-775] consolidate user content related apis for task driver


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b235c4ee
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b235c4ee
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b235c4ee

Branch: refs/heads/master
Commit: b235c4ee5a82c5970d29e839317ea242813a58bc
Parents: 7ec5313
Author: Harry Zhang <hrzh...@linkedin.com>
Authored: Thu Oct 4 11:25:08 2018 -0700
Committer: Harry Zhang <hrzh...@linkedin.com>
Committed: Wed Oct 31 14:03:37 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  | 64 +++++++++++++-------
 .../java/org/apache/helix/task/TaskUtil.java    | 18 +++---
 .../helix/task/TestGetSetUserContentStore.java  | 53 +++++++++-------
 3 files changed, 83 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/b235c4ee/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 25a4fe4..e675c86 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -1022,7 +1022,12 @@ public class TaskDriver {
    * @param taskName name of task. Optional if scope is WORKFLOW or JOB
    * @return null if key-value pair not found or this content store does not 
exist. Otherwise,
    *         return a String
+   *
+   * @deprecated use the following equivalents: {@link 
#getWorkflowUserContentMap(String)},
+   * {@link #getJobUserContentMap(String, String)},
+   * {@link #getTaskUserContentMap(String, String, String)}
    */
+  @Deprecated
   public String getUserContent(String key, UserContentStore.Scope scope, 
String workflowName,
       String jobName, String taskName) {
     return TaskUtil.getUserContent(_propertyStore, key, scope, workflowName, 
jobName, taskName);
@@ -1055,36 +1060,53 @@ public class TaskDriver {
    * @param taskPartitionId task partition id
    * @return user content map
    */
-  public Map<String, String> getTaskContentMap(String workflowName, String 
jobName, String taskPartitionId) {
+  public Map<String, String> getTaskUserContentMap(String workflowName, String 
jobName,
+      String taskPartitionId) {
     String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, 
jobName);
     String namespacedTaskName = 
TaskUtil.getNamespacedTaskName(namespacedJobName, taskPartitionId);
     return TaskUtil.getTaskUserContentMap(_propertyStore, namespacedJobName, 
namespacedTaskName);
   }
 
+  /**
+   * Add or update workflow user content with the given map - new keys will be 
added, and old
+   * keys will be updated
+   * @param workflowName workflow name
+   * @param contentToAddOrUpdate map containing items to add or update
+   */
+  public void addOrUpdateWorkflowUserContentMap(String workflowName,
+      final Map<String, String> contentToAddOrUpdate) {
+    TaskUtil
+        .addOrUpdateWorkflowJobUserContentMap(_propertyStore, workflowName, 
contentToAddOrUpdate);
+  }
 
+  /**
+   * Add or update job user content with the given map - new keys will be 
added, and old keys will
+   * be updated
+   * @param workflowName workflow name
+   * @param jobName Un-namespaced job name
+   * @param contentToAddOrUpdate map containing items to add or update
+   */
+  public void addOrUpdateJobUserContentMap(String workflowName, String jobName,
+      final Map<String, String> contentToAddOrUpdate) {
+    String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, 
jobName);
+    TaskUtil.addOrUpdateWorkflowJobUserContentMap(_propertyStore, 
namespacedJobName,
+        contentToAddOrUpdate);
+  }
 
   /**
-   * Set user content defined by the given key and string
-   * @param key content key
-   * @param value content value
-   * @param workflowName name of the workflow - must provide when scope is 
WORKFLOW
-   * @param jobName name of the job - must provide when scope is JOB or TASK
-   * @param taskName name of the task - must provide when scope is TASK
-   * @param scope scope of the content
+   * Add or update task user content with the given map - new keys will be 
added, and old keys
+   * will be updated
+   * @param workflowName workflow name
+   * @param jobName Un-namespaced job name
+   * @param taskPartitionId task partition id
+   * @param contentToAddOrUpdate map containing items to add or update
    */
-  public void addUserContent(String key, String value, String workflowName, 
String jobName, String taskName,
-      UserContentStore.Scope scope) {
-    switch (scope) {
-    case WORKFLOW:
-      TaskUtil.addWorkflowJobUserContent(_propertyStore, workflowName, key, 
value);
-      break;
-    case JOB:
-      TaskUtil.addWorkflowJobUserContent(_propertyStore, jobName, key, value);
-      break;
-    default:
-      TaskUtil.addTaskUserContent(_propertyStore, jobName, taskName, key, 
value);
-      break;
-    }
+  public void addOrUpdateTaskUserContentMap(String workflowName, String 
jobName,
+      String taskPartitionId, final Map<String, String> contentToAddOrUpdate) {
+    String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, 
jobName);
+    String namespacedTaskName = 
TaskUtil.getNamespacedTaskName(namespacedJobName, taskPartitionId);
+    TaskUtil.addOrUpdateTaskUserContentMap(_propertyStore, namespacedJobName, 
namespacedTaskName,
+        contentToAddOrUpdate);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/b235c4ee/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 5581b6f..379026d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -346,12 +346,13 @@ public class TaskUtil {
    */
   protected static void addWorkflowJobUserContent(final HelixManager manager,
       String workflowJobResource, final String key, final String value) {
-   addWorkflowJobUserContent(manager.getHelixPropertyStore(), 
workflowJobResource, key, value);
+    addOrUpdateWorkflowJobUserContentMap(manager.getHelixPropertyStore(), 
workflowJobResource,
+        Collections.singletonMap(key, value));
   }
 
   /* package */
-  static void addWorkflowJobUserContent(final HelixPropertyStore<ZNRecord> 
propertyStore,
-      String workflowJobResource, final String key, final String value) {
+  static void addOrUpdateWorkflowJobUserContentMap(final 
HelixPropertyStore<ZNRecord> propertyStore,
+      String workflowJobResource, final Map<String, String> 
contentToAddOrUpdate) {
     if (workflowJobResource == null) {
       throw new IllegalArgumentException("workflowJobResource must be not null 
when adding workflow / job user content");
     }
@@ -361,7 +362,7 @@ public class TaskUtil {
     propertyStore.update(path, new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord znRecord) {
-        znRecord.setSimpleField(key, value);
+        znRecord.getSimpleFields().putAll(contentToAddOrUpdate);
         return znRecord;
       }
     }, AccessOption.PERSISTENT);
@@ -407,12 +408,13 @@ public class TaskUtil {
    */
   protected static void addTaskUserContent(final HelixManager manager, String 
job,
       final String task, final String key, final String value) {
-    addTaskUserContent(manager.getHelixPropertyStore(), job, task, key, value);
+    addOrUpdateTaskUserContentMap(manager.getHelixPropertyStore(), job, task,
+        Collections.singletonMap(key, value));
   }
 
   /* package */
-  static void addTaskUserContent(final HelixPropertyStore<ZNRecord> 
propertyStore,
-      final String job, final String task, final String key, final String 
value) {
+  static void addOrUpdateTaskUserContentMap(final HelixPropertyStore<ZNRecord> 
propertyStore,
+      final String job, final String task, final Map<String, String> 
contentToAddOrUpdate) {
     if (job == null || task == null) {
       throw new IllegalArgumentException("job and task must be not null when 
adding task user content");
     }
@@ -425,7 +427,7 @@ public class TaskUtil {
         if (znRecord.getMapField(task) == null) {
           znRecord.setMapField(task, new HashMap<String, String>());
         }
-        znRecord.getMapField(task).put(key, value);
+        znRecord.getMapField(task).putAll(contentToAddOrUpdate);
         return znRecord;
       }
     }, AccessOption.PERSISTENT);

http://git-wip-us.apache.org/repos/asf/helix/blob/b235c4ee/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java
 
b/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java
index d4ba29a..65f6b07 100644
--- 
a/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java
+++ 
b/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java
@@ -20,6 +20,7 @@ package org.apache.helix.task;
  */
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -54,12 +55,12 @@ public class TestGetSetUserContentStore extends 
TaskTestBase {
   private class TaskRecord {
     String workflowName;
     String jobName;
-    String taskName;
+    String taskPartitionId;
 
     public TaskRecord(String workflow, String job, String task) {
       workflowName = workflow;
       jobName = job;
-      taskName = task;
+      taskPartitionId = task;
     }
   }
 
@@ -131,11 +132,10 @@ public class TestGetSetUserContentStore extends 
TaskTestBase {
       taskConfigs.add(new TaskConfig("WriteTask", new HashMap<String, 
String>()));
       JobConfig.Builder jobConfigBulider = new 
JobConfig.Builder().setCommand(JOB_COMMAND)
           .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap);
-      String jobSuffix = "JOB" + i;
-      String jobName = workflowName + "_" + jobSuffix;
-      String taskName = jobName + "_0";
-      workflowBuilder.addJob("JOB" + i, jobConfigBulider);
-      recordMap.put(jobName, new TaskRecord(workflowName, jobName, taskName));
+      String jobName = "JOB" + i;
+      String taskPartitionId = "0";
+      workflowBuilder.addJob(jobName, jobConfigBulider);
+      recordMap.put(jobName, new TaskRecord(workflowName, jobName, 
taskPartitionId));
     }
 
     // Start the workflow and wait for all tasks started
@@ -143,33 +143,40 @@ public class TestGetSetUserContentStore extends 
TaskTestBase {
     allTasksReady.await();
 
     // add "workflow":"workflow" to the workflow's user content
-    _driver.addUserContent(workflowName, workflowName, workflowName, null, 
null, UserContentStore.Scope.WORKFLOW);
+    _driver.addOrUpdateWorkflowUserContentMap(workflowName,
+        Collections.singletonMap(workflowName, workflowName));
     for (TaskRecord rec : recordMap.values()) {
       // add "job":"job" to the job's user content
-      _driver.addUserContent(rec.jobName, rec.jobName, null, rec.jobName, 
null, UserContentStore.Scope.JOB);
-      // String taskId = 
_driver.getJobContext(rec.jobName).getTaskIdForPartition(0);
-
+      String namespacedJobName = 
TaskUtil.getNamespacedJobName(rec.workflowName, rec.jobName);
+      _driver.addOrUpdateJobUserContentMap(rec.workflowName, rec.jobName,
+          Collections.singletonMap(namespacedJobName, namespacedJobName));
 
+      String namespacedTaskName =
+          TaskUtil.getNamespacedTaskName(namespacedJobName, 
rec.taskPartitionId);
       // add "taskId":"taskId" to the task's user content
-      _driver.addUserContent(rec.taskName, rec.taskName, null, rec.jobName, 
rec.taskName, UserContentStore.Scope.TASK);
+      _driver.addOrUpdateTaskUserContentMap(rec.workflowName, rec.jobName, 
rec.taskPartitionId,
+          Collections.singletonMap(namespacedTaskName, namespacedTaskName));
     }
     adminReady.countDown();
     _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
 
     // Aggregate key-value mappings in UserContentStore
     for (TaskRecord rec : recordMap.values()) {
-      Assert.assertEquals(_driver
-              .getUserContent(TaskDumpResultKey.WorkflowContent.name(), 
UserContentStore.Scope.WORKFLOW,
-                  rec.workflowName, rec.jobName, rec.taskName),
+      Assert.assertEquals(_driver.getWorkflowUserContentMap(rec.workflowName)
+              .get(TaskDumpResultKey.WorkflowContent.name()),
           constructContentStoreResultString(rec.workflowName, 
rec.workflowName));
-      Assert.assertEquals(_driver
-              .getUserContent(TaskDumpResultKey.JobContent.name(), 
UserContentStore.Scope.JOB,
-                  rec.workflowName, rec.jobName, rec.taskName),
-          constructContentStoreResultString(rec.jobName, rec.jobName));
-      Assert.assertEquals(_driver
-              .getUserContent(TaskDumpResultKey.TaskContent.name(), 
UserContentStore.Scope.TASK,
-                  rec.workflowName, rec.jobName, rec.taskName),
-          constructContentStoreResultString(rec.taskName, rec.taskName));
+
+      String namespacedJobName = 
TaskUtil.getNamespacedJobName(rec.workflowName, rec.jobName);
+      Assert.assertEquals(_driver.getJobUserContentMap(rec.workflowName, 
rec.jobName)
+              .get(TaskDumpResultKey.JobContent.name()),
+          constructContentStoreResultString(namespacedJobName, 
namespacedJobName));
+
+      String namespacedTaskName =
+          TaskUtil.getNamespacedTaskName(namespacedJobName, 
rec.taskPartitionId);
+      Assert.assertEquals(
+          _driver.getTaskUserContentMap(rec.workflowName, rec.jobName, 
rec.taskPartitionId)
+              .get(TaskDumpResultKey.TaskContent.name()),
+          constructContentStoreResultString(namespacedTaskName, 
namespacedTaskName));
     }
   }
 

Reply via email to