This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 313affc2e020068a1e5e7c5e92224d3299b1a404 Author: narendly <[email protected]> AuthorDate: Mon Feb 25 18:05:34 2019 -0800 [HELIX-798] HELIX: Implement auto-exit of maintenance mode This diff contains the implementation and integration of a feature addition for Helix: auto-exit of maintenance mode. Changelist: 1. BestPossibleStateCalcStage logic was modified so that it uses the new autoEnableMaintenanceMode API 2. IntermediateStateCalcStage logic was modified to check whether the cluster is already in maintenance mode first 3. enableMaintenanceMode() API was deprecated and replaced with auto/manual APIs while preserving backward-compatibility 4. An async stage (MaintenanceRecoveryStage) was created and added to the resource pipeline 5. A series of integration tests were added for various exit/non-exit scenarios --- .../src/main/java/org/apache/helix/HelixAdmin.java | 25 +++ .../helix/controller/GenericHelixController.java | 11 +- .../dataproviders/BaseControllerDataProvider.java | 9 +- .../helix/controller/pipeline/AsyncWorkerType.java | 1 + .../stages/BestPossibleStateCalcStage.java | 5 +- .../stages/IntermediateStateCalcStage.java | 11 +- .../stages/MaintenanceRecoveryStage.java | 171 +++++++++++++++++ .../org/apache/helix/manager/zk/ZKHelixAdmin.java | 68 ++++++- .../helix/manager/zk/ZKHelixDataAccessor.java | 5 +- .../java/org/apache/helix/model/ClusterConfig.java | 15 +- .../org/apache/helix/model/MaintenanceSignal.java | 36 +++- .../controller/TestClusterMaintenanceMode.java | 206 ++++++++++++++++++++- .../java/org/apache/helix/mock/MockHelixAdmin.java | 13 ++ 13 files changed, 547 insertions(+), 29 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java index 5cb8883..42932b4 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java @@ -275,21 +275,46 @@ public interface HelixAdmin { void enableCluster(String clusterName, boolean enabled, String 
reason); /** + * **Deprecated: use autoEnableMaintenanceMode or manuallyEnableMaintenanceMode instead** * Enable or disable maintenance mode for a cluster * @param clusterName * @param enabled */ + @Deprecated void enableMaintenanceMode(String clusterName, boolean enabled); /** + * **Deprecated: use autoEnableMaintenanceMode or manuallyEnableMaintenanceMode instead** * Enable or disable maintenance mode for a cluster * @param clusterName * @param enabled * @param reason */ + @Deprecated void enableMaintenanceMode(String clusterName, boolean enabled, String reason); /** + * Automatically enable maintenance mode. To be called by the Controller pipeline. + * @param clusterName + * @param enabled + * @param reason + * @param internalReason + */ + void autoEnableMaintenanceMode(String clusterName, boolean enabled, String reason, + MaintenanceSignal.AutoTriggerReason internalReason); + + /** + * Manually enable maintenance mode. To be called by the REST client that accepts KV mappings as + * the payload. + * @param clusterName + * @param enabled + * @param reason + * @param customFields user-specified KV mappings to be stored in the ZNode + */ + void manuallyEnableMaintenanceMode(String clusterName, boolean enabled, String reason, + Map<String, String> customFields); + + /** * Reset a list of partitions in error state for an instance * The partitions are assume to be in error state and reset will bring them from error * to initial state. An error to initial state transition is required for reset. 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 4e691ed..1ba26f1 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -66,6 +66,7 @@ import org.apache.helix.controller.stages.CompatibilityCheckStage; import org.apache.helix.controller.stages.CurrentStateComputationStage; import org.apache.helix.controller.stages.ExternalViewComputeStage; import org.apache.helix.controller.stages.IntermediateStateCalcStage; +import org.apache.helix.controller.stages.MaintenanceRecoveryStage; import org.apache.helix.controller.stages.MessageSelectionStage; import org.apache.helix.controller.stages.MessageThrottleStage; import org.apache.helix.controller.stages.PersistAssignmentStage; @@ -318,15 +319,19 @@ public class GenericHelixController implements IdealStateChangeListener, Pipeline liveInstancePipeline = new Pipeline(pipelineName); liveInstancePipeline.addStage(new CompatibilityCheckStage()); + // auto-exit maintenance mode if applicable + Pipeline autoExitMaintenancePipeline = new Pipeline(pipelineName); + autoExitMaintenancePipeline.addStage(new MaintenanceRecoveryStage()); + registry.register(ClusterEventType.IdealStateChange, dataRefresh, dataPreprocess, rebalancePipeline); registry.register(ClusterEventType.CurrentStateChange, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline); registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, dataPreprocess, rebalancePipeline); registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, dataPreprocess, rebalancePipeline); - registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, dataPreprocess, rebalancePipeline); - registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, liveInstancePipeline, 
dataPreprocess, externalViewPipeline, rebalancePipeline); + registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, autoExitMaintenancePipeline, dataPreprocess, rebalancePipeline); + registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, autoExitMaintenancePipeline, liveInstancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline); registry.register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline); registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline); - registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline); + registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, autoExitMaintenancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline); return registry; } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java index a1cf763..8d2a4a5 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java @@ -76,6 +76,7 @@ public class BaseControllerDataProvider implements ControlContextProvider { private ClusterConfig _clusterConfig; private boolean _updateInstanceOfflineTime = true; + private MaintenanceSignal _maintenanceSignal; private boolean _isMaintenanceModeEnabled; private ExecutorService _asyncTasksThreadPool; @@ -268,8 +269,8 @@ public class BaseControllerDataProvider implements ControlContextProvider { } private void updateMaintenanceInfo(final HelixDataAccessor accessor) { - MaintenanceSignal maintenanceSignal = accessor.getProperty(accessor.keyBuilder().maintenance()); - _isMaintenanceModeEnabled = maintenanceSignal != null; + 
_maintenanceSignal = accessor.getProperty(accessor.keyBuilder().maintenance()); + _isMaintenanceModeEnabled = _maintenanceSignal != null; } private void updateIdealRuleMap() { @@ -689,6 +690,10 @@ public class BaseControllerDataProvider implements ControlContextProvider { return _isMaintenanceModeEnabled; } + public MaintenanceSignal getMaintenanceSignal() { + return _maintenanceSignal; + } + protected StringBuilder genCacheContentStringBuilder() { StringBuilder sb = new StringBuilder(); sb.append(String.format("liveInstaceMap: %s", _liveInstanceCache.getPropertyMap())).append("\n"); diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java index f8d9967..ac938dc 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java +++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java @@ -30,5 +30,6 @@ public enum AsyncWorkerType { TargetExternalViewCalcWorker, PersistAssignmentWorker, ExternalViewComputeWorker, + MaintenanceRecoveryWorker, TaskJobPurgeWorker } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index a81d617..b508796 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -38,6 +38,7 @@ import org.apache.helix.controller.rebalancer.SemiAutoRebalancer; import org.apache.helix.controller.rebalancer.internal.MappingCalculator; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.MaintenanceSignal; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import 
org.apache.helix.model.ResourceAssignment; @@ -183,8 +184,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { if (manager != null) { if (manager.getHelixDataAccessor() .getProperty(manager.getHelixDataAccessor().keyBuilder().maintenance()) == null) { - manager.getClusterManagmentTool() - .enableMaintenanceMode(manager.getClusterName(), true, errMsg); + manager.getClusterManagmentTool().autoEnableMaintenanceMode(manager.getClusterName(), + true, errMsg, MaintenanceSignal.AutoTriggerReason.MAX_OFFLINE_INSTANCES_EXCEEDED); LogUtil.logWarn(logger, _eventId, errMsg); } } else { diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java index cda7e62..16c3fe3 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java @@ -41,6 +41,7 @@ import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; +import org.apache.helix.model.MaintenanceSignal; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.StateModelDefinition; @@ -241,8 +242,12 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { + "stop the rebalance and put the cluster %s into maintenance mode", instance, maxPartitionPerInstance, cache.getClusterName()); if (manager != null) { - manager.getClusterManagmentTool().enableMaintenanceMode(manager.getClusterName(), - true, errMsg); + if (manager.getHelixDataAccessor() + .getProperty(manager.getHelixDataAccessor().keyBuilder().maintenance()) == null) { + manager.getClusterManagmentTool().autoEnableMaintenanceMode( + manager.getClusterName(), true, 
errMsg, + MaintenanceSignal.AutoTriggerReason.MAX_PARTITION_PER_INSTANCE_EXCEEDED); + } LogUtil.logWarn(logger, _eventId, errMsg); } else { LogUtil.logError(logger, _eventId, @@ -257,7 +262,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { clusterStatusMonitor.setResourceRebalanceStates(Collections.singletonList(resource), ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED); } - + // Throw an exception here so that messages won't be sent out based on this mapping throw new HelixException(errMsg); } instancePartitionCounts.put(instance, partitionCount); diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MaintenanceRecoveryStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MaintenanceRecoveryStage.java new file mode 100644 index 0000000..11d3c92 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MaintenanceRecoveryStage.java @@ -0,0 +1,171 @@ +package org.apache.helix.controller.stages; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.HashMap; +import java.util.Map; +import org.apache.helix.HelixDefinedState; +import org.apache.helix.HelixManager; +import org.apache.helix.controller.LogUtil; +import org.apache.helix.controller.common.PartitionStateMap; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage; +import org.apache.helix.controller.pipeline.AsyncWorkerType; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.MaintenanceSignal; +import org.apache.helix.model.Partition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MaintenanceRecoveryStage extends AbstractAsyncBaseStage { + private static Logger LOG = LoggerFactory.getLogger(MaintenanceRecoveryStage.class); + + @Override + public AsyncWorkerType getAsyncWorkerType() { + return AsyncWorkerType.MaintenanceRecoveryWorker; + } + + @Override + public void execute(final ClusterEvent event) throws Exception { + // Check the cache is there + ResourceControllerDataProvider cache = + event.getAttribute(AttributeName.ControllerDataProvider.name()); + if (cache == null) { + return; + } + + // Check for the maintenance signal + // If it was entered manually or the signal is null (which shouldn't happen), skip this stage + MaintenanceSignal maintenanceSignal = cache.getMaintenanceSignal(); + if (maintenanceSignal == null || maintenanceSignal + .getTriggeringEntity() != MaintenanceSignal.TriggeringEntity.CONTROLLER) { + return; + } + + HelixManager manager = event.getAttribute(AttributeName.helixmanager.name()); + if (manager == null || !manager.isConnected()) { + LogUtil.logInfo(LOG, _eventId, + "MaintenanceRecoveryStage failed due to HelixManager being null or not connected!"); + return; + } + + // At this point, the cluster entered maintenance mode automatically. 
Retrieve the + // auto-triggering reason + MaintenanceSignal.AutoTriggerReason internalReason = maintenanceSignal.getAutoTriggerReason(); + boolean shouldExitMaintenance; + String reason; + switch (internalReason) { + case MAX_OFFLINE_INSTANCES_EXCEEDED: + // Check on the number of offline/disabled instances + int numOfflineInstancesForAutoExit = + cache.getClusterConfig().getNumOfflineInstancesForAutoExit(); + if (numOfflineInstancesForAutoExit < 0) { + return; // Config is not set, no auto-exit + } + // Get the count of all instances that are either offline or disabled + int offlineDisabledCount = + cache.getAllInstances().size() - cache.getEnabledLiveInstances().size(); + shouldExitMaintenance = offlineDisabledCount <= numOfflineInstancesForAutoExit; + reason = String.format( + "Auto-exiting maintenance mode for cluster %s; Num. of offline/disabled instances is %d, less than or equal to the exit threshold %d", + event.getClusterName(), offlineDisabledCount, numOfflineInstancesForAutoExit); + break; + case MAX_PARTITION_PER_INSTANCE_EXCEEDED: + IntermediateStateOutput intermediateStateOutput = + event.getAttribute(AttributeName.INTERMEDIATE_STATE.name()); + if (intermediateStateOutput == null) { + return; + } + shouldExitMaintenance = !violatesMaxPartitionsPerInstance(cache, intermediateStateOutput); + reason = String.format( + "Auto-exiting maintenance mode for cluster %s; All instances have fewer or equal number of partitions than maxPartitionsPerInstance threshold.", + event.getClusterName()); + break; + default: + shouldExitMaintenance = false; + reason = ""; + } + if (shouldExitMaintenance) { + // The cluster has recovered sufficiently, so proceed to exit the maintenance mode by removing + // MaintenanceSignal. 
AutoTriggerReason won't be recorded + manager.getClusterManagmentTool().autoEnableMaintenanceMode(manager.getClusterName(), false, + reason, internalReason); + LogUtil.logInfo(LOG, _eventId, reason); + } + } + + /** + * Check that the intermediateStateOutput assignment does not violate maxPartitionPerInstance + * threshold. + * @param cache + * @param intermediateStateOutput + * @return true if violation is found, false otherwise. + */ + private boolean violatesMaxPartitionsPerInstance(ResourceControllerDataProvider cache, + IntermediateStateOutput intermediateStateOutput) { + int maxPartitionPerInstance = cache.getClusterConfig().getMaxPartitionsPerInstance(); + if (maxPartitionPerInstance <= 0) { + // Config is not set; return + return false; + } + + Map<String, PartitionStateMap> resourceStatesMap = + intermediateStateOutput.getResourceStatesMap(); + Map<String, Integer> instancePartitionCounts = new HashMap<>(); + + for (String resource : resourceStatesMap.keySet()) { + IdealState idealState = cache.getIdealState(resource); + if (idealState != null + && idealState.getStateModelDefRef().equals(BuiltInStateModelDefinitions.Task.name())) { + // Ignore task here. 
Task has its own throttling logic + continue; + } + + PartitionStateMap partitionStateMap = resourceStatesMap.get(resource); + Map<Partition, Map<String, String>> stateMaps = partitionStateMap.getStateMap(); + for (Partition p : stateMaps.keySet()) { + Map<String, String> stateMap = stateMaps.get(p); + for (String instance : stateMap.keySet()) { + // If this replica is in DROPPED state, do not count it in the partition count since it is + // to be dropped + String state = stateMap.get(instance); + if (state.equals(HelixDefinedState.DROPPED.name())) { + continue; + } + if (!instancePartitionCounts.containsKey(instance)) { + instancePartitionCounts.put(instance, 0); + } + // Number of replicas (from different partitions) held in this instance + int partitionCount = instancePartitionCounts.get(instance); + partitionCount++; + if (partitionCount > maxPartitionPerInstance) { + // There exists an instance whose intermediate state assignment violates the maximum + // partitions per instance threshold, return! 
+ return true; + } + instancePartitionCounts.put(instance, partitionCount); + } + } + } + // No violation found + return false; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 4262659..d388afe 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -82,6 +82,8 @@ import org.slf4j.LoggerFactory; public class ZKHelixAdmin implements HelixAdmin { public static final String CONNECTION_TIMEOUT = "helixAdmin.timeOutInSec"; + private static final String MAINTENANCE_ZNODE_ID = "maintenance"; + private final HelixZkClient _zkClient; private final ConfigAccessor _configAccessor; @@ -380,25 +382,81 @@ public class ZKHelixAdmin implements HelixAdmin { } @Override + @Deprecated public void enableMaintenanceMode(String clusterName, boolean enabled) { - enableMaintenanceMode(clusterName, enabled, null); + manuallyEnableMaintenanceMode(clusterName, enabled, null, null); } @Override + @Deprecated public void enableMaintenanceMode(String clusterName, boolean enabled, String reason) { - logger.info("Cluster {} {} maintenance mode for reason {}.", enabled ? "enters" : "exits", - clusterName, reason == null ? 
"NULL" : reason); + manuallyEnableMaintenanceMode(clusterName, enabled, reason, null); + } + + @Override + public void autoEnableMaintenanceMode(String clusterName, boolean enabled, String reason, + MaintenanceSignal.AutoTriggerReason internalReason) { + processMaintenanceMode(clusterName, enabled, reason, internalReason, null, + MaintenanceSignal.TriggeringEntity.CONTROLLER); + } + + @Override + public void manuallyEnableMaintenanceMode(String clusterName, boolean enabled, String reason, + Map<String, String> customFields) { + processMaintenanceMode(clusterName, enabled, reason, + MaintenanceSignal.AutoTriggerReason.NOT_APPLICABLE, customFields, + MaintenanceSignal.TriggeringEntity.USER); + } + + /** + * Helper method for enabling/disabling maintenance mode. + * @param clusterName + * @param enabled + * @param reason + * @param internalReason + * @param customFields + * @param triggeringEntity + */ + private void processMaintenanceMode(String clusterName, boolean enabled, String reason, + MaintenanceSignal.AutoTriggerReason internalReason, Map<String, String> customFields, + MaintenanceSignal.TriggeringEntity triggeringEntity) { HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); Builder keyBuilder = accessor.keyBuilder(); - + logger.info("Cluster {} {} {} maintenance mode for reason {}.", clusterName, + triggeringEntity == MaintenanceSignal.TriggeringEntity.CONTROLLER ? "automatically" + : "manually", + enabled ? "enters" : "exits", reason == null ? 
"NULL" : reason); if (!enabled) { + // Exit maintenance mode accessor.removeProperty(keyBuilder.maintenance()); } else { - MaintenanceSignal maintenanceSignal = new MaintenanceSignal("maintenance"); + // Enter maintenance mode + MaintenanceSignal maintenanceSignal = new MaintenanceSignal(MAINTENANCE_ZNODE_ID); if (reason != null) { maintenanceSignal.setReason(reason); } + maintenanceSignal.setTimestamp(System.currentTimeMillis()); + maintenanceSignal.setTriggeringEntity(triggeringEntity); + switch (triggeringEntity) { + case CONTROLLER: + // autoEnable + maintenanceSignal.setAutoTriggerReason(internalReason); + break; + case USER: + case UNKNOWN: + // manuallyEnable + if (customFields != null && !customFields.isEmpty()) { + // Enter all custom fields provided by the user + Map<String, String> simpleFields = maintenanceSignal.getRecord().getSimpleFields(); + for (Map.Entry<String, String> entry : customFields.entrySet()) { + if (!simpleFields.containsKey(entry.getKey())) { + simpleFields.put(entry.getKey(), entry.getValue()); + } + } + } + break; + } if (!accessor.createMaintenance(maintenanceSignal)) { throw new HelixException("Failed to create maintenance signal"); } diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java index 1e2403f..c871573 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java @@ -115,9 +115,8 @@ public class ZKHelixDataAccessor implements HelixDataAccessor { @Override public boolean createMaintenance(MaintenanceSignal maintenanceSignal) { - return _baseDataAccessor - .create(PropertyPathBuilder.maintenance(_clusterName), maintenanceSignal.getRecord(), - AccessOption.PERSISTENT); + return _baseDataAccessor.set(PropertyPathBuilder.maintenance(_clusterName), + maintenanceSignal.getRecord(), 
AccessOption.PERSISTENT); } @Override diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java index 920bd6a..713b18a 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -415,11 +415,12 @@ public class ClusterConfig extends HelixProperty { } /** - * Sets Maintenance recovery threshold so that the cluster could auto-exit maintenance mode. + * Sets the number of offline instances for auto-exit threshold so that MaintenanceRecoveryStage + * could use this number to determine whether the cluster could auto-exit maintenance mode. * Values less than 0 will disable auto-exit. * @param maintenanceRecoveryThreshold */ - public void setMaintenanceRecoveryThreshold(int maintenanceRecoveryThreshold) + public void setNumOfflineInstancesForAutoExit(int maintenanceRecoveryThreshold) throws HelixException { int maxOfflineInstancesAllowed = getMaxOfflineInstancesAllowed(); if (maxOfflineInstancesAllowed >= 0) { @@ -434,22 +435,22 @@ public class ClusterConfig extends HelixProperty { } /** - * Returns Maintenance recovery threshold. In order for the cluster to auto-exit maintenance mode, + * Returns number of offline instances for auto-exit threshold. In order for the cluster to + * auto-exit maintenance mode, * the number of offline/disabled instances must be less than or equal to this threshold. * -1 indicates that there will be no auto-exit. * @return */ - public int getMaintenanceRecoveryThreshold() { - return _record.getIntField(ClusterConfigProperty.NUM_OFFLINE_INSTANCES_FOR_AUTO_EXIT.name(), -1); + public int getNumOfflineInstancesForAutoExit() { + return _record.getIntField(ClusterConfigProperty.NUM_OFFLINE_INSTANCES_FOR_AUTO_EXIT.name(), + -1); } /** * Set the resource prioritization field. It should be Integer field and sortable. 
- * * IMPORTANT: The sorting order is DESCENDING order, which means the larger number will have * higher priority. If user did not set up the field in ResourceConfig or IdealState or the field * is not parseable, Helix will treat it as lowest priority. - * * @param priorityField */ public void setResourcePriorityField(String priorityField) { diff --git a/helix-core/src/main/java/org/apache/helix/model/MaintenanceSignal.java b/helix-core/src/main/java/org/apache/helix/model/MaintenanceSignal.java index 56eb826..41cbd0e 100644 --- a/helix-core/src/main/java/org/apache/helix/model/MaintenanceSignal.java +++ b/helix-core/src/main/java/org/apache/helix/model/MaintenanceSignal.java @@ -31,7 +31,8 @@ public class MaintenanceSignal extends PauseSignal { */ private enum MaintenanceSignalProperty { TRIGGERED_BY, - TIMESTAMP + TIMESTAMP, + AUTO_TRIGGER_REASON } /** @@ -43,6 +44,17 @@ public class MaintenanceSignal extends PauseSignal { UNKNOWN } + /** + * Reason for the maintenance mode being triggered automatically. This will allow checking more + * efficient because it will check against the exact condition for which the cluster entered + * maintenance mode. This field does not apply when triggered manually. 
+ */ + public enum AutoTriggerReason { + MAX_OFFLINE_INSTANCES_EXCEEDED, + MAX_PARTITION_PER_INSTANCE_EXCEEDED, + NOT_APPLICABLE // Not triggered automatically or automatically exiting maintenance mode + } + public MaintenanceSignal(String id) { super(id); } @@ -51,8 +63,8 @@ public class MaintenanceSignal extends PauseSignal { super(record); } - public void setTriggeringEntity(String triggeringEntity) { - _record.setSimpleField(MaintenanceSignalProperty.TRIGGERED_BY.name(), triggeringEntity); + public void setTriggeringEntity(TriggeringEntity triggeringEntity) { + _record.setSimpleField(MaintenanceSignalProperty.TRIGGERED_BY.name(), triggeringEntity.name()); } /** @@ -68,6 +80,24 @@ public class MaintenanceSignal extends PauseSignal { } } + public void setAutoTriggerReason(AutoTriggerReason internalReason) { + _record.setSimpleField(MaintenanceSignalProperty.AUTO_TRIGGER_REASON.name(), + internalReason.name()); + } + + /** + * Returns auto-trigger reason. + * @return AutoTriggerReason.NOT_APPLICABLE if it was not triggered automatically + */ + public AutoTriggerReason getAutoTriggerReason() { + try { + return AutoTriggerReason + .valueOf(_record.getSimpleField(MaintenanceSignalProperty.AUTO_TRIGGER_REASON.name())); + } catch (Exception e) { + return AutoTriggerReason.NOT_APPLICABLE; + } + } + public void setTimestamp(long timestamp) { _record.setLongField(MaintenanceSignalProperty.TIMESTAMP.name(), timestamp); } diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java index 2eb8034..510cfe1 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java +++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java @@ -1,18 +1,40 @@ package org.apache.helix.integration.controller; +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.collect.ImmutableMap; import java.util.Map; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.integration.task.TaskTestBase; import org.apache.helix.integration.task.WorkflowGenerator; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; +import org.apache.helix.model.MaintenanceSignal; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; public class TestClusterMaintenanceMode extends TaskTestBase { - MockParticipantManager _newInstance; + private MockParticipantManager _newInstance; private String newResourceAddedDuringMaintenanceMode = String.format("%s_%s", WorkflowGenerator.DEFAULT_TGT_DB, 1); @@ -102,4 +124,186 @@ public class TestClusterMaintenanceMode extends TaskTestBase { Assert.assertTrue(stateMap.values().contains("MASTER")); } } + + /** + * Test that the auto-exit functionality works. 
+ */ + @Test(dependsOnMethods = "testExitMaintenanceModeNewResourceRecovery") + public void testAutoExitMaintenanceMode() throws InterruptedException { + // Set the config for auto-exiting maintenance mode + ClusterConfig clusterConfig = _manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME); + clusterConfig.setMaxOfflineInstancesAllowed(2); + clusterConfig.setNumOfflineInstancesForAutoExit(1); + _manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig); + + // Kill 3 instances + for (int i = 0; i < 3; i++) { + _participants[i].syncStop(); + } + Thread.sleep(500L); + + // Check that the cluster is in maintenance + MaintenanceSignal maintenanceSignal = _manager.getHelixDataAccessor() + .getProperty(_manager.getHelixDataAccessor().keyBuilder().maintenance()); + Assert.assertNotNull(maintenanceSignal); + + // Now bring up 2 instances + for (int i = 0; i < 2; i++) { + String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i); + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + _participants[i].syncStart(); + } + Thread.sleep(500L); + + // Check that the cluster is no longer in maintenance (auto-recovered) + maintenanceSignal = _manager.getHelixDataAccessor() + .getProperty(_manager.getHelixDataAccessor().keyBuilder().maintenance()); + Assert.assertNull(maintenanceSignal); + } + + @Test(dependsOnMethods = "testAutoExitMaintenanceMode") + public void testNoAutoExitWhenManuallyPutInMaintenance() throws InterruptedException { + // Manually put the cluster in maintenance + _gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null, null); + + // Kill 2 instances, which makes it a total of 3 down instances + for (int i = 0; i < 2; i++) { + _participants[i].syncStop(); + } + Thread.sleep(500L); + + // Now bring up all instances + for (int i = 0; i < 3; i++) { + String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i); + _participants[i] = new 
MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + _participants[i].syncStart(); + } + Thread.sleep(500L); + + // The cluster should still be in maintenance because it was enabled manually + MaintenanceSignal maintenanceSignal = _manager.getHelixDataAccessor() + .getProperty(_manager.getHelixDataAccessor().keyBuilder().maintenance()); + Assert.assertNotNull(maintenanceSignal); + } + + /** + * Test that manual triggering of maintenance mode overrides auto-enabled maintenance. + * @throws InterruptedException + */ + @Test(dependsOnMethods = "testNoAutoExitWhenManuallyPutInMaintenance") + public void testManualEnablingOverridesAutoEnabling() throws InterruptedException { + // Exit maintenance mode manually + _gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null, null); + + // Kill 3 instances, which would put cluster in maintenance automatically + for (int i = 0; i < 3; i++) { + _participants[i].syncStop(); + } + Thread.sleep(500L); + + // Check that maintenance signal was triggered by Controller + MaintenanceSignal maintenanceSignal = _manager.getHelixDataAccessor() + .getProperty(_manager.getHelixDataAccessor().keyBuilder().maintenance()); + Assert.assertNotNull(maintenanceSignal); + Assert.assertEquals(maintenanceSignal.getTriggeringEntity(), + MaintenanceSignal.TriggeringEntity.CONTROLLER); + + // Manually enable maintenance mode with customFields + Map<String, String> customFields = ImmutableMap.of("LDAP", "hulee", "JIRA", "HELIX-999", + "TRIGGERED_BY", "SHOULD NOT BE RECORDED"); + _gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null, + customFields); + Thread.sleep(500L); + + // Check that maintenance mode has successfully overwritten with the right TRIGGERED_BY field + maintenanceSignal = _manager.getHelixDataAccessor() + .getProperty(_manager.getHelixDataAccessor().keyBuilder().maintenance()); + Assert.assertEquals(maintenanceSignal.getTriggeringEntity(), + 
MaintenanceSignal.TriggeringEntity.USER); + for (Map.Entry<String, String> entry : customFields.entrySet()) { + if (entry.getKey().equals("TRIGGERED_BY")) { + Assert.assertEquals(maintenanceSignal.getRecord().getSimpleField(entry.getKey()), "USER"); + } else { + Assert.assertEquals(maintenanceSignal.getRecord().getSimpleField(entry.getKey()), + entry.getValue()); + } + } + } + + /** + * Test that maxNumPartitionPerInstance still applies (if any Participant has more replicas than + * the threshold, the cluster should not auto-exit maintenance mode). + * @throws InterruptedException + */ + @Test(dependsOnMethods = "testManualEnablingOverridesAutoEnabling") + public void testMaxPartitionLimit() throws InterruptedException { + // Manually exit maintenance mode + _gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null, + null); + Thread.sleep(500L); + + // Since 3 instances are missing, the cluster should have gone back under maintenance + // automatically + MaintenanceSignal maintenanceSignal = _manager.getHelixDataAccessor() + .getProperty(_manager.getHelixDataAccessor().keyBuilder().maintenance()); + Assert.assertNotNull(maintenanceSignal); + Assert.assertEquals(maintenanceSignal.getTriggeringEntity(), + MaintenanceSignal.TriggeringEntity.CONTROLLER); + Assert.assertEquals(maintenanceSignal.getAutoTriggerReason(), + MaintenanceSignal.AutoTriggerReason.MAX_OFFLINE_INSTANCES_EXCEEDED); + + // Bring up all instances + for (int i = 0; i < 3; i++) { + String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i); + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + _participants[i].syncStart(); + } + Thread.sleep(500L); + + // Check that the cluster exited maintenance + maintenanceSignal = _manager.getHelixDataAccessor() + .getProperty(_manager.getHelixDataAccessor().keyBuilder().maintenance()); + Assert.assertNull(maintenanceSignal); + + // Kill 3 instances, which would put cluster in 
maintenance automatically + for (int i = 0; i < 3; i++) { + _participants[i].syncStop(); + } + Thread.sleep(500L); + + // Check that cluster is back under maintenance + maintenanceSignal = _manager.getHelixDataAccessor() + .getProperty(_manager.getHelixDataAccessor().keyBuilder().maintenance()); + Assert.assertNotNull(maintenanceSignal); + Assert.assertEquals(maintenanceSignal.getTriggeringEntity(), + MaintenanceSignal.TriggeringEntity.CONTROLLER); + Assert.assertEquals(maintenanceSignal.getAutoTriggerReason(), + MaintenanceSignal.AutoTriggerReason.MAX_OFFLINE_INSTANCES_EXCEEDED); + + // Set the cluster config for auto-exiting maintenance mode + ClusterConfig clusterConfig = _manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME); + // Setting MaxPartitionsPerInstance to 1 will prevent the cluster from exiting maintenance mode + // automatically because the instances currently have more than 1 + clusterConfig.setMaxPartitionsPerInstance(1); + _manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig); + Thread.sleep(500L); + + // Now bring up all instances + for (int i = 0; i < 3; i++) { + String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i); + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + _participants[i].syncStart(); + } + Thread.sleep(500L); + + // Check that the cluster is still in maintenance (should not have auto-exited because it would + // fail the MaxPartitionsPerInstance check) + maintenanceSignal = _manager.getHelixDataAccessor() + .getProperty(_manager.getHelixDataAccessor().keyBuilder().maintenance()); + Assert.assertNotNull(maintenanceSignal); + Assert.assertEquals(maintenanceSignal.getTriggeringEntity(), + MaintenanceSignal.TriggeringEntity.CONTROLLER); + Assert.assertEquals(maintenanceSignal.getAutoTriggerReason(), + MaintenanceSignal.AutoTriggerReason.MAX_PARTITION_PER_INSTANCE_EXCEEDED); + } } diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java 
b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java index 23b0df6..6586cf9 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java +++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java @@ -37,6 +37,7 @@ import org.apache.helix.model.ExternalView; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.MaintenanceSignal; import org.apache.helix.model.StateModelDefinition; public class MockHelixAdmin implements HelixAdmin { @@ -267,6 +268,18 @@ public class MockHelixAdmin implements HelixAdmin { } + @Override + public void autoEnableMaintenanceMode(String clusterName, boolean enabled, String reason, + MaintenanceSignal.AutoTriggerReason internalReason) { + + } + + @Override + public void manuallyEnableMaintenanceMode(String clusterName, boolean enabled, String reason, + Map<String, String> customFields) { + + } + @Override public void resetPartition(String clusterName, String instanceName, String resourceName, List<String> partitionNames) {
