This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit f11243f693eee572c2e681ef4a3feafd3d2dd88d Author: Ali Reza Zamani Zadeh Najari <[email protected]> AuthorDate: Wed Jun 17 11:26:27 2020 -0700 Remove previousAssignment read/write to ZK (#1074) Remove previousAssignment read/write to ZK In this commit, the previousAssignment has been removed from the codebase and the controller will no longer read/write previousAssignment from/to ZK. --- .../apache/helix/common/caches/TaskDataCache.java | 50 ++-------------------- .../java/org/apache/helix/task/JobDispatcher.java | 16 ++----- .../org/apache/helix/task/WorkflowDispatcher.java | 5 +-- .../task/TestTaskSchedulingTwoCurrentStates.java | 13 ------ .../helix/integration/task/TestTaskStopQueue.java | 10 +---- .../helix/task/TestTargetedTaskStateChange.java | 25 +---------- 6 files changed, 11 insertions(+), 108 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java index adb1c54..f6f6a72 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java @@ -59,9 +59,6 @@ public class TaskDataCache extends AbstractDataCache { // TODO: context and previous assignment should be wrapped into a class. Otherwise, int the future, // concurrency will be hard to handle. 
private Map<String, ZNRecord> _contextMap = new HashMap<>(); - private Map<String, ZNRecord> _prevAssignmentMap = new HashMap<>(); - private Set<String> _prevAssignmentToUpdate = new HashSet<>(); - private Set<String> _prevAssignmentToRemove = new HashSet<>(); private Set<String> _contextToUpdate = new HashSet<>(); private Set<String> _contextToRemove = new HashSet<>(); // The following fields have been added for quota-based task scheduling @@ -72,8 +69,7 @@ public class TaskDataCache extends AbstractDataCache { private Set<String> _dispatchedJobs = new HashSet<>(); private enum TaskDataType { - CONTEXT, - PREV_ASSIGNMENT + CONTEXT } @@ -99,7 +95,7 @@ public class TaskDataCache extends AbstractDataCache { */ public synchronized boolean refresh(HelixDataAccessor accessor, Map<String, ResourceConfig> resourceConfigMap) { - refreshContextsAndPreviousAssignments(accessor); + refreshContexts(accessor); // update workflow and job configs. _workflowConfigMap.clear(); Map<String, JobConfig> newJobConfigs = new HashMap<>(); @@ -175,11 +171,10 @@ public class TaskDataCache extends AbstractDataCache { return true; } - private void refreshContextsAndPreviousAssignments(HelixDataAccessor accessor) { + private void refreshContexts(HelixDataAccessor accessor) { // TODO: Need an optimize for reading context only if the refresh is needed. 
long start = System.currentTimeMillis(); _contextMap.clear(); - _prevAssignmentMap.clear(); if (_controlContextProvider.getClusterName() == null || _controlContextProvider.getClusterName() .equalsIgnoreCase(UNKNOWN_CLUSTER)) { return; @@ -187,22 +182,15 @@ public class TaskDataCache extends AbstractDataCache { String path = String.format("/%s/%s%s", _controlContextProvider.getClusterName(), PropertyType.PROPERTYSTORE.name(), TaskConstants.REBALANCER_CONTEXT_ROOT); List<String> contextPaths = new ArrayList<>(); - List<String> prevAssignmentPaths = new ArrayList<>(); List<String> childNames = accessor.getBaseDataAccessor().getChildNames(path, 0); if (childNames == null) { return; } for (String resourceName : childNames) { contextPaths.add(getTaskDataPath(resourceName, TaskDataType.CONTEXT)); - //Workflow does not have previous assignment - if (!_workflowConfigMap.containsKey(resourceName)) { - prevAssignmentPaths.add(getTaskDataPath(resourceName, TaskDataType.PREV_ASSIGNMENT)); - } } - + List<ZNRecord> contexts = accessor.getBaseDataAccessor().get(contextPaths, null, 0, true); - List<ZNRecord> prevAssignments = - accessor.getBaseDataAccessor().get(prevAssignmentPaths, null, 0, true); for (int i = 0; i < contexts.size(); i++) { ZNRecord context = contexts.get(i); @@ -215,12 +203,6 @@ public class TaskDataCache extends AbstractDataCache { } } - for (ZNRecord prevAssignment : prevAssignments) { - if (prevAssignment != null) { - _prevAssignmentMap.put(prevAssignment.getId(), prevAssignment); - } - } - if (LOG.isDebugEnabled()) { LogUtil.logDebug(LOG, genEventInfo(), "# of workflow/job context read from zk: " + _contextMap.size() + ". 
Take " + ( @@ -325,13 +307,6 @@ public class TaskDataCache extends AbstractDataCache { TaskDataType.CONTEXT); batchDeleteData(accessor, new ArrayList<>(_contextToRemove), TaskDataType.CONTEXT); _contextToRemove.clear(); - - _prevAssignmentToUpdate.removeAll(_prevAssignmentToRemove); - batchUpdateData(accessor, new ArrayList<>(_prevAssignmentToUpdate), _prevAssignmentMap, - _prevAssignmentToUpdate, TaskDataType.PREV_ASSIGNMENT); - batchDeleteData(accessor, new ArrayList<>(_prevAssignmentToRemove), - TaskDataType.PREV_ASSIGNMENT); - _prevAssignmentToRemove.clear(); } private void batchUpdateData(HelixDataAccessor accessor, List<String> dataUpdateNames, @@ -427,8 +402,6 @@ public class TaskDataCache extends AbstractDataCache { switch (taskDataType) { case CONTEXT: return String.format("%s/%s", prevFix, TaskConstants.CONTEXT_NODE); - case PREV_ASSIGNMENT: - return String.format("%s/%s", prevFix, TaskConstants.PREV_RA_NODE); } return null; } @@ -451,19 +424,4 @@ public class TaskDataCache extends AbstractDataCache { } return null; } - - public ResourceAssignment getPreviousAssignment(String resourceName) { - return _prevAssignmentMap.get(resourceName) != null ? 
new ResourceAssignment( - _prevAssignmentMap.get(resourceName)) : null; - } - - public void setPreviousAssignment(String resourceName, ResourceAssignment prevAssignment) { - _prevAssignmentMap.put(resourceName, prevAssignment.getRecord()); - _prevAssignmentToUpdate.add(resourceName); - } - - public void removePrevAssignment(String resourceName) { - _prevAssignmentMap.remove(resourceName); - _prevAssignmentToRemove.add(resourceName); - } } diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java index 191a2ea..c2b724b 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java @@ -133,17 +133,8 @@ public class JobDispatcher extends AbstractTaskDispatcher { scheduleRebalanceForTimeout(jobCfg.getJobId(), jobCtx.getStartTime(), jobCfg.getTimeout()); } - // Grab the old assignment, or an empty one if it doesn't exist - ResourceAssignment prevAssignment = - _dataProvider.getTaskDataCache().getPreviousAssignment(jobName); - if (prevAssignment == null) { - prevAssignment = new ResourceAssignment(jobName); - } - // Will contain the list of partitions that must be explicitly dropped from the ideal state that // is stored in zk. - // Fetch the previous resource assignment from the property store. This is required because of - // HELIX-230. Set<String> liveInstances = jobCfg.getInstanceGroupTag() == null ? _dataProvider.getEnabledLiveInstances() : _dataProvider.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag()); @@ -191,7 +182,6 @@ public class JobDispatcher extends AbstractTaskDispatcher { // Update Workflow and Job context in data cache and ZK. 
_dataProvider.updateJobContext(jobName, jobCtx); _dataProvider.updateWorkflowContext(workflowResource, workflowCtx); - _dataProvider.getTaskDataCache().setPreviousAssignment(jobName, newAssignment); LOG.debug("Job " + jobName + " new assignment " + Arrays.toString(newAssignment.getMappedPartitions().toArray())); @@ -382,7 +372,7 @@ public class JobDispatcher extends AbstractTaskDispatcher { * are accounted for * @param jobName job name * @param tasksToDrop instance -> pId's, to gather all pIds that need to be dropped - * @return instance -> partitionIds from previous assignment, if the instance is still live + * @return instance -> partitionIds from currentState, if the instance is still live */ protected static Map<String, SortedSet<Integer>> getCurrentInstanceToTaskAssignments( Iterable<String> liveInstances, CurrentStateOutput currStateOutput, String jobName, @@ -424,8 +414,8 @@ public class JobDispatcher extends AbstractTaskDispatcher { } /** - * If partition is missing from prevInstanceToTaskAssignments (e.g. previous assignment is - * deleted) it is added from context. Otherwise, the context won't be updated. + * If partition is missing from prevInstanceToTaskAssignments it is added from context. Otherwise, + * the context won't be updated. * @param jobCtx Job Context * @param currentInstanceToTaskAssignments instance -> partitionIds from CurrentStateOutput */ diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java index 4c143d2..53be558 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java @@ -588,7 +588,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { // Only remove from cache when remove all workflow success. Otherwise, batch write will // clean all the contexts even if Configs and IdealStates are exists. 
Then all the workflows // and jobs will rescheduled again. - removeContextsAndPreviousAssignment(workflow, jobs, _clusterDataCache.getTaskDataCache()); + removeContexts(workflow, jobs, _clusterDataCache.getTaskDataCache()); } } else { LOG.info("Did not clean up workflow " + workflow @@ -596,12 +596,11 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { } } - private void removeContextsAndPreviousAssignment(String workflow, Set<String> jobs, + private void removeContexts(String workflow, Set<String> jobs, TaskDataCache cache) { if (jobs != null) { for (String job : jobs) { cache.removeContext(job); - cache.removePrevAssignment(job); } } cache.removeContext(workflow); diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java index fe9cb3c..bb970c7 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java @@ -35,15 +35,12 @@ import org.apache.helix.model.CurrentState; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.MasterSlaveSMD; -import org.apache.helix.model.Partition; -import org.apache.helix.model.ResourceAssignment; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.task.JobConfig; import org.apache.helix.task.JobContext; import org.apache.helix.task.JobQueue; import org.apache.helix.task.TaskCallbackContext; import org.apache.helix.task.TaskFactory; -import org.apache.helix.task.TaskPartitionState; import org.apache.helix.task.TaskState; import org.apache.helix.task.TaskStateModelFactory; import org.apache.helix.task.TaskUtil; @@ -185,16 +182,6 @@ public class TestTaskSchedulingTwoCurrentStates extends TaskTestBase { }, 
TestHelper.WAIT_DURATION); Assert.assertTrue(isCurrentStateCreated); - String previousAssignmentPath = "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/" - + namespacedJobName + "/PreviousResourceAssignment"; - ResourceAssignment prevAssignment = new ResourceAssignment(namespacedJobName); - Map<String, String> replicaMap = new HashMap<>(); - replicaMap.put(instanceP0, TaskPartitionState.RUNNING.name()); - Partition taskPartition = new Partition(namespacedJobName + "_0"); - prevAssignment.addReplicaMap(taskPartition, replicaMap); - _manager.getHelixDataAccessor().getBaseDataAccessor().set(previousAssignmentPath, - prevAssignment.getRecord(), AccessOption.PERSISTENT); - // Wait until the job is finished. _driver.pollForJobState(jobQueueName, namespacedJobName, TaskState.COMPLETED); Assert.assertEquals(CANCEL_COUNT.get(), 0); diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskStopQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskStopQueue.java index add5687..f9ae845 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskStopQueue.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskStopQueue.java @@ -33,8 +33,7 @@ import org.testng.Assert; import org.testng.annotations.Test; /** - * This test makes sure the workflow can be stopped if previousAssignment and currentState are - * deleted. + * This test makes sure the workflow can be stopped if currentState is deleted. 
*/ public class TestTaskStopQueue extends TaskTestBase { private static final long TIMEOUT = 200000L; @@ -75,13 +74,6 @@ public class TestTaskStopQueue extends TaskTestBase { .exists(currentStatePath, AccessOption.PERSISTENT)); } - String previousAssignment = "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/" - + namespacedJobName + "/PreviousResourceAssignment"; - _manager.getHelixDataAccessor().getBaseDataAccessor().remove(previousAssignment, - AccessOption.PERSISTENT); - Assert.assertFalse(_manager.getHelixDataAccessor().getBaseDataAccessor() - .exists(previousAssignment, AccessOption.PERSISTENT)); - // Start the Controller String controllerName = CONTROLLER_PREFIX + "_1"; _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java index 3913c2d..b79dcb9 100644 --- a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java +++ b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java @@ -20,7 +20,6 @@ package org.apache.helix.task; */ import java.util.ArrayList; -import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -35,7 +34,6 @@ import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Partition; -import org.apache.helix.model.ResourceAssignment; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.testng.Assert; import org.testng.annotations.BeforeClass; @@ -77,7 +75,6 @@ public class TestTargetedTaskStateChange { * different instances. 
* Scenario: * Instance0: Slave, Instance1: Master, Instance2: Slave - * PreviousAssignment of Task: Instance0: Running * CurrentState: Instance0: Running, Instance1: Running * Expected paMap: Instance0 -> Dropped */ @@ -91,8 +88,6 @@ public class TestTargetedTaskStateChange { when(mock._cache.getIdealStates()).thenReturn(mock._idealStates); when(mock._cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet()); when(mock._cache.getInstanceConfigMap()).thenReturn(_instanceConfigs); - when(mock._cache.getTaskDataCache().getPreviousAssignment(JOB_NAME)) - .thenReturn(mock._resourceAssignment); when(mock._cache.getClusterConfig()).thenReturn(_clusterConfig); when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag); _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache, @@ -114,12 +109,9 @@ public class TestTargetedTaskStateChange { } /** - * This test checks the behaviour of the controller while there is one current state which is - * different from - * Previous Assignment information. + * This test checks the behaviour of the controller while there is one current state. 
* Scenario: * Instance0: Slave, Instance1: Master, Instance2: Slave - * PreviousAssignment of Task: Instance0: Dropped * CurrentState: Instance0: Running * Expected paMap: Instance1 -> Running */ @@ -133,8 +125,6 @@ public class TestTargetedTaskStateChange { when(mock._cache.getIdealStates()).thenReturn(mock._idealStates); when(mock._cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet()); when(mock._cache.getInstanceConfigMap()).thenReturn(_instanceConfigs); - when(mock._cache.getTaskDataCache().getPreviousAssignment(JOB_NAME)) - .thenReturn(mock._resourceAssignment2); when(mock._cache.getClusterConfig()).thenReturn(_clusterConfig); when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag); _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache, @@ -291,15 +281,6 @@ public class TestTargetedTaskStateChange { return currentStateOutput; } - private ResourceAssignment preparePreviousAssignment(String instance, String state) { - ResourceAssignment prevAssignment = new ResourceAssignment(JOB_NAME); - Map<String, String> replicaMap = new HashMap<>(); - replicaMap.put(instance, state); - Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME); - prevAssignment.addReplicaMap(taskPartition, replicaMap); - return prevAssignment; - } - private class MockTestInformation { private static final String SLAVE_INSTANCE = INSTANCE_PREFIX + "0"; private static final String MASTER_INSTANCE = INSTANCE_PREFIX + "1"; @@ -316,10 +297,6 @@ public class TestTargetedTaskStateChange { SLAVE_INSTANCE, TaskPartitionState.RUNNING.name(), TaskPartitionState.RUNNING.name()); private CurrentStateOutput _currentStateOutput2 = prepareCurrentState2(MASTER_INSTANCE, TaskPartitionState.RUNNING.name()); - private ResourceAssignment _resourceAssignment = - preparePreviousAssignment(SLAVE_INSTANCE, TaskPartitionState.RUNNING.name()); - private ResourceAssignment _resourceAssignment2 = - 
preparePreviousAssignment(SLAVE_INSTANCE, TaskPartitionState.DROPPED.name()); private TaskDataCache _taskDataCache = mock(TaskDataCache.class); private RuntimeJobDag _runtimeJobDag = mock(RuntimeJobDag.class);
