This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d04cddeb719 MINOR: Allow for configurable delay for periodic tasks (#19143)
d04cddeb719 is described below
commit d04cddeb71934dcb476a4b2532dd83d5ddf3fd27
Author: Kevin Wu <[email protected]>
AuthorDate: Fri Mar 7 10:24:34 2025 -0600
MINOR: Allow for configurable delay for periodic tasks (#19143)
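This patch allows immediatePeriodNs, the delay before a periodic task is
rescheduled when ControllerResult.response is true, to be passed in when
creating a periodic task. Tasks that do not specify it keep the previous
10 ms delay.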
Reviewers: Colin P. McCabe <[email protected]>
---
.../org/apache/kafka/controller/PeriodicTask.java | 30 +++++++++++++++++++++-
.../controller/PeriodicTaskControlManager.java | 3 +--
2 files changed, 30 insertions(+), 3 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/PeriodicTask.java b/metadata/src/main/java/org/apache/kafka/controller/PeriodicTask.java
index 8fae9c782b1..d664ca8deed 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/PeriodicTask.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/PeriodicTask.java
@@ -20,6 +20,8 @@ package org.apache.kafka.controller;
import java.util.EnumSet;
import java.util.function.Supplier;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
class PeriodicTask {
/**
* The name of this periodic task.
@@ -34,7 +36,12 @@ class PeriodicTask {
private final Supplier<ControllerResult<Boolean>> op;
/**
- * The period of the task, in nanoseconds.
+ * The period of the task when ControllerResult.response is true, in nanoseconds.
+ */
+ private final long immediatePeriodNs;
+
+ /**
+ * The default period of the task when ControllerResult.response is false, in nanoseconds.
*/
private final long periodNs;
@@ -43,6 +50,8 @@ class PeriodicTask {
*/
private final EnumSet<PeriodicTaskFlag> flags;
+ private static final long DEFAULT_IMMEDIATE_PERIOD_NS =
MILLISECONDS.toNanos(10);
+
PeriodicTask(
String name,
Supplier<ControllerResult<Boolean>> op,
@@ -51,6 +60,21 @@ class PeriodicTask {
) {
this.name = name;
this.op = op;
+ this.immediatePeriodNs = DEFAULT_IMMEDIATE_PERIOD_NS;
+ this.periodNs = periodNs;
+ this.flags = flags;
+ }
+
+ PeriodicTask(
+ String name,
+ Supplier<ControllerResult<Boolean>> op,
+ long periodNs,
+ EnumSet<PeriodicTaskFlag> flags,
+ long immediatePeriodNs
+ ) {
+ this.name = name;
+ this.op = op;
+ this.immediatePeriodNs = immediatePeriodNs;
this.periodNs = periodNs;
this.flags = flags;
}
@@ -63,6 +87,10 @@ class PeriodicTask {
return op;
}
+ long immediatePeriodNs() {
+ return immediatePeriodNs;
+ }
+
long periodNs() {
return periodNs;
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java
index 821fa47df20..80857761521 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java
@@ -28,7 +28,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -186,7 +185,7 @@ class PeriodicTaskControlManager {
// collection of operations before picking from the non-deferred collection of
// operations. This can result in some unfairness if deferred operation are
// scheduled for immediate execution. This delays them by a small amount of time.
- return MILLISECONDS.toNanos(10);
+ return task.immediatePeriodNs();
} else if (error) {
// If the periodic task hit an error, reschedule it in 5 minutes. This is to avoid
// scenarios where we spin in a tight loop hitting errors, but still give the task