This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b9a26f34ba Add support for cron based scheduling in controller (#18256)
3b9a26f34ba is described below

commit 3b9a26f34badee886f79c25d741436eb98fe08cf
Author: Priyanshu <[email protected]>
AuthorDate: Tue Jun 2 04:26:14 2026 +0530

    Add support for cron based scheduling in controller (#18256)
    
    * adding new configs in ControllerConf
    
    * adding constructors for cron expression acceptance
    
    * adding constructors for cron expression acceptance
    
    * adding quartz scheduler for cron job scheduling
    
    * fixing issues
    
    * fixing tests
    
    * fixing tests
    
    * unit tests for cron changes
    
    * removing unnecessary comments
    
    * fixing issues
    
    * fixing issues
    
    * fixing issues
    
    * fixing minor issues
    
    * fixing minor issues
    
    * fixing minor issues
    
    * improving code quality
    
    * resolving comments
    
    * fixing checkstyle violations
    
    * Delete pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java
    
    * addressing comments
    
    * resolving comments
    
    * addressing comments
    
    * addressing comments
    
    * resolving comments
    
    * resolving comments
    
    * fixing failing CI
---
 .../apache/pinot/controller/ControllerConf.java    | 108 +++++++++++++-
 .../controller/helix/RealtimeConsumerMonitor.java  |   3 +-
 .../controller/helix/SegmentStatusChecker.java     |   4 +-
 .../core/cleanup/StaleInstancesCleanupTask.java    |   3 +-
 .../helix/core/minion/PinotTaskManager.java        |   4 +-
 .../helix/core/minion/TaskMetricsEmitter.java      |   2 +-
 .../core/periodictask/ControllerPeriodicTask.java  |   6 +-
 .../helix/core/rebalance/RebalanceChecker.java     |   5 +-
 .../rebalance/tenant/TenantRebalanceChecker.java   |   3 +-
 .../helix/core/relocation/SegmentRelocator.java    |   5 +-
 .../helix/core/retention/RetentionManager.java     |   5 +-
 .../BrokerResourceValidationManager.java           |   5 +-
 .../OfflineSegmentValidationManager.java           |   4 +-
 .../validation/RealtimeOffsetAutoResetManager.java |   5 +-
 .../RealtimeSegmentValidationManager.java          |   5 +-
 .../periodictask/ControllerPeriodicTaskTest.java   |   4 +-
 .../pinot/core/periodictask/BasePeriodicTask.java  |  30 +++-
 .../pinot/core/periodictask/PeriodicTask.java      |   8 ++
 .../core/periodictask/PeriodicTaskCronJob.java     |  53 +++++++
 .../core/periodictask/PeriodicTaskScheduler.java   | 152 ++++++++++++++++----
 .../periodictask/PeriodicTaskSchedulerTest.java    | 160 ++++++++++++++++++++-
 21 files changed, 514 insertions(+), 60 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index e9ac8f6a081..c4d9b131d19 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -105,23 +105,31 @@ public class ControllerConf extends PinotConfiguration {
   public static class ControllerPeriodicTasksConf {
     // frequency configs
     public static final String RETENTION_MANAGER_FREQUENCY_PERIOD = 
"controller.retention.frequencyPeriod";
+    public static final String RETENTION_MANAGER_CRON_EXPRESSION = 
"controller.retention.cronExpression";
     public static final String 
OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_PERIOD =
         "controller.offline.segment.interval.checker.frequencyPeriod";
     public static final String REALTIME_SEGMENT_VALIDATION_FREQUENCY_PERIOD =
         "controller.realtime.segment.validation.frequencyPeriod";
+    public static final String REALTIME_SEGMENT_VALIDATION_CRON_EXPRESSION =
+        "controller.realtime.segment.validation.cronExpression";
     public static final String 
REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS =
         "controller.realtime.segment.validation.initialDelayInSeconds";
     public static final String REALTIME_OFFSET_AUTO_RESET_BACKFILL_ENABLED =
         "controller.realtime.offsetAutoReset.backfill.enabled";
     public static final String 
REALTIME_OFFSET_AUTO_RESET_BACKFILL_FREQUENCY_PERIOD =
         "controller.realtime.offsetAutoReset.backfill.frequencyPeriod";
+    public static final String 
REALTIME_OFFSET_AUTO_RESET_BACKFILL_CRON_EXPRESSION =
+        "controller.realtime.offsetAutoReset.backfill.cronExpression";
     public static final String 
REALTIME_OFFSET_AUTO_RESET_BACKFILL_INITIAL_DELAY_IN_SECONDS =
         "controller.realtime.offsetAutoReset.backfill.initialDelayInSeconds";
     public static final String BROKER_RESOURCE_VALIDATION_FREQUENCY_PERIOD =
         "controller.broker.resource.validation.frequencyPeriod";
+    public static final String BROKER_RESOURCE_VALIDATION_CRON_EXPRESSION =
+        "controller.broker.resource.validation.cronExpression";
     public static final String 
BROKER_RESOURCE_VALIDATION_INITIAL_DELAY_IN_SECONDS =
         "controller.broker.resource.validation.initialDelayInSeconds";
     public static final String STATUS_CHECKER_FREQUENCY_PERIOD = 
"controller.statuschecker.frequencyPeriod";
+    public static final String STATUS_CHECKER_CRON_EXPRESSION = 
"controller.statuschecker.cronExpression";
     public static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_PERIOD =
         "controller.statuschecker.waitForPushTimePeriod";
     public static final String TASK_MANAGER_FREQUENCY_PERIOD = 
"controller.task.frequencyPeriod";
@@ -131,6 +139,8 @@ public class ControllerConf extends PinotConfiguration {
 
     public static final String STALE_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD =
         "controller.stale.instances.cleanup.task.frequencyPeriod";
+    public static final String STALE_INSTANCES_CLEANUP_TASK_CRON_EXPRESSION =
+        "controller.stale.instances.cleanup.task.cronExpression";
     public static final String 
STALE_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS =
         "controller.stale.instances.cleanup.task.initialDelaySeconds";
     public static final String 
STALE_INSTANCES_CLEANUP_TASK_INSTANCES_RETENTION_PERIOD =
@@ -138,6 +148,8 @@ public class ControllerConf extends PinotConfiguration {
 
     public static final String TASK_METRICS_EMITTER_FREQUENCY_PERIOD =
         "controller.minion.task.metrics.emitter.frequencyPeriod";
+    public static final String TASK_METRICS_EMITTER_CRON_EXPRESSION =
+        "controller.minion.task.metrics.emitter.cronExpression";
 
     public static final String PINOT_TASK_MANAGER_SCHEDULER_ENABLED = 
"controller.task.scheduler.enabled";
     // This is the expiry for the ended tasks. Helix cleans up the task info 
from ZK after the expiry time from the
@@ -184,7 +196,7 @@ public class ControllerConf extends PinotConfiguration {
     public static final boolean DEFAULT_CONCURRENT_SCHEDULING_ENABLED = false;
 
     public static final String SEGMENT_RELOCATOR_FREQUENCY_PERIOD = 
"controller.segment.relocator.frequencyPeriod";
-
+    public static final String SEGMENT_RELOCATOR_CRON_EXPRESSION = 
"controller.segment.relocator.cronExpression";
     public static final String SEGMENT_RELOCATOR_REASSIGN_INSTANCES = 
"controller.segment.relocator.reassignInstances";
     public static final String SEGMENT_RELOCATOR_BOOTSTRAP = 
"controller.segment.relocator.bootstrap";
     public static final String SEGMENT_RELOCATOR_DOWNTIME = 
"controller.segment.relocator.downtime";
@@ -211,10 +223,12 @@ public class ControllerConf extends PinotConfiguration {
         "controller.segmentRelocator.batchSizePerServer";
 
     public static final String REBALANCE_CHECKER_FREQUENCY_PERIOD = 
"controller.rebalance.checker.frequencyPeriod";
+    public static final String REBALANCE_CHECKER_CRON_EXPRESSION = 
"controller.rebalance.checker.cronExpression";
     // Because segment level validation is expensive and requires heavy ZK 
access, we run segment level validation
     // with a separate interval
     public static final String SEGMENT_LEVEL_VALIDATION_INTERVAL_PERIOD =
         "controller.segment.level.validation.intervalPeriod";
+
     public static final String AUTO_RESET_ERROR_SEGMENTS_VALIDATION =
         "controller.segment.error.autoReset";
     public static final String ENABLE_PARTIAL_OFFLINE_REPLICA_REPAIR =
@@ -230,13 +244,16 @@ public class ControllerConf extends PinotConfiguration {
         "controller.offlineSegmentIntervalChecker.initialDelayInSeconds";
     public static final String OFFLINE_SEGMENT_VALIDATION_FREQUENCY_PERIOD =
         "controller.offline.segment.validation.frequencyPeriod";
-
+    public static final String OFFLINE_SEGMENT_VALIDATION_CRON_EXPRESSION =
+        "controller.offline.segment.validation.cronExpression";
     public static final String SEGMENT_RELOCATOR_INITIAL_DELAY_IN_SECONDS =
         "controller.segmentRelocator.initialDelayInSeconds";
     public static final String REBALANCE_CHECKER_INITIAL_DELAY_IN_SECONDS =
         "controller.rebalanceChecker.initialDelayInSeconds";
     public static final String TENANT_REBALANCE_CHECKER_FREQUENCY_PERIOD =
         "controller.tenant.rebalance.checker.frequencyPeriod";
+    public static final String TENANT_REBALANCE_CHECKER_CRON_EXPRESSION =
+        "controller.tenant.rebalance.checker.cronExpression";
     public static final String 
TENANT_REBALANCE_CHECKER_INITIAL_DELAY_IN_SECONDS =
         "controller.tenant.rebalance.checker.initialDelayInSeconds";
 
@@ -311,7 +328,8 @@ public class ControllerConf extends PinotConfiguration {
         "controller.realtimeConsumerMonitor.frequencyPeriod";
     public static final String RT_CONSUMER_MONITOR_INITIAL_DELAY_IN_SECONDS =
         "controller.realtimeConsumerMonitor.initialDelayInSeconds";
-
+    public static final String RT_CONSUMER_MONITOR_CRON_EXPRESSION =
+        "controller.realtimeConsumerMonitor.cronExpression";
     public static final String DEFAULT_RT_CONSUMER_MONITOR_FREQUENCY_PERIOD = 
"-1s"; // Disabled by default
   }
 
@@ -731,11 +749,19 @@ public class ControllerConf extends PinotConfiguration {
     return (int) convertPeriodToSeconds(period);
   }
 
+  public String getRetentionControllerCronExpression() {
+    return 
getProperty(ControllerPeriodicTasksConf.RETENTION_MANAGER_CRON_EXPRESSION);
+  }
+
   public void setRetentionControllerFrequencyInSeconds(int 
retentionFrequencyInSeconds) {
     setProperty(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_PERIOD,
         Long.toString(retentionFrequencyInSeconds) + "s");
   }
 
+  public void setRetentionControllerCronExpression(String cronExpression) {
+    setProperty(ControllerPeriodicTasksConf.RETENTION_MANAGER_CRON_EXPRESSION, 
cronExpression);
+  }
+
   /**
    * Returns the offline segment interval checker frequency in seconds.
    * Reads {@code controller.offline.segment.interval.checker.frequencyPeriod} 
as a period string (e.g. "24h"),
@@ -774,11 +800,19 @@ public class ControllerConf extends PinotConfiguration {
     return (int) convertPeriodToSeconds(period);
   }
 
+  public String getRealtimeSegmentValidationCronExpression() {
+    return 
getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_CRON_EXPRESSION);
+  }
+
   public void setRealtimeSegmentValidationFrequencyInSeconds(int 
validationFrequencyInSeconds) {
     
setProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_PERIOD,
         Long.toString(validationFrequencyInSeconds) + "s");
   }
 
+  public void setRealtimeSegmentValidationCronExpression(String 
cronExpression) {
+    
setProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_CRON_EXPRESSION,
 cronExpression);
+  }
+
   public boolean isRealtimeOffsetAutoResetBackfillEnabled() {
     return 
getProperty(ControllerPeriodicTasksConf.REALTIME_OFFSET_AUTO_RESET_BACKFILL_ENABLED,
 false);
   }
@@ -793,11 +827,19 @@ public class ControllerConf extends PinotConfiguration {
     return (int) convertPeriodToSeconds(period);
   }
 
+  public String getRealtimeOffsetAutoResetBackfillCronExpression() {
+    return 
getProperty(ControllerPeriodicTasksConf.REALTIME_OFFSET_AUTO_RESET_BACKFILL_CRON_EXPRESSION);
+  }
+
   public void setRealtimeOffsetAutoResetBackfillFrequencyInSeconds(int 
offsetAutoResetBackfillFrequencyInSeconds) {
     
setProperty(ControllerPeriodicTasksConf.REALTIME_OFFSET_AUTO_RESET_BACKFILL_FREQUENCY_PERIOD,
         Integer.toString(offsetAutoResetBackfillFrequencyInSeconds) + "s");
   }
 
+  public void setRealtimeOffsetAutoResetBackfillCronExpression(String 
cronExpression) {
+    
setProperty(ControllerPeriodicTasksConf.REALTIME_OFFSET_AUTO_RESET_BACKFILL_CRON_EXPRESSION,
 cronExpression);
+  }
+
   /**
    * Returns the broker resource validation frequency in seconds.
    * Reads {@code controller.broker.resource.validation.frequencyPeriod} as a 
period string (e.g. "1h"),
@@ -814,11 +856,19 @@ public class ControllerConf extends PinotConfiguration {
     return (int) convertPeriodToSeconds(period);
   }
 
+  public String getBrokerResourceValidationCronExpression() {
+    return 
getProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_CRON_EXPRESSION);
+  }
+
   public void setBrokerResourceValidationFrequencyInSeconds(int 
validationFrequencyInSeconds) {
     
setProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_PERIOD,
         Long.toString(validationFrequencyInSeconds) + "s");
   }
 
+  public void setBrokerResourceValidationCronExpression(String cronExpression) 
{
+    
setProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_CRON_EXPRESSION,
 cronExpression);
+  }
+
   public long getBrokerResourceValidationInitialDelayInSeconds() {
     return 
getProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_INITIAL_DELAY_IN_SECONDS,
         getPeriodicTaskInitialDelayInSeconds());
@@ -833,11 +883,19 @@ public class ControllerConf extends PinotConfiguration {
     return (int) convertPeriodToSeconds(period);
   }
 
+  public String getStatusCheckerCronExpression() {
+    return 
getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_CRON_EXPRESSION);
+  }
+
   public void setStatusCheckerFrequencyInSeconds(int 
statusCheckerFrequencyInSeconds) {
     setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_PERIOD,
         Long.toString(statusCheckerFrequencyInSeconds) + "s");
   }
 
+  public void setStatusCheckerCronExpression(String cronExpression) {
+    setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_CRON_EXPRESSION, 
cronExpression);
+  }
+
   public int getRebalanceCheckerFrequencyInSeconds() {
     String period = 
getProperty(ControllerPeriodicTasksConf.REBALANCE_CHECKER_FREQUENCY_PERIOD,
         
ControllerPeriodicTasksConf.DEFAULT_REBALANCE_CHECKER_FREQUENCY_PERIOD);
@@ -847,6 +905,10 @@ public class ControllerConf extends PinotConfiguration {
     return (int) convertPeriodToSeconds(period);
   }
 
+  public String getRebalanceCheckerCronExpression() {
+    return 
getProperty(ControllerPeriodicTasksConf.REBALANCE_CHECKER_CRON_EXPRESSION);
+  }
+
   public long getRebalanceCheckerInitialDelayInSeconds() {
     return 
getProperty(ControllerPeriodicTasksConf.REBALANCE_CHECKER_INITIAL_DELAY_IN_SECONDS,
         ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
@@ -861,6 +923,10 @@ public class ControllerConf extends PinotConfiguration {
     return (int) convertPeriodToSeconds(period);
   }
 
+  public String getTenantRebalanceCheckerCronExpression() {
+    return 
getProperty(ControllerPeriodicTasksConf.TENANT_REBALANCE_CHECKER_CRON_EXPRESSION);
+  }
+
   public long getTenantRebalanceCheckerInitialDelayInSeconds() {
     return 
getProperty(ControllerPeriodicTasksConf.TENANT_REBALANCE_CHECKER_INITIAL_DELAY_IN_SECONDS,
         ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
@@ -875,6 +941,10 @@ public class ControllerConf extends PinotConfiguration {
     return (int) convertPeriodToSeconds(period);
   }
 
+  public String getRealtimeConsumerMonitorCronExpression() {
+    return 
getProperty(ControllerPeriodicTasksConf.RT_CONSUMER_MONITOR_CRON_EXPRESSION);
+  }
+
   public long getRealtimeConsumerMonitorInitialDelayInSeconds() {
     return 
getProperty(ControllerPeriodicTasksConf.RT_CONSUMER_MONITOR_INITIAL_DELAY_IN_SECONDS,
         ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
@@ -889,11 +959,19 @@ public class ControllerConf extends PinotConfiguration {
     return (int) convertPeriodToSeconds(period);
   }
 
+  public String getTaskMetricsEmitterCronExpression() {
+    return 
getProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_CRON_EXPRESSION);
+  }
+
   public void setTaskMetricsEmitterFrequencyInSeconds(int 
taskMetricsEmitterFrequencyInSeconds) {
     
setProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_FREQUENCY_PERIOD,
         Long.toString(taskMetricsEmitterFrequencyInSeconds) + "s");
   }
 
+  public void setTaskMetricsEmitterCronExpression(String 
taskMetricsEmitterCronExpression) {
+    
setProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_CRON_EXPRESSION, 
taskMetricsEmitterCronExpression);
+  }
+
   public int getStatusCheckerWaitForPushTimeInSeconds() {
     String period = 
getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_PERIOD,
         
ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_PERIOD);
@@ -922,11 +1000,19 @@ public class ControllerConf extends PinotConfiguration {
     return (int) convertPeriodToSeconds(period);
   }
 
+  public String getSegmentRelocatorCronExpression() {
+    return 
getProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_CRON_EXPRESSION);
+  }
+
   public void setSegmentRelocatorFrequencyInSeconds(int 
segmentRelocatorFrequencyInSeconds) {
     setProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_FREQUENCY_PERIOD,
         Long.toString(segmentRelocatorFrequencyInSeconds) + "s");
   }
 
+  public void setSegmentRelocatorCronExpression(String 
segmentRelocatorCronExpression) {
+    setProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_CRON_EXPRESSION, 
segmentRelocatorCronExpression);
+  }
+
   public boolean getSegmentRelocatorReassignInstances() {
     return 
Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_REASSIGN_INSTANCES))
         .map(Boolean::parseBoolean).orElse(false);
@@ -1060,10 +1146,18 @@ public class ControllerConf extends PinotConfiguration {
     return (int) convertPeriodToSeconds(period);
   }
 
+  public String getStaleInstancesCleanupTaskCronExpression() {
+    return 
getProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_CRON_EXPRESSION);
+  }
+
   public void setStaleInstanceCleanupTaskFrequencyInSeconds(String 
frequencyPeriod) {
     
setProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_FREQUENCY_PERIOD,
 frequencyPeriod);
   }
 
+  public void setStaleInstancesCleanupTaskCronExpression(String 
cronExpression) {
+    
setProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_CRON_EXPRESSION,
 cronExpression);
+  }
+
   public long getStaleInstanceCleanupTaskInitialDelaySeconds() {
     return 
getProperty(ControllerPeriodicTasksConf.STALE_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS,
         ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
@@ -1269,11 +1363,19 @@ public class ControllerConf extends PinotConfiguration {
     return (int) convertPeriodToSeconds(period);
   }
 
+  public String getOfflineSegmentValidationCronExpression() {
+    return 
getProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_VALIDATION_CRON_EXPRESSION);
+  }
+
   public void setOfflineSegmentValidationFrequencyInSeconds(int 
validationFrequencyInSeconds) {
     
setProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_VALIDATION_FREQUENCY_PERIOD,
         TimeUtils.convertMillisToPeriod(validationFrequencyInSeconds * 1000L));
   }
 
+  public void setOfflineSegmentValidationCronExpression(String 
validationCronExpression) {
+    
setProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_VALIDATION_CRON_EXPRESSION,
 validationCronExpression);
+  }
+
   public long getRealtimeSegmentValidationManagerInitialDelaySeconds() {
     return 
getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS,
         getPeriodicTaskInitialDelayInSeconds());
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
index b4246f05123..77488c36dff 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
@@ -50,7 +50,8 @@ public class RealtimeConsumerMonitor extends 
ControllerPeriodicTask<RealtimeCons
       LeadControllerManager leadControllerManager, ControllerMetrics 
controllerMetrics,
       ConsumingSegmentInfoReader consumingSegmentInfoReader) {
     super("RealtimeConsumerMonitor", 
controllerConf.getRealtimeConsumerMonitorRunFrequency(),
-        controllerConf.getRealtimeConsumerMonitorInitialDelayInSeconds(), 
pinotHelixResourceManager,
+        controllerConf.getRealtimeConsumerMonitorInitialDelayInSeconds(),
+        controllerConf.getRealtimeConsumerMonitorCronExpression(), 
pinotHelixResourceManager,
         leadControllerManager, controllerMetrics);
     _consumingSegmentInfoReader = consumingSegmentInfoReader;
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index ab20f6fb745..d65504d377b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -93,8 +93,8 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask<SegmentStatusCh
       LeadControllerManager leadControllerManager, ControllerConf config, 
ControllerMetrics controllerMetrics,
       TableSizeReader tableSizeReader) {
     super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(),
-        config.getStatusCheckerInitialDelayInSeconds(), 
pinotHelixResourceManager, leadControllerManager,
-        controllerMetrics);
+            config.getStatusCheckerInitialDelayInSeconds(), 
config.getStatusCheckerCronExpression(),
+        pinotHelixResourceManager, leadControllerManager, controllerMetrics);
     _waitForPushTimeSeconds = 
config.getStatusCheckerWaitForPushTimeInSeconds();
     _tableSizeReader = tableSizeReader;
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java
index 6bf9f248dae..e0634ccd2a0 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java
@@ -61,7 +61,8 @@ public class StaleInstancesCleanupTask extends 
BasePeriodicTask {
   public StaleInstancesCleanupTask(PinotHelixResourceManager 
pinotHelixResourceManager,
       LeadControllerManager leadControllerManager, ControllerConf 
controllerConf, ControllerMetrics controllerMetrics) {
     super(TASK_NAME, 
controllerConf.getStaleInstancesCleanupTaskFrequencyInSeconds(),
-        controllerConf.getStaleInstanceCleanupTaskInitialDelaySeconds());
+        controllerConf.getStaleInstanceCleanupTaskInitialDelaySeconds(),
+        controllerConf.getStaleInstancesCleanupTaskCronExpression());
     _pinotHelixResourceManager = pinotHelixResourceManager;
     _leadControllerManager = leadControllerManager;
     _controllerMetrics = controllerMetrics;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index ce986dd5add..940797d5b8e 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -163,8 +163,8 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
       TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> 
taskManagerStatusCache, Executor executor,
       PoolingHttpClientConnectionManager connectionManager, 
ResourceUtilizationManager resourceUtilizationManager) {
     super("PinotTaskManager", 
controllerConf.getTaskManagerFrequencyInSeconds(),
-        controllerConf.getPinotTaskManagerInitialDelaySeconds(), 
helixResourceManager, leadControllerManager,
-        controllerMetrics);
+            controllerConf.getPinotTaskManagerInitialDelaySeconds(), null, 
helixResourceManager,
+        leadControllerManager, controllerMetrics);
     _helixTaskResourceManager = helixTaskResourceManager;
     _resourceUtilizationManager = resourceUtilizationManager;
     _taskManagerStatusCache = taskManagerStatusCache;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java
index b562118ab10..8c0515c55b4 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java
@@ -73,7 +73,7 @@ public class TaskMetricsEmitter extends BasePeriodicTask {
       PinotHelixTaskResourceManager helixTaskResourceManager, 
LeadControllerManager leadControllerManager,
       ControllerConf controllerConf, ControllerMetrics controllerMetrics) {
     super(TASK_NAME, controllerConf.getTaskMetricsEmitterFrequencyInSeconds(),
-        controllerConf.getPeriodicTaskInitialDelayInSeconds());
+        controllerConf.getPeriodicTaskInitialDelayInSeconds(), 
controllerConf.getTaskMetricsEmitterCronExpression());
     _pinotHelixResourceManager = pinotHelixResourceManager;
     _helixTaskResourceManager = helixTaskResourceManager;
     _controllerMetrics = controllerMetrics;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 7d2accc9934..0877672dfa6 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -56,9 +56,9 @@ public abstract class ControllerPeriodicTask<C> extends 
BasePeriodicTask impleme
   protected Set<String> _prevLeaderOfTables = new HashSet<>();
 
   public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, 
long initialDelayInSeconds,
-      PinotHelixResourceManager pinotHelixResourceManager, 
LeadControllerManager leadControllerManager,
-      ControllerMetrics controllerMetrics) {
-    super(taskName, runFrequencyInSeconds, initialDelayInSeconds);
+      String cronExpression, PinotHelixResourceManager 
pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerMetrics 
controllerMetrics) {
+    super(taskName, runFrequencyInSeconds, initialDelayInSeconds, 
cronExpression);
     _pinotHelixResourceManager = pinotHelixResourceManager;
     _leadControllerManager = leadControllerManager;
     _controllerMetrics = controllerMetrics;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
index 481f990e873..51cc5194c2d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
@@ -63,8 +63,9 @@ public class RebalanceChecker extends 
ControllerPeriodicTask<Void> {
       PinotHelixResourceManager pinotHelixResourceManager, 
LeadControllerManager leadControllerManager,
       ControllerConf config, ControllerMetrics controllerMetrics) {
     super(RebalanceChecker.class.getSimpleName(), 
config.getRebalanceCheckerFrequencyInSeconds(),
-        config.getRebalanceCheckerInitialDelayInSeconds(), 
pinotHelixResourceManager, leadControllerManager,
-        controllerMetrics);
+            config.getRebalanceCheckerInitialDelayInSeconds(), 
config.getRebalanceCheckerCronExpression(),
+        pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
     _tableRebalanceManager = tableRebalanceManager;
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java
index 0c1eaf88c10..0edf1c9fc53 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java
@@ -63,7 +63,8 @@ public class TenantRebalanceChecker extends BasePeriodicTask {
   public TenantRebalanceChecker(ControllerConf config,
       PinotHelixResourceManager pinotHelixResourceManager, TenantRebalancer 
tenantRebalancer) {
     super(TASK_NAME, config.getTenantRebalanceCheckerFrequencyInSeconds(),
-        config.getTenantRebalanceCheckerInitialDelayInSeconds());
+        config.getTenantRebalanceCheckerInitialDelayInSeconds(),
+        config.getTenantRebalanceCheckerCronExpression());
     _pinotHelixResourceManager = pinotHelixResourceManager;
     _tenantRebalancer = tenantRebalancer;
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
index 579787fb6c5..7c55131ca5f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
@@ -94,8 +94,9 @@ public class SegmentRelocator extends 
ControllerPeriodicTask<Void> {
       ControllerConf config, ControllerMetrics controllerMetrics, 
ExecutorService executorService,
       HttpClientConnectionManager connectionManager) {
     super(SegmentRelocator.class.getSimpleName(), 
config.getSegmentRelocatorFrequencyInSeconds(),
-        config.getSegmentRelocatorInitialDelayInSeconds(), 
pinotHelixResourceManager, leadControllerManager,
-        controllerMetrics);
+            config.getSegmentRelocatorInitialDelayInSeconds(), 
config.getSegmentRelocatorCronExpression(),
+        pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
     _tableRebalanceManager = tableRebalanceManager;
     _executorService = executorService;
     _connectionManager = connectionManager;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 9e6c3bbd8f6..55de58eae34 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -94,8 +94,9 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
       LeadControllerManager leadControllerManager, ControllerConf config, 
ControllerMetrics controllerMetrics,
       BrokerServiceHelper brokerServiceHelper) {
     super(TASK_NAME, config.getRetentionControllerFrequencyInSeconds(),
-        config.getRetentionManagerInitialDelayInSeconds(), 
pinotHelixResourceManager, leadControllerManager,
-        controllerMetrics);
+            config.getRetentionManagerInitialDelayInSeconds(), 
config.getRetentionControllerCronExpression(),
+        pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
     _untrackedSegmentDeletionEnabled = 
config.getUntrackedSegmentDeletionEnabled();
     _untrackedSegmentsRetentionTimeInDays = 
config.getUntrackedSegmentsRetentionTimeInDays();
     _agedSegmentsDeletionBatchSize = config.getAgedSegmentsDeletionBatchSize();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
index 7f0d1dc1564..747d139e640 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -44,8 +44,9 @@ public class BrokerResourceValidationManager extends 
ControllerPeriodicTask<Brok
   public BrokerResourceValidationManager(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager,
       LeadControllerManager leadControllerManager, ControllerMetrics 
controllerMetrics) {
     super("BrokerResourceValidationManager", 
config.getBrokerResourceValidationFrequencyInSeconds(),
-        config.getBrokerResourceValidationInitialDelayInSeconds(), 
pinotHelixResourceManager, leadControllerManager,
-        controllerMetrics);
+            config.getBrokerResourceValidationInitialDelayInSeconds(),
+        config.getBrokerResourceValidationCronExpression(), 
pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
   }
 
   @Override
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentValidationManager.java
index 59f0881c3f7..c9535c1b62b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentValidationManager.java
@@ -66,8 +66,8 @@ public class OfflineSegmentValidationManager extends 
ControllerPeriodicTask<Offl
       LeadControllerManager leadControllerManager, ValidationMetrics 
validationMetrics,
       ControllerMetrics controllerMetrics, ResourceUtilizationManager 
resourceUtilizationManager) {
     super("OfflineSegmentValidationManager", 
config.getOfflineSegmentValidationFrequencyInSeconds(),
-        config.getPeriodicTaskInitialDelayInSeconds(), 
pinotHelixResourceManager,
-        leadControllerManager, controllerMetrics);
+            config.getPeriodicTaskInitialDelayInSeconds(), 
config.getOfflineSegmentValidationCronExpression(),
+        pinotHelixResourceManager, leadControllerManager, controllerMetrics);
     _validationMetrics = validationMetrics;
     _segmentAutoResetOnErrorAtValidation = 
config.isAutoResetErrorSegmentsOnValidationEnabled();
     _resourceUtilizationManager = resourceUtilizationManager;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManager.java
index ab73ceab6b2..889be0600ee 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManager.java
@@ -56,8 +56,9 @@ public class RealtimeOffsetAutoResetManager extends 
ControllerPeriodicTask<Realt
       LeadControllerManager leadControllerManager, 
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
       ControllerMetrics controllerMetrics) {
     super("RealtimeOffsetAutoResetManager", 
config.getRealtimeOffsetAutoResetBackfillFrequencyInSeconds(),
-        config.getRealtimeOffsetAutoResetBackfillInitialDelaySeconds(), 
pinotHelixResourceManager,
-        leadControllerManager, controllerMetrics);
+            config.getRealtimeOffsetAutoResetBackfillInitialDelaySeconds(),
+        config.getRealtimeOffsetAutoResetBackfillCronExpression(),
+        pinotHelixResourceManager, leadControllerManager, controllerMetrics);
     _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
     _pinotHelixResourceManager = pinotHelixResourceManager;
     _tableToHandler = new ConcurrentHashMap<>();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 30e899a4548..e5bb5a871bb 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -77,8 +77,9 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
       ValidationMetrics validationMetrics, ControllerMetrics 
controllerMetrics, StorageQuotaChecker quotaChecker,
       ResourceUtilizationManager resourceUtilizationManager) {
     super("RealtimeSegmentValidationManager", 
config.getRealtimeSegmentValidationFrequencyInSeconds(),
-        config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), 
pinotHelixResourceManager,
-        leadControllerManager, controllerMetrics);
+            config.getRealtimeSegmentValidationManagerInitialDelaySeconds(),
+        config.getRealtimeSegmentValidationCronExpression(),
+        pinotHelixResourceManager, leadControllerManager, controllerMetrics);
     _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
     _validationMetrics = validationMetrics;
     _controllerMetrics = controllerMetrics;
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index f4e0eb46b14..9e0a4f1b121 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -58,8 +58,8 @@ public class ControllerPeriodicTaskTest {
   private static final String TASK_NAME = "TestTask";
 
   private final ControllerPeriodicTask _task = new 
ControllerPeriodicTask<Void>(TASK_NAME, RUN_FREQUENCY_IN_SECONDS,
-      _controllerConf.getPeriodicTaskInitialDelayInSeconds(), 
_resourceManager, _leadControllerManager,
-      _controllerMetrics) {
+        _controllerConf.getPeriodicTaskInitialDelayInSeconds(), null, 
_resourceManager,
+      _leadControllerManager, _controllerMetrics) {
 
     @Override
     protected void setUpTask() {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
index c5910b27b42..e80d90da54f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.periodictask;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +41,7 @@ public abstract class BasePeriodicTask implements 
PeriodicTask {
   protected final String _taskName;
   protected final long _intervalInSeconds;
   protected final long _initialDelayInSeconds;
+  protected final String _cronExpression;
   protected final ReentrantLock _runLock;
 
   // Lock used to synchronize life-cycle functions
@@ -61,11 +63,30 @@ public abstract class BasePeriodicTask implements 
PeriodicTask {
   }
 
   public BasePeriodicTask(String taskName, long runFrequencyInSeconds, long 
initialDelayInSeconds) {
+    this(taskName, runFrequencyInSeconds, initialDelayInSeconds, null);
+  }
+
+  public BasePeriodicTask(String taskName, long runFrequencyInSeconds, long 
initialDelayInSeconds,
+      @Nullable String cronExpression) {
     _taskName = taskName;
-    _intervalInSeconds = runFrequencyInSeconds;
-    _initialDelayInSeconds = initialDelayInSeconds;
     _runLock = new ReentrantLock();
     _lifeCycleLock = new Object();
+    boolean hasCronScheduling = cronExpression != null && 
!cronExpression.trim().isEmpty();
+    boolean hasFrequencyScheduling = runFrequencyInSeconds > 0;
+
+    if (hasCronScheduling && hasFrequencyScheduling) {
+      LOGGER.warn("Task '{}' is configured with both a cron expression ('{}') "
+              + "and a fixed execution frequency ({}s). Preferring cron 
scheduling.",
+          taskName, cronExpression, runFrequencyInSeconds);
+      _intervalInSeconds = 0;
+      _initialDelayInSeconds = 0;
+      _cronExpression = cronExpression;
+    } else {
+      _intervalInSeconds = runFrequencyInSeconds;
+      _initialDelayInSeconds = initialDelayInSeconds;
+      //this will be null/empty anyway if it's not set.
+      _cronExpression = cronExpression;
+    }
   }
 
   @Override
@@ -83,6 +104,11 @@ public abstract class BasePeriodicTask implements 
PeriodicTask {
     return _initialDelayInSeconds;
   }
 
+  @Override
+  public String getCronExpression() {
+    return _cronExpression;
+  }
+
   /**
    * Returns the status of the {@code started} flag. This flag will be set 
after calling {@link #start()}, and reset
    * after calling {@link #stop()}.
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
index 4a5e120b95a..d10bccfdf37 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.periodictask;
 
 import java.util.Properties;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 
 
@@ -51,6 +52,13 @@ public interface PeriodicTask extends Runnable {
    */
   long getInitialDelayInSeconds();
 
+  /**
+   * Returns the CRON expression for absolute scheduling, or null if 
fixed-delay scheduling should be used.
+   * @return Cron expression
+   */
+  @Nullable
+  String getCronExpression();
+
   /**
    * Performs necessary setups and starts the periodic task. Should be called 
before scheduling the periodic task. Can
    * be called after calling {@link #stop()} to restart the periodic task.
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskCronJob.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskCronJob.java
new file mode 100644
index 00000000000..275d9021b2c
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskCronJob.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.periodictask;
+
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@DisallowConcurrentExecution
+public class PeriodicTaskCronJob implements Job {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PeriodicTaskCronJob.class);
+  public static final String PERIODIC_TASK_KEY = "PeriodicTask";
+
+  public PeriodicTaskCronJob() {
+  }
+
+  @Override
+  public void execute(JobExecutionContext jobExecutionContext)
+      throws JobExecutionException {
+    PeriodicTask periodicTask = (PeriodicTask) jobExecutionContext
+        .getJobDetail()
+        .getJobDataMap()
+        .get(PERIODIC_TASK_KEY);
+
+    if (periodicTask != null) {
+      try {
+        periodicTask.run();
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while running Task: {}", 
periodicTask.getTaskName(), e);
+      }
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
index 37b63dd6f41..534c8799468 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
@@ -28,6 +28,15 @@ import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.quartz.CronExpression;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.CronTrigger;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.TriggerBuilder;
+import org.quartz.impl.StdSchedulerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,8 +47,14 @@ import org.slf4j.LoggerFactory;
  */
 public class PeriodicTaskScheduler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PeriodicTaskScheduler.class);
-
+  private static final int MAX_QUARTZ_THREADS_HARD_LIMIT = 20;
+  //configure a thread limit for the quartz scheduler to use
+  private static final int CONFIGURED_THREAD_COUNT = Math.min(
+      MAX_QUARTZ_THREADS_HARD_LIMIT,
+      Math.max(1, Runtime.getRuntime().availableProcessors())
+  );
   private ScheduledExecutorService _executorService;
+  private Scheduler _scheduler;
   private Map<String, PeriodicTask> _periodicTasks;
 
   /**
@@ -57,41 +72,103 @@ public class PeriodicTaskScheduler {
 
   /**
    * Starts scheduling periodic tasks.
+   * It uses the cron expression if one is provided; otherwise, it falls back to the 
default fixed-delay scheduling.
+   *
    */
   public synchronized void start() {
     if (_executorService != null) {
       LOGGER.warn("Periodic task scheduler already started");
+      return;
     }
+    Preconditions.checkState(_periodicTasks != null,
+        "Periodic task scheduler has not been initialized");
 
     if (_periodicTasks.isEmpty()) {
       LOGGER.warn("No periodic task scheduled");
-    } else {
-      Collection<PeriodicTask> periodicTasks = _periodicTasks.values();
-      LOGGER.info("Starting periodic task scheduler with tasks: {}", 
periodicTasks);
-      _executorService = 
Executors.newScheduledThreadPool(_periodicTasks.size());
+      return;
+    }
+
+    Collection<PeriodicTask> periodicTasks = _periodicTasks.values();
+    LOGGER.info("Starting periodic task scheduler with tasks: {}", 
periodicTasks);
+
+    for (PeriodicTask task : periodicTasks) {
+      String cron = task.getCronExpression();
+      if (cron != null && !cron.trim().isEmpty() && 
!CronExpression.isValidExpression(cron)) {
+        throw new IllegalArgumentException(
+            String.format("Invalid CRON expression '%s' for task '%s'. Halting 
controller startup.",
+                cron, task.getTaskName())
+        );
+      }
+    }
+
+    boolean hasCronTasks = false;
+
+    for (PeriodicTask task : periodicTasks) {
+      String cron = task.getCronExpression();
+      if (cron != null && !cron.trim().isEmpty()) {
+        if (!CronExpression.isValidExpression(cron)) {
+          throw new IllegalArgumentException(
+              String.format("Invalid CRON expression '%s' for task '%s'. "
+                      + "Halting controller startup.",
+                  cron, task.getTaskName())
+          );
+        }
+        hasCronTasks = true;
+      }
+    }
+
+    if (hasCronTasks) {
+      try {
+        int periodicTaskCount = _periodicTasks.size();
+        Properties quartzProperties = getQuartzProperties(periodicTaskCount);
+        StdSchedulerFactory customSchedulerFactory = new 
StdSchedulerFactory(quartzProperties);
+        _scheduler = customSchedulerFactory.getScheduler();
+        _scheduler.start();
+      } catch (SchedulerException e) {
+        throw new RuntimeException("Failed to initialize Quartz scheduler. 
Halting controller startup.", e);
+      }
+    }
+
+    _executorService = Executors.newScheduledThreadPool(periodicTasks.size());
+
+    try {
       for (PeriodicTask periodicTask : periodicTasks) {
         periodicTask.start();
-        String periodicTaskTaskName = periodicTask.getTaskName();
-        long intervalInSeconds = periodicTask.getIntervalInSeconds();
-        if (intervalInSeconds <= 0) {
-          LOGGER.info("Skip scheduling periodic task: {} for periodic 
execution (it can be manually triggered)",
-              periodicTaskTaskName);
-          continue;
-        }
-        _executorService.scheduleWithFixedDelay(() -> {
-          try {
-            LOGGER.info("Starting {} with running frequency of {} seconds.", 
periodicTaskTaskName, intervalInSeconds);
-            periodicTask.run();
-          } catch (Throwable e) {
-            // catch all errors to prevent subsequent executions from being 
silently suppressed
-            // <pre>
-            // See <a 
href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService
-            // 
.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-">Ref</a>
-            // </pre>
-            LOGGER.warn("Caught exception while running Task: {}", 
periodicTaskTaskName, e);
+
+        String cronExpression = periodicTask.getCronExpression();
+        String taskName = periodicTask.getTaskName();
+
+        if (cronExpression != null && !cronExpression.trim().isEmpty()) {
+          LOGGER.info("Scheduling periodic task {} with cron expression: {}", 
taskName, cronExpression);
+          JobDetail jobDetail = JobBuilder.newJob(PeriodicTaskCronJob.class)
+              .withIdentity(taskName)
+              .build();
+          jobDetail.getJobDataMap().put(PeriodicTaskCronJob.PERIODIC_TASK_KEY, 
periodicTask);
+
+          CronTrigger trigger = TriggerBuilder.newTrigger()
+              .withIdentity(taskName + "-CronTrigger")
+              .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
+              .build();
+
+          _scheduler.scheduleJob(jobDetail, trigger);
+        } else {
+          // Legacy fallback for blank/unset crons
+          long intervalInSeconds = periodicTask.getIntervalInSeconds();
+          if (intervalInSeconds > 0) {
+            _executorService.scheduleWithFixedDelay(() -> {
+              try {
+                periodicTask.run();
+              } catch (Throwable e) {
+                LOGGER.warn("Caught exception while running Task: {}", 
taskName, e);
+              }
+            }, periodicTask.getInitialDelayInSeconds(), intervalInSeconds, 
TimeUnit.SECONDS);
           }
-        }, periodicTask.getInitialDelayInSeconds(), intervalInSeconds, 
TimeUnit.SECONDS);
+        }
       }
+    } catch (Exception e) {
+      LOGGER.error("Fatal error scheduling periodic tasks. Cleaning up and 
halting startup.", e);
+      this.stop();
+      throw new RuntimeException("Controller startup failed due to periodic 
task scheduling error", e);
     }
   }
 
@@ -109,6 +186,16 @@ public class PeriodicTaskScheduler {
       LOGGER.info("Stopping all periodic tasks: {}", _periodicTasks);
       _periodicTasks.values().parallelStream().forEach(PeriodicTask::stop);
     }
+
+    if (_scheduler != null) {
+      try {
+        LOGGER.info("Stopping Quartz scheduler");
+        _scheduler.shutdown(true);
+        _scheduler = null;
+      } catch (SchedulerException e) {
+        LOGGER.error("Failed to shutdown Quartz scheduler", e);
+      }
+    }
   }
 
   /// Returns true if the task exists (regardless of whether it is scheduled 
to run periodically or not).
@@ -123,6 +210,12 @@ public class PeriodicTaskScheduler {
 
   /** Execute {@link PeriodicTask} immediately on the specified table. */
   public void scheduleNow(String periodicTaskName, Properties 
periodicTaskProperties) {
+    // In case the executor service hasn't been initialized, it's better to log a 
warning than
+    // throw an NPE.
+    if (_executorService == null) {
+      LOGGER.warn("Cannot schedule task '{}' immediately: Scheduler is not 
running.", periodicTaskName);
+      return;
+    }
     // During controller deployment, each controller can have a slightly 
different list of periodic tasks if we add,
     // remove, or rename periodic task. To avoid this situation, we check 
again (besides the check at controller API
     // level) whether the periodic task exists.
@@ -152,4 +245,15 @@ public class PeriodicTaskScheduler {
       }
     }, 0, TimeUnit.SECONDS);
   }
+
+  private static Properties getQuartzProperties(int taskCount) {
+    Properties quartzProperties = new Properties();
+    //isolating from other scheduler instances by having a different scheduler 
instance name
+    quartzProperties.put(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME, 
"ControllerPeriodicTaskScheduler");
+    //final thread count that will be used.
+    int threadCount = Math.min(taskCount, CONFIGURED_THREAD_COUNT);
+    quartzProperties.put("org.quartz.threadPool.class", 
"org.quartz.simpl.SimpleThreadPool");
+    quartzProperties.put("org.quartz.threadPool.threadCount", 
String.valueOf(threadCount));
+    return quartzProperties;
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
index d8486e8ee83..14980b59e69 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
@@ -19,17 +19,27 @@
 package org.apache.pinot.core.periodictask;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.CronTrigger;
+import org.quartz.Job;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.TriggerBuilder;
+import org.quartz.impl.StdSchedulerFactory;
+import org.quartz.impl.matchers.GroupMatcher;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
+import static org.testng.Assert.*;
 
 
 public class PeriodicTaskSchedulerTest {
@@ -205,4 +215,146 @@ public class PeriodicTaskSchedulerTest {
     // Confirm that all threads requested execution, even though only half the 
threads completed execution.
     assertEquals(attempts.get(), numThreads);
   }
+
+  @Test
+  public void testCronScheduling() throws Exception {
+    AtomicInteger numTimesRunCalled = new AtomicInteger();
+
+    //let the frequency be 3600 seconds (1 hour) to prove that the cron job 
triggered the task.
+    List<PeriodicTask> periodicTasks = List.of(new 
BasePeriodicTask("CronTask", 3600L, 3600L, "0/1 * * * * ?") {
+      @Override
+      protected void runTask(Properties periodicTaskProperties) {
+        numTimesRunCalled.incrementAndGet();
+      }
+    });
+
+    PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+    taskScheduler.init(periodicTasks);
+
+    try {
+      taskScheduler.start();
+      Thread.sleep(1500L);
+    } finally {
+      taskScheduler.stop();
+    }
+
+    assertTrue(numTimesRunCalled.get() >= 1, "Task should have been triggered 
by Quartz CRON scheduler");
+  }
+
+  @Test
+  public void testLegacyFallbackScheduling() throws Exception {
+    AtomicInteger numTimesRunCalled = new AtomicInteger();
+    //fallback to the default fixed delay method to prove it still works fine 
with code changes
+    List<PeriodicTask> periodicTasks = List.of(new 
BasePeriodicTask("LegacyFallbackTask", 1L, 0L, null) {
+      @Override
+      protected void runTask(Properties periodicTaskProperties) {
+        numTimesRunCalled.incrementAndGet();
+      }
+    });
+
+    PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+    taskScheduler.init(periodicTasks);
+
+    try {
+      taskScheduler.start();
+      Thread.sleep(1500L);
+    } finally {
+      taskScheduler.stop();
+    }
+
+    assertTrue(numTimesRunCalled.get() >= 1, "Task should have been triggered 
by legacy fixed-delay scheduler");
+  }
+
+  @Test
+  public void testInvalidCronExpression() throws Exception {
+    AtomicInteger numTimesRunCalled = new AtomicInteger();
+
+    List<PeriodicTask> periodicTasks = List.of(new 
BasePeriodicTask("InvalidCronTask", 1L, 1L, "60 * * * *") {
+      @Override
+      protected void runTask(Properties periodicTaskProperties) {
+        numTimesRunCalled.incrementAndGet();
+      }
+    });
+
+    PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+    taskScheduler.init(periodicTasks);
+
+    assertThrows(IllegalArgumentException.class, taskScheduler::start);
+
+    assertEquals(numTimesRunCalled.get(), 0, "Task should never run if the 
CRON expression is invalid");
+  }
+
+  /**
+   * Tests that the controller's periodic task scheduler does not share a Quartz 
scheduler instance with the one used
+   * to schedule Pinot Minion tasks (PinotTaskManager). It also tests that 
stopping the controller's cron scheduler
+   * does not stop or interfere with Minion tasks.
+   * @throws Exception
+   */
+  @Test
+  public void testControllerAndMinionCronSchedulerIsolation() throws Exception 
{
+    //quartz minion task scheduler
+    Scheduler minionScheduler = StdSchedulerFactory.getDefaultScheduler();
+    minionScheduler.start();
+
+    //quartz controller task scheduler
+    PeriodicTaskScheduler controllerPeriodicTaskScheduler = new 
PeriodicTaskScheduler();
+
+    try {
+      String minionJobName = "testMinionTableTask";
+      String minionGroupName = "MinionTaskGroup";
+      String controllerTaskName = "ControllerTestCronTask";
+
+      JobDetail minionJob = JobBuilder.newJob(MockMinionJob.class)
+          .withIdentity(minionJobName, minionGroupName)
+          .build();
+      CronTrigger minionTrigger = TriggerBuilder.newTrigger()
+          .withIdentity("testMinionTrigger", minionGroupName)
+          .withSchedule(CronScheduleBuilder.cronSchedule("0 0/1 * * * ?"))
+          .build();
+      minionScheduler.scheduleJob(minionJob, minionTrigger);
+
+      Assert.assertTrue(minionScheduler.checkExists(minionJob.getKey()), 
"Minion job should be scheduled.");
+
+      PeriodicTask controllerCronTask = new 
BasePeriodicTask(controllerTaskName, 0L, 0L, "0 0/5 * * * ?") {
+        @Override
+        protected void runTask(Properties periodicTaskProperties) {
+          //mocking runtime execution logic.
+        }
+      };
+      
controllerPeriodicTaskScheduler.init(Collections.singletonList(controllerCronTask));
+      controllerPeriodicTaskScheduler.start();
+
+      
Assert.assertFalse(minionScheduler.checkExists(JobKey.jobKey(controllerTaskName)),
+          "Regression Failure: The minion scheduler should not be "
+              + "able to see the controller's task namespace!");
+
+      int totalMinionJobs = 
minionScheduler.getJobKeys(GroupMatcher.anyJobGroup()).size();
+      Assert.assertEquals(totalMinionJobs, 1,
+          "The broad matcher scan on the minion scheduler should "
+              + "find exactly 1 minion job, ignoring controller tasks.");
+
+      controllerPeriodicTaskScheduler.stop();
+
+      Assert.assertTrue(minionScheduler.isStarted(),
+          "Regression Failure: Stopping the controller scheduler accidentally 
tore down the minion scheduler!");
+      Assert.assertFalse(minionScheduler.isShutdown(),
+          "The minion scheduler must remain active.");
+      Assert.assertTrue(minionScheduler.checkExists(minionJob.getKey()),
+          "The scheduled minion tasks should still exist in the default 
scheduler runtime.");
+    } finally {
+      if (!minionScheduler.isShutdown()) {
+        minionScheduler.shutdown(true);
+      }
+      controllerPeriodicTaskScheduler.stop();
+    }
+  }
+
+  /**
+   * Dummy job implementation needed to fulfill Quartz's verification layer.
+   */
+  public static class MockMinionJob implements Job {
+    @Override
+    public void execute(JobExecutionContext context) {
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to