To improve performance, we need to separate the task pipeline from the
regular pipeline.

1. Split the task pipeline out from the regular pipeline.
2. Remove unnecessary stages from the task pipeline that are independent of
previous outputs.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c45d3a66
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c45d3a66
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c45d3a66

Branch: refs/heads/master
Commit: c45d3a66f55dcddb2b95a1872e76f3030572d2c7
Parents: fc868b3
Author: Junkai Xue <[email protected]>
Authored: Thu Jun 7 17:37:54 2018 -0700
Committer: Lei Xia <[email protected]>
Committed: Mon Sep 17 15:45:04 2018 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      | 56 ++++++++++++++++++--
 1 file changed, 53 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c45d3a66/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 8d1e44b..9f94755 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -150,12 +150,12 @@ public class GenericHelixController implements IdealStateChangeListener,
    */
   public GenericHelixController() {
     this(createDefaultRegistry(PipelineTypes.DEFAULT.name()),
-        createDefaultRegistry(PipelineTypes.TASK.name()));
+        createTaskRegistry(PipelineTypes.TASK.name()));
   }
 
   public GenericHelixController(String clusterName) {
     this(createDefaultRegistry(PipelineTypes.DEFAULT.name()),
-        createDefaultRegistry(PipelineTypes.TASK.name()), clusterName);
+        createTaskRegistry(PipelineTypes.TASK.name()), clusterName);
   }
 
   class RebalanceTask extends TimerTask {
@@ -281,13 +281,63 @@ public class GenericHelixController implements IdealStateChangeListener,
       registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, dataPreprocess, rebalancePipeline);
       registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, liveInstancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline);
       registry.register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline);
-      registry.register(ClusterEventType.ExternalViewChange, dataRefresh);
       registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline);
       registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline);
       return registry;
     }
   }
 
+  private static PipelineRegistry createTaskRegistry(String pipelineName) {
+    logger.info("createDefaultRegistry");
+    synchronized (GenericHelixController.class) {
+      PipelineRegistry registry = new PipelineRegistry();
+
+      // cluster data cache refresh
+      Pipeline dataRefresh = new Pipeline(pipelineName);
+      dataRefresh.addStage(new ReadClusterDataStage());
+
+      // data pre-process pipeline
+      Pipeline dataPreprocess = new Pipeline(pipelineName);
+      dataPreprocess.addStage(new ResourceComputationStage());
+      dataPreprocess.addStage(new ResourceValidationStage());
+      dataPreprocess.addStage(new CurrentStateComputationStage());
+
+      // rebalance pipeline
+      // TODO: Junkai will work on refactoring existing pipeline log into abstract logic and
+      // extend the logic to separate pipeline
+      Pipeline rebalancePipeline = new Pipeline(pipelineName);
+      rebalancePipeline.addStage(new BestPossibleStateCalcStage());
+      rebalancePipeline.addStage(new IntermediateStateCalcStage());
+      rebalancePipeline.addStage(new MessageGenerationPhase());
+      rebalancePipeline.addStage(new MessageSelectionStage());
+      rebalancePipeline.addStage(new MessageThrottleStage());
+      rebalancePipeline.addStage(new TaskAssignmentStage());
+
+      // backward compatibility check
+      Pipeline liveInstancePipeline = new Pipeline(pipelineName);
+      liveInstancePipeline.addStage(new CompatibilityCheckStage());
+
+      registry.register(ClusterEventType.IdealStateChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      registry.register(ClusterEventType.CurrentStateChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, liveInstancePipeline,
+          dataPreprocess, rebalancePipeline);
+      registry
+          .register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline);
+      registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, rebalancePipeline);
+      registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      return registry;
+    }
+  }
+
   public GenericHelixController(PipelineRegistry registry, PipelineRegistry taskRegistry) {
     this(registry, taskRegistry, null);
   }

Reply via email to