This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a29be0d04f [improve][broker] Ensure metadata session state visibility and improve Unstable observability for ServiceUnitStateChannelImpl (#25132)
2a29be0d04f is described below

commit 2a29be0d04f293c767ccf91dde807fd036d2ca95
Author: Kai Wang <[email protected]>
AuthorDate: Mon Jan 19 13:00:52 2026 +0800

    [improve][broker] Ensure metadata session state visibility and improve Unstable observability for ServiceUnitStateChannelImpl (#25132)
---
 .../channel/ServiceUnitStateChannelImpl.java       | 20 +++++-
 .../channel/ServiceUnitStateChannelTest.java       | 77 ++++++++++++++++++++++
 2 files changed, 94 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 50e325cecfa..0e7f15e64db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -127,8 +127,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
 
     private ServiceUnitStateTableView tableview;
     private ScheduledFuture<?> monitorTask;
-    private SessionEvent lastMetadataSessionEvent = SessionReestablished;
-    private long lastMetadataSessionEventTimestamp = 0;
+    private volatile SessionEvent lastMetadataSessionEvent = SessionReestablished;
+    private volatile long lastMetadataSessionEventTimestamp = 0;
     private long inFlightStateWaitingTimeInMillis;
 
     private long ownershipMonitorDelayTimeInSecs;
@@ -1707,7 +1707,9 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
 
         var metadataState = getMetadataState();
         if (metadataState != Stable) {
-            log.warn("metadata state:{} is not Stable. Skipping ownership monitor.", metadataState);
+            log.warn("metadata state:{} is not Stable. Skipping ownership monitor. lastMetadataSessionEvent:{},"
+                            + " lastMetadataSessionEventTimestamp:{}",
+                    metadataState, lastMetadataSessionEvent, lastMetadataSessionEventTimestamp);
             return;
         }
 
@@ -1976,9 +1978,21 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
 
         var metric = Metrics.create(dimensions);
 
+        var metadataState = getMetadataState();
+        long now = System.currentTimeMillis();
+        long lastSessionEventAgeSeconds =
+                lastMetadataSessionEventTimestamp > 0
+                        ? MILLISECONDS.toSeconds(now - lastMetadataSessionEventTimestamp)
+                        : -1;
+
         metric.put("brk_sunit_state_chn_orphan_su_cleanup_ops_total", totalOrphanServiceUnitCleanupCnt);
         metric.put("brk_sunit_state_chn_su_tombstone_cleanup_ops_total", totalServiceUnitTombstoneCleanupCnt);
         metric.put("brk_sunit_state_chn_owned_su_total", getTotalOwnedServiceUnitCnt());
+        metric.put("brk_sunit_state_chn_metadata_state", metadataState.ordinal());
+        metric.put("brk_sunit_state_chn_last_metadata_session_event_is_reestablished",
+                lastMetadataSessionEvent == SessionReestablished ? 1 : 0);
+        metric.put("brk_sunit_state_chn_last_metadata_session_event_timestamp_ms", lastMetadataSessionEventTimestamp);
+        metric.put("brk_sunit_state_chn_last_metadata_session_event_age_seconds", lastSessionEventAgeSeconds);
         metrics.add(metric);
 
         return metrics;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index bbde38bfbec..5baf91f2d5d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -96,6 +96,7 @@ import org.apache.pulsar.client.api.TableView;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreTableView;
@@ -731,6 +732,82 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
 
     }
 
+
+    @Test
+    public void metadataStateMetricsTest() throws IllegalAccessException {
+        ServiceUnitStateChannelImpl channel1 = (ServiceUnitStateChannelImpl) this.channel1;
+
+        long now = System.currentTimeMillis();
+        long oldTimestamp = now - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000) - 1;
+        FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent", SessionReestablished, true);
+        FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEventTimestamp", oldTimestamp, true);
+        long beforeMetricsCall = System.currentTimeMillis();
+        var metrics = channel1.getMetrics();
+        long afterMetricsCall = System.currentTimeMillis();
+        assertEquals(0, getMetric(metrics, "brk_sunit_state_chn_metadata_state").intValue());
+        assertEquals(1, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_is_reestablished")
+                .intValue());
+        assertEquals(oldTimestamp, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_timestamp_ms")
+                .longValue());
+        long ageSeconds = getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_age_seconds")
+                .longValue();
+        long minAgeSeconds = TimeUnit.MILLISECONDS.toSeconds(beforeMetricsCall - oldTimestamp);
+        long maxAgeSeconds = TimeUnit.MILLISECONDS.toSeconds(afterMetricsCall - oldTimestamp);
+        assertTrue(ageSeconds >= minAgeSeconds && ageSeconds <= maxAgeSeconds,
+                "Unexpected age seconds: " + ageSeconds + ", expected within [" + minAgeSeconds + ", "
+                        + maxAgeSeconds + "]");
+
+        FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent", SessionReestablished, true);
+        FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEventTimestamp", now, true);
+        metrics = channel1.getMetrics();
+        assertEquals(1, getMetric(metrics, "brk_sunit_state_chn_metadata_state").intValue());
+        assertEquals(1, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_is_reestablished")
+                .intValue());
+        assertEquals(now, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_timestamp_ms")
+                .longValue());
+        ageSeconds = getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_age_seconds")
+                .longValue();
+        assertTrue(ageSeconds >= 0 && ageSeconds <= 1, "Unexpected age seconds: " + ageSeconds);
+
+        FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent", SessionLost, true);
+        FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEventTimestamp", now, true);
+        metrics = channel1.getMetrics();
+        assertEquals(2, getMetric(metrics, "brk_sunit_state_chn_metadata_state").intValue());
+        assertEquals(0, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_is_reestablished")
+                .intValue());
+        assertEquals(now, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_timestamp_ms")
+                .longValue());
+        ageSeconds = getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_age_seconds")
+                .longValue();
+        assertTrue(ageSeconds >= 0 && ageSeconds <= 1, "Unexpected age seconds: " + ageSeconds);
+
+        FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent", SessionReestablished, true);
+        FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEventTimestamp", 0L, true);
+        metrics = channel1.getMetrics();
+        assertEquals(0, getMetric(metrics, "brk_sunit_state_chn_metadata_state").intValue());
+        assertEquals(1, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_is_reestablished")
+                .intValue());
+        assertEquals(0L, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_timestamp_ms")
+                .longValue());
+        assertEquals(-1L, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_age_seconds")
+                .longValue());
+    }
+
+    private static Number getMetric(List<Metrics> metrics, String metricName) {
+        for (Metrics metric : metrics) {
+            Object value = metric.getMetrics().get(metricName);
+            if (value == null) {
+                continue;
+            }
+            if (!(value instanceof Number)) {
+                fail(metricName + " is not numeric: " + value);
+            }
+            return (Number) value;
+        }
+        fail("Missing " + metricName + " metric");
+        return -1L;
+    }
+
     @Test(priority = 8)
     public void handleBrokerCreationEventTest() throws IllegalAccessException {
         var cleanupJobs = getCleanupJobs(channel1);

Reply via email to