Refactor TaskAssignmentCalculator API

Refactor the TaskAssignmentCalculator API: ClusterDataCache is too large a
dependency and most of its contents are unused, so the calculator methods now
take only the ideal state map they need (the rebalancers pass cache.getIdealStates()).
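
Purely as illustration and not part of the commit, a minimal sketch of what a
rebalancer-side caller looks like after this change: the calculator receives
only the resource-name-to-IdealState map (via cache.getIdealStates()) instead
of the whole ClusterDataCache. The wrapper class AssignmentExample and the
method computePartitions below are hypothetical; the Helix types and calls are
the ones touched in the diffs that follow.

import java.util.Map;
import java.util.Set;

import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.model.IdealState;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.TaskAssignmentCalculator;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;

// Hypothetical wrapper class; only the calls into Helix come from this commit.
public class AssignmentExample {

  static Set<Integer> computePartitions(TaskAssignmentCalculator calculator, JobConfig jobCfg,
      JobContext jobCtx, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
      ClusterDataCache cache) {
    // Before this commit the whole ClusterDataCache was handed to the calculator;
    // now only the resource-name -> IdealState map it actually reads is passed in.
    Map<String, IdealState> idealStates = cache.getIdealStates();
    return calculator.getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, idealStates);
  }
}

Narrowing the parameter to the ideal-state map also keeps the calculators easier
to exercise in isolation, since callers no longer need a fully populated
ClusterDataCache.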


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7bb2a9db
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7bb2a9db
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7bb2a9db

Branch: refs/heads/helix-0.6.x
Commit: 7bb2a9db2396a00bb9a721634a2432240679c657
Parents: 0a18726
Author: Junkai Xue <j...@linkedin.com>
Authored: Tue Sep 13 16:00:08 2016 -0700
Committer: Junkai Xue <j...@linkedin.com>
Committed: Tue Sep 13 16:00:08 2016 -0700

----------------------------------------------------------------------
 .../controller/stages/ClusterDataCache.java     | 36 ++++++++++++++------
 .../FixedTargetTaskAssignmentCalculator.java    | 35 +++++++------------
 .../helix/task/FixedTargetTaskRebalancer.java   |  4 +--
 .../task/GenericTaskAssignmentCalculator.java   |  6 ++--
 .../helix/task/GenericTaskRebalancer.java       |  4 +--
 .../org/apache/helix/task/JobRebalancer.java    | 11 +++---
 .../helix/task/TaskAssignmentCalculator.java    | 10 +++---
 7 files changed, 56 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index dacf98d..c8ca941 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -453,21 +453,35 @@ public class ClusterDataCache {
   }
 
   /**
-   * Return all the nodes that are enabled and tagged same as the job.
-   * @param allInstances List of instances to filter with instance tag
-   * @param instanceTag The instance group tag
-   * @return A new set contains instance name and that are marked enabled and have same
-   *         tag with job. The original set will not be changed during the filtering
+   * Return all the live nodes that are enabled
+   * @return A new set contains live instance name and that are marked enabled
    */
-  public Set<String> getAllEnabledInstanceWithTag(final Set<String> allInstances,
-      String instanceTag) {
+  public Set<String> getAllEnabledLiveInstances() {
+    return getAllEnabledInstances(null);
+  }
+
+  /**
+   * Return all the live nodes that are enabled and tagged same as the job.
+   * @param instanceTag The instance group tag, could be null, when no instance group specified
+   * @return A new set contains live instance name and that are marked enabled and have same
+   *         tag with job, only if instance tag input is not null.
+   */
+  public Set<String> getAllEnabledLiveInstancesWithTag(String instanceTag) {
+    return getAllEnabledInstances(instanceTag);
+  }
+
+  private Set<String> getAllEnabledInstances(String instanceTag) {
     Set<String> enabledTagInstances = new HashSet<String>();
-    for (String instance : allInstances) {
+    for (String instance : _liveInstanceMap.keySet()) {
       InstanceConfig instanceConfig = _instanceConfigMap.get(instance);
 
-      if (instanceConfig != null && instanceConfig.getInstanceEnabled() && instanceConfig
-          .containsTag(instanceTag)) {
-        enabledTagInstances.add(instance);
+      // Check instance is enabled
+      if (instanceConfig != null && instanceConfig.getInstanceEnabled()) {
+        // Check whether it has instance group or not
+        // If it has instance group, check whether it belongs to that group or not
+        if (instanceTag == null || instanceConfig.containsTag(instanceTag)) {
+          enabledTagInstances.add(instance);
+        }
       }
     }
 

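Side note, not part of the commit: the two helpers added above replace
getAllEnabledInstanceWithTag() and are used by JobRebalancer (further down in
this diff) roughly as sketched here; the wrapper class and method name are
hypothetical.

import java.util.Set;

import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.task.JobConfig;

// Hypothetical helper; the two cache methods are the ones added in the hunk above.
public class EligibleInstancesExample {

  static Set<String> eligibleInstances(ClusterDataCache clusterData, JobConfig jobCfg) {
    // No instance group tag on the job: every enabled live instance qualifies.
    // With a tag: only enabled live instances carrying that tag qualify.
    return jobCfg.getInstanceGroupTag() == null
        ? clusterData.getAllEnabledLiveInstances()
        : clusterData.getAllEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
  }
}
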
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index 09db616..0768b51 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -48,36 +48,37 @@ import org.apache.log4j.Logger;
 public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculator {
   private static final Logger LOG = Logger.getLogger(FixedTargetTaskAssignmentCalculator.class);
 
-  @Override
-  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
-    return getAllTaskPartitions(getTgtIdealState(jobCfg, cache), jobCfg, jobCtx);
+  @Override public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Map<String, IdealState> idealStateMap) {
+    return getAllTaskPartitions(getTgtIdealState(jobCfg, idealStateMap), jobCfg, jobCtx);
   }
 
   @Override
   public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
       ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
       JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
-      Set<Integer> partitionSet, ClusterDataCache cache) {
-    IdealState tgtIs = getTgtIdealState(jobCfg, cache);
+      Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
+    IdealState tgtIs = getTgtIdealState(jobCfg, idealStateMap);
     if (tgtIs == null) {
       LOG.warn("Missing target resource for the scheduled job!");
       return Collections.emptyMap();
     }
     Set<String> tgtStates = jobCfg.getTargetPartitionStates();
     return getTgtPartitionAssignment(currStateOutput, instances, tgtIs, tgtStates, partitionSet,
-        jobContext, cache);
+        jobContext);
   }
 
   /**
    * Gets the ideal state of the target resource of this job
    * @param jobCfg job config containing target resource id
-   * @param cache snapshot of the cluster containing the task and target resource
+   * @param idealStateMap the map of resource name map to ideal state
    * @return target resource ideal state, or null
    */
-  private static IdealState getTgtIdealState(JobConfig jobCfg, ClusterDataCache cache) {
+  private static IdealState getTgtIdealState(JobConfig jobCfg,
+      Map<String, IdealState> idealStateMap) {
     String tgtResourceId = jobCfg.getTargetResource();
-    return cache.getIdealState(tgtResourceId);
+    return idealStateMap.get(tgtResourceId);
   }
 
   /**
@@ -131,7 +132,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
    */
   private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
       CurrentStateOutput currStateOutput, Iterable<String> instances, IdealState tgtIs,
-      Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx, ClusterDataCache cache) {
+      Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) {
     Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
     for (String instance : instances) {
       result.put(instance, new TreeSet<Integer>());
@@ -153,18 +154,6 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
             continue;
           }
 
-          InstanceConfig instanceConfig = cache.getInstanceConfigMap().get(instance);
-
-          if (instanceConfig == null) {
-            LOG.error("Instance config not found for instance : " + instance);
-            continue;
-          }
-
-          if (!instanceConfig.getInstanceEnabled()) {
-            LOG.debug("Instance has been disabled, ignore instance : " + instance);
-            continue;
-          }
-
           String s =
               currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName),
                   instance);

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
index 569fe03..1589c1a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
@@ -43,7 +43,7 @@ import org.apache.helix.model.ResourceAssignment;
   @Override public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
       WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
     return taskAssignmentCalculator
-        .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache);
+        .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache.getIdealStates());
   }
 
   @Override public Map<String, SortedSet<Integer>> getTaskAssignment(
@@ -53,6 +53,6 @@ import org.apache.helix.model.ResourceAssignment;
       ClusterDataCache cache) {
     return taskAssignmentCalculator
         .getTaskAssignment(currStateOutput, prevAssignment, instances, jobCfg, jobContext,
-            workflowCfg, workflowCtx, partitionSet, cache);
+            workflowCfg, workflowCtx, partitionSet, cache.getIdealStates());
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index fbc7af3..58ba670 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -34,6 +34,7 @@ import java.util.TreeSet;
 import org.apache.helix.HelixException;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.util.JenkinsHash;
@@ -52,7 +53,8 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
 
   @Override
   public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Map<String, IdealState> idealStateMap) {
     Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap();
     Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap();
     for (TaskConfig taskCfg : taskMap.values()) {
@@ -69,7 +71,7 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
   public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
       ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
       final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
-      Set<Integer> partitionSet, ClusterDataCache cache) {
+      Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
     // Gather input to the full auto rebalancing algorithm
     LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
     states.put("ONLINE", 1);

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
index 6a005b9..1720fbb 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
@@ -42,7 +42,7 @@ public class GenericTaskRebalancer extends DeprecatedTaskRebalancer {
   public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
       WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
     return taskAssignmentCalculator
-        .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache);
+        .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache.getIdealStates());
   }
 
   @Override
@@ -52,6 +52,6 @@ public class GenericTaskRebalancer extends DeprecatedTaskRebalancer {
       Set<Integer> partitionSet, ClusterDataCache cache) {
     return taskAssignmentCalculator
         .getTaskAssignment(currStateOutput, prevAssignment, instances, jobCfg, jobContext,
-            workflowCfg, workflowCtx, partitionSet, cache);
+            workflowCfg, workflowCtx, partitionSet, cache.getIdealStates());
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 378ad95..cf7f5e6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -131,9 +131,8 @@ public class JobRebalancer extends TaskRebalancer {
     // Fetch the previous resource assignment from the property store. This is required because of
     // HELIX-230.
     Set<String> liveInstances = jobCfg.getInstanceGroupTag() == null
-        ? clusterData.getLiveInstances().keySet()
-        : clusterData.getAllEnabledInstanceWithTag(clusterData.getLiveInstances().keySet(),
-            jobCfg.getInstanceGroupTag());
+        ? clusterData.getAllEnabledLiveInstances()
+        : clusterData.getAllEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
 
     if (liveInstances.isEmpty()) {
       LOG.error("No available instance found for job!");
@@ -222,8 +221,8 @@ public class JobRebalancer extends TaskRebalancer {
 
     // Process all the current assignments of tasks.
     TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalulator(jobCfg);
-    Set<Integer> allPartitions =
-        taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
+    Set<Integer> allPartitions = taskAssignmentCal
+        .getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache.getIdealStates());
 
     if (allPartitions == null || allPartitions.isEmpty()) {
       // Empty target partitions, mark the job as FAILED.
@@ -424,7 +423,7 @@ public class JobRebalancer extends TaskRebalancer {
       // Get instance->[partition, ...] mappings for the target resource.
       Map<String, SortedSet<Integer>> tgtPartitionAssignments = taskAssignmentCal
           .getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
-              workflowConfig, workflowCtx, allPartitions, cache);
+              workflowConfig, workflowCtx, allPartitions, cache.getIdealStates());
       for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
         String instance = entry.getKey();
         if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
index a3ed5ab..a6a9ed3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
@@ -2,6 +2,7 @@ package org.apache.helix.task;
 
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceAssignment;
 
 import java.util.Collection;
@@ -17,11 +18,12 @@ public abstract class TaskAssignmentCalculator {
    * @param jobCtx the task context
    * @param workflowCfg the workflow configuration
    * @param workflowCtx the workflow context
-   * @param cache cluster snapshot
+   * @param idealStateMap the map of resource name map to ideal state
    * @return set of partition numbers
    */
   public abstract Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
-      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache);
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      Map<String, IdealState> idealStateMap);
 
   /**
    * Compute an assignment of tasks to instances
@@ -34,12 +36,12 @@ public abstract class TaskAssignmentCalculator {
    * @param workflowCfg the workflow configuration
    * @param workflowCtx the workflow context
    * @param partitionSet the partitions to assign
-   * @param cache cluster snapshot
+   * @param idealStateMap the map of resource name map to ideal state
    * @return map of instances to set of partition numbers
    */
   public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
       CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
       Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
       WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
-      ClusterDataCache cache);
+      Map<String, IdealState> idealStateMap);
 }
