ayushkul2910 commented on a change in pull request #10448:
URL: https://github.com/apache/druid/pull/10448#discussion_r512608527
##########
File path:
core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
##########
@@ -167,6 +169,50 @@ public void run()
);
}
+ public static void scheduleAtFixedRate(CronScheduler exec, Duration rate,
Callable<Signal> callable)
+ {
+ scheduleAtFixedRate(exec, rate, rate, callable);
+ }
+
+ /**
+ * Run callable once every period, after the given initial delay. Uses
+ * {@link CronScheduler} for task scheduling. Exceptions are caught and
logged
+ * as errors.
+ */
+ public static void scheduleAtFixedRate(
+ final CronScheduler exec,
+ final Duration initialDelay,
+ final Duration rate,
+ final Callable<Signal> callable
+ )
+ {
+ log.debug("Scheduling periodically: %s with period %s", callable, rate);
+ Instant delayInstance = Instant.now().plusMillis(initialDelay.getMillis());
+ exec.scheduleAt(delayInstance,
Review comment:
Keeping the above points in mind, I think this implementation will do.
synchronized (lock) {
monitor.start();
Long rate = config.getEmitterPeriod().getMillis();
Future<?> scheduledFuture = scheduler.scheduleAtFixedRate(
rate,
rate,
TimeUnit.MILLISECONDS,
new CronTask()
{
private volatile Future<Boolean> monitorFuture = null;
@Override
public void run(long scheduledRunTimeMillis)
{
try {
if (monitorFuture != null && monitorFuture.isDone()
&& !(monitorFuture.get() && hasMonitor(monitor))) {
removeMonitor(monitor);
monitor.getScheduledFuture().cancel(false);
log.debug("Stopped rescheduling %s (delay %s)", this,
rate);
return;
}
log.trace("Running %s (period %s)", this, rate);
monitorFuture = executor.submit(new Callable<Boolean>()
{
public Boolean call()
{
try {
return monitor.monitor(emitter);
} catch (Throwable e) {
log.error(e, "Uncaught exception.");
return false;
}
}
});
}
catch (Throwable e) {
log.error(e, "Uncaught exception.");
}
}
});
monitor.setScheduledFuture(scheduledFuture);
}
In this:
1. Each monitor has a separate future.
2. Cron task is cheap, it checks a boolean condition. If condition is true
it cancels the scheduling process for the particular monitor, else submits a
callable for monitoring to executor service
3. No race condition for cancelling scheduledFuture on first iteration.
Please let me know your thoughts on this.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]