Repository: helix Updated Branches: refs/heads/master d02083e65 -> 1f3402c49
[HELIX-724] Refactor AssignableInstanceManager This RB refactors AssignableInstanceManager's constructor so that the actual building of AssignableInstances is separated into another public method. This allows the instantiation of AssignableInstanceManager in TaskDataCache without having to provide metadata for building AssignableInstances. ChangeList: 1. Add an empty constructor 2. Put build logic in a separate method 3. A boolean flag was added for buildAssignableInstances() to prevent it from building AssignableInstance objects from scratch every time this method is called by TaskDataCache's refresh() Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1f3402c4 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1f3402c4 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1f3402c4 Branch: refs/heads/master Commit: 1f3402c494dcaad776b0ffb914afc33d7ed9725a Parents: d02083e Author: Hunter Lee <[email protected]> Authored: Mon Jul 9 18:39:24 2018 -0700 Committer: Hunter Lee <[email protected]> Committed: Thu Jul 12 11:18:21 2018 -0700 ---------------------------------------------------------------------- .../helix/task/AssignableInstanceManager.java | 27 ++++++++++++++------ .../org/apache/helix/task/JobRebalancer.java | 8 +----- .../task/TestAssignableInstanceManager.java | 12 ++++++--- ...signableInstanceManagerControllerSwitch.java | 15 ++++++----- 4 files changed, 37 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/1f3402c4/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java index 4ede2b8..829aa72 100644 --- a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java +++ b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java @@ -41,22 +41,32 @@ public class AssignableInstanceManager { private Map<String, AssignableInstance> _assignableInstanceMap; // TaskID -> TaskAssignResult TODO: Hunter: Move this if not needed private Map<String, TaskAssignResult> _taskAssignResultMap; + private boolean _hasBeenBuilt; // Flag for whether AssignableInstances have been built /** - * Constructor for AssignableInstanceManager. Builds AssignableInstances based on - * WorkflowContexts, JobContexts, and LiveInstances. Note that the lists of LiveInstances and - * InstanceConfigs must match, meaning a LiveInstance and an InstanceConfig at the same index - * represent the same instance. + * Basic constructor for AssignableInstanceManager to allow an empty instantiation. buildAssignableInstances() must be explicitly called after instantiation. + */ + public AssignableInstanceManager() { + _assignableInstanceMap = new HashMap<>(); + _taskAssignResultMap = new HashMap<>(); + _hasBeenBuilt = false; // AssignableInstances haven't been built + } + + /** + * Builds AssignableInstances and restores TaskAssignResults from scratch by reading from + * TaskDataCache. * @param clusterConfig * @param taskDataCache * @param liveInstances * @param instanceConfigs */ - public AssignableInstanceManager(ClusterConfig clusterConfig, TaskDataCache taskDataCache, + public void buildAssignableInstances(ClusterConfig clusterConfig, TaskDataCache taskDataCache, Map<String, LiveInstance> liveInstances, Map<String, InstanceConfig> instanceConfigs) { - // Build the cache from scratch - _assignableInstanceMap = new HashMap<>(); - _taskAssignResultMap = new HashMap<>(); + // Only need to build from scratch during Controller switch, etc. + // This keeps the pipeline from building from scratch every cache refresh + if (_hasBeenBuilt) { + return; + } // Create all AssignableInstance objects based on what's in liveInstances for (Map.Entry<String, LiveInstance> liveInstanceEntry : liveInstances.entrySet()) { @@ -124,6 +134,7 @@ public class AssignableInstanceManager { } } } + _hasBeenBuilt = true; // Set the flag so that it's not re-building from cache every pipeline iteration } /** http://git-wip-us.apache.org/repos/asf/helix/blob/1f3402c4/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index 5f05acc..0f09166 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -19,12 +19,10 @@ package org.apache.helix.task; * under the License. */ -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedSet; @@ -33,14 +31,11 @@ import java.util.TreeSet; import org.apache.helix.AccessOption; import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixException; import org.apache.helix.PropertyKey; import org.apache.helix.ZNRecord; -import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.IdealState; -import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Message; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; @@ -50,7 +45,6 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Sets; /** * Custom rebalancer implementation for the {@code Job} in task model. @@ -433,4 +427,4 @@ public class JobRebalancer extends TaskRebalancer { private TaskAssignmentCalculator getAssignmentCalulator(JobConfig jobConfig) { return TaskUtil.isGenericTaskJob(jobConfig) ? _genericTaskAssignmentCal : _fixTaskAssignmentCal; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/1f3402c4/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java index a0457a6..c17f4eb 100644 --- a/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java +++ b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java @@ -36,6 +36,7 @@ import org.apache.helix.task.assigner.TaskAssignResult; import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import sun.security.jca.GetInstance; public class TestAssignableInstanceManager { private static final int NUM_PARTICIPANTS = 3; @@ -100,8 +101,9 @@ public class TestAssignableInstanceManager { _taskIDs.clear(); } - // Create an AssignableInstanceManager - _assignableInstanceManager = new AssignableInstanceManager(_clusterConfig, _taskDataCache, + // Create an AssignableInstanceManager and build + _assignableInstanceManager = new AssignableInstanceManager(); + _assignableInstanceManager.buildAssignableInstances(_clusterConfig, _taskDataCache, _liveInstances, _instanceConfigs); } @@ -141,9 +143,11 @@ public class TestAssignableInstanceManager { // Check that the assignable instance map contains new instances and there are no // TaskAssignResults due to previous live instances being removed Assert.assertEquals(_assignableInstanceManager.getTaskAssignResultMap().size(), 0); - Assert.assertEquals(_assignableInstanceManager.getAssignableInstanceMap().size(), newLiveInstances.size()); + Assert.assertEquals(_assignableInstanceManager.getAssignableInstanceMap().size(), + newLiveInstances.size()); for (String instance : newLiveInstances.keySet()) { - Assert.assertTrue(_assignableInstanceManager.getAssignableInstanceMap().containsKey(instance)); + Assert + .assertTrue(_assignableInstanceManager.getAssignableInstanceMap().containsKey(instance)); } } http://git-wip-us.apache.org/repos/asf/helix/blob/1f3402c4/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java index f07d6e3..b05e049 100644 --- a/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java +++ b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java @@ -42,8 +42,8 @@ import org.testng.Assert; import org.testng.annotations.Test; public class TestAssignableInstanceManagerControllerSwitch extends TaskTestBase { - protected int numJobs = 2; - protected int numTasks = 3; + private int numJobs = 2; + private int numTasks = 3; @Test public void testControllerSwitch() throws InterruptedException { @@ -74,8 +74,10 @@ public class TestAssignableInstanceManagerControllerSwitch extends TaskTestBase Thread.sleep(2000); taskDataCache.refresh(accessor, resourceConfigMap); - AssignableInstanceManager prevAssignableInstanceManager = new AssignableInstanceManager( - clusterConfig, taskDataCache, liveInstanceMap, instanceConfigMap); + // Create prev manager and build + AssignableInstanceManager prevAssignableInstanceManager = new AssignableInstanceManager(); + prevAssignableInstanceManager.buildAssignableInstances(clusterConfig, taskDataCache, + liveInstanceMap, instanceConfigMap); Map<String, AssignableInstance> prevAssignableInstanceMap = new HashMap<>(prevAssignableInstanceManager.getAssignableInstanceMap()); Map<String, TaskAssignResult> prevTaskAssignResultMap = @@ -90,8 +92,9 @@ public class TestAssignableInstanceManagerControllerSwitch extends TaskTestBase // Generate a new AssignableInstanceManager taskDataCache.refresh(accessor, resourceConfigMap); - AssignableInstanceManager newAssignableInstanceManager = new AssignableInstanceManager( - clusterConfig, taskDataCache, liveInstanceMap, instanceConfigMap); + AssignableInstanceManager newAssignableInstanceManager = new AssignableInstanceManager(); + newAssignableInstanceManager.buildAssignableInstances(clusterConfig, taskDataCache, + liveInstanceMap, instanceConfigMap); Map<String, AssignableInstance> newAssignableInstanceMap = new HashMap<>(newAssignableInstanceManager.getAssignableInstanceMap()); Map<String, TaskAssignResult> newTaskAssignResultMap =
