This is an automated email from the ASF dual-hosted git repository. hzlu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit f9fba2ac59ecdd6baeaeb005c4d0e428ab76c3c6 Author: Huizhi Lu <[email protected]> AuthorDate: Sat Jun 12 01:00:25 2021 -0700 Move pause and maintenance handling out of controller (#1793) With management mode pipeline, the pause and maintenance signals handling logic should be moved out of the onControllerChange() and moved to the management mode pipeline. This commit handles pause/maintenance signals enable/disable and update cluster status accordingly. --- .../helix/controller/GenericHelixController.java | 80 +++------------------- .../dataproviders/BaseControllerDataProvider.java | 21 +++++- .../ManagementControllerDataProvider.java | 24 ++++++- .../controller/stages/ManagementModeStage.java | 16 +++-- .../controller/stages/ResourceValidationStage.java | 29 ++++++-- .../main/java/org/apache/helix/model/Message.java | 5 ++ .../BestPossibleExternalViewVerifier.java | 2 +- .../main/java/org/apache/helix/util/HelixUtil.java | 35 ++++++++-- .../java/org/apache/helix/util/RebalanceUtil.java | 4 +- 9 files changed, 119 insertions(+), 97 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 7da6ef0..6cc1e5f 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -103,9 +103,7 @@ import org.apache.helix.model.CustomizedStateConfig; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; -import org.apache.helix.model.MaintenanceSignal; import org.apache.helix.model.Message; -import org.apache.helix.model.PauseSignal; import org.apache.helix.model.ResourceConfig; import org.apache.helix.monitoring.mbeans.ClusterEventMonitor; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; @@ -177,13 +175,6 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns private long _continuousTaskRebalanceFailureCount = 0; /** - * The _paused flag is checked by function handleEvent(), while if the flag is set handleEvent() - * will be no-op. Other event handling logic keeps the same when the flag is set. - */ - private boolean _paused; - private boolean _inMaintenanceMode; - - /** * The executors that can periodically run the rebalancing pipeline. A * SingleThreadScheduledExecutor will start if there is resource group that has the config to do * periodically rebalance. @@ -837,18 +828,16 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns // have this instanceof clauses List<Pipeline> pipelines; boolean isTaskFrameworkPipeline = false; - Pipeline.Type pipelineType; + boolean isManagementPipeline = false; if (dataProvider instanceof ResourceControllerDataProvider) { pipelines = _registry.getPipelinesForEvent(event.getEventType()); - pipelineType = Pipeline.Type.DEFAULT; } else if (dataProvider instanceof WorkflowControllerDataProvider) { pipelines = _taskRegistry.getPipelinesForEvent(event.getEventType()); isTaskFrameworkPipeline = true; - pipelineType = Pipeline.Type.TASK; } else if (dataProvider instanceof ManagementControllerDataProvider) { pipelines = _managementModeRegistry.getPipelinesForEvent(event.getEventType()); - pipelineType = Pipeline.Type.MANAGEMENT_MODE; + isManagementPipeline = true; } else { logger.warn(String .format("No %s pipeline to run for event: %s::%s", dataProvider.getPipelineName(), @@ -857,11 +846,10 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns } // Should not run management mode and default/task pipelines at the same time. - if ((_inManagementMode && !Pipeline.Type.MANAGEMENT_MODE.equals(pipelineType)) - || (!_inManagementMode && Pipeline.Type.MANAGEMENT_MODE.equals(pipelineType))) { + if (_inManagementMode != isManagementPipeline) { logger.info("Should not run management mode and default/task pipelines at the same time. " - + "cluster={}, inManagementMode={}, pipelineType={}. Ignoring the event: {}", - manager.getClusterName(), _inManagementMode, pipelineType, event.getEventType()); + + "cluster={}, inManagementMode={}, isManagementPipeline={}. Ignoring the event: {}", + manager.getClusterName(), _inManagementMode, isManagementPipeline, event.getEventType()); return; } @@ -881,6 +869,8 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns checkRebalancingTimer(manager, Collections.<IdealState>emptyList(), dataProvider.getClusterConfig()); } if (_isMonitoring) { + _clusterStatusMonitor.setEnabled(!_inManagementMode); + _clusterStatusMonitor.setPaused(_inManagementMode); event.addAttribute(AttributeName.clusterStatusMonitor.name(), _clusterStatusMonitor); } } @@ -1335,25 +1325,8 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns } if (controllerIsLeader) { - HelixManager manager = changeContext.getManager(); - HelixDataAccessor accessor = manager.getHelixDataAccessor(); - Builder keyBuilder = accessor.keyBuilder(); - - PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause()); - MaintenanceSignal maintenanceSignal = accessor.getProperty(keyBuilder.maintenance()); - boolean prevPaused = _paused; - boolean prevInMaintenanceMode = _inMaintenanceMode; - _paused = updateControllerState(pauseSignal, _paused); - _inMaintenanceMode = updateControllerState(maintenanceSignal, _inMaintenanceMode); - // TODO: remove triggerResumeEvent when moving pause/maintenance to management pipeline - if (!triggerResumeEvent(changeContext, prevPaused, prevInMaintenanceMode)) { - pushToEventQueues(ClusterEventType.ControllerChange, changeContext, Collections.emptyMap()); - } - enableClusterStatusMonitor(true); - _clusterStatusMonitor.setEnabled(!_paused); - _clusterStatusMonitor.setPaused(_paused); - _clusterStatusMonitor.setMaintenance(_inMaintenanceMode); + pushToEventQueues(ClusterEventType.ControllerChange, changeContext, Collections.emptyMap()); } else { enableClusterStatusMonitor(false); // Note that onControllerChange is executed in parallel with the event processing thread. It @@ -1543,43 +1516,6 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns } } - private boolean updateControllerState(PauseSignal signal, boolean statusFlag) { - if (signal != null) { - if (!statusFlag) { - statusFlag = true; - // This log is recorded for the first time entering PAUSE/MAINTENANCE mode - logger.info(String.format("controller is now %s", - (signal instanceof MaintenanceSignal) ? "in maintenance mode" : "paused")); - } - } else { - statusFlag = false; - } - return statusFlag; - } - - /** - * Trigger a Resume Event if the cluster is back to activated. - * @param changeContext - * @param prevPaused the previous paused status. - * @param prevInMaintenanceMode the previous in maintenance mode status. - */ - private boolean triggerResumeEvent(NotificationContext changeContext, boolean prevPaused, - boolean prevInMaintenanceMode) { - /** - * WARNING: the logic here is tricky. - * 1. Only resume if not paused. So if the Maintenance mode is removed but the cluster is still - * paused, the resume event should not be sent. - * 2. Only send resume event if the status is changed back to active. So we don't send multiple - * event unnecessarily. - */ - if (!_paused && (prevPaused || (prevInMaintenanceMode && !_inMaintenanceMode))) { - pushToEventQueues(ClusterEventType.Resume, changeContext, Collections.EMPTY_MAP); - logger.info("controller is now resumed from paused/maintenance state"); - return true; - } - return false; - } - // TODO: refactor this to use common/ClusterEventProcessor. @Deprecated private class ClusterEventProcessor extends Thread { diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java index 6ce25e2..3c705f4 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java @@ -57,6 +57,7 @@ import org.apache.helix.model.LiveInstance; import org.apache.helix.model.MaintenanceSignal; import org.apache.helix.model.Message; import org.apache.helix.model.ParticipantHistory; +import org.apache.helix.model.PauseSignal; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.task.TaskConstants; @@ -87,6 +88,7 @@ public class BaseControllerDataProvider implements ControlContextProvider { private boolean _updateInstanceOfflineTime = true; private MaintenanceSignal _maintenanceSignal; + private PauseSignal _pauseSignal; private boolean _isMaintenanceModeEnabled; private boolean _hasMaintenanceSignalChanged; private ExecutorService _asyncTasksThreadPool; @@ -300,8 +302,9 @@ public class BaseControllerDataProvider implements ControlContextProvider { } } - private void updateMaintenanceInfo(final HelixDataAccessor accessor) { + private void refreshManagementSignals(final HelixDataAccessor accessor) { _maintenanceSignal = accessor.getProperty(accessor.keyBuilder().maintenance()); + _pauseSignal = accessor.getProperty(accessor.keyBuilder().pause()); _isMaintenanceModeEnabled = _maintenanceSignal != null; // The following flag is to guarantee that there's only one update per pineline run because we // check for whether maintenance recovery could happen twice every pipeline @@ -373,7 +376,7 @@ public class BaseControllerDataProvider implements ControlContextProvider { refreshResourceConfig(accessor, refreshedTypes); _stateModelDefinitionCache.refresh(accessor); _clusterConstraintsCache.refresh(accessor); - updateMaintenanceInfo(accessor); + refreshManagementSignals(accessor); timeoutNodesDuringMaintenance(accessor, _clusterConfig, _isMaintenanceModeEnabled); // TODO: once controller gets split, only one controller should update offline instance history @@ -622,6 +625,16 @@ public class BaseControllerDataProvider implements ControlContextProvider { } /** + * Gets all messages for each instance. + * + * @return Map of {instanceName -> Collection of Message}. + */ + public Map<String, Collection<Message>> getAllInstancesMessages() { + return getAllInstances().stream().collect( + Collectors.toMap(instance -> instance, instance -> getMessages(instance).values())); + } + + /** * This function is supposed to be only used by testing purpose for safety. For "get" usage, * please use getStaleMessagesByInstance. */ @@ -968,6 +981,10 @@ public class BaseControllerDataProvider implements ControlContextProvider { return _maintenanceSignal; } + public PauseSignal getPauseSignal() { + return _pauseSignal; + } + protected StringBuilder genCacheContentStringBuilder() { StringBuilder sb = new StringBuilder(); sb.append(String.format("liveInstaceMap: %s", _liveInstanceCache.getPropertyMap())).append("\n"); diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java index d178ca5..fe940ff 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java @@ -19,9 +19,27 @@ package org.apache.helix.controller.dataproviders; * under the License. */ +import java.util.Arrays; +import java.util.List; + +import org.apache.helix.HelixConstants; + +/** + * Data provider for controller management mode pipeline. + */ public class ManagementControllerDataProvider extends BaseControllerDataProvider { - // TODO: implement this class to only refresh required event types - public ManagementControllerDataProvider(String clusterName, String name) { - super(clusterName, name); + // Only these types of properties are refreshed for the full refresh request. + private static final List<HelixConstants.ChangeType> FULL_REFRESH_PROPERTIES = + Arrays.asList(HelixConstants.ChangeType.LIVE_INSTANCE, HelixConstants.ChangeType.MESSAGE); + + public ManagementControllerDataProvider(String clusterName, String pipelineName) { + super(clusterName, pipelineName); + } + + @Override + public void requireFullRefresh() { + for (HelixConstants.ChangeType type : FULL_REFRESH_PROPERTIES) { + _propertyDataChangedMap.get(type).set(true); + } } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java index 512224d..042aa14 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java @@ -19,9 +19,9 @@ package org.apache.helix.controller.stages; * under the License. */ +import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider; import org.apache.helix.controller.pipeline.AbstractBaseStage; -import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.util.HelixUtil; import org.apache.helix.util.RebalanceUtil; import org.slf4j.Logger; @@ -36,13 +36,21 @@ public class ManagementModeStage extends AbstractBaseStage { @Override public void process(ClusterEvent event) throws Exception { // TODO: implement the stage + _eventId = event.getEventId(); String clusterName = event.getClusterName(); ManagementControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name()); - if (!HelixUtil.inManagementMode(cache)) { - LOG.info("Exiting management mode pipeline for cluster {}", clusterName); + + // TODO: move to the last stage of management pipeline + checkInManagementMode(clusterName, cache); + } + + private void checkInManagementMode(String clusterName, ManagementControllerDataProvider cache) { + // Should exit management mode + if (!HelixUtil.inManagementMode(cache.getPauseSignal(), cache.getLiveInstances(), + cache.getEnabledLiveInstances(), cache.getAllInstancesMessages())) { + LogUtil.logInfo(LOG, _eventId, "Exiting management mode pipeline for cluster " + clusterName); RebalanceUtil.enableManagementMode(clusterName, false); - throw new StageException("Exiting management mode pipeline for cluster " + clusterName); } } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java index a4d4783..613ce2e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java @@ -28,6 +28,7 @@ import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.model.IdealState; import org.apache.helix.model.Resource; import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; import org.apache.helix.util.HelixUtil; import org.apache.helix.util.RebalanceUtil; import org.slf4j.Logger; @@ -44,12 +45,7 @@ public class ResourceValidationStage extends AbstractBaseStage { throw new StageException("Missing attributes in event:" + event + ". Requires DataCache"); } - // Check if cluster is still in management mode. Eg. there exists any frozen live instance. - if (HelixUtil.inManagementMode(cache)) { - // Trigger an immediate management mode pipeline. - RebalanceUtil.enableManagementMode(event.getClusterName(), true); - throw new StageException("Pipeline should not be run because cluster is in management mode"); - } + processManagementMode(event, cache); Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); if (resourceMap == null) { @@ -91,6 +87,27 @@ public class ResourceValidationStage extends AbstractBaseStage { } } + private void processManagementMode(ClusterEvent event, BaseControllerDataProvider cache) + throws StageException { + // Set cluster status monitor for maintenance mode + ClusterStatusMonitor monitor = event.getAttribute(AttributeName.clusterStatusMonitor.name()); + if (monitor != null) { + monitor.setMaintenance(cache.isMaintenanceModeEnabled()); + } + + // Check if cluster is still in management mode. Eg. there exists any frozen live instance. + if (HelixUtil.inManagementMode(cache.getPauseSignal(), cache.getLiveInstances(), + cache.getEnabledLiveInstances(), cache.getAllInstancesMessages())) { + // Trigger an immediate management mode pipeline. + LogUtil.logInfo(LOG, _eventId, + "Enabling management mode pipeline for cluster " + event.getClusterName()); + RebalanceUtil.enableManagementMode(event.getClusterName(), true); + throw new StageException( + "Pipeline should not be run because cluster " + event.getClusterName() + + "is in management mode"); + } + } + /** * Check if the ideal state adheres to a rule * @param idealState the ideal state to check diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java index b509506..6222df5 100644 --- a/helix-core/src/main/java/org/apache/helix/model/Message.java +++ b/helix-core/src/main/java/org/apache/helix/model/Message.java @@ -59,6 +59,7 @@ public class Message extends HelixProperty { NO_OP, PARTICIPANT_ERROR_REPORT, PARTICIPANT_SESSION_CHANGE, + PARTICIPANT_STATUS_CHANGE, CHAINED_MESSAGE, // this is a message subtype RELAYED_MESSAGE } @@ -927,6 +928,10 @@ public class Message extends HelixProperty { return getTgtName().equalsIgnoreCase(InstanceType.CONTROLLER.name()); } + public boolean isParticipantStatusChangeType() { + return MessageType.PARTICIPANT_STATUS_CHANGE.name().equalsIgnoreCase(getMsgType()); + } + /** * Get the {@link PropertyKey} for this message * @param keyBuilder PropertyKey Builder diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java index eaab1a2..3b133a1 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java @@ -416,7 +416,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier { */ private BestPossibleStateOutput calcBestPossState(ResourceControllerDataProvider cache, Set<String> resources) throws Exception { - ClusterEvent event = new ClusterEvent(ClusterEventType.StateVerifier); + ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.StateVerifier); event.addAttribute(AttributeName.ControllerDataProvider.name(), cache); RebalanceUtil.runStage(event, new ResourceComputationStage()); diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java index 3151716..c770583 100644 --- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java +++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java @@ -21,6 +21,7 @@ package org.apache.helix.util; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -38,7 +39,6 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.PropertyType; import org.apache.helix.controller.common.PartitionStateMap; -import org.apache.helix.controller.dataproviders.BaseControllerDataProvider; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.rebalancer.AbstractRebalancer; import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; @@ -60,6 +60,7 @@ import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message; import org.apache.helix.model.Partition; +import org.apache.helix.model.PauseSignal; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; @@ -550,13 +551,33 @@ public final class HelixUtil { } /** - * Checks whether or not the cluster is in management mode. + * Checks whether or not the cluster is in management mode. It checks: + * - pause signal + * - live instances: whether any live instance is not in normal status, eg. frozen. + * - messages: whether live instance has a participant status change message * - * @param cache - * @return + * @param pauseSignal pause signal + * @param liveInstanceMap map of live instances + * @param enabledLiveInstances set of enabled live instance names. They should be all included + * in the liveInstanceMap. + * @param instancesMessages a map of all instances' messages. + * @return true if cluster is in management mode; otherwise, false */ - public static boolean inManagementMode(BaseControllerDataProvider cache) { - // TODO: implement the logic. Parameters can also change - return true; + public static boolean inManagementMode(PauseSignal pauseSignal, + Map<String, LiveInstance> liveInstanceMap, Set<String> enabledLiveInstances, + Map<String, Collection<Message>> instancesMessages) { + // Check pause signal and abnormal live instances (eg. in freeze mode) + // TODO: should check maintenance signal when moving maintenance to management pipeline + return pauseSignal != null || enabledLiveInstances.stream().anyMatch( + instance -> isInstanceInManagementMode(instance, liveInstanceMap, instancesMessages)); + } + + private static boolean isInstanceInManagementMode(String instance, + Map<String, LiveInstance> liveInstanceMap, + Map<String, Collection<Message>> instancesMessages) { + // Check live instance status and participant status change message + return LiveInstance.LiveInstanceStatus.PAUSED.equals(liveInstanceMap.get(instance).getStatus()) + || (instancesMessages.getOrDefault(instance, Collections.emptyList()).stream() + .anyMatch(Message::isParticipantStatusChangeType)); } } diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java index db2b76f..f74b98f 100644 --- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java +++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java @@ -159,8 +159,8 @@ public class RebalanceUtil { enabled); leaderController.setInManagementMode(enabled); } else { - LOG.error("Failed to switch management mode pipeline, enabled={}. " - + "Controller for cluster {} does not exist", clusterName, enabled); + throw new HelixException(String.format("Failed to switch management mode pipeline, " + + "enabled=%s. Controller for cluster %s does not exist", enabled, clusterName)); } // Triggers an event to immediately run the pipeline
