This is an automated email from the ASF dual-hosted git repository.
hzlu pushed a commit to branch cluster-pause-mode
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/cluster-pause-mode by this
push:
new c464832 Add message generation logic for management pipeline (#1803)
c464832 is described below
commit c464832bb9c3e418f6c0cfae4e046d0c74fa05fa
Author: Huizhi Lu <[email protected]>
AuthorDate: Mon Jun 28 22:53:46 2021 -0700
Add message generation logic for management pipeline (#1803)
In cluster freeze mode, the controller sends freeze/unfreeze and pending state
transition (ST) cancellation messages to participants.
In this commit, the existing message generation stage is leveraged to
create the ST cancellation messages.
The best possible state output is built by copying the state map from the
current state output, and is then used to generate the cancellation messages.
For freeze/unfreeze messages, the logic is added in
ManagementMessageGenerationPhase.
The existing MessageDispatchStage is also used for dispatching all the
messages.
---
.../helix/api/status/ClusterManagementMode.java | 24 ++++
.../helix/controller/GenericHelixController.java | 11 +-
.../helix/controller/stages/AttributeName.java | 3 +
.../stages/ManagementMessageDispatchStage.java | 62 +++++++++
.../stages/ManagementMessageGenerationPhase.java | 119 ++++++++++++++++++
.../controller/stages/ManagementModeStage.java | 22 ++--
.../helix/controller/stages/MessageOutput.java | 11 +-
.../java/org/apache/helix/model/LiveInstance.java | 3 +-
.../java/org/apache/helix/util/RebalanceUtil.java | 27 ++++
.../stages/TestManagementMessageGeneration.java | 140 +++++++++++++++++++++
.../controller/stages/TestManagementModeStage.java | 2 +
11 files changed, 410 insertions(+), 14 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java
b/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java
index cbd2019..86817d1 100644
---
a/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java
+++
b/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java
@@ -19,6 +19,8 @@ package org.apache.helix.api.status;
* under the License.
*/
+import org.apache.helix.model.LiveInstance;
+
/**
* Represents the management mode of the cluster:
* 1. what type of mode it targets to be;
@@ -73,4 +75,26 @@ public class ClusterManagementMode {
public Type getMode() {
return mode;
}
+
+ /**
+ * Checks whether the cluster is fully in normal mode.
+ *
+ * @return true if the target mode is {@link Type#NORMAL} and its status is
+ * {@link Status#COMPLETED}; false otherwise
+ */
+ public boolean isFullyInNormalMode() {
+ return Type.NORMAL.equals(mode) && Status.COMPLETED.equals(status);
+ }
+
+ /**
+ * Gets the desired live instance status for this management mode.
+ *
+ * @return The desired {@link org.apache.helix.model.LiveInstance.LiveInstanceStatus}:
+ * {@code PAUSED} for {@link Type#CLUSTER_PAUSE} and {@code NORMAL} for {@link Type#NORMAL}.
+ * If participants' status is not expected to change for the management mode, null is returned.
+ */
+ public LiveInstance.LiveInstanceStatus getDesiredParticipantStatus() {
+ switch (mode) {
+ case CLUSTER_PAUSE:
+ return LiveInstance.LiveInstanceStatus.PAUSED;
+ case NORMAL:
+ return LiveInstance.LiveInstanceStatus.NORMAL;
+ default:
+ // Other modes don't need to change participant status
+ return null;
+ }
+ }
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 00bdfcc..95416d4 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -82,6 +82,8 @@ import
org.apache.helix.controller.stages.CustomizedViewAggregationStage;
import org.apache.helix.controller.stages.ExternalViewComputeStage;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
import org.apache.helix.controller.stages.MaintenanceRecoveryStage;
+import org.apache.helix.controller.stages.ManagementMessageDispatchStage;
+import org.apache.helix.controller.stages.ManagementMessageGenerationPhase;
import org.apache.helix.controller.stages.ManagementModeStage;
import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageSelectionStage;
@@ -648,9 +650,16 @@ public class GenericHelixController implements
IdealStateChangeListener, LiveIns
Pipeline dataRefresh = new Pipeline(pipelineName);
dataRefresh.addStage(new ReadClusterDataStage());
+ // data pre-process pipeline
+ Pipeline dataPreprocess = new Pipeline(pipelineName);
+ dataPreprocess.addStage(new ResourceComputationStage());
+ dataPreprocess.addStage(new CurrentStateComputationStage());
+
// cluster management mode process
Pipeline managementMode = new Pipeline(pipelineName);
managementMode.addStage(new ManagementModeStage());
+ managementMode.addStage(new ManagementMessageGenerationPhase());
+ managementMode.addStage(new ManagementMessageDispatchStage());
PipelineRegistry registry = new PipelineRegistry();
Arrays.asList(
@@ -659,7 +668,7 @@ public class GenericHelixController implements
IdealStateChangeListener, LiveIns
ClusterEventType.MessageChange,
ClusterEventType.OnDemandRebalance,
ClusterEventType.PeriodicalRebalance
- ).forEach(type -> registry.register(type, dataRefresh, managementMode));
+ ).forEach(type -> registry.register(type, dataRefresh, dataPreprocess,
managementMode));
return registry;
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index d9ca40b..8771bff 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -45,6 +45,9 @@ public enum AttributeName {
/** This is the cluster manager's session id when event is received. */
EVENT_SESSION,
+ /** Represents cluster's status, used in management mode pipeline. */
+ CLUSTER_STATUS,
+
// This attribute should only be used in TaskGarbageCollectionStage, misuse
could cause race conditions.
TO_BE_PURGED_WORKFLOWS,
// This attribute should only be used in TaskGarbageCollectionStage, misuse
could cause race conditions.
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementMessageDispatchStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementMessageDispatchStage.java
new file mode 100644
index 0000000..ca7aa31
--- /dev/null
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementMessageDispatchStage.java
@@ -0,0 +1,62 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.Message;
+import org.apache.helix.util.RebalanceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Dispatches participant status change and pending state transition cancellation messages
+ * for the management pipeline, and exits the management mode pipeline once the cluster is
+ * fully back in normal mode.
+ */
+public class ManagementMessageDispatchStage extends MessageDispatchStage {
+  private static final Logger LOG = LoggerFactory.getLogger(ManagementMessageDispatchStage.class);
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    // Record the event id so that the log lines emitted by this stage carry it, the same way
+    // ManagementMessageGenerationPhase does.
+    _eventId = event.getEventId();
+    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+    MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_ALL.name());
+    ManagementControllerDataProvider cache =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
+    ClusterManagementMode managementMode = event.getAttribute(AttributeName.CLUSTER_STATUS.name());
+    if (manager == null || messageOutput == null || cache == null || managementMode == null) {
+      throw new StageException("Missing attributes in event: " + event
+          + ". Requires HelixManager|MessageOutput|DataCache|ClusterStatus");
+    }
+
+    // Dispatch the per-resource/partition messages (e.g. pending ST cancellation messages).
+    processEvent(event, messageOutput);
+
+    // Send participant status change messages and record the sent ones in the data provider.
+    List<Message> messagesSent =
+        super.sendMessages(manager.getHelixDataAccessor(), messageOutput.getStatusChangeMessages());
+    cache.cacheMessages(messagesSent);
+
+    // Can exit management mode pipeline after fully in normal mode
+    if (managementMode.isFullyInNormalMode()) {
+      LogUtil.logInfo(LOG, _eventId,
+          "Exiting management mode pipeline for cluster " + event.getClusterName());
+      RebalanceUtil.enableManagementMode(event.getClusterName(), false);
+    }
+  }
+}
+}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementMessageGenerationPhase.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementMessageGenerationPhase.java
new file mode 100644
index 0000000..4479d50
--- /dev/null
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementMessageGenerationPhase.java
@@ -0,0 +1,119 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.controller.LogUtil;
+import
org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.LiveInstance.LiveInstanceStatus;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.PauseSignal;
+import org.apache.helix.util.MessageUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates participant status change (freeze/unfreeze) and pending state transition cancellation
+ * messages for management mode pipeline.
+ */
+public class ManagementMessageGenerationPhase extends MessageGenerationPhase {
+  private static final Logger LOG = LoggerFactory.getLogger(ManagementMessageGenerationPhase.class);
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
+    String clusterName = event.getClusterName();
+    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+    ClusterManagementMode managementMode = event.getAttribute(AttributeName.CLUSTER_STATUS.name());
+    ManagementControllerDataProvider cache =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
+    if (manager == null || managementMode == null || cache == null) {
+      throw new StageException("Missing attributes in event: " + event
+          + ". Requires HelixManager|ClusterStatus|DataCache");
+    }
+
+    // Pending ST cancellation messages are generated only when ST cancellation is enabled in the
+    // cluster config and the pause signal asks for pending STs to be cancelled.
+    PauseSignal pauseSignal = cache.getPauseSignal();
+    if (cache.getClusterConfig() != null && cache.getClusterConfig().isStateTransitionCancelEnabled()
+        && pauseSignal != null && pauseSignal.getCancelPendingST()) {
+      // Generate ST cancellation messages by reusing the regular message generation logic.
+      LogUtil.logInfo(LOG, _eventId,
+          "Generating ST cancellation messages for cluster " + clusterName);
+      super.process(event);
+    }
+
+    MessageOutput messageOutput =
+        event.getAttributeWithDefault(AttributeName.MESSAGES_ALL.name(), new MessageOutput());
+    // Is participant status change still in progress? Create messages
+    if (!ClusterManagementMode.Status.COMPLETED.equals(managementMode.getStatus())) {
+      LogUtil.logInfo(LOG, _eventId, "Generating messages as cluster " + clusterName
+          + " is still in progress to change participant status");
+      List<Message> messages = generateStatusChangeMessages(managementMode,
+          cache.getEnabledLiveInstances(), cache.getLiveInstances(),
+          cache.getAllInstancesMessages(), manager.getInstanceName(), manager.getSessionId());
+      messageOutput.addStatusChangeMessages(messages);
+    }
+
+    event.addAttribute(AttributeName.MESSAGES_ALL.name(), messageOutput);
+  }
+
+  /**
+   * Creates one status change message for each enabled live instance whose status differs from
+   * the status desired by {@code managementMode} and that has no pending status change message
+   * to that desired status yet.
+   *
+   * @return the generated messages; empty if the management mode has no desired participant
+   *         status
+   */
+  private List<Message> generateStatusChangeMessages(ClusterManagementMode managementMode,
+      Set<String> enabledLiveInstances, Map<String, LiveInstance> liveInstanceMap,
+      Map<String, Collection<Message>> allInstanceMessages, String managerInstance,
+      String managerSessionId) {
+    List<Message> messagesGenerated = new ArrayList<>();
+
+    LiveInstanceStatus desiredStatus = managementMode.getDesiredParticipantStatus();
+    if (desiredStatus == null) {
+      // getDesiredParticipantStatus() returns null for modes that do not change participant
+      // status, so there is nothing to send.
+      return messagesGenerated;
+    }
+
+    // Check status and pending status change messages for all enabled live instances.
+    // Send freeze/unfreeze messages if necessary.
+    for (String instanceName : enabledLiveInstances) {
+      LiveInstance liveInstance = liveInstanceMap.get(instanceName);
+      if (liveInstance == null) {
+        // No live instance record to address the message to.
+        continue;
+      }
+      Collection<Message> pendingMessages = allInstanceMessages.get(instanceName);
+      String sessionId = liveInstance.getEphemeralOwner();
+      LiveInstanceStatus currentStatus = liveInstance.getStatus();
+
+      if (needStatusChangeMessage(pendingMessages, currentStatus, desiredStatus)) {
+        Message statusChangeMessage = MessageUtil.createStatusChangeMessage(currentStatus,
+            desiredStatus, managerInstance, managerSessionId, instanceName, sessionId);
+        messagesGenerated.add(statusChangeMessage);
+      }
+    }
+
+    return messagesGenerated;
+  }
+
+  /**
+   * Checks whether a status change message must be sent: the current status differs from the
+   * desired one and no pending participant status change message to the desired status exists.
+   *
+   * @param messages pending messages of the instance; may be null (e.g. the instance has no entry
+   *        in the pending message map)
+   * @param currentStatus current live instance status
+   * @param desiredStatus desired live instance status; must not be null
+   */
+  private boolean needStatusChangeMessage(Collection<Message> messages,
+      LiveInstanceStatus currentStatus, LiveInstanceStatus desiredStatus) {
+    if (currentStatus == desiredStatus) {
+      return false;
+    }
+    if (messages == null) {
+      return true;
+    }
+    return messages.stream().noneMatch(
+        message -> message.isParticipantStatusChangeType()
+            && desiredStatus.name().equals(message.getToState()));
+  }
+}
+}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
index 94ff1d3..b639685 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
@@ -40,7 +40,9 @@ import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.LiveInstance.LiveInstanceStatus;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.Partition;
import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.Resource;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.RebalanceUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -66,6 +68,14 @@ public class ManagementModeStage extends AbstractBaseStage {
HelixDataAccessor accessor = manager.getHelixDataAccessor();
ManagementControllerDataProvider cache =
event.getAttribute(AttributeName.ControllerDataProvider.name());
+ CurrentStateOutput currentStateOutput =
+ event.getAttribute(AttributeName.CURRENT_STATE.name());
+ final Map<String, Resource> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
+
+ final BestPossibleStateOutput bestPossibleStateOutput =
+ RebalanceUtil.buildBestPossibleState(resourceMap.keySet(),
currentStateOutput);
+ event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(),
bestPossibleStateOutput);
ClusterManagementMode managementMode =
checkClusterFreezeStatus(cache.getEnabledLiveInstances(),
cache.getLiveInstances(),
@@ -75,17 +85,7 @@ public class ManagementModeStage extends AbstractBaseStage {
recordManagementModeHistory(managementMode, cache.getPauseSignal(),
manager.getInstanceName(),
accessor);
- // TODO: move to the last stage of management pipeline
- checkInManagementMode(clusterName, cache);
- }
-
- private void checkInManagementMode(String clusterName,
ManagementControllerDataProvider cache) {
- // Should exit management mode
- if (!HelixUtil.inManagementMode(cache.getPauseSignal(),
cache.getLiveInstances(),
- cache.getEnabledLiveInstances(), cache.getAllInstancesMessages())) {
- LogUtil.logInfo(LOG, _eventId, "Exiting management mode pipeline for
cluster " + clusterName);
- RebalanceUtil.enableManagementMode(clusterName, false);
- }
+ event.addAttribute(AttributeName.CLUSTER_STATUS.name(), managementMode);
}
// Checks cluster freeze, controller pause mode and status.
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java
index ad7e6c8..1f47ee4 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java
@@ -30,10 +30,11 @@ import org.apache.helix.model.Partition;
public class MessageOutput {
private final Map<String, Map<Partition, List<Message>>> _messagesMap;
+ // Participant status change messages, kept apart from the per-resource/partition _messagesMap.
+ private final List<Message> _statusChangeMessages;
+ /** Creates an empty output with no partition messages and no status change messages. */
public MessageOutput() {
_messagesMap = new HashMap<>();
-
+ _statusChangeMessages = new ArrayList<>();
}
public void addMessage(String resourceName, Partition partition, Message
message) {
@@ -56,6 +57,10 @@ public class MessageOutput {
_messagesMap.get(resourceName).put(partition, messages);
}
+ /**
+ * Appends participant status change messages. They are stored separately from the
+ * per-resource/partition messages.
+ *
+ * @param messages status change messages to append
+ */
+ public void addStatusChangeMessages(List<Message> messages) {
+ _statusChangeMessages.addAll(messages);
+ }
+
public List<Message> getMessages(String resourceName, Partition resource) {
Map<Partition, List<Message>> map = _messagesMap.get(resourceName);
if (map != null && map.get(resource) != null) {
@@ -68,6 +73,10 @@ public class MessageOutput {
return _messagesMap.getOrDefault(resourceName, Collections.emptyMap());
}
+ /**
+ * Gets the participant status change messages added via addStatusChangeMessages.
+ *
+ * @return an unmodifiable view of the status change messages; never null
+ */
+ public List<Message> getStatusChangeMessages() {
+ return Collections.unmodifiableList(_statusChangeMessages);
+ }
+
@Override
public String toString() {
return _messagesMap.toString();
diff --git a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
index 7a9671d..fed2940 100644
--- a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
@@ -50,7 +50,8 @@ public class LiveInstance extends HelixProperty {
* Saved values for the {@link LiveInstanceProperty#STATUS} field
*/
public enum LiveInstanceStatus {
- PAUSED
+ /** Desired participant status for the CLUSTER_PAUSE management mode. */
+ PAUSED,
+ /** Desired participant status for the NORMAL management mode. */
+ NORMAL
}
/**
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index f74b98f..bc4054a 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -20,6 +20,7 @@ package org.apache.helix.util;
*/
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -30,8 +31,11 @@ import org.apache.helix.HelixException;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
import org.apache.helix.model.StateModelDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -192,6 +196,29 @@ public class RebalanceUtil {
}
/**
+ * Builds a best possible state output whose state map for each resource is the current state
+ * map taken from {@code currentStateOutput}, i.e. the "best possible" state is the state the
+ * participants are already in. It is used for generating pending ST cancellation messages.
+ * Resources for which {@code currentStateOutput} returns a null state map are skipped.
+ * NOTE(review): the current state map is handed to setState as-is; whether setState copies it
+ * is not visible here — confirm before mutating either output.
+ *
+ * @param resourceNames collection of resource names to include
+ * @param currentStateOutput current state output {@link CurrentStateOutput} to read state maps from
+ * @return {@link BestPossibleStateOutput} mirroring the current states of the given resources
+ */
+ public static BestPossibleStateOutput
buildBestPossibleState(Collection<String> resourceNames,
+ CurrentStateOutput currentStateOutput) {
+ BestPossibleStateOutput output = new BestPossibleStateOutput();
+
+ for (String resource : resourceNames) {
+ Map<Partition, Map<String, String>> currentStateMap =
+ currentStateOutput.getCurrentStateMap(resource);
+ // Skip resources without a current state map.
+ if (currentStateMap != null) {
+ output.setState(resource, currentStateMap);
+ }
+ }
+
+ return output;
+ }
+
+ /**
* runStage allows the run of individual stages. It can be used to mock a
part of the Controller
* pipeline run.
*
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementMessageGeneration.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementMessageGeneration.java
new file mode 100644
index 0000000..c16b503
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementMessageGeneration.java
@@ -0,0 +1,140 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.status.ClusterManagementMode;
+import
org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.util.RebalanceUtil;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestManagementMessageGeneration extends ManagementMessageGenerationPhase {
+  private static final String TEST_CLUSTER = "testCluster";
+  private static final String TEST_RESOURCE = "resource0";
+  private static final String TEST_INSTANCE = "instance0";
+  private static final String TEST_PARTITION = "partition0";
+
+  @Test
+  public void testCancelPendingSTMessage() throws Exception {
+    // Pending ONLINE -> OFFLINE transition while ST cancellation is enabled: expect exactly one
+    // cancellation message for it.
+    List<Message> messages = generateMessages("ONLINE", "ONLINE", "OFFLINE", true);
+
+    Assert.assertEquals(messages.size(), 1, "Should create cancellation message");
+
+    Message msg = messages.get(0);
+    Assert.assertEquals(msg.getMsgType(), Message.MessageType.STATE_TRANSITION_CANCELLATION.name());
+    Assert.assertEquals(msg.getFromState(), "ONLINE");
+    Assert.assertEquals(msg.getToState(), "OFFLINE");
+
+    // Same setup with ST cancellation disabled in the cluster config: expect no messages.
+    messages = generateMessages("ONLINE", "ONLINE", "OFFLINE", false);
+    Assert.assertEquals(messages.size(), 0);
+  }
+
+  /**
+   * Runs this phase on a mocked cluster with one resource, one partition and one live instance,
+   * and returns the messages generated for that partition.
+   *
+   * @param currentState current state of the partition on the test instance
+   * @param fromState from-state of the mocked pending ST message
+   * @param toState to-state of the mocked pending ST message
+   * @param cancelPendingST value returned by {@code ClusterConfig#isStateTransitionCancelEnabled()}
+   * @return the messages generated for the test partition of the test resource
+   */
+  private List<Message> generateMessages(String currentState, String fromState, String toState,
+      boolean cancelPendingST) throws Exception {
+    ClusterEvent event = new ClusterEvent(TEST_CLUSTER, ClusterEventType.Unknown);
+
+    // Set current state to event
+    CurrentStateOutput currentStateOutput = mock(CurrentStateOutput.class);
+    Partition partition = mock(Partition.class);
+    when(partition.getPartitionName()).thenReturn(TEST_PARTITION);
+    when(currentStateOutput.getCurrentState(TEST_RESOURCE, partition, TEST_INSTANCE))
+        .thenReturn(currentState);
+    when(currentStateOutput.getCurrentStateMap(TEST_RESOURCE))
+        .thenReturn(ImmutableMap.of(partition, ImmutableMap.of(TEST_INSTANCE, currentState)));
+
+    // Pending ST message (fromState -> toState) for the test partition on the test instance
+    Message pendingMessage = mock(Message.class);
+    when(pendingMessage.getFromState()).thenReturn(fromState);
+    when(pendingMessage.getToState()).thenReturn(toState);
+    when(currentStateOutput.getPendingMessage(TEST_RESOURCE, partition, TEST_INSTANCE))
+        .thenReturn(pendingMessage);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    // Set helix manager to event
+    event.addAttribute(AttributeName.helixmanager.name(), mock(HelixManager.class));
+
+    StateModelDefinition stateModelDefinition = new StateModelDefinition.Builder("TestStateModel")
+        .addState("ONLINE", 1).addState("OFFLINE")
+        .addState("DROPPED").addState("ERROR")
+        .initialState("OFFLINE")
+        .addTransition("ERROR", "OFFLINE", 1).addTransition("ONLINE", "OFFLINE", 2)
+        .addTransition("OFFLINE", "DROPPED", 3).addTransition("OFFLINE", "ONLINE", 4)
+        .build();
+
+    // Set controller data provider to event
+    ManagementControllerDataProvider cache = mock(ManagementControllerDataProvider.class);
+    when(cache.getStateModelDef(TaskConstants.STATE_MODEL_NAME)).thenReturn(stateModelDefinition);
+    // Mockito can only mock the raw Map type; using it as Map<String, LiveInstance> is safe here.
+    @SuppressWarnings("unchecked")
+    Map<String, LiveInstance> liveInstances = mock(Map.class);
+    LiveInstance mockLiveInstance = mock(LiveInstance.class);
+    when(mockLiveInstance.getInstanceName()).thenReturn(TEST_INSTANCE);
+    when(mockLiveInstance.getEphemeralOwner()).thenReturn("TEST");
+    when(liveInstances.values()).thenReturn(Collections.singletonList(mockLiveInstance));
+    when(cache.getLiveInstances()).thenReturn(liveInstances);
+    ClusterConfig clusterConfig = mock(ClusterConfig.class);
+    when(cache.getClusterConfig()).thenReturn(clusterConfig);
+    when(clusterConfig.isStateTransitionCancelEnabled()).thenReturn(cancelPendingST);
+    PauseSignal pauseSignal = mock(PauseSignal.class);
+    when(pauseSignal.getCancelPendingST()).thenReturn(true);
+    when(cache.getPauseSignal()).thenReturn(pauseSignal);
+    event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
+
+    // Set event attribute: resources to rebalance
+    Map<String, Resource> resourceMap = new HashMap<>();
+    Resource resource = mock(Resource.class);
+    when(resource.getResourceName()).thenReturn(TEST_RESOURCE);
+    List<Partition> partitions = Collections.singletonList(partition);
+    when(resource.getPartitions()).thenReturn(partitions);
+    when(resource.getStateModelDefRef()).thenReturn(TaskConstants.STATE_MODEL_NAME);
+    resourceMap.put(TEST_RESOURCE, resource);
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
+
+    // set up resource state map
+    BestPossibleStateOutput bestPossibleStateOutput =
+        RebalanceUtil.buildBestPossibleState(resourceMap.keySet(), currentStateOutput);
+
+    // Process the event
+    ClusterManagementMode mode = new ClusterManagementMode(ClusterManagementMode.Type.CLUSTER_PAUSE,
+        ClusterManagementMode.Status.IN_PROGRESS);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+    event.addAttribute(AttributeName.CLUSTER_STATUS.name(), mode);
+    process(event);
+    MessageOutput output = event.getAttribute(AttributeName.MESSAGES_ALL.name());
+
+    return output.getMessages(TEST_RESOURCE, partition);
+  }
+}
+}
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java
index 28ca524..2737bf7 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java
@@ -83,6 +83,8 @@ public class TestManagementModeStage extends ZkTestBase {
Pipeline dataRefresh = new Pipeline();
dataRefresh.addStage(new ReadClusterDataStage());
+ dataRefresh.addStage(new ResourceComputationStage());
+ dataRefresh.addStage(new CurrentStateComputationStage());
runPipeline(event, dataRefresh, false);
ManagementModeStage managementModeStage = new ManagementModeStage();
managementModeStage.process(event);