This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new e5f3ea784 [coordinator] Coordinator report event metrics of enqueue 
rate and event process time labeled by event_type (#1466)
e5f3ea784 is described below

commit e5f3ea78451522f2b0b1cd7d4599e68135eb6845
Author: Yang Wang <[email protected]>
AuthorDate: Mon Sep 15 22:29:00 2025 +0800

    [coordinator] Coordinator report event metrics of enqueue rate and event 
process time labeled by event_type (#1466)
---
 .../coordinator/event/CoordinatorEventManager.java | 21 +++---
 .../metrics/group/CoordinatorEventMetricGroup.java | 86 ++++++++++++++++++++++
 .../metrics/group/CoordinatorMetricGroup.java      | 11 +++
 .../maintenance/observability/monitor-metrics.md   | 15 ++--
 4 files changed, 116 insertions(+), 17 deletions(-)

diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java
index 06a92dabb..36d82df53 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java
@@ -25,6 +25,7 @@ import org.apache.fluss.metrics.Histogram;
 import org.apache.fluss.metrics.MetricNames;
 import org.apache.fluss.server.coordinator.CoordinatorContext;
 import org.apache.fluss.server.coordinator.statemachine.ReplicaState;
+import org.apache.fluss.server.metrics.group.CoordinatorEventMetricGroup;
 import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
 import org.apache.fluss.utils.concurrent.ShutdownableThread;
 
@@ -58,7 +59,6 @@ public final class CoordinatorEventManager implements 
EventManager {
     private final Lock putLock = new ReentrantLock();
 
     // metrics
-    private Histogram eventProcessingTime;
     private Histogram eventQueueTime;
 
     // Coordinator metrics moved from CoordinatorEventProcessor
@@ -79,13 +79,6 @@ public final class CoordinatorEventManager implements 
EventManager {
     }
 
     private void registerMetrics() {
-        coordinatorMetricGroup.gauge(MetricNames.EVENT_QUEUE_SIZE, 
queue::size);
-
-        eventProcessingTime =
-                coordinatorMetricGroup.histogram(
-                        MetricNames.EVENT_PROCESSING_TIME_MS,
-                        new DescriptiveStatisticsHistogram(WINDOW_SIZE));
-
         eventQueueTime =
                 coordinatorMetricGroup.histogram(
                         MetricNames.EVENT_QUEUE_TIME_MS,
@@ -188,7 +181,10 @@ public final class CoordinatorEventManager implements 
EventManager {
                         QueuedEvent queuedEvent =
                                 new QueuedEvent(event, 
System.currentTimeMillis());
                         queue.put(queuedEvent);
-
+                        coordinatorMetricGroup
+                                .getOrAddEventTypeMetricGroup(event.getClass())
+                                .queuedEventCount()
+                                .inc();
                         LOG.debug(
                                 "Put coordinator event {} of event type {}.",
                                 event,
@@ -243,7 +239,12 @@ public final class CoordinatorEventManager implements 
EventManager {
                 LOG.error("Uncaught error processing event {}.", 
coordinatorEvent, e);
             } finally {
                 long costTimeMs = System.currentTimeMillis() - 
eventStartTimeMs;
-                eventProcessingTime.update(costTimeMs);
+                // Use event type specific histogram
+                CoordinatorEventMetricGroup eventMetricGroup =
+                        coordinatorMetricGroup.getOrAddEventTypeMetricGroup(
+                                coordinatorEvent.getClass());
+                eventMetricGroup.eventProcessingTime().update(costTimeMs);
+                eventMetricGroup.queuedEventCount().dec();
                 LOG.debug(
                         "Finished processing event {} of event type {} in 
{}ms.",
                         coordinatorEvent,
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorEventMetricGroup.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorEventMetricGroup.java
new file mode 100644
index 000000000..d9a06324d
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorEventMetricGroup.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.metrics.group;
+
+import org.apache.fluss.metrics.CharacterFilter;
+import org.apache.fluss.metrics.Counter;
+import org.apache.fluss.metrics.DescriptiveStatisticsHistogram;
+import org.apache.fluss.metrics.Histogram;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
+import org.apache.fluss.metrics.groups.AbstractMetricGroup;
+import org.apache.fluss.metrics.registry.MetricRegistry;
+import org.apache.fluss.server.coordinator.event.CoordinatorEvent;
+
+import java.util.Map;
+
+import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
+
+/**
+ * Metric group for coordinator event types. This group adds an additional 
dimension "event_type" to
+ * the metrics and manages event-specific metrics.
+ */
+public class CoordinatorEventMetricGroup extends AbstractMetricGroup {
+
+    private final Class<? extends CoordinatorEvent> eventClass;
+    private final Histogram eventProcessingTime;
+    private final Counter queuedEventCount;
+
+    public CoordinatorEventMetricGroup(
+            MetricRegistry registry,
+            Class<? extends CoordinatorEvent> eventClass,
+            CoordinatorMetricGroup parent) {
+        super(registry, makeScope(parent, eventClass.getSimpleName()), parent);
+        this.eventClass = eventClass;
+
+        this.eventProcessingTime =
+                histogram(
+                        MetricNames.EVENT_PROCESSING_TIME_MS,
+                        new DescriptiveStatisticsHistogram(100));
+        this.queuedEventCount =
+                counter(MetricNames.EVENT_QUEUE_SIZE, new 
ThreadSafeSimpleCounter());
+    }
+
+    @Override
+    protected String getGroupName(CharacterFilter filter) {
+        return "event";
+    }
+
+    @Override
+    protected void putVariables(Map<String, String> variables) {
+        variables.put("event_type", eventClass.getSimpleName());
+    }
+
+    /**
+     * Returns the histogram for event processing time.
+     *
+     * @return the event processing time histogram
+     */
+    public Histogram eventProcessingTime() {
+        return eventProcessingTime;
+    }
+
+    /**
+     * Returns the counter of events of this type that are enqueued or still being processed.
+     *
+     * @return the queued event counter
+     */
+    public Counter queuedEventCount() {
+        return queuedEventCount;
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java
index 24f9b3261..251fb462b 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java
@@ -20,6 +20,8 @@ package org.apache.fluss.server.metrics.group;
 import org.apache.fluss.metrics.CharacterFilter;
 import org.apache.fluss.metrics.groups.AbstractMetricGroup;
 import org.apache.fluss.metrics.registry.MetricRegistry;
+import org.apache.fluss.server.coordinator.event.CoordinatorEvent;
+import org.apache.fluss.utils.MapUtils;
 
 import java.util.Map;
 
@@ -32,6 +34,9 @@ public class CoordinatorMetricGroup extends 
AbstractMetricGroup {
     protected final String hostname;
     protected final String serverId;
 
+    private final Map<Class<? extends CoordinatorEvent>, 
CoordinatorEventMetricGroup>
+            eventMetricGroups = MapUtils.newConcurrentHashMap();
+
     public CoordinatorMetricGroup(
             MetricRegistry registry, String clusterId, String hostname, String 
serverId) {
         super(registry, new String[] {clusterId, hostname, NAME}, null);
@@ -51,4 +56,10 @@ public class CoordinatorMetricGroup extends 
AbstractMetricGroup {
         variables.put("host", hostname);
         variables.put("server_id", serverId);
     }
+
+    public CoordinatorEventMetricGroup getOrAddEventTypeMetricGroup(
+            Class<? extends CoordinatorEvent> eventClass) {
+        return eventMetricGroups.computeIfAbsent(
+                eventClass, e -> new CoordinatorEventMetricGroup(registry, 
eventClass, this));
+    }
 }
diff --git a/website/docs/maintenance/observability/monitor-metrics.md 
b/website/docs/maintenance/observability/monitor-metrics.md
index ac351d510..a01f9e67c 100644
--- a/website/docs/maintenance/observability/monitor-metrics.md
+++ b/website/docs/maintenance/observability/monitor-metrics.md
@@ -295,7 +295,7 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
   <tbody>
     <tr>
       <th rowspan="9"><strong>coordinator</strong></th>
-      <td style={{textAlign: 'center', verticalAlign: 'middle' }} 
rowspan="9">-</td>
+      <td style={{textAlign: 'center', verticalAlign: 'middle' }} 
rowspan="7">-</td>
       <td>activeCoordinatorCount</td>
       <td>The number of active CoordinatorServer in this cluster.</td>
       <td>Gauge</td>
@@ -325,19 +325,20 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
       <td>The total number of replicas in the progress to be deleted in this 
cluster.</td>
       <td>Gauge</td>
     </tr>
-    <tr>
-      <td>eventQueueSize</td>
-      <td>The number of events waiting to be processed in the queue.</td>
-      <td>Gauge</td>
-    </tr>
     <tr>
       <td>eventQueueTimeMs</td>
       <td>The time that an event spent waiting in the queue to be 
processed.</td>
       <td>Histogram</td>
     </tr>
+    <tr>
+      <td style={{textAlign: 'center', verticalAlign: 'middle' }} 
rowspan="2">event</td>
+      <td>eventQueueSize</td>
+      <td>The number of events currently waiting to be processed in the 
coordinator event queue. This metric is labeled with <code>event_type</code> to 
distinguish between different types of coordinator events.</td>
+      <td>Gauge</td>
+    </tr>
     <tr>
       <td>eventProcessingTimeMs</td>
-      <td>The time that an event took to be processed.</td>
+      <td>The time that an event took to be processed by the coordinator event 
processor. This metric is labeled with <code>event_type</code> to distinguish 
between different types of coordinator events.</td>
       <td>Histogram</td>
     </tr>
   </tbody>

Reply via email to