This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new e2ec350  Correct getAssignmentForWagedFullAutoImpl snapshot behavior  
(#1730)
e2ec350 is described below

commit e2ec35099535bdf4abe9139916e4f9764e0876bf
Author: Neal Sun <[email protected]>
AuthorDate: Tue May 11 10:22:20 2021 -0700

    Correct getAssignmentForWagedFullAutoImpl snapshot behavior  (#1730)
    
    This PR ensures that getAssignmentForWagedFullAutoImpl is correctly
    initialized with a snapshot.
    
    Co-authored-by: Neal Sun <[email protected]>
---
 .../controller/rebalancer/waged/ReadOnlyWagedRebalancer.java      | 4 ++++
 .../apache/helix/controller/rebalancer/waged/WagedRebalancer.java | 4 ++++
 helix-core/src/main/java/org/apache/helix/util/HelixUtil.java     | 8 +++++++-
 3 files changed, 15 insertions(+), 1 deletion(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
index 44a3c1c..80b62ee 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
@@ -60,6 +60,10 @@ public class ReadOnlyWagedRebalancer extends WagedRebalancer 
{
     return failureTypes;
   }
 
+  public void updateChangeDetectorSnapshots(ResourceControllerDataProvider 
dataProvider) {
+    getChangeDetector().updateSnapshots(dataProvider);
+  }
+
   private static class ReadOnlyAssignmentMetadataStore extends 
AssignmentMetadataStore {
     ReadOnlyAssignmentMetadataStore(String metadataStoreAddress, String 
clusterName) {
       super(new ZkBucketDataAccessor(metadataStoreAddress), clusterName);
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 44df28c..7221b1e 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -786,6 +786,10 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
     return _metricCollector;
   }
 
+  protected ResourceChangeDetector getChangeDetector() {
+    return _changeDetector;
+  }
+
   @Override
   protected void finalize()
       throws Throwable {
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java 
b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 38103e9..413305c 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -245,11 +245,17 @@ public final class HelixUtil {
         new ClusterEvent(globalSyncClusterConfig.getClusterName(), 
ClusterEventType.Unknown);
 
     try {
-      // Obtain a refreshed dataProvider (cache) and overwrite cluster 
parameters with the given parameters
+      // First, prepare waged rebalancer with a snapshot, so that it can react 
to the difference
+      // between the current snapshot and the provided parameters which act as 
the new snapshot
       ResourceControllerDataProvider dataProvider =
           new 
ResourceControllerDataProvider(globalSyncClusterConfig.getClusterName());
       dataProvider.requireFullRefresh();
       dataProvider.refresh(helixDataAccessor);
+      readOnlyWagedRebalancer.updateChangeDetectorSnapshots(dataProvider);
+      // Refresh dataProvider completely to populate _refreshedChangeTypes
+      dataProvider.requireFullRefresh();
+      dataProvider.refresh(helixDataAccessor);
+
       dataProvider.setClusterConfig(globalSyncClusterConfig);
       dataProvider.setInstanceConfigMap(instanceConfigs.stream()
           .collect(Collectors.toMap(InstanceConfig::getInstanceName, 
Function.identity())));

Reply via email to