This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 356b56c00848acd3a24ba148fcdae3304ab155ba
Author: Junkai Xue <j...@linkedin.com>
AuthorDate: Thu Nov 29 14:23:56 2018 -0800

    Batch write support for previous assignment
---
 .../apache/helix/common/caches/TaskDataCache.java  | 115 +++++++++++++++++----
 .../java/org/apache/helix/task/JobDispatcher.java  |  29 +-----
 .../org/apache/helix/task/WorkflowDispatcher.java  |   6 +-
 3 files changed, 100 insertions(+), 50 deletions(-)
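
Summary: JobDispatcher previously read and wrote each job's prior ResourceAssignment directly against the HelixPropertyStore, one ZK call per job. With this change the records are cached in TaskDataCache and flushed together with the job contexts through a single batched BaseDataAccessor.setChildren() call. A minimal sketch of that batched-write pattern follows; names such as paths, records, and prevAssignmentMap are illustrative, and it reuses the getTaskDataPath() helper added in the diff below:

    // Collect the target property-store paths and the ZNRecords to write,
    // then persist them in one batched call instead of one write per job.
    List<String> paths = new ArrayList<>();
    List<ZNRecord> records = new ArrayList<>();
    for (Map.Entry<String, ZNRecord> entry : prevAssignmentMap.entrySet()) {
      paths.add(getTaskDataPath(entry.getKey(), TaskDataType.PREV_ASSIGNMENT));
      records.add(entry.getValue());
    }
    boolean[] success =
        accessor.getBaseDataAccessor().setChildren(paths, records, AccessOption.PERSISTENT);
    // Entries that failed to write stay queued so a later pipeline run can retry them.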

diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index 7e46c2a..84145f9 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -33,11 +33,13 @@ import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.common.controllers.ControlContextProvider;
 import org.apache.helix.controller.LogUtil;
+import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.task.AssignableInstanceManager;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.RuntimeJobDag;
+import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
@@ -54,7 +56,13 @@ public class TaskDataCache extends AbstractDataCache {
   private Map<String, JobConfig> _jobConfigMap = new HashMap<>();
   private Map<String, RuntimeJobDag> _runtimeJobDagMap = new HashMap<>();
   private Map<String, WorkflowConfig> _workflowConfigMap = new ConcurrentHashMap<>();
+
+  // TODO: context and previous assignment should be wrapped into a class. Otherwise, in the future,
+  // concurrency will be hard to handle.
   private Map<String, ZNRecord> _contextMap = new HashMap<>();
+  private Map<String, ZNRecord> _prevAssignmentMap = new HashMap<>();
+  private Set<String> _prevAssignmentToUpdate = new HashSet<>();
+  private Set<String> _prevAssignmentToRemove = new HashSet<>();
   private Set<String> _contextToUpdate = new HashSet<>();
   private Set<String> _contextToRemove = new HashSet<>();
   // The following fields have been added for quota-based task scheduling
@@ -64,6 +72,11 @@ public class TaskDataCache extends AbstractDataCache {
   // JobDispatcher from RESOURCE_TO_BALANCE to reduce the redundant computation.
   private Set<String> _dispatchedJobs = new HashSet<>();
 
+  private enum TaskDataType {
+    CONTEXT,
+    PREV_ASSIGNMENT
+  }
+
 
   public TaskDataCache(ControlContextProvider contextProvider) {
     super(contextProvider);
@@ -87,7 +100,7 @@ public class TaskDataCache extends AbstractDataCache {
    */
   public synchronized boolean refresh(HelixDataAccessor accessor,
       Map<String, ResourceConfig> resourceConfigMap) {
-    refreshJobContexts(accessor);
+    refreshContextsAndPreviousAssignments(accessor);
     // update workflow and job configs.
     _workflowConfigMap.clear();
     Map<String, JobConfig> newJobConfigs = new HashMap<>();
@@ -140,10 +153,11 @@ public class TaskDataCache extends AbstractDataCache {
     return true;
   }
 
-  private void refreshJobContexts(HelixDataAccessor accessor) {
+  private void refreshContextsAndPreviousAssignments(HelixDataAccessor accessor) {
     // TODO: Need an optimization for reading contexts only if the refresh is needed.
     long start = System.currentTimeMillis();
     _contextMap.clear();
+    _prevAssignmentMap.clear();
     if (_controlContextProvider.getClusterName() == null || _controlContextProvider.getClusterName()
         .equalsIgnoreCase(UNKNOWN_CLUSTER)) {
       return;
@@ -151,15 +165,20 @@ public class TaskDataCache extends AbstractDataCache {
     String path = String.format("/%s/%s%s", _controlContextProvider.getClusterName(),
         PropertyType.PROPERTYSTORE.name(), TaskConstants.REBALANCER_CONTEXT_ROOT);
     List<String> contextPaths = new ArrayList<>();
+    List<String> prevAssignmentPaths = new ArrayList<>();
     List<String> childNames = accessor.getBaseDataAccessor().getChildNames(path, 0);
     if (childNames == null) {
       return;
     }
-    for (String context : childNames) {
-      contextPaths.add(Joiner.on("/").join(path, context, TaskConstants.CONTEXT_NODE));
+    for (String resourceName : childNames) {
+      contextPaths.add(getTaskDataPath(resourceName, TaskDataType.CONTEXT));
+      prevAssignmentPaths.add(getTaskDataPath(resourceName, TaskDataType.PREV_ASSIGNMENT));
     }
 
     List<ZNRecord> contexts = accessor.getBaseDataAccessor().get(contextPaths, null, 0);
+    List<ZNRecord> prevAssignments =
+        accessor.getBaseDataAccessor().get(prevAssignmentPaths, null, 0);
+
     for (int i = 0; i < contexts.size(); i++) {
       ZNRecord context = contexts.get(i);
       if (context != null && context.getSimpleField(NAME) != null) {
@@ -171,6 +190,12 @@ public class TaskDataCache extends AbstractDataCache {
       }
     }
 
+    for (ZNRecord prevAssignment : prevAssignments) {
+      if (prevAssignment != null) {
+        _prevAssignmentMap.put(prevAssignment.getId(), prevAssignment);
+      }
+    }
+
     if (LOG.isDebugEnabled()) {
       LogUtil.logDebug(LOG, genEventInfo(),
           "# of workflow/job context read from zk: " + _contextMap.size() + ". Take " + (
@@ -269,44 +294,61 @@ public class TaskDataCache extends AbstractDataCache {
   }
 
   public void persistDataChanges(HelixDataAccessor accessor) {
-    // Flush Context
-    List<String> contextUpdatePaths = new ArrayList<>();
-    List<ZNRecord> contextUpdateData = new ArrayList<>();
     // Do not update it if it needs to be removed
     _contextToUpdate.removeAll(_contextToRemove);
-    List<String> contextUpdateNames = new ArrayList<>(_contextToUpdate);
-    for (String resourceName : contextUpdateNames) {
-      if (_contextMap.get(resourceName) != null) {
-        contextUpdatePaths.add(getContextPath(resourceName));
-        contextUpdateData.add(_contextMap.get(resourceName));
+    batchUpdateData(accessor, new ArrayList<>(_contextToUpdate), _contextMap, _contextToUpdate,
+        TaskDataType.CONTEXT);
+    batchDeleteData(accessor, new ArrayList<>(_contextToRemove), TaskDataType.CONTEXT);
+    _contextToRemove.clear();
+
+    _prevAssignmentToUpdate.removeAll(_prevAssignmentToRemove);
+    batchUpdateData(accessor, new ArrayList<>(_prevAssignmentToUpdate), _prevAssignmentMap,
+        _prevAssignmentToUpdate, TaskDataType.PREV_ASSIGNMENT);
+    batchDeleteData(accessor, new ArrayList<>(_prevAssignmentToRemove),
+        TaskDataType.PREV_ASSIGNMENT);
+    _prevAssignmentToRemove.clear();
+  }
+
+  private void batchUpdateData(HelixDataAccessor accessor, List<String> dataUpdateNames,
+      Map<String, ZNRecord> dataMap, Set<String> dataToUpdate, TaskDataType taskDataType) {
+    List<String> contextUpdatePaths = new ArrayList<>();
+    List<ZNRecord> updatedData = new ArrayList<>();
+    for (String resourceName : dataUpdateNames) {
+      if (dataMap.get(resourceName) != null) {
+        contextUpdatePaths.add(getTaskDataPath(resourceName, taskDataType));
+        updatedData.add(dataMap.get(resourceName));
       }
     }
 
     boolean[] updateSuccess = accessor.getBaseDataAccessor()
-        .setChildren(contextUpdatePaths, contextUpdateData, AccessOption.PERSISTENT);
+        .setChildren(contextUpdatePaths, updatedData, AccessOption.PERSISTENT);
 
     for (int i = 0; i < updateSuccess.length; i++) {
       if (updateSuccess[i]) {
-        _contextToUpdate.remove(contextUpdateNames.get(i));
+        dataToUpdate.remove(dataUpdateNames.get(i));
+      } else {
+        LogUtil.logWarn(LOG, _controlContextProvider.getClusterEventId(), String
+            .format("Failed to update the %s for %s", taskDataType.name(), dataUpdateNames.get(i)));
       }
     }
+  }
+
+  private void batchDeleteData(HelixDataAccessor accessor, List<String> contextNamesToRemove,
+      TaskDataType taskDataType) {
 
     // Delete contexts
     // We cannot leave the context here since some of the deletions happen when cleaning up a workflow.
     // If we leave it in memory, Helix will not allow the user to create it with the same name.
     // TODO: Let's have a periodic cleanup thread that could remove contexts whose deletion failed.
     List<String> contextPathsToRemove = new ArrayList<>();
-    List<String> contextNamesToRemove = new ArrayList<>(_contextToRemove);
     for (String resourceName : contextNamesToRemove) {
-      contextPathsToRemove.add(getContextPath(resourceName));
+      contextPathsToRemove.add(getTaskDataPath(resourceName, taskDataType));
     }
 
     // TODO: The current behavior is that deleting non-existing data returns false.
     // Once the behavior is fixed, we can add retry logic back. Otherwise, it will stay in memory
     // and not allow recreation of a workflow with the same name.
     accessor.getBaseDataAccessor().remove(contextPathsToRemove, 
AccessOption.PERSISTENT);
-
-    _contextToRemove.clear();
   }
 
   /**
@@ -346,10 +388,24 @@ public class TaskDataCache extends AbstractDataCache {
         + _controlContextProvider.getClusterName() + '\'' + '}';
   }
 
-  private String getContextPath(String resourceName) {
-    return String.format("/%s/%s%s/%s/%s", _controlContextProvider.getClusterName(),
-        PropertyType.PROPERTYSTORE.name(), TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName,
-        TaskConstants.CONTEXT_NODE);
+  /**
+   * Get the path based on different data types. If the type does not exist, it will return null
+   * instead.
+   *
+   * @param resourceName the name of the job or workflow
+   * @param taskDataType the type of task data (CONTEXT or PREV_ASSIGNMENT)
+   * @return the property store path for the given data type, or null if the type is unknown
+   */
+  private String getTaskDataPath(String resourceName, TaskDataType taskDataType) {
+    String prevFix = String.format("/%s/%s%s/%s", _controlContextProvider.getClusterName(),
+        PropertyType.PROPERTYSTORE.name(), TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName);
+    switch (taskDataType) {
+    case CONTEXT:
+      return String.format("%s/%s", prevFix, TaskConstants.CONTEXT_NODE);
+    case PREV_ASSIGNMENT:
+      return String.format("%s/%s", prevFix, TaskConstants.PREV_RA_NODE);
+    }
+    return null;
   }
 
   public void dispatchJob(String jobName) {
@@ -370,4 +426,19 @@ public class TaskDataCache extends AbstractDataCache {
     }
     return null;
   }
+
+  public ResourceAssignment getPreviousAssignment(String resourceName) {
+    return _prevAssignmentMap.get(resourceName) != null ? new ResourceAssignment(
+        _prevAssignmentMap.get(resourceName)) : null;
+  }
+
+  public void setPreviousAssignment(String resourceName, ResourceAssignment prevAssignment) {
+    _prevAssignmentMap.put(resourceName, prevAssignment.getRecord());
+    _prevAssignmentToUpdate.add(resourceName);
+  }
+
+  public void removePrevAssignment(String resourceName) {
+    _prevAssignmentMap.remove(resourceName);
+    _prevAssignmentToRemove.add(resourceName);
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index d72db7f..2ae5b2f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -106,7 +106,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     }
 
     // Grab the old assignment, or an empty one if it doesn't exist
-    ResourceAssignment prevAssignment = getPrevResourceAssignment(jobName);
+    ResourceAssignment prevAssignment =
+        _dataProvider.getTaskDataCache().getPreviousAssignment(jobName);
     if (prevAssignment == null) {
       prevAssignment = new ResourceAssignment(jobName);
     }
@@ -161,8 +162,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     // Update Workflow and Job context in data cache and ZK.
     _dataProvider.updateJobContext(jobName, jobCtx);
     _dataProvider.updateWorkflowContext(workflowResource, workflowCtx);
-
-    setPrevResourceAssignment(jobName, newAssignment);
+    _dataProvider.getTaskDataCache().setPreviousAssignment(jobName, newAssignment);
 
     LOG.debug("Job " + jobName + " new assignment "
         + Arrays.toString(newAssignment.getMappedPartitions().toArray()));
@@ -317,29 +317,6 @@ public class JobDispatcher extends AbstractTaskDispatcher {
   }
 
   /**
-   * Get the last task assignment for a given job
-   * @param resourceName the name of the job
-   * @return {@link ResourceAssignment} instance, or null if no assignment is available
-   */
-  private ResourceAssignment getPrevResourceAssignment(String resourceName) {
-    ZNRecord r = _manager.getHelixPropertyStore().get(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, TaskConstants.PREV_RA_NODE),
-        null, AccessOption.PERSISTENT);
-    return r != null ? new ResourceAssignment(r) : null;
-  }
-
-  /**
-   * Set the last task assignment for a given job
-   * @param resourceName the name of the job
-   * @param ra {@link ResourceAssignment} containing the task assignment
-   */
-  private void setPrevResourceAssignment(String resourceName, ResourceAssignment ra) {
-    _manager.getHelixPropertyStore().set(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, TaskConstants.PREV_RA_NODE),
-        ra.getRecord(), AccessOption.PERSISTENT);
-  }
-
-  /**
    * Checks if the job has completed. Look at states of all tasks of the job; there are 3 kinds:
    * completed, given up, not given up. The job is completed if all tasks are completed or given up,
    * and the number of given up tasks is within job failure threshold.
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index bb25ef1..eec2e87 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -556,7 +556,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
         // Only remove from cache when removing all workflow data succeeds. Otherwise, batch write will
         // clean all the contexts even if Configs and IdealStates still exist. Then all the workflows
         // and jobs will be rescheduled again.
-        removeContexts(workflow, jobs, _clusterDataCache.getTaskDataCache());
+        removeContextsAndPreviousAssignment(workflow, jobs, _clusterDataCache.getTaskDataCache());
       }
      } else {
       LOG.info("Did not clean up workflow " + workflow
@@ -564,10 +564,12 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
     }
   }
 
-  private void removeContexts(String workflow, Set<String> jobs, TaskDataCache cache) {
+  private void removeContextsAndPreviousAssignment(String workflow, Set<String> jobs,
+      TaskDataCache cache) {
     if (jobs != null) {
       for (String job : jobs) {
         cache.removeContext(job);
+        cache.removePrevAssignment(job);
       }
     }
     cache.removeContext(workflow);
