Jackie-Jiang commented on a change in pull request #3475: Enhance controller
periodic task and scheduler
URL: https://github.com/apache/incubator-pinot/pull/3475#discussion_r233677357
##########
File path:
pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
##########
@@ -15,58 +15,63 @@
*/
package com.linkedin.pinot.core.periodictask;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Periodic task scheduler will schedule a list of tasks based on their
initial delay time and interval time.
*/
public class PeriodicTaskScheduler {
private static final Logger LOGGER =
LoggerFactory.getLogger(PeriodicTaskScheduler.class);
- private static final int CORE_POOL_SIZE = 5;
- private final ScheduledExecutorService _executorService;
- public PeriodicTaskScheduler() {
- LOGGER.info("Initializing PeriodicTaskScheduler.");
- _executorService = Executors.newScheduledThreadPool(CORE_POOL_SIZE);
- }
+ private ScheduledExecutorService _executorService;
/**
* Start scheduling periodic tasks.
*/
public void start(List<PeriodicTask> periodicTasks) {
- if (periodicTasks == null || periodicTasks.isEmpty()) {
- LOGGER.warn("No periodic task assigned to scheduler!");
- return;
+ if (_executorService != null) {
+ LOGGER.warn("Periodic task scheduler already started");
}
- if (periodicTasks.size() > CORE_POOL_SIZE) {
- LOGGER.warn("The number of tasks:{} is more than the default number of
threads:{}.", periodicTasks.size(),
- CORE_POOL_SIZE);
+ List<PeriodicTask> tasksWithValidInterval = new ArrayList<>();
+ for (PeriodicTask periodicTask : periodicTasks) {
+ if (periodicTask.getIntervalInSeconds() > 0) {
+ tasksWithValidInterval.add(periodicTask);
+ }
}
- LOGGER.info("Starting PeriodicTaskScheduler.");
- // Set up an executor that executes tasks periodically
- for (PeriodicTask periodicTask : periodicTasks) {
- periodicTask.init();
- _executorService.scheduleWithFixedDelay(() -> {
- try {
- periodicTask.run();
- } catch (Throwable e) {
- // catch all errors to prevent subsequent executions from being
silently suppressed
- // Ref:
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
- LOGGER.warn("Caught exception while running Task: {}",
periodicTask.getTaskName(), e);
- }
- }, periodicTask.getInitialDelayInSeconds(),
periodicTask.getIntervalInSeconds(), TimeUnit.SECONDS);
+ if (tasksWithValidInterval.isEmpty()) {
+ LOGGER.warn("No periodic task scheduled");
+ } else {
+ LOGGER.info("Starting periodic task scheduler with tasks: {}",
tasksWithValidInterval);
+ _executorService =
Executors.newScheduledThreadPool(tasksWithValidInterval.size());
+ for (PeriodicTask periodicTask : periodicTasks) {
+ periodicTask.init();
+ _executorService.scheduleWithFixedDelay(() -> {
+ try {
+ periodicTask.run();
+ } catch (Throwable e) {
+ // catch all errors to prevent subsequent executions from being
silently suppressed
+ // Ref:
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
+ LOGGER.warn("Caught exception while running Task: {}",
periodicTask.getTaskName(), e);
+ }
+ }, periodicTask.getInitialDelayInSeconds(),
periodicTask.getIntervalInSeconds(), TimeUnit.SECONDS);
+ }
}
}
public void stop() {
- LOGGER.info("Stopping PeriodicTaskScheduler");
- _executorService.shutdown();
+ if (_executorService != null) {
+ LOGGER.info("Stopping periodic task scheduler");
+ _executorService.shutdown();
+ _executorService = null;
Review comment:
That could be very slow. Once we shut down the executor, no new task will be
scheduled.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]