This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch abnormalResolver
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 29740864838b7325bc9198861c79b4349207e20e
Author: Jiajun Wang <[email protected]>
AuthorDate: Thu May 28 15:55:33 2020 -0700

    Add Abnormal States Resolver interface and configuration item. (#1028)
    
    The Abnormal States Resolver defines a generic interface to detect and
    recover from abnormal current states of a partition. For example,
    - double masters
    - application data out of sync
    Concrete resolvers implement this interface according to their own requirements.
    
    The resolver is applied during the rebalance process. It is selected per
    state model by the corresponding cluster config item. For example,
    "ABNORMAL_STATES_RESOLVER_MAP" : {
     "MASTERSLAVE" : "org.apache.helix.api.rebalancer.constraint.MasterSlaveAbnormalStateResolver"
    }
    Without any configuration, the default behavior performs no recovery work.
---
 .../constraint/AbnormalStateResolver.java          | 75 ++++++++++++++++++
 .../dataproviders/BaseControllerDataProvider.java  | 45 ++++++++++-
 .../controller/rebalancer/AbstractRebalancer.java  | 89 +++++++++++++++++-----
 .../rebalancer/DelayedAutoRebalancer.java          | 40 +++-------
 .../java/org/apache/helix/model/ClusterConfig.java | 28 ++++++-
 .../rebalancer/TestAbstractRebalancer.java         |  4 +-
 .../rebalancer/TestAutoRebalanceStrategy.java      |  6 +-
 .../rebalancer/TestZeroReplicaAvoidance.java       | 10 ++-
 .../constraint/MockAbnormalStateResolver.java      | 48 ++++++++++++
 .../waged/model/AbstractTestClusterModel.java      |  4 +
 .../rebalancer/TestAbnormalStatesResolver.java     | 67 ++++++++++++++++
 .../org/apache/helix/model/TestClusterConfig.java  | 16 ++++
 12 files changed, 376 insertions(+), 56 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
 
b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
new file mode 100644
index 0000000..7e9946c
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
@@ -0,0 +1,75 @@
+package org.apache.helix.api.rebalancer.constraint;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+
+/**
+ * A generic interface to detect abnormal current states of a partition and recover from them.
+ */
+public interface AbnormalStateResolver {
+  /**
+   * A placeholder resolver used when no resolver is specified for a state model.
+   * It treats all current states as valid and does not support recovery.
+   */
+  AbnormalStateResolver DUMMY_STATE_RESOLVER = new AbnormalStateResolver() {
+    public boolean isCurrentStatesValid(final CurrentStateOutput 
currentStateOutput,
+        final String resourceName, final Partition partition,
+        final StateModelDefinition stateModelDef) {
+      // By default, all current states are valid.
+      return true;
+    }
+    public Map<String, String> computeRecoveryAssignment(final 
CurrentStateOutput currentStateOutput,
+        final String resourceName, final Partition partition,
+        final StateModelDefinition stateModelDef, final List<String> 
preferenceList) {
+      throw new UnsupportedOperationException("This resolver won't recover 
abnormal states.");
+    }
+  };
+
+  /**
+   * Check whether the current states of the specified partition are valid.
+   * @param currentStateOutput the current states to inspect
+   * @param resourceName the name of the resource that owns the partition
+   * @param partition the partition to check
+   * @param stateModelDef the state model definition of the resource
+   * @return true if the current states of the specified partition are valid.
+   */
+  boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition,
+      final StateModelDefinition stateModelDef);
+
+  /**
+   * Compute a transient partition state assignment that fixes the abnormal current states.
+   * @param currentStateOutput the current states to inspect
+   * @param resourceName the name of the resource that owns the partition
+   * @param partition the partition to recover
+   * @param stateModelDef the state model definition of the resource
+   * @param preferenceList the preference list of the partition
+   * @return the transient {@code instance -> state} map; must keep the current instance set.
+   */
+  Map<String, String> computeRecoveryAssignment(final CurrentStateOutput 
currentStateOutput,
+      final String resourceName, final Partition partition,
+      final StateModelDefinition stateModelDef, final List<String> 
preferenceList);
+}
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index a24ea46..59058ab 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -34,8 +34,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.common.caches.AbstractDataCache;
 import org.apache.helix.common.caches.CurrentStateCache;
 import org.apache.helix.common.caches.InstanceMessagesCache;
@@ -53,6 +55,7 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.ParticipantHistory;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.util.HelixUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,6 +106,7 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
   private Map<String, Map<String, String>> _idealStateRuleMap;
   private Map<String, Map<String, Set<String>>> 
_disabledInstanceForPartitionMap = new HashMap<>();
   private Set<String> _disabledInstanceSet = new HashSet<>();
+  private final Map<String, AbnormalStateResolver> _abnormalStateResolverMap = 
new HashMap<>();
 
   public BaseControllerDataProvider() {
     this(AbstractDataCache.UNKNOWN_CLUSTER, 
AbstractDataCache.UNKNOWN_PIPELINE);
@@ -225,6 +229,7 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
     if 
(_propertyDataChangedMap.get(HelixConstants.ChangeType.CLUSTER_CONFIG).getAndSet(false))
 {
       _clusterConfig = 
accessor.getProperty(accessor.keyBuilder().clusterConfig());
       refreshedType.add(HelixConstants.ChangeType.CLUSTER_CONFIG);
+      refreshAbnormalStateResolverMap(_clusterConfig);
     } else {
       LogUtil.logInfo(logger, getClusterEventId(), String.format(
           "No ClusterConfig change for cluster %s, pipeline %s", _clusterName, 
getPipelineName()));
@@ -372,6 +377,7 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
 
   public void setClusterConfig(ClusterConfig clusterConfig) {
     _clusterConfig = clusterConfig;
+    refreshAbnormalStateResolverMap(_clusterConfig);
   }
 
   @Override
@@ -731,6 +737,43 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
     _asyncTasksThreadPool = asyncTasksThreadPool;
   }
 
+
+  public AbnormalStateResolver getAbnormalStateResolver(String stateModel) {
+    return _abnormalStateResolverMap // falls back to DUMMY_STATE_RESOLVER when none is loaded
+        .getOrDefault(stateModel, AbnormalStateResolver.DUMMY_STATE_RESOLVER);
+  }
+
+  private void refreshAbnormalStateResolverMap(ClusterConfig clusterConfig) {
+    if (clusterConfig == null) {
+      logger.debug("Skip refreshing abnormal state resolvers because the ClusterConfig is missing");
+      return;
+    }
+    Map<String, String> resolverMap = clusterConfig.getAbnormalStateResolverMap();
+    logger.info("Start loading the abnormal state resolvers with configuration {}", resolverMap);
+    // Drop cached resolvers whose state model is no longer configured.
+    _abnormalStateResolverMap.keySet().retainAll(resolverMap.keySet());
+    // (Re)load resolvers; an already-loaded instance of the same class is kept as-is.
+    for (String stateModel : resolverMap.keySet()) {
+      String resolverClassName = resolverMap.get(stateModel);
+      if (resolverClassName == null || resolverClassName.isEmpty()) {
+        _abnormalStateResolverMap.remove(stateModel); // empty definition: drop any stale resolver
+        continue;
+      }
+      if (!resolverClassName.equals(getAbnormalStateResolver(stateModel).getClass().getName())) {
+        try {
+          AbnormalStateResolver resolver = AbnormalStateResolver.class
+              .cast(HelixUtil.loadClass(getClass(), resolverClassName).newInstance());
+          _abnormalStateResolverMap.put(stateModel, resolver);
+        } catch (Exception e) {
+          throw new HelixException(String
+              .format("Failed to instantiate the abnormal state resolver %s for state model %s",
+                  resolverClassName, stateModel), e); // keep the root cause for diagnosis
+        }
+      } // else, nothing to update since the same resolver class has been loaded.
+    }
+    logger.info("Finish loading the abnormal state resolvers {}", _abnormalStateResolverMap);
+  }
+
   public boolean isMaintenanceModeEnabled() {
     return _isMaintenanceModeEnabled;
   }
@@ -768,4 +811,4 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
   public String toString() {
     return genCacheContentStringBuilder().toString();
   }
-}
\ No newline at end of file
+}
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index e85a2df..6fdd0b6 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -27,11 +27,13 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
@@ -102,7 +104,8 @@ public abstract class AbstractRebalancer<T extends 
BaseControllerDataProvider> i
       Map<String, String> bestStateForPartition =
           
computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), 
stateModelDef,
               preferenceList, currentStateOutput, 
disabledInstancesForPartition, idealState,
-              cache.getClusterConfig(), partition);
+              cache.getClusterConfig(), partition,
+              cache.getAbnormalStateResolver(stateModelDefName));
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
@@ -179,44 +182,97 @@ public abstract class AbstractRebalancer<T extends 
BaseControllerDataProvider> i
     return rebalanceStrategy;
   }
 
+  /**
+   * Compute the best possible states of a partition in AUTO ideal state mode.
+   * @param liveInstances the currently live instances
+   * @param stateModelDef the state model definition of the resource
+   * @param preferenceList the preference list; null if the partition or ideal state was removed
+   * @param currentStateOutput the current states ({@code instance -> state} for each partition)
+   * @param disabledInstancesForPartition the instances disabled for this partition
+   * @param idealState the ideal state of the resource
+   * @param clusterConfig the cluster config (not read by this implementation)
+   * @param partition the partition to compute states for
+   * @param resolver the abnormal state resolver consulted before the regular computation
+   * @return the best possible {@code instance -> state} map of the partition
+   */
   protected Map<String, String> 
computeBestPossibleStateForPartition(Set<String> liveInstances,
       StateModelDefinition stateModelDef, List<String> preferenceList,
       CurrentStateOutput currentStateOutput, Set<String> 
disabledInstancesForPartition,
-      IdealState idealState, ClusterConfig clusterConfig, Partition partition) 
{
+      IdealState idealState, ClusterConfig clusterConfig, Partition partition,
+      AbnormalStateResolver resolver) {
+    Optional<Map<String, String>> optionalOverwrittenStates =
+        computeStatesOverwriteForPartition(stateModelDef, preferenceList, 
currentStateOutput,
+            idealState, partition, resolver);
+    if (optionalOverwrittenStates.isPresent()) {
+      return optionalOverwrittenStates.get();
+    }

-    Map<String, String> currentStateMap =
-        currentStateOutput.getCurrentStateMap(idealState.getResourceName(), 
partition);
+    Map<String, String> currentStateMap = new HashMap<>(
+        currentStateOutput.getCurrentStateMap(idealState.getResourceName(), 
partition));
+    return computeBestPossibleMap(preferenceList, stateModelDef, 
currentStateMap, liveInstances,
+        disabledInstancesForPartition);
+  }
 
-    if (currentStateMap == null) {
-      currentStateMap = Collections.emptyMap();
-    }
+  /**
+   * Compute whether the partition assignment must be overwritten: the partition was removed,
+   * the resource is disabled, or the resolver reports invalid current states.
+   * @param stateModelDef the state model definition of the resource
+   * @param preferenceList the preference list; null if the partition or ideal state was removed
+   * @param currentStateOutput the current states, read for this resource and partition
+   * @param idealState the ideal state; supplies the resource name and the enabled flag
+   * @param partition the partition to evaluate
+   * @param resolver the abnormal state resolver for the resource's state model
+   * @return the overwriting {@code instance -> state} map, or {@code Optional.empty()} if none
+   * @throws HelixException if the recovery assignment is null or changes the set of instances
+   */
+  protected Optional<Map<String, String>> computeStatesOverwriteForPartition(
+      final StateModelDefinition stateModelDef, final List<String> 
preferenceList,
+      final CurrentStateOutput currentStateOutput, IdealState idealState, 
final Partition partition,
+      final AbnormalStateResolver resolver) {
+    String resourceName = idealState.getResourceName();
+    Map<String, String> currentStateMap =
+        currentStateOutput.getCurrentStateMap(resourceName, partition);

     // (1) If the partition is removed from IS or the IS is deleted.
     // Transit to DROPPED no matter the instance is disabled or not.
     if (preferenceList == null) {
-      return computeBestPossibleMapForDroppedResource(currentStateMap);
+      return 
Optional.of(computeBestPossibleMapForDroppedResource(currentStateMap));
     }

     // (2) If resource disabled altogether, transit to initial-state (e.g. 
OFFLINE) if it's not in ERROR.
     if (!idealState.isEnabled()) {
-      return computeBestPossibleMapForDisabledResource(currentStateMap, 
stateModelDef);
+      return 
Optional.of(computeBestPossibleMapForDisabledResource(currentStateMap, 
stateModelDef));
     }

-    return computeBestPossibleMap(preferenceList, stateModelDef, 
currentStateMap, liveInstances,
-        disabledInstancesForPartition);
+    // (3) If the current states are not valid, fix the invalid part first.
+    if (!resolver.isCurrentStatesValid(currentStateOutput, resourceName, 
partition, stateModelDef)) {
+      Map<String, String> recoveryAssignment = resolver
+          .computeRecoveryAssignment(currentStateOutput, resourceName, 
partition, stateModelDef,
+              preferenceList);
+      if (recoveryAssignment == null || !recoveryAssignment.keySet()
+          .equals(currentStateMap.keySet())) {
+        throw new HelixException(String.format(
+            "Invalid recovery assignment %s since it changed the current 
partition placement %s",
+            recoveryAssignment, currentStateMap));
+      }
+      return Optional.of(recoveryAssignment);
+    }
+
+    return Optional.empty();
   }
 
-  protected Map<String, String> 
computeBestPossibleMapForDroppedResource(Map<String, String> currentStateMap) {
-    Map<String, String> bestPossibleStateMap = new HashMap<String, String>();
+  protected Map<String, String> computeBestPossibleMapForDroppedResource(
+      final Map<String, String> currentStateMap) {
+    Map<String, String> bestPossibleStateMap = new HashMap<>();
     for (String instance : currentStateMap.keySet()) {
       bestPossibleStateMap.put(instance, HelixDefinedState.DROPPED.toString());
     }
     return bestPossibleStateMap;
   }
 
-  protected Map<String, String> 
computeBestPossibleMapForDisabledResource(Map<String, String> currentStateMap
-      , StateModelDefinition stateModelDef) {
-    Map<String, String> bestPossibleStateMap = new HashMap<String, String>();
+  protected Map<String, String> computeBestPossibleMapForDisabledResource(
+      final Map<String, String> currentStateMap, StateModelDefinition 
stateModelDef) {
+    Map<String, String> bestPossibleStateMap = new HashMap<>();
     for (String instance : currentStateMap.keySet()) {
       if 
(!HelixDefinedState.ERROR.name().equals(currentStateMap.get(instance))) {
         bestPossibleStateMap.put(instance, stateModelDef.getInitialState());
@@ -267,7 +323,6 @@ public abstract class AbstractRebalancer<T extends 
BaseControllerDataProvider> i
    */
   protected Map<String, String> computeBestPossibleMap(List<String> 
preferenceList, StateModelDefinition stateModelDef,
       Map<String, String> currentStateMap, Set<String> liveInstances, 
Set<String> disabledInstancesForPartition) {
-
     Map<String, String> bestPossibleStateMap = new HashMap<>();
 
     // (1) Instances that have current state but not in preference list, drop, 
no matter it's disabled or not.
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index f4c95a6..f169e07 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -27,10 +27,12 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
 import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -263,7 +265,7 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceController
       Map<String, String> bestStateForPartition =
           computeBestPossibleStateForPartition(liveNodes, stateModelDef, 
preferenceList,
               currentStateOutput, disabledInstancesForPartition, idealState, 
clusterConfig,
-              partition);
+              partition, cache.getAbnormalStateResolver(stateModelDefName));
 
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
@@ -276,39 +278,20 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceController
     return partitionMapping;
   }
 
-  /**
-   * compute best state for resource in AUTO ideal state mode
-   * @param liveInstances
-   * @param stateModelDef
-   * @param preferenceList
-   * @param currentStateOutput
-   *          : instance->state for each partition
-   * @param disabledInstancesForPartition
-   * @param idealState
-   * @param  clusterConfig
-   * @param  partition
-   * @return
-   */
   @Override
   protected Map<String, String> 
computeBestPossibleStateForPartition(Set<String> liveInstances,
       StateModelDefinition stateModelDef, List<String> preferenceList,
       CurrentStateOutput currentStateOutput, Set<String> 
disabledInstancesForPartition,
-      IdealState idealState, ClusterConfig clusterConfig, Partition partition) 
{
-
+      IdealState idealState, ClusterConfig clusterConfig, Partition partition,
+      AbnormalStateResolver resolver) {
+    Optional<Map<String, String>> optionalOverwrittenStates =
+        computeStatesOverwriteForPartition(stateModelDef, preferenceList, 
currentStateOutput,
+            idealState, partition, resolver);
+    if (optionalOverwrittenStates.isPresent()) {
+      return optionalOverwrittenStates.get();
+    }
     Map<String, String> currentStateMap = new HashMap<>(
         currentStateOutput.getCurrentStateMap(idealState.getResourceName(), 
partition));
-
-    // (1) If the partition is removed from IS or the IS is deleted.
-    // Transit to DROPPED no matter the instance is disabled or not.
-    if (preferenceList == null) {
-      return computeBestPossibleMapForDroppedResource(currentStateMap);
-    }
-
-    // (2) If resource disabled altogether, transit to initial-state (e.g. 
OFFLINE) if it's not in ERROR.
-    if (!idealState.isEnabled()) {
-      return computeBestPossibleMapForDisabledResource(currentStateMap, 
stateModelDef);
-    }
-
     // Instances not in preference list but still have active replica, retain 
to avoid zero replica during movement
     List<String> currentInstances = new ArrayList<>(currentStateMap.keySet());
     Collections.sort(currentInstances);
@@ -332,7 +315,6 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceController
       }
     }
 
-
     // Sort the instancesToMove by their current partition state.
     // Reason: because the states are assigned to instances in the order 
appeared in preferenceList, if we have
     // [node1:Slave, node2:Master], we want to keep it that way, instead of 
assigning Master to node1.
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java 
b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 165919c..bb0f728 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -30,10 +30,10 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.api.config.HelixConfigProperty;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
 import org.apache.helix.api.config.StateTransitionTimeoutConfig;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 
 /**
  * Cluster configurations
@@ -109,7 +109,13 @@ public class ClusterConfig extends HelixProperty {
     // 
https://github.com/apache/helix/wiki/Weight-aware-Globally-Evenly-distributed-Rebalancer#rebalance-coordinator
     //
     // Default to be true.
-    GLOBAL_REBALANCE_ASYNC_MODE
+    GLOBAL_REBALANCE_ASYNC_MODE,
+
+    /**
+     * Configure the abnormal partition states resolver classes for the corresponding state model.
+     * Map of {@code state model definition name -> fully qualified resolver class name}.
+     */
+    ABNORMAL_STATES_RESOLVER_MAP
   }
 
   public enum GlobalRebalancePreferenceKey {
@@ -852,6 +858,24 @@ public class ClusterConfig extends HelixProperty {
   }
 
   /**
+   * Set the abnormal state resolver map, {@code state model def name -> resolver class name}.
+   */
+  public void setAbnormalStateResolverMap(Map<String, String> resolverMap) {
+    if (resolverMap.values().stream() // reject null or empty class names up front
+        .anyMatch(className -> className == null || className.isEmpty())) {
+      throw new IllegalArgumentException(
+          "Invalid Abnormal State Resolver Map definition. Class name cannot 
be empty.");
+    }
+    
_record.setMapField(ClusterConfigProperty.ABNORMAL_STATES_RESOLVER_MAP.name(), 
resolverMap);
+  }
+
+  public Map<String, String> getAbnormalStateResolverMap() {
+    String key = ClusterConfigProperty.ABNORMAL_STATES_RESOLVER_MAP.name();
+    Map<String, String> resolverMap = _record.getMapField(key); // may be null when unset
+    return resolverMap == null ? Collections.emptyMap() : resolverMap; // typed, never null
+  }
+
+  /**
    * Get IdealState rules defined in the cluster config.
    * @return
    */
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
index 0886768..72bb726 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
@@ -52,7 +53,8 @@ public class TestAbstractRebalancer {
         .computeBestPossibleStateForPartition(new HashSet<>(liveInstances),
             
BuiltInStateModelDefinitions.valueOf(stateModelName).getStateModelDefinition(),
             preferenceList, currentStateOutput, new 
HashSet<>(disabledInstancesForPartition),
-            new IdealState("test"), new ClusterConfig("TestCluster"), 
partition);
+            new IdealState("test"), new ClusterConfig("TestCluster"), 
partition,
+            AbnormalStateResolver.DUMMY_STATE_RESOLVER);
 
     Assert.assertEquals(bestPossibleMap, expectedBestPossibleMap);
   }
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
index 0b1370e..0d09079 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
@@ -41,7 +41,7 @@ import com.google.common.collect.Sets;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.MockAccessor;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
@@ -52,6 +52,7 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -243,7 +244,8 @@ public class TestAutoRebalanceStrategy {
         }
         Map<String, String> assignment = new AutoRebalancer()
             
.computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), 
_stateModelDef,
-                preferenceList, currentStateOutput, disabled, is, 
clusterConfig, p);
+                preferenceList, currentStateOutput, disabled, is, 
clusterConfig, p,
+                AbnormalStateResolver.DUMMY_STATE_RESOLVER);
         mapResult.put(partition, assignment);
       }
       return mapResult;
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
index 9a5e085..33885ca 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
@@ -32,7 +32,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.stages.BaseStageTest;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -41,6 +41,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.ObjectReader;
 import org.testng.Assert;
@@ -85,9 +86,10 @@ public class TestZeroReplicaAvoidance extends BaseStageTest {
         }
       }
     }
-    Map<String, String> bestPossibleMap = 
rebalancer.computeBestPossibleStateForPartition(
-        liveInstances, stateModelDef, instancePreferenceList, 
currentStateOutput,
-        Collections.emptySet(), is, new ClusterConfig("TestCluster"), 
partition);
+    Map<String, String> bestPossibleMap = rebalancer
+        .computeBestPossibleStateForPartition(liveInstances, stateModelDef, 
instancePreferenceList,
+            currentStateOutput, Collections.emptySet(), is, new 
ClusterConfig("TestCluster"),
+            partition, AbnormalStateResolver.DUMMY_STATE_RESOLVER);
     Assert.assertEquals(bestPossibleMap, expectedBestPossibleMap,
         "Differs, get " + bestPossibleMap + "\nexpected: " + 
expectedBestPossibleMap
             + "\ncurrentState: " + currentStateMap + "\npreferenceList: " + 
instancePreferenceList);
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
new file mode 100644
index 0000000..4718921
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
@@ -0,0 +1,48 @@
+package org.apache.helix.controller.rebalancer.constraint;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+
+/**
+ * A mock abnormal state resolver for supporting tests.
+ * It reports every current state as valid and does not support recovery.
+ */
+public class MockAbnormalStateResolver implements AbnormalStateResolver {
+  @Override
+  public boolean isCurrentStatesValid(final CurrentStateOutput 
currentStateOutput,
+      final String resourceName, final Partition partition,
+      final StateModelDefinition stateModelDef) {
+    // The mock never detects abnormal states: all current states are valid.
+    return true;
+  }
+
+  public Map<String, String> computeRecoveryAssignment(final 
CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition,
+      final StateModelDefinition stateModelDef, final List<String> 
preferenceList) {
+    throw new UnsupportedOperationException("The mock resolver won't recover 
abnormal states.");
+  }
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 7f8281b..ca8fd53 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
@@ -39,6 +40,7 @@ import org.apache.helix.model.ResourceConfig;
 import org.mockito.Mockito;
 import org.testng.annotations.BeforeClass;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.when;
 
 public abstract class AbstractTestClusterModel {
@@ -109,6 +111,8 @@ public abstract class AbstractTestClusterModel {
         _capacityDataMap.keySet().stream().collect(Collectors.toMap(key -> 
key, key -> 0)));
     testClusterConfig.setTopologyAwareEnabled(true);
     when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
+    when(testCache.getAbnormalStateResolver(any()))
+        .thenReturn(AbnormalStateResolver.DUMMY_STATE_RESOLVER);
 
     // 3. Mock the live instance node for the default instance.
     LiveInstance testLiveInstance = createMockLiveInstance(_testInstanceId);
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
new file mode 100644
index 0000000..dde2644
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
@@ -0,0 +1,67 @@
+package org.apache.helix.integration.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
+import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import 
org.apache.helix.controller.rebalancer.constraint.MockAbnormalStateResolver;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestAbnormalStatesResolver extends ZkStandAloneCMTestBase {
+  @Test
+  public void testConfigureResolver() {
+    ResourceControllerDataProvider cache = new ResourceControllerDataProvider(CLUSTER_NAME);
+    // Verify the initial setup: with no configuration every state model uses the dummy resolver.
+    cache.refresh(_controller.getHelixDataAccessor());
+    for (String stateModelDefName : cache.getStateModelDefMap().keySet()) {
+      Assert.assertEquals(cache.getAbnormalStateResolver(stateModelDefName).getClass(),
+          AbnormalStateResolver.DUMMY_STATE_RESOLVER.getClass());
+    }
+
+    ConfigAccessor configAccessor = new ConfigAccessor.Builder().setZkAddress(ZK_ADDR).build();
+    try {
+      // Update the resolver configuration for MasterSlave state model.
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+      clusterConfig.setAbnormalStateResolverMap(
+          ImmutableMap.of(MasterSlaveSMD.name, MockAbnormalStateResolver.class.getName()));
+      configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+      cache.requireFullRefresh();
+      cache.refresh(_controller.getHelixDataAccessor());
+      for (String stateModelDefName : cache.getStateModelDefMap().keySet()) {
+        Assert.assertEquals(cache.getAbnormalStateResolver(stateModelDefName).getClass(),
+            stateModelDefName.equals(MasterSlaveSMD.name) ? MockAbnormalStateResolver.class
+                : AbnormalStateResolver.DUMMY_STATE_RESOLVER.getClass());
+      }
+    } finally {
+      // Always reset the resolver map so a failure above cannot leak config into later tests.
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+      clusterConfig.setAbnormalStateResolverMap(Collections.emptyMap());
+      configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    }
+  }
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java 
b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
index 8e4a016..f353293 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
@@ -26,6 +26,7 @@ import java.util.Map;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import 
org.apache.helix.controller.rebalancer.constraint.MockAbnormalStateResolver;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -256,4 +257,19 @@ public class TestClusterConfig {
         
.getBooleanField(ClusterConfig.ClusterConfigProperty.GLOBAL_REBALANCE_ASYNC_MODE.name(),
             false), true);
   }
+
+  @Test
+  public void testAbnormalStatesResolverConfig() {
+    ClusterConfig testConfig = new ClusterConfig("testConfig");
+    // With nothing configured, the resolver map is empty.
+    Assert.assertEquals(testConfig.getAbnormalStateResolverMap(), Collections.emptyMap());
+    // A map that is set is returned unchanged by the getter.
+    Map<String, String> resolverMap = ImmutableMap.of(MasterSlaveSMD.name,
+        MockAbnormalStateResolver.class.getName());
+    testConfig.setAbnormalStateResolverMap(resolverMap);
+    Assert.assertEquals(testConfig.getAbnormalStateResolverMap(), resolverMap);
+    // Setting an empty map clears the previously configured resolvers.
+    testConfig.setAbnormalStateResolverMap(Collections.emptyMap());
+    Assert.assertEquals(testConfig.getAbnormalStateResolverMap(), Collections.emptyMap());
+  }
 }

Reply via email to