This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 0d2fd09f83a KAFKA-16446: Improve controller event duration logging 
(#15622)
0d2fd09f83a is described below

commit 0d2fd09f83a706d47e22560a1a0ea71810b9ea55
Author: David Arthur <[email protected]>
AuthorDate: Mon Jan 6 16:34:46 2025 -0500

    KAFKA-16446: Improve controller event duration logging (#15622)
    
    There are times when the controller has a high event processing time, such 
as during startup, or when creating a topic with many partitions. We can see 
these processing times in the p99 metric 
(kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs); 
however, it's difficult to see exactly which event is causing the high processing 
time.
    
    With DEBUG logs, we see every event along with its processing time. Even 
with this, it's a bit tedious to find the event with a high processing time.
    
    This PR logs, at ERROR level, every event that takes 2 seconds or longer (the default threshold). 
This will help identify events that are taking far too long, and which could be 
disruptive to the operation of the controller. The slow event logging looks 
like this:
    
    ```
    [2024-12-20 15:03:39,754] ERROR [QuorumController id=1] Exceptionally slow 
controller event createTopics took 5240 ms.  
(org.apache.kafka.controller.EventPerformanceMonitor)
    ```
    
    Also, every 60 seconds, it logs some event time statistics, including 
average time, maximum time, and the name of the event which took the longest. 
This periodic message looks like this:
    
    ```
    [2024-12-20 15:35:04,798] INFO [QuorumController id=1] In the last 60000 ms 
period, 333 controller events were completed, which took an average of 12.34 ms each. The 
slowest event was handleCommit[baseOffset=0], which took 41.90 ms. 
(org.apache.kafka.controller.EventPerformanceMonitor)
    ```
    
    An operator can disable these logs by adding the following to their log4j 
config:
    
    ```
    org.apache.kafka.controller.EventPerformanceMonitor=OFF
    ```
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../main/scala/kafka/server/ControllerServer.scala |   4 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |   2 +
 .../kafka/controller/EventPerformanceMonitor.java  | 212 +++++++++++++++++++++
 .../apache/kafka/controller/QuorumController.java  |  64 ++++++-
 .../controller/EventPerformanceMonitorTest.java    | 120 ++++++++++++
 .../apache/kafka/server/config/KRaftConfigs.java   |  10 +
 6 files changed, 403 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index d7a39449068..93657b8b303 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -242,7 +242,9 @@ class ControllerServer(
           setDelegationTokenExpiryTimeMs(config.delegationTokenExpiryTimeMs).
           
setDelegationTokenExpiryCheckIntervalMs(config.delegationTokenExpiryCheckIntervalMs).
           
setUncleanLeaderElectionCheckIntervalMs(config.uncleanLeaderElectionCheckIntervalMs).
-          setInterBrokerListenerName(config.interBrokerListenerName.value())
+          setInterBrokerListenerName(config.interBrokerListenerName.value()).
+          
setControllerPerformanceSamplePeriodMs(config.controllerPerformanceSamplePeriodMs).
+          
setControllerPerformanceAlwaysLogThresholdMs(config.controllerPerformanceAlwaysLogThresholdMs)
       }
       controller = controllerBuilder.build()
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index ed117fde1c5..424c54e17c8 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -218,6 +218,8 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   val initialRegistrationTimeoutMs: Int = 
getInt(KRaftConfigs.INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG)
   val brokerHeartbeatIntervalMs: Int = 
getInt(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG)
   val brokerSessionTimeoutMs: Int = 
getInt(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG)
+  val controllerPerformanceSamplePeriodMs: Long = 
getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS)
+  val controllerPerformanceAlwaysLogThresholdMs: Long = 
getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS)
 
   private def parseProcessRoles(): Set[ProcessRole] = {
     val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map {
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/EventPerformanceMonitor.java
 
b/metadata/src/main/java/org/apache/kafka/controller/EventPerformanceMonitor.java
new file mode 100644
index 00000000000..fbe8b1c3cbb
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/EventPerformanceMonitor.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.text.DecimalFormat;
+import java.util.AbstractMap;
+import java.util.Map;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * Track the performance of controller events. Periodically log the slowest 
events.
+ * Log any event slower than a certain threshold.
+ */
+class EventPerformanceMonitor {
+    /**
+     * The format to use when displaying milliseconds.
+     */
+    private static final DecimalFormat MILLISECOND_DECIMAL_FORMAT = new 
DecimalFormat("#0.00");
+
+    static class Builder {
+        LogContext logContext = null;
+        long periodNs = SECONDS.toNanos(60);
+        long alwaysLogThresholdNs = SECONDS.toNanos(2);
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setPeriodNs(long periodNs) {
+            this.periodNs = periodNs;
+            return this;
+        }
+
+        Builder setAlwaysLogThresholdNs(long alwaysLogThresholdNs) {
+            this.alwaysLogThresholdNs = alwaysLogThresholdNs;
+            return this;
+        }
+
+        EventPerformanceMonitor build() {
+            if (logContext == null) logContext = new LogContext();
+            return new EventPerformanceMonitor(logContext,
+                    periodNs,
+                    alwaysLogThresholdNs);
+        }
+    }
+
+    /**
+     * The slf4j logger to use.
+     */
+    private final Logger log;
+
+    /**
+     * The period in nanoseconds.
+     */
+    private long periodNs;
+
+    /**
+     * The always-log threshold in nanoseconds.
+     */
+    private long alwaysLogThresholdNs;
+
+    /**
+     * The name of the slowest event we've seen so far, or null if none has 
been seen.
+     */
+    private String slowestEventName;
+
+    /**
+     * The duration of the slowest event we've seen so far, or 0 if none has 
been seen.
+     */
+    private long slowestEventDurationNs;
+
+    /**
+     * The total duration of all the events we've seen.
+     */
+    private long totalEventDurationNs;
+
+    /**
+     * The number of events we've seen.
+     */
+    private int numEvents;
+
+    private EventPerformanceMonitor(
+        LogContext logContext,
+        long periodNs,
+        long alwaysLogThresholdNs
+    ) {
+        this.log = logContext.logger(EventPerformanceMonitor.class);
+        this.periodNs = periodNs;
+        this.alwaysLogThresholdNs = alwaysLogThresholdNs;
+        reset();
+    }
+
+    long periodNs() {
+        return periodNs;
+    }
+
+    Map.Entry<String, Long> slowestEvent() {
+        return new AbstractMap.SimpleImmutableEntry<>(slowestEventName, 
slowestEventDurationNs);
+    }
+
+    /**
+     * Reset all internal state.
+     */
+    void reset() {
+        this.slowestEventName = null;
+        this.slowestEventDurationNs = 0;
+        this.totalEventDurationNs = 0;
+        this.numEvents = 0;
+    }
+
+    /**
+     * Handle a controller event being finished.
+     *
+     * @param name          The name of the controller event.
+     * @param durationNs    The duration of the controller event in 
nanoseconds.
+     */
+    void observeEvent(String name, long durationNs) {
+        String message = doObserveEvent(name, durationNs);
+        if (message != null) {
+            log.error("{}", message);
+        }
+    }
+
+    /**
+     * Handle a controller event being finished.
+     *
+     * @param name          The name of the controller event.
+     * @param durationNs    The duration of the controller event in 
nanoseconds.
+     *
+     * @return              The message to log, or null otherwise.
+     */
+    String doObserveEvent(String name, long durationNs) {
+        if (slowestEventName == null || slowestEventDurationNs < durationNs) {
+            slowestEventName = name;
+            slowestEventDurationNs = durationNs;
+        }
+        totalEventDurationNs += durationNs;
+        numEvents++;
+        if (durationNs < alwaysLogThresholdNs) {
+            return null;
+        }
+        return "Exceptionally slow controller event " + name + " took " +
+            NANOSECONDS.toMillis(durationNs) + " ms.";
+    }
+
+    /**
+     * Generate a log message summarizing the events of the last period,
+     * and then reset our internal state.
+     */
+    void generatePeriodicPerformanceMessage() {
+        String message = periodicPerformanceMessage();
+        log.info("{}", message);
+        reset();
+    }
+
+    /**
+     * Generate a log message summarizing the events of the last period.
+     *
+     * @return                          The summary string.
+     */
+    String periodicPerformanceMessage() {
+        StringBuilder bld = new StringBuilder();
+        bld.append("In the last ");
+        bld.append(NANOSECONDS.toMillis(periodNs));
+        bld.append(" ms period, ");
+        if (numEvents == 0) {
+            bld.append("there were no controller events completed.");
+        } else {
+            bld.append(numEvents).append(" controller events were completed, 
which took an average of ");
+            bld.append(formatNsAsDecimalMs(totalEventDurationNs / numEvents));
+            bld.append(" ms each. The slowest event was 
").append(slowestEventName);
+            bld.append(", which took ");
+            bld.append(formatNsAsDecimalMs(slowestEventDurationNs));
+            bld.append(" ms.");
+        }
+        return bld.toString();
+    }
+
+    /**
+     * Translate a duration in nanoseconds to a decimal duration in 
milliseconds.
+     *
+     * @param durationNs    The duration in nanoseconds.
+     * @return              The decimal duration in milliseconds.
+     */
+    static String formatNsAsDecimalMs(long durationNs) {
+        double number = NANOSECONDS.toMicros(durationNs);
+        number /= 1000;
+        return MILLISECOND_DECIMAL_FORMAT.format(number);
+    }
+}
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 3e3b362c85b..acf5c87f816 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -175,16 +175,21 @@ import static 
org.apache.kafka.controller.QuorumController.ControllerOperationFl
  */
 public final class QuorumController implements Controller {
     /**
-     * The maximum records that the controller will write in a single batch.
+     * The default maximum records that the controller will write in a single 
batch.
      */
-    private static final int MAX_RECORDS_PER_BATCH = 10000;
+    private static final int DEFAULT_MAX_RECORDS_PER_BATCH = 10000;
+
+    /**
+     * The default minimum event time that can be logged as a slow event.
+     */
+    private static final int DEFAULT_MIN_SLOW_EVENT_TIME_MS = 200;
 
     /**
      * The maximum records any user-initiated operation is allowed to generate.
      *
      * For now, this is set to the maximum records in a single batch.
      */
-    static final int MAX_RECORDS_PER_USER_OP = MAX_RECORDS_PER_BATCH;
+    static final int MAX_RECORDS_PER_USER_OP = DEFAULT_MAX_RECORDS_PER_BATCH;
 
     /**
      * A builder class which creates the QuorumController.
@@ -213,7 +218,9 @@ public final class QuorumController implements Controller {
         private ConfigurationValidator configurationValidator = 
ConfigurationValidator.NO_OP;
         private Map<String, Object> staticConfig = Collections.emptyMap();
         private BootstrapMetadata bootstrapMetadata = null;
-        private int maxRecordsPerBatch = MAX_RECORDS_PER_BATCH;
+        private int maxRecordsPerBatch = DEFAULT_MAX_RECORDS_PER_BATCH;
+        private long controllerPerformanceSamplePeriodMs = 60000L;
+        private long controllerPerformanceAlwaysLogThresholdMs = 2000L;
         private DelegationTokenCache tokenCache;
         private String tokenSecretKeyString;
         private long delegationTokenMaxLifeMs;
@@ -321,6 +328,16 @@ public final class QuorumController implements Controller {
             return this;
         }
 
+        public Builder setControllerPerformanceSamplePeriodMs(long 
controllerPerformanceSamplePeriodMs) {
+            this.controllerPerformanceSamplePeriodMs = 
controllerPerformanceSamplePeriodMs;
+            return this;
+        }
+
+        public Builder setControllerPerformanceAlwaysLogThresholdMs(long 
controllerPerformanceAlwaysLogThresholdMs) {
+            this.controllerPerformanceAlwaysLogThresholdMs = 
controllerPerformanceAlwaysLogThresholdMs;
+            return this;
+        }
+
         public Builder setCreateTopicPolicy(Optional<CreateTopicPolicy> 
createTopicPolicy) {
             this.createTopicPolicy = createTopicPolicy;
             return this;
@@ -433,7 +450,9 @@ public final class QuorumController implements Controller {
                     delegationTokenExpiryTimeMs,
                     delegationTokenExpiryCheckIntervalMs,
                     uncleanLeaderElectionCheckIntervalMs,
-                    interBrokerListenerName
+                    interBrokerListenerName,
+                    controllerPerformanceSamplePeriodMs,
+                    controllerPerformanceAlwaysLogThresholdMs
                 );
             } catch (Exception e) {
                 Utils.closeQuietly(queue, "event queue");
@@ -524,6 +543,7 @@ public final class QuorumController implements Controller {
         long deltaNs = endProcessingTime - startProcessingTimeNs;
         log.debug("Processed {} in {} us", name,
             MICROSECONDS.convert(deltaNs, NANOSECONDS));
+        performanceMonitor.observeEvent(name, deltaNs);
         
controllerMetrics.updateEventQueueProcessingTime(NANOSECONDS.toMillis(deltaNs));
     }
 
@@ -536,6 +556,8 @@ public final class QuorumController implements Controller {
         if (startProcessingTimeNs.isPresent()) {
             long endProcessingTime = time.nanoseconds();
             long deltaNs = endProcessingTime - 
startProcessingTimeNs.getAsLong();
+            performanceMonitor.observeEvent(name, deltaNs);
+            
controllerMetrics.updateEventQueueProcessingTime(NANOSECONDS.toMillis(deltaNs));
             deltaUs = OptionalLong.of(MICROSECONDS.convert(deltaNs, 
NANOSECONDS));
         } else {
             deltaUs = OptionalLong.empty();
@@ -1446,6 +1468,11 @@ public final class QuorumController implements 
Controller {
      */
     private final RecordRedactor recordRedactor;
 
+    /**
+     * Monitors the performance of controller events and generates logs about 
it.
+     */
+    private final EventPerformanceMonitor performanceMonitor;
+
     private QuorumController(
         FaultHandler nonFatalFaultHandler,
         FaultHandler fatalFaultHandler,
@@ -1477,7 +1504,9 @@ public final class QuorumController implements Controller 
{
         long delegationTokenExpiryTimeMs,
         long delegationTokenExpiryCheckIntervalMs,
         long uncleanLeaderElectionCheckIntervalMs,
-        String interBrokerListenerName
+        String interBrokerListenerName,
+        long controllerPerformanceSamplePeriodMs,
+        long controllerPerformanceAlwaysLogThresholdMs
     ) {
         this.nonFatalFaultHandler = nonFatalFaultHandler;
         this.fatalFaultHandler = fatalFaultHandler;
@@ -1574,6 +1603,11 @@ public final class QuorumController implements 
Controller {
         this.metaLogListener = new QuorumMetaLogListener();
         this.curClaimEpoch = -1;
         this.recordRedactor = new RecordRedactor(configSchema);
+        this.performanceMonitor = new EventPerformanceMonitor.Builder().
+            setLogContext(logContext).
+            
setPeriodNs(TimeUnit.MILLISECONDS.toNanos(controllerPerformanceSamplePeriodMs)).
+            
setAlwaysLogThresholdNs(TimeUnit.MILLISECONDS.toNanos(controllerPerformanceAlwaysLogThresholdMs)).
+            build();
         if (maxIdleIntervalNs.isPresent()) {
             registerWriteNoOpRecord(maxIdleIntervalNs.getAsLong());
         }
@@ -1587,7 +1621,7 @@ public final class QuorumController implements Controller 
{
         }
         
registerElectUnclean(TimeUnit.MILLISECONDS.toNanos(uncleanLeaderElectionCheckIntervalMs));
         
registerExpireDelegationTokens(MILLISECONDS.toNanos(delegationTokenExpiryCheckIntervalMs));
-
+        registerGeneratePeriodicPerformanceMessage();
         // OffsetControlManager must be initialized last, because its 
constructor will take the
         // initial in-memory snapshot of all extant timeline data structures.
         this.offsetControl = new OffsetControlManager.Builder().
@@ -1597,7 +1631,6 @@ public final class QuorumController implements Controller 
{
             setTime(time).
             build();
         log.info("Creating new QuorumController with clusterId {}", clusterId);
-
         this.raftClient.register(metaLogListener);
     }
 
@@ -1681,6 +1714,21 @@ public final class QuorumController implements 
Controller {
             EnumSet.of(PeriodicTaskFlag.VERBOSE)));
     }
 
+    /**
+     * Register the generatePeriodicPerformanceMessage task.
+     *
+     * This task periodically logs some statistics about controller 
performance.
+     */
+    private void registerGeneratePeriodicPerformanceMessage() {
+        periodicControl.registerTask(new 
PeriodicTask("generatePeriodicPerformanceMessage",
+            () -> {
+                performanceMonitor.generatePeriodicPerformanceMessage();
+                return ControllerResult.of(Collections.emptyList(), false);
+            },
+            performanceMonitor.periodNs(),
+            EnumSet.noneOf(PeriodicTaskFlag.class)));
+    }
+
     /**
      * Register the delegation token expiration task.
      *
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/EventPerformanceMonitorTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/EventPerformanceMonitorTest.java
new file mode 100644
index 00000000000..81e01679dc0
--- /dev/null
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/EventPerformanceMonitorTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.AbstractMap;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class EventPerformanceMonitorTest {
+    @Test
+    public void testDefaultPeriodNs() {
+        assertEquals(SECONDS.toNanos(60),
+            new EventPerformanceMonitor.Builder().build().periodNs());
+    }
+
+    @Test
+    public void testSlowestEventWithNoEvents() {
+        EventPerformanceMonitor monitor = new 
EventPerformanceMonitor.Builder().build();
+        assertEquals(new AbstractMap.SimpleImmutableEntry<>(null, 0L),
+            monitor.slowestEvent());
+    }
+
+    @Test
+    public void testSlowestEventWithThreeEvents() {
+        EventPerformanceMonitor monitor = new 
EventPerformanceMonitor.Builder().build();
+        monitor.observeEvent("fastEvent", MILLISECONDS.toNanos(2));
+        monitor.observeEvent("slowEvent", MILLISECONDS.toNanos(100));
+        assertEquals(new AbstractMap.SimpleImmutableEntry<>("slowEvent", 
MILLISECONDS.toNanos(100)),
+            monitor.slowestEvent());
+    }
+
+    @Test
+    public void testLogSlowEvent() {
+        EventPerformanceMonitor monitor = new 
EventPerformanceMonitor.Builder().build();
+        assertEquals("Exceptionally slow controller event slowEvent took 5000 
ms.",
+            monitor.doObserveEvent("slowEvent", SECONDS.toNanos(5)));
+    }
+
+    @Test
+    public void testDoNotLogFastEvent() {
+        EventPerformanceMonitor monitor = new 
EventPerformanceMonitor.Builder().build();
+        assertNull(monitor.doObserveEvent("slowEvent", 
MILLISECONDS.toNanos(250)));
+    }
+
+    @Test
+    public void testFormatNsAsDecimalMsWithZero() {
+        assertEquals("0.00",
+            EventPerformanceMonitor.formatNsAsDecimalMs(0));
+    }
+
+    @Test
+    public void testFormatNsAsDecimalMsWith100() {
+        assertEquals("100.00",
+            
EventPerformanceMonitor.formatNsAsDecimalMs(MILLISECONDS.toNanos(100)));
+    }
+
+    @Test
+    public void testFormatNsAsDecimalMsWith123456789() {
+        assertEquals("123.46",
+            EventPerformanceMonitor.formatNsAsDecimalMs(123456789));
+    }
+
+    @Test
+    public void testPeriodicPerformanceMessageWithNoEvents() {
+        EventPerformanceMonitor monitor = new 
EventPerformanceMonitor.Builder().build();
+        assertEquals("In the last 60000 ms period, there were no controller 
events completed.",
+            monitor.periodicPerformanceMessage());
+    }
+
+    @Test
+    public void testPeriodicPerformanceMessageWithOneEvent() {
+        EventPerformanceMonitor monitor = new 
EventPerformanceMonitor.Builder().build();
+        monitor.observeEvent("myEvent", MILLISECONDS.toNanos(12));
+        assertEquals("In the last 60000 ms period, 1 controller events were 
completed, which took an " +
+            "average of 12.00 ms each. The slowest event was myEvent, which 
took 12.00 ms.",
+                monitor.periodicPerformanceMessage());
+    }
+
+    @Test
+    public void testPeriodicPerformanceMessageWithThreeEvents() {
+        EventPerformanceMonitor monitor = new 
EventPerformanceMonitor.Builder().build();
+        monitor.observeEvent("myEvent", MILLISECONDS.toNanos(12));
+        monitor.observeEvent("myEvent2", MILLISECONDS.toNanos(19));
+        monitor.observeEvent("myEvent3", MILLISECONDS.toNanos(1));
+        assertEquals("In the last 60000 ms period, 3 controller events were 
completed, which took an " +
+            "average of 10.67 ms each. The slowest event was myEvent2, which 
took 19.00 ms.",
+                monitor.periodicPerformanceMessage());
+    }
+
+    @Test
+    public void testGeneratePeriodicPerformanceMessageResetsState() {
+        EventPerformanceMonitor monitor = new 
EventPerformanceMonitor.Builder().build();
+        monitor.observeEvent("myEvent", MILLISECONDS.toNanos(12));
+        monitor.observeEvent("myEvent2", MILLISECONDS.toNanos(19));
+        monitor.observeEvent("myEvent3", MILLISECONDS.toNanos(1));
+        monitor.generatePeriodicPerformanceMessage();
+        assertEquals("In the last 60000 ms period, there were no controller 
events completed.",
+            monitor.periodicPerformanceMessage());
+    }
+}
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java 
b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
index 7a76a8d793c..bb7d9234ea4 100644
--- a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
+++ b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
@@ -111,6 +111,14 @@ public class KRaftConfigs {
     public static final String SERVER_MAX_STARTUP_TIME_MS_DOC = "The maximum 
number of milliseconds we will wait for the server to come up. " +
             "By default there is no limit. This should be used for testing 
only.";
 
+    public static final String CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS = 
"controller.performance.sample.period.ms";
+    public static final long CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS_DEFAULT = 
60000;
+    public static final String CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS_DOC = 
"The number of milliseconds between periodic controller event performance log 
messages.";
+
+    public static final String CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS 
= "controller.performance.always.log.threshold.ms";
+    public static final long 
CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DEFAULT = 2000;
+    public static final String 
CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DOC = "We will log an error 
message about controller events that take longer than this threshold.";
+
     public static final ConfigDef CONFIG_DEF =  new ConfigDef()
             .define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, 
METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, 
METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC)
             .define(METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG, 
METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, 
METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC)
@@ -128,5 +136,7 @@ public class KRaftConfigs {
             .define(METADATA_MAX_RETENTION_BYTES_CONFIG, LONG, 
METADATA_MAX_RETENTION_BYTES_DEFAULT, null, HIGH, 
METADATA_MAX_RETENTION_BYTES_DOC)
             .define(METADATA_MAX_RETENTION_MILLIS_CONFIG, LONG, 
LogConfig.DEFAULT_RETENTION_MS, null, HIGH, METADATA_MAX_RETENTION_MILLIS_DOC)
             .define(METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT, 
METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW, 
METADATA_MAX_IDLE_INTERVAL_MS_DOC)
+            .defineInternal(CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS, LONG, 
CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS_DEFAULT, atLeast(100), MEDIUM, 
CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS_DOC)
+            .defineInternal(CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS, 
LONG, CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DEFAULT, atLeast(0), 
MEDIUM, CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DOC)
             .defineInternal(SERVER_MAX_STARTUP_TIME_MS_CONFIG, LONG, 
SERVER_MAX_STARTUP_TIME_MS_DEFAULT, atLeast(0), MEDIUM, 
SERVER_MAX_STARTUP_TIME_MS_DOC);
 }

Reply via email to