Jackie-Jiang commented on a change in pull request #3819: Refactor periodic task
URL: https://github.com/apache/incubator-pinot/pull/3819#discussion_r256183740
 
 

 ##########
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
 ##########
 @@ -31,152 +31,90 @@
 /**
  * The base periodic task for pinot controller only. It uses 
<code>PinotHelixResourceManager</code> to determine
  * which table resources should be managed by this Pinot controller.
+ *
+ * @param <C> the context type
  */
-public abstract class ControllerPeriodicTask extends BasePeriodicTask {
+@ThreadSafe
+public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ControllerPeriodicTask.class);
 
-  private static final long MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS = 
30_000L;
-
   protected final PinotHelixResourceManager _pinotHelixResourceManager;
-  protected final ControllerMetrics _metricsRegistry;
-
-  private volatile boolean _stopPeriodicTask;
-  private volatile boolean _periodicTaskInProgress;
+  protected final ControllerMetrics _controllerMetrics;
 
   public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, 
long initialDelayInSeconds,
       PinotHelixResourceManager pinotHelixResourceManager, ControllerMetrics 
controllerMetrics) {
     super(taskName, runFrequencyInSeconds, initialDelayInSeconds);
     _pinotHelixResourceManager = pinotHelixResourceManager;
-    _metricsRegistry = controllerMetrics;
+    _controllerMetrics = controllerMetrics;
   }
 
-  /**
-   * Reset flags, and call initTask which initializes each individual task
-   */
   @Override
-  public final void init() {
-    _stopPeriodicTask = false;
-    _periodicTaskInProgress = false;
-    initTask();
+  protected final void runTask() {
+    processTables(_pinotHelixResourceManager.getAllTables());
   }
 
   /**
-   * Execute the ControllerPeriodicTask.
-   * The _periodicTaskInProgress is enabled at the beginning and disabled 
before exiting,
-   * to ensure that we can wait for a task in progress to finish when stop has 
been invoked
+   * Processes the given list of tables, and records the number of tables 
processed.
+   * <p>
+   * Override one of: this method, {@link #processTable(String)}, or {@link 
#processTable(String, C)}.
    */
-  @Override
-  public final void run() {
-    _stopPeriodicTask = false;
-    _periodicTaskInProgress = true;
-
-    List<String> tableNamesWithType = 
_pinotHelixResourceManager.getAllTables();
-    long startTime = System.currentTimeMillis();
+  protected void processTables(List<String> tableNamesWithType) {
     int numTables = tableNamesWithType.size();
-
-    LOGGER.info("Start processing {} tables in periodic task: {}", numTables, 
getTaskName());
-    process(tableNamesWithType);
-    LOGGER.info("Finish processing {} tables in periodic task: {} in {}ms", 
numTables, getTaskName(),
-        (System.currentTimeMillis() - startTime));
-
-    _periodicTaskInProgress = false;
-  }
-
-  /**
-   * Stops the ControllerPeriodicTask by enabling the _stopPeriodicTask flag. 
The flag ensures that processing of no new table begins.
-   * This method waits for the in progress ControllerPeriodicTask to finish 
the table being processed, until MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS
-   * Finally, it invokes the stopTask for any specific cleanup at the 
individual task level
-   */
-  @Override
-  public final void stop() {
-    _stopPeriodicTask = true;
-
-    LOGGER.info("Waiting for periodic task {} to finish, maxWaitTimeMillis = 
{}", getTaskName(),
-        MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS);
-    long millisToWait = MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS;
-    while (_periodicTaskInProgress && millisToWait > 0) {
-      try {
-        long thisWait = 1000;
-        if (millisToWait < thisWait) {
-          thisWait = millisToWait;
-        }
-        Thread.sleep(thisWait);
-        millisToWait -= thisWait;
-      } catch (InterruptedException e) {
-        LOGGER.info("Interrupted: Remaining wait time {} (out of {}) for task 
{}", millisToWait,
-            MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS, getTaskName());
+    LOGGER.info("Processing {} tables in task: {}", numTables, _taskName);
+    C context = preprocess();
+    int numTablesProcessed = 0;
+    for (String tableNameWithType : tableNamesWithType) {
+      if (!isStarted()) {
+        LOGGER.info("Task: {} is stopped, early terminate the task", 
_taskName);
         break;
       }
-    }
-    LOGGER.info("Wait completed for task {}. Waited for {} ms. 
_periodicTaskInProgress = {}", getTaskName(),
-        MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS - millisToWait, 
_periodicTaskInProgress);
-
-    stopTask();
-  }
-
-  /**
-   * Processes the task on the given tables.
-   *
-   * @param tableNamesWithType List of table names
-   */
-  protected void process(List<String> tableNamesWithType) {
-    if (!shouldStopPeriodicTask()) {
-
-      int numTablesProcessed = 0;
-      preprocess();
-
-      for (String tableNameWithType : tableNamesWithType) {
-        if (shouldStopPeriodicTask()) {
-          LOGGER.info("Skip processing table {} and all the remaining tables 
for task {}.", tableNameWithType,
-              getTaskName());
-          break;
-        }
-        try {
-          processTable(tableNameWithType);
-          numTablesProcessed++;
-        } catch (Exception e) {
-          exceptionHandler(tableNameWithType, e);
-        }
+      try {
+        processTable(tableNameWithType, context);
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while processing table: {} in task: 
{}", tableNamesWithType, _taskName, e);
       }
-
-      postprocess();
-      _metricsRegistry
-          
.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, 
getTaskName(), numTablesProcessed);
-    } else {
-      LOGGER.info("Skip processing all tables for task {}", getTaskName());
+      numTablesProcessed++;
     }
+    postprocess(context);
+    _controllerMetrics
+        
.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, 
_taskName, numTablesProcessed);
+    LOGGER.info("Finish processing {}/{} tables in task: {}", 
numTablesProcessed, numTables, _taskName);
   }
 
   /**
-   * This method runs before processing all tables
+   * Can be overridden to provide context before processing the tables.
 
 Review comment:
   Fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to