This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 41c4d48  Fix Periodic rebalancer Timer leak (#1456)
41c4d48 is described below

commit 41c4d48b263128e65f36c3243bfb1b2fdab9f038
Author: xyuanlu <[email protected]>
AuthorDate: Mon Oct 26 12:13:59 2020 -0700

    Fix Periodic rebalancer Timer leak (#1456)
    
    In the current startPeriodRebalance, two threads may interfere with each
    other. This can result in one timer being canceled twice and two timers
    being created, one of which is leaked. This PR replaces the Timer with a
    SingleThreadScheduledExecutor and adds a synchronized block to
    startPeriodRebalance and stopPeriodRebalance.
---
 .../helix/controller/GenericHelixController.java   | 47 ++++++++++++++--------
 1 file changed, 30 insertions(+), 17 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 06e0fa1..a771c43 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -34,6 +34,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -172,13 +173,15 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
   private boolean _inMaintenanceMode;
 
   /**
-   * The timer that can periodically run the rebalancing pipeline. The timer 
will start if there is
-   * one resource group has the config to use the timer.
+   * The executors that can periodically run the rebalancing pipeline. A
+   * SingleThreadScheduledExecutor will start if there is resource group that 
has the config to do
+   * periodically rebalance.
    */
-  Timer _periodicalRebalanceTimer = null;
+  private static final ScheduledExecutorService _periodicalRebalanceExecutor =
+      Executors.newSingleThreadScheduledExecutor();
+  private ScheduledFuture _periodicRebalanceFutureTask = null;
   long _timerPeriod = Long.MAX_VALUE;
 
-
   /**
    * The timer that triggers the on-demand rebalance pipeline.
    */
@@ -329,19 +332,23 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
   /**
    * Starts the rebalancing timer with the specified period. Start the timer 
if necessary; If the
    * period is smaller than the current period, cancel the current timer and 
use the new period.
+   * note: For case where 2 threads change value from v0->v1 and v0->v2 at the 
same time, the
+   * result is indeterminable.
    */
   void startPeriodRebalance(long period, HelixManager manager) {
     if (period != _timerPeriod) {
       logger.info("Controller starting periodical rebalance timer at period " 
+ period);
-      if (_periodicalRebalanceTimer != null) {
-        _periodicalRebalanceTimer.cancel();
+      ScheduledFuture lastScheduledFuture;
+      synchronized (_periodicalRebalanceExecutor) {
+        lastScheduledFuture = _periodicRebalanceFutureTask;
+        _timerPeriod = period;
+        _periodicRebalanceFutureTask = _periodicalRebalanceExecutor
+            .scheduleAtFixedRate(new RebalanceTask(manager, 
ClusterEventType.PeriodicalRebalance),
+                _timerPeriod, _timerPeriod, TimeUnit.MILLISECONDS);
+      }
+      if (lastScheduledFuture != null && !lastScheduledFuture.isCancelled()) {
+        lastScheduledFuture.cancel(false /* mayInterruptIfRunning */);
       }
-      _periodicalRebalanceTimer =
-          new Timer("GenericHelixController_" + _clusterName + 
"_periodical_Timer", true);
-      _timerPeriod = period;
-      _periodicalRebalanceTimer
-          .scheduleAtFixedRate(new RebalanceTask(manager, 
ClusterEventType.PeriodicalRebalance),
-              _timerPeriod, _timerPeriod);
     } else {
       logger.info("Controller already has periodical rebalance timer at period 
" + _timerPeriod);
     }
@@ -352,11 +359,11 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
    */
   void stopPeriodRebalance() {
     logger.info("Controller stopping periodical rebalance timer at period " + 
_timerPeriod);
-    if (_periodicalRebalanceTimer != null) {
-      _periodicalRebalanceTimer.cancel();
-      _periodicalRebalanceTimer = null;
-      _timerPeriod = Long.MAX_VALUE;
-      logger.info("Controller stopped periodical rebalance timer at period " + 
_timerPeriod);
+    synchronized (_periodicalRebalanceExecutor) {
+      if (_periodicRebalanceFutureTask != null && 
!_periodicRebalanceFutureTask.isCancelled()) {
+        _periodicRebalanceFutureTask.cancel(false /* mayInterruptIfRunning */);
+        _timerPeriod = Long.MAX_VALUE;
+      }
     }
   }
 
@@ -1300,6 +1307,12 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
 
   public void shutdown() throws InterruptedException {
     stopPeriodRebalance();
+    _periodicalRebalanceExecutor.shutdown();
+    if (!_periodicalRebalanceExecutor
+        .awaitTermination(EVENT_THREAD_JOIN_TIMEOUT, TimeUnit.MILLISECONDS)) {
+      _periodicalRebalanceExecutor.shutdownNow();
+    }
+
     shutdownOnDemandTimer();
     logger.info("Shutting down {} pipeline", Pipeline.Type.DEFAULT.name());
     shutdownPipeline(_eventThread, _eventQueue);

Reply via email to