This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch cluster-pause-mode
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 82513fa90ccc4e0612726a8853df80988d3cc9dc
Author: Huizhi Lu <[email protected]>
AuthorDate: Wed Jun 9 21:01:30 2021 -0700

    Add management mode pipeline registry and switch logic (#1769)
    
    Management Mode Pipeline will help check the cluster status and determine 
whether the default pipelines can be run.
    One use case is helping the controller decide when it can exit cluster 
freeze mode.
    
    This commit adds management mode pipeline and logic to switch from/to the 
default resource/task pipelines.
---
 .../helix/controller/GenericHelixController.java   | 151 ++++++++++++++++-----
 .../ManagementControllerDataProvider.java}         |  28 +---
 .../apache/helix/controller/pipeline/Pipeline.java |   9 +-
 .../helix/controller/stages/ClusterEventType.java  |   1 +
 .../controller/stages/ManagementModeStage.java     |  48 +++++++
 .../controller/stages/ResourceValidationStage.java |  10 ++
 .../main/java/org/apache/helix/util/HelixUtil.java |  12 ++
 .../java/org/apache/helix/util/RebalanceUtil.java  |  22 +++
 8 files changed, 223 insertions(+), 58 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 11cd9ee..7da6ef0 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -63,6 +63,7 @@ import 
org.apache.helix.api.listeners.TaskCurrentStateChangeListener;
 import org.apache.helix.common.ClusterEventBlockingQueue;
 import org.apache.helix.common.DedupEventProcessor;
 import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
+import 
org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import 
org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
 import org.apache.helix.controller.pipeline.AsyncWorkerType;
@@ -81,7 +82,7 @@ import 
org.apache.helix.controller.stages.CustomizedViewAggregationStage;
 import org.apache.helix.controller.stages.ExternalViewComputeStage;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
 import org.apache.helix.controller.stages.MaintenanceRecoveryStage;
-import org.apache.helix.controller.stages.MessageGenerationPhase;
+import org.apache.helix.controller.stages.ManagementModeStage;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageThrottleStage;
 import org.apache.helix.controller.stages.PersistAssignmentStage;
@@ -141,6 +142,7 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
   private static final int ASYNC_TASKS_THREADPOOL_SIZE = 10;
   private final PipelineRegistry _registry;
   private final PipelineRegistry _taskRegistry;
+  private final PipelineRegistry _managementModeRegistry;
 
   final AtomicReference<Map<String, LiveInstance>> _lastSeenInstances;
   final AtomicReference<Map<String, LiveInstance>> _lastSeenSessions;
@@ -163,6 +165,11 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
   private final ClusterEventBlockingQueue _taskEventQueue;
   private final ClusterEventProcessor _taskEventThread;
 
+  // Controller will switch to run management mode pipeline when set to true.
+  private boolean _inManagementMode;
+  private final ClusterEventBlockingQueue _managementModeEventQueue;
+  private final ClusterEventProcessor _managementModeEventThread;
+
   private final Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> 
_asyncFIFOWorkerPool;
 
   private long _continuousRebalanceFailureCount = 0;
@@ -197,6 +204,7 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
    */
   private final ResourceControllerDataProvider _resourceControlDataProvider;
   private final WorkflowControllerDataProvider _workflowControlDataProvider;
+  private final ManagementControllerDataProvider 
_managementControllerDataProvider;
   private final ScheduledExecutorService _asyncTasksThreadPool;
 
   /**
@@ -285,13 +293,21 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
 
   public GenericHelixController(String clusterName) {
     this(createDefaultRegistry(Pipeline.Type.DEFAULT.name()),
-        createTaskRegistry(Pipeline.Type.TASK.name()), clusterName,
+        createTaskRegistry(Pipeline.Type.TASK.name()),
+        createManagementModeRegistry(Pipeline.Type.MANAGEMENT_MODE.name()), 
clusterName,
         Sets.newHashSet(Pipeline.Type.TASK, Pipeline.Type.DEFAULT));
   }
 
   public GenericHelixController(String clusterName, Set<Pipeline.Type> 
enabledPipelins) {
     this(createDefaultRegistry(Pipeline.Type.DEFAULT.name()),
-        createTaskRegistry(Pipeline.Type.TASK.name()), clusterName, 
enabledPipelins);
+        createTaskRegistry(Pipeline.Type.TASK.name()),
+        createManagementModeRegistry(Pipeline.Type.MANAGEMENT_MODE.name()),
+        clusterName,
+        enabledPipelins);
+  }
+
+  public void setInManagementMode(boolean enabled) {
+    _inManagementMode = enabled;
   }
 
   class RebalanceTask extends TimerTask {
@@ -566,6 +582,8 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
       registry
           .register(ClusterEventType.OnDemandRebalance, dataRefresh, 
autoExitMaintenancePipeline,
               dataPreprocess, externalViewPipeline, rebalancePipeline);
+      registry.register(ClusterEventType.ControllerChange, dataRefresh, 
autoExitMaintenancePipeline,
+          dataPreprocess, externalViewPipeline, rebalancePipeline);
       // TODO: We now include rebalance pipeline in customized state change 
for correctness.
       // However, it is not efficient, and we should improve this by splitting 
the pipeline or
       // controller roles to multiple hosts.
@@ -625,22 +643,49 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
           rebalancePipeline);
       registry.register(ClusterEventType.OnDemandRebalance, dataRefresh, 
dataPreprocess,
           rebalancePipeline);
+      registry.register(ClusterEventType.ControllerChange, dataRefresh, 
dataPreprocess,
+          rebalancePipeline);
+      return registry;
+    }
+  }
+
+  private static PipelineRegistry createManagementModeRegistry(String 
pipelineName) {
+    logger.info("Creating management mode registry");
+    synchronized (GenericHelixController.class) {
+      // cluster data cache refresh
+      Pipeline dataRefresh = new Pipeline(pipelineName);
+      dataRefresh.addStage(new ReadClusterDataStage());
+
+      // cluster management mode process
+      Pipeline managementMode = new Pipeline(pipelineName);
+      managementMode.addStage(new ManagementModeStage());
+
+      PipelineRegistry registry = new PipelineRegistry();
+      Arrays.asList(
+          ClusterEventType.ControllerChange,
+          ClusterEventType.LiveInstanceChange,
+          ClusterEventType.MessageChange,
+          ClusterEventType.OnDemandRebalance,
+          ClusterEventType.PeriodicalRebalance
+      ).forEach(type -> registry.register(type, dataRefresh, managementMode));
+
       return registry;
     }
   }
 
   // TODO: refactor the constructor as providing both registry but only 
enabling one looks confusing
   public GenericHelixController(PipelineRegistry registry, PipelineRegistry 
taskRegistry) {
-    this(registry, taskRegistry, null, Sets.newHashSet(
-        Pipeline.Type.TASK, Pipeline.Type.DEFAULT));
+    this(registry, taskRegistry, 
createManagementModeRegistry(Pipeline.Type.MANAGEMENT_MODE.name()),
+        null, Sets.newHashSet(Pipeline.Type.TASK, Pipeline.Type.DEFAULT));
   }
 
   private GenericHelixController(PipelineRegistry registry, PipelineRegistry 
taskRegistry,
-      final String clusterName, Set<Pipeline.Type> enabledPipelineTypes) {
-    _paused = false;
+      PipelineRegistry managementModeRegistry, final String clusterName,
+      Set<Pipeline.Type> enabledPipelineTypes) {
     _enabledPipelineTypes = enabledPipelineTypes;
     _registry = registry;
     _taskRegistry = taskRegistry;
+    _managementModeRegistry = managementModeRegistry;
     _lastSeenInstances = new AtomicReference<>();
     _lastSeenSessions = new AtomicReference<>();
     _lastSeenCustomizedStateTypesMapRef = new AtomicReference<>();
@@ -660,6 +705,7 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
     _onDemandRebalanceTimer =
         new Timer("GenericHelixController_" + _clusterName + 
"_onDemand_Timer", true);
 
+    // TODO: refactor to simplify below similar code of the 3 pipelines
     // initialize pipelines at the end so we have everything else prepared
     if (_enabledPipelineTypes.contains(Pipeline.Type.DEFAULT)) {
       logger.info("Initializing {} pipeline", Pipeline.Type.DEFAULT.name());
@@ -689,6 +735,16 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
       _taskEventThread = null;
     }
 
+    logger.info("Initializing {} pipeline", 
Pipeline.Type.MANAGEMENT_MODE.name());
+    _managementControllerDataProvider =
+        new ManagementControllerDataProvider(clusterName, 
Pipeline.Type.MANAGEMENT_MODE.name());
+    _managementModeEventQueue = new ClusterEventBlockingQueue();
+    _managementModeEventThread =
+        new ClusterEventProcessor(_managementControllerDataProvider, 
_managementModeEventQueue,
+            Pipeline.Type.MANAGEMENT_MODE.name() + "-" + clusterName);
+    initPipeline(_managementModeEventThread, 
_managementControllerDataProvider);
+    logger.info("Initialized {} pipeline", 
Pipeline.Type.MANAGEMENT_MODE.name());
+
     addController(this);
   }
 
@@ -776,12 +832,36 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
 
     _helixManager = manager;
 
-    // TODO If init controller with paused = true, it may not take effect 
immediately
-    // _paused is default false. If any events come before 
controllerChangeEvent, the controller
-    // will be excuting in un-paused mode. Which might not be the config in ZK.
-    if (_paused) {
-      logger.info("Cluster " + manager.getClusterName() + " is paused. 
Ignoring the event:" + event
-          .getEventType());
+    // Prepare ClusterEvent
+    // TODO (harry): this is a temporal workaround - after controller is 
separated we should not
+    // have this instanceof clauses
+    List<Pipeline> pipelines;
+    boolean isTaskFrameworkPipeline = false;
+    Pipeline.Type pipelineType;
+
+    if (dataProvider instanceof ResourceControllerDataProvider) {
+      pipelines = _registry.getPipelinesForEvent(event.getEventType());
+      pipelineType = Pipeline.Type.DEFAULT;
+    } else if (dataProvider instanceof WorkflowControllerDataProvider) {
+      pipelines = _taskRegistry.getPipelinesForEvent(event.getEventType());
+      isTaskFrameworkPipeline = true;
+      pipelineType = Pipeline.Type.TASK;
+    } else if (dataProvider instanceof ManagementControllerDataProvider) {
+      pipelines = 
_managementModeRegistry.getPipelinesForEvent(event.getEventType());
+      pipelineType = Pipeline.Type.MANAGEMENT_MODE;
+    } else {
+      logger.warn(String
+          .format("No %s pipeline to run for event: %s::%s", 
dataProvider.getPipelineName(),
+              event.getEventType(), event.getEventId()));
+      return;
+    }
+
+    // Should not run management mode and default/task pipelines at the same 
time.
+    if ((_inManagementMode && 
!Pipeline.Type.MANAGEMENT_MODE.equals(pipelineType))
+        || (!_inManagementMode && 
Pipeline.Type.MANAGEMENT_MODE.equals(pipelineType))) {
+      logger.info("Should not run management mode and default/task pipelines 
at the same time. "
+              + "cluster={}, inManagementMode={}, pipelineType={}. Ignoring 
the event: {}",
+          manager.getClusterName(), _inManagementMode, pipelineType, 
event.getEventType());
       return;
     }
 
@@ -808,26 +888,6 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
 
     dataProvider.setClusterEventId(event.getEventId());
     event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(), 
_lastPipelineEndTimestamp);
-
-    // Prepare ClusterEvent
-    // TODO (harry): this is a temporal workaround - after controller is 
separated we should not
-    // have this instanceof clauses
-    List<Pipeline> pipelines;
-    boolean isTaskFrameworkPipeline = false;
-
-    if (dataProvider instanceof ResourceControllerDataProvider) {
-      pipelines = _registry
-          .getPipelinesForEvent(event.getEventType());
-    } else if (dataProvider instanceof WorkflowControllerDataProvider) {
-      pipelines = _taskRegistry
-          .getPipelinesForEvent(event.getEventType());
-      isTaskFrameworkPipeline = true;
-    } else {
-      logger.warn(String
-          .format("No %s pipeline to run for event: %s::%s", 
dataProvider.getPipelineName(),
-              event.getEventType(), event.getEventId()));
-      return;
-    }
     event.addAttribute(AttributeName.ControllerDataProvider.name(), 
dataProvider);
 
     logger.info("START: Invoking {} controller pipeline for cluster: {}. Event 
type: {}, ID: {}. "
@@ -1202,6 +1262,10 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
     if (_workflowControlDataProvider != null) {
       _workflowControlDataProvider.notifyDataChange(type, path);
     }
+
+    if (_managementControllerDataProvider != null) {
+      _managementControllerDataProvider.notifyDataChange(type, path);
+    }
   }
 
   private void requestDataProvidersFullRefresh() {
@@ -1212,6 +1276,10 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
     if (_workflowControlDataProvider != null) {
       _workflowControlDataProvider.requireFullRefresh();
     }
+
+    if (_managementControllerDataProvider != null) {
+      _managementControllerDataProvider.requireFullRefresh();
+    }
   }
 
   private void pushToEventQueues(ClusterEventType eventType, 
NotificationContext changeContext,
@@ -1228,6 +1296,14 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
     for (Map.Entry<String, Object> attr : eventAttributes.entrySet()) {
       event.addAttribute(attr.getKey(), attr.getValue());
     }
+
+    // Management mode event will force management mode pipeline.
+    if (_inManagementMode) {
+      event.setEventId(uid + "_" + Pipeline.Type.MANAGEMENT_MODE.name());
+      enqueueEvent(_managementModeEventQueue, event);
+      return;
+    }
+
     enqueueEvent(_eventQueue, event);
     enqueueEvent(_taskEventQueue,
         event.clone(String.format("%s_%s", uid, Pipeline.Type.TASK.name())));
@@ -1269,7 +1345,10 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
       boolean prevInMaintenanceMode = _inMaintenanceMode;
       _paused = updateControllerState(pauseSignal, _paused);
       _inMaintenanceMode = updateControllerState(maintenanceSignal, 
_inMaintenanceMode);
-      triggerResumeEvent(changeContext, prevPaused, prevInMaintenanceMode);
+      // TODO: remove triggerResumeEvent when moving pause/maintenance to 
management pipeline
+      if (!triggerResumeEvent(changeContext, prevPaused, 
prevInMaintenanceMode)) {
+        pushToEventQueues(ClusterEventType.ControllerChange, changeContext, 
Collections.emptyMap());
+      }
 
       enableClusterStatusMonitor(true);
       _clusterStatusMonitor.setEnabled(!_paused);
@@ -1484,7 +1563,7 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
    * @param prevPaused the previous paused status.
    * @param prevInMaintenanceMode the previous in maintenance mode status.
    */
-  private void triggerResumeEvent(NotificationContext changeContext, boolean 
prevPaused,
+  private boolean triggerResumeEvent(NotificationContext changeContext, 
boolean prevPaused,
       boolean prevInMaintenanceMode) {
     /**
      * WARNING: the logic here is tricky.
@@ -1496,7 +1575,9 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
     if (!_paused && (prevPaused || (prevInMaintenanceMode && 
!_inMaintenanceMode))) {
       pushToEventQueues(ClusterEventType.Resume, changeContext, 
Collections.EMPTY_MAP);
       logger.info("controller is now resumed from paused/maintenance state");
+      return true;
     }
+    return false;
   }
 
   // TODO: refactor this to use common/ClusterEventProcessor.
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
similarity index 62%
copy from 
helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
copy to 
helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
index cd0ce60..d178ca5 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
@@ -1,4 +1,4 @@
-package org.apache.helix.controller.stages;
+package org.apache.helix.controller.dataproviders;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,25 +19,9 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
-public enum ClusterEventType {
-  IdealStateChange,
-  CurrentStateChange,
-  TaskCurrentStateChange,
-  CustomizedStateChange,
-  ConfigChange,
-  ClusterConfigChange,
-  ResourceConfigChange,
-  InstanceConfigChange,
-  CustomizeStateConfigChange,
-  LiveInstanceChange,
-  MessageChange,
-  ExternalViewChange,
-  CustomizedViewChange,
-  TargetExternalViewChange,
-  Resume,
-  PeriodicalRebalance,
-  OnDemandRebalance,
-  RetryRebalance,
-  StateVerifier,
-  Unknown
+public class ManagementControllerDataProvider extends 
BaseControllerDataProvider {
+  // TODO: implement this class to only refresh required event types
+  public ManagementControllerDataProvider(String clusterName, String name) {
+    super(clusterName, name);
+  }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java 
b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
index ecf42da..4196e23 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
@@ -35,7 +35,14 @@ public class Pipeline {
 
   public enum Type {
     DEFAULT,
-    TASK
+    TASK,
+
+    /**
+     * A pipeline used to manage the cluster when it is in admin management 
mode:
+     * cluster freeze mode, controller pause mode, etc. Used by Helix 
internally,
+     * not meant to be used for Helix external users.
+     */
+    MANAGEMENT_MODE
   }
 
   public Pipeline() {
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
index cd0ce60..65f6bb4 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
@@ -37,6 +37,7 @@ public enum ClusterEventType {
   Resume,
   PeriodicalRebalance,
   OnDemandRebalance,
+  ControllerChange,
   RetryRebalance,
   StateVerifier,
   Unknown
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
new file mode 100644
index 0000000..512224d
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
@@ -0,0 +1,48 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import 
org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.util.HelixUtil;
+import org.apache.helix.util.RebalanceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks the cluster status whether the cluster is in management mode.
+ *
+ * <p>{@code process} reads the data provider stored on the event under
+ * {@code AttributeName.ControllerDataProvider} and passes it to
+ * {@code HelixUtil.inManagementMode}. When that call returns {@code false}, the stage logs
+ * the exit, calls {@code RebalanceUtil.enableManagementMode(clusterName, false)} and then
+ * throws a {@code StageException} carrying the same "Exiting management mode pipeline"
+ * message. When the call returns {@code true}, the stage does nothing further.
+ *
+ * <p>NOTE(review): the exception on exit is presumably meant to abort the remainder of the
+ * current pipeline run rather than to report a failure - confirm against the pipeline
+ * runner's handling of {@code StageException}.
+ */
+public class ManagementModeStage extends AbstractBaseStage {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ManagementModeStage.class);
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    // TODO: implement the stage
+    String clusterName = event.getClusterName();
+    ManagementControllerDataProvider cache =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
+    if (!HelixUtil.inManagementMode(cache)) {
+      LOG.info("Exiting management mode pipeline for cluster {}", clusterName);
+      RebalanceUtil.enableManagementMode(clusterName, false);
+      throw new StageException("Exiting management mode pipeline for cluster " 
+ clusterName);
+    }
+  }
+}
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
index 075c40f..a4d4783 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
@@ -28,6 +28,8 @@ import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.util.HelixUtil;
+import org.apache.helix.util.RebalanceUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,6 +43,14 @@ public class ResourceValidationStage extends 
AbstractBaseStage {
     if (cache == null) {
       throw new StageException("Missing attributes in event:" + event + ". 
Requires DataCache");
     }
+
+    // Check if cluster is still in management mode. Eg. there exists any 
frozen live instance.
+    if (HelixUtil.inManagementMode(cache)) {
+      // Trigger an immediate management mode pipeline.
+      RebalanceUtil.enableManagementMode(event.getClusterName(), true);
+      throw new StageException("Pipeline should not be run because cluster is 
in management mode");
+    }
+
     Map<String, Resource> resourceMap = 
event.getAttribute(AttributeName.RESOURCES.name());
     if (resourceMap == null) {
       throw new StageException("Resources must be computed prior to 
validation!");
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java 
b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index ce22c5f..3151716 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -38,6 +38,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyType;
 import org.apache.helix.controller.common.PartitionStateMap;
+import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.AbstractRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
@@ -547,4 +548,15 @@ public final class HelixUtil {
     instanceConfig.setInstanceEnabled(true);
     return instanceConfig;
   }
+
+  /**
+   * Checks whether or not the cluster is in management mode.
+   *
+   * <p>Placeholder: the real check is not implemented yet, so this method ignores
+   * {@code cache} and always returns {@code true}. Callers that gate on the result will
+   * therefore take their "in management mode" branch on every call until the TODO below
+   * is resolved.
+   *
+   * @param cache controller data provider for the cluster; currently not read
+   * @return {@code true} unconditionally in the current implementation
+   */
+  public static boolean inManagementMode(BaseControllerDataProvider cache) {
+    // TODO: implement the logic. Parameters can also change
+    return true;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java 
b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index 91ec406..db2b76f 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -145,6 +145,28 @@ public class RebalanceUtil {
     return result;
   }
 
+  /**
+   * Enables/disables controller to run management mode pipeline.
+   *
+   * <p>Looks up the leader controller registered for {@code clusterName}. If one exists, its
+   * management mode flag is set to {@code enabled}; otherwise an error is logged and the flag
+   * is left untouched. In both cases an on-demand pipeline run is then scheduled with zero
+   * delay.
+   *
+   * @param clusterName target cluster name
+   * @param enabled {@code true} to make the controller run the management mode pipeline,
+   *                {@code false} to stop running it
+   */
+  public static void enableManagementMode(String clusterName, boolean enabled) {
+    GenericHelixController leaderController =
+        GenericHelixController.getLeaderController(clusterName);
+    if (leaderController != null) {
+      LOG.info("Switching management mode pipeline for cluster={}, enabled={}", clusterName,
+          enabled);
+      leaderController.setInManagementMode(enabled);
+    } else {
+      // Arguments follow placeholder order: "enabled={}" first, then "cluster {}".
+      LOG.error("Failed to switch management mode pipeline, enabled={}. "
+          + "Controller for cluster {} does not exist", enabled, clusterName);
+    }
+
+    // Triggers an event to immediately run the pipeline
+    scheduleOnDemandPipeline(clusterName, 0L);
+  }
+
   public static void scheduleOnDemandPipeline(String clusterName, long delay) {
     scheduleOnDemandPipeline(clusterName, delay, true);
   }

Reply via email to