mcvsubbu commented on a change in pull request #3622: Start and stop 
ControllerPeriodicTasks based on leadership changes
URL: https://github.com/apache/incubator-pinot/pull/3622#discussion_r243386081
 
 

 ##########
 File path: 
pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
 ##########
 @@ -61,61 +63,61 @@ public void init() {
 
   @Override
   public void run() {
-    if (!isLeader()) {
-      skipLeaderTask();
-    } else {
-      List<String> allTableNames = _pinotHelixResourceManager.getAllTables();
-      processLeaderTask(allTableNames);
-    }
-  }
-
-  private void skipLeaderTask() {
-    if (_isLeader) {
-      LOGGER.info("Current pinot controller lost leadership.");
-      _isLeader = false;
-      onBecomeNotLeader();
-    }
-    LOGGER.info("Skip running periodic task: {} on non-leader controller", 
_taskName);
-  }
-
-  private void processLeaderTask(List<String> tables) {
-    if (!_isLeader) {
-      LOGGER.info("Current pinot controller became leader. Starting {} with 
running frequency of {} seconds.",
-          _taskName, _intervalInSeconds);
-      _isLeader = true;
-      onBecomeLeader();
-    }
+    _periodicTaskInProgress = true;
+    List<String> tableNamesWithType = 
_pinotHelixResourceManager.getAllTables();
     long startTime = System.currentTimeMillis();
-    int numTables = tables.size();
+    int numTables = tableNamesWithType.size();
     LOGGER.info("Start processing {} tables in periodic task: {}", numTables, 
_taskName);
-    process(tables);
+    process(tableNamesWithType);
     LOGGER.info("Finish processing {} tables in periodic task: {} in {}ms", 
numTables, _taskName,
         (System.currentTimeMillis() - startTime));
+    _periodicTaskInProgress = false;
   }
 
-  /**
-   * Does the following logic when losing the leadership. This should be done 
only once during leadership transition.
-   */
-  public void onBecomeNotLeader() {
-  }
 
-  /**
-   * Does the following logic when becoming lead controller. This should be 
done only once during leadership transition.
-   */
-  public void onBecomeLeader() {
+  @Override
+  public void stop() {
+    _stopPeriodicTask = true;
+
+    LOGGER.info("Waiting for periodic task {} to finish, maxWaitTimeMillis = 
{}", _taskName,
+        MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS);
+    long millisToWait = MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS;
+    while (_periodicTaskInProgress && millisToWait > 0) {
+      try {
+        long thisWait = 1000;
+        if (millisToWait < thisWait) {
+          thisWait = millisToWait;
+        }
+        Thread.sleep(thisWait);
+        millisToWait -= thisWait;
+      } catch (InterruptedException e) {
+        LOGGER.info("Interrupted: Remaining wait time {} (out of {})", 
millisToWait,
+            MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS);
+        break;
+      }
+    }
+    LOGGER.info("Wait completed. _periodicTaskInProgress = {}", 
_periodicTaskInProgress);
 
 Review comment:
   All logs should include task name. Also, we should log the amount of wait 
time here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to