This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 356b56c00848acd3a24ba148fcdae3304ab155ba
Author: Junkai Xue <j...@linkedin.com>
AuthorDate: Thu Nov 29 14:23:56 2018 -0800

    Batch write support for previous assignment
---
 .../apache/helix/common/caches/TaskDataCache.java | 115 +++++++++++++++++----
 .../java/org/apache/helix/task/JobDispatcher.java |  29 +-----
 .../org/apache/helix/task/WorkflowDispatcher.java |   6 +-
 3 files changed, 100 insertions(+), 50 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index 7e46c2a..84145f9 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -33,11 +33,13 @@ import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.common.controllers.ControlContextProvider;
 import org.apache.helix.controller.LogUtil;
+import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.task.AssignableInstanceManager;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.RuntimeJobDag;
+import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
@@ -54,7 +56,13 @@ public class TaskDataCache extends AbstractDataCache {
   private Map<String, JobConfig> _jobConfigMap = new HashMap<>();
   private Map<String, RuntimeJobDag> _runtimeJobDagMap = new HashMap<>();
   private Map<String, WorkflowConfig> _workflowConfigMap = new ConcurrentHashMap<>();
+
+  // TODO: context and previous assignment should be wrapped into a class. Otherwise, in the future,
+  // concurrency will be hard to handle.
   private Map<String, ZNRecord> _contextMap = new HashMap<>();
+  private Map<String, ZNRecord> _prevAssignmentMap = new HashMap<>();
+  private Set<String> _prevAssignmentToUpdate = new HashSet<>();
+  private Set<String> _prevAssignmentToRemove = new HashSet<>();
   private Set<String> _contextToUpdate = new HashSet<>();
   private Set<String> _contextToRemove = new HashSet<>();
   // The following fields have been added for quota-based task scheduling
@@ -64,6 +72,11 @@ public class TaskDataCache extends AbstractDataCache {
   // JobDispatcher from RESOURCE_TO_BALANCE to reduce the redundant computation.
   private Set<String> _dispatchedJobs = new HashSet<>();
 
+  private enum TaskDataType {
+    CONTEXT,
+    PREV_ASSIGNMENT
+  }
+
   public TaskDataCache(ControlContextProvider contextProvider) {
     super(contextProvider);
 
@@ -87,7 +100,7 @@ public class TaskDataCache extends AbstractDataCache {
    */
   public synchronized boolean refresh(HelixDataAccessor accessor,
       Map<String, ResourceConfig> resourceConfigMap) {
-    refreshJobContexts(accessor);
+    refreshContextsAndPreviousAssignments(accessor);
     // update workflow and job configs.
     _workflowConfigMap.clear();
     Map<String, JobConfig> newJobConfigs = new HashMap<>();
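
The three new _prevAssignment* fields mirror the existing context bookkeeping: mutations only touch the in-memory map and mark the resource dirty in a to-update or to-remove set, and nothing is written to ZooKeeper until the cache is flushed. A minimal, self-contained sketch of that dirty-tracking pattern (simplified, hypothetical names, not the actual Helix classes):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // Dirty-tracking cache: put()/remove() are memory-only; persist() pushes
    // every dirty entry in one batch, the way TaskDataCache defers ZK writes
    // until persistDataChanges() runs.
    class DirtyTrackingCache<V> {
      interface BatchStore<V> {
        boolean[] writeAll(List<String> names, List<V> values);
        void deleteAll(List<String> names);
      }

      private final Map<String, V> _data = new HashMap<>();
      private final Set<String> _toUpdate = new HashSet<>();
      private final Set<String> _toRemove = new HashSet<>();

      void put(String name, V value) {
        _data.put(name, value);
        _toUpdate.add(name); // mark dirty; no remote write here
      }

      void remove(String name) {
        _data.remove(name);
        _toRemove.add(name); // staged for batch deletion
      }

      void persist(BatchStore<V> store) {
        _toUpdate.removeAll(_toRemove); // a pending delete wins over a pending update
        List<String> names = new ArrayList<>();
        List<V> values = new ArrayList<>();
        for (String name : _toUpdate) {
          if (_data.get(name) != null) {
            names.add(name);
            values.add(_data.get(name));
          }
        }
        boolean[] ok = store.writeAll(names, values); // one batched write
        for (int i = 0; i < ok.length; i++) {
          if (ok[i]) {
            _toUpdate.remove(names.get(i)); // failures stay dirty and retry on the next flush
          }
        }
        store.deleteAll(new ArrayList<>(_toRemove));
        _toRemove.clear();
      }
    }
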
@@ -140,10 +153,11 @@
     return true;
   }
 
-  private void refreshJobContexts(HelixDataAccessor accessor) {
+  private void refreshContextsAndPreviousAssignments(HelixDataAccessor accessor) {
     // TODO: Need an optimization to read the contexts only if a refresh is needed.
     long start = System.currentTimeMillis();
     _contextMap.clear();
+    _prevAssignmentMap.clear();
     if (_controlContextProvider.getClusterName() == null || _controlContextProvider.getClusterName()
         .equalsIgnoreCase(UNKNOWN_CLUSTER)) {
       return;
@@ -151,15 +165,20 @@
     String path = String.format("/%s/%s%s", _controlContextProvider.getClusterName(),
         PropertyType.PROPERTYSTORE.name(), TaskConstants.REBALANCER_CONTEXT_ROOT);
     List<String> contextPaths = new ArrayList<>();
+    List<String> prevAssignmentPaths = new ArrayList<>();
     List<String> childNames = accessor.getBaseDataAccessor().getChildNames(path, 0);
     if (childNames == null) {
       return;
     }
-    for (String context : childNames) {
-      contextPaths.add(Joiner.on("/").join(path, context, TaskConstants.CONTEXT_NODE));
+    for (String resourceName : childNames) {
+      contextPaths.add(getTaskDataPath(resourceName, TaskDataType.CONTEXT));
+      prevAssignmentPaths.add(getTaskDataPath(resourceName, TaskDataType.PREV_ASSIGNMENT));
     }
 
     List<ZNRecord> contexts = accessor.getBaseDataAccessor().get(contextPaths, null, 0);
+    List<ZNRecord> prevAssignments =
+        accessor.getBaseDataAccessor().get(prevAssignmentPaths, null, 0);
+
     for (int i = 0; i < contexts.size(); i++) {
       ZNRecord context = contexts.get(i);
       if (context != null && context.getSimpleField(NAME) != null) {
@@ -171,6 +190,12 @@
       }
     }
 
+    for (ZNRecord prevAssignment : prevAssignments) {
+      if (prevAssignment != null) {
+        _prevAssignmentMap.put(prevAssignment.getId(), prevAssignment);
+      }
+    }
+
     if (LOG.isDebugEnabled()) {
       LogUtil.logDebug(LOG, genEventInfo(),
           "# of workflow/job context read from zk: " + _contextMap.size() + ". Take " + (
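
For orientation, assuming the conventional values of the task constants (REBALANCER_CONTEXT_ROOT = "/TaskRebalancer", CONTEXT_NODE = "Context", PREV_RA_NODE = "PreviousResourceAssignment" -- none of which are shown in this diff), the two node types for a job named "myJob" in a cluster named "myCluster" would be read from paths like:

    /myCluster/PROPERTYSTORE/TaskRebalancer/myJob/Context
    /myCluster/PROPERTYSTORE/TaskRebalancer/myJob/PreviousResourceAssignment

Each node type is then fetched with a single batched get(), so a refresh costs one child listing plus two multi-read calls regardless of how many workflows and jobs exist; resources that have never persisted a previous assignment simply come back as null slots and are skipped.
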
@@ -269,44 +294,61 @@
   }
 
   public void persistDataChanges(HelixDataAccessor accessor) {
-    // Flush Context
-    List<String> contextUpdatePaths = new ArrayList<>();
-    List<ZNRecord> contextUpdateData = new ArrayList<>();
     // Do not update an entry if it is staged to be removed
     _contextToUpdate.removeAll(_contextToRemove);
-    List<String> contextUpdateNames = new ArrayList<>(_contextToUpdate);
-    for (String resourceName : contextUpdateNames) {
-      if (_contextMap.get(resourceName) != null) {
-        contextUpdatePaths.add(getContextPath(resourceName));
-        contextUpdateData.add(_contextMap.get(resourceName));
+    batchUpdateData(accessor, new ArrayList<>(_contextToUpdate), _contextMap, _contextToUpdate,
+        TaskDataType.CONTEXT);
+    batchDeleteData(accessor, new ArrayList<>(_contextToRemove), TaskDataType.CONTEXT);
+    _contextToRemove.clear();
+
+    _prevAssignmentToUpdate.removeAll(_prevAssignmentToRemove);
+    batchUpdateData(accessor, new ArrayList<>(_prevAssignmentToUpdate), _prevAssignmentMap,
+        _prevAssignmentToUpdate, TaskDataType.PREV_ASSIGNMENT);
+    batchDeleteData(accessor, new ArrayList<>(_prevAssignmentToRemove),
+        TaskDataType.PREV_ASSIGNMENT);
+    _prevAssignmentToRemove.clear();
+  }
+
+  private void batchUpdateData(HelixDataAccessor accessor, List<String> dataUpdateNames,
+      Map<String, ZNRecord> dataMap, Set<String> dataToUpdate, TaskDataType taskDataType) {
+    List<String> contextUpdatePaths = new ArrayList<>();
+    List<ZNRecord> updatedData = new ArrayList<>();
+    for (String resourceName : dataUpdateNames) {
+      if (dataMap.get(resourceName) != null) {
+        contextUpdatePaths.add(getTaskDataPath(resourceName, taskDataType));
+        updatedData.add(dataMap.get(resourceName));
       }
     }
     boolean[] updateSuccess = accessor.getBaseDataAccessor()
-        .setChildren(contextUpdatePaths, contextUpdateData, AccessOption.PERSISTENT);
+        .setChildren(contextUpdatePaths, updatedData, AccessOption.PERSISTENT);
     for (int i = 0; i < updateSuccess.length; i++) {
       if (updateSuccess[i]) {
-        _contextToUpdate.remove(contextUpdateNames.get(i));
+        dataToUpdate.remove(dataUpdateNames.get(i));
+      } else {
+        LogUtil.logWarn(LOG, _controlContextProvider.getClusterEventId(), String
+            .format("Failed to update the %s for %s", taskDataType.name(), dataUpdateNames.get(i)));
       }
     }
+  }
+
+  private void batchDeleteData(HelixDataAccessor accessor, List<String> contextNamesToRemove,
+      TaskDataType taskDataType) {
     // Delete contexts
     // We cannot leave the context here, since some deletions happen when cleaning up a workflow.
     // If we left it in memory, Helix would not allow the user to create one with the same name.
     // TODO: Let's have a periodic clean-up thread that could remove contexts whose deletion failed.
     List<String> contextPathsToRemove = new ArrayList<>();
-    List<String> contextNamesToRemove = new ArrayList<>(_contextToRemove);
     for (String resourceName : contextNamesToRemove) {
-      contextPathsToRemove.add(getContextPath(resourceName));
+      contextPathsToRemove.add(getTaskDataPath(resourceName, taskDataType));
     }
     // TODO: current behavior is that deleting non-existing data returns false.
     // Once that behavior is fixed, we can add the retry logic back. Otherwise, it will stay in
     // memory and not allow recreation with the same workflow name.
     accessor.getBaseDataAccessor().remove(contextPathsToRemove, AccessOption.PERSISTENT);
-
-    _contextToRemove.clear();
   }
 
   /**
@@ -346,10 +388,24 @@
         + _controlContextProvider.getClusterName() + '\'' + '}';
   }
 
-  private String getContextPath(String resourceName) {
-    return String.format("/%s/%s%s/%s/%s", _controlContextProvider.getClusterName(),
-        PropertyType.PROPERTYSTORE.name(), TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName,
-        TaskConstants.CONTEXT_NODE);
+  /**
+   * Get the path based on different data types. If the type is not recognized, it returns null
+   * instead.
+   *
+   * @param resourceName the name of the workflow or job
+   * @param taskDataType the type of task data node
+   * @return the property-store path for the given data type, or null
+   */
+  private String getTaskDataPath(String resourceName, TaskDataType taskDataType) {
+    String prefix = String.format("/%s/%s%s/%s", _controlContextProvider.getClusterName(),
+        PropertyType.PROPERTYSTORE.name(), TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName);
+    switch (taskDataType) {
+    case CONTEXT:
+      return String.format("%s/%s", prefix, TaskConstants.CONTEXT_NODE);
+    case PREV_ASSIGNMENT:
+      return String.format("%s/%s", prefix, TaskConstants.PREV_RA_NODE);
+    }
+    return null;
   }
 
   public void dispatchJob(String jobName) {
@@ -370,4 +426,19 @@
     }
     return null;
   }
+
+  public ResourceAssignment getPreviousAssignment(String resourceName) {
+    return _prevAssignmentMap.get(resourceName) != null ? new ResourceAssignment(
+        _prevAssignmentMap.get(resourceName)) : null;
+  }
+
+  public void setPreviousAssignment(String resourceName, ResourceAssignment prevAssignment) {
+    _prevAssignmentMap.put(resourceName, prevAssignment.getRecord());
+    _prevAssignmentToUpdate.add(resourceName);
+  }
+
+  public void removePrevAssignment(String resourceName) {
+    _prevAssignmentMap.remove(resourceName);
+    _prevAssignmentToRemove.add(resourceName);
+  }
 }
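
Note the asymmetry in failure handling above: if a slot in the setChildren() batch fails, its name stays in the to-update set (with a warning logged), so the write is retried on the next persistDataChanges() pass. Deletions, by contrast, are cleared from memory unconditionally, because remove() currently reports false for non-existent nodes and keeping them staged would block recreating a workflow with the same name (see the TODOs above). A condensed, illustrative view of the update path (paths, records, and names here are placeholder variables):

    // One batched write; updateSuccess has one slot per path.
    boolean[] updateSuccess = accessor.getBaseDataAccessor()
        .setChildren(paths, records, AccessOption.PERSISTENT);
    for (int i = 0; i < updateSuccess.length; i++) {
      if (updateSuccess[i]) {
        dataToUpdate.remove(names.get(i)); // persisted; drop from the dirty set
      }
      // On failure the name simply stays in dataToUpdate, so the next
      // persistDataChanges() call includes it in the batch again.
    }
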
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index d72db7f..2ae5b2f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -106,7 +106,8 @@
     }
 
     // Grab the old assignment, or an empty one if it doesn't exist
-    ResourceAssignment prevAssignment = getPrevResourceAssignment(jobName);
+    ResourceAssignment prevAssignment =
+        _dataProvider.getTaskDataCache().getPreviousAssignment(jobName);
     if (prevAssignment == null) {
       prevAssignment = new ResourceAssignment(jobName);
     }
@@ -161,8 +162,7 @@
     // Update Workflow and Job context in data cache and ZK.
     _dataProvider.updateJobContext(jobName, jobCtx);
     _dataProvider.updateWorkflowContext(workflowResource, workflowCtx);
-
-    setPrevResourceAssignment(jobName, newAssignment);
+    _dataProvider.getTaskDataCache().setPreviousAssignment(jobName, newAssignment);
 
     LOG.debug("Job " + jobName + " new assignment "
         + Arrays.toString(newAssignment.getMappedPartitions().toArray()));
@@ -317,29 +317,6 @@
   }
 
   /**
-   * Get the last task assignment for a given job
-   * @param resourceName the name of the job
-   * @return {@link ResourceAssignment} instance, or null if no assignment is available
-   */
-  private ResourceAssignment getPrevResourceAssignment(String resourceName) {
-    ZNRecord r = _manager.getHelixPropertyStore().get(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, TaskConstants.PREV_RA_NODE),
-        null, AccessOption.PERSISTENT);
-    return r != null ? new ResourceAssignment(r) : null;
-  }
-
-  /**
-   * Set the last task assignment for a given job
-   * @param resourceName the name of the job
-   * @param ra {@link ResourceAssignment} containing the task assignment
-   */
-  private void setPrevResourceAssignment(String resourceName, ResourceAssignment ra) {
-    _manager.getHelixPropertyStore().set(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, TaskConstants.PREV_RA_NODE),
-        ra.getRecord(), AccessOption.PERSISTENT);
-  }
-
-  /**
    * Checks if the job has completed. Look at the states of all tasks of the job; there are 3 kinds:
    * completed, given up, and not given up. The job is completed if all tasks are completed or given
    * up, and the number of given-up tasks is within the job failure threshold.
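
Two behavioral details fall out of the JobDispatcher change: the previous assignment is now read from the controller's cached snapshot instead of a live per-job HelixPropertyStore read, and setPreviousAssignment() only stages the record in memory, so later stages of the same pipeline run already see the new value before anything reaches ZooKeeper. A sketch of the resulting call pattern (condensed from the hunks above; _dataProvider is the dispatcher's cluster data provider):

    // Read from the cache snapshot; may be null on the first run for this job.
    ResourceAssignment prevAssignment =
        _dataProvider.getTaskDataCache().getPreviousAssignment(jobName);
    if (prevAssignment == null) {
      prevAssignment = new ResourceAssignment(jobName);
    }
    // ... compute newAssignment from prevAssignment and the current task states ...
    // Stage the result; it is flushed later in one batched setChildren() call,
    // replacing the per-job property-store set() that was removed above.
    _dataProvider.getTaskDataCache().setPreviousAssignment(jobName, newAssignment);
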
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index bb25ef1..eec2e87 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -556,7 +556,7 @@
       // Only remove from the cache when removing the whole workflow succeeds. Otherwise, the batch
       // write will clean all the contexts even if the Configs and IdealStates still exist, and all
       // the workflows and jobs will be rescheduled again.
-      removeContexts(workflow, jobs, _clusterDataCache.getTaskDataCache());
+      removeContextsAndPreviousAssignment(workflow, jobs, _clusterDataCache.getTaskDataCache());
     }
   } else {
     LOG.info("Did not clean up workflow " + workflow
@@ -564,10 +564,12 @@
     }
   }
 
-  private void removeContexts(String workflow, Set<String> jobs, TaskDataCache cache) {
+  private void removeContextsAndPreviousAssignment(String workflow, Set<String> jobs,
+      TaskDataCache cache) {
     if (jobs != null) {
       for (String job : jobs) {
         cache.removeContext(job);
+        cache.removePrevAssignment(job);
       }
     }
     cache.removeContext(workflow);
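
Putting the three files together, the life of a previous-assignment node under this change looks roughly like this (hypothetical job name; cache is the shared TaskDataCache, accessor a HelixDataAccessor; exactly when the controller invokes the flush is not shown in this diff):

    // 1. JobDispatcher stages the computed assignment in memory.
    cache.setPreviousAssignment("myJob", newAssignment);

    // 2. Flushing the cache pushes all staged contexts and previous
    //    assignments to ZooKeeper in batched writes.
    cache.persistDataChanges(accessor);

    // 3. On workflow cleanup, WorkflowDispatcher stages both node types
    //    for removal.
    cache.removeContext("myJob");
    cache.removePrevAssignment("myJob");

    // 4. The next flush deletes them from ZooKeeper in one batched remove.
    cache.persistDataChanges(accessor);
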