This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit f8ab3428bf8ed9cbba17274f89eeb3799f55ca2c
Author: Jiajun Wang <[email protected]>
AuthorDate: Fri Oct 25 12:21:11 2019 -0700

    Refine the rebalance scope calculating logic in the WAGED rebalancer. (#519)
    
    * Refine the rebalance scope calculation logic in the WAGED rebalancer.
    
    1. Ignore the IdealState map/list fields if the resource is in FULL_AUTO mode (and the
       map fields if it is in SEMI_AUTO mode) when the cluster persists the best possible or
       intermediate assignment, since the controller itself writes those fields.
    2. On an IdealState change, the resource shall be fully rebalanced, since filter
       conditions such as the instance tag might have changed.
    3. A live instance change (a newly connected node) shall trigger a full rebalance so that
       partitions are reassigned to the new node.
    4. Modify the related test cases.
    5. Add an option to the change detector so that, if it is used elsewhere, the caller can
       choose to listen to every change.
---
 .../changedetector/ResourceChangeDetector.java     | 22 +++++----
 .../changedetector/ResourceChangeSnapshot.java     | 53 ++++++++++++++++++----
 .../rebalancer/waged/WagedRebalancer.java          |  5 +-
 .../waged/model/ClusterModelProvider.java          | 30 ++++++++----
 .../changedetector/TestResourceChangeDetector.java | 45 ++++++++++++++++++
 .../rebalancer/waged/TestWagedRebalancer.java      | 16 +++----
 .../waged/model/AbstractTestClusterModel.java      |  4 ++
 7 files changed, 138 insertions(+), 37 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
 
b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
index 1df61f8..8402efd 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
@@ -19,6 +19,12 @@ package org.apache.helix.controller.changedetector;
  * under the License.
  */
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
 import com.google.common.collect.Sets;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixProperty;
@@ -27,12 +33,6 @@ import org.apache.helix.model.ClusterConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-
 /**
  * ResourceChangeDetector implements ChangeDetector. It caches 
resource-related metadata from
  * Helix's main resource pipeline cache (DataProvider) and the computation 
results of change
@@ -42,6 +42,7 @@ import java.util.Map;
 public class ResourceChangeDetector implements ChangeDetector {
   private static final Logger LOG = 
LoggerFactory.getLogger(ResourceChangeDetector.class.getName());
 
+  private final boolean _ignoreControllerGeneratedFields;
   private ResourceChangeSnapshot _oldSnapshot; // snapshot for previous 
pipeline run
   private ResourceChangeSnapshot _newSnapshot; // snapshot for this pipeline 
run
 
@@ -50,8 +51,13 @@ public class ResourceChangeDetector implements 
ChangeDetector {
   private Map<HelixConstants.ChangeType, Collection<String>> _addedItems = new 
HashMap<>();
   private Map<HelixConstants.ChangeType, Collection<String>> _removedItems = 
new HashMap<>();
 
-  public ResourceChangeDetector() {
+  public ResourceChangeDetector(boolean ignoreControllerGeneratedFields) {
     _newSnapshot = new ResourceChangeSnapshot();
+    _ignoreControllerGeneratedFields = ignoreControllerGeneratedFields;
+  }
+
+  public ResourceChangeDetector() {
+    this(false);
   }
 
   /**
@@ -135,7 +141,7 @@ public class ResourceChangeDetector implements 
ChangeDetector {
   public synchronized void updateSnapshots(ResourceControllerDataProvider 
dataProvider) {
     // If there are changes, update internal states
     _oldSnapshot = new ResourceChangeSnapshot(_newSnapshot);
-    _newSnapshot = new ResourceChangeSnapshot(dataProvider);
+    _newSnapshot = new ResourceChangeSnapshot(dataProvider, 
_ignoreControllerGeneratedFields);
     dataProvider.clearRefreshedChangeTypes();
 
     // Invalidate cached computation
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java
 
b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java
index 6351eb9..c965124 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java
@@ -19,6 +19,7 @@ package org.apache.helix.controller.changedetector;
  * under the License.
  */
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -67,11 +68,21 @@ class ResourceChangeSnapshot {
    * Constructor using controller cache (ResourceControllerDataProvider).
    *
    * @param dataProvider
+   * @param ignoreControllerGeneratedFields if true, the snapshot won't record 
any changes that is
+   *                                        being modified by the controller.
    */
-  ResourceChangeSnapshot(ResourceControllerDataProvider dataProvider) {
+  ResourceChangeSnapshot(ResourceControllerDataProvider dataProvider,
+      boolean ignoreControllerGeneratedFields) {
     _changedTypes = new HashSet<>(dataProvider.getRefreshedChangeTypes());
     _instanceConfigMap = new HashMap<>(dataProvider.getInstanceConfigMap());
     _idealStateMap = new HashMap<>(dataProvider.getIdealStates());
+    if (ignoreControllerGeneratedFields && (
+        dataProvider.getClusterConfig().isPersistBestPossibleAssignment() || 
dataProvider
+            .getClusterConfig().isPersistIntermediateAssignment())) {
+      for (String resourceName : _idealStateMap.keySet()) {
+        _idealStateMap.put(resourceName, 
trimIdealState(_idealStateMap.get(resourceName)));
+      }
+    }
     _resourceConfigMap = new HashMap<>(dataProvider.getResourceConfigMap());
     _liveInstances = new HashMap<>(dataProvider.getLiveInstances());
     _clusterConfig = dataProvider.getClusterConfig();
@@ -79,15 +90,15 @@ class ResourceChangeSnapshot {
 
   /**
    * Copy constructor for ResourceChangeCache.
-   * @param cache
+   * @param snapshot
    */
-  ResourceChangeSnapshot(ResourceChangeSnapshot cache) {
-    _changedTypes = new HashSet<>(cache._changedTypes);
-    _instanceConfigMap = new HashMap<>(cache._instanceConfigMap);
-    _idealStateMap = new HashMap<>(cache._idealStateMap);
-    _resourceConfigMap = new HashMap<>(cache._resourceConfigMap);
-    _liveInstances = new HashMap<>(cache._liveInstances);
-    _clusterConfig = cache._clusterConfig;
+  ResourceChangeSnapshot(ResourceChangeSnapshot snapshot) {
+    _changedTypes = new HashSet<>(snapshot._changedTypes);
+    _instanceConfigMap = new HashMap<>(snapshot._instanceConfigMap);
+    _idealStateMap = new HashMap<>(snapshot._idealStateMap);
+    _resourceConfigMap = new HashMap<>(snapshot._resourceConfigMap);
+    _liveInstances = new HashMap<>(snapshot._liveInstances);
+    _clusterConfig = snapshot._clusterConfig;
   }
 
   Set<HelixConstants.ChangeType> getChangedTypes() {
@@ -113,4 +124,28 @@ class ResourceChangeSnapshot {
   ClusterConfig getClusterConfig() {
     return _clusterConfig;
   }
+
+  /**
+   * Trim the IdealState to exclude any controller modified information.
+   *
+   * <p>Only called when the snapshot is built with ignoreControllerGeneratedFields enabled and the
+   * cluster config persists the best possible or intermediate assignment. Depending on the
+   * rebalance mode:
+   * <ul>
+   *   <li>FULL_AUTO: both list fields and map fields are cleared.</li>
+   *   <li>SEMI_AUTO: only map fields are cleared.</li>
+   *   <li>any other mode: the IdealState is returned with its record content unchanged.</li>
+   * </ul>
+   * NOTE(review): the trimmed record's list/map fields are set to Collections.emptyMap(), which is
+   * immutable; nothing may write into them afterwards.
+   *
+   * @param originalIdealState the IdealState read from the cluster data cache
+   * @return a new IdealState instance built from the original's record, minus the fields above
+   */
+  private IdealState trimIdealState(IdealState originalIdealState) {
+    // Clone the IdealState to avoid modifying the objects in the Cluster Data Cache, which might
+    // be used by the other stages in the pipeline.
+    // NOTE(review): this relies on the IdealState(ZNRecord) constructor copying the record; if it
+    // only wrapped the same ZNRecord, the setters below would mutate the cached IdealState.
+    IdealState trimmedIdealState = new IdealState(originalIdealState.getRecord());
+    switch (originalIdealState.getRebalanceMode()) {
+      case FULL_AUTO:
+        // For FULL_AUTO resources, both map fields and list fields are not considered as data input
+        // for the controller. The controller will write to these two types of fields for persisting
+        // the assignment mapping.
+        trimmedIdealState.getRecord().setListFields(Collections.emptyMap());
+        trimmedIdealState.getRecord().setMapFields(Collections.emptyMap());
+        break;
+      case SEMI_AUTO:
+        // For SEMI_AUTO resources, map fields are not considered as data input for the controller.
+        // The controller will write to the map fields for persisting the assignment mapping.
+        // List fields (the preference lists) are user input here and are therefore kept.
+        trimmedIdealState.getRecord().setMapFields(Collections.emptyMap());
+        break;
+      default:
+        // Other rebalance modes: no field is treated as controller generated.
+        break;
+    }
+    return trimmedIdealState;
+  }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 5b2573f..b455992 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -71,6 +71,7 @@ public class WagedRebalancer {
   // contains 1. baseline recalculate, 2. partial rebalance that is based on 
the new baseline.
   private static final Set<HelixConstants.ChangeType> 
GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
       ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG,
+          HelixConstants.ChangeType.IDEAL_STATE,
           HelixConstants.ChangeType.CLUSTER_CONFIG, 
HelixConstants.ChangeType.INSTANCE_CONFIG);
   // The cluster change detector is a stateful object.
   // Make it static to avoid unnecessary reinitialization.
@@ -256,8 +257,6 @@ public class WagedRebalancer {
     if (clusterChanges.keySet().stream()
         .anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
       refreshBaseline(clusterData, clusterChanges, resourceMap, 
currentStateOutput);
-      // Inject a cluster config change for large scale partial rebalance once 
the baseline changed.
-      clusterChanges.putIfAbsent(HelixConstants.ChangeType.CLUSTER_CONFIG, 
Collections.emptySet());
     }
 
     Set<String> activeNodes = 
DelayedRebalanceUtil.getActiveNodes(clusterData.getAllInstances(),
@@ -470,7 +469,7 @@ public class WagedRebalancer {
 
   private ResourceChangeDetector getChangeDetector() {
     if (CHANGE_DETECTOR_THREAD_LOCAL.get() == null) {
-      CHANGE_DETECTOR_THREAD_LOCAL.set(new ResourceChangeDetector());
+      CHANGE_DETECTOR_THREAD_LOCAL.set(new ResourceChangeDetector(true));
     }
     return CHANGE_DETECTOR_THREAD_LOCAL.get();
   }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 276b998..4c32f4c 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -77,7 +77,7 @@ public class ClusterModelProvider {
         new HashMap<>(); // <instanceName, replica set>
     Set<AssignableReplica> toBeAssignedReplicas =
         findToBeAssignedReplicas(replicaMap, clusterChanges, activeInstances,
-            bestPossibleAssignment, allocatedReplicas);
+            dataProvider.getLiveInstances().keySet(), bestPossibleAssignment, 
allocatedReplicas);
 
     // Update the allocated replicas to the assignable nodes.
     assignableNodes.stream().forEach(node -> node.assignInitBatch(
@@ -97,14 +97,13 @@ public class ClusterModelProvider {
    * Find the minimum set of replicas that need to be reassigned.
    * A replica needs to be reassigned if one of the following condition is 
true:
    * 1. Cluster topology (the cluster config / any instance config) has been 
updated.
-   * 2. The baseline assignment has been updated.
-   * 3. The resource config has been updated.
-   * 4. The resource idealstate has been updated. TODO remove this condition 
when all resource configurations are migrated to resource config.
-   * 5. If the current best possible assignment does not contain the 
partition's valid assignment.
+   * 2. The resource config has been updated.
+   * 3. If the current best possible assignment does not contain the 
partition's valid assignment.
    *
    * @param replicaMap             A map contains all the replicas grouped by 
resource name.
    * @param clusterChanges         A map contains all the important metadata 
updates that happened after the previous rebalance.
-   * @param activeInstances        All the instances that are alive and 
enabled.
+   * @param activeInstances        All the instances that are live and enabled 
according to the delay rebalance configuration.
+   * @param liveInstances          All the instances that are live.
    * @param bestPossibleAssignment The current best possible assignment.
    * @param allocatedReplicas      Return the allocated replicas grouped by 
the target instance name.
    * @return The replicas that need to be reassigned.
@@ -112,12 +111,25 @@ public class ClusterModelProvider {
   private static Set<AssignableReplica> findToBeAssignedReplicas(
       Map<String, Set<AssignableReplica>> replicaMap,
       Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Set<String> 
activeInstances,
-      Map<String, ResourceAssignment> bestPossibleAssignment,
+      Set<String> liveInstances, Map<String, ResourceAssignment> 
bestPossibleAssignment,
       Map<String, Set<AssignableReplica>> allocatedReplicas) {
     Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    // A newly connected node = A new LiveInstance znode (or session Id 
updated) & the
+    // corresponding instance is live.
+    // TODO: The assumption here is that if the LiveInstance znode is created 
or it's session Id is
+    // TODO: updated, we need to call algorithm for moving some partitions to 
this new node.
+    // TODO: However, if the liveInstance znode is changed because of some 
other reason, it will be
+    // TODO: treated as a newly connected nodes. We need to find a better way 
to identify which one
+    // TODO: is the real newly connected nodes.
+    Set<String> newlyConnectedNodes = clusterChanges
+        .getOrDefault(HelixConstants.ChangeType.LIVE_INSTANCE, 
Collections.emptySet());
+    newlyConnectedNodes.retainAll(liveInstances);
     if (clusterChanges.containsKey(HelixConstants.ChangeType.CLUSTER_CONFIG) 
|| clusterChanges
-        .containsKey(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
-      // If the cluster topology has been modified, need to reassign all 
replicas
+        .containsKey(HelixConstants.ChangeType.INSTANCE_CONFIG) || 
!newlyConnectedNodes.isEmpty()) {
+      // 1. If the cluster topology has been modified, need to reassign all 
replicas.
+      // 2. If any node was newly connected, need to rebalance all replicas 
for the evenness of
+      // distribution.
       toBeAssignedReplicas
           
.addAll(replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()));
     } else {
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java
 
b/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java
index 445add4..8567c4c 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java
@@ -20,6 +20,9 @@ package org.apache.helix.controller.changedetector;
  */
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.HelixDataAccessor;
@@ -31,6 +34,7 @@ import 
org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.ResourceConfig;
 import org.testng.Assert;
@@ -339,6 +343,47 @@ public class TestResourceChangeDetector extends ZkTestBase 
{
   }
 
   /**
+   * Modify IdealState mapping fields for a FULL_AUTO resource and see if 
detector detects.
+   */
+  @Test(dependsOnMethods = "testNoChange")
+  public void testIgnoreControllerGeneratedFields() {
+    // Modify the cluster config and the IdealState so that the map fields of the IdealState are
+    // treated as fields written by Helix controller logic: best possible assignment persistence is
+    // enabled and the resource is switched to FULL_AUTO.
+    ClusterConfig clusterConfig = _dataAccessor.getProperty(_keyBuilder.clusterConfig());
+    clusterConfig.setPersistBestPossibleAssignment(true);
+    _dataAccessor.updateProperty(_keyBuilder.clusterConfig(), clusterConfig);
+
+    // Create a new IdealState for a resource dedicated to this test.
+    String resourceName = "Resource" + TestHelper.getTestMethodName();
+    _gSetupTool.getClusterManagementTool()
+        .addResource(CLUSTER_NAME, resourceName, NUM_PARTITIONS, STATE_MODEL);
+    IdealState idealState = _dataAccessor.getProperty(_keyBuilder.idealStates(resourceName));
+    idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+    _dataAccessor.updateProperty(_keyBuilder.idealStates(resourceName), idealState);
+
+    _dataProvider.notifyDataChange(ChangeType.CLUSTER_CONFIG);
+    _dataProvider.notifyDataChange(ChangeType.IDEAL_STATE);
+    _dataProvider.refresh(_dataAccessor);
+
+    ResourceChangeDetector changeDetector = new ResourceChangeDetector(true);
+    changeDetector.updateSnapshots(_dataProvider);
+
+    // Now modify a field that is written by Helix controller logic. The update must target the
+    // same resource created above; writing it to another resource would leave this IdealState
+    // untouched and the assertions below would not exercise the trimming logic.
+    idealState.getRecord().getMapFields().put("Extra_Change", new HashMap<>());
+    _dataAccessor.updateProperty(_keyBuilder.idealStates(resourceName), idealState);
+    _dataProvider.notifyDataChange(ChangeType.IDEAL_STATE);
+    _dataProvider.refresh(_dataAccessor);
+    changeDetector.updateSnapshots(_dataProvider);
+
+    // The IDEAL_STATE change type is still reported (it was refreshed), but no IdealState is
+    // considered added, changed or removed because only controller-generated fields differ.
+    Assert.assertEquals(changeDetector.getChangeTypes(),
+        Collections.singleton(ChangeType.IDEAL_STATE));
+    Assert.assertEquals(
+        changeDetector.getAdditionsByType(ChangeType.IDEAL_STATE).size() + changeDetector
+            .getChangesByType(ChangeType.IDEAL_STATE).size() + changeDetector
+            .getRemovalsByType(ChangeType.IDEAL_STATE).size(), 0);
+  }
+
+  /**
    * Check that the given change types appear in detector's change types.
    * @param types
    */
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 9b6ccbe..abf084d 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -42,6 +42,7 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
 import org.apache.helix.monitoring.metrics.model.CountMetric;
 import org.mockito.Mockito;
@@ -348,7 +349,6 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
 
     // Note that this test relies on the MockRebalanceAlgorithm 
implementation. The mock algorithm
     // won't propagate any existing assignment from the cluster model.
-
     _metadataStore.clearMetadataStore();
     WagedRebalancer rebalancer =
         new WagedRebalancer(_metadataStore, _algorithm, new 
DelayedAutoRebalancer());
@@ -378,15 +378,15 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
         _metadataStore.getBestPossibleAssignment();
     Assert.assertEquals(bestPossibleAssignment, algorithmResult);
 
-    // 2. rebalance with one ideal state changed only
+    // 2. rebalance with one resource changed in the Resource Config znode only
     String changedResourceName = _resourceNames.get(0);
-    // Create a new cluster data cache to simulate cluster change
-    clusterData = setupClusterDataCache();
     when(clusterData.getRefreshedChangeTypes())
-        
.thenReturn(Collections.singleton(HelixConstants.ChangeType.IDEAL_STATE));
-    IdealState is = clusterData.getIdealState(changedResourceName);
-    // Update the tag so the ideal state will be marked as changed.
-    is.setInstanceGroupTag("newTag");
+        
.thenReturn(Collections.singleton(HelixConstants.ChangeType.RESOURCE_CONFIG));
+    ResourceConfig config = new 
ResourceConfig(clusterData.getResourceConfig(changedResourceName).getRecord());
+    // Update the config so the resource will be marked as changed.
+    config.putSimpleConfig("foo", "bar");
+    
when(clusterData.getResourceConfig(changedResourceName)).thenReturn(config);
+    clusterData.getResourceConfigMap().put(changedResourceName, config);
 
     // Although the input contains 2 resources, the rebalancer shall only call 
the algorithm to
     // rebalance the changed one.
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 2350448..54cbd41 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -162,6 +162,10 @@ public abstract class AbstractTestClusterModel {
     testResourceConfigResource2.setPartitionCapacityMap(
         Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, 
capacityDataMapResource2));
     
when(testCache.getResourceConfig("Resource2")).thenReturn(testResourceConfigResource2);
+    Map<String, ResourceConfig> configMap = new HashMap<>();
+    configMap.put("Resource1", testResourceConfigResource1);
+    configMap.put("Resource2", testResourceConfigResource2);
+    when(testCache.getResourceConfigMap()).thenReturn(configMap);
 
     // 6. Define mock state model
     for (BuiltInStateModelDefinitions bsmd : 
BuiltInStateModelDefinitions.values()) {

Reply via email to