ayushkul2910 commented on a change in pull request #10448:
URL: https://github.com/apache/druid/pull/10448#discussion_r511941702



##########
File path: core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
##########
@@ -167,6 +169,50 @@ public void run()
     );
   }
 
+  public static void scheduleAtFixedRate(CronScheduler exec, Duration rate, Callable<Signal> callable)
+  {
+    scheduleAtFixedRate(exec, rate, rate, callable);
+  }
+
+  /**
+   * Run callable once every period, after the given initial delay. Uses
+   * {@link CronScheduler} for task scheduling. Exceptions are caught and logged
+   * as errors.
+   */
+  public static void scheduleAtFixedRate(
+      final CronScheduler exec,
+      final Duration initialDelay,
+      final Duration rate,
+      final Callable<Signal> callable
+  )
+  {
+    log.debug("Scheduling periodically: %s with period %s", callable, rate);
+    Instant delayInstance = Instant.now().plusMillis(initialDelay.getMillis());
+    exec.scheduleAt(delayInstance,

Review comment:
       Hey @leventov, do you think the implementation below of the
`startMonitor(final Monitor monitor)` method in the `MonitorScheduler` class
will suffice? Also, could it cause any inconsistency, since `scheduledFuture`
is volatile and shared among all the monitors?
   
       synchronized (lock) {
         monitor.start();
         long rate = config.getEmitterPeriod().getMillis();
         scheduledFuture = scheduler.scheduleAtFixedRate(
             rate,
             rate,
             TimeUnit.MILLISECONDS,
             new CronTask()
             {
               @Override
               public void run(long scheduledRunTimeMillis)
               {
                 try {
                   if (monitor.monitor(emitter) && hasMonitor(monitor)) {
                     log.trace("Running %s (period %s)", this, rate);
                   } else {
                     log.debug("Stopping rescheduling %s (delay %s)", this, rate);
                     removeMonitor(monitor);
                     while (scheduledFuture == null) {
                       Thread.sleep(1);
                     }
                     scheduledFuture.cancel(false);
                   }
                 } catch (Throwable e) {
                   log.error(e, "Uncaught exception.");
                 }
               }
             });
       }
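
On the second question, one possible concern, as far as the snippet shows: every `startMonitor` call overwrites the same `scheduledFuture` field, so a task that decides to stop may cancel the future of whichever monitor was started most recently rather than its own. Below is a minimal sketch of one way to avoid that by giving each task its own future holder. It assumes a plain `java.util.concurrent.ScheduledExecutorService` as a stand-in for `CronScheduler`, and the names `PerTaskFutureSketch`, `scheduleUntilFalse` and `keepGoing` are hypothetical, not part of Druid.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical sketch: ScheduledExecutorService stands in for CronScheduler.
class PerTaskFutureSketch
{
  /** Runs body at a fixed rate until it returns false; the task cancels only itself. */
  static Future<?> scheduleUntilFalse(ScheduledExecutorService exec, long periodMillis, BooleanSupplier body)
  {
    // Per-task holder for the task's own future; replaces the shared volatile field.
    final CompletableFuture<Future<?>> self = new CompletableFuture<>();
    Future<?> future = exec.scheduleAtFixedRate(
        () -> {
          try {
            if (!body.getAsBoolean()) {
              // join() blocks only if this run starts before complete() below is called.
              self.join().cancel(false);
            }
          }
          catch (Throwable t) {
            // An exception escaping the task would suppress all of its later runs.
            System.err.println("Uncaught exception: " + t);
          }
        },
        periodMillis,
        periodMillis,
        TimeUnit.MILLISECONDS
    );
    self.complete(future);
    return future;
  }

  /** Counts one run and reports whether the task should run again. */
  static boolean keepGoing(AtomicInteger runs, int limit, CountDownLatch stopped)
  {
    if (runs.incrementAndGet() < limit) {
      return true;
    }
    stopped.countDown();
    return false;
  }

  /** Starts two independent tasks and returns how many times each one ran. */
  static int[] demo()
  {
    ScheduledExecutorService exec = Executors.newScheduledThreadPool(2);
    AtomicInteger shortRuns = new AtomicInteger();
    AtomicInteger longRuns = new AtomicInteger();
    CountDownLatch bothStopped = new CountDownLatch(2);
    scheduleUntilFalse(exec, 5, () -> keepGoing(shortRuns, 2, bothStopped));
    scheduleUntilFalse(exec, 5, () -> keepGoing(longRuns, 5, bothStopped));
    try {
      bothStopped.await(10, TimeUnit.SECONDS);
      exec.shutdown();
      exec.awaitTermination(10, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return new int[]{shortRuns.get(), longRuns.get()};
  }

  public static void main(String[] args)
  {
    int[] counts = demo();
    System.out.println("short=" + counts[0] + " long=" + counts[1]);
  }
}
```

Because the holder is local to each `scheduleUntilFalse` call, stopping one task cannot cancel another, and the `while (scheduledFuture == null)` polling loop is no longer needed.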






