This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3b9a26f34ba Add support for cron based scheduling in controller
(#18256)
3b9a26f34ba is described below
commit 3b9a26f34badee886f79c25d741436eb98fe08cf
Author: Priyanshu <[email protected]>
AuthorDate: Tue Jun 2 04:26:14 2026 +0530
Add support for cron based scheduling in controller (#18256)
* adding new configs in controllercong
* adding constructors for cron expression acceptance
* adding constructors for cron expression acceptance
* adding quartz scheduler for cron job scheduling
* fixing issues
* fixing tests
* fixing tests
* unit tests for cron changes
* removing unnecessary comments
* fixing issues
* fixing issues
* fixing issues
* fixing minor issues
* fixing minor issues
* fixing minor issues
* improving code quality
* resolving comments
* fixing checkstyle violations
* Delete
pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java
* addressing comments
* resolving comments
* addressing comments
* addressing comments
* resolving comments
* resolving comments
* fixing failing CI
---
.../apache/pinot/controller/ControllerConf.java | 108 +++++++++++++-
.../controller/helix/RealtimeConsumerMonitor.java | 3 +-
.../controller/helix/SegmentStatusChecker.java | 4 +-
.../core/cleanup/StaleInstancesCleanupTask.java | 3 +-
.../helix/core/minion/PinotTaskManager.java | 4 +-
.../helix/core/minion/TaskMetricsEmitter.java | 2 +-
.../core/periodictask/ControllerPeriodicTask.java | 6 +-
.../helix/core/rebalance/RebalanceChecker.java | 5 +-
.../rebalance/tenant/TenantRebalanceChecker.java | 3 +-
.../helix/core/relocation/SegmentRelocator.java | 5 +-
.../helix/core/retention/RetentionManager.java | 5 +-
.../BrokerResourceValidationManager.java | 5 +-
.../OfflineSegmentValidationManager.java | 4 +-
.../validation/RealtimeOffsetAutoResetManager.java | 5 +-
.../RealtimeSegmentValidationManager.java | 5 +-
.../periodictask/ControllerPeriodicTaskTest.java | 4 +-
.../pinot/core/periodictask/BasePeriodicTask.java | 30 +++-
.../pinot/core/periodictask/PeriodicTask.java | 8 ++
.../core/periodictask/PeriodicTaskCronJob.java | 53 +++++++
.../core/periodictask/PeriodicTaskScheduler.java | 152 ++++++++++++++++----
.../periodictask/PeriodicTaskSchedulerTest.java | 160 ++++++++++++++++++++-
21 files changed, 514 insertions(+), 60 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index e9ac8f6a081..c4d9b131d19 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -105,23 +105,31 @@ public class ControllerConf extends PinotConfiguration {
public static class ControllerPeriodicTasksConf {
// frequency configs
public static final String RETENTION_MANAGER_FREQUENCY_PERIOD =
"controller.retention.frequencyPeriod";
+ public static final String RETENTION_MANAGER_CRON_EXPRESSION =
"controller.retention.cronExpression";
public static final String
OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_PERIOD =
"controller.offline.segment.interval.checker.frequencyPeriod";
public static final String REALTIME_SEGMENT_VALIDATION_FREQUENCY_PERIOD =
"controller.realtime.segment.validation.frequencyPeriod";
+ public static final String REALTIME_SEGMENT_VALIDATION_CRON_EXPRESSION =
+ "controller.realtime.segment.validation.cronExpression";
public static final String
REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS =
"controller.realtime.segment.validation.initialDelayInSeconds";
public static final String REALTIME_OFFSET_AUTO_RESET_BACKFILL_ENABLED =
"controller.realtime.offsetAutoReset.backfill.enabled";
public static final String
REALTIME_OFFSET_AUTO_RESET_BACKFILL_FREQUENCY_PERIOD =
"controller.realtime.offsetAutoReset.backfill.frequencyPeriod";
+ public static final String
REALTIME_OFFSET_AUTO_RESET_BACKFILL_CRON_EXPRESSION =
+ "controller.realtime.offsetAutoReset.backfill.cronExpression";
public static final String
REALTIME_OFFSET_AUTO_RESET_BACKFILL_INITIAL_DELAY_IN_SECONDS =
"controller.realtime.offsetAutoReset.backfill.initialDelayInSeconds";
public static final String BROKER_RESOURCE_VALIDATION_FREQUENCY_PERIOD =
"controller.broker.resource.validation.frequencyPeriod";
+ public static final String BROKER_RESOURCE_VALIDATION_CRON_EXPRESSION =
+ "controller.broker.resource.validation.cronExpression";
public static final String
BROKER_RESOURCE_VALIDATION_INITIAL_DELAY_IN_SECONDS =
"controller.broker.resource.validation.initialDelayInSeconds";
public static final String STATUS_CHECKER_FREQUENCY_PERIOD =
"controller.statuschecker.frequencyPeriod";
+ public static final String STATUS_CHECKER_CRON_EXPRESSION =
"controller.statuschecker.cronExpression";
public static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_PERIOD =
"controller.statuschecker.waitForPushTimePeriod";
public static final String TASK_MANAGER_FREQUENCY_PERIOD =
"controller.task.frequencyPeriod";
@@ -131,6 +139,8 @@ public class ControllerConf extends PinotConfiguration {
public static final String STALE_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD =
"controller.stale.instances.cleanup.task.frequencyPeriod";
+ public static final String STALE_INSTANCES_CLEANUP_TASK_CRON_EXPRESSION =
+ "controller.stale.instances.cleanup.task.cronExpression";
public static final String
STALE_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS =
"controller.stale.instances.cleanup.task.initialDelaySeconds";
public static final String
STALE_INSTANCES_CLEANUP_TASK_INSTANCES_RETENTION_PERIOD =
@@ -138,6 +148,8 @@ public class ControllerConf extends PinotConfiguration {
public static final String TASK_METRICS_EMITTER_FREQUENCY_PERIOD =
"controller.minion.task.metrics.emitter.frequencyPeriod";
+ public static final String TASK_METRICS_EMITTER_CRON_EXPRESSION =
+ "controller.minion.task.metrics.emitter.cronExpression";
public static final String PINOT_TASK_MANAGER_SCHEDULER_ENABLED =
"controller.task.scheduler.enabled";
// This is the expiry for the ended tasks. Helix cleans up the task info
from ZK after the expiry time from the
@@ -184,7 +196,7 @@ public class ControllerConf extends PinotConfiguration {
public static final boolean DEFAULT_CONCURRENT_SCHEDULING_ENABLED = false;
public static final String SEGMENT_RELOCATOR_FREQUENCY_PERIOD =
"controller.segment.relocator.frequencyPeriod";
-
+ public static final String SEGMENT_RELOCATOR_CRON_EXPRESSION =
"controller.segment.relocator.cronExpression";
public static final String SEGMENT_RELOCATOR_REASSIGN_INSTANCES =
"controller.segment.relocator.reassignInstances";
public static final String SEGMENT_RELOCATOR_BOOTSTRAP =
"controller.segment.relocator.bootstrap";
public static final String SEGMENT_RELOCATOR_DOWNTIME =
"controller.segment.relocator.downtime";
@@ -211,10 +223,12 @@ public class ControllerConf extends PinotConfiguration {
"controller.segmentRelocator.batchSizePerServer";
public static final String REBALANCE_CHECKER_FREQUENCY_PERIOD =
"controller.rebalance.checker.frequencyPeriod";
+ public static final String REBALANCE_CHECKER_CRON_EXPRESSION =
"controller.rebalance.checker.cronExpression";
// Because segment level validation is expensive and requires heavy ZK
access, we run segment level validation
// with a separate interval
public static final String SEGMENT_LEVEL_VALIDATION_INTERVAL_PERIOD =
"controller.segment.level.validation.intervalPeriod";
+
public static final String AUTO_RESET_ERROR_SEGMENTS_VALIDATION =
"controller.segment.error.autoReset";
public static final String ENABLE_PARTIAL_OFFLINE_REPLICA_REPAIR =
@@ -230,13 +244,16 @@ public class ControllerConf extends PinotConfiguration {
"controller.offlineSegmentIntervalChecker.initialDelayInSeconds";
public static final String OFFLINE_SEGMENT_VALIDATION_FREQUENCY_PERIOD =
"controller.offline.segment.validation.frequencyPeriod";
-
+ public static final String OFFLINE_SEGMENT_VALIDATION_CRON_EXPRESSION =
+ "controller.offline.segment.validation.cronExpression";
public static final String SEGMENT_RELOCATOR_INITIAL_DELAY_IN_SECONDS =
"controller.segmentRelocator.initialDelayInSeconds";
public static final String REBALANCE_CHECKER_INITIAL_DELAY_IN_SECONDS =
"controller.rebalanceChecker.initialDelayInSeconds";
public static final String TENANT_REBALANCE_CHECKER_FREQUENCY_PERIOD =
"controller.tenant.rebalance.checker.frequencyPeriod";
+ public static final String TENANT_REBALANCE_CHECKER_CRON_EXPRESSION =
+ "controller.tenant.rebalance.checker.cronExpression";
public static final String
TENANT_REBALANCE_CHECKER_INITIAL_DELAY_IN_SECONDS =
"controller.tenant.rebalance.checker.initialDelayInSeconds";
@@ -311,7 +328,8 @@ public class ControllerConf extends PinotConfiguration {
"controller.realtimeConsumerMonitor.frequencyPeriod";
public static final String RT_CONSUMER_MONITOR_INITIAL_DELAY_IN_SECONDS =
"controller.realtimeConsumerMonitor.initialDelayInSeconds";
-
+ public static final String RT_CONSUMER_MONITOR_CRON_EXPRESSION =
+ "controller.realtimeConsumerMonitor.cronExpression";
public static final String DEFAULT_RT_CONSUMER_MONITOR_FREQUENCY_PERIOD =
"-1s"; // Disabled by default
}
@@ -731,11 +749,19 @@ public class ControllerConf extends PinotConfiguration {
return (int) convertPeriodToSeconds(period);
}
+ public String getRetentionControllerCronExpression() {
+ return
getProperty(ControllerPeriodicTasksConf.RETENTION_MANAGER_CRON_EXPRESSION);
+ }
+
public void setRetentionControllerFrequencyInSeconds(int
retentionFrequencyInSeconds) {
setProperty(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_PERIOD,
Long.toString(retentionFrequencyInSeconds) + "s");
}
+ public void setRetentionControllerCronExpression(String cronExpression) {
+ setProperty(ControllerPeriodicTasksConf.RETENTION_MANAGER_CRON_EXPRESSION,
cronExpression);
+ }
+
/**
* Returns the offline segment interval checker frequency in seconds.
* Reads {@code controller.offline.segment.interval.checker.frequencyPeriod}
as a period string (e.g. "24h"),
@@ -774,11 +800,19 @@ public class ControllerConf extends PinotConfiguration {
return (int) convertPeriodToSeconds(period);
}
+ public String getRealtimeSegmentValidationCronExpression() {
+ return
getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_CRON_EXPRESSION);
+ }
+
public void setRealtimeSegmentValidationFrequencyInSeconds(int
validationFrequencyInSeconds) {
setProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_PERIOD,
Long.toString(validationFrequencyInSeconds) + "s");
}
+ public void setRealtimeSegmentValidationCronExpression(String
cronExpression) {
+
setProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_CRON_EXPRESSION,
cronExpression);
+ }
+
public boolean isRealtimeOffsetAutoResetBackfillEnabled() {
return
getProperty(ControllerPeriodicTasksConf.REALTIME_OFFSET_AUTO_RESET_BACKFILL_ENABLED,
false);
}
@@ -793,11 +827,19 @@ public class ControllerConf extends PinotConfiguration {
return (int) convertPeriodToSeconds(period);
}
+ public String getRealtimeOffsetAutoResetBackfillCronExpression() {
+ return
getProperty(ControllerPeriodicTasksConf.REALTIME_OFFSET_AUTO_RESET_BACKFILL_CRON_EXPRESSION);
+ }
+
public void setRealtimeOffsetAutoResetBackfillFrequencyInSeconds(int
offsetAutoResetBackfillFrequencyInSeconds) {
setProperty(ControllerPeriodicTasksConf.REALTIME_OFFSET_AUTO_RESET_BACKFILL_FREQUENCY_PERIOD,
Integer.toString(offsetAutoResetBackfillFrequencyInSeconds) + "s");
}
+ public void setRealtimeOffsetAutoResetBackfillCronExpression(String
cronExpression) {
+
setProperty(ControllerPeriodicTasksConf.REALTIME_OFFSET_AUTO_RESET_BACKFILL_CRON_EXPRESSION,
cronExpression);
+ }
+
/**
* Returns the broker resource validation frequency in seconds.
* Reads {@code controller.broker.resource.validation.frequencyPeriod} as a
period string (e.g. "1h"),
@@ -814,11 +856,19 @@ public class ControllerConf extends PinotConfiguration {
return (int) convertPeriodToSeconds(period);
}
+ public String getBrokerResourceValidationCronExpression() {
+ return
getProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_CRON_EXPRESSION);
+ }
+
public void setBrokerResourceValidationFrequencyInSeconds(int
validationFrequencyInSeconds) {
setProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_PERIOD,
Long.toString(validationFrequencyInSeconds) + "s");
}
+ public void setBrokerResourceValidationCronExpression(String cronExpression)
{
+
setProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_CRON_EXPRESSION,
cronExpression);
+ }
+
public long getBrokerResourceValidationInitialDelayInSeconds() {
return
getProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_INITIAL_DELAY_IN_SECONDS,
getPeriodicTaskInitialDelayInSeconds());
@@ -833,11 +883,19 @@ public class ControllerConf extends PinotConfiguration {
return (int) convertPeriodToSeconds(period);
}
+ public String getStatusCheckerCronExpression() {
+ return
getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_CRON_EXPRESSION);
+ }
+
public void setStatusCheckerFrequencyInSeconds(int
statusCheckerFrequencyInSeconds) {
setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_PERIOD,
Long.toString(statusCheckerFrequencyInSeconds) + "s");
}
+ public void setStatusCheckerCronExpression(String cronExpression) {
+ setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_CRON_EXPRESSION,
cronExpression);
+ }
+
public int getRebalanceCheckerFrequencyInSeconds() {
String period =
getProperty(ControllerPeriodicTasksConf.REBALANCE_CHECKER_FREQUENCY_PERIOD,
ControllerPeriodicTasksConf.DEFAULT_REBALANCE_CHECKER_FREQUENCY_PERIOD);
@@ -847,6 +905,10 @@ public class ControllerConf extends PinotConfiguration {
return (int) convertPeriodToSeconds(period);
}
+ public String getRebalanceCheckerCronExpression() {
+ return
getProperty(ControllerPeriodicTasksConf.REBALANCE_CHECKER_CRON_EXPRESSION);
+ }
+
public long getRebalanceCheckerInitialDelayInSeconds() {
return
getProperty(ControllerPeriodicTasksConf.REBALANCE_CHECKER_INITIAL_DELAY_IN_SECONDS,
ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
@@ -861,6 +923,10 @@ public class ControllerConf extends PinotConfiguration {
return (int) convertPeriodToSeconds(period);
}
+ public String getTenantRebalanceCheckerCronExpression() {
+ return
getProperty(ControllerPeriodicTasksConf.TENANT_REBALANCE_CHECKER_CRON_EXPRESSION);
+ }
+
public long getTenantRebalanceCheckerInitialDelayInSeconds() {
return
getProperty(ControllerPeriodicTasksConf.TENANT_REBALANCE_CHECKER_INITIAL_DELAY_IN_SECONDS,
ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
@@ -875,6 +941,10 @@ public class ControllerConf extends PinotConfiguration {
return (int) convertPeriodToSeconds(period);
}
+ public String getRealtimeConsumerMonitorCronExpression() {
+ return
getProperty(ControllerPeriodicTasksConf.RT_CONSUMER_MONITOR_CRON_EXPRESSION);
+ }
+
public long getRealtimeConsumerMonitorInitialDelayInSeconds() {
return
getProperty(ControllerPeriodicTasksConf.RT_CONSUMER_MONITOR_INITIAL_DELAY_IN_SECONDS,
ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
@@ -889,11 +959,19 @@ public class ControllerConf extends PinotConfiguration {
return (int) convertPeriodToSeconds(period);
}
+ public String getTaskMetricsEmitterCronExpression() {
+ return
getProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_CRON_EXPRESSION);
+ }
+
public void setTaskMetricsEmitterFrequencyInSeconds(int
taskMetricsEmitterFrequencyInSeconds) {
setProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_FREQUENCY_PERIOD,
Long.toString(taskMetricsEmitterFrequencyInSeconds) + "s");
}
+ public void setTaskMetricsEmitterCronExpression(String
taskMetricsEmitterCronExpression) {
+
setProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_CRON_EXPRESSION,
taskMetricsEmitterCronExpression);
+ }
+
public int getStatusCheckerWaitForPushTimeInSeconds() {
String period =
getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_PERIOD,
ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_PERIOD);
@@ -922,11 +1000,19 @@ public class ControllerConf extends PinotConfiguration {
return (int) convertPeriodToSeconds(period);
}
+ public String getSegmentRelocatorCronExpression() {
+ return
getProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_CRON_EXPRESSION);
+ }
+
public void setSegmentRelocatorFrequencyInSeconds(int
segmentRelocatorFrequencyInSeconds) {
setProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_FREQUENCY_PERIOD,
Long.toString(segmentRelocatorFrequencyInSeconds) + "s");
}
+ public void setSegmentRelocatorCronExpression(String
segmentRelocatorCronExpression) {
+ setProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_CRON_EXPRESSION,
segmentRelocatorCronExpression);
+ }
+
public boolean getSegmentRelocatorReassignInstances() {
return
Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_REASSIGN_INSTANCES))
.map(Boolean::parseBoolean).orElse(false);
@@ -1060,10 +1146,18 @@ public class ControllerConf extends PinotConfiguration {
return (int) convertPeriodToSeconds(period);
}
+ public String getStaleInstancesCleanupTaskCronExpression() {
+ return
getProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_CRON_EXPRESSION);
+ }
+
public void setStaleInstanceCleanupTaskFrequencyInSeconds(String
frequencyPeriod) {
setProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD,
frequencyPeriod);
}
+ public void setStaleInstancesCleanupTaskCronExpression(String
cronExpression) {
+
setProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_CRON_EXPRESSION,
cronExpression);
+ }
+
public long getStaleInstanceCleanupTaskInitialDelaySeconds() {
return
getProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS,
ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
@@ -1269,11 +1363,19 @@ public class ControllerConf extends PinotConfiguration {
return (int) convertPeriodToSeconds(period);
}
+ public String getOfflineSegmentValidationCronExpression() {
+ return
getProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_VALIDATION_CRON_EXPRESSION);
+ }
+
public void setOfflineSegmentValidationFrequencyInSeconds(int
validationFrequencyInSeconds) {
setProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_VALIDATION_FREQUENCY_PERIOD,
TimeUtils.convertMillisToPeriod(validationFrequencyInSeconds * 1000L));
}
+ public void setOfflineSegmentValidationCronExpression(String
validationCronExpression) {
+
setProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_VALIDATION_CRON_EXPRESSION,
validationCronExpression);
+ }
+
public long getRealtimeSegmentValidationManagerInitialDelaySeconds() {
return
getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS,
getPeriodicTaskInitialDelayInSeconds());
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
index b4246f05123..77488c36dff 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
@@ -50,7 +50,8 @@ public class RealtimeConsumerMonitor extends
ControllerPeriodicTask<RealtimeCons
LeadControllerManager leadControllerManager, ControllerMetrics
controllerMetrics,
ConsumingSegmentInfoReader consumingSegmentInfoReader) {
super("RealtimeConsumerMonitor",
controllerConf.getRealtimeConsumerMonitorRunFrequency(),
- controllerConf.getRealtimeConsumerMonitorInitialDelayInSeconds(),
pinotHelixResourceManager,
+ controllerConf.getRealtimeConsumerMonitorInitialDelayInSeconds(),
+ controllerConf.getRealtimeConsumerMonitorCronExpression(),
pinotHelixResourceManager,
leadControllerManager, controllerMetrics);
_consumingSegmentInfoReader = consumingSegmentInfoReader;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index ab20f6fb745..d65504d377b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -93,8 +93,8 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask<SegmentStatusCh
LeadControllerManager leadControllerManager, ControllerConf config,
ControllerMetrics controllerMetrics,
TableSizeReader tableSizeReader) {
super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(),
- config.getStatusCheckerInitialDelayInSeconds(),
pinotHelixResourceManager, leadControllerManager,
- controllerMetrics);
+ config.getStatusCheckerInitialDelayInSeconds(),
config.getStatusCheckerCronExpression(),
+ pinotHelixResourceManager, leadControllerManager, controllerMetrics);
_waitForPushTimeSeconds =
config.getStatusCheckerWaitForPushTimeInSeconds();
_tableSizeReader = tableSizeReader;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java
index 6bf9f248dae..e0634ccd2a0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java
@@ -61,7 +61,8 @@ public class StaleInstancesCleanupTask extends
BasePeriodicTask {
public StaleInstancesCleanupTask(PinotHelixResourceManager
pinotHelixResourceManager,
LeadControllerManager leadControllerManager, ControllerConf
controllerConf, ControllerMetrics controllerMetrics) {
super(TASK_NAME,
controllerConf.getStaleInstancesCleanupTaskFrequencyInSeconds(),
- controllerConf.getStaleInstanceCleanupTaskInitialDelaySeconds());
+ controllerConf.getStaleInstanceCleanupTaskInitialDelaySeconds(),
+ controllerConf.getStaleInstancesCleanupTaskCronExpression());
_pinotHelixResourceManager = pinotHelixResourceManager;
_leadControllerManager = leadControllerManager;
_controllerMetrics = controllerMetrics;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index ce986dd5add..940797d5b8e 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -163,8 +163,8 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo>
taskManagerStatusCache, Executor executor,
PoolingHttpClientConnectionManager connectionManager,
ResourceUtilizationManager resourceUtilizationManager) {
super("PinotTaskManager",
controllerConf.getTaskManagerFrequencyInSeconds(),
- controllerConf.getPinotTaskManagerInitialDelaySeconds(),
helixResourceManager, leadControllerManager,
- controllerMetrics);
+ controllerConf.getPinotTaskManagerInitialDelaySeconds(), null,
helixResourceManager,
+ leadControllerManager, controllerMetrics);
_helixTaskResourceManager = helixTaskResourceManager;
_resourceUtilizationManager = resourceUtilizationManager;
_taskManagerStatusCache = taskManagerStatusCache;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java
index b562118ab10..8c0515c55b4 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java
@@ -73,7 +73,7 @@ public class TaskMetricsEmitter extends BasePeriodicTask {
PinotHelixTaskResourceManager helixTaskResourceManager,
LeadControllerManager leadControllerManager,
ControllerConf controllerConf, ControllerMetrics controllerMetrics) {
super(TASK_NAME, controllerConf.getTaskMetricsEmitterFrequencyInSeconds(),
- controllerConf.getPeriodicTaskInitialDelayInSeconds());
+ controllerConf.getPeriodicTaskInitialDelayInSeconds(),
controllerConf.getTaskMetricsEmitterCronExpression());
_pinotHelixResourceManager = pinotHelixResourceManager;
_helixTaskResourceManager = helixTaskResourceManager;
_controllerMetrics = controllerMetrics;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 7d2accc9934..0877672dfa6 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -56,9 +56,9 @@ public abstract class ControllerPeriodicTask<C> extends
BasePeriodicTask impleme
protected Set<String> _prevLeaderOfTables = new HashSet<>();
public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
long initialDelayInSeconds,
- PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager,
- ControllerMetrics controllerMetrics) {
- super(taskName, runFrequencyInSeconds, initialDelayInSeconds);
+ String cronExpression, PinotHelixResourceManager
pinotHelixResourceManager,
+ LeadControllerManager leadControllerManager, ControllerMetrics
controllerMetrics) {
+ super(taskName, runFrequencyInSeconds, initialDelayInSeconds,
cronExpression);
_pinotHelixResourceManager = pinotHelixResourceManager;
_leadControllerManager = leadControllerManager;
_controllerMetrics = controllerMetrics;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
index 481f990e873..51cc5194c2d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
@@ -63,8 +63,9 @@ public class RebalanceChecker extends
ControllerPeriodicTask<Void> {
PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager,
ControllerConf config, ControllerMetrics controllerMetrics) {
super(RebalanceChecker.class.getSimpleName(),
config.getRebalanceCheckerFrequencyInSeconds(),
- config.getRebalanceCheckerInitialDelayInSeconds(),
pinotHelixResourceManager, leadControllerManager,
- controllerMetrics);
+ config.getRebalanceCheckerInitialDelayInSeconds(),
config.getRebalanceCheckerCronExpression(),
+ pinotHelixResourceManager,
+ leadControllerManager, controllerMetrics);
_tableRebalanceManager = tableRebalanceManager;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java
index 0c1eaf88c10..0edf1c9fc53 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java
@@ -63,7 +63,8 @@ public class TenantRebalanceChecker extends BasePeriodicTask {
public TenantRebalanceChecker(ControllerConf config,
PinotHelixResourceManager pinotHelixResourceManager, TenantRebalancer
tenantRebalancer) {
super(TASK_NAME, config.getTenantRebalanceCheckerFrequencyInSeconds(),
- config.getTenantRebalanceCheckerInitialDelayInSeconds());
+ config.getTenantRebalanceCheckerInitialDelayInSeconds(),
+ config.getTenantRebalanceCheckerCronExpression());
_pinotHelixResourceManager = pinotHelixResourceManager;
_tenantRebalancer = tenantRebalancer;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
index 579787fb6c5..7c55131ca5f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
@@ -94,8 +94,9 @@ public class SegmentRelocator extends
ControllerPeriodicTask<Void> {
ControllerConf config, ControllerMetrics controllerMetrics,
ExecutorService executorService,
HttpClientConnectionManager connectionManager) {
super(SegmentRelocator.class.getSimpleName(),
config.getSegmentRelocatorFrequencyInSeconds(),
- config.getSegmentRelocatorInitialDelayInSeconds(),
pinotHelixResourceManager, leadControllerManager,
- controllerMetrics);
+ config.getSegmentRelocatorInitialDelayInSeconds(),
config.getSegmentRelocatorCronExpression(),
+ pinotHelixResourceManager,
+ leadControllerManager, controllerMetrics);
_tableRebalanceManager = tableRebalanceManager;
_executorService = executorService;
_connectionManager = connectionManager;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 9e6c3bbd8f6..55de58eae34 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -94,8 +94,9 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
LeadControllerManager leadControllerManager, ControllerConf config,
ControllerMetrics controllerMetrics,
BrokerServiceHelper brokerServiceHelper) {
super(TASK_NAME, config.getRetentionControllerFrequencyInSeconds(),
- config.getRetentionManagerInitialDelayInSeconds(),
pinotHelixResourceManager, leadControllerManager,
- controllerMetrics);
+ config.getRetentionManagerInitialDelayInSeconds(),
config.getRetentionControllerCronExpression(),
+ pinotHelixResourceManager,
+ leadControllerManager, controllerMetrics);
_untrackedSegmentDeletionEnabled =
config.getUntrackedSegmentDeletionEnabled();
_untrackedSegmentsRetentionTimeInDays =
config.getUntrackedSegmentsRetentionTimeInDays();
_agedSegmentsDeletionBatchSize = config.getAgedSegmentsDeletionBatchSize();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
index 7f0d1dc1564..747d139e640 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -44,8 +44,9 @@ public class BrokerResourceValidationManager extends
ControllerPeriodicTask<Brok
public BrokerResourceValidationManager(ControllerConf config,
PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager, ControllerMetrics
controllerMetrics) {
super("BrokerResourceValidationManager",
config.getBrokerResourceValidationFrequencyInSeconds(),
- config.getBrokerResourceValidationInitialDelayInSeconds(),
pinotHelixResourceManager, leadControllerManager,
- controllerMetrics);
+ config.getBrokerResourceValidationInitialDelayInSeconds(),
+ config.getBrokerResourceValidationCronExpression(),
pinotHelixResourceManager,
+ leadControllerManager, controllerMetrics);
}
@Override
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentValidationManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentValidationManager.java
index 59f0881c3f7..c9535c1b62b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentValidationManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentValidationManager.java
@@ -66,8 +66,8 @@ public class OfflineSegmentValidationManager extends
ControllerPeriodicTask<Offl
LeadControllerManager leadControllerManager, ValidationMetrics
validationMetrics,
ControllerMetrics controllerMetrics, ResourceUtilizationManager
resourceUtilizationManager) {
super("OfflineSegmentValidationManager",
config.getOfflineSegmentValidationFrequencyInSeconds(),
- config.getPeriodicTaskInitialDelayInSeconds(),
pinotHelixResourceManager,
- leadControllerManager, controllerMetrics);
+ config.getPeriodicTaskInitialDelayInSeconds(),
config.getOfflineSegmentValidationCronExpression(),
+ pinotHelixResourceManager, leadControllerManager, controllerMetrics);
_validationMetrics = validationMetrics;
_segmentAutoResetOnErrorAtValidation =
config.isAutoResetErrorSegmentsOnValidationEnabled();
_resourceUtilizationManager = resourceUtilizationManager;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManager.java
index ab73ceab6b2..889be0600ee 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManager.java
@@ -56,8 +56,9 @@ public class RealtimeOffsetAutoResetManager extends
ControllerPeriodicTask<Realt
LeadControllerManager leadControllerManager,
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
ControllerMetrics controllerMetrics) {
super("RealtimeOffsetAutoResetManager",
config.getRealtimeOffsetAutoResetBackfillFrequencyInSeconds(),
- config.getRealtimeOffsetAutoResetBackfillInitialDelaySeconds(),
pinotHelixResourceManager,
- leadControllerManager, controllerMetrics);
+ config.getRealtimeOffsetAutoResetBackfillInitialDelaySeconds(),
+ config.getRealtimeOffsetAutoResetBackfillCronExpression(),
+ pinotHelixResourceManager, leadControllerManager, controllerMetrics);
_llcRealtimeSegmentManager = llcRealtimeSegmentManager;
_pinotHelixResourceManager = pinotHelixResourceManager;
_tableToHandler = new ConcurrentHashMap<>();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 30e899a4548..e5bb5a871bb 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -77,8 +77,9 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
ValidationMetrics validationMetrics, ControllerMetrics
controllerMetrics, StorageQuotaChecker quotaChecker,
ResourceUtilizationManager resourceUtilizationManager) {
super("RealtimeSegmentValidationManager",
config.getRealtimeSegmentValidationFrequencyInSeconds(),
- config.getRealtimeSegmentValidationManagerInitialDelaySeconds(),
pinotHelixResourceManager,
- leadControllerManager, controllerMetrics);
+ config.getRealtimeSegmentValidationManagerInitialDelaySeconds(),
+ config.getRealtimeSegmentValidationCronExpression(),
+ pinotHelixResourceManager, leadControllerManager, controllerMetrics);
_llcRealtimeSegmentManager = llcRealtimeSegmentManager;
_validationMetrics = validationMetrics;
_controllerMetrics = controllerMetrics;
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index f4e0eb46b14..9e0a4f1b121 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -58,8 +58,8 @@ public class ControllerPeriodicTaskTest {
private static final String TASK_NAME = "TestTask";
private final ControllerPeriodicTask _task = new
ControllerPeriodicTask<Void>(TASK_NAME, RUN_FREQUENCY_IN_SECONDS,
- _controllerConf.getPeriodicTaskInitialDelayInSeconds(),
_resourceManager, _leadControllerManager,
- _controllerMetrics) {
+ _controllerConf.getPeriodicTaskInitialDelayInSeconds(), null,
_resourceManager,
+ _leadControllerManager, _controllerMetrics) {
@Override
protected void setUpTask() {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
index c5910b27b42..e80d90da54f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.periodictask;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +41,7 @@ public abstract class BasePeriodicTask implements
PeriodicTask {
protected final String _taskName;
protected final long _intervalInSeconds;
protected final long _initialDelayInSeconds;
+ protected final String _cronExpression;
protected final ReentrantLock _runLock;
// Lock used to synchronize life-cycle functions
@@ -61,11 +63,30 @@ public abstract class BasePeriodicTask implements
PeriodicTask {
}
public BasePeriodicTask(String taskName, long runFrequencyInSeconds, long
initialDelayInSeconds) {
+ this(taskName, runFrequencyInSeconds, initialDelayInSeconds, null);
+ }
+
+ public BasePeriodicTask(String taskName, long runFrequencyInSeconds, long
initialDelayInSeconds,
+ @Nullable String cronExpression) {
_taskName = taskName;
- _intervalInSeconds = runFrequencyInSeconds;
- _initialDelayInSeconds = initialDelayInSeconds;
_runLock = new ReentrantLock();
_lifeCycleLock = new Object();
+ boolean hasCronScheduling = cronExpression != null &&
!cronExpression.trim().isEmpty();
+ boolean hasFrequencyScheduling = runFrequencyInSeconds > 0;
+
+ if (hasCronScheduling && hasFrequencyScheduling) {
+ LOGGER.warn("Task '{}' is configured with both a cron expression ('{}') "
+ + "and a fixed execution frequency ({}s). Preferring cron
scheduling.",
+ taskName, cronExpression, runFrequencyInSeconds);
+ _intervalInSeconds = 0;
+ _initialDelayInSeconds = 0;
+ _cronExpression = cronExpression;
+ } else {
+ _intervalInSeconds = runFrequencyInSeconds;
+ _initialDelayInSeconds = initialDelayInSeconds;
+ //this will be null/empty anyway if it's not set.
+ _cronExpression = cronExpression;
+ }
}
@Override
@@ -83,6 +104,11 @@ public abstract class BasePeriodicTask implements
PeriodicTask {
return _initialDelayInSeconds;
}
+ @Override
+ public String getCronExpression() {
+ return _cronExpression;
+ }
+
/**
* Returns the status of the {@code started} flag. This flag will be set
after calling {@link #start()}, and reset
* after calling {@link #stop()}.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
index 4a5e120b95a..d10bccfdf37 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.periodictask;
import java.util.Properties;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -51,6 +52,13 @@ public interface PeriodicTask extends Runnable {
*/
long getInitialDelayInSeconds();
+ /**
+ * Returns the CRON expression for absolute scheduling, or null if
fixed-delay scheduling should be used.
+ * @return Cron expression
+ */
+ @Nullable
+ String getCronExpression();
+
/**
* Performs necessary setups and starts the periodic task. Should be called
before scheduling the periodic task. Can
* be called after calling {@link #stop()} to restart the periodic task.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskCronJob.java
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskCronJob.java
new file mode 100644
index 00000000000..275d9021b2c
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskCronJob.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.periodictask;
+
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@DisallowConcurrentExecution
+public class PeriodicTaskCronJob implements Job {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PeriodicTaskCronJob.class);
+ public static final String PERIODIC_TASK_KEY = "PeriodicTask";
+
+ public PeriodicTaskCronJob() {
+ }
+
+ @Override
+ public void execute(JobExecutionContext jobExecutionContext)
+ throws JobExecutionException {
+ PeriodicTask periodicTask = (PeriodicTask) jobExecutionContext
+ .getJobDetail()
+ .getJobDataMap()
+ .get(PERIODIC_TASK_KEY);
+
+ if (periodicTask != null) {
+ try {
+ periodicTask.run();
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while running Task: {}",
periodicTask.getTaskName(), e);
+ }
+ }
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
index 37b63dd6f41..534c8799468 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
@@ -28,6 +28,15 @@ import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.quartz.CronExpression;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.CronTrigger;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.TriggerBuilder;
+import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,8 +47,14 @@ import org.slf4j.LoggerFactory;
*/
public class PeriodicTaskScheduler {
private static final Logger LOGGER =
LoggerFactory.getLogger(PeriodicTaskScheduler.class);
-
+ private static final int MAX_QUARTZ_THREADS_HARD_LIMIT = 20;
+ //configure a thread limit for the quartz scheduler to use
+ private static final int CONFIGURED_THREAD_COUNT = Math.min(
+ MAX_QUARTZ_THREADS_HARD_LIMIT,
+ Math.max(1, Runtime.getRuntime().availableProcessors())
+ );
private ScheduledExecutorService _executorService;
+ private Scheduler _scheduler;
private Map<String, PeriodicTask> _periodicTasks;
/**
@@ -57,41 +72,103 @@ public class PeriodicTaskScheduler {
/**
* Starts scheduling periodic tasks.
+ * It uses the cron expression if provided, if not, it falls back to the
default fixed delay scheduling.
+ *
*/
public synchronized void start() {
if (_executorService != null) {
LOGGER.warn("Periodic task scheduler already started");
+ return;
}
+ Preconditions.checkState(_periodicTasks != null,
+ "Periodic task scheduler has not been initialized");
if (_periodicTasks.isEmpty()) {
LOGGER.warn("No periodic task scheduled");
- } else {
- Collection<PeriodicTask> periodicTasks = _periodicTasks.values();
- LOGGER.info("Starting periodic task scheduler with tasks: {}",
periodicTasks);
- _executorService =
Executors.newScheduledThreadPool(_periodicTasks.size());
+ return;
+ }
+
+ Collection<PeriodicTask> periodicTasks = _periodicTasks.values();
+ LOGGER.info("Starting periodic task scheduler with tasks: {}",
periodicTasks);
+
+ for (PeriodicTask task : periodicTasks) {
+ String cron = task.getCronExpression();
+ if (cron != null && !cron.trim().isEmpty() &&
!CronExpression.isValidExpression(cron)) {
+ throw new IllegalArgumentException(
+ String.format("Invalid CRON expression '%s' for task '%s'. Halting
controller startup.",
+ cron, task.getTaskName())
+ );
+ }
+ }
+
+ boolean hasCronTasks = false;
+
+ for (PeriodicTask task : periodicTasks) {
+ String cron = task.getCronExpression();
+ if (cron != null && !cron.trim().isEmpty()) {
+ if (!CronExpression.isValidExpression(cron)) {
+ throw new IllegalArgumentException(
+ String.format("Invalid CRON expression '%s' for task '%s'. "
+ + "Halting controller startup.",
+ cron, task.getTaskName())
+ );
+ }
+ hasCronTasks = true;
+ }
+ }
+
+ if (hasCronTasks) {
+ try {
+ int periodicTaskCount = _periodicTasks.size();
+ Properties quartzProperties = getQuartzProperties(periodicTaskCount);
+ StdSchedulerFactory customSchedulerFactory = new
StdSchedulerFactory(quartzProperties);
+ _scheduler = customSchedulerFactory.getScheduler();
+ _scheduler.start();
+ } catch (SchedulerException e) {
+ throw new RuntimeException("Failed to initialize Quartz scheduler.
Halting controller startup.", e);
+ }
+ }
+
+ _executorService = Executors.newScheduledThreadPool(periodicTasks.size());
+
+ try {
for (PeriodicTask periodicTask : periodicTasks) {
periodicTask.start();
- String periodicTaskTaskName = periodicTask.getTaskName();
- long intervalInSeconds = periodicTask.getIntervalInSeconds();
- if (intervalInSeconds <= 0) {
- LOGGER.info("Skip scheduling periodic task: {} for periodic
execution (it can be manually triggered)",
- periodicTaskTaskName);
- continue;
- }
- _executorService.scheduleWithFixedDelay(() -> {
- try {
- LOGGER.info("Starting {} with running frequency of {} seconds.",
periodicTaskTaskName, intervalInSeconds);
- periodicTask.run();
- } catch (Throwable e) {
- // catch all errors to prevent subsequent executions from being
silently suppressed
- // <pre>
- // See <a
href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService
- //
.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-">Ref</a>
- // </pre>
- LOGGER.warn("Caught exception while running Task: {}",
periodicTaskTaskName, e);
+
+ String cronExpression = periodicTask.getCronExpression();
+ String taskName = periodicTask.getTaskName();
+
+ if (cronExpression != null && !cronExpression.trim().isEmpty()) {
+ LOGGER.info("Scheduling periodic task {} with cron expression: {}",
taskName, cronExpression);
+ JobDetail jobDetail = JobBuilder.newJob(PeriodicTaskCronJob.class)
+ .withIdentity(taskName)
+ .build();
+ jobDetail.getJobDataMap().put(PeriodicTaskCronJob.PERIODIC_TASK_KEY,
periodicTask);
+
+ CronTrigger trigger = TriggerBuilder.newTrigger()
+ .withIdentity(taskName + "-CronTrigger")
+ .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
+ .build();
+
+ _scheduler.scheduleJob(jobDetail, trigger);
+ } else {
+ // Legacy fallback for blank/unset crons
+ long intervalInSeconds = periodicTask.getIntervalInSeconds();
+ if (intervalInSeconds > 0) {
+ _executorService.scheduleWithFixedDelay(() -> {
+ try {
+ periodicTask.run();
+ } catch (Throwable e) {
+ LOGGER.warn("Caught exception while running Task: {}",
taskName, e);
+ }
+ }, periodicTask.getInitialDelayInSeconds(), intervalInSeconds,
TimeUnit.SECONDS);
}
- }, periodicTask.getInitialDelayInSeconds(), intervalInSeconds,
TimeUnit.SECONDS);
+ }
}
+ } catch (Exception e) {
+ LOGGER.error("Fatal error scheduling periodic tasks. Cleaning up and
halting startup.", e);
+ this.stop();
+ throw new RuntimeException("Controller startup failed due to periodic
task scheduling error", e);
}
}
@@ -109,6 +186,16 @@ public class PeriodicTaskScheduler {
LOGGER.info("Stopping all periodic tasks: {}", _periodicTasks);
_periodicTasks.values().parallelStream().forEach(PeriodicTask::stop);
}
+
+ if (_scheduler != null) {
+ try {
+ LOGGER.info("Stopping Quartz scheduler");
+ _scheduler.shutdown(true);
+ _scheduler = null;
+ } catch (SchedulerException e) {
+ LOGGER.error("Failed to shutdown Quartz scheduler", e);
+ }
+ }
}
/// Returns true if the task exists (regardless of whether it is scheduled
to run periodically or not).
@@ -123,6 +210,12 @@ public class PeriodicTaskScheduler {
/** Execute {@link PeriodicTask} immediately on the specified table. */
public void scheduleNow(String periodicTaskName, Properties
periodicTaskProperties) {
+ //in case the executor service hasnt been initialized its better to log a
warning than
+ // throw a NPE
+ if (_executorService == null) {
+ LOGGER.warn("Cannot schedule task '{}' immediately: Scheduler is not
running.", periodicTaskName);
+ return;
+ }
// During controller deployment, each controller can have a slightly
different list of periodic tasks if we add,
// remove, or rename periodic task. To avoid this situation, we check
again (besides the check at controller API
// level) whether the periodic task exists.
@@ -152,4 +245,15 @@ public class PeriodicTaskScheduler {
}
}, 0, TimeUnit.SECONDS);
}
+
+ private static Properties getQuartzProperties(int taskCount) {
+ Properties quartzProperties = new Properties();
+ //isolating from other scheduler instances by having a different scheduler
instance name
+ quartzProperties.put(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME,
"ControllerPeriodicTaskScheduler");
+ //final thread count that will be used.
+ int threadCount = Math.min(taskCount, CONFIGURED_THREAD_COUNT);
+ quartzProperties.put("org.quartz.threadPool.class",
"org.quartz.simpl.SimpleThreadPool");
+ quartzProperties.put("org.quartz.threadPool.threadCount",
String.valueOf(threadCount));
+ return quartzProperties;
+ }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
index d8486e8ee83..14980b59e69 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
@@ -19,17 +19,27 @@
package org.apache.pinot.core.periodictask;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.CronTrigger;
+import org.quartz.Job;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.TriggerBuilder;
+import org.quartz.impl.StdSchedulerFactory;
+import org.quartz.impl.matchers.GroupMatcher;
+import org.testng.Assert;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
+import static org.testng.Assert.*;
public class PeriodicTaskSchedulerTest {
@@ -205,4 +215,146 @@ public class PeriodicTaskSchedulerTest {
// Confirm that all threads requested execution, even though only half the
threads completed execution.
assertEquals(attempts.get(), numThreads);
}
+
+ @Test
+ public void testCronScheduling() throws Exception {
+ AtomicInteger numTimesRunCalled = new AtomicInteger();
+
+ //let the frequency be 3600 seconds (1 hour) to prove that the cron job
triggered the task.
+ List<PeriodicTask> periodicTasks = List.of(new
BasePeriodicTask("CronTask", 3600L, 3600L, "0/1 * * * * ?") {
+ @Override
+ protected void runTask(Properties periodicTaskProperties) {
+ numTimesRunCalled.incrementAndGet();
+ }
+ });
+
+ PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+ taskScheduler.init(periodicTasks);
+
+ try {
+ taskScheduler.start();
+ Thread.sleep(1500L);
+ } finally {
+ taskScheduler.stop();
+ }
+
+ assertTrue(numTimesRunCalled.get() >= 1, "Task should have been triggered
by Quartz CRON scheduler");
+ }
+
+ @Test
+ public void testLegacyFallbackScheduling() throws Exception {
+ AtomicInteger numTimesRunCalled = new AtomicInteger();
+ //fallback to the default fixed delay method to prove it still works fine
with code changes
+ List<PeriodicTask> periodicTasks = List.of(new
BasePeriodicTask("LegacyFallbackTask", 1L, 0L, null) {
+ @Override
+ protected void runTask(Properties periodicTaskProperties) {
+ numTimesRunCalled.incrementAndGet();
+ }
+ });
+
+ PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+ taskScheduler.init(periodicTasks);
+
+ try {
+ taskScheduler.start();
+ Thread.sleep(1500L);
+ } finally {
+ taskScheduler.stop();
+ }
+
+ assertTrue(numTimesRunCalled.get() >= 1, "Task should have been triggered
by legacy fixed-delay scheduler");
+ }
+
+ @Test
+ public void testInvalidCronExpression() throws Exception {
+ AtomicInteger numTimesRunCalled = new AtomicInteger();
+
+ List<PeriodicTask> periodicTasks = List.of(new
BasePeriodicTask("InvalidCronTask", 1L, 1L, "60 * * * *") {
+ @Override
+ protected void runTask(Properties periodicTaskProperties) {
+ numTimesRunCalled.incrementAndGet();
+ }
+ });
+
+ PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+ taskScheduler.init(periodicTasks);
+
+ assertThrows(IllegalArgumentException.class, taskScheduler::start);
+
+ assertEquals(numTimesRunCalled.get(), 0, "Task should never run if the
CRON expression is invalid");
+ }
+
+ /**
+ * Test that the scheduler used to schedule Pinot Minion tasks
(PinotTaskManager) is not
+ * using the same quartz scheduler instance. It also tests that if we stop a
Controller cron
+ * task, Minion tasks are not stopped or interfered with.
+ * @throws Exception
+ */
+ @Test
+ public void testControllerAndMinionCronSchedulerIsolation() throws Exception
{
+ //quartz minion task scheduler
+ Scheduler minionScheduler = StdSchedulerFactory.getDefaultScheduler();
+ minionScheduler.start();
+
+ //quartz controller task scheduler
+ PeriodicTaskScheduler controllerPeriodicTaskScheduler = new
PeriodicTaskScheduler();
+
+ try {
+ String minionJobName = "testMinionTableTask";
+ String minionGroupName = "MinionTaskGroup";
+ String controllerTaskName = "ControllerTestCronTask";
+
+ JobDetail minionJob = JobBuilder.newJob(MockMinionJob.class)
+ .withIdentity(minionJobName, minionGroupName)
+ .build();
+ CronTrigger minionTrigger = TriggerBuilder.newTrigger()
+ .withIdentity("testMinionTrigger", minionGroupName)
+ .withSchedule(CronScheduleBuilder.cronSchedule("0 0/1 * * * ?"))
+ .build();
+ minionScheduler.scheduleJob(minionJob, minionTrigger);
+
+ Assert.assertTrue(minionScheduler.checkExists(minionJob.getKey()),
"Minion job should be scheduled.");
+
+ PeriodicTask controllerCronTask = new
BasePeriodicTask(controllerTaskName, 0L, 0L, "0 0/5 * * * ?") {
+ @Override
+ protected void runTask(Properties periodicTaskProperties) {
+ //mocking runtime execution logic.
+ }
+ };
+
controllerPeriodicTaskScheduler.init(Collections.singletonList(controllerCronTask));
+ controllerPeriodicTaskScheduler.start();
+
+
Assert.assertFalse(minionScheduler.checkExists(JobKey.jobKey(controllerTaskName)),
+ "Regression Failure: The minion scheduler should not be "
+ + "able to see the controller's task namespace!");
+
+ int totalMinionJobs =
minionScheduler.getJobKeys(GroupMatcher.anyJobGroup()).size();
+ Assert.assertEquals(totalMinionJobs, 1,
+ "The broad matcher scan on the minion scheduler should "
+ + "find exactly 1 minion job, ignoring controller tasks.");
+
+ controllerPeriodicTaskScheduler.stop();
+
+ Assert.assertTrue(minionScheduler.isStarted(),
+ "Regression Failure: Stopping the controller scheduler accidentally
tore down the minion scheduler!");
+ Assert.assertFalse(minionScheduler.isShutdown(),
+ "The minion scheduler must remain active.");
+ Assert.assertTrue(minionScheduler.checkExists(minionJob.getKey()),
+ "The scheduled minion tasks should still exist in the default
scheduler runtime.");
+ } finally {
+ if (!minionScheduler.isShutdown()) {
+ minionScheduler.shutdown(true);
+ }
+ controllerPeriodicTaskScheduler.stop();
+ }
+ }
+
+ /**
+ * Dummy job implementation needed to fulfill Quartz's verification layer.
+ */
+ public static class MockMinionJob implements Job {
+ @Override
+ public void execute(JobExecutionContext context) {
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]