This is an automated email from the ASF dual-hosted git repository.
yqm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 37e95d49231 Refactored `MetadataSegmentView` to use at most 500ms
initial delay (#19116)
37e95d49231 is described below
commit 37e95d4923180627c1f12eb92b06cc573ad4756f
Author: Cece Mei <[email protected]>
AuthorDate: Wed Mar 11 16:50:42 2026 -0700
Refactored `MetadataSegmentView` to use at most 500ms initial delay
(#19116)
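* Use ScheduledExecutors.scheduleAtFixedRate instead of the hand-rolled PollTask
* Cap the initial poll delay at min(pollPeriodInMS, 500ms)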
---
.../util/common/concurrent/ScheduledExecutors.java | 17 ++-
.../common/concurrent/ScheduledExecutorsTest.java | 131 ++++++++++++++++++++-
.../sql/calcite/schema/MetadataSegmentView.java | 35 ++----
3 files changed, 149 insertions(+), 34 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
index da8f5bf475a..bd2fea04d46 100644
---
a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
+++
b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
@@ -103,8 +103,21 @@ public class ScheduledExecutors
}
/**
- * Run runnable once every period, after the given initial delay. Exceptions
- * are caught and logged as errors.
+ * Schedules a runnable to execute repeatedly at a fixed rate. The first
execution occurs after the initial delay,
+ * and subsequent executions are scheduled at fixed intervals measured from
the start of each execution.
+ * <p>
+ * This differs from {@link #scheduleWithFixedDelay} in that the period is
measured from the start of each
+ * execution rather than from the completion. If an execution takes longer
than the period, the next execution
+ * will begin immediately after the current one completes (with a single-threaded executor).
+ * <p>
+ * This also differs from {@link ScheduledExecutorService#scheduleAtFixedRate}, which pins executions to the
+ * timeline {@code initialDelay + n * period} and, after a slow run or a stalled executor, runs the missed
+ * executions back-to-back to catch up. Here only one future execution is scheduled at a time, relative to the
+ * start of the current one, so no backlog of catch-up executions builds up.
+ * <p>
+ * Exceptions thrown by the runnable are caught and logged as errors.
+ *
+ * @param exec the ScheduledExecutorService to use for scheduling
+ * @param initialDelay the duration to wait before the first execution
+ * @param period the target duration between the start of consecutive
executions
+ * @param runnable the task to execute repeatedly
*/
public static void scheduleAtFixedRate(
final ScheduledExecutorService exec,
diff --git
a/processing/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java
b/processing/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java
index 78ed41a85c9..81dc7ccd8c1 100644
---
a/processing/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java
@@ -20,27 +20,148 @@
package org.apache.druid.java.util.common.concurrent;
import org.joda.time.Duration;
+import org.junit.Assert;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
public class ScheduledExecutorsTest
{
@Test
- public void testscheduleWithFixedDelay() throws InterruptedException
+ public void testscheduleWithFixedDelay() throws Exception
{
- Duration initialDelay = new Duration(1000);
- Duration delay = new Duration(1000);
+ Duration initialDelay = Duration.millis(100);
+ Duration delay = Duration.millis(500);
+ int taskDuration = 100; // ms
ScheduledExecutorService exec =
Execs.scheduledSingleThreaded("BasicAuthenticatorCacheManager-Exec--%d");
+
+ List<Long> taskStartTimes = new ArrayList<>();
+ AtomicInteger executionCount = new AtomicInteger(0);
+ CountDownLatch latch = new CountDownLatch(1);
+ long startTime = System.currentTimeMillis();
+
ScheduledExecutors.scheduleWithFixedDelay(
exec,
initialDelay,
delay,
() -> {
- System.out.println("TEST!");
+ try {
+ long taskStart = System.currentTimeMillis();
+ int count = executionCount.getAndIncrement();
+ synchronized (taskStartTimes) {
+ taskStartTimes.add(taskStart);
+ }
+
+ // Each task takes 100ms
+ Thread.sleep(taskDuration);
+
+ if (count == 3) {
+ latch.countDown();
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
}
);
- Thread.sleep(5 * 1000);
+
+ boolean completed = latch.await(5, TimeUnit.SECONDS);
exec.shutdown();
+
+ Assert.assertTrue("Should complete within timeout", completed);
+ Assert.assertEquals("Should have exactly 4 executions", 4,
executionCount.get());
+
+ // Verify the first task starts at approximately the initial delay; in practice it
starts slightly later than 100ms because of scheduling overhead.
+ long firstTaskStart = taskStartTimes.get(0) - startTime;
+ Assert.assertTrue(
+ "First task should start at approximately initial delay (100ms), was:
" + firstTaskStart,
+ firstTaskStart > 100 && firstTaskStart < 500
+ );
+
+ // Verify subsequent tasks wait for delay from previous task end
+ // Task ends at: taskStart + taskDuration
+ for (int i = 1; i < taskStartTimes.size(); i++) {
+ long previousTaskEnd = taskStartTimes.get(i - 1) + taskDuration;
+ long delayBetweenTasks = taskStartTimes.get(i) - previousTaskEnd;
+ // Should be approximately 500ms (the delay between task end and next
task start)
+ Assert.assertTrue(
+ "Delay from task " + (i - 1) + " end to task " + i + " start should
be ~500ms, was: " + delayBetweenTasks,
+ delayBetweenTasks >= 450 && delayBetweenTasks <= 650
+ );
+ }
+ }
+
+ @Test
+ public void testScheduleAtFixedRateWithLongRunningTask() throws Exception
+ {
+ Duration initialDelay = Duration.millis(100);
+ Duration period = Duration.millis(500);
+ ScheduledExecutorService exec =
Execs.scheduledSingleThreaded("testScheduleAtFixedRate-%d");
+
+ List<Long> executionStartTimes = new ArrayList<>();
+ AtomicInteger executionCount = new AtomicInteger(0);
+ CountDownLatch latch = new CountDownLatch(1);
+
+ ScheduledExecutors.scheduleAtFixedRate(
+ exec,
+ initialDelay,
+ period,
+ () -> {
+ try {
+ int count = executionCount.getAndIncrement();
+ long startTime = System.currentTimeMillis();
+ synchronized (executionStartTimes) {
+ executionStartTimes.add(startTime);
+ }
+
+ if (count == 0) {
+ // First task takes longer than the period (800ms > 500ms period)
+ Thread.sleep(800);
+ } else {
+ // Subsequent tasks finish quickly
+ Thread.sleep(100);
+ }
+
+ // Stop after 4 executions
+ if (count == 3) {
+ latch.countDown();
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ );
+
+ // Wait for completion
+ boolean completed = latch.await(10, TimeUnit.SECONDS);
+ exec.shutdown();
+
+ Assert.assertTrue("Should complete within timeout", completed);
+ Assert.assertEquals("Should have exactly 4 executions", 4,
executionStartTimes.size());
+
+ // Verify first task took longer than period (no pileup should occur)
+ // Second task should start immediately after first task finishes (not
during)
+ long timeBetweenFirstAndSecond = executionStartTimes.get(1) -
executionStartTimes.get(0);
+ // Should be at least 800ms (first task duration), but not much more since
it schedules immediately
+ Assert.assertTrue(
+ "Second task should start after first task completes (~800ms), was: "
+ timeBetweenFirstAndSecond,
+ timeBetweenFirstAndSecond >= 750 && timeBetweenFirstAndSecond <= 900
+ );
+
+ // Verify subsequent tasks maintain the period
+ long timeBetweenSecondAndThird = executionStartTimes.get(2) -
executionStartTimes.get(1);
+ // Should be approximately 500ms (the period), since tasks now finish
quickly
+ Assert.assertTrue(
+ "Subsequent tasks should maintain period (~500ms), was: " +
timeBetweenSecondAndThird,
+ timeBetweenSecondAndThird >= 450 && timeBetweenSecondAndThird <= 600
+ );
}
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
index 4fc8dc30e23..34fd6fba779 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
@@ -34,6 +34,7 @@ import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
@@ -46,6 +47,7 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.SegmentStatusInCluster;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.joda.time.Duration;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
@@ -114,7 +116,12 @@ public class MetadataSegmentView
}
try {
if (isCacheEnabled) {
- scheduledExec.schedule(new PollTask(), pollPeriodInMS,
TimeUnit.MILLISECONDS);
+ ScheduledExecutors.scheduleAtFixedRate(
+ scheduledExec,
+ Duration.millis(Math.min(pollPeriodInMS, 500L)),
+ Duration.millis(pollPeriodInMS),
+ this::poll
+ );
}
lifecycleLock.started();
log.info("MetadataSegmentView is started.");
@@ -199,30 +206,4 @@ public class MetadataSegmentView
true
);
}
-
- private class PollTask implements Runnable
- {
- @Override
- public void run()
- {
- long delayMS = pollPeriodInMS;
- try {
- final long pollStartTime = System.nanoTime();
- poll();
- final long pollEndTime = System.nanoTime();
- final long pollTimeNS = pollEndTime - pollStartTime;
- final long pollTimeMS = TimeUnit.NANOSECONDS.toMillis(pollTimeNS);
- delayMS = Math.max(pollPeriodInMS - pollTimeMS, 0);
- }
- catch (Exception e) {
- log.makeAlert(e, "Problem polling Coordinator.").emit();
- }
- finally {
- if (!Thread.currentThread().isInterrupted()) {
- scheduledExec.schedule(new PollTask(), delayMS,
TimeUnit.MILLISECONDS);
- }
- }
- }
- }
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]