This is an automated email from the ASF dual-hosted git repository.

yqm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 37e95d49231 Refactored `MetadataSegmentView` to use at most 500ms 
initial delay  (#19116)
37e95d49231 is described below

commit 37e95d4923180627c1f12eb92b06cc573ad4756f
Author: Cece Mei <[email protected]>
AuthorDate: Wed Mar 11 16:50:42 2026 -0700

    Refactored `MetadataSegmentView` to use at most 500ms initial delay  
(#19116)
    
    * ScheduledExecutors
    
    * min
---
 .../util/common/concurrent/ScheduledExecutors.java |  17 ++-
 .../common/concurrent/ScheduledExecutorsTest.java  | 131 ++++++++++++++++++++-
 .../sql/calcite/schema/MetadataSegmentView.java    |  35 ++----
 3 files changed, 149 insertions(+), 34 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
 
b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
index da8f5bf475a..bd2fea04d46 100644
--- 
a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
+++ 
b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
@@ -103,8 +103,21 @@ public class ScheduledExecutors
   }
 
   /**
-   * Run runnable once every period, after the given initial delay. Exceptions
-   * are caught and logged as errors.
+   * Schedules a runnable to execute repeatedly at a fixed rate. The first 
execution occurs after the initial delay,
+   * and subsequent executions are scheduled at fixed intervals measured from 
the start of each execution.
+   * <p>
+   * This differs from {@link #scheduleWithFixedDelay} in that the period is 
measured from the start of each
+   * execution rather than from the completion. If an execution takes longer 
than the period, the next execution
+   * will begin immediately after the current one finishes.
+   * <p>
+   * This also differs from {@link 
ScheduledExecutorService#scheduleAtFixedRate} in that it prevents task pileup:
+   * only one future execution is scheduled at a time rather than scheduling 
all future executions upfront.
+   * This prevents a backlog of pending tasks from building up if the executor 
is delayed or tasks run slowly.
+   *
+   * @param exec         the ScheduledExecutorService to use for scheduling
+   * @param initialDelay the duration to wait before the first execution
+   * @param period       the target duration between the start of consecutive 
executions
+   * @param runnable     the task to execute repeatedly
    */
   public static void scheduleAtFixedRate(
       final ScheduledExecutorService exec,
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java
 
b/processing/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java
index 78ed41a85c9..81dc7ccd8c1 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java
@@ -20,27 +20,148 @@
 package org.apache.druid.java.util.common.concurrent;
 
 import org.joda.time.Duration;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class ScheduledExecutorsTest
 {
   @Test
-  public void testscheduleWithFixedDelay() throws InterruptedException
+  public void testscheduleWithFixedDelay() throws Exception
   {
-    Duration initialDelay = new Duration(1000);
-    Duration delay = new Duration(1000);
+    Duration initialDelay = Duration.millis(100);
+    Duration delay = Duration.millis(500);
+    int taskDuration = 100; // ms
     ScheduledExecutorService exec = 
Execs.scheduledSingleThreaded("BasicAuthenticatorCacheManager-Exec--%d");
+
+    List<Long> taskStartTimes = new ArrayList<>();
+    AtomicInteger executionCount = new AtomicInteger(0);
+    CountDownLatch latch = new CountDownLatch(1);
+    long startTime = System.currentTimeMillis();
+
     ScheduledExecutors.scheduleWithFixedDelay(
         exec,
         initialDelay,
         delay,
         () -> {
-          System.out.println("TEST!");
+          try {
+            long taskStart = System.currentTimeMillis();
+            int count = executionCount.getAndIncrement();
+            synchronized (taskStartTimes) {
+              taskStartTimes.add(taskStart);
+            }
+
+            // Each task takes 100ms
+            Thread.sleep(taskDuration);
+
+            if (count == 3) {
+              latch.countDown();
+            }
+          }
+          catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+          }
         }
     );
-    Thread.sleep(5 * 1000);
+
+    boolean completed = latch.await(5, TimeUnit.SECONDS);
     exec.shutdown();
+
+    Assert.assertTrue("Should complete within timeout", completed);
+    Assert.assertEquals("Should have exactly 4 executions", 4, 
executionCount.get());
+
+    // Verify the first task starts at approximately the initial delay; in 
practice this is a little more than 100ms due to overhead.
+    long firstTaskStart = taskStartTimes.get(0) - startTime;
+    Assert.assertTrue(
+        "First task should start at approximately initial delay (100ms), was: 
" + firstTaskStart,
+        firstTaskStart > 100 && firstTaskStart < 500
+    );
+
+    // Verify subsequent tasks wait for delay from previous task end
+    // Task ends at: taskStart + taskDuration
+    for (int i = 1; i < taskStartTimes.size(); i++) {
+      long previousTaskEnd = taskStartTimes.get(i - 1) + taskDuration;
+      long delayBetweenTasks = taskStartTimes.get(i) - previousTaskEnd;
+      // Should be approximately 500ms (the delay between task end and next 
task start)
+      Assert.assertTrue(
+          "Delay from task " + (i - 1) + " end to task " + i + " start should 
be ~500ms, was: " + delayBetweenTasks,
+          delayBetweenTasks >= 450 && delayBetweenTasks <= 650
+      );
+    }
+  }
+
+  @Test
+  public void testScheduleAtFixedRateWithLongRunningTask() throws Exception
+  {
+    Duration initialDelay = Duration.millis(100);
+    Duration period = Duration.millis(500);
+    ScheduledExecutorService exec = 
Execs.scheduledSingleThreaded("testScheduleAtFixedRate-%d");
+
+    List<Long> executionStartTimes = new ArrayList<>();
+    AtomicInteger executionCount = new AtomicInteger(0);
+    CountDownLatch latch = new CountDownLatch(1);
+
+    ScheduledExecutors.scheduleAtFixedRate(
+        exec,
+        initialDelay,
+        period,
+        () -> {
+          try {
+            int count = executionCount.getAndIncrement();
+            long startTime = System.currentTimeMillis();
+            synchronized (executionStartTimes) {
+              executionStartTimes.add(startTime);
+            }
+
+            if (count == 0) {
+              // First task takes longer than the period (800ms > 500ms period)
+              Thread.sleep(800);
+            } else {
+              // Subsequent tasks finish quickly
+              Thread.sleep(100);
+            }
+
+            // Stop after 4 executions
+            if (count == 3) {
+              latch.countDown();
+            }
+          }
+          catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+          }
+        }
+    );
+
+    // Wait for completion
+    boolean completed = latch.await(10, TimeUnit.SECONDS);
+    exec.shutdown();
+
+    Assert.assertTrue("Should complete within timeout", completed);
+    Assert.assertEquals("Should have exactly 4 executions", 4, 
executionStartTimes.size());
+
+    // Verify first task took longer than period (no pileup should occur)
+    // Second task should start immediately after first task finishes (not 
during)
+    long timeBetweenFirstAndSecond = executionStartTimes.get(1) - 
executionStartTimes.get(0);
+    // Should be at least 800ms (first task duration), but not much more since 
it schedules immediately
+    Assert.assertTrue(
+        "Second task should start after first task completes (~800ms), was: " 
+ timeBetweenFirstAndSecond,
+        timeBetweenFirstAndSecond >= 750 && timeBetweenFirstAndSecond <= 900
+    );
+
+    // Verify subsequent tasks maintain the period
+    long timeBetweenSecondAndThird = executionStartTimes.get(2) - 
executionStartTimes.get(1);
+    // Should be approximately 500ms (the period), since tasks now finish 
quickly
+    Assert.assertTrue(
+        "Subsequent tasks should maintain period (~500ms), was: " + 
timeBetweenSecondAndThird,
+        timeBetweenSecondAndThird >= 450 && timeBetweenSecondAndThird <= 600
+    );
   }
 }
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
index 4fc8dc30e23..34fd6fba779 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
@@ -34,6 +34,7 @@ import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
@@ -46,6 +47,7 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.SegmentStatusInCluster;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.joda.time.Duration;
 
 import java.util.Iterator;
 import java.util.concurrent.CountDownLatch;
@@ -114,7 +116,12 @@ public class MetadataSegmentView
     }
     try {
       if (isCacheEnabled) {
-        scheduledExec.schedule(new PollTask(), pollPeriodInMS, 
TimeUnit.MILLISECONDS);
+        ScheduledExecutors.scheduleAtFixedRate(
+            scheduledExec,
+            Duration.millis(Math.min(pollPeriodInMS, 500L)),
+            Duration.millis(pollPeriodInMS),
+            this::poll
+        );
       }
       lifecycleLock.started();
       log.info("MetadataSegmentView is started.");
@@ -199,30 +206,4 @@ public class MetadataSegmentView
         true
     );
   }
-
-  private class PollTask implements Runnable
-  {
-    @Override
-    public void run()
-    {
-      long delayMS = pollPeriodInMS;
-      try {
-        final long pollStartTime = System.nanoTime();
-        poll();
-        final long pollEndTime = System.nanoTime();
-        final long pollTimeNS = pollEndTime - pollStartTime;
-        final long pollTimeMS = TimeUnit.NANOSECONDS.toMillis(pollTimeNS);
-        delayMS = Math.max(pollPeriodInMS - pollTimeMS, 0);
-      }
-      catch (Exception e) {
-        log.makeAlert(e, "Problem polling Coordinator.").emit();
-      }
-      finally {
-        if (!Thread.currentThread().isInterrupted()) {
-          scheduledExec.schedule(new PollTask(), delayMS, 
TimeUnit.MILLISECONDS);
-        }
-      }
-    }
-  }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to