This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit dce65526e62f6ea3832993a2a862cf0a4411fc05
Author: Hunter Lee <[email protected]>
AuthorDate: Fri Oct 25 17:01:16 2019 -0700

    Make WagedRebalancer static by creating a ThreadLocal (#540)
    
    ZKBucketDataAccessor has a GC logic, but this is only valid if the ZkClient 
inside it is active and not closed. Currently, WAGED rebalancer generates an 
instance of AssignmentMetadataStore every time it rebalances, which does not 
allow the internal ZkBucketDataAccessor to garbage collect the assignment 
metadata it wrote previously.
    
    This diff makes the entire WagedRebalancer object a ThreadLocal, which has 
the effect of making it essentially static across different runs of the 
pipeline.
---
 .../stages/BestPossibleStateCalcStage.java         | 22 +++++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 6f442ea..1bd1fdf 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -67,8 +67,12 @@ import org.slf4j.LoggerFactory;
 public class BestPossibleStateCalcStage extends AbstractBaseStage {
   private static final Logger logger =
       LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName());
-  // Create a ThreadLocal of MetricCollector. Metrics could only be updated by 
the controller thread only.
-  private static final ThreadLocal<MetricCollector> 
METRIC_COLLECTOR_THREAD_LOCAL = new ThreadLocal<>();
+  // Create a ThreadLocal of MetricCollector. Metrics could only be updated by 
the controller thread
+  // only.
+  private static final ThreadLocal<MetricCollector> 
METRIC_COLLECTOR_THREAD_LOCAL =
+      new ThreadLocal<>();
+  private static final ThreadLocal<WagedRebalancer> 
WAGED_REBALANCER_THREAD_LOCAL =
+      new ThreadLocal<>();
 
   @Override
   public void process(ClusterEvent event) throws Exception {
@@ -261,6 +265,7 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
     Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences =
         cache.getClusterConfig().getGlobalRebalancePreference();
 
+    // Create MetricCollector ThreadLocal if it hasn't been already initialized
     if (METRIC_COLLECTOR_THREAD_LOCAL.get() == null) {
       try {
         // If HelixManager is null, we just pass in null for MetricCollector 
so that a
@@ -278,9 +283,13 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
       }
     }
 
-    // TODO avoid creating the rebalancer on every rebalance call for 
performance enhancement
-    WagedRebalancer wagedRebalancer =
-        new WagedRebalancer(helixManager, preferences, 
METRIC_COLLECTOR_THREAD_LOCAL.get());
+    // Create MetricCollector ThreadLocal if it hasn't been already initialized
+    if (WAGED_REBALANCER_THREAD_LOCAL.get() == null) {
+      WAGED_REBALANCER_THREAD_LOCAL
+          .set(new WagedRebalancer(helixManager, preferences, 
METRIC_COLLECTOR_THREAD_LOCAL.get()));
+    }
+    WagedRebalancer wagedRebalancer = WAGED_REBALANCER_THREAD_LOCAL.get();
+
     try {
       newIdealStates.putAll(wagedRebalancer.computeNewIdealStates(cache, 
wagedRebalancedResourceMap,
           currentStateOutput));
@@ -293,9 +302,8 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
               "Failed to calculate the new Ideal States using the rebalancer 
%s due to %s",
               wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()),
           ex);
-    } finally {
-      wagedRebalancer.close();
     }
+
     Iterator<Resource> itr = wagedRebalancedResourceMap.values().iterator();
     while (itr.hasNext()) {
       Resource resource = itr.next();

Reply via email to