This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new b7d771e70 Deprecate HELIX_DISABLED_REASON and refactor how 
InstanceOperation is represented in instance configs. (#2801)
b7d771e70 is described below

commit b7d771e7032dd542ab8b466f28d239c3e425f5de
Author: Zachary Pinto <[email protected]>
AuthorDate: Tue Jun 11 11:17:07 2024 -0700

    Deprecate HELIX_DISABLED_REASON and refactor how InstanceOperation is 
represented in instance configs. (#2801)
    
    Deprecate HELIX_DISABLED_REASON and HELIX_DISABLED_TYPE; Refactor 
INSTANCE_OPERATION to HELIX_INSTANCE_OPERATIONS List Field
    
    To prevent conflicts from different clients setting the InstanceOperation, 
we are introducing the HELIX_INSTANCE_OPERATIONS list.
    
    Key changes:
    - Clients using the old Helix enabled APIs will take precedence over 
INSTANCE_DISABLED_OVERRIDABLE_OPERATIONS when those fields are set.
    - When the new InstanceOperation APIs set the operation type to DISABLE, 
the old HELIX_ENABLED field will also be set for backwards compatibility.
    - For all InstanceOperation API invocations, the source will default to 
USER unless specified otherwise. An AUTOMATION source will create a separate 
entry in the list.
    - The most recent non-ENABLE InstanceOperation entry will be the active 
InstanceOperation used by the controller and returned by the 
getInstanceOperation API.
    
    These changes ensure smoother operation transitions and maintain 
compatibility with existing APIs.
---
 .../apache/helix/constants/InstanceConstants.java  |  37 ++-
 .../src/main/java/org/apache/helix/HelixAdmin.java |  31 +-
 .../event/helix/DefaultCloudEventCallbackImpl.java |  24 +-
 .../cloud/event/helix/HelixEventHandlingUtil.java  |   3 +
 .../trimmer/InstanceConfigTrimmer.java             |  16 +
 .../dataproviders/BaseControllerDataProvider.java  |   8 +-
 .../stages/BestPossibleStateCalcStage.java         |   8 +-
 .../stages/CurrentStateComputationStage.java       |   1 +
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  | 314 ++++++------------
 .../org/apache/helix/model/InstanceConfig.java     | 355 ++++++++++++++++++---
 .../apache/helix/spectator/RoutingDataCache.java   |   9 +-
 .../java/org/apache/helix/util/InstanceUtil.java   | 198 ++++++++++++
 .../event/TestDefaultCloudEventCallbackImpl.java   |  10 +-
 .../rebalancer/TestInstanceOperation.java          |  37 ++-
 .../apache/helix/manager/zk/TestZkHelixAdmin.java  |  62 +++-
 .../java/org/apache/helix/mock/MockHelixAdmin.java |  29 +-
 .../org/apache/helix/model/TestInstanceConfig.java | 114 ++++++-
 .../StoppableInstancesSelector.java                |   4 +-
 .../resources/helix/PerInstanceAccessor.java       |  18 +-
 .../helix/rest/server/TestPerInstanceAccessor.java |   8 +-
 20 files changed, 958 insertions(+), 328 deletions(-)

diff --git 
a/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java 
b/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java
index 85c22c460..22f6c7c76 100644
--- 
a/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java
+++ 
b/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java
@@ -20,21 +20,54 @@ public class InstanceConstants {
    * TODO: Remove this when the deprecated HELIX_ENABLED is removed.
    */
   public static final Set<InstanceOperation> 
INSTANCE_DISABLED_OVERRIDABLE_OPERATIONS =
-      ImmutableSet.of(InstanceOperation.ENABLE, InstanceOperation.DISABLE, 
InstanceOperation.EVACUATE);
+      ImmutableSet.of(InstanceOperation.ENABLE, InstanceOperation.EVACUATE);
 
 
   /**
    * The set of InstanceOperations that are not allowed to be populated in the 
RoutingTableProvider.
    */
-  public static final Set<InstanceOperation> UNSERVABLE_INSTANCE_OPERATIONS =
+  public static final Set<InstanceOperation> UNROUTABLE_INSTANCE_OPERATIONS =
       ImmutableSet.of(InstanceOperation.SWAP_IN, InstanceOperation.UNKNOWN);
 
+  @Deprecated
   public enum InstanceDisabledType {
     CLOUD_EVENT,
     USER_OPERATION,
     DEFAULT_INSTANCE_DISABLE_TYPE
   }
 
+  public enum InstanceOperationSource {
+    ADMIN(0), USER(1), AUTOMATION(2), DEFAULT(3);
+
+    private final int _priority;
+
+    InstanceOperationSource(int priority) {
+      _priority = priority;
+    }
+
+    public int getPriority() {
+      return _priority;
+    }
+
+    /**
+     * Convert from InstanceDisabledType to InstanceOperationTrigger
+     *
+     * @param disabledType InstanceDisabledType
+     * @return InstanceOperationTrigger
+     */
+    public static InstanceOperationSource 
instanceDisabledTypeToInstanceOperationSource(
+        InstanceDisabledType disabledType) {
+      switch (disabledType) {
+        case CLOUD_EVENT:
+          return InstanceOperationSource.AUTOMATION;
+        case USER_OPERATION:
+          return InstanceOperationSource.USER;
+        default:
+          return InstanceOperationSource.DEFAULT;
+      }
+    }
+  }
+
   public enum InstanceOperation {
     /**
      * Behavior: Replicas will be assigned to the node and will receive upward 
state transitions if
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java 
b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 84a7154b1..07afb55b6 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -310,15 +310,38 @@ public interface HelixAdmin {
   void enableInstance(String clusterName, List<String> instances, boolean 
enabled);
 
   /**
-   * Set the instanceOperation field. Setting it to null is equivalent to
-   * ENABLE.
+   * Set the instanceOperation of and instance with {@link 
InstanceConstants.InstanceOperation}.
    *
    * @param clusterName       The cluster name
    * @param instanceName      The instance name
-   * @param instanceOperation The instance operation
+   * @param instanceOperation The instance operation type
    */
   void setInstanceOperation(String clusterName, String instanceName,
-      @Nullable InstanceConstants.InstanceOperation instanceOperation);
+      InstanceConstants.InstanceOperation instanceOperation);
+
+  /**
+   * Set the instanceOperation of and instance with {@link 
InstanceConstants.InstanceOperation}.
+   *
+   * @param clusterName       The cluster name
+   * @param instanceName      The instance name
+   * @param instanceOperation The instance operation type
+   * @param reason            The reason for the operation
+   */
+  void setInstanceOperation(String clusterName, String instanceName,
+      InstanceConstants.InstanceOperation instanceOperation, String reason);
+
+  /**
+   * Set the instanceOperation of and instance with {@link 
InstanceConstants.InstanceOperation}.
+   *
+   * @param clusterName       The cluster name
+   * @param instanceName      The instance name
+   * @param instanceOperation The instance operation type
+   * @param reason            The reason for the operation
+   * @param overrideAll       Whether to override all existing instance 
operations from all other
+   *                          instance operations
+   */
+  void setInstanceOperation(String clusterName, String instanceName,
+      InstanceConstants.InstanceOperation instanceOperation, String reason, 
boolean overrideAll);
 
   /**
    * Disable or enable a resource
diff --git 
a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
 
b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
index 04ad4b798..20c500116 100644
--- 
a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
+++ 
b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
@@ -23,6 +23,8 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.util.InstanceUtil;
 import org.apache.helix.util.InstanceValidationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,9 +51,14 @@ public class DefaultCloudEventCallbackImpl {
     LOG.info("DefaultCloudEventCallbackImpl disable Instance {}", 
manager.getInstanceName());
     if (InstanceValidationUtil
         .isEnabled(manager.getHelixDataAccessor(), manager.getInstanceName())) 
{
-      manager.getClusterManagmentTool()
-          .enableInstance(manager.getClusterName(), manager.getInstanceName(), 
false,
-              InstanceConstants.InstanceDisabledType.CLOUD_EVENT, message);
+      InstanceUtil.setInstanceOperation(manager.getConfigAccessor(),
+          manager.getHelixDataAccessor().getBaseDataAccessor(), 
manager.getClusterName(),
+          manager.getInstanceName(),
+              new InstanceConfig.InstanceOperation.Builder().setOperation(
+                      InstanceConstants.InstanceOperation.DISABLE)
+                  
.setSource(InstanceConstants.InstanceOperationSource.AUTOMATION)
+                  .setReason(message)
+                  .build());
     }
     
HelixEventHandlingUtil.updateCloudEventOperationInClusterConfig(manager.getClusterName(),
         manager.getInstanceName(), 
manager.getHelixDataAccessor().getBaseDataAccessor(), false,
@@ -72,10 +79,13 @@ public class DefaultCloudEventCallbackImpl {
     HelixEventHandlingUtil
         .updateCloudEventOperationInClusterConfig(manager.getClusterName(), 
instanceName,
             manager.getHelixDataAccessor().getBaseDataAccessor(), true, 
message);
-    if (HelixEventHandlingUtil.isInstanceDisabledForCloudEvent(instanceName, 
accessor)) {
-      
manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), 
instanceName, true,
-          InstanceConstants.InstanceDisabledType.CLOUD_EVENT, message);
-    }
+    InstanceUtil.setInstanceOperation(manager.getConfigAccessor(),
+        manager.getHelixDataAccessor().getBaseDataAccessor(), 
manager.getClusterName(),
+        manager.getInstanceName(),
+            new InstanceConfig.InstanceOperation.Builder().setOperation(
+                    InstanceConstants.InstanceOperation.ENABLE)
+                
.setSource(InstanceConstants.InstanceOperationSource.AUTOMATION).setReason(message)
+                .build());
   }
 
   /**
diff --git 
a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java
 
b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java
index ee96a13ee..ceff1d299 100644
--- 
a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java
+++ 
b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java
@@ -48,7 +48,10 @@ class HelixEventHandlingUtil {
    * @param dataAccessor
    * @return return true only when instance is Helix disabled and the disabled 
reason in
    * instanceConfig is cloudEvent
+   * @deprecated No need to check this if using InstanceOperation and 
specifying the trigger as CLOUD
+   *            when enabling.
    */
+  @Deprecated
   static boolean isInstanceDisabledForCloudEvent(String instanceName,
       HelixDataAccessor dataAccessor) {
     InstanceConfig instanceConfig =
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/InstanceConfigTrimmer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/InstanceConfigTrimmer.java
index 45b0dde76..cd2b16f92 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/InstanceConfigTrimmer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/InstanceConfigTrimmer.java
@@ -19,6 +19,7 @@ package org.apache.helix.controller.changedetector.trimmer;
  * under the License.
  */
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -62,6 +63,21 @@ public class InstanceConfigTrimmer extends 
HelixPropertyTrimmer<InstanceConfig>
     return STATIC_TOPOLOGY_RELATED_FIELD_MAP;
   }
 
+  /**
+   * We should trim HELIX_INSTANCE_OPERATIONS field, it is used to filter 
instances in the
+   * BaseControllerDataProvider. That filtering will be used to determine if 
ResourceChangeSnapshot
+   * has changed as opposed to checking the actual value of the field.
+   *
+   * @param property the instance config
+   * @return a map contains all non-trimmable field keys that need to be kept.
+   */
+  protected Map<FieldType, Set<String>> getNonTrimmableKeys(InstanceConfig 
property) {
+    Map<FieldType, Set<String>> nonTrimmableKeys = 
super.getNonTrimmableKeys(property);
+    nonTrimmableKeys.get(FieldType.LIST_FIELD)
+        .remove(InstanceConfigProperty.HELIX_INSTANCE_OPERATIONS.name());
+    return nonTrimmableKeys;
+  }
+
   @Override
   public InstanceConfig trimProperty(InstanceConfig property) {
     return new InstanceConfig(doTrim(property));
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index a91ae12d2..ce5d3de8c 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -413,14 +413,15 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
           
currentInstanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType());
 
       newInstanceConfigMapByInstanceOperation.computeIfAbsent(
-              currentInstanceConfig.getInstanceOperation(), k -> new 
HashMap<>())
+              currentInstanceConfig.getInstanceOperation().getOperation(),
+              k -> new HashMap<>())
           .put(node, currentInstanceConfig);
 
       if (currentInstanceConfig.isAssignable()) {
         newAssignableInstanceConfigMap.put(node, currentInstanceConfig);
       }
 
-      if (currentInstanceConfig.getInstanceOperation()
+      if (currentInstanceConfig.getInstanceOperation().getOperation()
           .equals(InstanceConstants.InstanceOperation.SWAP_IN)) {
         
swapInLogicalIdsByInstanceName.put(currentInstanceConfig.getInstanceName(),
             currentInstanceLogicalId);
@@ -1079,7 +1080,8 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
     _disabledInstanceSet.clear();
     for (InstanceConfig config : allInstanceConfigs) {
       Map<String, List<String>> disabledPartitionMap = 
config.getDisabledPartitionsMap();
-      if 
(config.getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE))
 {
+      if (config.getInstanceOperation().getOperation()
+          .equals(InstanceConstants.InstanceOperation.DISABLE)) {
         _disabledInstanceSet.add(config.getInstanceName());
       }
       for (String resource : disabledPartitionMap.keySet()) {
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 1db0eccfc..714e9325d 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -36,7 +36,6 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.LogUtil;
-import org.apache.helix.controller.common.ResourcesStateMap;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
@@ -358,11 +357,12 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
     if (maxInstancesUnableToAcceptOnlineReplicas >= 0) {
       // Instead of only checking the offline instances, we consider how many 
instances in the cluster
       // are not assignable and live. This is because some instances may be 
online but have an unassignable
-      // InstanceOperation such as EVACUATE, DISABLE, or UNKNOWN. We will 
exclude SWAP_IN instances from
+      // InstanceOperation such as EVACUATE, and DISABLE. We will exclude 
SWAP_IN and UNKNOWN instances from
       // they should not account against the capacity of the cluster.
       int instancesUnableToAcceptOnlineReplicas = 
cache.getInstanceConfigMap().entrySet().stream()
-          .filter(instanceEntry -> 
!InstanceConstants.UNSERVABLE_INSTANCE_OPERATIONS.contains(
-              
instanceEntry.getValue().getInstanceOperation())).collect(Collectors.toSet())
+          .filter(instanceEntry -> 
!InstanceConstants.UNROUTABLE_INSTANCE_OPERATIONS.contains(
+              instanceEntry.getValue().getInstanceOperation().getOperation()))
+          .collect(Collectors.toSet())
           .size() - cache.getEnabledLiveInstances().size();
       if (instancesUnableToAcceptOnlineReplicas > 
maxInstancesUnableToAcceptOnlineReplicas) {
         String errMsg = String.format(
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 3bf23d22e..da972d682 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -109,6 +109,7 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
 
       // Only update the currentStateExcludingUnknown if the instance is not 
in UNKNOWN InstanceOperation.
       if (instanceConfig == null || !instanceConfig.getInstanceOperation()
+          .getOperation()
           .equals(InstanceConstants.InstanceOperation.UNKNOWN)) {
         // update current states.
         updateCurrentStates(instance,
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 8c873b4cd..39ae9ae67 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -42,7 +42,6 @@ import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
 import com.google.common.collect.ImmutableSet;
-import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
@@ -69,7 +68,6 @@ import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.ClusterStatus;
-import org.apache.helix.model.ClusterTopologyConfig;
 import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.ControllerHistory;
 import org.apache.helix.model.CurrentState;
@@ -89,11 +87,11 @@ import org.apache.helix.model.ParticipantHistory;
 import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.tools.DefaultIdealStateCalculator;
 import org.apache.helix.util.ConfigStringUtil;
 import org.apache.helix.util.HelixUtil;
+import org.apache.helix.util.InstanceUtil;
 import org.apache.helix.util.RebalanceUtil;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
@@ -125,6 +123,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   private final RealmAwareZkClient _zkClient;
   private final ConfigAccessor _configAccessor;
+  private final BaseDataAccessor<ZNRecord> _baseDataAccessor;
   // true if ZKHelixAdmin was instantiated with a RealmAwareZkClient, false 
otherwise
   // This is used for close() to determine how ZKHelixAdmin should close the 
underlying ZkClient
   private final boolean _usesExternalZkClient;
@@ -142,6 +141,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   public ZKHelixAdmin(RealmAwareZkClient zkClient) {
     _zkClient = zkClient;
     _configAccessor = new ConfigAccessor(zkClient);
+    _baseDataAccessor = new ZkBaseDataAccessor<>(zkClient);
     _usesExternalZkClient = true;
   }
 
@@ -182,12 +182,14 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     _zkClient = zkClient;
     _configAccessor = new ConfigAccessor(_zkClient);
+    _baseDataAccessor = new ZkBaseDataAccessor<>(zkClient);
     _usesExternalZkClient = false;
   }
 
   private ZKHelixAdmin(RealmAwareZkClient zkClient, boolean 
usesExternalZkClient) {
     _zkClient = zkClient;
     _configAccessor = new ConfigAccessor(_zkClient);
+    _baseDataAccessor = new ZkBaseDataAccessor<>(zkClient);
     _usesExternalZkClient = usesExternalZkClient;
   }
 
@@ -206,7 +208,8 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
 
     List<InstanceConfig> matchingLogicalIdInstances =
-        findInstancesMatchingLogicalId(clusterName, instanceConfig);
+        InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor, 
clusterName,
+            instanceConfig);
     if (matchingLogicalIdInstances.size() > 1) {
       throw new HelixException(
           "There are already more than one instance with the same logicalId in 
the cluster: "
@@ -216,17 +219,16 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
 
     InstanceConstants.InstanceOperation attemptedInstanceOperation =
-        instanceConfig.getInstanceOperation();
+        instanceConfig.getInstanceOperation().getOperation();
     try {
-      validateInstanceOperationTransition(instanceConfig,
-          !matchingLogicalIdInstances.isEmpty() ? 
matchingLogicalIdInstances.get(0) : null,
-          InstanceConstants.InstanceOperation.UNKNOWN,
-          attemptedInstanceOperation, clusterName);
+      InstanceUtil.validateInstanceOperationTransition(_configAccessor, 
clusterName, instanceConfig,
+          InstanceConstants.InstanceOperation.UNKNOWN, 
attemptedInstanceOperation);
     } catch (HelixException e) {
       
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.UNKNOWN);
       logger.error("Failed to add instance " + 
instanceConfig.getInstanceName() + " to cluster "
           + clusterName + " with instance operation " + 
attemptedInstanceOperation
           + ". Setting INSTANCE_OPERATION to " + 
instanceConfig.getInstanceOperation()
+          .getOperation()
           + " instead.", e);
     }
 
@@ -240,8 +242,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     _zkClient.createPersistent(PropertyPathBuilder.instanceError(clusterName, 
nodeId), true);
     
_zkClient.createPersistent(PropertyPathBuilder.instanceStatusUpdate(clusterName,
 nodeId), true);
     
_zkClient.createPersistent(PropertyPathBuilder.instanceHistory(clusterName, 
nodeId), true);
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     accessor.setProperty(keyBuilder.participantHistory(nodeId), new 
ParticipantHistory(nodeId));
   }
@@ -344,8 +345,7 @@ public class ZKHelixAdmin implements HelixAdmin {
           "instance" + instanceName + " does not exist in cluster " + 
clusterName);
     }
 
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     return accessor.getProperty(keyBuilder.instanceConfig(instanceName));
@@ -364,8 +364,7 @@ public class ZKHelixAdmin implements HelixAdmin {
           "instance" + instanceName + " does not exist in cluster " + 
clusterName);
     }
 
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey instanceConfigPropertyKey = 
accessor.keyBuilder().instanceConfig(instanceName);
     InstanceConfig currentInstanceConfig = 
accessor.getProperty(instanceConfigPropertyKey);
     if 
(!newInstanceConfig.getHostName().equals(currentInstanceConfig.getHostName())
@@ -397,9 +396,6 @@ public class ZKHelixAdmin implements HelixAdmin {
     // Eventually we will have all instances' enable/disable information in 
clusterConfig. Now we
     // update both instanceConfig and clusterConfig in transition period.
     enableSingleInstance(clusterName, instanceName, enabled, baseAccessor, 
disabledType, reason);
-//    enableBatchInstances(clusterName, 
Collections.singletonList(instanceName), enabled,
-//        baseAccessor, disabledType, reason);
-
   }
 
   @Deprecated
@@ -413,62 +409,6 @@ public class ZKHelixAdmin implements HelixAdmin {
     //enableInstance(clusterName, instances, enabled, null, null);
   }
 
-  private void validateInstanceOperationTransition(InstanceConfig 
instanceConfig,
-      InstanceConfig matchingLogicalIdInstance,
-      InstanceConstants.InstanceOperation currentOperation,
-      InstanceConstants.InstanceOperation targetOperation,
-      String clusterName) {
-    boolean targetStateEnableOrDisable =
-        targetOperation.equals(InstanceConstants.InstanceOperation.ENABLE)
-            || 
targetOperation.equals(InstanceConstants.InstanceOperation.DISABLE);
-    switch (currentOperation) {
-      case ENABLE:
-      case DISABLE:
-        // ENABLE or DISABLE can be set to ENABLE, DISABLE, or EVACUATE at any 
time.
-        if (ImmutableSet.of(InstanceConstants.InstanceOperation.ENABLE,
-            InstanceConstants.InstanceOperation.DISABLE,
-            
InstanceConstants.InstanceOperation.EVACUATE).contains(targetOperation)) {
-          return;
-        }
-      case SWAP_IN:
-        // We can only ENABLE or DISABLE a SWAP_IN instance if there is an 
instance with matching logicalId
-        // with an InstanceOperation set to UNKNOWN.
-        if ((targetStateEnableOrDisable && (matchingLogicalIdInstance == null
-            || matchingLogicalIdInstance.getInstanceOperation()
-            .equals(InstanceConstants.InstanceOperation.UNKNOWN))) || 
targetOperation.equals(
-            InstanceConstants.InstanceOperation.UNKNOWN)) {
-          return;
-        }
-      case EVACUATE:
-        // EVACUATE can only be set to ENABLE or DISABLE when there is no 
instance with the same
-        // logicalId in the cluster.
-        if ((targetStateEnableOrDisable && matchingLogicalIdInstance == null)
-            || 
targetOperation.equals(InstanceConstants.InstanceOperation.UNKNOWN)) {
-          return;
-        }
-      case UNKNOWN:
-        // UNKNOWN can be set to ENABLE or DISABLE when there is no instance 
with the same logicalId in the cluster
-        // or the instance with the same logicalId in the cluster has 
InstanceOperation set to EVACUATE.
-        // UNKNOWN can be set to SWAP_IN when there is an instance with the 
same logicalId in the cluster set to ENABLE,
-        // or DISABLE.
-        if ((targetStateEnableOrDisable && (matchingLogicalIdInstance == null
-            || matchingLogicalIdInstance.getInstanceOperation()
-            .equals(InstanceConstants.InstanceOperation.EVACUATE)))) {
-          return;
-        } else if 
(targetOperation.equals(InstanceConstants.InstanceOperation.SWAP_IN)
-            && matchingLogicalIdInstance != null && !ImmutableSet.of(
-                InstanceConstants.InstanceOperation.UNKNOWN,
-                InstanceConstants.InstanceOperation.EVACUATE)
-            .contains(matchingLogicalIdInstance.getInstanceOperation())) {
-          return;
-        }
-      default:
-        throw new HelixException(
-            "InstanceOperation cannot be set to " + targetOperation + " when 
the instance is in "
-                + currentOperation + " state");
-    }
-  }
-
   /**
    * Set the InstanceOperation of an instance in the cluster.
    *
@@ -479,75 +419,57 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void setInstanceOperation(String clusterName, String instanceName,
       @Nullable InstanceConstants.InstanceOperation instanceOperation) {
+    setInstanceOperation(clusterName, instanceName, instanceOperation, null, 
false);
+  }
 
-    BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<>(_zkClient);
-    String path = PropertyPathBuilder.instanceConfig(clusterName, 
instanceName);
-
-    InstanceConfig instanceConfig = getInstanceConfig(clusterName, 
instanceName);
-    if (instanceConfig == null) {
-      throw new HelixException("Cluster " + clusterName + ", instance: " + 
instanceName
-          + ", instance config does not exist");
-    }
-    List<InstanceConfig> matchingLogicalIdInstances =
-        findInstancesMatchingLogicalId(clusterName, instanceConfig);
-    validateInstanceOperationTransition(instanceConfig,
-        !matchingLogicalIdInstances.isEmpty() ? 
matchingLogicalIdInstances.get(0) : null,
-        instanceConfig.getInstanceOperation(),
-        instanceOperation == null ? InstanceConstants.InstanceOperation.ENABLE 
: instanceOperation,
-        clusterName);
-
-   boolean succeeded = baseAccessor.update(path, new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        if (currentData == null) {
-          throw new HelixException(
-              "Cluster: " + clusterName + ", instance: " + instanceName + ", 
participant config is null");
-        }
-
-        InstanceConfig config = new InstanceConfig(currentData);
-        config.setInstanceOperation(instanceOperation);
-        return config.getRecord();
-      }
-    }, AccessOption.PERSISTENT);
+  /**
+   * Set the instanceOperation of and instance with {@link 
InstanceConstants.InstanceOperation}.
+   *
+   * @param clusterName       The cluster name
+   * @param instanceName      The instance name
+   * @param instanceOperation The instance operation type
+   * @param reason            The reason for the operation
+   */
+  @Override
+  public void setInstanceOperation(String clusterName, String instanceName,
+      @Nullable InstanceConstants.InstanceOperation instanceOperation, String 
reason) {
+    setInstanceOperation(clusterName, instanceName, instanceOperation, reason, 
false);
+  }
 
-    if (!succeeded) {
-      throw new HelixException("Failed to update instance operation. Please 
check if instance is disabled.");
-    }
+  /**
+   * Set the instanceOperation of and instance with {@link 
InstanceConstants.InstanceOperation}.
+   *
+   * @param clusterName       The cluster name
+   * @param instanceName      The instance name
+   * @param instanceOperation The instance operation type
+   * @param reason            The reason for the operation
+   * @param overrideAll       Whether to override all existing instance 
operations from all other
+   *                          instance operations
+   */
+  @Override
+  public void setInstanceOperation(String clusterName, String instanceName,
+      @Nullable InstanceConstants.InstanceOperation instanceOperation, String 
reason,
+      boolean overrideAll) {
+    InstanceConfig.InstanceOperation instanceOperationObj =
+        new InstanceConfig.InstanceOperation.Builder().setOperation(
+            instanceOperation == null ? 
InstanceConstants.InstanceOperation.ENABLE
+                : instanceOperation).setReason(reason).setSource(
+            overrideAll ? InstanceConstants.InstanceOperationSource.ADMIN
+                : InstanceConstants.InstanceOperationSource.USER).build();
+    InstanceUtil.setInstanceOperation(_configAccessor, _baseDataAccessor, 
clusterName, instanceName,
+        instanceOperationObj);
   }
 
   @Override
   public boolean isEvacuateFinished(String clusterName, String instanceName) {
     if (!instanceHasFullAutoCurrentStateOrMessage(clusterName, instanceName)) {
       InstanceConfig config = getInstanceConfig(clusterName, instanceName);
-      return config != null && config.getInstanceOperation()
+      return config != null && config.getInstanceOperation().getOperation()
           .equals(InstanceConstants.InstanceOperation.EVACUATE);
     }
     return false;
   }
 
-  /**
-   * Find the instance that the passed instance has a matching logicalId with.
-   *
-   * @param clusterName    The cluster name
-   * @param instanceConfig The instance to find the matching instance for
-   * @return The matching instance if found, null otherwise.
-   */
-  private List<InstanceConfig> findInstancesMatchingLogicalId(String 
clusterName,
-      InstanceConfig instanceConfig) {
-    String logicalIdKey =
-        
ClusterTopologyConfig.createFromClusterConfig(_configAccessor.getClusterConfig(clusterName))
-            .getEndNodeType();
-    return getConfigKeys(
-        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
-            clusterName).build()).stream()
-        .map(instanceName -> getInstanceConfig(clusterName, 
instanceName)).filter(
-            potentialInstanceConfig ->
-                
!potentialInstanceConfig.getInstanceName().equals(instanceConfig.getInstanceName())
-                    && potentialInstanceConfig.getLogicalId(logicalIdKey)
-                    .equals(instanceConfig.getLogicalId(logicalIdKey)))
-        .collect(Collectors.toList());
-  }
-
   /**
    * Check to see if swapping between two instances is ready to be completed. 
Checks: 1. Both
    * instances must be alive. 2. Both instances must only have one session and 
not be carrying over
@@ -563,7 +485,7 @@ public class ZKHelixAdmin implements HelixAdmin {
    */
   private boolean canCompleteSwap(String clusterName, String 
swapOutInstanceName,
       String swapInInstanceName) {
-    BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = _baseDataAccessor;
     HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
baseAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
@@ -579,8 +501,8 @@ public class ZKHelixAdmin implements HelixAdmin {
           "SwapOutInstance {} is {} + {} and SwapInInstance {} is OFFLINE + {} 
for cluster {}. Swap will"
               + " not complete unless SwapInInstance instance is ONLINE.",
           swapOutInstanceName, swapOutLiveInstance != null ? "ONLINE" : 
"OFFLINE",
-          swapOutInstanceConfig.getInstanceOperation(), swapInInstanceName,
-          swapInInstanceConfig.getInstanceOperation(), clusterName);
+          swapOutInstanceConfig.getInstanceOperation().getOperation(), 
swapInInstanceName,
+          swapInInstanceConfig.getInstanceOperation().getOperation(), 
clusterName);
       return false;
     }
 
@@ -619,7 +541,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     // 4. If the swap-out instance is not alive or is disabled, we return true 
without checking
     // the current states on the swap-in instance.
-    if (swapOutLiveInstance == null || 
swapOutInstanceConfig.getInstanceOperation()
+    if (swapOutLiveInstance == null || 
swapOutInstanceConfig.getInstanceOperation().getOperation()
         .equals(InstanceConstants.InstanceOperation.DISABLE)) {
       return true;
     }
@@ -697,7 +619,8 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
 
     List<InstanceConfig> swappingInstances =
-        findInstancesMatchingLogicalId(clusterName, instanceConfig);
+        InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor, 
clusterName,
+            instanceConfig);
     if (swappingInstances.size() != 1) {
       logger.warn(
           "Instance {} in cluster {} is not swapping with any other instance. 
Cannot determine if the swap is complete.",
@@ -705,10 +628,10 @@ public class ZKHelixAdmin implements HelixAdmin {
       return false;
     }
 
-    InstanceConfig swapOutInstanceConfig =
-        
!instanceConfig.getInstanceOperation().equals(InstanceConstants.InstanceOperation.SWAP_IN)
+    InstanceConfig swapOutInstanceConfig = 
!instanceConfig.getInstanceOperation().getOperation()
+        .equals(InstanceConstants.InstanceOperation.SWAP_IN)
             ? instanceConfig : swappingInstances.get(0);
-    InstanceConfig swapInInstanceConfig = instanceConfig.getInstanceOperation()
+    InstanceConfig swapInInstanceConfig = 
instanceConfig.getInstanceOperation().getOperation()
         .equals(InstanceConstants.InstanceOperation.SWAP_IN) ? instanceConfig
         : swappingInstances.get(0);
     if (swapOutInstanceConfig == null || swapInInstanceConfig == null) {
@@ -735,7 +658,8 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
 
     List<InstanceConfig> swappingInstances =
-        findInstancesMatchingLogicalId(clusterName, instanceConfig);
+        InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor, 
clusterName,
+            instanceConfig);
     if (swappingInstances.size() != 1) {
       logger.warn(
           "Instance {} in cluster {} is not swapping with any other instance. 
Cannot determine if the swap is complete.",
@@ -743,10 +667,10 @@ public class ZKHelixAdmin implements HelixAdmin {
       return false;
     }
 
-    InstanceConfig swapOutInstanceConfig =
-        
!instanceConfig.getInstanceOperation().equals(InstanceConstants.InstanceOperation.SWAP_IN)
+    InstanceConfig swapOutInstanceConfig = 
!instanceConfig.getInstanceOperation().getOperation()
+        .equals(InstanceConstants.InstanceOperation.SWAP_IN)
             ? instanceConfig : swappingInstances.get(0);
-    InstanceConfig swapInInstanceConfig = instanceConfig.getInstanceOperation()
+    InstanceConfig swapInInstanceConfig = 
instanceConfig.getInstanceOperation().getOperation()
         .equals(InstanceConstants.InstanceOperation.SWAP_IN) ? instanceConfig
         : swappingInstances.get(0);
     if (swapOutInstanceConfig == null || swapInInstanceConfig == null) {
@@ -802,7 +726,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     if (!instanceHasFullAutoCurrentStateOrMessage(clusterName, instanceName)) {
       InstanceConfig config = getInstanceConfig(clusterName, instanceName);
       return config != null && 
INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(
-          config.getInstanceOperation());
+          config.getInstanceOperation().getOperation());
     }
     return false;
   }
@@ -816,7 +740,7 @@ public class ZKHelixAdmin implements HelixAdmin {
    */
   private boolean instanceHasFullAutoCurrentStateOrMessage(String clusterName,
       String instanceName) {
-    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     // check the instance is alive
@@ -827,7 +751,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       return false;
     }
 
-    BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = _baseDataAccessor;
     // count number of sessions under CurrentState folder. If it is carrying 
over from prv session,
     // then there are > 1 session ZNodes.
     List<String> sessions = 
baseAccessor.getChildNames(PropertyPathBuilder.instanceCurrentState(clusterName,
 instanceName), 0);
@@ -867,7 +791,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   public void enableResource(final String clusterName, final String 
resourceName, final boolean enabled) {
     logger.info("{} resource {} in cluster {}.", enabled ? "Enable" : 
"Disable", resourceName, clusterName);
     String path = PropertyPathBuilder.idealState(clusterName, resourceName);
-    BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = _baseDataAccessor;
     if (!baseAccessor.exists(path, 0)) {
       throw new HelixException("Cluster " + clusterName + ", resource: " + 
resourceName
           + ", ideal-state does not exist");
@@ -894,7 +818,7 @@ public class ZKHelixAdmin implements HelixAdmin {
         instanceName, clusterName);
     String path = PropertyPathBuilder.instanceConfig(clusterName, 
instanceName);
 
-    BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = _baseDataAccessor;
 
     // check instanceConfig exists
     if (!baseAccessor.exists(path, 0)) {
@@ -973,8 +897,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   public void enableCluster(String clusterName, boolean enabled, String 
reason) {
     logger.info("{} cluster {} for reason {}.", enabled ? "Enable" : 
"Disable", clusterName,
         reason == null ? "NULL" : reason);
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     if (enabled) {
@@ -998,8 +921,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public boolean isInMaintenanceMode(String clusterName) {
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     return accessor.getBaseDataAccessor()
         .exists(keyBuilder.maintenance().getPath(), AccessOption.PERSISTENT);
@@ -1248,8 +1170,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       final String reason, final MaintenanceSignal.AutoTriggerReason 
internalReason,
       final Map<String, String> customFields,
       final MaintenanceSignal.TriggeringEntity triggeringEntity) {
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     logger.info("Cluster {} {} {} maintenance mode for reason {}.", 
clusterName,
         triggeringEntity == MaintenanceSignal.TriggeringEntity.CONTROLLER ? 
"automatically"
@@ -1512,8 +1433,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     List<String> instances = _zkClient.getChildren(memberInstancesPath);
     List<String> result = new ArrayList<String>();
 
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     for (String instanceName : instances) {
@@ -1659,8 +1579,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   public List<String> getResourcesInClusterWithTag(String clusterName, String 
tag) {
     List<String> resourcesWithTag = new ArrayList<String>();
 
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     for (String resourceName : getResourcesInCluster(clusterName)) {
@@ -1675,8 +1594,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public IdealState getResourceIdealState(String clusterName, String 
resourceName) {
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     return accessor.getProperty(keyBuilder.idealStates(resourceName));
@@ -1688,8 +1606,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     logger
         .info("Set IdealState for resource {} in cluster {} with new 
IdealState {}.", resourceName,
             clusterName, idealState == null ? "NULL" : idealState.toString());
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
@@ -1731,8 +1648,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public ExternalView getResourceExternalView(String clusterName, String 
resourceName) {
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     return accessor.getProperty(keyBuilder.externalView(resourceName));
   }
@@ -1740,8 +1656,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public CustomizedView getResourceCustomizedView(String clusterName, String 
resourceName,
       String customizedStateType) {
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     return accessor.getProperty(keyBuilder.customizedView(customizedStateType, 
resourceName));
   }
@@ -1774,8 +1689,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       }
     }
 
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     accessor.setProperty(keyBuilder.stateModelDef(stateModelDef), stateModel);
   }
@@ -1786,8 +1700,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
       throw new HelixException("Cluster " + clusterName + " is not setup yet");
     }
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     accessor.removeProperty(keyBuilder.idealStates(resourceName));
@@ -1806,8 +1719,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     CloudConfig.Builder builder = new CloudConfig.Builder(cloudConfig);
     CloudConfig cloudConfigBuilder = builder.build();
 
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     accessor.setProperty(keyBuilder.cloudConfig(), cloudConfigBuilder);
   }
@@ -1815,8 +1727,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void removeCloudConfig(String clusterName) {
     logger.info("Remove Cloud Config for cluster {}.", clusterName);
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     accessor.removeProperty(keyBuilder.cloudConfig());
   }
@@ -1847,8 +1758,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public StateModelDefinition getStateModelDef(String clusterName, String 
stateModelName) {
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     return accessor.getProperty(keyBuilder.stateModelDef(stateModelName));
@@ -1857,8 +1767,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void dropCluster(String clusterName) {
     logger.info("Deleting cluster {}.", clusterName);
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     String root = "/" + clusterName;
@@ -1935,8 +1844,7 @@ public class ZKHelixAdmin implements HelixAdmin {
         new CustomizedStateConfig.Builder(customizedStateConfig);
     CustomizedStateConfig customizedStateConfigFromBuilder = builder.build();
 
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     accessor.setProperty(keyBuilder.customizedStateConfig(),
         customizedStateConfigFromBuilder);
@@ -1947,8 +1855,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     logger.info(
         "Remove CustomizedStateConfig from cluster {}.", clusterName);
 
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     accessor.removeProperty(keyBuilder.customizedStateConfig());
 
@@ -1967,8 +1874,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     builder.addAggregationEnabledType(type);
     CustomizedStateConfig customizedStateConfigFromBuilder = builder.build();
 
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     if(!accessor.updateProperty(keyBuilder.customizedStateConfig(),
         customizedStateConfigFromBuilder)) {
@@ -1997,8 +1903,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     builder.removeAggregationEnabledType(type);
     CustomizedStateConfig customizedStateConfigFromBuilder = builder.build();
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     accessor.setProperty(keyBuilder.customizedStateConfig(),
         customizedStateConfigFromBuilder);
@@ -2021,7 +1926,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public void onDemandRebalance(String clusterName) {
-    BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = _baseDataAccessor;
     String path = PropertyPathBuilder.clusterConfig(clusterName);
 
     if (!baseAccessor.exists(path, 0)) {
@@ -2204,7 +2109,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       final String constraintId, final ConstraintItem constraintItem) {
     logger.info("Set constraint type {} with constraint id {} for cluster 
{}.", constraintType,
         constraintId, clusterName);
-    BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = _baseDataAccessor;
 
     PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
     String path = keyBuilder.constraint(constraintType.toString()).getPath();
@@ -2227,7 +2132,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       final String constraintId) {
     logger.info("Remove constraint type {} with constraint id {} for cluster 
{}.", constraintType,
         constraintId, clusterName);
-    BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = _baseDataAccessor;
 
     PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
     String path = keyBuilder.constraint(constraintType.toString()).getPath();
@@ -2248,8 +2153,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public ClusterConstraints getConstraints(String clusterName, ConstraintType 
constraintType) {
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
 
     PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
     return 
accessor.getProperty(keyBuilder.constraint(constraintType.toString()));
@@ -2331,8 +2235,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       throw new HelixException(
           "cluster " + clusterName + " instance " + instanceName + " is not 
setup yet");
     }
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     InstanceConfig config = 
accessor.getProperty(keyBuilder.instanceConfig(instanceName));
@@ -2352,8 +2255,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       throw new HelixException(
           "cluster " + clusterName + " instance " + instanceName + " is not 
setup yet");
     }
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     InstanceConfig config = 
accessor.getProperty(keyBuilder.instanceConfig(instanceName));
@@ -2373,8 +2275,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       throw new HelixException(
           "cluster " + clusterName + " instance " + instanceName + " is not 
setup yet");
     }
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     InstanceConfig config = 
accessor.getProperty(keyBuilder.instanceConfig(instanceName));
@@ -2431,23 +2332,19 @@ public class ZKHelixAdmin implements HelixAdmin {
         }
 
         InstanceConfig config = new InstanceConfig(currentData);
-        config.setInstanceEnabled(enabled);
-        if (!enabled) {
-          // new disabled type and reason will overwrite existing ones.
-          config.resetInstanceDisabledTypeAndReason();
-          if (reason != null) {
-            config.setInstanceDisabledReason(reason);
-          }
-          if (disabledType != null) {
-            config.setInstanceDisabledType(disabledType);
-          }
-        }
+        config.setInstanceOperation(new 
InstanceConfig.InstanceOperation.Builder().setOperation(
+            enabled ? InstanceConstants.InstanceOperation.ENABLE
+                : 
InstanceConstants.InstanceOperation.DISABLE).setReason(reason).setSource(
+            disabledType != null
+                ? 
InstanceConstants.InstanceOperationSource.instanceDisabledTypeToInstanceOperationSource(
+                disabledType) : null).build());
         return config.getRecord();
       }
     }, AccessOption.PERSISTENT);
   }
 
   // TODO: Add history ZNode for all batched enabling/disabling histories with 
metadata.
+  @Deprecated
   private void enableBatchInstances(final String clusterName, final 
List<String> instances,
       final boolean enabled, BaseDataAccessor<ZNRecord> baseAccessor,
       InstanceConstants.InstanceDisabledType disabledType, String reason) {
@@ -2783,8 +2680,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       }
     }
 
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     List<String> instanceConfigNames = 
accessor.getChildNames(keyBuilder.instanceConfigs());
     List<String> instancePathNames = 
accessor.getChildNames(keyBuilder.instances());
diff --git 
a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java 
b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index de41646c3..1b3acd68d 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -29,6 +29,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import javax.annotation.Nullable;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableSet;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
@@ -48,11 +53,13 @@ public class InstanceConfig extends HelixProperty {
    * Configurable characteristics of an instance
    */
   public enum InstanceConfigProperty {
-    HELIX_HOST, HELIX_PORT, HELIX_ZONE_ID, @Deprecated
-    HELIX_ENABLED,
+    HELIX_HOST,
+    HELIX_PORT,
+    HELIX_ZONE_ID,
+    @Deprecated HELIX_ENABLED,
     HELIX_ENABLED_TIMESTAMP,
-    HELIX_DISABLED_REASON,
-    HELIX_DISABLED_TYPE,
+    @Deprecated HELIX_DISABLED_REASON,
+    @Deprecated HELIX_DISABLED_TYPE,
     HELIX_DISABLED_PARTITION,
     TAG_LIST,
     INSTANCE_WEIGHT,
@@ -60,15 +67,128 @@ public class InstanceConfig extends HelixProperty {
     DELAY_REBALANCE_ENABLED,
     MAX_CONCURRENT_TASK,
     INSTANCE_INFO_MAP,
-    INSTANCE_CAPACITY_MAP,
-    TARGET_TASK_THREAD_POOL_SIZE,
-    INSTANCE_OPERATION
+    INSTANCE_CAPACITY_MAP, TARGET_TASK_THREAD_POOL_SIZE, 
HELIX_INSTANCE_OPERATIONS
+  }
+
+  public static class InstanceOperation {
+    private final Map<String, String> _properties;
+
+    private enum InstanceOperationProperties {
+      OPERATION, REASON, SOURCE, TIMESTAMP
+    }
+
+    private InstanceOperation(@Nullable Map<String, String> properties) {
+      // Default to ENABLE operation if no operation type is provided.
+      _properties = properties == null ? new HashMap<>() : properties;
+      if 
(!_properties.containsKey(InstanceOperationProperties.OPERATION.name())) {
+        _properties.put(InstanceOperationProperties.OPERATION.name(),
+            InstanceConstants.InstanceOperation.ENABLE.name());
+      }
+    }
+
+    public static class Builder {
+      private Map<String, String> _properties = new HashMap<>();
+
+      /**
+       * Set the operation type for this instance operation.
+       * @param operationType InstanceOperation type of this instance 
operation.
+       */
+      public Builder setOperation(@Nullable 
InstanceConstants.InstanceOperation operationType) {
+        _properties.put(InstanceOperationProperties.OPERATION.name(),
+            operationType == null ? 
InstanceConstants.InstanceOperation.ENABLE.name()
+                : operationType.name());
+        return this;
+      }
+
+      /**
+       * Set the reason for this instance operation.
+       * @param reason
+       */
+      public Builder setReason(String reason) {
+        _properties.put(InstanceOperationProperties.REASON.name(), reason != 
null ? reason : "");
+        return this;
+      }
+
+      /**
+       * Set the source for this instance operation.
+       * @param source InstanceOperationSource
+       *              that caused this instance operation to be triggered.
+       */
+      public Builder setSource(InstanceConstants.InstanceOperationSource 
source) {
+        _properties.put(InstanceOperationProperties.SOURCE.name(),
+            source == null ? 
InstanceConstants.InstanceOperationSource.USER.name()
+                : source.name());
+        return this;
+      }
+
+      public InstanceOperation build() throws IllegalArgumentException {
+        if 
(!_properties.containsKey(InstanceOperationProperties.OPERATION.name())) {
+          throw new IllegalArgumentException(
+              "Instance operation type is not set, this is a required field.");
+        }
+        _properties.put(InstanceOperationProperties.TIMESTAMP.name(),
+            String.valueOf(System.currentTimeMillis()));
+        return new InstanceOperation(_properties);
+      }
+    }
+
+    /**
+     * Get the operation type of this instance operation.
+     * @return the InstanceOperation type
+     */
+    public InstanceConstants.InstanceOperation getOperation() throws 
IllegalArgumentException {
+      return InstanceConstants.InstanceOperation.valueOf(
+          _properties.get(InstanceOperationProperties.OPERATION.name()));
+    }
+
+    /**
+     * Get the reason for this instance operation.
+     * If the reason is not set, it will default to an empty string.
+     *
+     * @return the reason for this instance operation.
+     */
+    public String getReason() {
+      return 
_properties.getOrDefault(InstanceOperationProperties.REASON.name(), "");
+    }
+
+    /**
+     * Get the InstanceOperationSource
+     * that caused this instance operation to be triggered.
+     * If the source is not set, it will default to DEFAULT.
+     *
+     * @return the InstanceOperationSource
+     *that caused this instance operation to be triggered.
+     */
+    public InstanceConstants.InstanceOperationSource getSource() {
+      return InstanceConstants.InstanceOperationSource.valueOf(
+          _properties.getOrDefault(InstanceOperationProperties.SOURCE.name(),
+              InstanceConstants.InstanceOperationSource.USER.name()));
+    }
+
+    /**
+     * Get the timestamp (milliseconds from epoch) when this instance 
operation was triggered.
+     *
+     * @return the timestamp when the instance operation was triggered.
+     */
+    public long getTimestamp() {
+      return 
Long.parseLong(_properties.get(InstanceOperationProperties.TIMESTAMP.name()));
+    }
+
+    private void setTimestamp(long timestamp) {
+      _properties.put(InstanceOperationProperties.TIMESTAMP.name(), 
String.valueOf(timestamp));
+    }
+
+    private Map<String, String> getProperties() {
+      return _properties;
+    }
   }
 
   public static final int WEIGHT_NOT_SET = -1;
   public static final int MAX_CONCURRENT_TASK_NOT_SET = -1;
   private static final int TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
   private static final boolean HELIX_ENABLED_DEFAULT_VALUE = true;
+  private static final long HELIX_ENABLED_TIMESTAMP_DEFAULT_VALUE = -1;
+  private static final ObjectMapper _objectMapper = new ObjectMapper();
 
   // These fields are not allowed to be overwritten by the merge method because
   // they are unique properties of an instance.
@@ -79,6 +199,8 @@ public class InstanceConfig extends HelixProperty {
 
   private static final Logger _logger = 
LoggerFactory.getLogger(InstanceConfig.class.getName());
 
+  private List<InstanceOperation> _deserializedInstanceOperations;
+
   /**
    * Instantiate for a specific instance
    * @param instanceId the instance identifier
@@ -264,25 +386,28 @@ public class InstanceConfig extends HelixProperty {
    *        enabled/disabled, return -1.
    */
   public long getInstanceEnabledTime() {
-    return 
_record.getLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(), -1);
+    return 
_record.getLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(),
+        HELIX_ENABLED_TIMESTAMP_DEFAULT_VALUE);
   }
 
   /**
    * Set the enabled state of the instance If user enables the instance, 
HELIX_DISABLED_REASON filed
    * will be removed.
-   * @deprecated This method is deprecated. Please use setInstanceOperation 
instead.
    * @param enabled true to enable, false to disable
+   * @deprecated This method is deprecated. Please use setInstanceOperation 
instead.
    */
   @Deprecated
   public void setInstanceEnabled(boolean enabled) {
     // set instance operation only when we need to change InstanceEnabled 
value.
-    setInstanceEnabledHelper(enabled);
+    setInstanceEnabledHelper(enabled, null);
   }
 
-  private void setInstanceEnabledHelper(boolean enabled) {
-    _record.setBooleanField(InstanceConfigProperty.HELIX_ENABLED.toString(), 
enabled);
-    
_record.setLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(), 
System.currentTimeMillis());
+  private void setInstanceEnabledHelper(boolean enabled, Long 
timestampOverride) {
+    _record.setBooleanField(InstanceConfigProperty.HELIX_ENABLED.name(), 
enabled);
+    _record.setLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(),
+        timestampOverride != null ? timestampOverride : 
System.currentTimeMillis());
     if (enabled) {
+      // TODO: Replace this when HELIX_ENABLED and HELIX_DISABLED_REASON is 
removed.
       resetInstanceDisabledTypeAndReason();
     }
   }
@@ -290,6 +415,7 @@ public class InstanceConfig extends HelixProperty {
   /**
    * Removes HELIX_DISABLED_REASON and HELIX_DISABLED_TYPE entry from simple 
field.
    */
+  @Deprecated
   public void resetInstanceDisabledTypeAndReason() {
     
_record.getSimpleFields().remove(InstanceConfigProperty.HELIX_DISABLED_REASON.name());
     
_record.getSimpleFields().remove(InstanceConfigProperty.HELIX_DISABLED_TYPE.name());
@@ -298,19 +424,25 @@ public class InstanceConfig extends HelixProperty {
   /**
    * Set the instance disabled reason when instance is disabled.
    * It will be a no-op when instance is enabled.
+   * @deprecated This method is deprecated. Please use .
    */
+  @Deprecated
   public void setInstanceDisabledReason(String disabledReason) {
-    if 
(getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) {
-       
_record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_REASON.name(), 
disabledReason);
-     }
+    if 
(getInstanceOperation().getOperation().equals(InstanceConstants.InstanceOperation.DISABLE))
 {
+      
_record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_REASON.name(), 
disabledReason);
+    }
   }
 
   /**
    * Set the instance disabled type when instance is disabled.
    * It will be a no-op when instance is enabled.
+   * @deprecated This method is deprecated. Please use setInstanceOperation 
along with
+   * InstanceOperation.Builder().setSource
+   *(...)
    */
+  @Deprecated
   public void setInstanceDisabledType(InstanceConstants.InstanceDisabledType 
disabledType) {
-    if 
(getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) {
+    if 
(getInstanceOperation().getOperation().equals(InstanceConstants.InstanceOperation.DISABLE))
 {
       _record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_TYPE.name(),
           disabledType.name());
     }
@@ -319,7 +451,9 @@ public class InstanceConfig extends HelixProperty {
   /**
    * Get the instance disabled reason when instance is disabled.
    * @return Return instance disabled reason. Default is am empty string.
+   * @deprecated This method is deprecated. Please use 
getInstanceOperation().getReason() instead.
    */
+  @Deprecated
   public String getInstanceDisabledReason() {
     return 
_record.getStringField(InstanceConfigProperty.HELIX_DISABLED_REASON.name(), "");
   }
@@ -328,63 +462,184 @@ public class InstanceConfig extends HelixProperty {
    *
    * @return Return instance disabled type 
(org.apache.helix.constants.InstanceConstants.InstanceDisabledType)
    *         Default is am empty string.
+   * @deprecated This method is deprecated. Please use 
getInstanceOperation().getSource
+   *() instead.
    */
+  @Deprecated
   public String getInstanceDisabledType() {
-    if 
(!getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) {
+    if (_record.getBooleanField(InstanceConfigProperty.HELIX_ENABLED.name(),
+        HELIX_ENABLED_DEFAULT_VALUE)) {
       return InstanceConstants.INSTANCE_NOT_DISABLED;
     }
     return 
_record.getStringField(InstanceConfigProperty.HELIX_DISABLED_TYPE.name(),
         
InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name());
   }
 
+  private List<InstanceOperation> getInstanceOperations() {
+    if (_deserializedInstanceOperations == null || 
_deserializedInstanceOperations.isEmpty()) {
+      // If the _deserializedInstanceOperations is not set, then we need to 
build it from the real
+      // helix property HELIX_INSTANCE_OPERATIONS.
+      List<String> instanceOperations =
+          
_record.getListField(InstanceConfigProperty.HELIX_INSTANCE_OPERATIONS.name());
+      List<InstanceOperation> newDeserializedInstanceOperations = new 
ArrayList<>();
+      if (instanceOperations != null) {
+        for (String serializedInstanceOperation : instanceOperations) {
+          try {
+            Map<String, String> properties = 
_objectMapper.readValue(serializedInstanceOperation,
+                    new TypeReference<Map<String, String>>() {
+                    });
+            newDeserializedInstanceOperations.add(new 
InstanceOperation(properties));
+          } catch (JsonProcessingException e) {
+            _logger.error(
+                "Failed to deserialize instance operation for instance: " + 
_record.getId(), e);
+          }
+        }
+      }
+      _deserializedInstanceOperations = newDeserializedInstanceOperations;
+    }
+
+    return _deserializedInstanceOperations;
+  }
+
   /**
    * Set the instance operation for this instance.
+   * This method also sets the HELIX_ENABLED, HELIX_DISABLED_REASON, and 
HELIX_DISABLED_TYPE fields
+   * for backwards compatibility.
    *
    * @param operation the instance operation
    */
-  public void setInstanceOperation(InstanceConstants.InstanceOperation 
operation) {
-    _record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name(),
-        operation == null ? "" : operation.name());
-    if (operation == null || operation == 
InstanceConstants.InstanceOperation.ENABLE
-        || operation == InstanceConstants.InstanceOperation.DISABLE) {
+  public void setInstanceOperation(InstanceOperation operation) {
+    List<InstanceOperation> deserializedInstanceOperations = 
getInstanceOperations();
+
+    if (operation.getSource() == 
InstanceConstants.InstanceOperationSource.ADMIN) {
+      deserializedInstanceOperations.clear();
+    } else {
+      // Remove the instance operation with the same source if it exists.
+      deserializedInstanceOperations.removeIf(
+          instanceOperation -> instanceOperation.getSource() == 
operation.getSource());
+    }
+    if (operation.getOperation() == 
InstanceConstants.InstanceOperation.ENABLE) {
+      // Insert the operation after the last ENABLE or at the beginning if 
there isn't ENABLE in the list.
+      int insertIndex = 0;
+      for (int i = deserializedInstanceOperations.size() - 1; i >= 0; i--) {
+        if (deserializedInstanceOperations.get(i).getOperation()
+            == InstanceConstants.InstanceOperation.ENABLE) {
+          insertIndex = i + 1;
+          break;
+        }
+      }
+      deserializedInstanceOperations.add(insertIndex, operation);
+    } else {
+      deserializedInstanceOperations.add(operation);
+    }
+    // Set the actual field in the ZnRecord
+    
_record.setListField(InstanceConfigProperty.HELIX_INSTANCE_OPERATIONS.name(),
+        deserializedInstanceOperations.stream().map(instanceOperation -> {
+          try {
+            return 
_objectMapper.writeValueAsString(instanceOperation.getProperties());
+          } catch (JsonProcessingException e) {
+            throw new HelixException(
+                "Failed to serialize instance operation for instance: " + 
_record.getId()
+                    + " Can't set the instance operation to: " + 
operation.getOperation(), e);
+          }
+        }).collect(Collectors.toList()));
+
+    // TODO: Remove this when we are sure that all users are using the new 
InstanceOperation only and HELIX_ENABLED is removed.
+    if (operation.getOperation() == 
InstanceConstants.InstanceOperation.DISABLE) {
       // We are still setting the HELIX_ENABLED field for backwards 
compatibility.
       // It is possible that users will be using earlier version of HelixAdmin 
or helix-rest
       // is on older version.
-      // TODO: Remove this when we are sure that all users are using the new 
field INSTANCE_OPERATION.
-      setInstanceEnabledHelper(!(operation == 
InstanceConstants.InstanceOperation.DISABLE));
+
+      if (_record.getBooleanField(InstanceConfigProperty.HELIX_ENABLED.name(), 
true)) {
+        // Check if it is already disabled, if yes, then we don't need to set 
HELIX_ENABLED and HELIX_ENABLED_TIMESTAMP
+        setInstanceEnabledHelper(false, operation.getTimestamp());
+      }
+
+      
_record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_REASON.name(),
+          operation.getReason());
+      _record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_TYPE.name(),
+          
InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name());
+    } else if (operation.getOperation() == 
InstanceConstants.InstanceOperation.ENABLE) {
+      // If any of the other InstanceOperations are of type DISABLE, set that 
in the HELIX_ENABLED,
+      // HELIX_DISABLED_REASON, and HELIX_DISABLED_TYPE fields.
+      InstanceOperation latestDisableInstanceOperation = null;
+      for (InstanceOperation instanceOperation : getInstanceOperations()) {
+        if (instanceOperation.getOperation() == 
InstanceConstants.InstanceOperation.DISABLE && (
+            latestDisableInstanceOperation == null || 
instanceOperation.getTimestamp()
+                > latestDisableInstanceOperation.getTimestamp())) {
+          latestDisableInstanceOperation = instanceOperation;
+        }
+      }
+
+      if (latestDisableInstanceOperation != null) {
+        
_record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_REASON.name(),
+            latestDisableInstanceOperation.getReason());
+        
_record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_TYPE.name(),
+            
InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name());
+      } else {
+        setInstanceEnabledHelper(true, operation.getTimestamp());
+      }
     }
   }
 
+  /**
+   * Set the instance operation for this instance. Provide the 
InstanceOperation enum and the reason
+   * and source will be set to default values.
+   *
+   * @param operation the instance operation
+   */
+  public void setInstanceOperation(InstanceConstants.InstanceOperation 
operation) {
+    InstanceOperation instanceOperation =
+        new InstanceOperation.Builder().setOperation(operation).build();
+    setInstanceOperation(instanceOperation);
+  }
+
   private void setInstanceOperationInit(InstanceConstants.InstanceOperation 
operation) {
     if (operation == null) {
       return;
     }
-    _record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name(), 
operation.name());
+    InstanceOperation instanceOperation =
+        new 
InstanceOperation.Builder().setOperation(operation).setReason("INIT").build();
+    // When an instance is created for the first time the timestamp is set to 
-1 so that if it
+    // is disabled it will not be considered within the delay window when it 
joins.
+    instanceOperation.setTimestamp(HELIX_ENABLED_TIMESTAMP_DEFAULT_VALUE);
+    setInstanceOperation(instanceOperation);
+  }
+
+  private InstanceOperation getActiveInstanceOperation() {
+    List<InstanceOperation> instanceOperations = getInstanceOperations();
+
+    if (instanceOperations.isEmpty()) {
+      InstanceOperation instanceOperation =
+          new 
InstanceOperation.Builder().setOperation(InstanceConstants.InstanceOperation.ENABLE)
+              
.setSource(InstanceConstants.InstanceOperationSource.DEFAULT).build();
+      instanceOperation.setTimestamp(HELIX_ENABLED_TIMESTAMP_DEFAULT_VALUE);
+      return instanceOperation;
+    }
+
+    // The last instance operation in the list is the most recent one.
+    // ENABLE operation should not be included in the list.
+    return instanceOperations.get(instanceOperations.size() - 1);
   }
 
   /**
-   * Get the InstanceOperation of this instance, default is ENABLE if nothing 
is set. If
+   * Get the InstanceOperationType of this instance, default is ENABLE if 
nothing is set. If
    * HELIX_ENABLED is set to false, then the instance operation is DISABLE for 
backwards
    * compatibility.
    *
    * @return the instance operation
    */
-  public InstanceConstants.InstanceOperation getInstanceOperation() {
-    String instanceOperationString =
-        
_record.getSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name());
-
-    InstanceConstants.InstanceOperation instanceOperation;
+  public InstanceOperation getInstanceOperation() {
+    InstanceOperation activeInstanceOperation = getActiveInstanceOperation();
     try {
-      // If INSTANCE_OPERATION is not set, then the instance is enabled.
-      instanceOperation = (instanceOperationString == null || 
instanceOperationString.isEmpty())
-          ? InstanceConstants.InstanceOperation.ENABLE
-          : 
InstanceConstants.InstanceOperation.valueOf(instanceOperationString);
+      activeInstanceOperation.getOperation();
     } catch (IllegalArgumentException e) {
-      _logger.error("Invalid instance operation: " + instanceOperationString + 
" for instance: "
-          + _record.getId()
+      _logger.error("Invalid instance operation type for instance: " + 
_record.getId()
           + ". You may need to update your version of Helix to get support for 
this "
           + "type of InstanceOperation. Defaulting to UNKNOWN.");
-      return InstanceConstants.InstanceOperation.UNKNOWN;
+      activeInstanceOperation =
+          new 
InstanceOperation.Builder().setOperation(InstanceConstants.InstanceOperation.UNKNOWN)
+              .build();
     }
 
     // Always respect the HELIX_ENABLED being set to false when instance 
operation is unset
@@ -392,11 +647,16 @@ public class InstanceConfig extends HelixProperty {
     if (!_record.getBooleanField(InstanceConfigProperty.HELIX_ENABLED.name(),
         HELIX_ENABLED_DEFAULT_VALUE)
         && 
(InstanceConstants.INSTANCE_DISABLED_OVERRIDABLE_OPERATIONS.contains(
-        instanceOperation))) {
-      return InstanceConstants.InstanceOperation.DISABLE;
+        activeInstanceOperation.getOperation()))) {
+      return new InstanceOperation.Builder().setOperation(
+              
InstanceConstants.InstanceOperation.DISABLE).setReason(getInstanceDisabledReason())
+          .setSource(
+              
InstanceConstants.InstanceOperationSource.instanceDisabledTypeToInstanceOperationSource(
+                  
InstanceConstants.InstanceDisabledType.valueOf(getInstanceDisabledType())))
+          .build();
     }
 
-    return instanceOperation;
+    return activeInstanceOperation;
   }
 
   /**
@@ -406,7 +666,7 @@ public class InstanceConfig extends HelixProperty {
    * @return true if enabled, false otherwise
    */
   public boolean getInstanceEnabled() {
-    return 
getInstanceOperation().equals(InstanceConstants.InstanceOperation.ENABLE);
+    return 
getInstanceOperation().getOperation().equals(InstanceConstants.InstanceOperation.ENABLE);
   }
 
   /**
@@ -416,7 +676,8 @@ public class InstanceConfig extends HelixProperty {
    * @return true if the instance is assignable, false otherwise
    */
   public boolean isAssignable() {
-    return 
InstanceConstants.ASSIGNABLE_INSTANCE_OPERATIONS.contains(getInstanceOperation());
+    return InstanceConstants.ASSIGNABLE_INSTANCE_OPERATIONS.contains(
+        getInstanceOperation().getOperation());
   }
 
   /**
@@ -929,10 +1190,8 @@ public class InstanceConfig extends HelixProperty {
         instanceConfig.addTag(tag);
       }
 
-      if (_instanceOperation == null && _instanceEnabled != 
HELIX_ENABLED_DEFAULT_VALUE) {
-        instanceConfig.setInstanceOperationInit(
-            _instanceEnabled ? InstanceConstants.InstanceOperation.ENABLE
-                : InstanceConstants.InstanceOperation.DISABLE);
+      if (_instanceOperation == null && !_instanceEnabled) {
+        
instanceConfig.setInstanceOperationInit(InstanceConstants.InstanceOperation.DISABLE);
       }
 
       if (_instanceOperation != null && !_instanceOperation.equals(
diff --git 
a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java 
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
index db9ada93c..f634aac46 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
@@ -181,8 +181,8 @@ class RoutingDataCache extends BasicClusterDataCache {
 
   private void updateRoutableInstanceConfigMap(Map<String, InstanceConfig> 
instanceConfigMap) {
     _routableInstanceConfigMap = instanceConfigMap.entrySet().stream().filter(
-            (instanceConfigEntry) -> 
!InstanceConstants.UNSERVABLE_INSTANCE_OPERATIONS.contains(
-                instanceConfigEntry.getValue().getInstanceOperation()))
+            (instanceConfigEntry) -> 
!InstanceConstants.UNROUTABLE_INSTANCE_OPERATIONS.contains(
+                
instanceConfigEntry.getValue().getInstanceOperation().getOperation()))
         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   }
 
@@ -190,8 +190,9 @@ class RoutingDataCache extends BasicClusterDataCache {
       Map<String, LiveInstance> liveInstanceMap) {
     _routableLiveInstanceMap = liveInstanceMap.entrySet().stream().filter(
             (liveInstanceEntry) -> 
instanceConfigMap.containsKey(liveInstanceEntry.getKey())
-                && !InstanceConstants.UNSERVABLE_INSTANCE_OPERATIONS.contains(
-                
instanceConfigMap.get(liveInstanceEntry.getKey()).getInstanceOperation()))
+                && !InstanceConstants.UNROUTABLE_INSTANCE_OPERATIONS.contains(
+                
instanceConfigMap.get(liveInstanceEntry.getKey()).getInstanceOperation()
+                    .getOperation()))
         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java 
b/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java
new file mode 100644
index 000000000..967d561e7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java
@@ -0,0 +1,198 @@
+package org.apache.helix.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.model.ClusterTopologyConfig;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.DataUpdater;
+
+public class InstanceUtil {
+
+  // Private constructor to prevent instantiation
+  private InstanceUtil() {
+  }
+
+  // Validators for instance operation transitions
+  private static final Function<List<InstanceConfig>, Boolean> ALWAYS_ALLOWED =
+      (matchingInstances) -> true;
+  private static final Function<List<InstanceConfig>, Boolean> 
ALL_MATCHES_ARE_UNKNOWN =
+      (matchingInstances) -> matchingInstances.isEmpty() || 
matchingInstances.stream().allMatch(
+          instance -> instance.getInstanceOperation().getOperation()
+              .equals(InstanceConstants.InstanceOperation.UNKNOWN));
+  private static final Function<List<InstanceConfig>, Boolean> 
ALL_MATCHES_ARE_UNKNOWN_OR_EVACUATE =
+      (matchingInstances) -> matchingInstances.isEmpty() || 
matchingInstances.stream().allMatch(
+          instance -> instance.getInstanceOperation().getOperation()
+              .equals(InstanceConstants.InstanceOperation.UNKNOWN)
+              || instance.getInstanceOperation().getOperation()
+              .equals(InstanceConstants.InstanceOperation.EVACUATE));
+  private static final Function<List<InstanceConfig>, Boolean> 
ANY_MATCH_ENABLE_OR_DISABLE =
+      (matchingInstances) -> !matchingInstances.isEmpty() && 
matchingInstances.stream().anyMatch(
+          instance -> instance.getInstanceOperation().getOperation()
+              .equals(InstanceConstants.InstanceOperation.ENABLE) || 
instance.getInstanceOperation()
+              
.getOperation().equals(InstanceConstants.InstanceOperation.DISABLE));
+
+  // Validator map for valid instance operation transitions 
<currentOperation>:<targetOperation>:<validator>
+  private static final ImmutableMap<InstanceConstants.InstanceOperation, 
ImmutableMap<InstanceConstants.InstanceOperation, 
Function<List<InstanceConfig>, Boolean>>>
+      validInstanceOperationTransitions =
+      ImmutableMap.of(InstanceConstants.InstanceOperation.ENABLE,
+      // ENABLE and DISABLE can be set to UNKNOWN when matching instance is in 
SWAP_IN and set to ENABLE in a transaction.
+          ImmutableMap.of(InstanceConstants.InstanceOperation.ENABLE, 
ALWAYS_ALLOWED,
+          InstanceConstants.InstanceOperation.DISABLE, ALWAYS_ALLOWED,
+          InstanceConstants.InstanceOperation.EVACUATE, ALWAYS_ALLOWED,
+          InstanceConstants.InstanceOperation.UNKNOWN, ALWAYS_ALLOWED),
+      InstanceConstants.InstanceOperation.DISABLE,
+          ImmutableMap.of(InstanceConstants.InstanceOperation.DISABLE, 
ALWAYS_ALLOWED,
+          InstanceConstants.InstanceOperation.ENABLE, ALWAYS_ALLOWED,
+          InstanceConstants.InstanceOperation.EVACUATE, ALWAYS_ALLOWED,
+          InstanceConstants.InstanceOperation.UNKNOWN, ALWAYS_ALLOWED),
+      InstanceConstants.InstanceOperation.SWAP_IN,
+      // SWAP_IN can be set to ENABLE when matching instance is in UNKNOWN 
state in a transaction.
+          ImmutableMap.of(InstanceConstants.InstanceOperation.SWAP_IN, 
ALWAYS_ALLOWED,
+          InstanceConstants.InstanceOperation.UNKNOWN, ALWAYS_ALLOWED),
+      InstanceConstants.InstanceOperation.EVACUATE,
+          ImmutableMap.of(InstanceConstants.InstanceOperation.EVACUATE, 
ALWAYS_ALLOWED,
+          InstanceConstants.InstanceOperation.ENABLE, ALL_MATCHES_ARE_UNKNOWN,
+          InstanceConstants.InstanceOperation.DISABLE, ALL_MATCHES_ARE_UNKNOWN,
+          InstanceConstants.InstanceOperation.UNKNOWN, ALWAYS_ALLOWED),
+      InstanceConstants.InstanceOperation.UNKNOWN,
+          ImmutableMap.of(InstanceConstants.InstanceOperation.UNKNOWN, 
ALWAYS_ALLOWED,
+          InstanceConstants.InstanceOperation.ENABLE, 
ALL_MATCHES_ARE_UNKNOWN_OR_EVACUATE,
+          InstanceConstants.InstanceOperation.DISABLE, 
ALL_MATCHES_ARE_UNKNOWN_OR_EVACUATE,
+          InstanceConstants.InstanceOperation.SWAP_IN, 
ANY_MATCH_ENABLE_OR_DISABLE));
+
+  /**
+   * Validates if the transition from the current operation to the target 
operation is valid.
+   *
+   * @param configAccessor   The ConfigAccessor instance
+   * @param clusterName      The cluster name
+   * @param instanceConfig   The current instance configuration
+   * @param currentOperation The current operation
+   * @param targetOperation  The target operation
+   */
+  public static void validateInstanceOperationTransition(ConfigAccessor 
configAccessor,
+      String clusterName, InstanceConfig instanceConfig,
+      InstanceConstants.InstanceOperation currentOperation,
+      InstanceConstants.InstanceOperation targetOperation) {
+    // Check if the current operation and target operation are in the valid 
transitions map
+    if (!validInstanceOperationTransitions.containsKey(currentOperation)
+        || 
!validInstanceOperationTransitions.get(currentOperation).containsKey(targetOperation))
 {
+      throw new HelixException(
+          "Invalid instance operation transition from " + currentOperation + " 
to "
+              + targetOperation);
+    }
+
+    // Throw exception if the validation fails
+    if 
(!validInstanceOperationTransitions.get(currentOperation).get(targetOperation)
+        .apply(findInstancesWithMatchingLogicalId(configAccessor, clusterName, 
instanceConfig))) {
+      throw new HelixException(
+          "Failed validation for instance operation transition from " + 
currentOperation + " to "
+              + targetOperation);
+    }
+  }
+
+  /**
+   * Finds the instances that have a matching logical ID with the given 
instance.
+   *
+   * @param configAccessor  The ConfigAccessor instance
+   * @param clusterName     The cluster name
+   * @param instanceConfig  The instance configuration to match
+   * @return A list of matching instances
+   */
+  public static List<InstanceConfig> findInstancesWithMatchingLogicalId(
+      ConfigAccessor configAccessor, String clusterName, InstanceConfig 
instanceConfig) {
+    String logicalIdKey =
+        
ClusterTopologyConfig.createFromClusterConfig(configAccessor.getClusterConfig(clusterName))
+            .getEndNodeType();
+
+    // Retrieve and filter instances with matching logical ID
+    return configAccessor.getKeys(
+            new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
+                clusterName).build()).stream()
+        .map(instanceName -> configAccessor.getInstanceConfig(clusterName, 
instanceName)).filter(
+            potentialInstanceConfig ->
+                
!potentialInstanceConfig.getInstanceName().equals(instanceConfig.getInstanceName())
+                    && potentialInstanceConfig.getLogicalId(logicalIdKey)
+                    .equals(instanceConfig.getLogicalId(logicalIdKey)))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Sets the instance operation for the given instance.
+   *
+   * @param configAccessor      The ConfigAccessor instance
+   * @param baseAccessor        The BaseDataAccessor instance
+   * @param clusterName         The cluster name
+   * @param instanceName        The instance name
+   * @param instanceOperation   The instance operation to set
+   */
+  public static void setInstanceOperation(ConfigAccessor configAccessor,
+      BaseDataAccessor<ZNRecord> baseAccessor, String clusterName, String 
instanceName,
+      InstanceConfig.InstanceOperation instanceOperation) {
+    String path = PropertyPathBuilder.instanceConfig(clusterName, 
instanceName);
+
+    // Retrieve the current instance configuration
+    InstanceConfig instanceConfig = 
configAccessor.getInstanceConfig(clusterName, instanceName);
+    if (instanceConfig == null) {
+      throw new HelixException("Cluster " + clusterName + ", instance: " + 
instanceName
+          + ", instance config does not exist");
+    }
+
+    // Validate the instance operation transition
+    validateInstanceOperationTransition(configAccessor, clusterName, 
instanceConfig,
+        instanceConfig.getInstanceOperation().getOperation(),
+        instanceOperation == null ? InstanceConstants.InstanceOperation.ENABLE
+            : instanceOperation.getOperation());
+
+    // Update the instance operation
+    boolean succeeded = baseAccessor.update(path, new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        if (currentData == null) {
+          throw new HelixException("Cluster: " + clusterName + ", instance: " 
+ instanceName
+              + ", participant config is null");
+        }
+
+        InstanceConfig config = new InstanceConfig(currentData);
+        config.setInstanceOperation(instanceOperation);
+        return config.getRecord();
+      }
+    }, AccessOption.PERSISTENT);
+
+    if (!succeeded) {
+      throw new HelixException(
+          "Failed to update instance operation. Please check if instance is 
disabled.");
+    }
+  }
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java
 
b/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java
index bf9b59dc2..5ea42dc3e 100644
--- 
a/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java
+++ 
b/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java
@@ -52,20 +52,18 @@ public class TestDefaultCloudEventCallbackImpl extends 
ZkStandAloneCMTestBase {
     Assert.assertFalse(InstanceValidationUtil
         .isEnabled(_manager.getHelixDataAccessor(), 
_instanceManager.getInstanceName()));
     Assert.assertEquals(_manager.getConfigAccessor()
-        .getInstanceConfig(CLUSTER_NAME, _instanceManager.getInstanceName())
-        .getInstanceDisabledType(), 
InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name());
+        .getInstanceConfig(CLUSTER_NAME, 
_instanceManager.getInstanceName()).getInstanceOperation()
+        .getSource(), InstanceConstants.InstanceOperationSource.AUTOMATION);
 
-    // Should not disable instance if it is already disabled due to other 
reasons
-    // And disabled type should remain unchanged
     _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), 
false);
     _impl.disableInstance(_instanceManager, null);
     Assert.assertFalse(InstanceValidationUtil
         .isEnabled(_manager.getHelixDataAccessor(), 
_instanceManager.getInstanceName()));
     Assert.assertEquals(_manager.getConfigAccessor()
             .getInstanceConfig(CLUSTER_NAME, 
_instanceManager.getInstanceName())
-            .getInstanceDisabledType(),
-        
InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name());
+        .getInstanceOperation().getSource(), 
InstanceConstants.InstanceOperationSource.USER);
 
+    _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), 
true);
     _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), 
false,
         InstanceConstants.InstanceDisabledType.CLOUD_EVENT, null);
   }
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 85600c01c..67b575f0c 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -211,7 +211,7 @@ public class TestInstanceOperation extends ZkTestBase {
       InstanceConfig instanceConfig =
           
_gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, 
participantName);
       if (!_participants.get(i).isConnected() || 
!instanceConfig.getInstanceEnabled()
-          || instanceConfig.getInstanceOperation()
+          || instanceConfig.getInstanceOperation().getOperation()
           .equals(InstanceConstants.InstanceOperation.SWAP_IN)) {
         if (_participants.get(i).isConnected()) {
           _participants.get(i).syncStop();
@@ -338,7 +338,7 @@ public class TestInstanceOperation extends ZkTestBase {
     // now remove operation tag
     String instanceToEvacuate = _participants.get(0).getInstanceName();
     _gSetupTool.getClusterManagementTool()
-        .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);
+        .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, 
InstanceConstants.InstanceOperation.ENABLE);
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
@@ -370,7 +370,8 @@ public class TestInstanceOperation extends ZkTestBase {
 
     Assert.assertEquals(
         _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, 
instanceToSwapInName)
-            .getInstanceOperation(), 
InstanceConstants.InstanceOperation.UNKNOWN);
+            .getInstanceOperation().getOperation(),
+        InstanceConstants.InstanceOperation.UNKNOWN);
   }
 
   @Test(dependsOnMethods = "testNodeSwapNoTopologySetup")
@@ -397,7 +398,8 @@ public class TestInstanceOperation extends ZkTestBase {
 
     Assert.assertEquals(
         _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, 
instanceToSwapInName)
-            .getInstanceOperation(), 
InstanceConstants.InstanceOperation.UNKNOWN);
+            .getInstanceOperation().getOperation(),
+        InstanceConstants.InstanceOperation.UNKNOWN);
   }
 
   @Test(dependsOnMethods = "testAddingNodeWithEnableInstanceOperation")
@@ -416,7 +418,8 @@ public class TestInstanceOperation extends ZkTestBase {
 
     Assert.assertEquals(
         _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, 
instanceToSwapInName)
-            .getInstanceOperation(), 
InstanceConstants.InstanceOperation.UNKNOWN);
+            .getInstanceOperation().getOperation(),
+        InstanceConstants.InstanceOperation.UNKNOWN);
   }
 
   @Test(dependsOnMethods = "testNodeSwapWithNoSwapOutNode")
@@ -440,7 +443,8 @@ public class TestInstanceOperation extends ZkTestBase {
 
     Assert.assertEquals(
         _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, 
instanceToSwapInName)
-            .getInstanceOperation(), 
InstanceConstants.InstanceOperation.UNKNOWN);
+            .getInstanceOperation().getOperation(),
+        InstanceConstants.InstanceOperation.UNKNOWN);
 
     // Setting the InstanceOperation to SWAP_IN should work because there is a 
matching logicalId in
     // the cluster and the InstanceCapacityWeights and FaultZone match.
@@ -481,7 +485,8 @@ public class TestInstanceOperation extends ZkTestBase {
 
     // Instance should be UNKNOWN since there was already a swapping pair.
     Assert.assertEquals(_gSetupTool.getClusterManagementTool()
-            .getInstanceConfig(CLUSTER_NAME, 
secondInstanceToSwapInName).getInstanceOperation(),
+            .getInstanceConfig(CLUSTER_NAME, 
secondInstanceToSwapInName).getInstanceOperation()
+            .getOperation(),
         InstanceConstants.InstanceOperation.UNKNOWN);
 
     // Try to set the InstanceOperation to SWAP_IN, it should throw an 
exception since there is already
@@ -576,7 +581,8 @@ public class TestInstanceOperation extends ZkTestBase {
     Assert.assertFalse(_gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceEnabled());
     Assert.assertEquals(_gSetupTool.getClusterManagementTool()
-            .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceOperation(),
+            .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceOperation()
+            .getOperation(),
         InstanceConstants.InstanceOperation.UNKNOWN);
 
     // Check to make sure the throttle was enabled again after the swap was 
completed.
@@ -681,7 +687,8 @@ public class TestInstanceOperation extends ZkTestBase {
     Assert.assertFalse(_gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceEnabled());
     Assert.assertEquals(_gSetupTool.getClusterManagementTool()
-            .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceOperation(),
+            .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceOperation()
+            .getOperation(),
         InstanceConstants.InstanceOperation.UNKNOWN);
 
     // Validate that the SWAP_IN instance has the same partitions the swap out 
instance had before
@@ -824,7 +831,7 @@ public class TestInstanceOperation extends ZkTestBase {
         Collections.emptySet(), Collections.emptySet());
 
     _gSetupTool.getClusterManagementTool()
-        .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null);
+        .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, 
InstanceConstants.InstanceOperation.ENABLE);
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
@@ -1110,7 +1117,7 @@ public class TestInstanceOperation extends ZkTestBase {
     // This should throw exception because we cannot ever have two instances 
with the same logicalId and both have InstanceOperation
     // unset.
     _gSetupTool.getClusterManagementTool()
-        .setInstanceOperation(CLUSTER_NAME, instanceToSwapInName, null);
+        .setInstanceOperation(CLUSTER_NAME, instanceToSwapInName, 
InstanceConstants.InstanceOperation.ENABLE);
   }
 
   @Test(dependsOnMethods = "testUnsetInstanceOperationOnSwapInWhenSwapping")
@@ -1180,7 +1187,7 @@ public class TestInstanceOperation extends ZkTestBase {
 
     // cancel the evacuation
     _gSetupTool.getClusterManagementTool()
-        .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);
+        .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, 
InstanceConstants.InstanceOperation.ENABLE);
 
     assignment = getEVs();
     for (String resource : _allDBs) {
@@ -1222,7 +1229,7 @@ public class TestInstanceOperation extends ZkTestBase {
 
     // cancel evacuation
     _gSetupTool.getClusterManagementTool()
-        .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);
+        .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, 
InstanceConstants.InstanceOperation.ENABLE);
     // check every replica has >= 3 active replicas, even before cluster 
converge
     Map<String, ExternalView> assignment = getEVs();
     for (String resource : _allDBs) {
@@ -1311,7 +1318,7 @@ public class TestInstanceOperation extends ZkTestBase {
 
     // Remove EVACUATE instance's InstanceOperation
     _gSetupTool.getClusterManagementTool()
-        .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null);
+        .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, 
InstanceConstants.InstanceOperation.ENABLE);
   }
 
   @Test(dependsOnMethods = "testSwapEvacuateAddRemoveEvacuate")
@@ -1392,7 +1399,7 @@ public class TestInstanceOperation extends ZkTestBase {
     @Override
     public void onInstanceConfigChange(List<InstanceConfig> instanceConfig,
         NotificationContext context) {
-      if (instanceConfig.get(0).getInstanceOperation()
+      if (instanceConfig.get(0).getInstanceOperation().getOperation()
           .equals(InstanceConstants.InstanceOperation.SWAP_IN)) {
         throttlesEnabled = false;
       } else {
diff --git 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index 558110857..b54be8c04 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -182,8 +182,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
     String disableReason = "Reason";
     tool.enableInstance(clusterName, instanceName, false,
         InstanceConstants.InstanceDisabledType.CLOUD_EVENT, disableReason);
-    Assert.assertTrue(tool.getInstanceConfig(clusterName, 
instanceName).getInstanceDisabledReason()
-        .equals(disableReason));
+    Assert.assertEquals(disableReason, tool.getInstanceConfig(clusterName, 
instanceName).getInstanceDisabledReason());
     tool.enableInstance(clusterName, instanceName, true,
         InstanceConstants.InstanceDisabledType.CLOUD_EVENT, disableReason);
     Assert.assertTrue(
@@ -348,6 +347,65 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
     System.out.println("END testZkHelixAdmin at " + new 
Date(System.currentTimeMillis()));
   }
 
+  @Test
+  private void testSetInstanceOperation() {
+    System.out.println("START testSetInstanceOperation at " + new 
Date(System.currentTimeMillis()));
+
+    final String clusterName = getShortClassName();
+    String rootPath = "/" + clusterName;
+    if (_gZkClient.exists(rootPath)) {
+      _gZkClient.deleteRecursively(rootPath);
+    }
+
+    HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+    tool.addCluster(clusterName, true);
+    Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient));
+    
Assert.assertTrue(_gZkClient.exists(PropertyPathBuilder.customizedStateConfig(clusterName)));
+
+    // Add instance to cluster
+    String hostname = "host1";
+    String port = "9999";
+    String instanceName = hostname + "_" + port;
+    InstanceConfig config =
+        new 
InstanceConfig.Builder().setHostName(hostname).setPort(port).build(instanceName);
+
+    tool.addInstance(clusterName, config);
+
+    // Set instance operation to DISABLE
+    tool.setInstanceOperation(clusterName, instanceName,
+        InstanceConstants.InstanceOperation.DISABLE, "disableReason");
+    Assert.assertEquals(tool.getInstanceConfig(clusterName, 
instanceName).getInstanceOperation()
+            .getOperation(),
+        InstanceConstants.InstanceOperation.DISABLE);
+    Assert.assertEquals(
+        tool.getInstanceConfig(clusterName, 
instanceName).getInstanceDisabledReason(),
+        "disableReason");
+
+    // Set instance operation to ENABLE
+    tool.setInstanceOperation(clusterName, instanceName, 
InstanceConstants.InstanceOperation.ENABLE,
+        "enableReason");
+    Assert.assertEquals(tool.getInstanceConfig(clusterName, 
instanceName).getInstanceOperation()
+            .getOperation(),
+        InstanceConstants.InstanceOperation.ENABLE);
+    // InstanceNonServingReason should be empty after setting operation to 
ENABLE
+    Assert.assertEquals(
+        tool.getInstanceConfig(clusterName, 
instanceName).getInstanceDisabledReason(), "");
+
+    // Set instance operation to UNKNOWN
+    tool.setInstanceOperation(clusterName, instanceName,
+        InstanceConstants.InstanceOperation.UNKNOWN, "unknownReason");
+    Assert.assertEquals(tool.getInstanceConfig(clusterName, 
instanceName).getInstanceOperation()
+            .getOperation(),
+        InstanceConstants.InstanceOperation.UNKNOWN);
+    Assert.assertEquals(
+        tool.getInstanceConfig(clusterName, 
instanceName).getInstanceOperation().getReason(),
+        "unknownReason");
+
+    deleteCluster(clusterName);
+
+    System.out.println("END testSetInstanceOperation at " + new 
Date(System.currentTimeMillis()));
+  }
+
   private HelixManager initializeHelixManager(String clusterName, String 
instanceName) {
     HelixManager manager = HelixManagerFactory.getZKHelixManager(clusterName, 
instanceName,
         InstanceType.PARTICIPANT, org.apache.helix.common.ZkTestBase.ZK_ADDR);
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java 
b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index d9bc5d7fe..c5c5626ff 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -24,6 +24,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import javax.annotation.Nullable;
+
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
@@ -285,16 +287,12 @@ public class MockHelixAdmin implements HelixAdmin {
 
     ZNRecord record = (ZNRecord) _baseDataAccessor.get(instanceConfigPath, 
null, 0);
     InstanceConfig instanceConfig = new InstanceConfig(record);
-    instanceConfig.setInstanceOperation(enabled ? 
InstanceConstants.InstanceOperation.ENABLE
-        : InstanceConstants.InstanceOperation.DISABLE);
+    instanceConfig.setInstanceOperation(new 
InstanceConfig.InstanceOperation.Builder().setOperation(
+            enabled ? InstanceConstants.InstanceOperation.ENABLE
+                : 
InstanceConstants.InstanceOperation.DISABLE).setReason(reason).build());
     if (!enabled) {
+      // TODO: Replace this when the HELIX_ENABLED and HELIX_DISABLED fields 
are removed.
       instanceConfig.resetInstanceDisabledTypeAndReason();
-      if (reason != null) {
-        instanceConfig.setInstanceDisabledReason(reason);
-      }
-      if (disabledType != null) {
-        instanceConfig.setInstanceDisabledType(disabledType);
-      }
     }
     _baseDataAccessor.set(instanceConfigPath, instanceConfig.getRecord(), 0);
   }
@@ -307,7 +305,20 @@ public class MockHelixAdmin implements HelixAdmin {
 
   @Override
   public void setInstanceOperation(String clusterName, String instanceName,
-      InstanceConstants.InstanceOperation instanceOperation) {
+      @Nullable InstanceConstants.InstanceOperation instanceOperation) {
+    setInstanceOperation(clusterName, instanceName, instanceOperation, null, 
false);
+  }
+
+  @Override
+  public void setInstanceOperation(String clusterName, String instanceName,
+      @Nullable InstanceConstants.InstanceOperation instanceOperation, String 
reason) {
+    setInstanceOperation(clusterName, instanceName, instanceOperation, reason, 
false);
+  }
+
+  @Override
+  public void setInstanceOperation(String clusterName, String instanceName,
+      @Nullable InstanceConstants.InstanceOperation instanceOperation, String 
reason,
+      boolean overrideAll) {
   }
 
   @Override
diff --git 
a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java 
b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
index 7da983b8a..47ea88ac4 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
@@ -52,7 +52,6 @@ public class TestInstanceConfig {
   public void testSetInstanceEnableWithReason() {
     InstanceConfig instanceConfig = new InstanceConfig(new ZNRecord("id"));
     
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
-    instanceConfig.setInstanceDisabledReason("NoShowReason");
     
instanceConfig.setInstanceDisabledType(InstanceConstants.InstanceDisabledType.USER_OPERATION);
 
     Assert.assertEquals(instanceConfig.getRecord().getSimpleFields()
@@ -62,10 +61,9 @@ public class TestInstanceConfig {
     Assert.assertEquals(instanceConfig.getRecord().getSimpleFields()
         
.get(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_TYPE.toString()), 
null);
 
-
-    
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
     String reasonCode = "ReasonCode";
-    instanceConfig.setInstanceDisabledReason(reasonCode);
+    instanceConfig.setInstanceOperation(new 
InstanceConfig.InstanceOperation.Builder().setOperation(
+        
InstanceConstants.InstanceOperation.DISABLE).setReason(reasonCode).build());
     
instanceConfig.setInstanceDisabledType(InstanceConstants.InstanceDisabledType.USER_OPERATION);
     Assert.assertEquals(instanceConfig.getRecord().getSimpleFields()
         .get(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.toString()), 
"false");
@@ -198,6 +196,30 @@ public class TestInstanceConfig {
     
Assert.assertEquals(instanceConfig.getInstanceCapacityMap().get("weight1"), 
Integer.valueOf(1));
   }
 
+  @Test
+  public void testInstanceOperationReason() {
+    InstanceConfig instanceConfig = new InstanceConfig("instance1");
+    instanceConfig.setInstanceEnabled(false);
+    instanceConfig.setInstanceDisabledReason("disableReason");
+    Assert.assertEquals(instanceConfig.getInstanceDisabledReason(), 
"disableReason");
+    Assert.assertEquals(instanceConfig.getInstanceDisabledReason(), 
"disableReason");
+
+    instanceConfig.setInstanceOperation(new 
InstanceConfig.InstanceOperation.Builder().setOperation(
+        
InstanceConstants.InstanceOperation.UNKNOWN).setReason("unknownReason").build());
+    Assert.assertEquals(instanceConfig.getInstanceDisabledReason(), 
"disableReason");
+    Assert.assertEquals(instanceConfig.getInstanceOperation().getReason(), 
"unknownReason");
+
+    
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
+    instanceConfig.setInstanceOperation(new 
InstanceConfig.InstanceOperation.Builder().setOperation(
+        
InstanceConstants.InstanceOperation.DISABLE).setReason("disableReason2").build());
+    Assert.assertEquals(instanceConfig.getInstanceDisabledReason(), 
"disableReason2");
+    Assert.assertEquals(instanceConfig.getInstanceOperation().getReason(), 
"disableReason2");
+
+    
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
+    Assert.assertEquals(instanceConfig.getInstanceDisabledReason(), "");
+    Assert.assertEquals(instanceConfig.getInstanceOperation().getReason(), "");
+  }
+
   @Test
   public void testOverwriteInstanceConfig() {
     InstanceConfig instanceConfig = new InstanceConfig("instance2");
@@ -233,9 +255,91 @@ public class TestInstanceConfig {
     Assert.assertTrue(instanceConfig.getTags().contains("tag4"));
     Assert.assertFalse(instanceConfig.getRecord().getSimpleFields()
         
.containsKey(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.toString()));
-    Assert.assertEquals(instanceConfig.getInstanceOperation(),
+    Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(),
         InstanceConstants.InstanceOperation.EVACUATE);
     
Assert.assertFalse(instanceConfig.getInstanceCapacityMap().containsKey("weight1"));
     
Assert.assertEquals(instanceConfig.getInstanceCapacityMap().get("weight2"), 
Integer.valueOf(2));
   }
+
+  @Test
+  public void testInstanceOperationMultipleSources() throws 
InterruptedException {
+    InstanceConfig instanceConfig = new InstanceConfig("instance1");
+
+    // Check that the instance operation is ENABLE from the DEFAULT source
+    Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(),
+        InstanceConstants.InstanceOperation.ENABLE);
+    Assert.assertEquals(instanceConfig.getInstanceOperation().getSource(),
+        InstanceConstants.InstanceOperationSource.DEFAULT);
+
+    // Set instance operation from user source
+    instanceConfig.setInstanceOperation(new 
InstanceConfig.InstanceOperation.Builder().setOperation(
+            
InstanceConstants.InstanceOperation.DISABLE).setReason("userReason")
+        .setSource(InstanceConstants.InstanceOperationSource.USER).build());
+    // Get enabled time
+    long op1EnabledTime = instanceConfig.getInstanceEnabledTime();
+
+    Thread.sleep(1000);
+    // Set instance operation from automation source
+    instanceConfig.setInstanceOperation(new 
InstanceConfig.InstanceOperation.Builder().setOperation(
+            
InstanceConstants.InstanceOperation.DISABLE).setReason("automationReason")
+        
.setSource(InstanceConstants.InstanceOperationSource.AUTOMATION).build());
+
+    // Check that the enabled time is the same as op1 but the source and 
reason is changed to automation
+    Assert.assertEquals(instanceConfig.getInstanceEnabledTime(), 
op1EnabledTime);
+    Assert.assertEquals(instanceConfig.getInstanceOperation().getSource(),
+        InstanceConstants.InstanceOperationSource.AUTOMATION);
+
+    Thread.sleep(1000);
+    // Set instance operation from user source to be ENABLE
+    instanceConfig.setInstanceOperation(new 
InstanceConfig.InstanceOperation.Builder().setOperation(
+            InstanceConstants.InstanceOperation.ENABLE)
+        .setSource(InstanceConstants.InstanceOperationSource.USER).build());
+
+    // Check that the operation is DISABLE, the enabled time is the same as 
op1, and the source is still automation
+    Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(),
+        InstanceConstants.InstanceOperation.DISABLE);
+    Assert.assertEquals(instanceConfig.getInstanceEnabledTime(), 
op1EnabledTime);
+    Assert.assertEquals(instanceConfig.getInstanceOperation().getSource(),
+        InstanceConstants.InstanceOperationSource.AUTOMATION);
+
+    Thread.sleep(1000);
+    // Set the instance operation from the automation source to be ENABLE
+    instanceConfig.setInstanceOperation(new 
InstanceConfig.InstanceOperation.Builder().setOperation(
+            InstanceConstants.InstanceOperation.ENABLE)
+        
.setSource(InstanceConstants.InstanceOperationSource.AUTOMATION).build());
+
+    // Check that the operation is ENABLE, the enabled time is the different 
from op1, and the source is still automation
+    Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(),
+        InstanceConstants.InstanceOperation.ENABLE);
+    Assert.assertFalse(instanceConfig.getInstanceEnabledTime() == 
op1EnabledTime);
+    Assert.assertEquals(instanceConfig.getInstanceOperation().getSource(),
+        InstanceConstants.InstanceOperationSource.AUTOMATION);
+
+    // Set the instance operation from the automation source to be EVACUATE
+    instanceConfig.setInstanceOperation(new 
InstanceConfig.InstanceOperation.Builder().setOperation(
+            InstanceConstants.InstanceOperation.EVACUATE)
+        
.setSource(InstanceConstants.InstanceOperationSource.AUTOMATION).build());
+
+    // Set the instance operation from the user source to be DISABLE
+    instanceConfig.setInstanceOperation(new 
InstanceConfig.InstanceOperation.Builder().setOperation(
+            InstanceConstants.InstanceOperation.DISABLE)
+        .setSource(InstanceConstants.InstanceOperationSource.USER).build());
+
+    // Check that the instance operation is DISABLE and the source is user
+    Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(),
+        InstanceConstants.InstanceOperation.DISABLE);
+    Assert.assertEquals(instanceConfig.getInstanceOperation().getSource(),
+        InstanceConstants.InstanceOperationSource.USER);
+
+    // Set the instance operation from the admin source to be ENABLE
+    instanceConfig.setInstanceOperation(new 
InstanceConfig.InstanceOperation.Builder().setOperation(
+            InstanceConstants.InstanceOperation.ENABLE)
+        .setSource(InstanceConstants.InstanceOperationSource.ADMIN).build());
+
+    // Check that the instance operation is ENABLE and the source is admin
+    Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(),
+        InstanceConstants.InstanceOperation.ENABLE);
+    Assert.assertEquals(instanceConfig.getInstanceOperation().getSource(),
+        InstanceConstants.InstanceOperationSource.ADMIN);
+  }
 }
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
index 8a4bbf07b..bb5a2bc5c 100644
--- 
a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
@@ -268,8 +268,8 @@ public class StoppableInstancesSelector {
       PropertyKey.Builder propertyKeyBuilder = _dataAccessor.keyBuilder();
       InstanceConfig instanceConfig =
           
_dataAccessor.getProperty(propertyKeyBuilder.instanceConfig(instance));
-      if (InstanceConstants.InstanceOperation.EVACUATE
-          .equals(instanceConfig.getInstanceOperation())) {
+      if (InstanceConstants.InstanceOperation.EVACUATE.equals(
+          instanceConfig.getInstanceOperation().getOperation())) {
         toBeStoppedInstances.add(instance);
       }
     }
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index ea98f6637..55fc4de36 100644
--- 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -45,12 +45,14 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableMap;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Error;
 import org.apache.helix.model.HealthStat;
@@ -66,6 +68,7 @@ import org.apache.helix.rest.common.HttpConstants;
 import org.apache.helix.rest.server.filters.ClusterAuth;
 import org.apache.helix.rest.server.json.instance.InstanceInfo;
 import org.apache.helix.rest.server.json.instance.StoppableCheck;
+import org.apache.helix.util.InstanceUtil;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.eclipse.jetty.util.StringUtil;
 import org.slf4j.Logger;
@@ -388,9 +391,11 @@ public class PerInstanceAccessor extends 
AbstractHelixResource {
   @POST
   public Response updateInstance(@PathParam("clusterId") String clusterId,
       @PathParam("instanceName") String instanceName, @QueryParam("command") 
String command,
-      @QueryParam("instanceOperation") InstanceConstants.InstanceOperation 
state,
-      @QueryParam("instanceDisabledType") String disabledType,
-      @QueryParam("instanceDisabledReason") String disabledReason,
+      @QueryParam("instanceOperation") InstanceConstants.InstanceOperation 
instanceOperation,
+      @QueryParam("instanceOperationSource") 
InstanceConstants.InstanceOperationSource instanceOperationSource,
+      @QueryParam("reason") String reason,
+      @Deprecated @QueryParam("instanceDisabledType") String disabledType,
+      @Deprecated @QueryParam("instanceDisabledReason") String disabledReason,
       @QueryParam("force") boolean force, String content) {
     Command cmd;
     try {
@@ -445,7 +450,12 @@ public class PerInstanceAccessor extends 
AbstractHelixResource {
                       .getTypeFactory().constructCollectionType(List.class, 
String.class)));
           break;
         case setInstanceOperation:
-          admin.setInstanceOperation(clusterId, instanceName, state);
+          InstanceUtil.setInstanceOperation(new 
ConfigAccessor(getRealmAwareZkClient()),
+              new ZkBaseDataAccessor<>(getRealmAwareZkClient()), clusterId, 
instanceName,
+              new 
InstanceConfig.InstanceOperation.Builder().setOperation(instanceOperation)
+                  .setReason(reason).setSource(
+                      force ? InstanceConstants.InstanceOperationSource.ADMIN 
: instanceOperationSource)
+                  .build());
           break;
         case canCompleteSwap:
           return OK(OBJECT_MAPPER.writeValueAsString(
diff --git 
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
 
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index 395f9bf85..6ab727e85 100644
--- 
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
+++ 
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -495,14 +495,14 @@ public class TestPerInstanceAccessor extends 
AbstractTestClass {
     new 
JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=EVACUATE")
         .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
     instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, 
INSTANCE_NAME);
-    Assert.assertEquals(instanceConfig.getInstanceOperation(),
+    Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(),
         InstanceConstants.InstanceOperation.EVACUATE);
     new 
JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=INVALIDOP")
         
.expectedReturnStatusCode(Response.Status.NOT_FOUND.getStatusCode()).format(CLUSTER_NAME,
 INSTANCE_NAME).post(this, entity);
     new 
JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=")
         .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
     instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, 
INSTANCE_NAME);
-    Assert.assertEquals(instanceConfig.getInstanceOperation(),
+    Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(),
         InstanceConstants.InstanceOperation.ENABLE);
 
     // test canCompleteSwap
@@ -543,7 +543,7 @@ public class TestPerInstanceAccessor extends 
AbstractTestClass {
     new 
JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=EVACUATE")
         .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
     instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, 
INSTANCE_NAME);
-    Assert.assertEquals(instanceConfig.getInstanceOperation(),
+    Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(),
         InstanceConstants.InstanceOperation.EVACUATE);
 
     Response response = new 
JerseyUriRequestBuilder("clusters/{}/instances/{}?command=isEvacuateFinished")
@@ -586,7 +586,7 @@ public class TestPerInstanceAccessor extends 
AbstractTestClass {
     new 
JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=EVACUATE")
         .format(CLUSTER_NAME, test_instance_name).post(this, entity);
     instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, 
test_instance_name);
-    Assert.assertEquals(instanceConfig.getInstanceOperation(),
+    Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(),
         InstanceConstants.InstanceOperation.EVACUATE);
 
     response = new 
JerseyUriRequestBuilder("clusters/{}/instances/{}?command=isEvacuateFinished")

Reply via email to