Jackie-Jiang commented on a change in pull request #3819: Refactor periodic task
URL: https://github.com/apache/incubator-pinot/pull/3819#discussion_r256193623
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
##########
@@ -44,134 +43,126 @@
* <p><code>PinotTaskManager</code> is also responsible for checking the
health status on each type of tasks, detect and
* fix issues accordingly.
*/
-public class PinotTaskManager extends ControllerPeriodicTask {
+public class PinotTaskManager extends ControllerPeriodicTask<Void> {
private static final Logger LOGGER =
LoggerFactory.getLogger(PinotTaskManager.class);
private final PinotHelixTaskResourceManager _helixTaskResourceManager;
private final ClusterInfoProvider _clusterInfoProvider;
private final TaskGeneratorRegistry _taskGeneratorRegistry;
- private Map<String, List<TableConfig>> _enabledTableConfigMap;
- private Set<String> _taskTypes;
- private int _numTaskTypes;
- private Map<String, String> _tasksScheduled;
-
- public PinotTaskManager(@Nonnull PinotHelixTaskResourceManager
helixTaskResourceManager,
- @Nonnull PinotHelixResourceManager helixResourceManager, @Nonnull
ControllerConf controllerConf,
- @Nonnull ControllerMetrics controllerMetrics) {
+ public PinotTaskManager(PinotHelixTaskResourceManager
helixTaskResourceManager,
+ PinotHelixResourceManager helixResourceManager, ControllerConf
controllerConf,
+ ControllerMetrics controllerMetrics) {
super("PinotTaskManager",
controllerConf.getTaskManagerFrequencyInSeconds(),
controllerConf.getPeriodicTaskInitialDelayInSeconds(),
helixResourceManager, controllerMetrics);
_helixTaskResourceManager = helixTaskResourceManager;
_clusterInfoProvider = new ClusterInfoProvider(helixResourceManager,
helixTaskResourceManager, controllerConf);
_taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider);
}
- @Override
- protected void initTask() {
-
- }
-
/**
- * Get the cluster info provider.
- * <p>Cluster info provider might be needed to initialize task generators.
+ * Returns the cluster info provider.
+ * <p>
+ * Cluster info provider might be useful when initializing task generators.
*
* @return Cluster info provider
*/
- @Nonnull
public ClusterInfoProvider getClusterInfoProvider() {
return _clusterInfoProvider;
}
/**
- * Register a task generator.
- * <p>This is for pluggable task generators.
+ * Registers a task generator.
+ * <p>
+ * This method can be used to plug in custom task generators.
*
* @param pinotTaskGenerator Task generator to be registered
*/
- public void registerTaskGenerator(@Nonnull PinotTaskGenerator
pinotTaskGenerator) {
+ public void registerTaskGenerator(PinotTaskGenerator pinotTaskGenerator) {
_taskGeneratorRegistry.registerTaskGenerator(pinotTaskGenerator);
}
/**
* Public API to schedule tasks. It doesn't matter whether current pinot
controller is leader.
*/
- public Map<String, String> scheduleTasks() {
- process(_pinotHelixResourceManager.getAllTables());
- return getTasksScheduled();
+ public synchronized Map<String, String> scheduleTasks() {
+ Map<String, String> tasksScheduled =
scheduleTasks(_pinotHelixResourceManager.getAllTables());
+
+ // For non-leader controller, perform non-leader cleanup
+ if (!isStarted()) {
+ nonLeaderCleanUp();
+ }
+ return tasksScheduled;
}
- @Override
- protected void preprocess() {
-
_metricsRegistry.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
1L);
+ /**
+ * Check the Pinot cluster status and schedule new tasks for the given
tables.
+ *
+ * @param tableNamesWithType List of table names with type suffix
+ * @return Map from task type to task scheduled
+ */
+ private Map<String, String> scheduleTasks(List<String> tableNamesWithType) {
+
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
1L);
- _taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
- _numTaskTypes = _taskTypes.size();
- _enabledTableConfigMap = new HashMap<>(_numTaskTypes);
+ Set<String> taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
+ int numTaskTypes = taskTypes.size();
+ Map<String, List<TableConfig>> enabledTableConfigMap = new
HashMap<>(numTaskTypes);
- for (String taskType : _taskTypes) {
- _enabledTableConfigMap.put(taskType, new ArrayList<>());
+ for (String taskType : taskTypes) {
+ enabledTableConfigMap.put(taskType, new ArrayList<>());
// Ensure all task queues exist
_helixTaskResourceManager.ensureTaskQueueExists(taskType);
}
- }
- @Override
- protected void processTable(String tableNameWithType) {
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig != null) {
- TableTaskConfig taskConfig = tableConfig.getTaskConfig();
- if (taskConfig != null) {
- for (String taskType : _taskTypes) {
- if (taskConfig.isTaskTypeEnabled(taskType)) {
- _enabledTableConfigMap.get(taskType).add(tableConfig);
+ // Scan all table configs to get the tables with tasks enabled
+ for (String tableNameWithType : tableNamesWithType) {
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig != null) {
+ TableTaskConfig taskConfig = tableConfig.getTaskConfig();
+ if (taskConfig != null) {
+ for (String taskType : taskTypes) {
+ if (taskConfig.isTaskTypeEnabled(taskType)) {
+ enabledTableConfigMap.get(taskType).add(tableConfig);
+ }
}
}
}
}
- }
-
- @Override
- protected void exceptionHandler(String tableNameWithType, Exception e) {
- LOGGER.error("Exception in PinotTaskManager for table {}",
tableNameWithType, e);
- }
- @Override
- protected void postprocess() {
// Generate each type of tasks
- _tasksScheduled = new HashMap<>(_numTaskTypes);
- for (String taskType : _taskTypes) {
+ Map<String, String> tasksScheduled = new HashMap<>(numTaskTypes);
+ for (String taskType : taskTypes) {
LOGGER.info("Generating tasks for task type: {}", taskType);
PinotTaskGenerator pinotTaskGenerator =
_taskGeneratorRegistry.getTaskGenerator(taskType);
- List<PinotTaskConfig> pinotTaskConfigs =
pinotTaskGenerator.generateTasks(_enabledTableConfigMap.get(taskType));
+ List<PinotTaskConfig> pinotTaskConfigs =
pinotTaskGenerator.generateTasks(enabledTableConfigMap.get(taskType));
int numTasks = pinotTaskConfigs.size();
if (numTasks > 0) {
LOGGER
.info("Submitting {} tasks for task type: {} with task configs:
{}", numTasks, taskType, pinotTaskConfigs);
- _tasksScheduled.put(taskType, _helixTaskResourceManager
+ tasksScheduled.put(taskType, _helixTaskResourceManager
.submitTask(pinotTaskConfigs,
pinotTaskGenerator.getNumConcurrentTasksPerInstance()));
- _metricsRegistry.addMeteredTableValue(taskType,
ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
+ _controllerMetrics.addMeteredTableValue(taskType,
ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
}
}
- }
- /**
- * Returns the tasks that have been scheduled as part of the postprocess
- * @return
- */
- private Map<String, String> getTasksScheduled() {
- return _tasksScheduled;
+ return tasksScheduled;
}
- /**
- * Performs necessary cleanups (e.g. remove metrics) when the controller
leadership changes.
- */
- @Override
- public void stopTask() {
- LOGGER.info("Perform task cleanups.");
- // Performs necessary cleanups for each task type.
+ private void nonLeaderCleanUp() {
Review comment:
Added comments in BasePeriodicTask
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]