This is an automated email from the ASF dual-hosted git repository. hzlu pushed a commit to branch cluster-pause-mode in repository https://gitbox.apache.org/repos/asf/helix.git
commit b8f2331e91a984c372e747b44aecd1cc2d23347c Author: Huizhi Lu <[email protected]> AuthorDate: Tue Jun 22 11:22:49 2021 -0700 Check cluster management mode status (#1798) Controller needs to know the participant freeze status so it can send freeze/unfreeze messages for entering/exiting freeze mode. The status check is done in management mode stage. This commit adds methods to check cluster management mode status, and update the status and history accordingly. --- .../main/java/org/apache/helix/PropertyKey.java | 2 +- .../java/org/apache/helix/PropertyPathBuilder.java | 1 + .../pipeline/PipelineSwitchException.java | 29 +++++ .../controller/stages/ManagementModeStage.java | 134 +++++++++++++++++++ .../controller/stages/ResourceValidationStage.java | 6 +- .../java/org/apache/helix/model/ClusterStatus.java | 5 + .../org/apache/helix/model/ControllerHistory.java | 8 +- .../controller/stages/TestManagementModeStage.java | 142 +++++++++++++++++++++ 8 files changed, 322 insertions(+), 5 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java index 254cb95..eb5d5c1 100644 --- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java +++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java @@ -247,7 +247,7 @@ public class PropertyKey { * @return {@link PropertyKey} */ public PropertyKey clusterStatus() { - return new PropertyKey(PropertyType.STATUS, ClusterStatus.class, _clusterName); + return new PropertyKey(PropertyType.STATUS, ClusterStatus.class, _clusterName, _clusterName); } /** diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java index 34efd29..5c63304 100644 --- a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java +++ b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java @@ -89,6 +89,7 @@ public class PropertyPathBuilder { 
addEntry(PropertyType.CUSTOMIZEDVIEW, 2, "/{clusterName}/CUSTOMIZEDVIEW/{customizedStateType}"); addEntry(PropertyType.CUSTOMIZEDVIEW, 3, "/{clusterName}/CUSTOMIZEDVIEW/{customizedStateType}/{resourceName}"); addEntry(PropertyType.STATUS, 1, "/{clusterName}/STATUS"); + addEntry(PropertyType.STATUS, 2, "/{clusterName}/STATUS/{clusterName}"); addEntry(PropertyType.TARGETEXTERNALVIEW, 1, "/{clusterName}/TARGETEXTERNALVIEW"); addEntry(PropertyType.TARGETEXTERNALVIEW, 2, diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineSwitchException.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineSwitchException.java new file mode 100644 index 0000000..1584708 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineSwitchException.java @@ -0,0 +1,29 @@ +package org.apache.helix.controller.pipeline; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Used to exit the current controller pipeline and switch to another pipeline. 
+ */ +public class PipelineSwitchException extends StageException { + public PipelineSwitchException(String message) { + super(message); + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java index 042aa14..94ff1d3 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java @@ -19,11 +19,31 @@ package org.apache.helix.controller.stages; * under the License. */ +import java.time.Instant; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.AccessOption; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.PropertyKey; +import org.apache.helix.PropertyType; +import org.apache.helix.api.status.ClusterManagementMode; import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider; import org.apache.helix.controller.pipeline.AbstractBaseStage; +import org.apache.helix.controller.pipeline.StageException; +import org.apache.helix.model.ClusterStatus; +import org.apache.helix.model.ControllerHistory; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.LiveInstance.LiveInstanceStatus; +import org.apache.helix.model.Message; +import org.apache.helix.model.Message.MessageType; +import org.apache.helix.model.PauseSignal; import org.apache.helix.util.HelixUtil; import org.apache.helix.util.RebalanceUtil; +import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,9 +58,23 @@ public class ManagementModeStage extends AbstractBaseStage { // TODO: implement the stage _eventId = event.getEventId(); String clusterName = event.getClusterName(); + HelixManager manager = 
event.getAttribute(AttributeName.helixmanager.name()); + if (manager == null) { + throw new StageException("HelixManager attribute value is null"); + } + + HelixDataAccessor accessor = manager.getHelixDataAccessor(); ManagementControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name()); + ClusterManagementMode managementMode = + checkClusterFreezeStatus(cache.getEnabledLiveInstances(), cache.getLiveInstances(), + cache.getAllInstancesMessages(), cache.getPauseSignal()); + + recordClusterStatus(managementMode, accessor); + recordManagementModeHistory(managementMode, cache.getPauseSignal(), manager.getInstanceName(), + accessor); + // TODO: move to the last stage of management pipeline checkInManagementMode(clusterName, cache); } @@ -53,4 +87,104 @@ public class ManagementModeStage extends AbstractBaseStage { RebalanceUtil.enableManagementMode(clusterName, false); } } + + // Checks cluster freeze, controller pause mode and status. + private ClusterManagementMode checkClusterFreezeStatus( + Set<String> enabledLiveInstances, + Map<String, LiveInstance> liveInstanceMap, + Map<String, Collection<Message>> allInstanceMessages, + PauseSignal pauseSignal) { + ClusterManagementMode.Type type; + ClusterManagementMode.Status status = ClusterManagementMode.Status.COMPLETED; + if (pauseSignal == null) { + // TODO: Should check maintenance mode after it's moved to management pipeline. 
+ type = ClusterManagementMode.Type.NORMAL; + if (HelixUtil.inManagementMode(pauseSignal, liveInstanceMap, enabledLiveInstances, + allInstanceMessages)) { + status = ClusterManagementMode.Status.IN_PROGRESS; + } + } else if (pauseSignal.isClusterPause()) { + type = ClusterManagementMode.Type.CLUSTER_PAUSE; + if (!instancesFullyFrozen(enabledLiveInstances, liveInstanceMap, allInstanceMessages)) { + status = ClusterManagementMode.Status.IN_PROGRESS; + } + } else { + type = ClusterManagementMode.Type.CONTROLLER_PAUSE; + } + + return new ClusterManagementMode(type, status); + } + + private boolean instancesFullyFrozen(Set<String> enabledLiveInstances, + Map<String, LiveInstance> liveInstanceMap, + Map<String, Collection<Message>> allInstanceMessages) { + // 1. All live instances are frozen + // 2. No pending participant status change message. + return enabledLiveInstances.stream().noneMatch( + instance -> !LiveInstanceStatus.PAUSED.equals(liveInstanceMap.get(instance).getStatus()) + || hasPendingMessage(allInstanceMessages.get(instance), + MessageType.PARTICIPANT_STATUS_CHANGE)); + } + + private boolean hasPendingMessage(Collection<Message> messages, MessageType type) { + return messages != null && messages.stream() + .anyMatch(message -> type.name().equals(message.getMsgType())); + } + + private void recordClusterStatus(ClusterManagementMode mode, HelixDataAccessor accessor) { + // update cluster status + PropertyKey statusPropertyKey = accessor.keyBuilder().clusterStatus(); + ClusterStatus clusterStatus = accessor.getProperty(statusPropertyKey); + if (clusterStatus == null) { + clusterStatus = new ClusterStatus(); + } + + ClusterManagementMode.Type recordedType = clusterStatus.getManagementMode(); + ClusterManagementMode.Status recordedStatus = clusterStatus.getManagementModeStatus(); + + // If there is any pending message sent by users, status could be computed as in progress. 
+ // Skip recording status change to avoid confusion after cluster is already fully frozen. + if (ClusterManagementMode.Type.CLUSTER_PAUSE.equals(recordedType) + && ClusterManagementMode.Status.COMPLETED.equals(recordedStatus) + && ClusterManagementMode.Type.CLUSTER_PAUSE.equals(mode.getMode()) + && ClusterManagementMode.Status.IN_PROGRESS.equals(mode.getStatus())) { + LOG.info("Skip recording status mode={}, status={}, because cluster is fully frozen", + mode.getMode(), mode.getStatus()); + return; + } + + if (!mode.getMode().equals(recordedType) || !mode.getStatus().equals(recordedStatus)) { + // Only update status when it's different with metadata store + clusterStatus.setManagementMode(mode.getMode()); + clusterStatus.setManagementModeStatus(mode.getStatus()); + if (!accessor.updateProperty(statusPropertyKey, clusterStatus)) { + LOG.error("Failed to update cluster status {}", clusterStatus); + } + } + } + + private void recordManagementModeHistory(ClusterManagementMode mode, PauseSignal pauseSignal, + String controllerName, HelixDataAccessor accessor) { + // Only record completed status + if (!ClusterManagementMode.Status.COMPLETED.equals(mode.getStatus())) { + return; + } + + // Record a management mode history in controller history + String path = accessor.keyBuilder().controllerLeaderHistory().getPath(); + long timestamp = Instant.now().toEpochMilli(); + String fromHost = (pauseSignal == null ? null : pauseSignal.getFromHost()); + String reason = (pauseSignal == null ? null : pauseSignal.getReason()); + + // Need the updater to avoid race condition with controller/maintenance history updates. 
+ if (!accessor.getBaseDataAccessor().update(path, oldRecord -> { + if (oldRecord == null) { + oldRecord = new ZNRecord(PropertyType.HISTORY.toString()); + } + return new ControllerHistory(oldRecord) + .updateManagementModeHistory(controllerName, mode, fromHost, timestamp, reason); + }, AccessOption.PERSISTENT)) { + LOG.error("Failed to write management mode history to ZK!"); + } + } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java index 613ce2e..696506a 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.dataproviders.BaseControllerDataProvider; import org.apache.helix.controller.pipeline.AbstractBaseStage; +import org.apache.helix.controller.pipeline.PipelineSwitchException; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.model.IdealState; import org.apache.helix.model.Resource; @@ -102,9 +103,10 @@ public class ResourceValidationStage extends AbstractBaseStage { LogUtil.logInfo(LOG, _eventId, "Enabling management mode pipeline for cluster " + event.getClusterName()); RebalanceUtil.enableManagementMode(event.getClusterName(), true); - throw new StageException( + // TODO: redesign to terminate and switch pipeline more peacefully + throw new PipelineSwitchException( "Pipeline should not be run because cluster " + event.getClusterName() - + "is in management mode"); + + " is in management mode"); } } diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterStatus.java b/helix-core/src/main/java/org/apache/helix/model/ClusterStatus.java index 6ed354c..a405fe3 100644 --- 
a/helix-core/src/main/java/org/apache/helix/model/ClusterStatus.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterStatus.java @@ -22,6 +22,7 @@ package org.apache.helix.model; import org.apache.helix.HelixProperty; import org.apache.helix.PropertyType; import org.apache.helix.api.status.ClusterManagementMode; +import org.apache.helix.zookeeper.datamodel.ZNRecord; /** * Represents the cluster status. It can have fields for @@ -32,6 +33,10 @@ public class ClusterStatus extends HelixProperty { super(PropertyType.STATUS.name()); } + public ClusterStatus(ZNRecord record) { + super(record); + } + public enum ClusterStatusProperty { MANAGEMENT_MODE, MANAGEMENT_MODE_STATUS diff --git a/helix-core/src/main/java/org/apache/helix/model/ControllerHistory.java b/helix-core/src/main/java/org/apache/helix/model/ControllerHistory.java index 4e418c3..47b0958 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ControllerHistory.java +++ b/helix-core/src/main/java/org/apache/helix/model/ControllerHistory.java @@ -162,8 +162,12 @@ public class ControllerHistory extends HelixProperty { historyEntry.put(ConfigProperty.TIME.name(), Instant.ofEpochMilli(time).toString()); historyEntry.put(ManagementModeConfigKey.MODE.name(), mode.getMode().name()); historyEntry.put(ManagementModeConfigKey.STATUS.name(), mode.getStatus().name()); - historyEntry.put(PauseSignal.PauseSignalProperty.FROM_HOST.name(), fromHost); - historyEntry.put(PauseSignal.PauseSignalProperty.REASON.name(), reason); + if (fromHost != null) { + historyEntry.put(PauseSignal.PauseSignalProperty.FROM_HOST.name(), fromHost); + } + if (reason != null) { + historyEntry.put(PauseSignal.PauseSignalProperty.REASON.name(), reason); + } return populateHistoryEntries(HistoryType.MANAGEMENT_MODE, historyEntry.toString()); } diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java 
new file mode 100644 index 0000000..28ca524 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java @@ -0,0 +1,142 @@ +package org.apache.helix.controller.stages; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.List; + +import org.apache.helix.HelixConstants; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; +import org.apache.helix.HelixManager; +import org.apache.helix.PropertyKey; +import org.apache.helix.TestHelper; +import org.apache.helix.api.status.ClusterManagementMode; +import org.apache.helix.api.status.ClusterManagementModeRequest; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider; +import org.apache.helix.controller.pipeline.Pipeline; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.model.ClusterStatus; +import org.apache.helix.model.LiveInstance; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestManagementModeStage extends ZkTestBase { + HelixManager _manager; + HelixDataAccessor _accessor; + String _clusterName; + + @BeforeClass + public void beforeClass() { + _clusterName = "CLUSTER_" + TestHelper.getTestClassName(); + _accessor = new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<>(_gZkClient)); + _manager = new DummyClusterManager(_clusterName, _accessor); + } + + @AfterClass + public void afterClass() { + deleteLiveInstances(_clusterName); + deleteCluster(_clusterName); + } + + @Test + public void testClusterFreezeStatus() throws Exception { + // ideal state: node0 is MASTER, node1 is SLAVE + // replica=2 means 1 master and 1 slave + setupIdealState(_clusterName, new int[]{0, 1}, new String[]{"TestDB"}, 1, 2); + List<LiveInstance> liveInstances = setupLiveInstances(_clusterName, new int[]{0, 1}); + setupStateModel(_clusterName); + + ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.Unknown); + ManagementControllerDataProvider cache = new ManagementControllerDataProvider(_clusterName, + 
Pipeline.Type.MANAGEMENT_MODE.name()); + event.addAttribute(AttributeName.helixmanager.name(), _manager); + event.addAttribute(AttributeName.ControllerDataProvider.name(), cache); + + // Freeze cluster + ClusterManagementModeRequest request = ClusterManagementModeRequest.newBuilder() + .withClusterName(_clusterName) + .withMode(ClusterManagementMode.Type.CLUSTER_PAUSE) + .withReason("test") + .build(); + _gSetupTool.getClusterManagementTool().setClusterManagementMode(request); + + Pipeline dataRefresh = new Pipeline(); + dataRefresh.addStage(new ReadClusterDataStage()); + runPipeline(event, dataRefresh, false); + ManagementModeStage managementModeStage = new ManagementModeStage(); + managementModeStage.process(event); + + // In frozen mode + ClusterStatus clusterStatus = _accessor.getProperty(_accessor.keyBuilder().clusterStatus()); + Assert.assertEquals(clusterStatus.getManagementMode(), ClusterManagementMode.Type.CLUSTER_PAUSE); + + + // Mark a live instance to be pause state + LiveInstance liveInstance = liveInstances.get(0); + liveInstance.setStatus(LiveInstance.LiveInstanceStatus.PAUSED); + PropertyKey liveInstanceKey = + _accessor.keyBuilder().liveInstance(liveInstance.getInstanceName()); + _accessor.updateProperty(liveInstanceKey, liveInstance); + // Require cache refresh + cache.notifyDataChange(HelixConstants.ChangeType.LIVE_INSTANCE); + + // Unfreeze cluster + request = ClusterManagementModeRequest.newBuilder() + .withClusterName(_clusterName) + .withMode(ClusterManagementMode.Type.NORMAL) + .withReason("test") + .build(); + _gSetupTool.getClusterManagementTool().setClusterManagementMode(request); + runPipeline(event, dataRefresh, false); + managementModeStage.process(event); + clusterStatus = _accessor.getProperty(_accessor.keyBuilder().clusterStatus()); + + Assert.assertEquals(clusterStatus.getManagementMode(), ClusterManagementMode.Type.NORMAL); + // In progress because a live instance is still frozen + 
+ Assert.assertEquals(clusterStatus.getManagementModeStatus(), + ClusterManagementMode.Status.IN_PROGRESS); + + // Remove the frozen status to mark the live instance as being in normal status + liveInstance = _accessor.getProperty(liveInstanceKey); + liveInstance.getRecord().getSimpleFields() + .remove(LiveInstance.LiveInstanceProperty.STATUS.name()); + _accessor.setProperty(liveInstanceKey, liveInstance); + // Require cache refresh + cache.notifyDataChange(HelixConstants.ChangeType.LIVE_INSTANCE); + runPipeline(event, dataRefresh, false); + try { + managementModeStage.process(event); + } catch (HelixException expected) { + // It's expected because the controller is not set for the cluster. + Assert.assertTrue(expected.getMessage() + .startsWith("Failed to switch management mode pipeline, enabled=false")); + } + clusterStatus = _accessor.getProperty(_accessor.keyBuilder().clusterStatus()); + + // Fully exited frozen mode + Assert.assertEquals(clusterStatus.getManagementMode(), ClusterManagementMode.Type.NORMAL); + Assert.assertEquals(clusterStatus.getManagementModeStatus(), + ClusterManagementMode.Status.COMPLETED); + } +}
