mcvsubbu commented on a change in pull request #3622: Start and stop
ControllerPeriodicTasks based on leadership changes
URL: https://github.com/apache/incubator-pinot/pull/3622#discussion_r243463189
##########
File path:
pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
##########
@@ -55,67 +57,90 @@ private static long getRandomInitialDelayInSeconds() {
return MIN_INITIAL_DELAY_IN_SECONDS +
RANDOM.nextInt(MAX_INITIAL_DELAY_IN_SECONDS - MIN_INITIAL_DELAY_IN_SECONDS);
}
+ /**
+ * Reset flags, and call initTask which initializes each individual task
+ */
@Override
- public void init() {
+ public final void init() {
+ _stopPeriodicTask = false;
+ _periodicTaskInProgress = false;
+ initTask();
}
+ /**
+ * Execute the ControllerPeriodicTask.
+ * The _periodicTaskInProgress is enabled at the beginning and disabled
before exiting,
+ * to ensure that we can wait for a task in progress to finish when stop has
been invoked
+ */
@Override
- public void run() {
- if (!isLeader()) {
- skipLeaderTask();
- } else {
- List<String> allTableNames = _pinotHelixResourceManager.getAllTables();
- processLeaderTask(allTableNames);
- }
- }
-
- private void skipLeaderTask() {
- if (_isLeader) {
- LOGGER.info("Current pinot controller lost leadership.");
- _isLeader = false;
- onBecomeNotLeader();
- }
- LOGGER.info("Skip running periodic task: {} on non-leader controller",
_taskName);
- }
+ public final void run() {
+ _periodicTaskInProgress = true;
- private void processLeaderTask(List<String> tables) {
- if (!_isLeader) {
- LOGGER.info("Current pinot controller became leader. Starting {} with
running frequency of {} seconds.",
- _taskName, _intervalInSeconds);
- _isLeader = true;
- onBecomeLeader();
- }
+ List<String> tableNamesWithType =
_pinotHelixResourceManager.getAllTables();
long startTime = System.currentTimeMillis();
- int numTables = tables.size();
+ int numTables = tableNamesWithType.size();
+
LOGGER.info("Start processing {} tables in periodic task: {}", numTables,
_taskName);
- process(tables);
+ process(tableNamesWithType);
LOGGER.info("Finish processing {} tables in periodic task: {} in {}ms",
numTables, _taskName,
(System.currentTimeMillis() - startTime));
- }
- /**
- * Does the following logic when losing the leadership. This should be done
only once during leadership transition.
- */
- public void onBecomeNotLeader() {
+ _periodicTaskInProgress = false;
}
/**
- * Does the following logic when becoming lead controller. This should be
done only once during leadership transition.
+ * Stops the ControllerPeriodicTask by enabling the _stopPeriodicTask flag.
The flag ensures that processing of no new table begins.
+ * This method waits for the in progress ControllerPeriodicTask to finish
the table being processed, until MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS
+ * Finally, it invokes the stopTask for any specific cleanup at the
individual task level
*/
- public void onBecomeLeader() {
+ @Override
+ public final void stop() {
+ _stopPeriodicTask = true;
+
+ LOGGER.info("Waiting for periodic task {} to finish, maxWaitTimeMillis =
{}", _taskName,
+ MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS);
+ long millisToWait = MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS;
+ while (_periodicTaskInProgress && millisToWait > 0) {
+ try {
+ long thisWait = 1000;
+ if (millisToWait < thisWait) {
+ thisWait = millisToWait;
+ }
+ Thread.sleep(thisWait);
+ millisToWait -= thisWait;
+ } catch (InterruptedException e) {
+ LOGGER.info("Interrupted: Remaining wait time {} (out of {}) for task
{}", millisToWait,
+ MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS, _taskName);
+ break;
+ }
+ }
+ LOGGER.info("Wait completed for task {}. Waited for {} ms.
_periodicTaskInProgress = {}", _taskName,
+ MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS - millisToWait,
_periodicTaskInProgress);
+
+ stopTask();
}
+
/**
* Processes the task on the given tables.
*
- * @param tables List of table names
+ * @param tableNamesWithType List of table names
*/
- protected void process(List<String> tables) {
- preprocess();
- for (String table : tables) {
- processTable(table);
+ protected void process(List<String> tableNamesWithType) {
+ if (!shouldStopPeriodicTask()) {
+ preprocess();
+ for (String tableNameWithType : tableNamesWithType) {
+ if (shouldStopPeriodicTask()) {
+ LOGGER.info("_stopPeriodicTask={}. Skip processing table {} and all
the remaining tables for task {}.",
Review comment:
We shouldn't need to log the value of _stopPeriodicTask here, right? It seems
redundant.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]