Repository: helix
Updated Branches:
  refs/heads/master 7e49f995e -> 0beeb8fa2


Check that the best possible state is available in the intermediate state calculation stage.

The resource rebalance pipeline should continue processing resources even if some
resources cannot be calculated.
This prevents a few problematic resources from stopping controller management
altogether.

Also add exception handling to the per-resource loops in several stages. The
idea is that the detailed calculation may throw HelixException, but at the top stage
layer, these exceptions should not prevent the whole pipeline from finishing.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0beeb8fa
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0beeb8fa
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0beeb8fa

Branch: refs/heads/master
Commit: 0beeb8fa2babe2d8c7bc50a0a454f752b7c96295
Parents: 7e49f99
Author: Jiajun Wang <jjw...@linkedin.com>
Authored: Thu Sep 27 16:18:59 2018 -0700
Committer: jiajunwang <ericwang1...@gmail.com>
Committed: Tue Oct 30 16:29:04 2018 -0700

----------------------------------------------------------------------
 .../controller/common/ResourcesStateMap.java    |   2 +-
 .../stages/BestPossibleStateCalcStage.java      |  35 ++-
 .../stages/BestPossibleStateOutput.java         |   4 +
 .../stages/ExternalViewComputeStage.java        | 172 +++++------
 .../stages/IntermediateStateCalcStage.java      |  44 ++-
 .../stages/MessageGenerationPhase.java          | 285 ++++++++++---------
 .../stages/MessageSelectionStage.java           |  35 ++-
 .../controller/stages/MessageThrottleStage.java |   4 +-
 .../stages/PersistAssignmentStage.java          | 122 ++++----
 9 files changed, 393 insertions(+), 310 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/0beeb8fa/helix-core/src/main/java/org/apache/helix/controller/common/ResourcesStateMap.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/common/ResourcesStateMap.java
 
b/helix-core/src/main/java/org/apache/helix/controller/common/ResourcesStateMap.java
index 3559d7b..ccc31d1 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/common/ResourcesStateMap.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/common/ResourcesStateMap.java
@@ -35,7 +35,7 @@ public class ResourcesStateMap {
   protected Map<String, PartitionStateMap> _resourceStateMap;
 
   public ResourcesStateMap() {
-    _resourceStateMap = new HashMap<String, PartitionStateMap>();
+    _resourceStateMap = new HashMap<>();
   }
 
   public Set<String> resourceSet() {

http://git-wip-us.apache.org/repos/asf/helix/blob/0beeb8fa/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 636c6e7..1bbd6a0 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -19,6 +19,13 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -29,24 +36,25 @@ import 
org.apache.helix.controller.rebalancer.MaintenanceRebalancer;
 import org.apache.helix.controller.rebalancer.Rebalancer;
 import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
-import org.apache.helix.model.*;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
-import org.apache.helix.task.*;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.TaskRebalancer;
 import org.apache.helix.util.HelixUtil;
-import org.apache.helix.util.StatusUpdateUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-import java.util.concurrent.Callable;
-
 /**
  * For partition compute best possible (instance,state) pair based on
  * IdealState,StateModel,LiveInstance
  */
 public class BestPossibleStateCalcStage extends AbstractBaseStage {
   private static final Logger logger = 
LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName());
-  private final StatusUpdateUtil _statusUpdateUtil = new StatusUpdateUtil();
 
   @Override
   public void process(ClusterEvent event) throws Exception {
@@ -109,7 +117,16 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
     Iterator<Resource> itr = resourceMap.values().iterator();
     while (itr.hasNext()) {
       Resource resource = itr.next();
-      if (!computeResourceBestPossibleState(event, cache, currentStateOutput, 
resource, output)) {
+      boolean result = false;
+      try {
+        result =
+            computeResourceBestPossibleState(event, cache, currentStateOutput, 
resource, output);
+      } catch (HelixException ex) {
+        LogUtil.logError(logger, _eventId,
+            "Exception when calculating best possible states for " + 
resource.getResourceName(),
+            ex);
+      }
+      if (!result) {
         failureResources.add(resource.getResourceName());
         LogUtil.logWarn(logger, _eventId,
             "Failed to calculate best possible states for " + 
resource.getResourceName());
@@ -215,7 +232,7 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
     if (rebalancer == null || mappingCalculator == null) {
       LogUtil.logError(logger, _eventId,
           "Error computing assignment for resource " + resourceName + ". no 
rebalancer found. rebalancer: " + rebalancer
-              + " mappingCaculator: " + mappingCalculator);
+              + " mappingCalculator: " + mappingCalculator);
     }
 
     if (rebalancer != null && mappingCalculator != null) {

http://git-wip-us.apache.org/repos/asf/helix/blob/0beeb8fa/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
index a5fac68..b5be5fe 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
@@ -105,4 +105,8 @@ public class BestPossibleStateOutput extends 
ResourcesStateMap {
     }
     _preferenceLists.put(resource, resourcePreferenceLists);
   }
+
+  protected boolean containsResource(String resource) {
+    return _preferenceLists != null && _preferenceLists.containsKey(resource);
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/0beeb8fa/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index d901327..e3a504b 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -19,8 +19,19 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
@@ -45,16 +56,6 @@ import 
org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
 public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
   private static Logger LOG = 
LoggerFactory.getLogger(ExternalViewComputeStage.class);
 
@@ -88,78 +89,13 @@ public class ExternalViewComputeStage extends 
AbstractAsyncBaseStage {
 
     Map<String, ExternalView> curExtViews = cache.getExternalViews();
 
-    for (String resourceName : resourceMap.keySet()) {
-      ExternalView view = new ExternalView(resourceName);
-      // view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
-      // if resource ideal state has bucket size, set it
-      // otherwise resource has been dropped, use bucket size from current 
state instead
-      Resource resource = resourceMap.get(resourceName);
-      if (resource.getBucketSize() > 0) {
-        view.setBucketSize(resource.getBucketSize());
-      } else {
-        view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
-      }
-
-      int totalPendingMessageCount = 0;
-
-      for (Partition partition : resource.getPartitions()) {
-        Map<String, String> currentStateMap =
-            currentStateOutput.getCurrentStateMap(resourceName, partition);
-        if (currentStateMap != null && currentStateMap.size() > 0) {
-          // Set<String> disabledInstances
-          // = cache.getDisabledInstancesForResource(resource.toString());
-          for (String instance : currentStateMap.keySet()) {
-            // if (!disabledInstances.contains(instance))
-            // {
-            view.setState(partition.getPartitionName(), instance, 
currentStateMap.get(instance));
-            // }
-          }
-        }
-        totalPendingMessageCount +=
-            
currentStateOutput.getPendingMessageMap(resource.getResourceName(), 
partition).size();
-      }
-
-      // Update cluster status monitor mbean
-      IdealState idealState = cache.getIdealState(resourceName);
-      if (!cache.isTaskCache()) {
-        ResourceConfig resourceConfig = cache.getResourceConfig(resourceName);
-        if (clusterStatusMonitor != null) {
-          if (idealState != null // has ideal state
-              && (resourceConfig == null || 
!resourceConfig.isMonitoringDisabled()) // monitoring not disabled
-              && !idealState.getStateModelDefRef() // and not a job resource
-              
.equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-            StateModelDefinition stateModelDef =
-                cache.getStateModelDef(idealState.getStateModelDefRef());
-            clusterStatusMonitor
-                .setResourceStatus(view, 
cache.getIdealState(view.getResourceName()),
-                    stateModelDef);
-            clusterStatusMonitor
-                .updatePendingMessages(resource.getResourceName(), 
totalPendingMessageCount);
-            monitoringResources.add(resourceName);
-          }
-        }
-      }
-      ExternalView curExtView = curExtViews.get(resourceName);
-      // copy simplefields from IS, in cases where IS is deleted copy it from 
existing ExternalView
-      if (idealState != null) {
-        
view.getRecord().getSimpleFields().putAll(idealState.getRecord().getSimpleFields());
-      } else if (curExtView != null) {
-        
view.getRecord().getSimpleFields().putAll(curExtView.getRecord().getSimpleFields());
-      }
-
-      // compare the new external view with current one, set only on different
-      if (curExtView == null || 
!curExtView.getRecord().equals(view.getRecord())) {
-        // Add external view to the list which will be written to ZK later.
-        newExtViews.add(view);
-
-        // For SCHEDULER_TASK_RESOURCE resource group (helix task queue), we 
need to find out which
-        // task partitions are finished (COMPLETED or ERROR), update the 
status update of the original
-        // scheduler message, and then remove the partitions from the ideal 
state
-        if (idealState != null
-            && idealState.getStateModelDefRef().equalsIgnoreCase(
-            DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-          updateScheduledTaskStatus(view, manager, idealState);
-        }
+    for (Resource resource : resourceMap.values()) {
+      try {
+        computeExternalView(resource, currentStateOutput, cache, 
clusterStatusMonitor, curExtViews,
+            manager, monitoringResources, newExtViews);
+      } catch (HelixException ex) {
+        LogUtil.logError(LOG, _eventId,
+            "Failed to calculate external view for resource " + 
resource.getResourceName(), ex);
       }
     }
 
@@ -210,6 +146,78 @@ public class ExternalViewComputeStage extends 
AbstractAsyncBaseStage {
     cache.removeExternalViews(externalViewsToRemove);
   }
 
+  private void computeExternalView(final Resource resource,
+      final CurrentStateOutput currentStateOutput, final ClusterDataCache 
cache,
+      final ClusterStatusMonitor clusterStatusMonitor, final Map<String, 
ExternalView> curExtViews,
+      final HelixManager manager, Set<String> monitoringResources, 
List<ExternalView> newExtViews) {
+    String resourceName = resource.getResourceName();
+    ExternalView view = new ExternalView(resource.getResourceName());
+    // if resource ideal state has bucket size, set it
+    // otherwise resource has been dropped, use bucket size from current state 
instead
+    if (resource.getBucketSize() > 0) {
+      view.setBucketSize(resource.getBucketSize());
+    } else {
+      view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
+    }
+
+    int totalPendingMessageCount = 0;
+
+    for (Partition partition : resource.getPartitions()) {
+      Map<String, String> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resourceName, partition);
+      if (currentStateMap != null && currentStateMap.size() > 0) {
+        for (String instance : currentStateMap.keySet()) {
+          view.setState(partition.getPartitionName(), instance, 
currentStateMap.get(instance));
+        }
+      }
+      totalPendingMessageCount +=
+          currentStateOutput.getPendingMessageMap(resource.getResourceName(), 
partition).size();
+    }
+
+    // Update cluster status monitor mbean
+    IdealState idealState = cache.getIdealState(resourceName);
+    if (!cache.isTaskCache()) {
+      ResourceConfig resourceConfig = cache.getResourceConfig(resourceName);
+      if (clusterStatusMonitor != null) {
+        if (idealState != null // has ideal state
+            && (resourceConfig == null || 
!resourceConfig.isMonitoringDisabled()) // monitoring not disabled
+            && !idealState.getStateModelDefRef() // and not a job resource
+            
.equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+          StateModelDefinition stateModelDef =
+              cache.getStateModelDef(idealState.getStateModelDefRef());
+          clusterStatusMonitor
+              .setResourceStatus(view, 
cache.getIdealState(view.getResourceName()),
+                  stateModelDef);
+          clusterStatusMonitor
+              .updatePendingMessages(resource.getResourceName(), 
totalPendingMessageCount);
+          monitoringResources.add(resourceName);
+        }
+      }
+    }
+    ExternalView curExtView = curExtViews.get(resourceName);
+    // copy simplefields from IS, in cases where IS is deleted copy it from 
existing ExternalView
+    if (idealState != null) {
+      
view.getRecord().getSimpleFields().putAll(idealState.getRecord().getSimpleFields());
+    } else if (curExtView != null) {
+      
view.getRecord().getSimpleFields().putAll(curExtView.getRecord().getSimpleFields());
+    }
+
+    // compare the new external view with current one, set only on different
+    if (curExtView == null || 
!curExtView.getRecord().equals(view.getRecord())) {
+      // Add external view to the list which will be written to ZK later.
+      newExtViews.add(view);
+
+      // For SCHEDULER_TASK_RESOURCE resource group (helix task queue), we 
need to find out which
+      // task partitions are finished (COMPLETED or ERROR), update the status 
update of the original
+      // scheduler message, and then remove the partitions from the ideal state
+      if (idealState != null
+          && idealState.getStateModelDefRef().equalsIgnoreCase(
+          DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+        updateScheduledTaskStatus(view, manager, idealState);
+      }
+    }
+  }
+
   private void updateScheduledTaskStatus(ExternalView ev, HelixManager manager,
       IdealState taskQueueIdealState) {
     HelixDataAccessor accessor = manager.getHelixDataAccessor();

http://git-wip-us.apache.org/repos/asf/helix/blob/0beeb8fa/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index c156c06..c4d11d6 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -19,6 +19,16 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
@@ -28,13 +38,16 @@ import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.common.PartitionStateMap;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.*;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-
 /**
  * For partition compute the Intermediate State (instance,state) pair based on 
the BestPossibleState
  * and CurrentState, with all constraints applied (such as state transition 
throttling).
@@ -122,7 +135,6 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
       Collections.sort(prioritizedResourceList, new 
ResourcePriorityComparator());
     }
 
-    // Re-load ClusterStatusMonitor MBean
     ClusterStatusMonitor clusterStatusMonitor =
         event.getAttribute(AttributeName.clusterStatusMonitor.name());
 
@@ -130,6 +142,14 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
     // decreasing priority
     for (ResourcePriority resourcePriority : prioritizedResourceList) {
       String resourceName = resourcePriority.getResourceName();
+
+      if (!bestPossibleStateOutput.containsResource(resourceName)) {
+        logger.warn(
+            "Skip calculating intermediate state for resource {} because the 
best possible state is not available.",
+            resourceName);
+        continue;
+      }
+
       Resource resource = resourceMap.get(resourceName);
       IdealState idealState = dataCache.getIdealState(resourceName);
       if (idealState == null) {
@@ -140,11 +160,17 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
         idealState = new IdealState(resourceName);
         idealState.setStateModelDefRef(resource.getStateModelDefRef());
       }
-      PartitionStateMap intermediatePartitionStateMap = 
computeIntermediatePartitionState(dataCache,
-          clusterStatusMonitor, idealState, resourceMap.get(resourceName), 
currentStateOutput,
-          bestPossibleStateOutput.getPartitionStateMap(resourceName),
-          bestPossibleStateOutput.getPreferenceLists(resourceName), 
throttleController);
-      output.setState(resourceName, intermediatePartitionStateMap);
+
+      try {
+        output.setState(resourceName,
+            computeIntermediatePartitionState(dataCache, clusterStatusMonitor, 
idealState,
+                resourceMap.get(resourceName), currentStateOutput,
+                bestPossibleStateOutput.getPartitionStateMap(resourceName),
+                bestPossibleStateOutput.getPreferenceLists(resourceName), 
throttleController));
+      } catch (HelixException ex) {
+        LogUtil.logInfo(logger, _eventId,
+            "Failed to calculate intermediate partition states for resource " 
+ resourceName, ex);
+      }
     }
     return output;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/0beeb8fa/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index ae0c93f..b1013d1 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -26,7 +26,9 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.api.config.StateTransitionTimeoutConfig;
@@ -88,171 +90,180 @@ public abstract class MessageGenerationPhase extends 
AbstractBaseStage {
     }
     MessageOutput output = new MessageOutput();
 
-    for (String resourceName : resourceMap.keySet()) {
-      Resource resource = resourceMap.get(resourceName);
-
-      StateModelDefinition stateModelDef = 
cache.getStateModelDef(resource.getStateModelDefRef());
-      if (stateModelDef == null) {
+    for (Resource resource : resourceMap.values()) {
+      try {
+        generateMessage(resource, cache, resourcesStateMap, 
currentStateOutput, manager,
+            sessionIdMap, event.getEventType(), output, 
pendingMessagesToCleanUp);
+      } catch (HelixException ex) {
         LogUtil.logError(logger, _eventId,
-            "State Model Definition null, skip generating messages for 
resource: " + resourceName);
-        continue;
+            "Failed to generate message for resource " + 
resource.getResourceName(), ex);
       }
+    }
 
-      for (Partition partition : resource.getPartitions()) {
-        Map<String, String> instanceStateMap =
-            new HashMap<>(resourcesStateMap.getInstanceStateMap(resourceName, 
partition));
-        Map<String, String> pendingStateMap =
-            currentStateOutput.getPendingStateMap(resourceName, partition);
+    // Asynchronously GC pending messages if necessary
+    if (!pendingMessagesToCleanUp.isEmpty()) {
+      schedulePendingMessageCleanUp(pendingMessagesToCleanUp, 
cache.getAsyncTasksThreadPool(),
+          manager.getHelixDataAccessor());
+    }
+    event.addAttribute(AttributeName.MESSAGES_ALL.name(), output);
+  }
 
-        // The operation is combing pending state with best possible state. 
Since some replicas have
-        // been moved from one instance to another, the instance will exist in 
pending state but not
-        // best possible. Thus Helix need to check the pending state and 
cancel it.
+  private void generateMessage(final Resource resource, final ClusterDataCache 
cache,
+      final ResourcesStateMap resourcesStateMap, final CurrentStateOutput 
currentStateOutput,
+      final HelixManager manager, final Map<String, String> sessionIdMap,
+      final ClusterEventType eventType, MessageOutput output,
+      Map<String, Map<String, Message>> pendingMessagesToCleanUp) {
+    String resourceName = resource.getResourceName();
+
+    StateModelDefinition stateModelDef = 
cache.getStateModelDef(resource.getStateModelDefRef());
+    if (stateModelDef == null) {
+      LogUtil.logError(logger, _eventId,
+          "State Model Definition null, skip generating messages for resource: 
" + resourceName);
+      return;
+    }
 
-        for (String instance : pendingStateMap.keySet()) {
-          if (!instanceStateMap.containsKey(instance)) {
-            instanceStateMap.put(instance, NO_DESIRED_STATE);
-          }
+    for (Partition partition : resource.getPartitions()) {
+      Map<String, String> instanceStateMap =
+          new HashMap<>(resourcesStateMap.getInstanceStateMap(resourceName, 
partition));
+      Map<String, String> pendingStateMap =
+          currentStateOutput.getPendingStateMap(resourceName, partition);
+
+      // The operation is combing pending state with best possible state. 
Since some replicas have
+      // been moved from one instance to another, the instance will exist in 
pending state but not
+      // best possible. Thus Helix need to check the pending state and cancel 
it.
+
+      for (String instance : pendingStateMap.keySet()) {
+        if (!instanceStateMap.containsKey(instance)) {
+          instanceStateMap.put(instance, NO_DESIRED_STATE);
         }
+      }
+
+      // we should generate message based on the desired-state priority
+      // so keep generated messages in a temp map keyed by state
+      // desired-state->list of generated-messages
+      Map<String, List<Message>> messageMap = new HashMap<>();
 
-        // we should generate message based on the desired-state priority
-        // so keep generated messages in a temp map keyed by state
-        // desired-state->list of generated-messages
-        Map<String, List<Message>> messageMap = new HashMap<>();
+      for (String instanceName : instanceStateMap.keySet()) {
+        String desiredState = instanceStateMap.get(instanceName);
 
-        for (String instanceName : instanceStateMap.keySet()) {
-          String desiredState = instanceStateMap.get(instanceName);
+        String currentState =
+            currentStateOutput.getCurrentState(resourceName, partition, 
instanceName);
+        if (currentState == null) {
+          currentState = stateModelDef.getInitialState();
+        }
 
-          String currentState =
-              currentStateOutput.getCurrentState(resourceName, partition, 
instanceName);
-          if (currentState == null) {
-            currentState = stateModelDef.getInitialState();
+        Message pendingMessage =
+            currentStateOutput.getPendingMessage(resourceName, partition, 
instanceName);
+        boolean isCancellationEnabled = 
cache.getClusterConfig().isStateTransitionCancelEnabled();
+        Message cancellationMessage =
+            currentStateOutput.getCancellationMessage(resourceName, partition, 
instanceName);
+        String nextState = 
stateModelDef.getNextStateForTransition(currentState, desiredState);
+
+        Message message = null;
+
+        if (pendingMessage != null && 
shouldCleanUpPendingMessage(pendingMessage, currentState,
+            currentStateOutput.getEndTime(resourceName, partition, 
instanceName))) {
+          LogUtil.logInfo(logger, _eventId, String.format(
+              "Adding pending message %s on instance %s to clean up. Msg: 
%s->%s, current state of resource %s:%s is %s",
+              pendingMessage.getMsgId(), instanceName, 
pendingMessage.getFromState(),
+              pendingMessage.getToState(), resourceName, partition, 
currentState));
+          if (!pendingMessagesToCleanUp.containsKey(instanceName)) {
+            pendingMessagesToCleanUp.put(instanceName, new HashMap<String, 
Message>());
           }
+          
pendingMessagesToCleanUp.get(instanceName).put(pendingMessage.getMsgId(), 
pendingMessage);
+        }
 
-          Message pendingMessage =
-              currentStateOutput.getPendingMessage(resourceName, partition, 
instanceName);
-          boolean isCancellationEnabled = 
cache.getClusterConfig().isStateTransitionCancelEnabled();
-          Message cancellationMessage =
-              currentStateOutput.getCancellationMessage(resourceName, 
partition, instanceName);
-          String nextState = 
stateModelDef.getNextStateForTransition(currentState, desiredState);
-
-          Message message = null;
-
-          if (pendingMessage != null && 
shouldCleanUpPendingMessage(pendingMessage, currentState,
-              currentStateOutput.getEndTime(resourceName, partition, 
instanceName))) {
-            LogUtil.logInfo(logger, _eventId, String.format(
-                "Adding pending message %s on instance %s to clean up. Msg: 
%s->%s, current state of resource %s:%s is %s",
-                pendingMessage.getMsgId(), instanceName, 
pendingMessage.getFromState(),
-                pendingMessage.getToState(), resourceName, partition, 
currentState));
-            if (!pendingMessagesToCleanUp.containsKey(instanceName)) {
-                pendingMessagesToCleanUp.put(instanceName, new HashMap<String, 
Message>());
-            }
-            pendingMessagesToCleanUp.get(instanceName)
-                .put(pendingMessage.getMsgId(), pendingMessage);
+        if (desiredState.equals(NO_DESIRED_STATE) || 
desiredState.equalsIgnoreCase(currentState)) {
+          if (desiredState.equals(NO_DESIRED_STATE) || pendingMessage != null 
&& !currentState
+              .equalsIgnoreCase(pendingMessage.getToState())) {
+            message = createStateTransitionCancellationMessage(manager, 
resource,
+                partition.getPartitionName(), instanceName, 
sessionIdMap.get(instanceName),
+                stateModelDef.getId(), pendingMessage.getFromState(), 
pendingMessage.getToState(),
+                null, cancellationMessage, isCancellationEnabled, 
currentState);
+          }
+        } else {
+          if (nextState == null) {
+            LogUtil.logError(logger, _eventId,
+                "Unable to find a next state for resource: " + 
resource.getResourceName()
+                    + " partition: " + partition.getPartitionName() + " from 
stateModelDefinition"
+                    + stateModelDef.getClass() + " from:" + currentState + " 
to:" + desiredState);
+            continue;
           }
 
-          if (desiredState.equals(NO_DESIRED_STATE) || desiredState
-              .equalsIgnoreCase(currentState)) {
-            if (desiredState.equals(NO_DESIRED_STATE) || pendingMessage != 
null && !currentState
-                .equalsIgnoreCase(pendingMessage.getToState())) {
+          if (pendingMessage != null) {
+            String pendingState = pendingMessage.getToState();
+            if (nextState.equalsIgnoreCase(pendingState)) {
+              LogUtil.logDebug(logger, _eventId,
+                  "Message already exists for " + instanceName + " to transit 
" + resource
+                      .getResourceName() + "." + partition.getPartitionName() 
+ " from "
+                      + currentState + " to " + nextState);
+            } else if (currentState.equalsIgnoreCase(pendingState)) {
+              LogUtil.logInfo(logger, _eventId,
+                  "Message hasn't been removed for " + instanceName + " to 
transit " + resource
+                      .getResourceName() + "." + partition.getPartitionName() 
+ " to "
+                      + pendingState + ", desiredState: " + desiredState);
+            } else {
+              LogUtil.logInfo(logger, _eventId,
+                  "IdealState changed before state transition completes for " 
+ resource
+                      .getResourceName() + "." + partition.getPartitionName() 
+ " on "
+                      + instanceName + ", pendingState: " + pendingState + ", 
currentState: "
+                      + currentState + ", nextState: " + nextState);
+
               message = createStateTransitionCancellationMessage(manager, 
resource,
                   partition.getPartitionName(), instanceName, 
sessionIdMap.get(instanceName),
-                  stateModelDef.getId(), pendingMessage.getFromState(), 
pendingMessage.getToState(),
-                  null, cancellationMessage, isCancellationEnabled, 
currentState);
+                  stateModelDef.getId(), pendingMessage.getFromState(), 
pendingState, nextState,
+                  cancellationMessage, isCancellationEnabled, currentState);
             }
           } else {
-            if (nextState == null) {
-              LogUtil.logError(logger, _eventId,
-                  "Unable to find a next state for resource: " + 
resource.getResourceName()
-                      + " partition: " + partition.getPartitionName() + " from 
stateModelDefinition"
-                      + stateModelDef.getClass() + " from:" + currentState + " 
to:" + desiredState);
-              continue;
-            }
-
-            if (pendingMessage != null) {
-              String pendingState = pendingMessage.getToState();
-              if (nextState.equalsIgnoreCase(pendingState)) {
-                LogUtil.logDebug(logger, _eventId,
-                    "Message already exists for " + instanceName + " to 
transit " + resource
-                        .getResourceName() + "." + 
partition.getPartitionName() + " from "
-                        + currentState + " to " + nextState);
-              } else if (currentState.equalsIgnoreCase(pendingState)) {
-                LogUtil.logInfo(logger, _eventId,
-                    "Message hasn't been removed for " + instanceName + " to 
transit " + resource
-                        .getResourceName() + "." + 
partition.getPartitionName() + " to "
-                        + pendingState + ", desiredState: " + desiredState);
-              } else {
-                LogUtil.logInfo(logger, _eventId,
-                    "IdealState changed before state transition completes for 
" + resource
-                        .getResourceName() + "." + 
partition.getPartitionName() + " on "
-                        + instanceName + ", pendingState: " + pendingState + 
", currentState: "
-                        + currentState + ", nextState: " + nextState);
-
-                message = createStateTransitionCancellationMessage(manager, 
resource,
-                    partition.getPartitionName(), instanceName, 
sessionIdMap.get(instanceName),
-                    stateModelDef.getId(), pendingMessage.getFromState(), 
pendingState, nextState,
-                    cancellationMessage, isCancellationEnabled, currentState);
-              }
-            } else {
-              // Create new state transition message
-              message =
-                  createStateTransitionMessage(manager, resource, 
partition.getPartitionName(),
-                      instanceName, currentState, nextState, 
sessionIdMap.get(instanceName),
-                      stateModelDef.getId());
-
-              if (logger.isDebugEnabled()) {
-                LogUtil.logDebug(logger, _eventId, String.format(
-                    "Resource %s partition %s for instance %s with 
currentState %s and nextState %s",
-                    resource.getResourceName(), partition.getPartitionName(), 
instanceName,
-                    currentState, nextState));
-              }
+            // Create new state transition message
+            message = createStateTransitionMessage(manager, resource, 
partition.getPartitionName(),
+                instanceName, currentState, nextState, 
sessionIdMap.get(instanceName),
+                stateModelDef.getId());
+
+            if (logger.isDebugEnabled()) {
+              LogUtil.logDebug(logger, _eventId, String.format(
+                  "Resource %s partition %s for instance %s with currentState 
%s and nextState %s",
+                  resource.getResourceName(), partition.getPartitionName(), 
instanceName,
+                  currentState, nextState));
             }
           }
+        }
 
-          if (message != null) {
-            IdealState idealState = cache.getIdealState(resourceName);
-            if (idealState != null && idealState.getStateModelDefRef()
-                
.equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-              if 
(idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
-                
message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
-                    
idealState.getRecord().getMapField(partition.getPartitionName()));
-              }
+        if (message != null) {
+          IdealState idealState = cache.getIdealState(resourceName);
+          if (idealState != null && idealState.getStateModelDefRef()
+              
.equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+            if 
(idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
+              
message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
+                  
idealState.getRecord().getMapField(partition.getPartitionName()));
             }
+          }
 
-            int timeout =
-                getTimeOut(cache.getClusterConfig(), 
cache.getResourceConfig(resourceName),
-                    currentState, nextState, idealState, partition);
-            if (timeout > 0) {
-              message.setExecutionTimeout(timeout);
-            }
+          int timeout = getTimeOut(cache.getClusterConfig(), 
cache.getResourceConfig(resourceName),
+              currentState, nextState, idealState, partition);
+          if (timeout > 0) {
+            message.setExecutionTimeout(timeout);
+          }
 
-            message.setAttribute(Message.Attributes.ClusterEventName, 
event.getEventType().name());
-            // output.addMessage(resourceName, partition, message);
-            if (!messageMap.containsKey(desiredState)) {
-              messageMap.put(desiredState, new ArrayList<Message>());
-            }
-            messageMap.get(desiredState).add(message);
+          message.setAttribute(Message.Attributes.ClusterEventName, 
eventType.name());
+          // output.addMessage(resourceName, partition, message);
+          if (!messageMap.containsKey(desiredState)) {
+            messageMap.put(desiredState, new ArrayList<Message>());
           }
+          messageMap.get(desiredState).add(message);
         }
+      }
 
-        // add generated messages to output according to state priority
-        List<String> statesPriorityList = 
stateModelDef.getStatesPriorityList();
-        for (String state : statesPriorityList) {
-          if (messageMap.containsKey(state)) {
-            for (Message message : messageMap.get(state)) {
-              output.addMessage(resourceName, partition, message);
-            }
+      // add generated messages to output according to state priority
+      List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
+      for (String state : statesPriorityList) {
+        if (messageMap.containsKey(state)) {
+          for (Message message : messageMap.get(state)) {
+            output.addMessage(resourceName, partition, message);
           }
         }
-
-      } // end of for-each-partition
-    }
-
-    // Asynchronously clean up pending messages if necessary
-    if (!pendingMessagesToCleanUp.isEmpty()) {
-      schedulePendingMessageCleanUp(pendingMessagesToCleanUp, 
cache.getAsyncTasksThreadPool(),
-          manager.getHelixDataAccessor());
-    }
-    event.addAttribute(AttributeName.MESSAGES_ALL.name(), output);
+      }
+    } // end of for-each-partition
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/0beeb8fa/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index eeb36cd..c44c731 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.helix.HelixException;
 import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
@@ -77,21 +78,25 @@ public class MessageSelectionStage extends 
AbstractBaseStage {
 
     for (String resourceName : resourceMap.keySet()) {
       Resource resource = resourceMap.get(resourceName);
-      StateModelDefinition stateModelDef = 
cache.getStateModelDef(resource.getStateModelDefRef());
-
-      Map<String, Integer> stateTransitionPriorities = 
getStateTransitionPriorityMap(stateModelDef);
-      IdealState idealState = cache.getIdealState(resourceName);
-      Map<String, Bounds> stateConstraints =
-          computeStateConstraints(stateModelDef, idealState, cache);
-      for (Partition partition : resource.getPartitions()) {
-        List<Message> messages = messageGenOutput.getMessages(resourceName, 
partition);
-        List<Message> selectedMessages = 
selectMessages(cache.getLiveInstances(),
-            currentStateOutput.getCurrentStateMap(resourceName, partition),
-            currentStateOutput.getPendingMessageMap(resourceName, partition), 
messages,
-            currentStateOutput.getPendingRelayMessageMap(resourceName, 
partition).values(),
-            stateConstraints, stateTransitionPriorities, stateModelDef,
-            resource.isP2PMessageEnabled());
-        output.addMessages(resourceName, partition, selectedMessages);
+      try {
+        StateModelDefinition stateModelDef = 
cache.getStateModelDef(resource.getStateModelDefRef());
+        Map<String, Integer> stateTransitionPriorities = 
getStateTransitionPriorityMap(stateModelDef);
+        IdealState idealState = cache.getIdealState(resourceName);
+        Map<String, Bounds> stateConstraints =
+            computeStateConstraints(stateModelDef, idealState, cache);
+        for (Partition partition : resource.getPartitions()) {
+          List<Message> messages = messageGenOutput.getMessages(resourceName, 
partition);
+          List<Message> selectedMessages = 
selectMessages(cache.getLiveInstances(),
+              currentStateOutput.getCurrentStateMap(resourceName, partition),
+              currentStateOutput.getPendingMessageMap(resourceName, 
partition), messages,
+              currentStateOutput.getPendingRelayMessageMap(resourceName, 
partition).values(),
+              stateConstraints, stateTransitionPriorities, stateModelDef,
+              resource.isP2PMessageEnabled());
+          output.addMessages(resourceName, partition, selectedMessages);
+        }
+      } catch (HelixException ex) {
+        LogUtil.logError(LOG, _eventId,
+            "Failed to finish message selection for resource " + resourceName, 
ex);
       }
     }
     event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), output);

http://git-wip-us.apache.org/repos/asf/helix/blob/0beeb8fa/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
index d68c3df..1e52c18 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
@@ -111,7 +111,7 @@ public class MessageThrottleStage extends AbstractBaseStage 
{
         }
       }
     }
-    return new HashSet<ConstraintItem>(selectedItems.values());
+    return new HashSet<>(selectedItems.values());
   }
 
   @Override
@@ -135,7 +135,7 @@ public class MessageThrottleStage extends AbstractBaseStage 
{
     if (constraint != null) {
       // go through all pending messages, they should be counted but not 
throttled
       for (String instance : cache.getLiveInstances().keySet()) {
-        throttle(throttleCounterMap, constraint, new 
ArrayList<Message>(cache.getMessages(instance)
+        throttle(throttleCounterMap, constraint, new 
ArrayList<>(cache.getMessages(instance)
             .values()), false);
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/0beeb8fa/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index f909c9e..6c04d98 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -25,11 +25,14 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.common.PartitionStateMap;
 import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
 import org.apache.helix.controller.pipeline.AsyncWorkerType;
@@ -39,7 +42,6 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
-import org.apache.helix.controller.LogUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,68 +75,78 @@ public class PersistAssignmentStage extends 
AbstractAsyncBaseStage {
     Map<String, Resource> resourceMap = 
event.getAttribute(AttributeName.RESOURCES.name());
 
     for (String resourceId : bestPossibleAssignment.resourceSet()) {
-      Resource resource = resourceMap.get(resourceId);
-      if (resource != null) {
-        final IdealState idealState = cache.getIdealState(resourceId);
-        if (idealState == null) {
-          LogUtil
-              .logWarn(LOG, event.getEventId(), "IdealState not found for 
resource " + resourceId);
-          continue;
-        }
-        IdealState.RebalanceMode mode = idealState.getRebalanceMode();
-        if (!mode.equals(IdealState.RebalanceMode.SEMI_AUTO) && !mode
-            .equals(IdealState.RebalanceMode.FULL_AUTO)) {
-          // do not persist assignment for resource in neither semi or full 
auto.
-          continue;
-        }
+      try {
+        persistAssignment(resourceMap.get(resourceId), cache, event, 
bestPossibleAssignment,
+            clusterConfig, accessor, keyBuilder);
+      } catch (HelixException ex) {
+        LogUtil
+            .logError(LOG, _eventId, "Failed to persist assignment for 
resource " + resourceId, ex);
+      }
+    }
+  }
 
-        boolean needPersist = false;
-        if (mode.equals(IdealState.RebalanceMode.FULL_AUTO)) {
-          // persist preference list in ful-auto mode.
-          Map<String, List<String>> newLists =
-              bestPossibleAssignment.getPreferenceLists(resourceId);
-          if (newLists != null && hasPreferenceListChanged(newLists, 
idealState)) {
-            idealState.setPreferenceLists(newLists);
-            needPersist = true;
-          }
-        }
+  private void persistAssignment(final Resource resource, final 
ClusterDataCache cache,
+      final ClusterEvent event, final BestPossibleStateOutput 
bestPossibleAssignment,
+      final ClusterConfig clusterConfig, final HelixDataAccessor accessor,
+      final PropertyKey.Builder keyBuilder) {
+    String resourceId = resource.getResourceName();
+    if (resource != null) {
+      final IdealState idealState = cache.getIdealState(resourceId);
+      if (idealState == null) {
+        LogUtil.logWarn(LOG, event.getEventId(), "IdealState not found for 
resource " + resourceId);
+        return;
+      }
+      IdealState.RebalanceMode mode = idealState.getRebalanceMode();
+      if (!mode.equals(IdealState.RebalanceMode.SEMI_AUTO) && !mode
+          .equals(IdealState.RebalanceMode.FULL_AUTO)) {
+        // do not persist assignment for resource in neither semi or full auto.
+        return;
+      }
 
-        PartitionStateMap partitionStateMap =
-            bestPossibleAssignment.getPartitionStateMap(resourceId);
-        if (clusterConfig.isPersistIntermediateAssignment()) {
-          IntermediateStateOutput intermediateAssignment = event.getAttribute(
-              AttributeName.INTERMEDIATE_STATE.name());
-          partitionStateMap = 
intermediateAssignment.getPartitionStateMap(resourceId);
+      boolean needPersist = false;
+      if (mode.equals(IdealState.RebalanceMode.FULL_AUTO)) {
+        // persist preference list in ful-auto mode.
+        Map<String, List<String>> newLists = 
bestPossibleAssignment.getPreferenceLists(resourceId);
+        if (newLists != null && hasPreferenceListChanged(newLists, 
idealState)) {
+          idealState.setPreferenceLists(newLists);
+          needPersist = true;
         }
+      }
 
-        //TODO: temporary solution for Espresso/Dbus backcompatible, should 
remove this.
-        Map<Partition, Map<String, String>> assignmentToPersist =
-            convertAssignmentPersisted(resource, idealState, 
partitionStateMap.getStateMap());
+      PartitionStateMap partitionStateMap = 
bestPossibleAssignment.getPartitionStateMap(resourceId);
+      if (clusterConfig.isPersistIntermediateAssignment()) {
+        IntermediateStateOutput intermediateAssignment =
+            event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
+        partitionStateMap = 
intermediateAssignment.getPartitionStateMap(resourceId);
+      }
 
-        if (assignmentToPersist != null && 
hasInstanceMapChanged(assignmentToPersist, idealState)) {
-          for (Partition partition : assignmentToPersist.keySet()) {
-            Map<String, String> instanceMap = 
assignmentToPersist.get(partition);
-            idealState.setInstanceStateMap(partition.getPartitionName(), 
instanceMap);
-          }
-          needPersist = true;
+      //TODO: temporary solution for Espresso/Dbus backcompatible, should 
remove this.
+      Map<Partition, Map<String, String>> assignmentToPersist =
+          convertAssignmentPersisted(resource, idealState, 
partitionStateMap.getStateMap());
+
+      if (assignmentToPersist != null && 
hasInstanceMapChanged(assignmentToPersist, idealState)) {
+        for (Partition partition : assignmentToPersist.keySet()) {
+          Map<String, String> instanceMap = assignmentToPersist.get(partition);
+          idealState.setInstanceStateMap(partition.getPartitionName(), 
instanceMap);
         }
+        needPersist = true;
+      }
 
-        if (needPersist) {
-          // Update instead of set to ensure any intermediate changes that the 
controller does not update are kept.
-          accessor.updateProperty(keyBuilder.idealStates(resourceId), new 
DataUpdater<ZNRecord>() {
-            @Override
-            public ZNRecord update(ZNRecord current) {
-              if (current != null) {
-                // Overwrite MapFields and ListFields items with the same key.
-                // Note that default merge will keep old values in the maps or 
lists unchanged, which is not desired.
-                current.getMapFields().clear();
-                
current.getMapFields().putAll(idealState.getRecord().getMapFields());
-                
current.getListFields().putAll(idealState.getRecord().getListFields());
-              }
-              return current;
+      if (needPersist) {
+        // Update instead of set to ensure any intermediate changes that the 
controller does not update are kept.
+        accessor.updateProperty(keyBuilder.idealStates(resourceId), new 
DataUpdater<ZNRecord>() {
+          @Override
+          public ZNRecord update(ZNRecord current) {
+            if (current != null) {
+              // Overwrite MapFields and ListFields items with the same key.
+              // Note that default merge will keep old values in the maps or 
lists unchanged, which is not desired.
+              current.getMapFields().clear();
+              
current.getMapFields().putAll(idealState.getRecord().getMapFields());
+              
current.getListFields().putAll(idealState.getRecord().getListFields());
             }
-          }, idealState);
-        }
+            return current;
+          }
+        }, idealState);
       }
     }
   }

Reply via email to