This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new b7d771e70 Deprecate HELIX_DISABLED_REASON and refactor how
InstanceOperation is represented in instance configs. (#2801)
b7d771e70 is described below
commit b7d771e7032dd542ab8b466f28d239c3e425f5de
Author: Zachary Pinto <[email protected]>
AuthorDate: Tue Jun 11 11:17:07 2024 -0700
Deprecate HELIX_DISABLED_REASON and refactor how InstanceOperation is
represented in instance configs. (#2801)
Deprecate HELIX_DISABLED_REASON and HELIX_DISABLED_TYPE; Refactor
INSTANCE_OPERATION to HELIX_INSTANCE_OPERATIONS List Field
To prevent conflicts from different clients setting the InstanceOperation,
we are introducing the HELIX_INSTANCE_OPERATIONS list.
Key changes:
- Clients using the old Helix enabled APIs will take precedence over
INSTANCE_DISABLED_OVERRIDABLE_OPERATIONS when those fields are set.
- When the new InstanceOperation APIs set the operation type to DISABLE,
the old HELIX_ENABLED field will also be set for backwards compatibility.
- For all InstanceOperation API invocations, the source will default to
USER unless specified otherwise. An AUTOMATION source will create a separate
entry in the list.
- The most recent non-ENABLE InstanceOperation entry will be the active
InstanceOperation used by the controller and returned by the
getInstanceOperation API.
These changes ensure smoother operation transitions and maintain
compatibility with existing APIs.
---
.../apache/helix/constants/InstanceConstants.java | 37 ++-
.../src/main/java/org/apache/helix/HelixAdmin.java | 31 +-
.../event/helix/DefaultCloudEventCallbackImpl.java | 24 +-
.../cloud/event/helix/HelixEventHandlingUtil.java | 3 +
.../trimmer/InstanceConfigTrimmer.java | 16 +
.../dataproviders/BaseControllerDataProvider.java | 8 +-
.../stages/BestPossibleStateCalcStage.java | 8 +-
.../stages/CurrentStateComputationStage.java | 1 +
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 314 ++++++------------
.../org/apache/helix/model/InstanceConfig.java | 355 ++++++++++++++++++---
.../apache/helix/spectator/RoutingDataCache.java | 9 +-
.../java/org/apache/helix/util/InstanceUtil.java | 198 ++++++++++++
.../event/TestDefaultCloudEventCallbackImpl.java | 10 +-
.../rebalancer/TestInstanceOperation.java | 37 ++-
.../apache/helix/manager/zk/TestZkHelixAdmin.java | 62 +++-
.../java/org/apache/helix/mock/MockHelixAdmin.java | 29 +-
.../org/apache/helix/model/TestInstanceConfig.java | 114 ++++++-
.../StoppableInstancesSelector.java | 4 +-
.../resources/helix/PerInstanceAccessor.java | 18 +-
.../helix/rest/server/TestPerInstanceAccessor.java | 8 +-
20 files changed, 958 insertions(+), 328 deletions(-)
diff --git
a/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java
b/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java
index 85c22c460..22f6c7c76 100644
---
a/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java
+++
b/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java
@@ -20,21 +20,54 @@ public class InstanceConstants {
* TODO: Remove this when the deprecated HELIX_ENABLED is removed.
*/
public static final Set<InstanceOperation>
INSTANCE_DISABLED_OVERRIDABLE_OPERATIONS =
- ImmutableSet.of(InstanceOperation.ENABLE, InstanceOperation.DISABLE,
InstanceOperation.EVACUATE);
+ ImmutableSet.of(InstanceOperation.ENABLE, InstanceOperation.EVACUATE);
/**
* The set of InstanceOperations that are not allowed to be populated in the
RoutingTableProvider.
*/
- public static final Set<InstanceOperation> UNSERVABLE_INSTANCE_OPERATIONS =
+ public static final Set<InstanceOperation> UNROUTABLE_INSTANCE_OPERATIONS =
ImmutableSet.of(InstanceOperation.SWAP_IN, InstanceOperation.UNKNOWN);
+ @Deprecated
public enum InstanceDisabledType {
CLOUD_EVENT,
USER_OPERATION,
DEFAULT_INSTANCE_DISABLE_TYPE
}
+ public enum InstanceOperationSource {
+ ADMIN(0), USER(1), AUTOMATION(2), DEFAULT(3);
+
+ private final int _priority;
+
+ InstanceOperationSource(int priority) {
+ _priority = priority;
+ }
+
+ public int getPriority() {
+ return _priority;
+ }
+
+ /**
+ * Convert from InstanceDisabledType to InstanceOperationSource
+ *
+ * @param disabledType InstanceDisabledType
+ * @return InstanceOperationSource
+ */
+ public static InstanceOperationSource
instanceDisabledTypeToInstanceOperationSource(
+ InstanceDisabledType disabledType) {
+ switch (disabledType) {
+ case CLOUD_EVENT:
+ return InstanceOperationSource.AUTOMATION;
+ case USER_OPERATION:
+ return InstanceOperationSource.USER;
+ default:
+ return InstanceOperationSource.DEFAULT;
+ }
+ }
+ }
+
public enum InstanceOperation {
/**
* Behavior: Replicas will be assigned to the node and will receive upward
state transitions if
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 84a7154b1..07afb55b6 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -310,15 +310,38 @@ public interface HelixAdmin {
void enableInstance(String clusterName, List<String> instances, boolean
enabled);
/**
- * Set the instanceOperation field. Setting it to null is equivalent to
- * ENABLE.
+ * Set the instanceOperation of an instance with {@link
InstanceConstants.InstanceOperation}.
*
* @param clusterName The cluster name
* @param instanceName The instance name
- * @param instanceOperation The instance operation
+ * @param instanceOperation The instance operation type
*/
void setInstanceOperation(String clusterName, String instanceName,
- @Nullable InstanceConstants.InstanceOperation instanceOperation);
+ InstanceConstants.InstanceOperation instanceOperation);
+
+ /**
+ * Set the instanceOperation of an instance with {@link
InstanceConstants.InstanceOperation}.
+ *
+ * @param clusterName The cluster name
+ * @param instanceName The instance name
+ * @param instanceOperation The instance operation type
+ * @param reason The reason for the operation
+ */
+ void setInstanceOperation(String clusterName, String instanceName,
+ InstanceConstants.InstanceOperation instanceOperation, String reason);
+
+ /**
+ * Set the instanceOperation of an instance with {@link
InstanceConstants.InstanceOperation}.
+ *
+ * @param clusterName The cluster name
+ * @param instanceName The instance name
+ * @param instanceOperation The instance operation type
+ * @param reason The reason for the operation
+ * @param overrideAll Whether to override all existing instance
operations from all other
+ * instance operation sources
+ */
+ void setInstanceOperation(String clusterName, String instanceName,
+ InstanceConstants.InstanceOperation instanceOperation, String reason,
boolean overrideAll);
/**
* Disable or enable a resource
diff --git
a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
index 04ad4b798..20c500116 100644
---
a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
+++
b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
@@ -23,6 +23,8 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.util.InstanceUtil;
import org.apache.helix.util.InstanceValidationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,9 +51,14 @@ public class DefaultCloudEventCallbackImpl {
LOG.info("DefaultCloudEventCallbackImpl disable Instance {}",
manager.getInstanceName());
if (InstanceValidationUtil
.isEnabled(manager.getHelixDataAccessor(), manager.getInstanceName()))
{
- manager.getClusterManagmentTool()
- .enableInstance(manager.getClusterName(), manager.getInstanceName(),
false,
- InstanceConstants.InstanceDisabledType.CLOUD_EVENT, message);
+ InstanceUtil.setInstanceOperation(manager.getConfigAccessor(),
+ manager.getHelixDataAccessor().getBaseDataAccessor(),
manager.getClusterName(),
+ manager.getInstanceName(),
+ new InstanceConfig.InstanceOperation.Builder().setOperation(
+ InstanceConstants.InstanceOperation.DISABLE)
+
.setSource(InstanceConstants.InstanceOperationSource.AUTOMATION)
+ .setReason(message)
+ .build());
}
HelixEventHandlingUtil.updateCloudEventOperationInClusterConfig(manager.getClusterName(),
manager.getInstanceName(),
manager.getHelixDataAccessor().getBaseDataAccessor(), false,
@@ -72,10 +79,13 @@ public class DefaultCloudEventCallbackImpl {
HelixEventHandlingUtil
.updateCloudEventOperationInClusterConfig(manager.getClusterName(),
instanceName,
manager.getHelixDataAccessor().getBaseDataAccessor(), true,
message);
- if (HelixEventHandlingUtil.isInstanceDisabledForCloudEvent(instanceName,
accessor)) {
-
manager.getClusterManagmentTool().enableInstance(manager.getClusterName(),
instanceName, true,
- InstanceConstants.InstanceDisabledType.CLOUD_EVENT, message);
- }
+ InstanceUtil.setInstanceOperation(manager.getConfigAccessor(),
+ manager.getHelixDataAccessor().getBaseDataAccessor(),
manager.getClusterName(),
+ manager.getInstanceName(),
+ new InstanceConfig.InstanceOperation.Builder().setOperation(
+ InstanceConstants.InstanceOperation.ENABLE)
+
.setSource(InstanceConstants.InstanceOperationSource.AUTOMATION).setReason(message)
+ .build());
}
/**
diff --git
a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java
b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java
index ee96a13ee..ceff1d299 100644
---
a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java
+++
b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java
@@ -48,7 +48,10 @@ class HelixEventHandlingUtil {
* @param dataAccessor
* @return return true only when instance is Helix disabled and the disabled
reason in
* instanceConfig is cloudEvent
+ * @deprecated No need to check this if using InstanceOperation and
specifying the source as AUTOMATION
+ * when enabling.
*/
+ @Deprecated
static boolean isInstanceDisabledForCloudEvent(String instanceName,
HelixDataAccessor dataAccessor) {
InstanceConfig instanceConfig =
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/InstanceConfigTrimmer.java
b/helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/InstanceConfigTrimmer.java
index 45b0dde76..cd2b16f92 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/InstanceConfigTrimmer.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/InstanceConfigTrimmer.java
@@ -19,6 +19,7 @@ package org.apache.helix.controller.changedetector.trimmer;
* under the License.
*/
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -62,6 +63,21 @@ public class InstanceConfigTrimmer extends
HelixPropertyTrimmer<InstanceConfig>
return STATIC_TOPOLOGY_RELATED_FIELD_MAP;
}
+ /**
+ * We should trim the HELIX_INSTANCE_OPERATIONS field because it is used to filter
instances in the
+ * BaseControllerDataProvider. That filtering will be used to determine if
ResourceChangeSnapshot
+ * has changed as opposed to checking the actual value of the field.
+ *
+ * @param property the instance config
+ * @return a map contains all non-trimmable field keys that need to be kept.
+ */
+ protected Map<FieldType, Set<String>> getNonTrimmableKeys(InstanceConfig
property) {
+ Map<FieldType, Set<String>> nonTrimmableKeys =
super.getNonTrimmableKeys(property);
+ nonTrimmableKeys.get(FieldType.LIST_FIELD)
+ .remove(InstanceConfigProperty.HELIX_INSTANCE_OPERATIONS.name());
+ return nonTrimmableKeys;
+ }
+
@Override
public InstanceConfig trimProperty(InstanceConfig property) {
return new InstanceConfig(doTrim(property));
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index a91ae12d2..ce5d3de8c 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -413,14 +413,15 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
currentInstanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType());
newInstanceConfigMapByInstanceOperation.computeIfAbsent(
- currentInstanceConfig.getInstanceOperation(), k -> new
HashMap<>())
+ currentInstanceConfig.getInstanceOperation().getOperation(),
+ k -> new HashMap<>())
.put(node, currentInstanceConfig);
if (currentInstanceConfig.isAssignable()) {
newAssignableInstanceConfigMap.put(node, currentInstanceConfig);
}
- if (currentInstanceConfig.getInstanceOperation()
+ if (currentInstanceConfig.getInstanceOperation().getOperation()
.equals(InstanceConstants.InstanceOperation.SWAP_IN)) {
swapInLogicalIdsByInstanceName.put(currentInstanceConfig.getInstanceName(),
currentInstanceLogicalId);
@@ -1079,7 +1080,8 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
_disabledInstanceSet.clear();
for (InstanceConfig config : allInstanceConfigs) {
Map<String, List<String>> disabledPartitionMap =
config.getDisabledPartitionsMap();
- if
(config.getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE))
{
+ if (config.getInstanceOperation().getOperation()
+ .equals(InstanceConstants.InstanceOperation.DISABLE)) {
_disabledInstanceSet.add(config.getInstanceName());
}
for (String resource : disabledPartitionMap.keySet()) {
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 1db0eccfc..714e9325d 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -36,7 +36,6 @@ import org.apache.helix.HelixManager;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.LogUtil;
-import org.apache.helix.controller.common.ResourcesStateMap;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
@@ -358,11 +357,12 @@ public class BestPossibleStateCalcStage extends
AbstractBaseStage {
if (maxInstancesUnableToAcceptOnlineReplicas >= 0) {
// Instead of only checking the offline instances, we consider how many
instances in the cluster
// are not assignable and live. This is because some instances may be
online but have an unassignable
- // InstanceOperation such as EVACUATE, DISABLE, or UNKNOWN. We will
exclude SWAP_IN instances from
+ // InstanceOperation such as EVACUATE or DISABLE. We will exclude
SWAP_IN and UNKNOWN instances because
// they should not account against the capacity of the cluster.
int instancesUnableToAcceptOnlineReplicas =
cache.getInstanceConfigMap().entrySet().stream()
- .filter(instanceEntry ->
!InstanceConstants.UNSERVABLE_INSTANCE_OPERATIONS.contains(
-
instanceEntry.getValue().getInstanceOperation())).collect(Collectors.toSet())
+ .filter(instanceEntry ->
!InstanceConstants.UNROUTABLE_INSTANCE_OPERATIONS.contains(
+ instanceEntry.getValue().getInstanceOperation().getOperation()))
+ .collect(Collectors.toSet())
.size() - cache.getEnabledLiveInstances().size();
if (instancesUnableToAcceptOnlineReplicas >
maxInstancesUnableToAcceptOnlineReplicas) {
String errMsg = String.format(
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 3bf23d22e..da972d682 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -109,6 +109,7 @@ public class CurrentStateComputationStage extends
AbstractBaseStage {
// Only update the currentStateExcludingUnknown if the instance is not
in UNKNOWN InstanceOperation.
if (instanceConfig == null || !instanceConfig.getInstanceOperation()
+ .getOperation()
.equals(InstanceConstants.InstanceOperation.UNKNOWN)) {
// update current states.
updateCurrentStates(instance,
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 8c873b4cd..39ae9ae67 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -42,7 +42,6 @@ import java.util.stream.Collectors;
import javax.annotation.Nullable;
import com.google.common.collect.ImmutableSet;
-import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
@@ -69,7 +68,6 @@ import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ClusterStatus;
-import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.ControllerHistory;
import org.apache.helix.model.CurrentState;
@@ -89,11 +87,11 @@ import org.apache.helix.model.ParticipantHistory;
import org.apache.helix.model.PauseSignal;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.tools.DefaultIdealStateCalculator;
import org.apache.helix.util.ConfigStringUtil;
import org.apache.helix.util.HelixUtil;
+import org.apache.helix.util.InstanceUtil;
import org.apache.helix.util.RebalanceUtil;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
@@ -125,6 +123,7 @@ public class ZKHelixAdmin implements HelixAdmin {
private final RealmAwareZkClient _zkClient;
private final ConfigAccessor _configAccessor;
+ private final BaseDataAccessor<ZNRecord> _baseDataAccessor;
// true if ZKHelixAdmin was instantiated with a RealmAwareZkClient, false
otherwise
// This is used for close() to determine how ZKHelixAdmin should close the
underlying ZkClient
private final boolean _usesExternalZkClient;
@@ -142,6 +141,7 @@ public class ZKHelixAdmin implements HelixAdmin {
public ZKHelixAdmin(RealmAwareZkClient zkClient) {
_zkClient = zkClient;
_configAccessor = new ConfigAccessor(zkClient);
+ _baseDataAccessor = new ZkBaseDataAccessor<>(zkClient);
_usesExternalZkClient = true;
}
@@ -182,12 +182,14 @@ public class ZKHelixAdmin implements HelixAdmin {
_zkClient = zkClient;
_configAccessor = new ConfigAccessor(_zkClient);
+ _baseDataAccessor = new ZkBaseDataAccessor<>(zkClient);
_usesExternalZkClient = false;
}
private ZKHelixAdmin(RealmAwareZkClient zkClient, boolean
usesExternalZkClient) {
_zkClient = zkClient;
_configAccessor = new ConfigAccessor(_zkClient);
+ _baseDataAccessor = new ZkBaseDataAccessor<>(zkClient);
_usesExternalZkClient = usesExternalZkClient;
}
@@ -206,7 +208,8 @@ public class ZKHelixAdmin implements HelixAdmin {
}
List<InstanceConfig> matchingLogicalIdInstances =
- findInstancesMatchingLogicalId(clusterName, instanceConfig);
+ InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor,
clusterName,
+ instanceConfig);
if (matchingLogicalIdInstances.size() > 1) {
throw new HelixException(
"There are already more than one instance with the same logicalId in
the cluster: "
@@ -216,17 +219,16 @@ public class ZKHelixAdmin implements HelixAdmin {
}
InstanceConstants.InstanceOperation attemptedInstanceOperation =
- instanceConfig.getInstanceOperation();
+ instanceConfig.getInstanceOperation().getOperation();
try {
- validateInstanceOperationTransition(instanceConfig,
- !matchingLogicalIdInstances.isEmpty() ?
matchingLogicalIdInstances.get(0) : null,
- InstanceConstants.InstanceOperation.UNKNOWN,
- attemptedInstanceOperation, clusterName);
+ InstanceUtil.validateInstanceOperationTransition(_configAccessor,
clusterName, instanceConfig,
+ InstanceConstants.InstanceOperation.UNKNOWN,
attemptedInstanceOperation);
} catch (HelixException e) {
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.UNKNOWN);
logger.error("Failed to add instance " +
instanceConfig.getInstanceName() + " to cluster "
+ clusterName + " with instance operation " +
attemptedInstanceOperation
+ ". Setting INSTANCE_OPERATION to " +
instanceConfig.getInstanceOperation()
+ .getOperation()
+ " instead.", e);
}
@@ -240,8 +242,7 @@ public class ZKHelixAdmin implements HelixAdmin {
_zkClient.createPersistent(PropertyPathBuilder.instanceError(clusterName,
nodeId), true);
_zkClient.createPersistent(PropertyPathBuilder.instanceStatusUpdate(clusterName,
nodeId), true);
_zkClient.createPersistent(PropertyPathBuilder.instanceHistory(clusterName,
nodeId), true);
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.participantHistory(nodeId), new
ParticipantHistory(nodeId));
}
@@ -344,8 +345,7 @@ public class ZKHelixAdmin implements HelixAdmin {
"instance" + instanceName + " does not exist in cluster " +
clusterName);
}
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.instanceConfig(instanceName));
@@ -364,8 +364,7 @@ public class ZKHelixAdmin implements HelixAdmin {
"instance" + instanceName + " does not exist in cluster " +
clusterName);
}
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey instanceConfigPropertyKey =
accessor.keyBuilder().instanceConfig(instanceName);
InstanceConfig currentInstanceConfig =
accessor.getProperty(instanceConfigPropertyKey);
if
(!newInstanceConfig.getHostName().equals(currentInstanceConfig.getHostName())
@@ -397,9 +396,6 @@ public class ZKHelixAdmin implements HelixAdmin {
// Eventually we will have all instances' enable/disable information in
clusterConfig. Now we
// update both instanceConfig and clusterConfig in transition period.
enableSingleInstance(clusterName, instanceName, enabled, baseAccessor,
disabledType, reason);
-// enableBatchInstances(clusterName,
Collections.singletonList(instanceName), enabled,
-// baseAccessor, disabledType, reason);
-
}
@Deprecated
@@ -413,62 +409,6 @@ public class ZKHelixAdmin implements HelixAdmin {
//enableInstance(clusterName, instances, enabled, null, null);
}
- private void validateInstanceOperationTransition(InstanceConfig
instanceConfig,
- InstanceConfig matchingLogicalIdInstance,
- InstanceConstants.InstanceOperation currentOperation,
- InstanceConstants.InstanceOperation targetOperation,
- String clusterName) {
- boolean targetStateEnableOrDisable =
- targetOperation.equals(InstanceConstants.InstanceOperation.ENABLE)
- ||
targetOperation.equals(InstanceConstants.InstanceOperation.DISABLE);
- switch (currentOperation) {
- case ENABLE:
- case DISABLE:
- // ENABLE or DISABLE can be set to ENABLE, DISABLE, or EVACUATE at any
time.
- if (ImmutableSet.of(InstanceConstants.InstanceOperation.ENABLE,
- InstanceConstants.InstanceOperation.DISABLE,
-
InstanceConstants.InstanceOperation.EVACUATE).contains(targetOperation)) {
- return;
- }
- case SWAP_IN:
- // We can only ENABLE or DISABLE a SWAP_IN instance if there is an
instance with matching logicalId
- // with an InstanceOperation set to UNKNOWN.
- if ((targetStateEnableOrDisable && (matchingLogicalIdInstance == null
- || matchingLogicalIdInstance.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.UNKNOWN))) ||
targetOperation.equals(
- InstanceConstants.InstanceOperation.UNKNOWN)) {
- return;
- }
- case EVACUATE:
- // EVACUATE can only be set to ENABLE or DISABLE when there is no
instance with the same
- // logicalId in the cluster.
- if ((targetStateEnableOrDisable && matchingLogicalIdInstance == null)
- ||
targetOperation.equals(InstanceConstants.InstanceOperation.UNKNOWN)) {
- return;
- }
- case UNKNOWN:
- // UNKNOWN can be set to ENABLE or DISABLE when there is no instance
with the same logicalId in the cluster
- // or the instance with the same logicalId in the cluster has
InstanceOperation set to EVACUATE.
- // UNKNOWN can be set to SWAP_IN when there is an instance with the
same logicalId in the cluster set to ENABLE,
- // or DISABLE.
- if ((targetStateEnableOrDisable && (matchingLogicalIdInstance == null
- || matchingLogicalIdInstance.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.EVACUATE)))) {
- return;
- } else if
(targetOperation.equals(InstanceConstants.InstanceOperation.SWAP_IN)
- && matchingLogicalIdInstance != null && !ImmutableSet.of(
- InstanceConstants.InstanceOperation.UNKNOWN,
- InstanceConstants.InstanceOperation.EVACUATE)
- .contains(matchingLogicalIdInstance.getInstanceOperation())) {
- return;
- }
- default:
- throw new HelixException(
- "InstanceOperation cannot be set to " + targetOperation + " when
the instance is in "
- + currentOperation + " state");
- }
- }
-
/**
* Set the InstanceOperation of an instance in the cluster.
*
@@ -479,75 +419,57 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void setInstanceOperation(String clusterName, String instanceName,
@Nullable InstanceConstants.InstanceOperation instanceOperation) {
+ setInstanceOperation(clusterName, instanceName, instanceOperation, null,
false);
+ }
- BaseDataAccessor<ZNRecord> baseAccessor = new
ZkBaseDataAccessor<>(_zkClient);
- String path = PropertyPathBuilder.instanceConfig(clusterName,
instanceName);
-
- InstanceConfig instanceConfig = getInstanceConfig(clusterName,
instanceName);
- if (instanceConfig == null) {
- throw new HelixException("Cluster " + clusterName + ", instance: " +
instanceName
- + ", instance config does not exist");
- }
- List<InstanceConfig> matchingLogicalIdInstances =
- findInstancesMatchingLogicalId(clusterName, instanceConfig);
- validateInstanceOperationTransition(instanceConfig,
- !matchingLogicalIdInstances.isEmpty() ?
matchingLogicalIdInstances.get(0) : null,
- instanceConfig.getInstanceOperation(),
- instanceOperation == null ? InstanceConstants.InstanceOperation.ENABLE
: instanceOperation,
- clusterName);
-
- boolean succeeded = baseAccessor.update(path, new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord currentData) {
- if (currentData == null) {
- throw new HelixException(
- "Cluster: " + clusterName + ", instance: " + instanceName + ",
participant config is null");
- }
-
- InstanceConfig config = new InstanceConfig(currentData);
- config.setInstanceOperation(instanceOperation);
- return config.getRecord();
- }
- }, AccessOption.PERSISTENT);
+ /**
+ * Set the instanceOperation of an instance with {@link
InstanceConstants.InstanceOperation}.
+ *
+ * @param clusterName The cluster name
+ * @param instanceName The instance name
+ * @param instanceOperation The instance operation type
+ * @param reason The reason for the operation
+ */
+ @Override
+ public void setInstanceOperation(String clusterName, String instanceName,
+ @Nullable InstanceConstants.InstanceOperation instanceOperation, String
reason) {
+ setInstanceOperation(clusterName, instanceName, instanceOperation, reason,
false);
+ }
- if (!succeeded) {
- throw new HelixException("Failed to update instance operation. Please
check if instance is disabled.");
- }
+ /**
+ * Set the instanceOperation of an instance with {@link
InstanceConstants.InstanceOperation}.
+ *
+ * @param clusterName The cluster name
+ * @param instanceName The instance name
+ * @param instanceOperation The instance operation type
+ * @param reason The reason for the operation
+ * @param overrideAll Whether to override all existing instance
operations from all other
+ * instance operation sources
+ */
+ @Override
+ public void setInstanceOperation(String clusterName, String instanceName,
+ @Nullable InstanceConstants.InstanceOperation instanceOperation, String
reason,
+ boolean overrideAll) {
+ InstanceConfig.InstanceOperation instanceOperationObj =
+ new InstanceConfig.InstanceOperation.Builder().setOperation(
+ instanceOperation == null ?
InstanceConstants.InstanceOperation.ENABLE
+ : instanceOperation).setReason(reason).setSource(
+ overrideAll ? InstanceConstants.InstanceOperationSource.ADMIN
+ : InstanceConstants.InstanceOperationSource.USER).build();
+ InstanceUtil.setInstanceOperation(_configAccessor, _baseDataAccessor,
clusterName, instanceName,
+ instanceOperationObj);
}
@Override
public boolean isEvacuateFinished(String clusterName, String instanceName) {
if (!instanceHasFullAutoCurrentStateOrMessage(clusterName, instanceName)) {
InstanceConfig config = getInstanceConfig(clusterName, instanceName);
- return config != null && config.getInstanceOperation()
+ return config != null && config.getInstanceOperation().getOperation()
.equals(InstanceConstants.InstanceOperation.EVACUATE);
}
return false;
}
- /**
- * Find the instance that the passed instance has a matching logicalId with.
- *
- * @param clusterName The cluster name
- * @param instanceConfig The instance to find the matching instance for
- * @return The matching instance if found, null otherwise.
- */
- private List<InstanceConfig> findInstancesMatchingLogicalId(String
clusterName,
- InstanceConfig instanceConfig) {
- String logicalIdKey =
-
ClusterTopologyConfig.createFromClusterConfig(_configAccessor.getClusterConfig(clusterName))
- .getEndNodeType();
- return getConfigKeys(
- new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
- clusterName).build()).stream()
- .map(instanceName -> getInstanceConfig(clusterName,
instanceName)).filter(
- potentialInstanceConfig ->
-
!potentialInstanceConfig.getInstanceName().equals(instanceConfig.getInstanceName())
- && potentialInstanceConfig.getLogicalId(logicalIdKey)
- .equals(instanceConfig.getLogicalId(logicalIdKey)))
- .collect(Collectors.toList());
- }
-
/**
* Check to see if swapping between two instances is ready to be completed.
Checks: 1. Both
* instances must be alive. 2. Both instances must only have one session and
not be carrying over
@@ -563,7 +485,7 @@ public class ZKHelixAdmin implements HelixAdmin {
*/
private boolean canCompleteSwap(String clusterName, String
swapOutInstanceName,
String swapInInstanceName) {
- BaseDataAccessor<ZNRecord> baseAccessor = new
ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ BaseDataAccessor<ZNRecord> baseAccessor = _baseDataAccessor;
HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
baseAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
@@ -579,8 +501,8 @@ public class ZKHelixAdmin implements HelixAdmin {
"SwapOutInstance {} is {} + {} and SwapInInstance {} is OFFLINE + {}
for cluster {}. Swap will"
+ " not complete unless SwapInInstance instance is ONLINE.",
swapOutInstanceName, swapOutLiveInstance != null ? "ONLINE" :
"OFFLINE",
- swapOutInstanceConfig.getInstanceOperation(), swapInInstanceName,
- swapInInstanceConfig.getInstanceOperation(), clusterName);
+ swapOutInstanceConfig.getInstanceOperation().getOperation(),
swapInInstanceName,
+ swapInInstanceConfig.getInstanceOperation().getOperation(),
clusterName);
return false;
}
@@ -619,7 +541,7 @@ public class ZKHelixAdmin implements HelixAdmin {
// 4. If the swap-out instance is not alive or is disabled, we return true
without checking
// the current states on the swap-in instance.
- if (swapOutLiveInstance == null ||
swapOutInstanceConfig.getInstanceOperation()
+ if (swapOutLiveInstance == null ||
swapOutInstanceConfig.getInstanceOperation().getOperation()
.equals(InstanceConstants.InstanceOperation.DISABLE)) {
return true;
}
@@ -697,7 +619,8 @@ public class ZKHelixAdmin implements HelixAdmin {
}
List<InstanceConfig> swappingInstances =
- findInstancesMatchingLogicalId(clusterName, instanceConfig);
+ InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor,
clusterName,
+ instanceConfig);
if (swappingInstances.size() != 1) {
logger.warn(
"Instance {} in cluster {} is not swapping with any other instance.
Cannot determine if the swap is complete.",
@@ -705,10 +628,10 @@ public class ZKHelixAdmin implements HelixAdmin {
return false;
}
- InstanceConfig swapOutInstanceConfig =
-
!instanceConfig.getInstanceOperation().equals(InstanceConstants.InstanceOperation.SWAP_IN)
+ InstanceConfig swapOutInstanceConfig =
!instanceConfig.getInstanceOperation().getOperation()
+ .equals(InstanceConstants.InstanceOperation.SWAP_IN)
? instanceConfig : swappingInstances.get(0);
- InstanceConfig swapInInstanceConfig = instanceConfig.getInstanceOperation()
+ InstanceConfig swapInInstanceConfig =
instanceConfig.getInstanceOperation().getOperation()
.equals(InstanceConstants.InstanceOperation.SWAP_IN) ? instanceConfig
: swappingInstances.get(0);
if (swapOutInstanceConfig == null || swapInInstanceConfig == null) {
@@ -735,7 +658,8 @@ public class ZKHelixAdmin implements HelixAdmin {
}
List<InstanceConfig> swappingInstances =
- findInstancesMatchingLogicalId(clusterName, instanceConfig);
+ InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor,
clusterName,
+ instanceConfig);
if (swappingInstances.size() != 1) {
logger.warn(
"Instance {} in cluster {} is not swapping with any other instance.
Cannot determine if the swap is complete.",
@@ -743,10 +667,10 @@ public class ZKHelixAdmin implements HelixAdmin {
return false;
}
- InstanceConfig swapOutInstanceConfig =
-
!instanceConfig.getInstanceOperation().equals(InstanceConstants.InstanceOperation.SWAP_IN)
+ InstanceConfig swapOutInstanceConfig =
!instanceConfig.getInstanceOperation().getOperation()
+ .equals(InstanceConstants.InstanceOperation.SWAP_IN)
? instanceConfig : swappingInstances.get(0);
- InstanceConfig swapInInstanceConfig = instanceConfig.getInstanceOperation()
+ InstanceConfig swapInInstanceConfig =
instanceConfig.getInstanceOperation().getOperation()
.equals(InstanceConstants.InstanceOperation.SWAP_IN) ? instanceConfig
: swappingInstances.get(0);
if (swapOutInstanceConfig == null || swapInInstanceConfig == null) {
@@ -802,7 +726,7 @@ public class ZKHelixAdmin implements HelixAdmin {
if (!instanceHasFullAutoCurrentStateOrMessage(clusterName, instanceName)) {
InstanceConfig config = getInstanceConfig(clusterName, instanceName);
return config != null &&
INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(
- config.getInstanceOperation());
+ config.getInstanceOperation().getOperation());
}
return false;
}
@@ -816,7 +740,7 @@ public class ZKHelixAdmin implements HelixAdmin {
*/
private boolean instanceHasFullAutoCurrentStateOrMessage(String clusterName,
String instanceName) {
- HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
// check the instance is alive
@@ -827,7 +751,7 @@ public class ZKHelixAdmin implements HelixAdmin {
return false;
}
- BaseDataAccessor<ZNRecord> baseAccessor = new
ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ BaseDataAccessor<ZNRecord> baseAccessor = _baseDataAccessor;
// count number of sessions under CurrentState folder. If it is carrying
over from prv session,
// then there are > 1 session ZNodes.
List<String> sessions =
baseAccessor.getChildNames(PropertyPathBuilder.instanceCurrentState(clusterName,
instanceName), 0);
@@ -867,7 +791,7 @@ public class ZKHelixAdmin implements HelixAdmin {
public void enableResource(final String clusterName, final String
resourceName, final boolean enabled) {
logger.info("{} resource {} in cluster {}.", enabled ? "Enable" :
"Disable", resourceName, clusterName);
String path = PropertyPathBuilder.idealState(clusterName, resourceName);
- BaseDataAccessor<ZNRecord> baseAccessor = new
ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ BaseDataAccessor<ZNRecord> baseAccessor = _baseDataAccessor;
if (!baseAccessor.exists(path, 0)) {
throw new HelixException("Cluster " + clusterName + ", resource: " +
resourceName
+ ", ideal-state does not exist");
@@ -894,7 +818,7 @@ public class ZKHelixAdmin implements HelixAdmin {
instanceName, clusterName);
String path = PropertyPathBuilder.instanceConfig(clusterName,
instanceName);
- BaseDataAccessor<ZNRecord> baseAccessor = new
ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ BaseDataAccessor<ZNRecord> baseAccessor = _baseDataAccessor;
// check instanceConfig exists
if (!baseAccessor.exists(path, 0)) {
@@ -973,8 +897,7 @@ public class ZKHelixAdmin implements HelixAdmin {
public void enableCluster(String clusterName, boolean enabled, String
reason) {
logger.info("{} cluster {} for reason {}.", enabled ? "Enable" :
"Disable", clusterName,
reason == null ? "NULL" : reason);
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
if (enabled) {
@@ -998,8 +921,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public boolean isInMaintenanceMode(String clusterName) {
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getBaseDataAccessor()
.exists(keyBuilder.maintenance().getPath(), AccessOption.PERSISTENT);
@@ -1248,8 +1170,7 @@ public class ZKHelixAdmin implements HelixAdmin {
final String reason, final MaintenanceSignal.AutoTriggerReason
internalReason,
final Map<String, String> customFields,
final MaintenanceSignal.TriggeringEntity triggeringEntity) {
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
logger.info("Cluster {} {} {} maintenance mode for reason {}.",
clusterName,
triggeringEntity == MaintenanceSignal.TriggeringEntity.CONTROLLER ?
"automatically"
@@ -1512,8 +1433,7 @@ public class ZKHelixAdmin implements HelixAdmin {
List<String> instances = _zkClient.getChildren(memberInstancesPath);
List<String> result = new ArrayList<String>();
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
for (String instanceName : instances) {
@@ -1659,8 +1579,7 @@ public class ZKHelixAdmin implements HelixAdmin {
public List<String> getResourcesInClusterWithTag(String clusterName, String
tag) {
List<String> resourcesWithTag = new ArrayList<String>();
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
for (String resourceName : getResourcesInCluster(clusterName)) {
@@ -1675,8 +1594,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public IdealState getResourceIdealState(String clusterName, String
resourceName) {
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.idealStates(resourceName));
@@ -1688,8 +1606,7 @@ public class ZKHelixAdmin implements HelixAdmin {
logger
.info("Set IdealState for resource {} in cluster {} with new
IdealState {}.", resourceName,
clusterName, idealState == null ? "NULL" : idealState.toString());
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
@@ -1731,8 +1648,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public ExternalView getResourceExternalView(String clusterName, String
resourceName) {
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.externalView(resourceName));
}
@@ -1740,8 +1656,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public CustomizedView getResourceCustomizedView(String clusterName, String
resourceName,
String customizedStateType) {
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.customizedView(customizedStateType,
resourceName));
}
@@ -1774,8 +1689,7 @@ public class ZKHelixAdmin implements HelixAdmin {
}
}
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.stateModelDef(stateModelDef), stateModel);
}
@@ -1786,8 +1700,7 @@ public class ZKHelixAdmin implements HelixAdmin {
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
throw new HelixException("Cluster " + clusterName + " is not setup yet");
}
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.removeProperty(keyBuilder.idealStates(resourceName));
@@ -1806,8 +1719,7 @@ public class ZKHelixAdmin implements HelixAdmin {
CloudConfig.Builder builder = new CloudConfig.Builder(cloudConfig);
CloudConfig cloudConfigBuilder = builder.build();
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.cloudConfig(), cloudConfigBuilder);
}
@@ -1815,8 +1727,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void removeCloudConfig(String clusterName) {
logger.info("Remove Cloud Config for cluster {}.", clusterName);
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.removeProperty(keyBuilder.cloudConfig());
}
@@ -1847,8 +1758,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public StateModelDefinition getStateModelDef(String clusterName, String
stateModelName) {
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.stateModelDef(stateModelName));
@@ -1857,8 +1767,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void dropCluster(String clusterName) {
logger.info("Deleting cluster {}.", clusterName);
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
String root = "/" + clusterName;
@@ -1935,8 +1844,7 @@ public class ZKHelixAdmin implements HelixAdmin {
new CustomizedStateConfig.Builder(customizedStateConfig);
CustomizedStateConfig customizedStateConfigFromBuilder = builder.build();
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.customizedStateConfig(),
customizedStateConfigFromBuilder);
@@ -1947,8 +1855,7 @@ public class ZKHelixAdmin implements HelixAdmin {
logger.info(
"Remove CustomizedStateConfig from cluster {}.", clusterName);
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.removeProperty(keyBuilder.customizedStateConfig());
@@ -1967,8 +1874,7 @@ public class ZKHelixAdmin implements HelixAdmin {
builder.addAggregationEnabledType(type);
CustomizedStateConfig customizedStateConfigFromBuilder = builder.build();
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
if(!accessor.updateProperty(keyBuilder.customizedStateConfig(),
customizedStateConfigFromBuilder)) {
@@ -1997,8 +1903,7 @@ public class ZKHelixAdmin implements HelixAdmin {
builder.removeAggregationEnabledType(type);
CustomizedStateConfig customizedStateConfigFromBuilder = builder.build();
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.customizedStateConfig(),
customizedStateConfigFromBuilder);
@@ -2021,7 +1926,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void onDemandRebalance(String clusterName) {
- BaseDataAccessor<ZNRecord> baseAccessor = new
ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ BaseDataAccessor<ZNRecord> baseAccessor = _baseDataAccessor;
String path = PropertyPathBuilder.clusterConfig(clusterName);
if (!baseAccessor.exists(path, 0)) {
@@ -2204,7 +2109,7 @@ public class ZKHelixAdmin implements HelixAdmin {
final String constraintId, final ConstraintItem constraintItem) {
logger.info("Set constraint type {} with constraint id {} for cluster
{}.", constraintType,
constraintId, clusterName);
- BaseDataAccessor<ZNRecord> baseAccessor = new
ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ BaseDataAccessor<ZNRecord> baseAccessor = _baseDataAccessor;
PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
String path = keyBuilder.constraint(constraintType.toString()).getPath();
@@ -2227,7 +2132,7 @@ public class ZKHelixAdmin implements HelixAdmin {
final String constraintId) {
logger.info("Remove constraint type {} with constraint id {} for cluster
{}.", constraintType,
constraintId, clusterName);
- BaseDataAccessor<ZNRecord> baseAccessor = new
ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ BaseDataAccessor<ZNRecord> baseAccessor = _baseDataAccessor;
PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
String path = keyBuilder.constraint(constraintType.toString()).getPath();
@@ -2248,8 +2153,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public ClusterConstraints getConstraints(String clusterName, ConstraintType
constraintType) {
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
return
accessor.getProperty(keyBuilder.constraint(constraintType.toString()));
@@ -2331,8 +2235,7 @@ public class ZKHelixAdmin implements HelixAdmin {
throw new HelixException(
"cluster " + clusterName + " instance " + instanceName + " is not
setup yet");
}
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
InstanceConfig config =
accessor.getProperty(keyBuilder.instanceConfig(instanceName));
@@ -2352,8 +2255,7 @@ public class ZKHelixAdmin implements HelixAdmin {
throw new HelixException(
"cluster " + clusterName + " instance " + instanceName + " is not
setup yet");
}
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
InstanceConfig config =
accessor.getProperty(keyBuilder.instanceConfig(instanceName));
@@ -2373,8 +2275,7 @@ public class ZKHelixAdmin implements HelixAdmin {
throw new HelixException(
"cluster " + clusterName + " instance " + instanceName + " is not
setup yet");
}
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
InstanceConfig config =
accessor.getProperty(keyBuilder.instanceConfig(instanceName));
@@ -2431,23 +2332,19 @@ public class ZKHelixAdmin implements HelixAdmin {
}
InstanceConfig config = new InstanceConfig(currentData);
- config.setInstanceEnabled(enabled);
- if (!enabled) {
- // new disabled type and reason will overwrite existing ones.
- config.resetInstanceDisabledTypeAndReason();
- if (reason != null) {
- config.setInstanceDisabledReason(reason);
- }
- if (disabledType != null) {
- config.setInstanceDisabledType(disabledType);
- }
- }
+ config.setInstanceOperation(new
InstanceConfig.InstanceOperation.Builder().setOperation(
+ enabled ? InstanceConstants.InstanceOperation.ENABLE
+ :
InstanceConstants.InstanceOperation.DISABLE).setReason(reason).setSource(
+ disabledType != null
+ ?
InstanceConstants.InstanceOperationSource.instanceDisabledTypeToInstanceOperationSource(
+ disabledType) : null).build());
return config.getRecord();
}
}, AccessOption.PERSISTENT);
}
// TODO: Add history ZNode for all batched enabling/disabling histories with
metadata.
+ @Deprecated
private void enableBatchInstances(final String clusterName, final
List<String> instances,
final boolean enabled, BaseDataAccessor<ZNRecord> baseAccessor,
InstanceConstants.InstanceDisabledType disabledType, String reason) {
@@ -2783,8 +2680,7 @@ public class ZKHelixAdmin implements HelixAdmin {
}
}
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
_baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
List<String> instanceConfigNames =
accessor.getChildNames(keyBuilder.instanceConfigs());
List<String> instancePathNames =
accessor.getChildNames(keyBuilder.instances());
diff --git
a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index de41646c3..1b3acd68d 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -29,6 +29,11 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
@@ -48,11 +53,13 @@ public class InstanceConfig extends HelixProperty {
* Configurable characteristics of an instance
*/
public enum InstanceConfigProperty {
- HELIX_HOST, HELIX_PORT, HELIX_ZONE_ID, @Deprecated
- HELIX_ENABLED,
+ HELIX_HOST,
+ HELIX_PORT,
+ HELIX_ZONE_ID,
+ @Deprecated HELIX_ENABLED,
HELIX_ENABLED_TIMESTAMP,
- HELIX_DISABLED_REASON,
- HELIX_DISABLED_TYPE,
+ @Deprecated HELIX_DISABLED_REASON,
+ @Deprecated HELIX_DISABLED_TYPE,
HELIX_DISABLED_PARTITION,
TAG_LIST,
INSTANCE_WEIGHT,
@@ -60,15 +67,128 @@ public class InstanceConfig extends HelixProperty {
DELAY_REBALANCE_ENABLED,
MAX_CONCURRENT_TASK,
INSTANCE_INFO_MAP,
- INSTANCE_CAPACITY_MAP,
- TARGET_TASK_THREAD_POOL_SIZE,
- INSTANCE_OPERATION
+ INSTANCE_CAPACITY_MAP, TARGET_TASK_THREAD_POOL_SIZE,
HELIX_INSTANCE_OPERATIONS
+ }
+
+ public static class InstanceOperation {
+ private final Map<String, String> _properties;
+
+ private enum InstanceOperationProperties {
+ OPERATION, REASON, SOURCE, TIMESTAMP
+ }
+
+ private InstanceOperation(@Nullable Map<String, String> properties) {
+ // Default to ENABLE operation if no operation type is provided.
+ _properties = properties == null ? new HashMap<>() : properties;
+ if
(!_properties.containsKey(InstanceOperationProperties.OPERATION.name())) {
+ _properties.put(InstanceOperationProperties.OPERATION.name(),
+ InstanceConstants.InstanceOperation.ENABLE.name());
+ }
+ }
+
+ public static class Builder {
+ private Map<String, String> _properties = new HashMap<>();
+
+ /**
+ * Set the operation type for this instance operation.
+ * @param operationType InstanceOperation type of this instance
operation.
+ */
+ public Builder setOperation(@Nullable
InstanceConstants.InstanceOperation operationType) {
+ _properties.put(InstanceOperationProperties.OPERATION.name(),
+ operationType == null ?
InstanceConstants.InstanceOperation.ENABLE.name()
+ : operationType.name());
+ return this;
+ }
+
+ /**
+ * Set the reason for this instance operation.
+ * @param reason the reason for this instance operation; null is stored as an empty string.
+ */
+ public Builder setReason(String reason) {
+ _properties.put(InstanceOperationProperties.REASON.name(), reason !=
null ? reason : "");
+ return this;
+ }
+
+ /**
+ * Set the source for this instance operation.
+ * @param source InstanceOperationSource
+ * that caused this instance operation to be triggered.
+ */
+ public Builder setSource(InstanceConstants.InstanceOperationSource
source) {
+ _properties.put(InstanceOperationProperties.SOURCE.name(),
+ source == null ?
InstanceConstants.InstanceOperationSource.USER.name()
+ : source.name());
+ return this;
+ }
+
+ public InstanceOperation build() throws IllegalArgumentException {
+ if
(!_properties.containsKey(InstanceOperationProperties.OPERATION.name())) {
+ throw new IllegalArgumentException(
+ "Instance operation type is not set, this is a required field.");
+ }
+ _properties.put(InstanceOperationProperties.TIMESTAMP.name(),
+ String.valueOf(System.currentTimeMillis()));
+ return new InstanceOperation(_properties);
+ }
+ }
+
+ /**
+ * Get the operation type of this instance operation.
+ * @return the InstanceOperation type
+ */
+ public InstanceConstants.InstanceOperation getOperation() throws
IllegalArgumentException {
+ return InstanceConstants.InstanceOperation.valueOf(
+ _properties.get(InstanceOperationProperties.OPERATION.name()));
+ }
+
+ /**
+ * Get the reason for this instance operation.
+ * If the reason is not set, it will default to an empty string.
+ *
+ * @return the reason for this instance operation.
+ */
+ public String getReason() {
+ return
_properties.getOrDefault(InstanceOperationProperties.REASON.name(), "");
+ }
+
+ /**
+ * Get the InstanceOperationSource
+ * that caused this instance operation to be triggered.
+ * If the source is not set, it will default to USER.
+ *
+ * @return the InstanceOperationSource
+ * that caused this instance operation to be triggered.
+ */
+ public InstanceConstants.InstanceOperationSource getSource() {
+ return InstanceConstants.InstanceOperationSource.valueOf(
+ _properties.getOrDefault(InstanceOperationProperties.SOURCE.name(),
+ InstanceConstants.InstanceOperationSource.USER.name()));
+ }
+
+ /**
+ * Get the timestamp (milliseconds from epoch) when this instance
operation was triggered.
+ *
+ * @return the timestamp when the instance operation was triggered.
+ */
+ public long getTimestamp() {
+ return
Long.parseLong(_properties.get(InstanceOperationProperties.TIMESTAMP.name()));
+ }
+
+ private void setTimestamp(long timestamp) {
+ _properties.put(InstanceOperationProperties.TIMESTAMP.name(),
String.valueOf(timestamp));
+ }
+
+ private Map<String, String> getProperties() {
+ return _properties;
+ }
}
public static final int WEIGHT_NOT_SET = -1;
public static final int MAX_CONCURRENT_TASK_NOT_SET = -1;
private static final int TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
private static final boolean HELIX_ENABLED_DEFAULT_VALUE = true;
+ private static final long HELIX_ENABLED_TIMESTAMP_DEFAULT_VALUE = -1;
+ private static final ObjectMapper _objectMapper = new ObjectMapper();
// These fields are not allowed to be overwritten by the merge method because
// they are unique properties of an instance.
@@ -79,6 +199,8 @@ public class InstanceConfig extends HelixProperty {
private static final Logger _logger =
LoggerFactory.getLogger(InstanceConfig.class.getName());
+ private List<InstanceOperation> _deserializedInstanceOperations;
+
/**
* Instantiate for a specific instance
* @param instanceId the instance identifier
@@ -264,25 +386,28 @@ public class InstanceConfig extends HelixProperty {
* enabled/disabled, return -1.
*/
public long getInstanceEnabledTime() {
- return
_record.getLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(), -1);
+ return
_record.getLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(),
+ HELIX_ENABLED_TIMESTAMP_DEFAULT_VALUE);
}
/**
* Set the enabled state of the instance If user enables the instance,
HELIX_DISABLED_REASON filed
* will be removed.
- * @deprecated This method is deprecated. Please use setInstanceOperation
instead.
* @param enabled true to enable, false to disable
+ * @deprecated This method is deprecated. Please use setInstanceOperation
instead.
*/
@Deprecated
public void setInstanceEnabled(boolean enabled) {
// set instance operation only when we need to change InstanceEnabled
value.
- setInstanceEnabledHelper(enabled);
+ setInstanceEnabledHelper(enabled, null);
}
- private void setInstanceEnabledHelper(boolean enabled) {
- _record.setBooleanField(InstanceConfigProperty.HELIX_ENABLED.toString(),
enabled);
-
_record.setLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(),
System.currentTimeMillis());
+ private void setInstanceEnabledHelper(boolean enabled, Long
timestampOverride) {
+ _record.setBooleanField(InstanceConfigProperty.HELIX_ENABLED.name(),
enabled);
+ _record.setLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(),
+ timestampOverride != null ? timestampOverride :
System.currentTimeMillis());
if (enabled) {
+ // TODO: Replace this when HELIX_ENABLED and HELIX_DISABLED_REASON is
removed.
resetInstanceDisabledTypeAndReason();
}
}
@@ -290,6 +415,7 @@ public class InstanceConfig extends HelixProperty {
/**
* Removes HELIX_DISABLED_REASON and HELIX_DISABLED_TYPE entry from simple
field.
*/
+ @Deprecated
public void resetInstanceDisabledTypeAndReason() {
_record.getSimpleFields().remove(InstanceConfigProperty.HELIX_DISABLED_REASON.name());
_record.getSimpleFields().remove(InstanceConfigProperty.HELIX_DISABLED_TYPE.name());
@@ -298,19 +424,25 @@ public class InstanceConfig extends HelixProperty {
/**
* Set the instance disabled reason when instance is disabled.
* It will be a no-op when instance is enabled.
+ * @deprecated This method is deprecated. Please use setInstanceOperation along with InstanceOperation.Builder().setReason(...) instead.
*/
+ @Deprecated
public void setInstanceDisabledReason(String disabledReason) {
- if
(getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) {
-
_record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_REASON.name(),
disabledReason);
- }
+ if
(getInstanceOperation().getOperation().equals(InstanceConstants.InstanceOperation.DISABLE))
{
+
_record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_REASON.name(),
disabledReason);
+ }
}
/**
* Set the instance disabled type when instance is disabled.
* It will be a no-op when instance is enabled.
+ * @deprecated This method is deprecated. Please use setInstanceOperation
along with
+ * InstanceOperation.Builder().setSource(...)
+ * instead.
*/
+ @Deprecated
public void setInstanceDisabledType(InstanceConstants.InstanceDisabledType
disabledType) {
- if
(getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) {
+ if
(getInstanceOperation().getOperation().equals(InstanceConstants.InstanceOperation.DISABLE))
{
_record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_TYPE.name(),
disabledType.name());
}
@@ -319,7 +451,9 @@ public class InstanceConfig extends HelixProperty {
/**
* Get the instance disabled reason when instance is disabled.
* @return Return instance disabled reason. Default is am empty string.
+ * @deprecated This method is deprecated. Please use
getInstanceOperation().getReason() instead.
*/
+ @Deprecated
public String getInstanceDisabledReason() {
return
_record.getStringField(InstanceConfigProperty.HELIX_DISABLED_REASON.name(), "");
}
@@ -328,63 +462,184 @@ public class InstanceConfig extends HelixProperty {
*
* @return Return instance disabled type
(org.apache.helix.constants.InstanceConstants.InstanceDisabledType)
* Default is am empty string.
+ * @deprecated This method is deprecated. Please use
+ * getInstanceOperation().getSource() instead.
*/
+ @Deprecated
public String getInstanceDisabledType() {
- if
(!getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) {
+ if (_record.getBooleanField(InstanceConfigProperty.HELIX_ENABLED.name(),
+ HELIX_ENABLED_DEFAULT_VALUE)) {
return InstanceConstants.INSTANCE_NOT_DISABLED;
}
return
_record.getStringField(InstanceConfigProperty.HELIX_DISABLED_TYPE.name(),
InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name());
}
+ private List<InstanceOperation> getInstanceOperations() {
+ if (_deserializedInstanceOperations == null ||
_deserializedInstanceOperations.isEmpty()) {
+ // If the _deserializedInstanceOperations is not set, then we need to
build it from the real
+ // helix property HELIX_INSTANCE_OPERATIONS.
+ List<String> instanceOperations =
+
_record.getListField(InstanceConfigProperty.HELIX_INSTANCE_OPERATIONS.name());
+ List<InstanceOperation> newDeserializedInstanceOperations = new
ArrayList<>();
+ if (instanceOperations != null) {
+ for (String serializedInstanceOperation : instanceOperations) {
+ try {
+ Map<String, String> properties =
_objectMapper.readValue(serializedInstanceOperation,
+ new TypeReference<Map<String, String>>() {
+ });
+ newDeserializedInstanceOperations.add(new
InstanceOperation(properties));
+ } catch (JsonProcessingException e) {
+ _logger.error(
+ "Failed to deserialize instance operation for instance: " +
_record.getId(), e);
+ }
+ }
+ }
+ _deserializedInstanceOperations = newDeserializedInstanceOperations;
+ }
+
+ return _deserializedInstanceOperations;
+ }
+
/**
* Set the instance operation for this instance.
+ * This method also sets the HELIX_ENABLED, HELIX_DISABLED_REASON, and
HELIX_DISABLED_TYPE fields
+ * for backwards compatibility.
*
* @param operation the instance operation
*/
- public void setInstanceOperation(InstanceConstants.InstanceOperation
operation) {
- _record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name(),
- operation == null ? "" : operation.name());
- if (operation == null || operation ==
InstanceConstants.InstanceOperation.ENABLE
- || operation == InstanceConstants.InstanceOperation.DISABLE) {
+ public void setInstanceOperation(InstanceOperation operation) {
+ List<InstanceOperation> deserializedInstanceOperations =
getInstanceOperations();
+
+ if (operation.getSource() ==
InstanceConstants.InstanceOperationSource.ADMIN) {
+ deserializedInstanceOperations.clear();
+ } else {
+ // Remove the instance operation with the same source if it exists.
+ deserializedInstanceOperations.removeIf(
+ instanceOperation -> instanceOperation.getSource() ==
operation.getSource());
+ }
+ if (operation.getOperation() ==
InstanceConstants.InstanceOperation.ENABLE) {
+ // Insert the operation after the last ENABLE or at the beginning if
there isn't ENABLE in the list.
+ int insertIndex = 0;
+ for (int i = deserializedInstanceOperations.size() - 1; i >= 0; i--) {
+ if (deserializedInstanceOperations.get(i).getOperation()
+ == InstanceConstants.InstanceOperation.ENABLE) {
+ insertIndex = i + 1;
+ break;
+ }
+ }
+ deserializedInstanceOperations.add(insertIndex, operation);
+ } else {
+ deserializedInstanceOperations.add(operation);
+ }
+ // Set the actual field in the ZnRecord
+
_record.setListField(InstanceConfigProperty.HELIX_INSTANCE_OPERATIONS.name(),
+ deserializedInstanceOperations.stream().map(instanceOperation -> {
+ try {
+ return
_objectMapper.writeValueAsString(instanceOperation.getProperties());
+ } catch (JsonProcessingException e) {
+ throw new HelixException(
+ "Failed to serialize instance operation for instance: " +
_record.getId()
+ + " Can't set the instance operation to: " +
operation.getOperation(), e);
+ }
+ }).collect(Collectors.toList()));
+
+ // TODO: Remove this when we are sure that all users are using the new
InstanceOperation only and HELIX_ENABLED is removed.
+ if (operation.getOperation() ==
InstanceConstants.InstanceOperation.DISABLE) {
// We are still setting the HELIX_ENABLED field for backwards
compatibility.
// It is possible that users will be using earlier version of HelixAdmin
or helix-rest
// is on older version.
- // TODO: Remove this when we are sure that all users are using the new
field INSTANCE_OPERATION.
- setInstanceEnabledHelper(!(operation ==
InstanceConstants.InstanceOperation.DISABLE));
+
+ if (_record.getBooleanField(InstanceConfigProperty.HELIX_ENABLED.name(),
true)) {
+ // Check if it is already disabled, if yes, then we don't need to set
HELIX_ENABLED and HELIX_ENABLED_TIMESTAMP
+ setInstanceEnabledHelper(false, operation.getTimestamp());
+ }
+
+
_record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_REASON.name(),
+ operation.getReason());
+ _record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_TYPE.name(),
+
InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name());
+ } else if (operation.getOperation() ==
InstanceConstants.InstanceOperation.ENABLE) {
+ // If any of the other InstanceOperations are of type DISABLE, set that
in the HELIX_ENABLED,
+ // HELIX_DISABLED_REASON, and HELIX_DISABLED_TYPE fields.
+ InstanceOperation latestDisableInstanceOperation = null;
+ for (InstanceOperation instanceOperation : getInstanceOperations()) {
+ if (instanceOperation.getOperation() ==
InstanceConstants.InstanceOperation.DISABLE && (
+ latestDisableInstanceOperation == null ||
instanceOperation.getTimestamp()
+ > latestDisableInstanceOperation.getTimestamp())) {
+ latestDisableInstanceOperation = instanceOperation;
+ }
+ }
+
+ if (latestDisableInstanceOperation != null) {
+
_record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_REASON.name(),
+ latestDisableInstanceOperation.getReason());
+
_record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_TYPE.name(),
+
InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name());
+ } else {
+ setInstanceEnabledHelper(true, operation.getTimestamp());
+ }
}
}
+ /**
+ * Set the instance operation for this instance. Provide the
InstanceOperation enum and the reason
+ * and source will be set to default values.
+ *
+ * @param operation the instance operation
+ */
+ public void setInstanceOperation(InstanceConstants.InstanceOperation
operation) {
+ InstanceOperation instanceOperation =
+ new InstanceOperation.Builder().setOperation(operation).build();
+ setInstanceOperation(instanceOperation);
+ }
+
private void setInstanceOperationInit(InstanceConstants.InstanceOperation
operation) {
if (operation == null) {
return;
}
- _record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name(),
operation.name());
+ InstanceOperation instanceOperation =
+ new
InstanceOperation.Builder().setOperation(operation).setReason("INIT").build();
+ // When an instance is created for the first time the timestamp is set to
-1 so that if it
+ // is disabled it will not be considered within the delay window when it
joins.
+ instanceOperation.setTimestamp(HELIX_ENABLED_TIMESTAMP_DEFAULT_VALUE);
+ setInstanceOperation(instanceOperation);
+ }
+
+ private InstanceOperation getActiveInstanceOperation() {
+ List<InstanceOperation> instanceOperations = getInstanceOperations();
+
+ if (instanceOperations.isEmpty()) {
+ InstanceOperation instanceOperation =
+ new
InstanceOperation.Builder().setOperation(InstanceConstants.InstanceOperation.ENABLE)
+
.setSource(InstanceConstants.InstanceOperationSource.DEFAULT).build();
+ instanceOperation.setTimestamp(HELIX_ENABLED_TIMESTAMP_DEFAULT_VALUE);
+ return instanceOperation;
+ }
+
+ // The last instance operation in the list is the active one: non-ENABLE operations are appended at the end,
+ // while ENABLE operations are inserted ahead of them (see setInstanceOperation).
+ return instanceOperations.get(instanceOperations.size() - 1);
}
/**
- * Get the InstanceOperation of this instance, default is ENABLE if nothing
is set. If
+ * Get the InstanceOperationType of this instance, default is ENABLE if
nothing is set. If
* HELIX_ENABLED is set to false, then the instance operation is DISABLE for
backwards
* compatibility.
*
* @return the instance operation
*/
- public InstanceConstants.InstanceOperation getInstanceOperation() {
- String instanceOperationString =
-
_record.getSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name());
-
- InstanceConstants.InstanceOperation instanceOperation;
+ public InstanceOperation getInstanceOperation() {
+ InstanceOperation activeInstanceOperation = getActiveInstanceOperation();
try {
- // If INSTANCE_OPERATION is not set, then the instance is enabled.
- instanceOperation = (instanceOperationString == null ||
instanceOperationString.isEmpty())
- ? InstanceConstants.InstanceOperation.ENABLE
- :
InstanceConstants.InstanceOperation.valueOf(instanceOperationString);
+ activeInstanceOperation.getOperation();
} catch (IllegalArgumentException e) {
- _logger.error("Invalid instance operation: " + instanceOperationString +
" for instance: "
- + _record.getId()
+ _logger.error("Invalid instance operation type for instance: " +
_record.getId()
+ ". You may need to update your version of Helix to get support for
this "
+ "type of InstanceOperation. Defaulting to UNKNOWN.");
- return InstanceConstants.InstanceOperation.UNKNOWN;
+ activeInstanceOperation =
+ new
InstanceOperation.Builder().setOperation(InstanceConstants.InstanceOperation.UNKNOWN)
+ .build();
}
// Always respect the HELIX_ENABLED being set to false when instance
operation is unset
@@ -392,11 +647,16 @@ public class InstanceConfig extends HelixProperty {
if (!_record.getBooleanField(InstanceConfigProperty.HELIX_ENABLED.name(),
HELIX_ENABLED_DEFAULT_VALUE)
&&
(InstanceConstants.INSTANCE_DISABLED_OVERRIDABLE_OPERATIONS.contains(
- instanceOperation))) {
- return InstanceConstants.InstanceOperation.DISABLE;
+ activeInstanceOperation.getOperation()))) {
+ return new InstanceOperation.Builder().setOperation(
+
InstanceConstants.InstanceOperation.DISABLE).setReason(getInstanceDisabledReason())
+ .setSource(
+
InstanceConstants.InstanceOperationSource.instanceDisabledTypeToInstanceOperationSource(
+
InstanceConstants.InstanceDisabledType.valueOf(getInstanceDisabledType())))
+ .build();
}
- return instanceOperation;
+ return activeInstanceOperation;
}
/**
@@ -406,7 +666,7 @@ public class InstanceConfig extends HelixProperty {
* @return true if enabled, false otherwise
*/
public boolean getInstanceEnabled() {
- return
getInstanceOperation().equals(InstanceConstants.InstanceOperation.ENABLE);
+ return
getInstanceOperation().getOperation().equals(InstanceConstants.InstanceOperation.ENABLE);
}
/**
@@ -416,7 +676,8 @@ public class InstanceConfig extends HelixProperty {
* @return true if the instance is assignable, false otherwise
*/
public boolean isAssignable() {
- return
InstanceConstants.ASSIGNABLE_INSTANCE_OPERATIONS.contains(getInstanceOperation());
+ return InstanceConstants.ASSIGNABLE_INSTANCE_OPERATIONS.contains(
+ getInstanceOperation().getOperation());
}
/**
@@ -929,10 +1190,8 @@ public class InstanceConfig extends HelixProperty {
instanceConfig.addTag(tag);
}
- if (_instanceOperation == null && _instanceEnabled !=
HELIX_ENABLED_DEFAULT_VALUE) {
- instanceConfig.setInstanceOperationInit(
- _instanceEnabled ? InstanceConstants.InstanceOperation.ENABLE
- : InstanceConstants.InstanceOperation.DISABLE);
+ if (_instanceOperation == null && !_instanceEnabled) {
+
instanceConfig.setInstanceOperationInit(InstanceConstants.InstanceOperation.DISABLE);
}
if (_instanceOperation != null && !_instanceOperation.equals(
diff --git
a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
index db9ada93c..f634aac46 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
@@ -181,8 +181,8 @@ class RoutingDataCache extends BasicClusterDataCache {
private void updateRoutableInstanceConfigMap(Map<String, InstanceConfig>
instanceConfigMap) {
_routableInstanceConfigMap = instanceConfigMap.entrySet().stream().filter(
- (instanceConfigEntry) ->
!InstanceConstants.UNSERVABLE_INSTANCE_OPERATIONS.contains(
- instanceConfigEntry.getValue().getInstanceOperation()))
+ (instanceConfigEntry) ->
!InstanceConstants.UNROUTABLE_INSTANCE_OPERATIONS.contains(
+
instanceConfigEntry.getValue().getInstanceOperation().getOperation()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@@ -190,8 +190,9 @@ class RoutingDataCache extends BasicClusterDataCache {
Map<String, LiveInstance> liveInstanceMap) {
_routableLiveInstanceMap = liveInstanceMap.entrySet().stream().filter(
(liveInstanceEntry) ->
instanceConfigMap.containsKey(liveInstanceEntry.getKey())
- && !InstanceConstants.UNSERVABLE_INSTANCE_OPERATIONS.contains(
-
instanceConfigMap.get(liveInstanceEntry.getKey()).getInstanceOperation()))
+ && !InstanceConstants.UNROUTABLE_INSTANCE_OPERATIONS.contains(
+
instanceConfigMap.get(liveInstanceEntry.getKey()).getInstanceOperation()
+ .getOperation()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
diff --git a/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java
b/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java
new file mode 100644
index 000000000..967d561e7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java
@@ -0,0 +1,198 @@
+package org.apache.helix.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.model.ClusterTopologyConfig;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.DataUpdater;
+
+public class InstanceUtil {
+
+ // Private constructor to prevent instantiation
+ private InstanceUtil() {
+ }
+
+ // Validators for instance operation transitions
+ private static final Function<List<InstanceConfig>, Boolean> ALWAYS_ALLOWED =
+ (matchingInstances) -> true;
+ private static final Function<List<InstanceConfig>, Boolean>
ALL_MATCHES_ARE_UNKNOWN =
+ (matchingInstances) -> matchingInstances.isEmpty() ||
matchingInstances.stream().allMatch(
+ instance -> instance.getInstanceOperation().getOperation()
+ .equals(InstanceConstants.InstanceOperation.UNKNOWN));
+ private static final Function<List<InstanceConfig>, Boolean>
ALL_MATCHES_ARE_UNKNOWN_OR_EVACUATE =
+ (matchingInstances) -> matchingInstances.isEmpty() ||
matchingInstances.stream().allMatch(
+ instance -> instance.getInstanceOperation().getOperation()
+ .equals(InstanceConstants.InstanceOperation.UNKNOWN)
+ || instance.getInstanceOperation().getOperation()
+ .equals(InstanceConstants.InstanceOperation.EVACUATE));
+ private static final Function<List<InstanceConfig>, Boolean>
ANY_MATCH_ENABLE_OR_DISABLE =
+ (matchingInstances) -> !matchingInstances.isEmpty() &&
matchingInstances.stream().anyMatch(
+ instance -> instance.getInstanceOperation().getOperation()
+ .equals(InstanceConstants.InstanceOperation.ENABLE) ||
instance.getInstanceOperation()
+
.getOperation().equals(InstanceConstants.InstanceOperation.DISABLE));
+
+ // Validator map for valid instance operation transitions
<currentOperation>:<targetOperation>:<validator>
+ private static final ImmutableMap<InstanceConstants.InstanceOperation,
ImmutableMap<InstanceConstants.InstanceOperation,
Function<List<InstanceConfig>, Boolean>>>
+ validInstanceOperationTransitions =
+ ImmutableMap.of(InstanceConstants.InstanceOperation.ENABLE,
+ // ENABLE and DISABLE can be set to UNKNOWN when matching instance is in
SWAP_IN and set to ENABLE in a transaction.
+ ImmutableMap.of(InstanceConstants.InstanceOperation.ENABLE,
ALWAYS_ALLOWED,
+ InstanceConstants.InstanceOperation.DISABLE, ALWAYS_ALLOWED,
+ InstanceConstants.InstanceOperation.EVACUATE, ALWAYS_ALLOWED,
+ InstanceConstants.InstanceOperation.UNKNOWN, ALWAYS_ALLOWED),
+ InstanceConstants.InstanceOperation.DISABLE,
+ ImmutableMap.of(InstanceConstants.InstanceOperation.DISABLE,
ALWAYS_ALLOWED,
+ InstanceConstants.InstanceOperation.ENABLE, ALWAYS_ALLOWED,
+ InstanceConstants.InstanceOperation.EVACUATE, ALWAYS_ALLOWED,
+ InstanceConstants.InstanceOperation.UNKNOWN, ALWAYS_ALLOWED),
+ InstanceConstants.InstanceOperation.SWAP_IN,
+ // SWAP_IN can be set to ENABLE when matching instance is in UNKNOWN
state in a transaction.
+ ImmutableMap.of(InstanceConstants.InstanceOperation.SWAP_IN,
ALWAYS_ALLOWED,
+ InstanceConstants.InstanceOperation.UNKNOWN, ALWAYS_ALLOWED),
+ InstanceConstants.InstanceOperation.EVACUATE,
+ ImmutableMap.of(InstanceConstants.InstanceOperation.EVACUATE,
ALWAYS_ALLOWED,
+ InstanceConstants.InstanceOperation.ENABLE, ALL_MATCHES_ARE_UNKNOWN,
+ InstanceConstants.InstanceOperation.DISABLE, ALL_MATCHES_ARE_UNKNOWN,
+ InstanceConstants.InstanceOperation.UNKNOWN, ALWAYS_ALLOWED),
+ InstanceConstants.InstanceOperation.UNKNOWN,
+ ImmutableMap.of(InstanceConstants.InstanceOperation.UNKNOWN,
ALWAYS_ALLOWED,
+ InstanceConstants.InstanceOperation.ENABLE,
ALL_MATCHES_ARE_UNKNOWN_OR_EVACUATE,
+ InstanceConstants.InstanceOperation.DISABLE,
ALL_MATCHES_ARE_UNKNOWN_OR_EVACUATE,
+ InstanceConstants.InstanceOperation.SWAP_IN,
ANY_MATCH_ENABLE_OR_DISABLE));
+
+ /**
+ * Validates if the transition from the current operation to the target
operation is valid.
+ *
+ * @param configAccessor The ConfigAccessor instance
+ * @param clusterName The cluster name
+ * @param instanceConfig The current instance configuration
+ * @param currentOperation The current operation
+ * @param targetOperation The target operation
+ */
+ public static void validateInstanceOperationTransition(ConfigAccessor
configAccessor,
+ String clusterName, InstanceConfig instanceConfig,
+ InstanceConstants.InstanceOperation currentOperation,
+ InstanceConstants.InstanceOperation targetOperation) {
+ // Check if the current operation and target operation are in the valid
transitions map
+ if (!validInstanceOperationTransitions.containsKey(currentOperation)
+ ||
!validInstanceOperationTransitions.get(currentOperation).containsKey(targetOperation))
{
+ throw new HelixException(
+ "Invalid instance operation transition from " + currentOperation + "
to "
+ + targetOperation);
+ }
+
+ // Throw exception if the validation fails
+ if
(!validInstanceOperationTransitions.get(currentOperation).get(targetOperation)
+ .apply(findInstancesWithMatchingLogicalId(configAccessor, clusterName,
instanceConfig))) {
+ throw new HelixException(
+ "Failed validation for instance operation transition from " +
currentOperation + " to "
+ + targetOperation);
+ }
+ }
+
+ /**
+ * Finds the instances that have a matching logical ID with the given
instance.
+ *
+ * @param configAccessor The ConfigAccessor instance
+ * @param clusterName The cluster name
+ * @param instanceConfig The instance configuration to match
+ * @return A list of matching instances
+ */
+ public static List<InstanceConfig> findInstancesWithMatchingLogicalId(
+ ConfigAccessor configAccessor, String clusterName, InstanceConfig
instanceConfig) {
+ String logicalIdKey =
+
ClusterTopologyConfig.createFromClusterConfig(configAccessor.getClusterConfig(clusterName))
+ .getEndNodeType();
+
+ // Retrieve and filter instances with matching logical ID
+ return configAccessor.getKeys(
+ new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
+ clusterName).build()).stream()
+ .map(instanceName -> configAccessor.getInstanceConfig(clusterName,
instanceName)).filter(
+ potentialInstanceConfig ->
+
!potentialInstanceConfig.getInstanceName().equals(instanceConfig.getInstanceName())
+ && potentialInstanceConfig.getLogicalId(logicalIdKey)
+ .equals(instanceConfig.getLogicalId(logicalIdKey)))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Sets the instance operation for the given instance.
+ *
+ * @param configAccessor The ConfigAccessor instance
+ * @param baseAccessor The BaseDataAccessor instance
+ * @param clusterName The cluster name
+ * @param instanceName The instance name
+ * @param instanceOperation The instance operation to set
+ */
+ public static void setInstanceOperation(ConfigAccessor configAccessor,
+ BaseDataAccessor<ZNRecord> baseAccessor, String clusterName, String
instanceName,
+ InstanceConfig.InstanceOperation instanceOperation) {
+ String path = PropertyPathBuilder.instanceConfig(clusterName,
instanceName);
+
+ // Retrieve the current instance configuration
+ InstanceConfig instanceConfig =
configAccessor.getInstanceConfig(clusterName, instanceName);
+ if (instanceConfig == null) {
+ throw new HelixException("Cluster " + clusterName + ", instance: " +
instanceName
+ + ", instance config does not exist");
+ }
+
+ // Validate the instance operation transition
+ validateInstanceOperationTransition(configAccessor, clusterName,
instanceConfig,
+ instanceConfig.getInstanceOperation().getOperation(),
+ instanceOperation == null ? InstanceConstants.InstanceOperation.ENABLE
+ : instanceOperation.getOperation());
+
+ // Update the instance operation
+ boolean succeeded = baseAccessor.update(path, new DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ if (currentData == null) {
+ throw new HelixException("Cluster: " + clusterName + ", instance: "
+ instanceName
+ + ", participant config is null");
+ }
+
+ InstanceConfig config = new InstanceConfig(currentData);
+ config.setInstanceOperation(instanceOperation);
+ return config.getRecord();
+ }
+ }, AccessOption.PERSISTENT);
+
+ if (!succeeded) {
+ throw new HelixException(
+ "Failed to update instance operation. Please check if instance is
disabled.");
+ }
+ }
+}
diff --git
a/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java
b/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java
index bf9b59dc2..5ea42dc3e 100644
---
a/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java
+++
b/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java
@@ -52,20 +52,18 @@ public class TestDefaultCloudEventCallbackImpl extends
ZkStandAloneCMTestBase {
Assert.assertFalse(InstanceValidationUtil
.isEnabled(_manager.getHelixDataAccessor(),
_instanceManager.getInstanceName()));
Assert.assertEquals(_manager.getConfigAccessor()
- .getInstanceConfig(CLUSTER_NAME, _instanceManager.getInstanceName())
- .getInstanceDisabledType(),
InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name());
+ .getInstanceConfig(CLUSTER_NAME,
_instanceManager.getInstanceName()).getInstanceOperation()
+ .getSource(), InstanceConstants.InstanceOperationSource.AUTOMATION);
- // Should not disable instance if it is already disabled due to other
reasons
- // And disabled type should remain unchanged
_admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(),
false);
_impl.disableInstance(_instanceManager, null);
Assert.assertFalse(InstanceValidationUtil
.isEnabled(_manager.getHelixDataAccessor(),
_instanceManager.getInstanceName()));
Assert.assertEquals(_manager.getConfigAccessor()
.getInstanceConfig(CLUSTER_NAME,
_instanceManager.getInstanceName())
- .getInstanceDisabledType(),
-
InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name());
+ .getInstanceOperation().getSource(),
InstanceConstants.InstanceOperationSource.USER);
+ _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(),
true);
_admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(),
false,
InstanceConstants.InstanceDisabledType.CLOUD_EVENT, null);
}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 85600c01c..67b575f0c 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -211,7 +211,7 @@ public class TestInstanceOperation extends ZkTestBase {
InstanceConfig instanceConfig =
_gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME,
participantName);
if (!_participants.get(i).isConnected() ||
!instanceConfig.getInstanceEnabled()
- || instanceConfig.getInstanceOperation()
+ || instanceConfig.getInstanceOperation().getOperation()
.equals(InstanceConstants.InstanceOperation.SWAP_IN)) {
if (_participants.get(i).isConnected()) {
_participants.get(i).syncStop();
@@ -338,7 +338,7 @@ public class TestInstanceOperation extends ZkTestBase {
// now remove operation tag
String instanceToEvacuate = _participants.get(0).getInstanceName();
_gSetupTool.getClusterManagementTool()
- .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);
+ .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate,
InstanceConstants.InstanceOperation.ENABLE);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
@@ -370,7 +370,8 @@ public class TestInstanceOperation extends ZkTestBase {
Assert.assertEquals(
_gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME,
instanceToSwapInName)
- .getInstanceOperation(),
InstanceConstants.InstanceOperation.UNKNOWN);
+ .getInstanceOperation().getOperation(),
+ InstanceConstants.InstanceOperation.UNKNOWN);
}
@Test(dependsOnMethods = "testNodeSwapNoTopologySetup")
@@ -397,7 +398,8 @@ public class TestInstanceOperation extends ZkTestBase {
Assert.assertEquals(
_gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME,
instanceToSwapInName)
- .getInstanceOperation(),
InstanceConstants.InstanceOperation.UNKNOWN);
+ .getInstanceOperation().getOperation(),
+ InstanceConstants.InstanceOperation.UNKNOWN);
}
@Test(dependsOnMethods = "testAddingNodeWithEnableInstanceOperation")
@@ -416,7 +418,8 @@ public class TestInstanceOperation extends ZkTestBase {
Assert.assertEquals(
_gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME,
instanceToSwapInName)
- .getInstanceOperation(),
InstanceConstants.InstanceOperation.UNKNOWN);
+ .getInstanceOperation().getOperation(),
+ InstanceConstants.InstanceOperation.UNKNOWN);
}
@Test(dependsOnMethods = "testNodeSwapWithNoSwapOutNode")
@@ -440,7 +443,8 @@ public class TestInstanceOperation extends ZkTestBase {
Assert.assertEquals(
_gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME,
instanceToSwapInName)
- .getInstanceOperation(),
InstanceConstants.InstanceOperation.UNKNOWN);
+ .getInstanceOperation().getOperation(),
+ InstanceConstants.InstanceOperation.UNKNOWN);
// Setting the InstanceOperation to SWAP_IN should work because there is a
matching logicalId in
// the cluster and the InstanceCapacityWeights and FaultZone match.
@@ -481,7 +485,8 @@ public class TestInstanceOperation extends ZkTestBase {
// Instance should be UNKNOWN since there was already a swapping pair.
Assert.assertEquals(_gSetupTool.getClusterManagementTool()
- .getInstanceConfig(CLUSTER_NAME,
secondInstanceToSwapInName).getInstanceOperation(),
+ .getInstanceConfig(CLUSTER_NAME,
secondInstanceToSwapInName).getInstanceOperation()
+ .getOperation(),
InstanceConstants.InstanceOperation.UNKNOWN);
// Try to set the InstanceOperation to SWAP_IN, it should throw an
exception since there is already
@@ -576,7 +581,8 @@ public class TestInstanceOperation extends ZkTestBase {
Assert.assertFalse(_gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME,
instanceToSwapOutName).getInstanceEnabled());
Assert.assertEquals(_gSetupTool.getClusterManagementTool()
- .getInstanceConfig(CLUSTER_NAME,
instanceToSwapOutName).getInstanceOperation(),
+ .getInstanceConfig(CLUSTER_NAME,
instanceToSwapOutName).getInstanceOperation()
+ .getOperation(),
InstanceConstants.InstanceOperation.UNKNOWN);
// Check to make sure the throttle was enabled again after the swap was
completed.
@@ -681,7 +687,8 @@ public class TestInstanceOperation extends ZkTestBase {
Assert.assertFalse(_gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME,
instanceToSwapOutName).getInstanceEnabled());
Assert.assertEquals(_gSetupTool.getClusterManagementTool()
- .getInstanceConfig(CLUSTER_NAME,
instanceToSwapOutName).getInstanceOperation(),
+ .getInstanceConfig(CLUSTER_NAME,
instanceToSwapOutName).getInstanceOperation()
+ .getOperation(),
InstanceConstants.InstanceOperation.UNKNOWN);
// Validate that the SWAP_IN instance has the same partitions the swap out
instance had before
@@ -824,7 +831,7 @@ public class TestInstanceOperation extends ZkTestBase {
Collections.emptySet(), Collections.emptySet());
_gSetupTool.getClusterManagementTool()
- .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null);
+ .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
InstanceConstants.InstanceOperation.ENABLE);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
@@ -1110,7 +1117,7 @@ public class TestInstanceOperation extends ZkTestBase {
// This should throw exception because we cannot ever have two instances
with the same logicalId and both have InstanceOperation
// unset.
_gSetupTool.getClusterManagementTool()
- .setInstanceOperation(CLUSTER_NAME, instanceToSwapInName, null);
+ .setInstanceOperation(CLUSTER_NAME, instanceToSwapInName,
InstanceConstants.InstanceOperation.ENABLE);
}
@Test(dependsOnMethods = "testUnsetInstanceOperationOnSwapInWhenSwapping")
@@ -1180,7 +1187,7 @@ public class TestInstanceOperation extends ZkTestBase {
// cancel the evacuation
_gSetupTool.getClusterManagementTool()
- .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);
+ .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate,
InstanceConstants.InstanceOperation.ENABLE);
assignment = getEVs();
for (String resource : _allDBs) {
@@ -1222,7 +1229,7 @@ public class TestInstanceOperation extends ZkTestBase {
// cancel evacuation
_gSetupTool.getClusterManagementTool()
- .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);
+ .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate,
InstanceConstants.InstanceOperation.ENABLE);
// check every replica has >= 3 active replicas, even before cluster
converge
Map<String, ExternalView> assignment = getEVs();
for (String resource : _allDBs) {
@@ -1311,7 +1318,7 @@ public class TestInstanceOperation extends ZkTestBase {
// Remove EVACUATE instance's InstanceOperation
_gSetupTool.getClusterManagementTool()
- .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null);
+ .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
InstanceConstants.InstanceOperation.ENABLE);
}
@Test(dependsOnMethods = "testSwapEvacuateAddRemoveEvacuate")
@@ -1392,7 +1399,7 @@ public class TestInstanceOperation extends ZkTestBase {
@Override
public void onInstanceConfigChange(List<InstanceConfig> instanceConfig,
NotificationContext context) {
- if (instanceConfig.get(0).getInstanceOperation()
+ if (instanceConfig.get(0).getInstanceOperation().getOperation()
.equals(InstanceConstants.InstanceOperation.SWAP_IN)) {
throttlesEnabled = false;
} else {
diff --git
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index 558110857..b54be8c04 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -182,8 +182,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
String disableReason = "Reason";
tool.enableInstance(clusterName, instanceName, false,
InstanceConstants.InstanceDisabledType.CLOUD_EVENT, disableReason);
- Assert.assertTrue(tool.getInstanceConfig(clusterName,
instanceName).getInstanceDisabledReason()
- .equals(disableReason));
+ Assert.assertEquals(disableReason, tool.getInstanceConfig(clusterName,
instanceName).getInstanceDisabledReason());
tool.enableInstance(clusterName, instanceName, true,
InstanceConstants.InstanceDisabledType.CLOUD_EVENT, disableReason);
Assert.assertTrue(
@@ -348,6 +347,65 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
System.out.println("END testZkHelixAdmin at " + new
Date(System.currentTimeMillis()));
}
+ @Test
+ private void testSetInstanceOperation() {
+ System.out.println("START testSetInstanceOperation at " + new
Date(System.currentTimeMillis()));
+
+ final String clusterName = getShortClassName();
+ String rootPath = "/" + clusterName;
+ if (_gZkClient.exists(rootPath)) {
+ _gZkClient.deleteRecursively(rootPath);
+ }
+
+ HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+ tool.addCluster(clusterName, true);
+ Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient));
+
Assert.assertTrue(_gZkClient.exists(PropertyPathBuilder.customizedStateConfig(clusterName)));
+
+ // Add instance to cluster
+ String hostname = "host1";
+ String port = "9999";
+ String instanceName = hostname + "_" + port;
+ InstanceConfig config =
+ new
InstanceConfig.Builder().setHostName(hostname).setPort(port).build(instanceName);
+
+ tool.addInstance(clusterName, config);
+
+ // Set instance operation to DISABLE
+ tool.setInstanceOperation(clusterName, instanceName,
+ InstanceConstants.InstanceOperation.DISABLE, "disableReason");
+ Assert.assertEquals(tool.getInstanceConfig(clusterName,
instanceName).getInstanceOperation()
+ .getOperation(),
+ InstanceConstants.InstanceOperation.DISABLE);
+ Assert.assertEquals(
+ tool.getInstanceConfig(clusterName,
instanceName).getInstanceDisabledReason(),
+ "disableReason");
+
+ // Set instance operation to ENABLE
+ tool.setInstanceOperation(clusterName, instanceName,
InstanceConstants.InstanceOperation.ENABLE,
+ "enableReason");
+ Assert.assertEquals(tool.getInstanceConfig(clusterName,
instanceName).getInstanceOperation()
+ .getOperation(),
+ InstanceConstants.InstanceOperation.ENABLE);
+ // InstanceDisabledReason should be empty after setting operation to
ENABLE
+ Assert.assertEquals(
+ tool.getInstanceConfig(clusterName,
instanceName).getInstanceDisabledReason(), "");
+
+ // Set instance operation to UNKNOWN
+ tool.setInstanceOperation(clusterName, instanceName,
+ InstanceConstants.InstanceOperation.UNKNOWN, "unknownReason");
+ Assert.assertEquals(tool.getInstanceConfig(clusterName,
instanceName).getInstanceOperation()
+ .getOperation(),
+ InstanceConstants.InstanceOperation.UNKNOWN);
+ Assert.assertEquals(
+ tool.getInstanceConfig(clusterName,
instanceName).getInstanceOperation().getReason(),
+ "unknownReason");
+
+ deleteCluster(clusterName);
+
+ System.out.println("END testSetInstanceOperation at " + new
Date(System.currentTimeMillis()));
+ }
+
private HelixManager initializeHelixManager(String clusterName, String
instanceName) {
HelixManager manager = HelixManagerFactory.getZKHelixManager(clusterName,
instanceName,
InstanceType.PARTICIPANT, org.apache.helix.common.ZkTestBase.ZK_ADDR);
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index d9bc5d7fe..c5c5626ff 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -24,6 +24,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
+
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
@@ -285,16 +287,12 @@ public class MockHelixAdmin implements HelixAdmin {
ZNRecord record = (ZNRecord) _baseDataAccessor.get(instanceConfigPath,
null, 0);
InstanceConfig instanceConfig = new InstanceConfig(record);
- instanceConfig.setInstanceOperation(enabled ?
InstanceConstants.InstanceOperation.ENABLE
- : InstanceConstants.InstanceOperation.DISABLE);
+ instanceConfig.setInstanceOperation(new
InstanceConfig.InstanceOperation.Builder().setOperation(
+ enabled ? InstanceConstants.InstanceOperation.ENABLE
+ :
InstanceConstants.InstanceOperation.DISABLE).setReason(reason).build());
if (!enabled) {
+ // TODO: Replace this when the HELIX_ENABLED, HELIX_DISABLED_REASON and
HELIX_DISABLED_TYPE fields are removed.
instanceConfig.resetInstanceDisabledTypeAndReason();
- if (reason != null) {
- instanceConfig.setInstanceDisabledReason(reason);
- }
- if (disabledType != null) {
- instanceConfig.setInstanceDisabledType(disabledType);
- }
}
_baseDataAccessor.set(instanceConfigPath, instanceConfig.getRecord(), 0);
}
@@ -307,7 +305,20 @@ public class MockHelixAdmin implements HelixAdmin {
@Override
public void setInstanceOperation(String clusterName, String instanceName,
- InstanceConstants.InstanceOperation instanceOperation) {
+ @Nullable InstanceConstants.InstanceOperation instanceOperation) {
+ setInstanceOperation(clusterName, instanceName, instanceOperation, null,
false);
+ }
+
+ @Override
+ public void setInstanceOperation(String clusterName, String instanceName,
+ @Nullable InstanceConstants.InstanceOperation instanceOperation, String
reason) {
+ setInstanceOperation(clusterName, instanceName, instanceOperation, reason,
false);
+ }
+
+ @Override
+ public void setInstanceOperation(String clusterName, String instanceName,
+ @Nullable InstanceConstants.InstanceOperation instanceOperation, String
reason,
+ boolean overrideAll) {
}
@Override
diff --git
a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
index 7da983b8a..47ea88ac4 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
@@ -52,7 +52,6 @@ public class TestInstanceConfig {
public void testSetInstanceEnableWithReason() {
InstanceConfig instanceConfig = new InstanceConfig(new ZNRecord("id"));
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
- instanceConfig.setInstanceDisabledReason("NoShowReason");
instanceConfig.setInstanceDisabledType(InstanceConstants.InstanceDisabledType.USER_OPERATION);
Assert.assertEquals(instanceConfig.getRecord().getSimpleFields()
@@ -62,10 +61,9 @@ public class TestInstanceConfig {
Assert.assertEquals(instanceConfig.getRecord().getSimpleFields()
.get(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_TYPE.toString()),
null);
-
-
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
String reasonCode = "ReasonCode";
- instanceConfig.setInstanceDisabledReason(reasonCode);
+ instanceConfig.setInstanceOperation(new
InstanceConfig.InstanceOperation.Builder().setOperation(
+
InstanceConstants.InstanceOperation.DISABLE).setReason(reasonCode).build());
instanceConfig.setInstanceDisabledType(InstanceConstants.InstanceDisabledType.USER_OPERATION);
Assert.assertEquals(instanceConfig.getRecord().getSimpleFields()
.get(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.toString()),
"false");
@@ -198,6 +196,30 @@ public class TestInstanceConfig {
Assert.assertEquals(instanceConfig.getInstanceCapacityMap().get("weight1"),
Integer.valueOf(1));
}
+ @Test
+ public void testInstanceOperationReason() {
+ InstanceConfig instanceConfig = new InstanceConfig("instance1");
+ instanceConfig.setInstanceEnabled(false);
+ instanceConfig.setInstanceDisabledReason("disableReason");
+ Assert.assertEquals(instanceConfig.getInstanceDisabledReason(),
"disableReason");
+ Assert.assertEquals(instanceConfig.getInstanceDisabledReason(),
"disableReason");
+
+ instanceConfig.setInstanceOperation(new
InstanceConfig.InstanceOperation.Builder().setOperation(
+
InstanceConstants.InstanceOperation.UNKNOWN).setReason("unknownReason").build());
+ Assert.assertEquals(instanceConfig.getInstanceDisabledReason(),
"disableReason");
+ Assert.assertEquals(instanceConfig.getInstanceOperation().getReason(),
"unknownReason");
+
+
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
+ instanceConfig.setInstanceOperation(new
InstanceConfig.InstanceOperation.Builder().setOperation(
+
InstanceConstants.InstanceOperation.DISABLE).setReason("disableReason2").build());
+ Assert.assertEquals(instanceConfig.getInstanceDisabledReason(),
"disableReason2");
+ Assert.assertEquals(instanceConfig.getInstanceOperation().getReason(),
"disableReason2");
+
+
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
+ Assert.assertEquals(instanceConfig.getInstanceDisabledReason(), "");
+ Assert.assertEquals(instanceConfig.getInstanceOperation().getReason(), "");
+ }
+
@Test
public void testOverwriteInstanceConfig() {
InstanceConfig instanceConfig = new InstanceConfig("instance2");
@@ -233,9 +255,91 @@ public class TestInstanceConfig {
Assert.assertTrue(instanceConfig.getTags().contains("tag4"));
Assert.assertFalse(instanceConfig.getRecord().getSimpleFields()
.containsKey(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.toString()));
- Assert.assertEquals(instanceConfig.getInstanceOperation(),
+ Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(),
InstanceConstants.InstanceOperation.EVACUATE);
Assert.assertFalse(instanceConfig.getInstanceCapacityMap().containsKey("weight1"));
Assert.assertEquals(instanceConfig.getInstanceCapacityMap().get("weight2"),
Integer.valueOf(2));
}
+
+ @Test
+ public void testInstanceOperationMultipleSources() throws
InterruptedException {
+ InstanceConfig instanceConfig = new InstanceConfig("instance1");
+
+ // Check that the instance operation is ENABLE from the DEFAULT source
+ Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(),
+ InstanceConstants.InstanceOperation.ENABLE);
+ Assert.assertEquals(instanceConfig.getInstanceOperation().getSource(),
+ InstanceConstants.InstanceOperationSource.DEFAULT);
+
+ // Set instance operation from user source
+ instanceConfig.setInstanceOperation(new
InstanceConfig.InstanceOperation.Builder().setOperation(
+
InstanceConstants.InstanceOperation.DISABLE).setReason("userReason")
+ .setSource(InstanceConstants.InstanceOperationSource.USER).build());
+ // Get enabled time
+ long op1EnabledTime = instanceConfig.getInstanceEnabledTime();
+
+ Thread.sleep(1000);
+ // Set instance operation from automation source
+ instanceConfig.setInstanceOperation(new
InstanceConfig.InstanceOperation.Builder().setOperation(
+
InstanceConstants.InstanceOperation.DISABLE).setReason("automationReason")
+
.setSource(InstanceConstants.InstanceOperationSource.AUTOMATION).build());
+
+ // Check that the enabled time is the same as op1 but the source and
reason are changed to automation
+ Assert.assertEquals(instanceConfig.getInstanceEnabledTime(),
op1EnabledTime);
+ Assert.assertEquals(instanceConfig.getInstanceOperation().getSource(),
+ InstanceConstants.InstanceOperationSource.AUTOMATION);
+
+ Thread.sleep(1000);
+ // Set instance operation from user source to be ENABLE
+ instanceConfig.setInstanceOperation(new
InstanceConfig.InstanceOperation.Builder().setOperation(
+ InstanceConstants.InstanceOperation.ENABLE)
+ .setSource(InstanceConstants.InstanceOperationSource.USER).build());
+
+ // Check that the operation is DISABLE, the enabled time is the same as
op1, and the source is still automation
+ Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(),
+ InstanceConstants.InstanceOperation.DISABLE);
+ Assert.assertEquals(instanceConfig.getInstanceEnabledTime(),
op1EnabledTime);
+ Assert.assertEquals(instanceConfig.getInstanceOperation().getSource(),
+ InstanceConstants.InstanceOperationSource.AUTOMATION);
+
+ Thread.sleep(1000);
+ // Set the instance operation from the automation source to be ENABLE
+ instanceConfig.setInstanceOperation(new
InstanceConfig.InstanceOperation.Builder().setOperation(
+ InstanceConstants.InstanceOperation.ENABLE)
+
.setSource(InstanceConstants.InstanceOperationSource.AUTOMATION).build());
+
+ // Check that the operation is ENABLE, the enabled time is different
from op1, and the source is still automation
+ Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(),
+ InstanceConstants.InstanceOperation.ENABLE);
+ Assert.assertFalse(instanceConfig.getInstanceEnabledTime() ==
op1EnabledTime);
+ Assert.assertEquals(instanceConfig.getInstanceOperation().getSource(),
+ InstanceConstants.InstanceOperationSource.AUTOMATION);
+
+ // Set the instance operation from the automation source to be EVACUATE
+ instanceConfig.setInstanceOperation(new
InstanceConfig.InstanceOperation.Builder().setOperation(
+ InstanceConstants.InstanceOperation.EVACUATE)
+
.setSource(InstanceConstants.InstanceOperationSource.AUTOMATION).build());
+
+ // Set the instance operation from the user source to be DISABLE
+ instanceConfig.setInstanceOperation(new
InstanceConfig.InstanceOperation.Builder().setOperation(
+ InstanceConstants.InstanceOperation.DISABLE)
+ .setSource(InstanceConstants.InstanceOperationSource.USER).build());
+
+ // Check that the instance operation is DISABLE and the source is user
+ Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(),
+ InstanceConstants.InstanceOperation.DISABLE);
+ Assert.assertEquals(instanceConfig.getInstanceOperation().getSource(),
+ InstanceConstants.InstanceOperationSource.USER);
+
+ // Set the instance operation from the admin source to be ENABLE
+ instanceConfig.setInstanceOperation(new
InstanceConfig.InstanceOperation.Builder().setOperation(
+ InstanceConstants.InstanceOperation.ENABLE)
+ .setSource(InstanceConstants.InstanceOperationSource.ADMIN).build());
+
+ // Check that the instance operation is ENABLE and the source is admin
+ Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(),
+ InstanceConstants.InstanceOperation.ENABLE);
+ Assert.assertEquals(instanceConfig.getInstanceOperation().getSource(),
+ InstanceConstants.InstanceOperationSource.ADMIN);
+ }
}
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
index 8a4bbf07b..bb5a2bc5c 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
@@ -268,8 +268,8 @@ public class StoppableInstancesSelector {
PropertyKey.Builder propertyKeyBuilder = _dataAccessor.keyBuilder();
InstanceConfig instanceConfig =
_dataAccessor.getProperty(propertyKeyBuilder.instanceConfig(instance));
- if (InstanceConstants.InstanceOperation.EVACUATE
- .equals(instanceConfig.getInstanceOperation())) {
+ if (InstanceConstants.InstanceOperation.EVACUATE.equals(
+ instanceConfig.getInstanceOperation().getOperation())) {
toBeStoppedInstances.add(instance);
}
}
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index ea98f6637..55fc4de36 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -45,12 +45,14 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
+import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Error;
import org.apache.helix.model.HealthStat;
@@ -66,6 +68,7 @@ import org.apache.helix.rest.common.HttpConstants;
import org.apache.helix.rest.server.filters.ClusterAuth;
import org.apache.helix.rest.server.json.instance.InstanceInfo;
import org.apache.helix.rest.server.json.instance.StoppableCheck;
+import org.apache.helix.util.InstanceUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
@@ -388,9 +391,11 @@ public class PerInstanceAccessor extends
AbstractHelixResource {
@POST
public Response updateInstance(@PathParam("clusterId") String clusterId,
@PathParam("instanceName") String instanceName, @QueryParam("command")
String command,
- @QueryParam("instanceOperation") InstanceConstants.InstanceOperation
state,
- @QueryParam("instanceDisabledType") String disabledType,
- @QueryParam("instanceDisabledReason") String disabledReason,
+ @QueryParam("instanceOperation") InstanceConstants.InstanceOperation
instanceOperation,
+ @QueryParam("instanceOperationSource")
InstanceConstants.InstanceOperationSource instanceOperationSource,
+ @QueryParam("reason") String reason,
+ @Deprecated @QueryParam("instanceDisabledType") String disabledType,
+ @Deprecated @QueryParam("instanceDisabledReason") String disabledReason,
@QueryParam("force") boolean force, String content) {
Command cmd;
try {
@@ -445,7 +450,12 @@ public class PerInstanceAccessor extends
AbstractHelixResource {
.getTypeFactory().constructCollectionType(List.class,
String.class)));
break;
case setInstanceOperation:
- admin.setInstanceOperation(clusterId, instanceName, state);
+ InstanceUtil.setInstanceOperation(new
ConfigAccessor(getRealmAwareZkClient()),
+ new ZkBaseDataAccessor<>(getRealmAwareZkClient()), clusterId,
instanceName,
+ new
InstanceConfig.InstanceOperation.Builder().setOperation(instanceOperation)
+ .setReason(reason).setSource(
+ force ? InstanceConstants.InstanceOperationSource.ADMIN
: instanceOperationSource)
+ .build());
break;
case canCompleteSwap:
return OK(OBJECT_MAPPER.writeValueAsString(
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index 395f9bf85..6ab727e85 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -495,14 +495,14 @@ public class TestPerInstanceAccessor extends
AbstractTestClass {
new
JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=EVACUATE")
.format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME,
INSTANCE_NAME);
- Assert.assertEquals(instanceConfig.getInstanceOperation(),
+ Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(),
InstanceConstants.InstanceOperation.EVACUATE);
new
JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=INVALIDOP")
.expectedReturnStatusCode(Response.Status.NOT_FOUND.getStatusCode()).format(CLUSTER_NAME,
INSTANCE_NAME).post(this, entity);
new
JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=")
.format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME,
INSTANCE_NAME);
- Assert.assertEquals(instanceConfig.getInstanceOperation(),
+ Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(),
InstanceConstants.InstanceOperation.ENABLE);
// test canCompleteSwap
@@ -543,7 +543,7 @@ public class TestPerInstanceAccessor extends
AbstractTestClass {
new
JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=EVACUATE")
.format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME,
INSTANCE_NAME);
- Assert.assertEquals(instanceConfig.getInstanceOperation(),
+ Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(),
InstanceConstants.InstanceOperation.EVACUATE);
Response response = new
JerseyUriRequestBuilder("clusters/{}/instances/{}?command=isEvacuateFinished")
@@ -586,7 +586,7 @@ public class TestPerInstanceAccessor extends
AbstractTestClass {
new
JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=EVACUATE")
.format(CLUSTER_NAME, test_instance_name).post(this, entity);
instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME,
test_instance_name);
- Assert.assertEquals(instanceConfig.getInstanceOperation(),
+ Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(),
InstanceConstants.InstanceOperation.EVACUATE);
response = new
JerseyUriRequestBuilder("clusters/{}/instances/{}?command=isEvacuateFinished")