This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d04cddeb719 MINOR: Allow for configurable delay for periodic tasks 
(#19143)
d04cddeb719 is described below

commit d04cddeb71934dcb476a4b2532dd83d5ddf3fd27
Author: Kevin Wu <[email protected]>
AuthorDate: Fri Mar 7 10:24:34 2025 -0600

    MINOR: Allow for configurable delay for periodic tasks (#19143)
    
    This patch allows for the immediatePeriodNs to be passed in when creating a 
periodic task
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../org/apache/kafka/controller/PeriodicTask.java  | 30 +++++++++++++++++++++-
 .../controller/PeriodicTaskControlManager.java     |  3 +--
 2 files changed, 30 insertions(+), 3 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/PeriodicTask.java 
b/metadata/src/main/java/org/apache/kafka/controller/PeriodicTask.java
index 8fae9c782b1..d664ca8deed 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/PeriodicTask.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/PeriodicTask.java
@@ -20,6 +20,8 @@ package org.apache.kafka.controller;
 import java.util.EnumSet;
 import java.util.function.Supplier;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 class PeriodicTask {
     /**
      * The name of this periodic task.
@@ -34,7 +36,12 @@ class PeriodicTask {
     private final Supplier<ControllerResult<Boolean>> op;
 
     /**
-     * The period of the task, in nanoseconds.
+     * The period of the task when ControllerResult.response is true, in 
nanoseconds.
+     */
+    private final long immediatePeriodNs;
+
+    /**
+     * The default period of the task when ControllerResult.response is false, 
in nanoseconds.
      */
     private final long periodNs;
 
@@ -43,6 +50,8 @@ class PeriodicTask {
      */
     private final EnumSet<PeriodicTaskFlag> flags;
 
+    private static final long DEFAULT_IMMEDIATE_PERIOD_NS = 
MILLISECONDS.toNanos(10);
+
     PeriodicTask(
         String name,
         Supplier<ControllerResult<Boolean>> op,
@@ -51,6 +60,21 @@ class PeriodicTask {
     ) {
         this.name = name;
         this.op = op;
+        this.immediatePeriodNs = DEFAULT_IMMEDIATE_PERIOD_NS;
+        this.periodNs = periodNs;
+        this.flags = flags;
+    }
+
+    PeriodicTask(
+        String name,
+        Supplier<ControllerResult<Boolean>> op,
+        long periodNs,
+        EnumSet<PeriodicTaskFlag> flags,
+        long immediatePeriodNs
+    ) {
+        this.name = name;
+        this.op = op;
+        this.immediatePeriodNs = immediatePeriodNs;
         this.periodNs = periodNs;
         this.flags = flags;
     }
@@ -63,6 +87,10 @@ class PeriodicTask {
         return op;
     }
 
+    long immediatePeriodNs() {
+        return immediatePeriodNs;
+    }
+
     long periodNs() {
         return periodNs;
     }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java
index 821fa47df20..80857761521 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java
@@ -28,7 +28,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Supplier;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
@@ -186,7 +185,7 @@ class PeriodicTaskControlManager {
             // collection of operations before picking from the non-deferred 
collection of
             // operations. This can result in some unfairness if deferred 
operation are
             // scheduled for immediate execution. This delays them by a small 
amount of time.
-            return MILLISECONDS.toNanos(10);
+            return task.immediatePeriodNs();
         } else if (error) {
             // If the periodic task hit an error, reschedule it in 5 minutes. 
This is to avoid
             // scenarios where we spin in a tight loop hitting errors, but 
still give the task

Reply via email to