This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a1fc25  Refactor ControllerPeriodicTask to iterate over tables (#3618)
7a1fc25 is described below

commit 7a1fc25a80ec60e7898d2069371ce995132d5a11
Author: Neha Pawar <[email protected]>
AuthorDate: Tue Dec 18 16:01:01 2018 -0800

    Refactor ControllerPeriodicTask to iterate over tables (#3618)
    
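    Refactor the periodic tasks so that the iteration over tables is pulled up
    into ControllerPeriodicTask. This way the checks only need to live in
    ControllerPeriodicTask, and each periodic task is only responsible for
    executing its work on a single given table. Subclasses now implement
    preprocess(), processTable(String) and postprocess() instead of
    process(List<String>).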
---
 .../controller/helix/SegmentStatusChecker.java     | 285 +++++++++++----------
 .../helix/core/minion/PinotTaskManager.java        |  99 +++----
 .../core/periodictask/ControllerPeriodicTask.java  |  24 +-
 .../core/relocation/RealtimeSegmentRelocator.java  |  76 +++---
 .../helix/core/retention/RetentionManager.java     |  27 +-
 .../controller/validation/ValidationManager.java   |  89 +++----
 .../periodictask/ControllerPeriodicTaskTest.java   |  17 +-
 7 files changed, 338 insertions(+), 279 deletions(-)

diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
index 549dc9d..8059d9e 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
@@ -47,13 +47,19 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask {
   public static final String ERROR = "ERROR";
   public static final String CONSUMING = "CONSUMING";
   private final ControllerMetrics _metricsRegistry;
-  private final ControllerConf _config;
   private final HelixAdmin _helixAdmin;
+  private final String _helixClusterName;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private final int _waitForPushTimeSeconds;
 
   // log messages about disabled tables atmost once a day
   private static final long DISABLED_TABLE_LOG_INTERVAL_MS = 
TimeUnit.DAYS.toMillis(1);
   private long _lastDisabledTableLogTimestamp = 0;
+  private boolean _logDisabledTables;
+  private int _realTimeTableCount;
+  private int _offlineTableCount;
+  private int _disabledTableCount;
+
 
   /**
    * Constructs the segment status checker.
@@ -64,7 +70,9 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask {
       ControllerMetrics metricsRegistry) {
     super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(), 
pinotHelixResourceManager);
     _helixAdmin = pinotHelixResourceManager.getHelixAdmin();
-    _config = config;
+    _helixClusterName = pinotHelixResourceManager.getHelixClusterName();
+    _propertyStore = _pinotHelixResourceManager.getPropertyStore();
+
     _waitForPushTimeSeconds = 
config.getStatusCheckerWaitForPushTimeInSeconds();
     _metricsRegistry = metricsRegistry;
   }
@@ -82,166 +90,167 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask {
   }
 
   @Override
-  public void process(List<String> tables) {
-    updateSegmentMetrics(tables);
-  }
-
-  /**
-   * Runs a segment status pass over the given tables.
-   * TODO: revisit the logic and reduce the ZK access
-   *
-   * @param tables List of table names
-   */
-  private void updateSegmentMetrics(List<String> tables) {
-    // Fetch the list of tables
-    String helixClusterName = _pinotHelixResourceManager.getHelixClusterName();
-    HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin();
-    int realTimeTableCount = 0;
-    int offlineTableCount = 0;
-    int disabledTableCount = 0;
-    ZkHelixPropertyStore<ZNRecord> propertyStore = 
_pinotHelixResourceManager.getPropertyStore();
+  protected void preprocess() {
+    _realTimeTableCount = 0;
+    _offlineTableCount = 0;
+    _disabledTableCount = 0;
 
     // check if we need to log disabled tables log messages
-    boolean logDisabledTables = false;
     long now = System.currentTimeMillis();
     if (now - _lastDisabledTableLogTimestamp >= 
DISABLED_TABLE_LOG_INTERVAL_MS) {
-      logDisabledTables = true;
+      _logDisabledTables = true;
       _lastDisabledTableLogTimestamp = now;
     } else {
-      logDisabledTables = false;
+      _logDisabledTables = false;
     }
+  }
 
-    for (String tableName : tables) {
-      try {
-        if (TableNameBuilder.getTableTypeFromTableName(tableName) == 
TableType.OFFLINE) {
-          offlineTableCount++;
-        } else {
-          realTimeTableCount++;
-        }
-        IdealState idealState = 
helixAdmin.getResourceIdealState(helixClusterName, tableName);
-        if ((idealState == null) || (idealState.getPartitionSet().isEmpty())) {
-          int nReplicasFromIdealState = 1;
-          try {
-            if (idealState != null) {
-              nReplicasFromIdealState = 
Integer.valueOf(idealState.getReplicas());
-            }
-          } catch (NumberFormatException e) {
-            // Ignore
+  @Override
+  protected void processTable(String tableNameWithType) {
+    updateSegmentMetrics(tableNameWithType);
+  }
+
+  @Override
+  protected void postprocess() {
+    
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT, 
_realTimeTableCount);
+    
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT, 
_offlineTableCount);
+    
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT, 
_disabledTableCount);
+  }
+
+  /**
+   * Runs a segment status pass over the given table.
+   * TODO: revisit the logic and reduce the ZK access
+   *
+   * @param tableNameWithType
+   */
+  private void updateSegmentMetrics(String tableNameWithType) {
+
+    try {
+      if (TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == 
TableType.OFFLINE) {
+        _offlineTableCount++;
+      } else {
+        _realTimeTableCount++;
+      }
+      IdealState idealState = 
_helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
+      if ((idealState == null) || (idealState.getPartitionSet().isEmpty())) {
+        int nReplicasFromIdealState = 1;
+        try {
+          if (idealState != null) {
+            nReplicasFromIdealState = 
Integer.valueOf(idealState.getReplicas());
           }
-          _metricsRegistry.setValueOfTableGauge(tableName, 
ControllerGauge.NUMBER_OF_REPLICAS, nReplicasFromIdealState);
-          _metricsRegistry.setValueOfTableGauge(tableName, 
ControllerGauge.PERCENT_OF_REPLICAS, 100);
-          _metricsRegistry.setValueOfTableGauge(tableName, 
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
-          continue;
+        } catch (NumberFormatException e) {
+          // Ignore
         }
+        _metricsRegistry.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.NUMBER_OF_REPLICAS, nReplicasFromIdealState);
+        _metricsRegistry.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.PERCENT_OF_REPLICAS, 100);
+        _metricsRegistry.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
+        return;
+      }
 
-        if (!idealState.isEnabled()) {
-          if (logDisabledTables) {
-            LOGGER.warn("Table {} is disabled. Skipping segment status 
checks", tableName);
-          }
-          resetTableMetrics(tableName);
-          disabledTableCount++;
-          continue;
+      if (!idealState.isEnabled()) {
+        if (_logDisabledTables) {
+          LOGGER.warn("Table {} is disabled. Skipping segment status checks", 
tableNameWithType);
         }
+        resetTableMetrics(tableNameWithType);
+        _disabledTableCount++;
+        return;
+      }
 
-        _metricsRegistry.setValueOfTableGauge(tableName, 
ControllerGauge.IDEALSTATE_ZNODE_SIZE,
-            idealState.toString().length());
-        _metricsRegistry.setValueOfTableGauge(tableName, 
ControllerGauge.SEGMENT_COUNT,
-            (long) (idealState.getPartitionSet().size()));
-        ExternalView externalView = 
helixAdmin.getResourceExternalView(helixClusterName, tableName);
+      _metricsRegistry.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.IDEALSTATE_ZNODE_SIZE,
+          idealState.toString().length());
+      _metricsRegistry.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.SEGMENT_COUNT,
+          (long) (idealState.getPartitionSet().size()));
+      ExternalView externalView = 
_helixAdmin.getResourceExternalView(_helixClusterName, tableNameWithType);
 
-        int nReplicasIdealMax = 0; // Keeps track of maximum number of 
replicas in ideal state
-        int nReplicasExternal = -1; // Keeps track of minimum number of 
replicas in external view
-        int nErrors = 0; // Keeps track of number of segments in error state
-        int nOffline = 0; // Keeps track of number segments with no online 
replicas
-        int nSegments = 0; // Counts number of segments
-        for (String partitionName : idealState.getPartitionSet()) {
-          int nReplicas = 0;
-          int nIdeal = 0;
-          nSegments++;
-          // Skip segments not online in ideal state
-          for (Map.Entry<String, String> serverAndState : 
idealState.getInstanceStateMap(partitionName).entrySet()) {
-            if (serverAndState == null) {
-              break;
-            }
-            if (serverAndState.getValue().equals(ONLINE)) {
-              nIdeal++;
-              break;
-            }
-          }
-          if (nIdeal == 0) {
-            // No online segments in ideal state
-            continue;
+      int nReplicasIdealMax = 0; // Keeps track of maximum number of replicas 
in ideal state
+      int nReplicasExternal = -1; // Keeps track of minimum number of replicas 
in external view
+      int nErrors = 0; // Keeps track of number of segments in error state
+      int nOffline = 0; // Keeps track of number segments with no online 
replicas
+      int nSegments = 0; // Counts number of segments
+      for (String partitionName : idealState.getPartitionSet()) {
+        int nReplicas = 0;
+        int nIdeal = 0;
+        nSegments++;
+        // Skip segments not online in ideal state
+        for (Map.Entry<String, String> serverAndState : 
idealState.getInstanceStateMap(partitionName).entrySet()) {
+          if (serverAndState == null) {
+            break;
           }
-          nReplicasIdealMax = 
(idealState.getInstanceStateMap(partitionName).size() > nReplicasIdealMax)
-              ? idealState.getInstanceStateMap(partitionName).size() : 
nReplicasIdealMax;
-          if ((externalView == null) || 
(externalView.getStateMap(partitionName) == null)) {
-            // No replicas for this segment
-            TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
-            if ((tableType != null) && (tableType.equals(TableType.OFFLINE))) {
-              OfflineSegmentZKMetadata segmentZKMetadata =
-                  
ZKMetadataProvider.getOfflineSegmentZKMetadata(propertyStore, tableName, 
partitionName);
-              if (segmentZKMetadata != null
-                  && segmentZKMetadata.getPushTime() > 
System.currentTimeMillis() - _waitForPushTimeSeconds * 1000) {
-                // push not yet finished, skip
-                continue;
-              }
-            }
-            nOffline++;
-            if (nOffline < MaxOfflineSegmentsToLog) {
-              LOGGER.warn("Segment {} of table {} has no replicas", 
partitionName, tableName);
-            }
-            nReplicasExternal = 0;
-            continue;
+          if (serverAndState.getValue().equals(ONLINE)) {
+            nIdeal++;
+            break;
           }
-          for (Map.Entry<String, String> serverAndState : 
externalView.getStateMap(partitionName).entrySet()) {
-            // Count number of online replicas. Ignore if state is CONSUMING.
-            // It is possible for a segment to be ONLINE in idealstate, and 
CONSUMING in EV for a short period of time.
-            // So, ignore this combination. If a segment exists in this 
combination for a long time, we will get
-            // low level-partition-not-consuming alert anyway.
-            if (serverAndState.getValue().equals(ONLINE) || 
serverAndState.getValue().equals(CONSUMING)) {
-              nReplicas++;
-            }
-            if (serverAndState.getValue().equals(ERROR)) {
-              nErrors++;
+        }
+        if (nIdeal == 0) {
+          // No online segments in ideal state
+          continue;
+        }
+        nReplicasIdealMax =
+            (idealState.getInstanceStateMap(partitionName).size() > 
nReplicasIdealMax) ? idealState.getInstanceStateMap(
+                partitionName).size() : nReplicasIdealMax;
+        if ((externalView == null) || (externalView.getStateMap(partitionName) 
== null)) {
+          // No replicas for this segment
+          TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+          if ((tableType != null) && (tableType.equals(TableType.OFFLINE))) {
+            OfflineSegmentZKMetadata segmentZKMetadata =
+                ZKMetadataProvider.getOfflineSegmentZKMetadata(_propertyStore, 
tableNameWithType, partitionName);
+            if (segmentZKMetadata != null
+                && segmentZKMetadata.getPushTime() > 
System.currentTimeMillis() - _waitForPushTimeSeconds * 1000) {
+              // push not yet finished, skip
+              continue;
             }
           }
-          if (nReplicas == 0) {
-            if (nOffline < MaxOfflineSegmentsToLog) {
-              LOGGER.warn("Segment {} of table {} has no online replicas", 
partitionName, tableName);
-            }
-            nOffline++;
+          nOffline++;
+          if (nOffline < MaxOfflineSegmentsToLog) {
+            LOGGER.warn("Segment {} of table {} has no replicas", 
partitionName, tableNameWithType);
           }
-          nReplicasExternal =
-              ((nReplicasExternal > nReplicas) || (nReplicasExternal == -1)) ? 
nReplicas : nReplicasExternal;
-        }
-        if (nReplicasExternal == -1) {
-          nReplicasExternal = (nReplicasIdealMax == 0) ? 1 : 0;
+          nReplicasExternal = 0;
+          continue;
         }
-        // Synchronization provided by Controller Gauge to make sure that only 
one thread updates the gauge
-        _metricsRegistry.setValueOfTableGauge(tableName, 
ControllerGauge.NUMBER_OF_REPLICAS, nReplicasExternal);
-        _metricsRegistry.setValueOfTableGauge(tableName, 
ControllerGauge.PERCENT_OF_REPLICAS,
-            (nReplicasIdealMax > 0) ? (nReplicasExternal * 100 / 
nReplicasIdealMax) : 100);
-        _metricsRegistry.setValueOfTableGauge(tableName, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
-        _metricsRegistry.setValueOfTableGauge(tableName, 
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
-            (nSegments > 0) ? (100 - (nOffline * 100 / nSegments)) : 100);
-        if (nOffline > 0) {
-          LOGGER.warn("Table {} has {} segments with no online replicas", 
tableName, nOffline);
+        for (Map.Entry<String, String> serverAndState : 
externalView.getStateMap(partitionName).entrySet()) {
+          // Count number of online replicas. Ignore if state is CONSUMING.
+          // It is possible for a segment to be ONLINE in idealstate, and 
CONSUMING in EV for a short period of time.
+          // So, ignore this combination. If a segment exists in this 
combination for a long time, we will get
+          // low level-partition-not-consuming alert anyway.
+          if (serverAndState.getValue().equals(ONLINE) || 
serverAndState.getValue().equals(CONSUMING)) {
+            nReplicas++;
+          }
+          if (serverAndState.getValue().equals(ERROR)) {
+            nErrors++;
+          }
         }
-        if (nReplicasExternal < nReplicasIdealMax) {
-          LOGGER.warn("Table {} has {} replicas, below replication threshold 
:{}", tableName, nReplicasExternal,
-              nReplicasIdealMax);
+        if (nReplicas == 0) {
+          if (nOffline < MaxOfflineSegmentsToLog) {
+            LOGGER.warn("Segment {} of table {} has no online replicas", 
partitionName, tableNameWithType);
+          }
+          nOffline++;
         }
-      } catch (Exception e) {
-        LOGGER.warn("Caught exception while updating segment status for table 
{}", e, tableName);
-
-        // Remove the metric for this table
-        resetTableMetrics(tableName);
+        nReplicasExternal =
+            ((nReplicasExternal > nReplicas) || (nReplicasExternal == -1)) ? 
nReplicas : nReplicasExternal;
       }
-    }
+      if (nReplicasExternal == -1) {
+        nReplicasExternal = (nReplicasIdealMax == 0) ? 1 : 0;
+      }
+      // Synchronization provided by Controller Gauge to make sure that only 
one thread updates the gauge
+      _metricsRegistry.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.NUMBER_OF_REPLICAS, nReplicasExternal);
+      _metricsRegistry.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.PERCENT_OF_REPLICAS,
+          (nReplicasIdealMax > 0) ? (nReplicasExternal * 100 / 
nReplicasIdealMax) : 100);
+      _metricsRegistry.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
+      _metricsRegistry.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
+          (nSegments > 0) ? (100 - (nOffline * 100 / nSegments)) : 100);
+      if (nOffline > 0) {
+        LOGGER.warn("Table {} has {} segments with no online replicas", 
tableNameWithType, nOffline);
+      }
+      if (nReplicasExternal < nReplicasIdealMax) {
+        LOGGER.warn("Table {} has {} replicas, below replication threshold 
:{}", tableNameWithType, nReplicasExternal,
+            nReplicasIdealMax);
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while updating segment status for table 
{}", e, tableNameWithType);
 
-    
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT, 
realTimeTableCount);
-    
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT, 
offlineTableCount);
-    
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT, 
disabledTableCount);
+      // Remove the metric for this table
+      resetTableMetrics(tableNameWithType);
+    }
   }
 
   private void setStatusToDefault() {
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
index 0ff695d..729c75c 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -49,6 +49,11 @@ public class PinotTaskManager extends ControllerPeriodicTask 
{
   private final TaskGeneratorRegistry _taskGeneratorRegistry;
   private final ControllerMetrics _controllerMetrics;
 
+  private Map<String, List<TableConfig>> _enabledTableConfigMap;
+  private Set<String> _taskTypes;
+  private int _numTaskTypes;
+  private Map<String, String> _tasksScheduled;
+
   public PinotTaskManager(@Nonnull PinotHelixTaskResourceManager 
helixTaskResourceManager,
       @Nonnull PinotHelixResourceManager helixResourceManager, @Nonnull 
ControllerConf controllerConf,
       @Nonnull ControllerMetrics controllerMetrics) {
@@ -81,81 +86,81 @@ public class PinotTaskManager extends 
ControllerPeriodicTask {
   }
 
   /**
-   * Check the Pinot cluster status and schedule new tasks for the given 
tables.
-   *
-   * @param tables List of table names
-   * @return Map from task type to task scheduled
+   * Public API to schedule tasks. It doesn't matter whether current pinot 
controller is leader.
    */
-  @Nonnull
-  private Map<String, String> scheduleTasks(List<String> tables) {
+  public Map<String, String> scheduleTasks() {
+    process(_pinotHelixResourceManager.getAllTables());
+    return getTasksScheduled();
+  }
+
+  /**
+   * Performs necessary cleanups (e.g. remove metrics) when the controller 
leadership changes.
+   */
+  @Override
+  public void onBecomeNotLeader() {
+    LOGGER.info("Perform task cleanups.");
+    // Performs necessary cleanups for each task type.
+    for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
+      _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
+    }
+  }
+
+
+  @Override
+  protected void preprocess() {
     
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
 1L);
 
-    Set<String> taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
-    int numTaskTypes = taskTypes.size();
-    Map<String, List<TableConfig>> enabledTableConfigMap = new 
HashMap<>(numTaskTypes);
+    _taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
+    _numTaskTypes = _taskTypes.size();
+    _enabledTableConfigMap = new HashMap<>(_numTaskTypes);
 
-    for (String taskType : taskTypes) {
-      enabledTableConfigMap.put(taskType, new ArrayList<>());
+    for (String taskType : _taskTypes) {
+      _enabledTableConfigMap.put(taskType, new ArrayList<>());
 
       // Ensure all task queues exist
       _helixTaskResourceManager.ensureTaskQueueExists(taskType);
     }
+  }
 
-    // Scan all table configs to get the tables with tasks enabled
-    for (String tableName : tables) {
-      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableName);
-      if (tableConfig != null) {
-        TableTaskConfig taskConfig = tableConfig.getTaskConfig();
-        if (taskConfig != null) {
-          for (String taskType : taskTypes) {
-            if (taskConfig.isTaskTypeEnabled(taskType)) {
-              enabledTableConfigMap.get(taskType).add(tableConfig);
-            }
+  @Override
+  protected void processTable(String tableNameWithType) {
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig != null) {
+      TableTaskConfig taskConfig = tableConfig.getTaskConfig();
+      if (taskConfig != null) {
+        for (String taskType : _taskTypes) {
+          if (taskConfig.isTaskTypeEnabled(taskType)) {
+            _enabledTableConfigMap.get(taskType).add(tableConfig);
           }
         }
       }
     }
+  }
 
+  @Override
+  protected void postprocess() {
     // Generate each type of tasks
-    Map<String, String> tasksScheduled = new HashMap<>(numTaskTypes);
-    for (String taskType : taskTypes) {
+    _tasksScheduled = new HashMap<>(_numTaskTypes);
+    for (String taskType : _taskTypes) {
       LOGGER.info("Generating tasks for task type: {}", taskType);
       PinotTaskGenerator pinotTaskGenerator = 
_taskGeneratorRegistry.getTaskGenerator(taskType);
-      List<PinotTaskConfig> pinotTaskConfigs = 
pinotTaskGenerator.generateTasks(enabledTableConfigMap.get(taskType));
+      List<PinotTaskConfig> pinotTaskConfigs = 
pinotTaskGenerator.generateTasks(_enabledTableConfigMap.get(taskType));
       int numTasks = pinotTaskConfigs.size();
       if (numTasks > 0) {
         LOGGER.info("Submitting {} tasks for task type: {} with task configs: 
{}", numTasks, taskType,
             pinotTaskConfigs);
-        tasksScheduled.put(taskType, 
_helixTaskResourceManager.submitTask(pinotTaskConfigs,
+        _tasksScheduled.put(taskType, 
_helixTaskResourceManager.submitTask(pinotTaskConfigs,
             pinotTaskGenerator.getNumConcurrentTasksPerInstance()));
         _controllerMetrics.addMeteredTableValue(taskType, 
ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
       }
     }
-
-    return tasksScheduled;
   }
 
   /**
-   * Public API to schedule tasks. It doesn't matter whether current pinot 
controller is leader.
-   */
-  public Map<String, String> scheduleTasks() {
-    return scheduleTasks(_pinotHelixResourceManager.getAllTables());
-  }
-
-  /**
-   * Performs necessary cleanups (e.g. remove metrics) when the controller 
leadership changes.
+   * Returns the tasks that have been scheduled as part of the postprocess
+   * @return
    */
-  @Override
-  public void onBecomeNotLeader() {
-    LOGGER.info("Perform task cleanups.");
-    // Performs necessary cleanups for each task type.
-    for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
-      _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
-    }
-  }
-
-  @Override
-  public void process(List<String> tables) {
-    scheduleTasks(tables);
+  private Map<String, String> getTasksScheduled() {
+    return _tasksScheduled;
   }
 }
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 3b12c0f..c416d0e 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -110,7 +110,29 @@ public abstract class ControllerPeriodicTask extends 
BasePeriodicTask {
    *
    * @param tables List of table names
    */
-  public abstract void process(List<String> tables);
+  protected void process(List<String> tables) {
+    preprocess();
+    for (String table : tables) {
+      processTable(table);
+    }
+    postprocess();
+  }
+
+  /**
+   * This method runs before processing all tables
+   */
+  protected abstract void preprocess();
+
+  /**
+   * Execute the controller periodic task for the given table
+   * @param tableNameWithType
+   */
+  protected abstract void processTable(String tableNameWithType);
+
+  /**
+   * This method runs after processing all tables
+   */
+  protected abstract void postprocess();
 
   @VisibleForTesting
   protected boolean isLeader() {
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index d152887..e324c11 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -58,51 +58,59 @@ public class RealtimeSegmentRelocator extends 
ControllerPeriodicTask {
   }
 
   @Override
-  public void process(List<String> tables) {
-    runRelocation(tables);
+  protected void preprocess() {
+
+  }
+
+  @Override
+  protected void processTable(String tableNameWithType) {
+    runRelocation(tableNameWithType);
+  }
+
+  @Override
+  protected void postprocess() {
+
   }
 
   /**
-   * Check the given tables. Perform relocation of segments if table is 
realtime and relocation is required
+   * Check the given table. Perform relocation of segments if table is 
realtime and relocation is required
    * TODO: Model this to implement {@link 
com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy} 
interface
    * https://github.com/linkedin/pinot/issues/2609
    *
-   * @param tables List of table names
+   * @param tableNameWithType
    */
-  private void runRelocation(List<String> tables) {
-    for (String tableNameWithType : tables) {
-      // Only consider realtime tables.
-      if (!TableNameBuilder.REALTIME.tableHasTypeSuffix(tableNameWithType)) {
-        continue;
+  private void runRelocation(String tableNameWithType) {
+    // Only consider realtime tables.
+    if (!TableNameBuilder.REALTIME.tableHasTypeSuffix(tableNameWithType)) {
+      return;
+    }
+    try {
+      LOGGER.info("Starting relocation of segments for table: {}", 
tableNameWithType);
+
+      TableConfig tableConfig = 
_pinotHelixResourceManager.getRealtimeTableConfig(tableNameWithType);
+      final RealtimeTagConfig realtimeTagConfig = new 
RealtimeTagConfig(tableConfig);
+      if (!realtimeTagConfig.isRelocateCompletedSegments()) {
+        LOGGER.info("Skipping relocation of segments for {}", 
tableNameWithType);
+        return;
       }
-      try {
-        LOGGER.info("Starting relocation of segments for table: {}", 
tableNameWithType);
-
-        TableConfig tableConfig = 
_pinotHelixResourceManager.getRealtimeTableConfig(tableNameWithType);
-        final RealtimeTagConfig realtimeTagConfig = new 
RealtimeTagConfig(tableConfig);
-        if (!realtimeTagConfig.isRelocateCompletedSegments()) {
-          LOGGER.info("Skipping relocation of segments for {}", 
tableNameWithType);
-          continue;
-        }
 
-        Function<IdealState, IdealState> updater = new Function<IdealState, 
IdealState>() {
-          @Nullable
-          @Override
-          public IdealState apply(@Nullable IdealState idealState) {
-            if (!idealState.isEnabled()) {
-              LOGGER.info("Skipping relocation of segments for {} since ideal 
state is disabled", tableNameWithType);
-              return null;
-            }
-            relocateSegments(realtimeTagConfig, idealState);
-            return idealState;
+      Function<IdealState, IdealState> updater = new Function<IdealState, 
IdealState>() {
+        @Nullable
+        @Override
+        public IdealState apply(@Nullable IdealState idealState) {
+          if (!idealState.isEnabled()) {
+            LOGGER.info("Skipping relocation of segments for {} since ideal 
state is disabled", tableNameWithType);
+            return null;
           }
-        };
+          relocateSegments(realtimeTagConfig, idealState);
+          return idealState;
+        }
+      };
 
-        
HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), 
tableNameWithType, updater,
-            RetryPolicies.exponentialBackoffRetryPolicy(5, 1000, 2.0f));
-      } catch (Exception e) {
-        LOGGER.error("Exception in relocating realtime segments of table {}", 
tableNameWithType, e);
-      }
+      
HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), 
tableNameWithType, updater,
+          RetryPolicies.exponentialBackoffRetryPolicy(5, 1000, 2.0f));
+    } catch (Exception e) {
+      LOGGER.error("Exception in relocating realtime segments of table {}", 
tableNameWithType, e);
     }
   }
 
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
index d76cc3b..80894fc 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
@@ -62,29 +62,26 @@ public class RetentionManager extends ControllerPeriodicTask {
   }
 
   @Override
-  public void process(List<String> tables) {
-    execute(tables);
+  protected void preprocess() {
+
   }
 
-  /**
-   * Manages retention for the given tables.
-   *
-   * @param tables List of table names
-   */
-  private void execute(List<String> tables) {
+  @Override
+  protected void processTable(String tableNameWithType) {
     try {
-      for (String tableNameWithType : tables) {
-        LOGGER.info("Start managing retention for table: {}", tableNameWithType);
-        manageRetentionForTable(tableNameWithType);
-      }
-
-      LOGGER.info("Removing aged (more than {} days) deleted segments for all tables", _deletedSegmentsRetentionInDays);
-      _pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_deletedSegmentsRetentionInDays);
+      LOGGER.info("Start managing retention for table: {}", tableNameWithType);
+      manageRetentionForTable(tableNameWithType);
     } catch (Exception e) {
       LOGGER.error("Caught exception while managing retention for all tables", e);
     }
   }
 
+  @Override
+  protected void postprocess() {
+    LOGGER.info("Removing aged (more than {} days) deleted segments for all tables", _deletedSegmentsRetentionInDays);
+    _pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_deletedSegmentsRetentionInDays);
+  }
+
   private void manageRetentionForTable(String tableNameWithType) {
     try {
       // Build retention strategy from table config
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
index 8717318..18025eb 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
@@ -23,7 +23,7 @@ import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import com.linkedin.pinot.common.metrics.ValidationMetrics;
-import com.linkedin.pinot.common.utils.CommonConstants.Helix.TableType;
+import com.linkedin.pinot.common.utils.CommonConstants;
 import com.linkedin.pinot.common.utils.HLCSegmentName;
 import com.linkedin.pinot.common.utils.SegmentName;
 import com.linkedin.pinot.common.utils.time.TimeUtils;
@@ -57,6 +57,8 @@ public class ValidationManager extends ControllerPeriodicTask {
   private final ValidationMetrics _validationMetrics;
 
   private long _lastSegmentLevelValidationTimeMs = 0L;
+  private boolean _runSegmentLevelValidation;
+  private List<InstanceConfig> _instanceConfigs;
 
   public ValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
       PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) {
@@ -74,61 +76,62 @@ public class ValidationManager extends ControllerPeriodicTask {
   }
 
   @Override
-  public void process(List<String> tables) {
-    runValidation(tables);
-  }
-
-  /**
-   * Runs a validation pass over the given tables.
-   *
-   * @param tables List of table names
-   */
-  private void runValidation(List<String> tables) {
+  protected void preprocess() {
     // Run segment level validation using a separate interval
-    boolean runSegmentLevelValidation = false;
+    _runSegmentLevelValidation = false;
     long currentTimeMs = System.currentTimeMillis();
     if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - _lastSegmentLevelValidationTimeMs)
         >= _segmentLevelValidationIntervalInSeconds) {
       LOGGER.info("Run segment-level validation");
-      runSegmentLevelValidation = true;
+      _runSegmentLevelValidation = true;
       _lastSegmentLevelValidationTimeMs = currentTimeMs;
     }
 
     // Cache instance configs to reduce ZK access
-    List<InstanceConfig> instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
-
-    for (String tableNameWithType : tables) {
-      try {
-        TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
-        if (tableConfig == null) {
-          LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
-          continue;
-        }
+    _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
+  }
 
-        // Rebuild broker resource
-        Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(instanceConfigs,
-            tableConfig.getTenantConfig().getBroker());
-        _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances);
+  @Override
+  protected void processTable(String tableNameWithType) {
+    runValidation(tableNameWithType);
+  }
 
-        // Perform validation based on the table type
-        TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-        if (tableType == TableType.OFFLINE) {
-          if (runSegmentLevelValidation) {
-            validateOfflineSegmentPush(tableConfig);
-          }
-        } else {
-          if (runSegmentLevelValidation) {
-            updateRealtimeDocumentCount(tableConfig);
-          }
-          Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
-          StreamConfig streamConfig = new StreamConfig(streamConfigMap);
-          if (streamConfig.hasLowLevelConsumerType()) {
-            _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
-          }
+  @Override
+  protected void postprocess() {
+
+  }
+
+  private void runValidation(String tableNameWithType) {
+    try {
+      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
+        return;
+      }
+
+      // Rebuild broker resource
+      Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
+          tableConfig.getTenantConfig().getBroker());
+      _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances);
+
+      // Perform validation based on the table type
+      CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
+        if (_runSegmentLevelValidation) {
+          validateOfflineSegmentPush(tableConfig);
+        }
+      } else {
+        if (_runSegmentLevelValidation) {
+          updateRealtimeDocumentCount(tableConfig);
+        }
+        Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
+        StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+        if (streamConfig.hasLowLevelConsumerType()) {
+          _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
         }
-      } catch (Exception e) {
-        LOGGER.warn("Caught exception while validating table: {}", tableNameWithType, e);
       }
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while validating table: {}", tableNameWithType, e);
     }
   }
 
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 9a5d26c..94c9f52 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -115,7 +115,22 @@ public class ControllerPeriodicTaskTest {
     }
 
     @Override
-    public void process(List<String> tables) {
+    protected void process(List<String> tables) {
+
+    }
+
+    @Override
+    protected void preprocess() {
+
+    }
+
+    @Override
+    protected void processTable(String tableNameWithType) {
+
+    }
+
+    @Override
+    public void postprocess() {
 
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to