This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch wagedRebalancer in repository https://gitbox.apache.org/repos/asf/helix.git
commit dce65526e62f6ea3832993a2a862cf0a4411fc05 Author: Hunter Lee <[email protected]> AuthorDate: Fri Oct 25 17:01:16 2019 -0700 Make WagedRebalancer static by creating a ThreadLocal (#540) ZKBucketDataAccessor has GC logic, but it only works while the ZkClient inside it is active and not closed. Currently, the WAGED rebalancer generates a new instance of AssignmentMetadataStore every time it rebalances, which does not allow the internal ZkBucketDataAccessor to garbage collect the assignment metadata it wrote previously. This diff makes the entire WagedRebalancer object a ThreadLocal, which has the effect of making it essentially static across different runs of the pipeline. --- .../stages/BestPossibleStateCalcStage.java | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index 6f442ea..1bd1fdf 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -67,8 +67,12 @@ import org.slf4j.LoggerFactory; public class BestPossibleStateCalcStage extends AbstractBaseStage { private static final Logger logger = LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName()); - // Create a ThreadLocal of MetricCollector. Metrics could only be updated by the controller thread only. - private static final ThreadLocal<MetricCollector> METRIC_COLLECTOR_THREAD_LOCAL = new ThreadLocal<>(); + // Create a ThreadLocal of MetricCollector. Metrics could only be updated by the controller thread + // only. 
+ private static final ThreadLocal<MetricCollector> METRIC_COLLECTOR_THREAD_LOCAL = + new ThreadLocal<>(); + private static final ThreadLocal<WagedRebalancer> WAGED_REBALANCER_THREAD_LOCAL = + new ThreadLocal<>(); @Override public void process(ClusterEvent event) throws Exception { @@ -261,6 +265,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences = cache.getClusterConfig().getGlobalRebalancePreference(); + // Create MetricCollector ThreadLocal if it hasn't been already initialized if (METRIC_COLLECTOR_THREAD_LOCAL.get() == null) { try { // If HelixManager is null, we just pass in null for MetricCollector so that a @@ -278,9 +283,13 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { } } - // TODO avoid creating the rebalancer on every rebalance call for performance enhancement - WagedRebalancer wagedRebalancer = - new WagedRebalancer(helixManager, preferences, METRIC_COLLECTOR_THREAD_LOCAL.get()); + // Create WagedRebalancer ThreadLocal if it hasn't been already initialized + if (WAGED_REBALANCER_THREAD_LOCAL.get() == null) { + WAGED_REBALANCER_THREAD_LOCAL + .set(new WagedRebalancer(helixManager, preferences, METRIC_COLLECTOR_THREAD_LOCAL.get())); + } + WagedRebalancer wagedRebalancer = WAGED_REBALANCER_THREAD_LOCAL.get(); + try { newIdealStates.putAll(wagedRebalancer.computeNewIdealStates(cache, wagedRebalancedResourceMap, currentStateOutput)); @@ -293,9 +302,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { "Failed to calculate the new Ideal States using the rebalancer %s due to %s", wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()), ex); - } finally { - wagedRebalancer.close(); } + Iterator<Resource> itr = wagedRebalancedResourceMap.values().iterator(); while (itr.hasNext()) { Resource resource = itr.next();
