This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fb6371  Refactor periodic task (#3819)
2fb6371 is described below

commit 2fb63714dec6bc18d2c7fe77cdf82dd88d113440
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Feb 12 17:34:33 2019 -0800

    Refactor periodic task (#3819)
    
    Refactor periodic task to fix the following issues:
    - PinotTaskManager.scheduleTasks() has no effect on non-leader controller, 
but returns result from previous run
      The reason for this is that, we store states for each run as member 
variables in the periodic task, which mixed the concept of task and run
    - After calling stop() (lose leadership), when re-gaining leadership, no 
start() method is called to set up the environment
    - Potential race condition between start() and stop()
    
    Replace init() with start() and call it whenever the leadership is gained
    Move the basic control methods for periodic task (start(), run(), stop()) 
into BasePeriodicTask
    Keep table level methods in ControllerPeriodicTask
    For per-run states, add context generic type to pass them between 
processing multiple tables
---
 .../pinot/common/metrics/ControllerMeter.java      |   2 +
 .../controller/helix/SegmentStatusChecker.java     |  93 ++++++------
 .../helix/core/minion/PinotTaskManager.java        | 123 +++++++--------
 .../core/periodictask/ControllerPeriodicTask.java  | 167 +++++++--------------
 .../core/relocation/RealtimeSegmentRelocator.java  |  29 +---
 .../helix/core/retention/RetentionManager.java     |  21 +--
 .../BrokerResourceValidationManager.java           |  32 ++--
 .../validation/OfflineSegmentIntervalChecker.java  |  22 +--
 .../RealtimeSegmentValidationManager.java          |  34 ++---
 .../controller/helix/SegmentStatusCheckerTest.java |  20 +--
 .../periodictask/ControllerPeriodicTaskTest.java   | 114 ++++++--------
 .../helix/core/retention/RetentionManagerTest.java |   4 +-
 .../pinot/core/periodictask/BasePeriodicTask.java  | 144 +++++++++++++++++-
 .../pinot/core/periodictask/PeriodicTask.java      |  29 +++-
 .../core/periodictask/PeriodicTaskScheduler.java   |   9 +-
 .../periodictask/PeriodicTaskSchedulerTest.java    |  34 ++---
 .../tests/SegmentCompletionIntegrationTests.java   |   2 +-
 17 files changed, 431 insertions(+), 448 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index 9d522bc..9f9299e 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -56,6 +56,8 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
   // Introducing a new stream agnostic metric to replace LLC_KAFKA_DATA_LOSS.
   // We can phase out LLC_KAFKA_DATA_LOSS once we have collected sufficient 
metrics for the new one
   LLC_STREAM_DATA_LOSS("dataLoss", false),
+  CONTROLLER_PERIODIC_TASK_RUN("periodicTaskRun", false),
+  CONTROLLER_PERIODIC_TASK_ERROR("periodicTaskError", false),
   NUMBER_TIMES_SCHEDULE_TASKS_CALLED("tasks", true),
   NUMBER_TASKS_SUBMITTED("tasks", false),
   NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED("SegmentUploadTimeouts", true),
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index cbe763c..f40e2b3 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
  * Manages the segment status metrics, regarding tables with fewer replicas 
than requested
  * and segments in error state.
  */
-public class SegmentStatusChecker extends ControllerPeriodicTask {
+public class SegmentStatusChecker extends 
ControllerPeriodicTask<SegmentStatusChecker.Context> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentStatusChecker.class);
   private static final int MaxOfflineSegmentsToLog = 5;
   public static final String ONLINE = "ONLINE";
@@ -50,10 +50,6 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask {
   // log messages about disabled tables atmost once a day
   private static final long DISABLED_TABLE_LOG_INTERVAL_MS = 
TimeUnit.DAYS.toMillis(1);
   private long _lastDisabledTableLogTimestamp = 0;
-  private boolean _logDisabledTables;
-  private int _realTimeTableCount;
-  private int _offlineTableCount;
-  private int _disabledTableCount;
 
   /**
    * Constructs the segment status checker.
@@ -69,58 +65,50 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask {
   }
 
   @Override
-  public void initTask() {
+  protected void setUpTask() {
     LOGGER.info("Initializing table metrics for all the tables.");
     setStatusToDefault();
   }
 
   @Override
-  protected void preprocess() {
-    _realTimeTableCount = 0;
-    _offlineTableCount = 0;
-    _disabledTableCount = 0;
-
+  protected Context preprocess() {
+    Context context = new Context();
     // check if we need to log disabled tables log messages
     long now = System.currentTimeMillis();
     if (now - _lastDisabledTableLogTimestamp >= 
DISABLED_TABLE_LOG_INTERVAL_MS) {
-      _logDisabledTables = true;
+      context._logDisabledTables = true;
       _lastDisabledTableLogTimestamp = now;
-    } else {
-      _logDisabledTables = false;
     }
+    return context;
   }
 
   @Override
-  protected void processTable(String tableNameWithType) {
-    updateSegmentMetrics(tableNameWithType);
-  }
-
-  @Override
-  protected void exceptionHandler(String tableNameWithType, Exception e) {
-    LOGGER.error("Caught exception while updating segment status for table 
{}", tableNameWithType, e);
-    // Remove the metric for this table
-    resetTableMetrics(tableNameWithType);
+  protected void processTable(String tableNameWithType, Context context) {
+    try {
+      updateSegmentMetrics(tableNameWithType, context);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while updating segment status for table 
{}", tableNameWithType, e);
+      // Remove the metric for this table
+      resetTableMetrics(tableNameWithType);
+    }
   }
 
   @Override
-  protected void postprocess() {
-    
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT, 
_realTimeTableCount);
-    
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT, 
_offlineTableCount);
-    
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT, 
_disabledTableCount);
+  protected void postprocess(Context context) {
+    
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT, 
context._realTimeTableCount);
+    
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT, 
context._offlineTableCount);
+    
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT, 
context._disabledTableCount);
   }
 
   /**
    * Runs a segment status pass over the given table.
    * TODO: revisit the logic and reduce the ZK access
-   *
-   * @param tableNameWithType
    */
-  private void updateSegmentMetrics(String tableNameWithType) {
-
+  private void updateSegmentMetrics(String tableNameWithType, Context context) 
{
     if (TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == 
TableType.OFFLINE) {
-      _offlineTableCount++;
+      context._offlineTableCount++;
     } else {
-      _realTimeTableCount++;
+      context._realTimeTableCount++;
     }
 
     IdealState idealState = 
_pinotHelixResourceManager.getTableIdealState(tableNameWithType);
@@ -132,11 +120,11 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask {
     }
 
     if (!idealState.isEnabled()) {
-      if (_logDisabledTables) {
+      if (context._logDisabledTables) {
         LOGGER.warn("Table {} is disabled. Skipping segment status checks", 
tableNameWithType);
       }
       resetTableMetrics(tableNameWithType);
-      _disabledTableCount++;
+      context._disabledTableCount++;
       return;
     }
 
@@ -147,16 +135,16 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask {
       } catch (NumberFormatException e) {
         // Ignore
       }
-      _metricsRegistry
+      _controllerMetrics
           .setValueOfTableGauge(tableNameWithType, 
ControllerGauge.NUMBER_OF_REPLICAS, nReplicasFromIdealState);
-      _metricsRegistry.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.PERCENT_OF_REPLICAS, 100);
-      _metricsRegistry.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
+      _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.PERCENT_OF_REPLICAS, 100);
+      _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
       return;
     }
 
-    _metricsRegistry
+    _controllerMetrics
         .setValueOfTableGauge(tableNameWithType, 
ControllerGauge.IDEALSTATE_ZNODE_SIZE, idealState.toString().length());
-    _metricsRegistry.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.SEGMENT_COUNT,
+    _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.SEGMENT_COUNT,
         (long) (idealState.getPartitionSet().size()));
     ExternalView externalView = 
_pinotHelixResourceManager.getTableExternalView(tableNameWithType);
 
@@ -230,11 +218,11 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask {
       nReplicasExternal = (nReplicasIdealMax == 0) ? 1 : 0;
     }
     // Synchronization provided by Controller Gauge to make sure that only one 
thread updates the gauge
-    _metricsRegistry.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.NUMBER_OF_REPLICAS, nReplicasExternal);
-    _metricsRegistry.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.PERCENT_OF_REPLICAS,
+    _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.NUMBER_OF_REPLICAS, nReplicasExternal);
+    _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.PERCENT_OF_REPLICAS,
         (nReplicasIdealMax > 0) ? (nReplicasExternal * 100 / 
nReplicasIdealMax) : 100);
-    _metricsRegistry.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
-    _metricsRegistry.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
+    _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
+    _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
         (nSegments > 0) ? (100 - (nOffline * 100 / nSegments)) : 100);
     if (nOffline > 0) {
       LOGGER.warn("Table {} has {} segments with no online replicas", 
tableNameWithType, nOffline);
@@ -254,15 +242,22 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask {
   }
 
   private void resetTableMetrics(String tableName) {
-    _metricsRegistry.setValueOfTableGauge(tableName, 
ControllerGauge.NUMBER_OF_REPLICAS, Long.MIN_VALUE);
-    _metricsRegistry.setValueOfTableGauge(tableName, 
ControllerGauge.PERCENT_OF_REPLICAS, Long.MIN_VALUE);
-    _metricsRegistry.setValueOfTableGauge(tableName, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE, Long.MIN_VALUE);
-    _metricsRegistry.setValueOfTableGauge(tableName, 
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, Long.MIN_VALUE);
+    _controllerMetrics.setValueOfTableGauge(tableName, 
ControllerGauge.NUMBER_OF_REPLICAS, Long.MIN_VALUE);
+    _controllerMetrics.setValueOfTableGauge(tableName, 
ControllerGauge.PERCENT_OF_REPLICAS, Long.MIN_VALUE);
+    _controllerMetrics.setValueOfTableGauge(tableName, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE, Long.MIN_VALUE);
+    _controllerMetrics.setValueOfTableGauge(tableName, 
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, Long.MIN_VALUE);
   }
 
   @Override
-  public void stopTask() {
+  public void cleanUpTask() {
     LOGGER.info("Resetting table metrics for all the tables.");
     setStatusToDefault();
   }
+
+  public static final class Context {
+    private boolean _logDisabledTables;
+    private int _realTimeTableCount;
+    private int _offlineTableCount;
+    private int _disabledTableCount;
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index a674fe0..6bb5c9d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nonnull;
 import org.apache.pinot.common.config.PinotTaskConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableTaskConfig;
@@ -44,21 +43,16 @@ import org.slf4j.LoggerFactory;
  * <p><code>PinotTaskManager</code> is also responsible for checking the 
health status on each type of tasks, detect and
  * fix issues accordingly.
  */
-public class PinotTaskManager extends ControllerPeriodicTask {
+public class PinotTaskManager extends ControllerPeriodicTask<Void> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotTaskManager.class);
 
   private final PinotHelixTaskResourceManager _helixTaskResourceManager;
   private final ClusterInfoProvider _clusterInfoProvider;
   private final TaskGeneratorRegistry _taskGeneratorRegistry;
 
-  private Map<String, List<TableConfig>> _enabledTableConfigMap;
-  private Set<String> _taskTypes;
-  private int _numTaskTypes;
-  private Map<String, String> _tasksScheduled;
-
-  public PinotTaskManager(@Nonnull PinotHelixTaskResourceManager 
helixTaskResourceManager,
-      @Nonnull PinotHelixResourceManager helixResourceManager, @Nonnull 
ControllerConf controllerConf,
-      @Nonnull ControllerMetrics controllerMetrics) {
+  public PinotTaskManager(PinotHelixTaskResourceManager 
helixTaskResourceManager,
+      PinotHelixResourceManager helixResourceManager, ControllerConf 
controllerConf,
+      ControllerMetrics controllerMetrics) {
     super("PinotTaskManager", 
controllerConf.getTaskManagerFrequencyInSeconds(),
         controllerConf.getPeriodicTaskInitialDelayInSeconds(), 
helixResourceManager, controllerMetrics);
     _helixTaskResourceManager = helixTaskResourceManager;
@@ -66,110 +60,105 @@ public class PinotTaskManager extends 
ControllerPeriodicTask {
     _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider);
   }
 
-  @Override
-  protected void initTask() {
-
-  }
-
   /**
-   * Get the cluster info provider.
-   * <p>Cluster info provider might be needed to initialize task generators.
+   * Returns the cluster info provider.
+   * <p>
+   * Cluster info provider might be useful when initializing task generators.
    *
    * @return Cluster info provider
    */
-  @Nonnull
   public ClusterInfoProvider getClusterInfoProvider() {
     return _clusterInfoProvider;
   }
 
   /**
-   * Register a task generator.
-   * <p>This is for pluggable task generators.
+   * Registers a task generator.
+   * <p>
+   * This method can be used to plug in custom task generators.
    *
    * @param pinotTaskGenerator Task generator to be registered
    */
-  public void registerTaskGenerator(@Nonnull PinotTaskGenerator 
pinotTaskGenerator) {
+  public void registerTaskGenerator(PinotTaskGenerator pinotTaskGenerator) {
     _taskGeneratorRegistry.registerTaskGenerator(pinotTaskGenerator);
   }
 
   /**
    * Public API to schedule tasks. It doesn't matter whether current pinot 
controller is leader.
    */
-  public Map<String, String> scheduleTasks() {
-    process(_pinotHelixResourceManager.getAllTables());
-    return getTasksScheduled();
+  public synchronized Map<String, String> scheduleTasks() {
+    Map<String, String> tasksScheduled = 
scheduleTasks(_pinotHelixResourceManager.getAllTables());
+
+    // NOTE: this method might be called from the Rest API instead of the 
periodic task scheduler on non-leader
+    // controllers, so if the task is stopped (non-leader controller), clean 
up the task
+    if (!isStarted()) {
+      cleanUpTask();
+    }
+
+    return tasksScheduled;
   }
 
-  @Override
-  protected void preprocess() {
-    
_metricsRegistry.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
 1L);
+  /**
+   * Check the Pinot cluster status and schedule new tasks for the given 
tables.
+   *
+   * @param tableNamesWithType List of table names with type suffix
+   * @return Map from task type to task scheduled
+   */
+  private Map<String, String> scheduleTasks(List<String> tableNamesWithType) {
+    
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
 1L);
 
-    _taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
-    _numTaskTypes = _taskTypes.size();
-    _enabledTableConfigMap = new HashMap<>(_numTaskTypes);
+    Set<String> taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
+    int numTaskTypes = taskTypes.size();
+    Map<String, List<TableConfig>> enabledTableConfigMap = new 
HashMap<>(numTaskTypes);
 
-    for (String taskType : _taskTypes) {
-      _enabledTableConfigMap.put(taskType, new ArrayList<>());
+    for (String taskType : taskTypes) {
+      enabledTableConfigMap.put(taskType, new ArrayList<>());
 
       // Ensure all task queues exist
       _helixTaskResourceManager.ensureTaskQueueExists(taskType);
     }
-  }
 
-  @Override
-  protected void processTable(String tableNameWithType) {
-    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
-    if (tableConfig != null) {
-      TableTaskConfig taskConfig = tableConfig.getTaskConfig();
-      if (taskConfig != null) {
-        for (String taskType : _taskTypes) {
-          if (taskConfig.isTaskTypeEnabled(taskType)) {
-            _enabledTableConfigMap.get(taskType).add(tableConfig);
+    // Scan all table configs to get the tables with tasks enabled
+    for (String tableNameWithType : tableNamesWithType) {
+      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig != null) {
+        TableTaskConfig taskConfig = tableConfig.getTaskConfig();
+        if (taskConfig != null) {
+          for (String taskType : taskTypes) {
+            if (taskConfig.isTaskTypeEnabled(taskType)) {
+              enabledTableConfigMap.get(taskType).add(tableConfig);
+            }
           }
         }
       }
     }
-  }
-
-  @Override
-  protected void exceptionHandler(String tableNameWithType, Exception e) {
-    LOGGER.error("Exception in PinotTaskManager for table {}", 
tableNameWithType, e);
-  }
 
-  @Override
-  protected void postprocess() {
     // Generate each type of tasks
-    _tasksScheduled = new HashMap<>(_numTaskTypes);
-    for (String taskType : _taskTypes) {
+    Map<String, String> tasksScheduled = new HashMap<>(numTaskTypes);
+    for (String taskType : taskTypes) {
       LOGGER.info("Generating tasks for task type: {}", taskType);
       PinotTaskGenerator pinotTaskGenerator = 
_taskGeneratorRegistry.getTaskGenerator(taskType);
-      List<PinotTaskConfig> pinotTaskConfigs = 
pinotTaskGenerator.generateTasks(_enabledTableConfigMap.get(taskType));
+      List<PinotTaskConfig> pinotTaskConfigs = 
pinotTaskGenerator.generateTasks(enabledTableConfigMap.get(taskType));
       int numTasks = pinotTaskConfigs.size();
       if (numTasks > 0) {
         LOGGER
             .info("Submitting {} tasks for task type: {} with task configs: 
{}", numTasks, taskType, pinotTaskConfigs);
-        _tasksScheduled.put(taskType, _helixTaskResourceManager
+        tasksScheduled.put(taskType, _helixTaskResourceManager
             .submitTask(pinotTaskConfigs, 
pinotTaskGenerator.getNumConcurrentTasksPerInstance()));
-        _metricsRegistry.addMeteredTableValue(taskType, 
ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
+        _controllerMetrics.addMeteredTableValue(taskType, 
ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
       }
     }
+
+    return tasksScheduled;
   }
 
-  /**
-   * Returns the tasks that have been scheduled as part of the postprocess
-   * @return
-   */
-  private Map<String, String> getTasksScheduled() {
-    return _tasksScheduled;
+  @Override
+  protected void processTables(List<String> tableNamesWithType) {
+    scheduleTasks(tableNamesWithType);
   }
 
-  /**
-   * Performs necessary cleanups (e.g. remove metrics) when the controller 
leadership changes.
-   */
   @Override
-  public void stopTask() {
-    LOGGER.info("Perform task cleanups.");
-    // Performs necessary cleanups for each task type.
+  public void cleanUpTask() {
+    LOGGER.info("Cleaning up all task generators");
     for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
       _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 8d15d1e..7e764f3 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -18,9 +18,10 @@
  */
 package org.apache.pinot.controller.helix.core.periodictask;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.util.List;
+import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.periodictask.BasePeriodicTask;
@@ -31,152 +32,96 @@ import org.slf4j.LoggerFactory;
 /**
  * The base periodic task for pinot controller only. It uses 
<code>PinotHelixResourceManager</code> to determine
  * which table resources should be managed by this Pinot controller.
+ *
+ * @param <C> the context type
  */
-public abstract class ControllerPeriodicTask extends BasePeriodicTask {
+@ThreadSafe
+public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ControllerPeriodicTask.class);
 
-  private static final long MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS = 
30_000L;
-
   protected final PinotHelixResourceManager _pinotHelixResourceManager;
-  protected final ControllerMetrics _metricsRegistry;
-
-  private volatile boolean _stopPeriodicTask;
-  private volatile boolean _periodicTaskInProgress;
+  protected final ControllerMetrics _controllerMetrics;
 
   public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, 
long initialDelayInSeconds,
       PinotHelixResourceManager pinotHelixResourceManager, ControllerMetrics 
controllerMetrics) {
     super(taskName, runFrequencyInSeconds, initialDelayInSeconds);
     _pinotHelixResourceManager = pinotHelixResourceManager;
-    _metricsRegistry = controllerMetrics;
+    _controllerMetrics = controllerMetrics;
   }
 
-  /**
-   * Reset flags, and call initTask which initializes each individual task
-   */
   @Override
-  public final void init() {
-    _stopPeriodicTask = false;
-    _periodicTaskInProgress = false;
-    initTask();
+  protected final void runTask() {
+    _controllerMetrics.addMeteredTableValue(_taskName, 
ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN, 1L);
+    try {
+      processTables(_pinotHelixResourceManager.getAllTables());
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while running task: {}", _taskName, e);
+      _controllerMetrics.addMeteredTableValue(_taskName, 
ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR, 1L);
+    }
   }
 
   /**
-   * Execute the ControllerPeriodicTask.
-   * The _periodicTaskInProgress is enabled at the beginning and disabled 
before exiting,
-   * to ensure that we can wait for a task in progress to finish when stop has 
been invoked
+   * Processes the given list of tables, and returns the number of tables 
processed.
+   * <p>
+   * Override one of this method, {@link #processTable(String)} or {@link 
#processTable(String, C)}.
    */
-  @Override
-  public final void run() {
-    _stopPeriodicTask = false;
-    _periodicTaskInProgress = true;
-
-    List<String> tableNamesWithType = 
_pinotHelixResourceManager.getAllTables();
-    long startTime = System.currentTimeMillis();
+  protected void processTables(List<String> tableNamesWithType) {
     int numTables = tableNamesWithType.size();
-
-    LOGGER.info("Start processing {} tables in periodic task: {}", numTables, 
getTaskName());
-    process(tableNamesWithType);
-    LOGGER.info("Finish processing {} tables in periodic task: {} in {}ms", 
numTables, getTaskName(),
-        (System.currentTimeMillis() - startTime));
-
-    _periodicTaskInProgress = false;
-  }
-
-  /**
-   * Stops the ControllerPeriodicTask by enabling the _stopPeriodicTask flag. 
The flag ensures that processing of no new table begins.
-   * This method waits for the in progress ControllerPeriodicTask to finish 
the table being processed, until MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS
-   * Finally, it invokes the stopTask for any specific cleanup at the 
individual task level
-   */
-  @Override
-  public final void stop() {
-    _stopPeriodicTask = true;
-
-    LOGGER.info("Waiting for periodic task {} to finish, maxWaitTimeMillis = 
{}", getTaskName(),
-        MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS);
-    long millisToWait = MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS;
-    while (_periodicTaskInProgress && millisToWait > 0) {
-      try {
-        long thisWait = 1000;
-        if (millisToWait < thisWait) {
-          thisWait = millisToWait;
-        }
-        Thread.sleep(thisWait);
-        millisToWait -= thisWait;
-      } catch (InterruptedException e) {
-        LOGGER.info("Interrupted: Remaining wait time {} (out of {}) for task 
{}", millisToWait,
-            MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS, getTaskName());
+    LOGGER.info("Processing {} tables in task: {}", numTables, _taskName);
+    C context = preprocess();
+    int numTablesProcessed = 0;
+    for (String tableNameWithType : tableNamesWithType) {
+      if (!isStarted()) {
+        LOGGER.info("Task: {} is stopped, early terminate the task", 
_taskName);
         break;
       }
-    }
-    LOGGER.info("Wait completed for task {}. Waited for {} ms. 
_periodicTaskInProgress = {}", getTaskName(),
-        MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS - millisToWait, 
_periodicTaskInProgress);
-
-    stopTask();
-  }
-
-  /**
-   * Processes the task on the given tables.
-   *
-   * @param tableNamesWithType List of table names
-   */
-  protected void process(List<String> tableNamesWithType) {
-    if (!shouldStopPeriodicTask()) {
-
-      int numTablesProcessed = 0;
-      preprocess();
-
-      for (String tableNameWithType : tableNamesWithType) {
-        if (shouldStopPeriodicTask()) {
-          LOGGER.info("Skip processing table {} and all the remaining tables 
for task {}.", tableNameWithType,
-              getTaskName());
-          break;
-        }
-        try {
-          processTable(tableNameWithType);
-          numTablesProcessed++;
-        } catch (Exception e) {
-          exceptionHandler(tableNameWithType, e);
-        }
+      try {
+        processTable(tableNameWithType, context);
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while processing table: {} in task: 
{}", tableNamesWithType, _taskName, e);
       }
-
-      postprocess();
-      _metricsRegistry
-          
.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, 
getTaskName(), numTablesProcessed);
-    } else {
-      LOGGER.info("Skip processing all tables for task {}", getTaskName());
+      numTablesProcessed++;
     }
+    postprocess(context);
+    _controllerMetrics
+        
.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, 
_taskName, numTablesProcessed);
+    LOGGER.info("Finish processing {}/{} tables in task: {}", 
numTablesProcessed, numTables, _taskName);
   }
 
   /**
-   * This method runs before processing all tables
+   * Can be overridden to provide context before processing the tables.
    */
-  protected abstract void preprocess();
+  protected C preprocess() {
+    return null;
+  }
 
   /**
-   * Execute the controller periodic task for the given table
-   * @param tableNameWithType
+   * Processes the given table.
+   * <p>
+   * Override one of this method, {@link #processTable(String)} or {@link 
#processTables(List)}.
    */
-  protected abstract void processTable(String tableNameWithType);
+  protected void processTable(String tableNameWithType, C context) {
+    processTable(tableNameWithType);
+  }
 
   /**
-   * This method runs after processing all tables
+   * Processes the given table.
+   * <p>
+   * Override one of this method, {@link #processTable(String, C)} or {@link 
#processTables(List)}.
    */
-  protected abstract void postprocess();
-
-  protected abstract void exceptionHandler(String tableNameWithType, Exception 
e);
-
-  @VisibleForTesting
-  protected boolean shouldStopPeriodicTask() {
-    return _stopPeriodicTask;
+  protected void processTable(String tableNameWithType) {
   }
 
   /**
-   * Initialize the ControllerPeriodicTask, to be defined by each individual 
task
+   * Can be overridden to perform cleanups after processing the tables.
    */
-  protected abstract void initTask();
+  protected void postprocess(C context) {
+    postprocess();
+  }
 
   /**
-   * Perform cleanup for the ControllerPeriodicTask, to be defined by each 
individual task
+   * Can be overridden to perform cleanups after processing the tables.
    */
-  protected abstract void stopTask();
+  protected void postprocess() {
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index 203b245..0a9e352 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -34,7 +34,6 @@ import org.apache.pinot.common.config.RealtimeTagConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.retry.RetryPolicies;
 import org.apache.pinot.common.utils.time.TimeUtils;
@@ -53,7 +52,7 @@ import org.slf4j.LoggerFactory;
  * We only relocate segments for realtime tables, and only if tenant config 
indicates that relocation is required
  * A segment will be relocated, one replica at a time, once all of its 
replicas are in ONLINE state and all/some are on servers other than completed 
servers
  */
-public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
+public class RealtimeSegmentRelocator extends ControllerPeriodicTask<Void> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeSegmentRelocator.class);
 
   public RealtimeSegmentRelocator(PinotHelixResourceManager 
pinotHelixResourceManager, ControllerConf config,
@@ -63,31 +62,12 @@ public class RealtimeSegmentRelocator extends 
ControllerPeriodicTask {
   }
 
   @Override
-  protected void initTask() {
-
-  }
-
-  @Override
-  protected void preprocess() {
-  }
-
-  @Override
   protected void processTable(String tableNameWithType) {
-    CommonConstants.Helix.TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-    if (tableType == CommonConstants.Helix.TableType.REALTIME) {
+    if (TableNameBuilder.REALTIME.tableHasTypeSuffix(tableNameWithType)) {
       runRelocation(tableNameWithType);
     }
   }
 
-  @Override
-  protected void postprocess() {
-  }
-
-  @Override
-  protected void exceptionHandler(String tableNameWithType, Exception e) {
-    LOGGER.error("Exception in relocating realtime segments of table {}", 
tableNameWithType, e);
-  }
-
   /**
    * Check the given table. Perform relocation of segments if table is 
realtime and relocation is required
    * TODO: Model this to implement {@link 
org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy} 
interface
@@ -277,9 +257,4 @@ public class RealtimeSegmentRelocator extends 
ControllerPeriodicTask {
     }
     return seconds;
   }
-
-  @Override
-  public void stopTask() {
-
-  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index b8059e9..b3abf07 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory;
  * The <code>RetentionManager</code> class manages retention for all segments 
and delete expired segments.
  * <p>It is scheduled to run only on leader controller.
  */
-public class RetentionManager extends ControllerPeriodicTask {
+public class RetentionManager extends ControllerPeriodicTask<Void> {
   public static final long OLD_LLC_SEGMENTS_RETENTION_IN_MILLIS = 
TimeUnit.DAYS.toMillis(5L);
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RetentionManager.class);
@@ -65,15 +65,6 @@ public class RetentionManager extends ControllerPeriodicTask 
{
   }
 
   @Override
-  protected void initTask() {
-
-  }
-
-  @Override
-  protected void preprocess() {
-  }
-
-  @Override
   protected void processTable(String tableNameWithType) {
     LOGGER.info("Start managing retention for table: {}", tableNameWithType);
     manageRetentionForTable(tableNameWithType);
@@ -85,11 +76,6 @@ public class RetentionManager extends ControllerPeriodicTask 
{
     
_pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_deletedSegmentsRetentionInDays);
   }
 
-  @Override
-  protected void exceptionHandler(String tableNameWithType, Exception e) {
-    LOGGER.error("Caught exception while managing retention for table: {}", 
tableNameWithType, e);
-  }
-
   private void manageRetentionForTable(String tableNameWithType) {
 
     // Build retention strategy from table config
@@ -190,9 +176,4 @@ public class RetentionManager extends 
ControllerPeriodicTask {
           
.contains(CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.OFFLINE);
     }
   }
-
-  @Override
-  public void stopTask() {
-
-  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
index 85dd917..8f7e1c0 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -33,11 +33,9 @@ import org.slf4j.LoggerFactory;
 /**
  * Rebuilds the broker resource if the instance set has changed
  */
-public class BrokerResourceValidationManager extends ControllerPeriodicTask {
+public class BrokerResourceValidationManager extends 
ControllerPeriodicTask<BrokerResourceValidationManager.Context> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BrokerResourceValidationManager.class);
 
-  private List<InstanceConfig> _instanceConfigs;
-
   public BrokerResourceValidationManager(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager,
       ControllerMetrics controllerMetrics) {
     super("BrokerResourceValidationManager", 
config.getBrokerResourceValidationFrequencyInSeconds(),
@@ -45,12 +43,14 @@ public class BrokerResourceValidationManager extends 
ControllerPeriodicTask {
   }
 
   @Override
-  protected void preprocess() {
-    _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
+  protected Context preprocess() {
+    Context context = new Context();
+    context._instanceConfigs = 
_pinotHelixResourceManager.getAllHelixInstanceConfigs();
+    return context;
   }
 
   @Override
-  protected void processTable(String tableNameWithType) {
+  protected void processTable(String tableNameWithType, Context context) {
     TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
     if (tableConfig == null) {
       LOGGER.warn("Failed to find table config for table: {}, skipping broker 
resource validation", tableNameWithType);
@@ -59,25 +59,11 @@ public class BrokerResourceValidationManager extends 
ControllerPeriodicTask {
 
     // Rebuild broker resource
     Set<String> brokerInstances = _pinotHelixResourceManager
-        .getAllInstancesForBrokerTenant(_instanceConfigs, 
tableConfig.getTenantConfig().getBroker());
+        .getAllInstancesForBrokerTenant(context._instanceConfigs, 
tableConfig.getTenantConfig().getBroker());
     _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, 
brokerInstances);
   }
 
-  @Override
-  protected void postprocess() {
-  }
-
-  @Override
-  protected void exceptionHandler(String tableNameWithType, Exception e) {
-    LOGGER.error("Caught exception while validating broker resource for table: 
{}", tableNameWithType, e);
-  }
-
-  @Override
-  protected void initTask() {
-
-  }
-
-  @Override
-  public void stopTask() {
+  public static final class Context {
+    private List<InstanceConfig> _instanceConfigs;
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 3577813..2646ddd 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
  * Manages the segment validation metrics, to ensure that all offline segments 
are contiguous (no missing segments) and
  * that the offline push delay isn't too high.
  */
-public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
+public class OfflineSegmentIntervalChecker extends 
ControllerPeriodicTask<Void> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(OfflineSegmentIntervalChecker.class);
 
   private final ValidationMetrics _validationMetrics;
@@ -56,10 +56,6 @@ public class OfflineSegmentIntervalChecker extends 
ControllerPeriodicTask {
   }
 
   @Override
-  protected void preprocess() {
-  }
-
-  @Override
   protected void processTable(String tableNameWithType) {
     CommonConstants.Helix.TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
     if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
@@ -206,21 +202,7 @@ public class OfflineSegmentIntervalChecker extends 
ControllerPeriodicTask {
   }
 
   @Override
-  protected void postprocess() {
-  }
-
-  @Override
-  protected void exceptionHandler(String tableNameWithType, Exception e) {
-    LOGGER.warn("Caught exception while checking offline segment intervals for 
table: {}", tableNameWithType, e);
-  }
-
-  @Override
-  protected void initTask() {
-
-  }
-
-  @Override
-  public void stopTask() {
+  public void cleanUpTask() {
     LOGGER.info("Unregister all the validation metrics.");
     _validationMetrics.unregisterAllMetrics();
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index a43d63b..6c0b59f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Validates realtime ideal states and segment metadata, fixing any partitions 
which have stopped consuming
  */
-public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
+public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<RealtimeSegmentValidationManager.Context> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeSegmentValidationManager.class);
 
   private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
@@ -51,7 +51,6 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask {
 
   private final int _segmentLevelValidationIntervalInSeconds;
   private long _lastUpdateRealtimeDocumentCountTimeMs = 0L;
-  private boolean _updateRealtimeDocumentCount;
 
   public RealtimeSegmentValidationManager(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager,
       PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, 
ValidationMetrics validationMetrics,
@@ -66,20 +65,21 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask {
   }
 
   @Override
-  protected void preprocess() {
+  protected Context preprocess() {
+    Context context = new Context();
     // Update realtime document counts only if certain time has passed after 
previous run
-    _updateRealtimeDocumentCount = false;
     long currentTimeMs = System.currentTimeMillis();
     if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - 
_lastUpdateRealtimeDocumentCountTimeMs)
         >= _segmentLevelValidationIntervalInSeconds) {
       LOGGER.info("Run segment-level validation");
-      _updateRealtimeDocumentCount = true;
+      context._updateRealtimeDocumentCount = true;
       _lastUpdateRealtimeDocumentCountTimeMs = currentTimeMs;
     }
+    return context;
   }
 
   @Override
-  protected void processTable(String tableNameWithType) {
+  protected void processTable(String tableNameWithType, Context context) {
     CommonConstants.Helix.TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
     if (tableType == CommonConstants.Helix.TableType.REALTIME) {
 
@@ -89,7 +89,7 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask {
         return;
       }
 
-      if (_updateRealtimeDocumentCount) {
+      if (context._updateRealtimeDocumentCount) {
         updateRealtimeDocumentCount(tableConfig);
       }
 
@@ -148,22 +148,12 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask {
   }
 
   @Override
-  protected void postprocess() {
-  }
-
-  @Override
-  protected void exceptionHandler(String tableNameWithType, Exception e) {
-    LOGGER.error("Caught exception while validating realtime table: {}", 
tableNameWithType, e);
-  }
-
-  @Override
-  protected void initTask() {
-
-  }
-
-  @Override
-  public void stopTask() {
+  public void cleanUpTask() {
     LOGGER.info("Unregister all the validation metrics.");
     _validationMetrics.unregisterAllMetrics();
   }
+
+  public static final class Context {
+    private boolean _updateRealtimeDocumentCount;
+  }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index 11b99ec..c235483 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -87,7 +87,7 @@ public class SegmentStatusCheckerTest {
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
-    segmentStatusChecker.init();
+    segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(
         controllerMetrics.getValueOfTableGauge(externalView.getId(), 
ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1);
@@ -149,7 +149,7 @@ public class SegmentStatusCheckerTest {
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
-    segmentStatusChecker.init();
+    segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(
         controllerMetrics.getValueOfTableGauge(externalView.getId(), 
ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
@@ -225,7 +225,7 @@ public class SegmentStatusCheckerTest {
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
-    segmentStatusChecker.init();
+    segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(
         controllerMetrics.getValueOfTableGauge(externalView.getId(), 
ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1);
@@ -267,7 +267,7 @@ public class SegmentStatusCheckerTest {
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
-    segmentStatusChecker.init();
+    segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.NUMBER_OF_REPLICAS), 0);
@@ -294,7 +294,7 @@ public class SegmentStatusCheckerTest {
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
-    segmentStatusChecker.init();
+    segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE),
         Long.MIN_VALUE);
@@ -352,7 +352,7 @@ public class SegmentStatusCheckerTest {
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
-    segmentStatusChecker.init();
+    segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(
         controllerMetrics.getValueOfTableGauge(externalView.getId(), 
ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
@@ -393,7 +393,7 @@ public class SegmentStatusCheckerTest {
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
-    segmentStatusChecker.init();
+    segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.NUMBER_OF_REPLICAS), 1);
@@ -435,7 +435,7 @@ public class SegmentStatusCheckerTest {
     // verify state before test
     
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT),
 0);
     // update metrics
-    segmentStatusChecker.init();
+    segmentStatusChecker.start();
     segmentStatusChecker.run();
     
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT),
 1);
   }
@@ -469,7 +469,7 @@ public class SegmentStatusCheckerTest {
     // verify state before test
     
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT),
 0);
     // update metrics
-    segmentStatusChecker.init();
+    segmentStatusChecker.start();
     segmentStatusChecker.run();
     
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT),
 1);
   }
@@ -511,7 +511,7 @@ public class SegmentStatusCheckerTest {
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
-    segmentStatusChecker.init();
+    segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE),
         Long.MIN_VALUE);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index b3c974b..a928922 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -44,30 +44,30 @@ public class ControllerPeriodicTaskTest {
 
   private final PinotHelixResourceManager _resourceManager = 
mock(PinotHelixResourceManager.class);
   private final ControllerMetrics _controllerMetrics = new 
ControllerMetrics(new MetricsRegistry());
+  private final AtomicBoolean _startTaskCalled = new AtomicBoolean();
   private final AtomicBoolean _stopTaskCalled = new AtomicBoolean();
-  private final AtomicBoolean _initTaskCalled = new AtomicBoolean();
-  private final AtomicBoolean _processCalled = new AtomicBoolean();
+  private final AtomicBoolean _processTablesCalled = new AtomicBoolean();
   private final AtomicInteger _tablesProcessed = new AtomicInteger();
   private final int _numTables = 3;
   private static final String TASK_NAME = "TestTask";
 
-  private final MockControllerPeriodicTask _task = new 
MockControllerPeriodicTask(TASK_NAME, RUN_FREQUENCY_IN_SECONDS,
+  private final ControllerPeriodicTask _task = new 
ControllerPeriodicTask<Void>(TASK_NAME, RUN_FREQUENCY_IN_SECONDS,
       _controllerConf.getPeriodicTaskInitialDelayInSeconds(), 
_resourceManager, _controllerMetrics) {
 
     @Override
-    protected void initTask() {
-      _initTaskCalled.set(true);
+    protected void setUpTask() {
+      _startTaskCalled.set(true);
     }
 
     @Override
-    public void stopTask() {
+    public void cleanUpTask() {
       _stopTaskCalled.set(true);
     }
 
     @Override
-    public void process(List<String> tableNamesWithType) {
-      _processCalled.set(true);
-      super.process(tableNamesWithType);
+    public void processTables(List<String> tableNamesWithType) {
+      _processTablesCalled.set(true);
+      super.processTables(tableNamesWithType);
     }
 
     @Override
@@ -84,9 +84,9 @@ public class ControllerPeriodicTaskTest {
   }
 
   private void resetState() {
-    _initTaskCalled.set(false);
+    _startTaskCalled.set(false);
     _stopTaskCalled.set(false);
-    _processCalled.set(false);
+    _processTablesCalled.set(false);
     _tablesProcessed.set(0);
     
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
 TASK_NAME, 0);
   }
@@ -103,86 +103,72 @@ public class ControllerPeriodicTaskTest {
 
   @Test
   public void testControllerPeriodicTaskCalls() {
-    // Initial state
+    // Start periodic task - leadership gained
     resetState();
-    _task.init();
-    assertTrue(_initTaskCalled.get());
-    assertFalse(_processCalled.get());
+    _task.start();
+    assertTrue(_startTaskCalled.get());
+    assertFalse(_processTablesCalled.get());
     assertEquals(_tablesProcessed.get(), 0);
     assertFalse(_stopTaskCalled.get());
-    assertFalse(_task.shouldStopPeriodicTask());
+    assertTrue(_task.isStarted());
     assertEquals(
         
_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
 TASK_NAME), 0);
 
-    // run task - leadership gained
+    // Run periodic task with leadership
     resetState();
     _task.run();
-    assertFalse(_initTaskCalled.get());
-    assertTrue(_processCalled.get());
+    assertFalse(_startTaskCalled.get());
+    assertTrue(_processTablesCalled.get());
     assertEquals(_tablesProcessed.get(), _numTables);
     assertEquals(
         
_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
 TASK_NAME),
         _numTables);
     assertFalse(_stopTaskCalled.get());
-    assertFalse(_task.shouldStopPeriodicTask());
+    assertTrue(_task.isStarted());
 
-    // stop periodic task - leadership lost
+    // Stop periodic task - leadership lost
     resetState();
     _task.stop();
-    assertFalse(_initTaskCalled.get());
-    assertFalse(_processCalled.get());
+    assertFalse(_startTaskCalled.get());
+    assertFalse(_processTablesCalled.get());
     assertEquals(_tablesProcessed.get(), 0);
     assertEquals(
         
_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
 TASK_NAME), 0);
     assertTrue(_stopTaskCalled.get());
-    assertTrue(_task.shouldStopPeriodicTask());
+    assertFalse(_task.isStarted());
 
-    // call to run after periodic task stop invoked - leadership gained back 
on same controller
+    // Run periodic task without leadership
     resetState();
     _task.run();
-    assertFalse(_task.shouldStopPeriodicTask());
-    assertFalse(_initTaskCalled.get());
-    assertTrue(_processCalled.get());
+    assertFalse(_startTaskCalled.get());
+    assertFalse(_processTablesCalled.get());
+    assertEquals(_tablesProcessed.get(), 0);
+    assertEquals(
+        
_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
 TASK_NAME), 0);
+    assertFalse(_stopTaskCalled.get());
+    assertFalse(_task.isStarted());
+
+    // Restart periodic task - leadership re-gained
+    resetState();
+    _task.start();
+    assertTrue(_startTaskCalled.get());
+    assertFalse(_processTablesCalled.get());
+    assertEquals(_tablesProcessed.get(), 0);
+    assertFalse(_stopTaskCalled.get());
+    assertTrue(_task.isStarted());
+    assertEquals(
+        
_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
 TASK_NAME), 0);
+
+    // Run periodic task with leadership
+    resetState();
+    _task.run();
+    assertFalse(_startTaskCalled.get());
+    assertTrue(_processTablesCalled.get());
     assertEquals(_tablesProcessed.get(), _numTables);
     assertEquals(
         
_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
 TASK_NAME),
         _numTables);
     assertFalse(_stopTaskCalled.get());
-  }
-
-  private class MockControllerPeriodicTask extends ControllerPeriodicTask {
-
-    public MockControllerPeriodicTask(String taskName, long 
runFrequencyInSeconds, long initialDelayInSeconds,
-        PinotHelixResourceManager pinotHelixResourceManager, ControllerMetrics 
controllerMetrics) {
-      super(taskName, runFrequencyInSeconds, initialDelayInSeconds, 
pinotHelixResourceManager, controllerMetrics);
-    }
-
-    @Override
-    protected void initTask() {
-
-    }
-
-    @Override
-    protected void preprocess() {
-    }
-
-    @Override
-    protected void processTable(String tableNameWithType) {
-
-    }
-
-    @Override
-    public void postprocess() {
-    }
-
-    @Override
-    public void exceptionHandler(String tableNameWithType, Exception e) {
-
-    }
-
-    @Override
-    public void stopTask() {
-
-    }
+    assertTrue(_task.isStarted());
   }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 4fc8eca..4f34525 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -92,7 +92,7 @@ public class RetentionManagerTest {
     conf.setRetentionControllerFrequencyInSeconds(0);
     conf.setDeletedSegmentsRetentionInDays(0);
     RetentionManager retentionManager = new 
RetentionManager(pinotHelixResourceManager, conf, controllerMetrics);
-    retentionManager.init();
+    retentionManager.start();
     retentionManager.run();
 
     SegmentDeletionManager deletionManager = 
pinotHelixResourceManager.getSegmentDeletionManager();
@@ -215,7 +215,7 @@ public class RetentionManagerTest {
     conf.setRetentionControllerFrequencyInSeconds(0);
     conf.setDeletedSegmentsRetentionInDays(0);
     RetentionManager retentionManager = new 
RetentionManager(pinotHelixResourceManager, conf, controllerMetrics);
-    retentionManager.init();
+    retentionManager.start();
     retentionManager.run();
 
     SegmentDeletionManager deletionManager = 
pinotHelixResourceManager.getSegmentDeletionManager();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
index f3561bd..eb15aa9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
@@ -18,14 +18,28 @@
  */
 package org.apache.pinot.core.periodictask;
 
+import javax.annotation.concurrent.ThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
 /**
  * A base class to implement periodic task interface.
  */
+@ThreadSafe
 public abstract class BasePeriodicTask implements PeriodicTask {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(BasePeriodicTask.class);
+
+  // Wait for at most 30 seconds while calling stop() for task to terminate
+  private static final long MAX_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L;
+
   protected final String _taskName;
   protected final long _intervalInSeconds;
   protected final long _initialDelayInSeconds;
 
+  private volatile boolean _started;
+  private volatile boolean _running;
+
   public BasePeriodicTask(String taskName, long runFrequencyInSeconds, long 
initialDelayInSeconds) {
     _taskName = taskName;
     _intervalInSeconds = runFrequencyInSeconds;
@@ -33,6 +47,11 @@ public abstract class BasePeriodicTask implements 
PeriodicTask {
   }
 
   @Override
+  public String getTaskName() {
+    return _taskName;
+  }
+
+  @Override
   public long getIntervalInSeconds() {
     return _intervalInSeconds;
   }
@@ -42,9 +61,130 @@ public abstract class BasePeriodicTask implements 
PeriodicTask {
     return _initialDelayInSeconds;
   }
 
+  /**
+   * Returns the status of the {@code started} flag. This flag will be set 
after calling {@link #start()}, and reset
+   * after calling {@link #stop()}.
+   */
+  public final boolean isStarted() {
+    return _started;
+  }
+
+  /**
+   * Returns the status of the {@code running} flag. This flag will be set 
during the task execution.
+   */
+  public final boolean isRunning() {
+    return _running;
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * This method sets {@code started} flag to true.
+   */
   @Override
-  public String getTaskName() {
-    return _taskName;
+  public final synchronized void start() {
+    if (_started) {
+      LOGGER.warn("Task: {} is already started", _taskName);
+      return;
+    }
+    _started = true;
+
+    try {
+      setUpTask();
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while setting up task: {}", _taskName, e);
+    }
+  }
+
+  /**
+   * Can be overridden for extra task setups. This method will be called when 
the periodic task starts.
+   * <p>
+   * Possible setups include adding or resetting the metric values.
+   */
+  protected void setUpTask() {
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * During the task execution, the {@code running} flag will be set.
+   */
+  @Override
+  public final void run() {
+    _running = true;
+
+    if (_started) {
+      long startTime = System.currentTimeMillis();
+      LOGGER.info("Start running task: {}", _taskName);
+      try {
+        runTask();
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while running task: {}", _taskName, e);
+      }
+      LOGGER.info("Finish running task: {} in {}ms", _taskName, 
System.currentTimeMillis() - startTime);
+    } else {
+      LOGGER.warn("Task: {} is skipped because it is not started or already 
stopped", _taskName);
+    }
+
+    _running = false;
+  }
+
+  /**
+   * Executes the task. This method should early terminate if {@code started} 
flag is set to false by {@link #stop()}
+   * during execution.
+   */
+  protected abstract void runTask();
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * This method sets {@code started} flag to false. If the task is running, 
this method will block for at most 30
+   * seconds until the task finishes.
+   */
+  @Override
+  public final synchronized void stop() {
+    if (!_started) {
+      LOGGER.warn("Task: {} is not started", _taskName);
+      return;
+    }
+    _started = false;
+
+    if (_running) {
+      long startTimeMs = System.currentTimeMillis();
+      long remainingTimeMs = MAX_PERIODIC_TASK_STOP_TIME_MILLIS;
+      LOGGER.info("Task: {} is running, wait for at most {}ms for it to 
finish", _taskName, remainingTimeMs);
+      while (_running && remainingTimeMs > 0L) {
+        long sleepTimeMs = Long.min(remainingTimeMs, 1000L);
+        remainingTimeMs -= sleepTimeMs;
+        try {
+          Thread.sleep(sleepTimeMs);
+        } catch (InterruptedException e) {
+          LOGGER.error("Caught InterruptedException while waiting for task: {} 
to finish", _taskName);
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+      long waitTimeMs = System.currentTimeMillis() - startTimeMs;
+      if (_running) {
+        LOGGER.warn("Task: {} is not finished in {}ms", waitTimeMs);
+      } else {
+        LOGGER.info("Task: {} is finished in {}ms", waitTimeMs);
+      }
+    }
+
+    try {
+      cleanUpTask();
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while cleaning up task: {}", _taskName, 
e);
+    }
+  }
+
+  /**
+   * Can be overridden for extra task cleanups. This method will be called 
when the periodic task stops.
+   * <p>
+   * Possible cleanups include removing or resetting the metric values.
+   */
+  protected void cleanUpTask() {
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
index da2952d..ee6c68b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
@@ -18,37 +18,50 @@
  */
 package org.apache.pinot.core.periodictask;
 
+import javax.annotation.concurrent.ThreadSafe;
+
+
 /**
  * An interface to describe the functionality of periodic task. Periodic tasks 
will be added to a list, scheduled
  * and run in the periodic task scheduler with the fixed interval time.
  */
+@ThreadSafe
 public interface PeriodicTask extends Runnable {
 
   /**
-   * Initialize the task before running the task.
+   * Returns the periodic task name.
+   * @return task name.
    */
-  void init();
+  String getTaskName();
 
   /**
-   * Get the interval time of running the same task.
+   * Returns the interval time of running the same task.
    * @return the interval time in seconds.
    */
   long getIntervalInSeconds();
 
   /**
-   * Get the initial delay of the fist run.
+   * Returns the initial delay of the fist run.
    * @return initial delay in seconds.
    */
   long getInitialDelayInSeconds();
 
   /**
-   * Get the periodic task name.
-   * @return task name.
+   * Performs necessary setups and starts the periodic task. Should be called 
before scheduling the periodic task. Can
+   * be called after calling {@link #stop()} to restart the periodic task.
    */
-  String getTaskName();
+  void start();
+
+  /**
+   * Executes the task. This method should be called only after {@link 
#start()} getting called but before
+   * {@link #stop()} getting called.
+   */
+  @Override
+  void run();
 
   /**
-   * Stop the periodic task
+   * Stops the periodic task and performs necessary cleanups. Should be called 
after removing the periodic task from the
+   * scheduler. Should be called after {@link #start()} getting called.
    */
   void stop();
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
index 054f1f8..f868ecf 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
@@ -37,8 +37,7 @@ public class PeriodicTaskScheduler {
   private List<PeriodicTask> _tasksWithValidInterval;
 
   /**
-   * Initialize the PeriodicTaskScheduler with list of PeriodicTasks
-   * @param periodicTasks
+   * Initializes the periodic task scheduler with a list of periodic tasks.
    */
   public void init(List<PeriodicTask> periodicTasks) {
     _tasksWithValidInterval = new ArrayList<>();
@@ -46,7 +45,6 @@ public class PeriodicTaskScheduler {
       if (periodicTask.getIntervalInSeconds() > 0) {
         LOGGER.info("Adding periodic task: {}", periodicTask);
         _tasksWithValidInterval.add(periodicTask);
-        periodicTask.init();
       } else {
         LOGGER.info("Skipping periodic task: {}", periodicTask);
       }
@@ -54,7 +52,7 @@ public class PeriodicTaskScheduler {
   }
 
   /**
-   * Start scheduling periodic tasks.
+   * Starts scheduling periodic tasks.
    */
   public synchronized void start() {
     if (_executorService != null) {
@@ -67,6 +65,7 @@ public class PeriodicTaskScheduler {
       LOGGER.info("Starting periodic task scheduler with tasks: {}", 
_tasksWithValidInterval);
       _executorService = 
Executors.newScheduledThreadPool(_tasksWithValidInterval.size());
       for (PeriodicTask periodicTask : _tasksWithValidInterval) {
+        periodicTask.start();
         _executorService.scheduleWithFixedDelay(() -> {
           try {
             LOGGER.info("Starting {} with running frequency of {} seconds.", 
periodicTask.getTaskName(),
@@ -83,7 +82,7 @@ public class PeriodicTaskScheduler {
   }
 
   /**
-   * Shutdown executor service and stop the periodic tasks
+   * Shuts down the executor service and stops the periodic tasks.
    */
   public synchronized void stop() {
     if (_executorService != null) {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
index acacf15..a4d581d 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
@@ -34,24 +34,24 @@ public class PeriodicTaskSchedulerTest {
   @Test
   public void testTaskWithInvalidInterval()
       throws Exception {
-    AtomicBoolean initCalled = new AtomicBoolean();
+    AtomicBoolean startCalled = new AtomicBoolean();
     AtomicBoolean runCalled = new AtomicBoolean();
     AtomicBoolean stopCalled = new AtomicBoolean();
 
     List<PeriodicTask> periodicTasks = Collections.singletonList(new 
BasePeriodicTask("TestTask", 0L/*Invalid*/, 0L) {
       @Override
-      public void init() {
-        initCalled.set(true);
+      protected void setUpTask() {
+        startCalled.set(true);
       }
 
       @Override
-      public void stop() {
-        stopCalled.set(true);
+      protected void runTask() {
+        runCalled.set(true);
       }
 
       @Override
-      public void run() {
-        runCalled.set(true);
+      protected void cleanUpTask() {
+        stopCalled.set(true);
       }
     });
 
@@ -61,7 +61,7 @@ public class PeriodicTaskSchedulerTest {
     Thread.sleep(100L);
     taskScheduler.stop();
 
-    assertFalse(initCalled.get());
+    assertFalse(startCalled.get());
     assertFalse(runCalled.get());
     assertFalse(stopCalled.get());
   }
@@ -70,26 +70,26 @@ public class PeriodicTaskSchedulerTest {
   public void testScheduleMultipleTasks()
       throws Exception {
     int numTasks = 3;
-    AtomicInteger numTimesInitCalled = new AtomicInteger();
+    AtomicInteger numTimesStartCalled = new AtomicInteger();
     AtomicInteger numTimesRunCalled = new AtomicInteger();
     AtomicInteger numTimesStopCalled = new AtomicInteger();
 
     List<PeriodicTask> periodicTasks = new ArrayList<>(numTasks);
     for (int i = 0; i < numTasks; i++) {
-      periodicTasks.add(new BasePeriodicTask("Task", 1L, 0L) {
+      periodicTasks.add(new BasePeriodicTask("TestTask", 1L, 0L) {
         @Override
-        public void init() {
-          numTimesInitCalled.getAndIncrement();
+        protected void setUpTask() {
+          numTimesStartCalled.getAndIncrement();
         }
 
         @Override
-        public void stop() {
-          numTimesStopCalled.getAndIncrement();
+        protected void runTask() {
+          numTimesRunCalled.getAndIncrement();
         }
 
         @Override
-        public void run() {
-          numTimesRunCalled.getAndIncrement();
+        protected void cleanUpTask() {
+          numTimesStopCalled.getAndIncrement();
         }
       });
     }
@@ -100,7 +100,7 @@ public class PeriodicTaskSchedulerTest {
     Thread.sleep(1100L);
     taskScheduler.stop();
 
-    assertEquals(numTimesInitCalled.get(), numTasks);
+    assertEquals(numTimesStartCalled.get(), numTasks);
     assertEquals(numTimesRunCalled.get(), numTasks * 2);
     assertEquals(numTimesStopCalled.get(), numTasks);
   }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java
index 58faf3a..7e7b041 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java
@@ -160,7 +160,7 @@ public class SegmentCompletionIntegrationTests extends 
LLCRealtimeClusterIntegra
 
     // Now call the validation manager, and the segment should fix itself
     RealtimeSegmentValidationManager validationManager = 
_controllerStarter.getRealtimeSegmentValidationManager();
-    validationManager.init();
+    validationManager.start();
     validationManager.run();
 
     // Check if a new segment get into CONSUMING state


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to