This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2a29be0d04f [improve][broker] Ensure metadata session state visibility
and improve Unstable observability for ServiceUnitStateChannelImpl (#25132)
2a29be0d04f is described below
commit 2a29be0d04f293c767ccf91dde807fd036d2ca95
Author: Kai Wang <[email protected]>
AuthorDate: Mon Jan 19 13:00:52 2026 +0800
[improve][broker] Ensure metadata session state visibility and improve
Unstable observability for ServiceUnitStateChannelImpl (#25132)
---
.../channel/ServiceUnitStateChannelImpl.java | 20 +++++-
.../channel/ServiceUnitStateChannelTest.java | 77 ++++++++++++++++++++++
2 files changed, 94 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 50e325cecfa..0e7f15e64db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -127,8 +127,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
private ServiceUnitStateTableView tableview;
private ScheduledFuture<?> monitorTask;
- private SessionEvent lastMetadataSessionEvent = SessionReestablished;
- private long lastMetadataSessionEventTimestamp = 0;
+ private volatile SessionEvent lastMetadataSessionEvent =
SessionReestablished;
+ private volatile long lastMetadataSessionEventTimestamp = 0;
private long inFlightStateWaitingTimeInMillis;
private long ownershipMonitorDelayTimeInSecs;
@@ -1707,7 +1707,9 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
var metadataState = getMetadataState();
if (metadataState != Stable) {
- log.warn("metadata state:{} is not Stable. Skipping ownership
monitor.", metadataState);
+ log.warn("metadata state:{} is not Stable. Skipping ownership
monitor. lastMetadataSessionEvent:{},"
+ + " lastMetadataSessionEventTimestamp:{}",
+ metadataState, lastMetadataSessionEvent,
lastMetadataSessionEventTimestamp);
return;
}
@@ -1976,9 +1978,21 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
var metric = Metrics.create(dimensions);
+ var metadataState = getMetadataState();
+ long now = System.currentTimeMillis();
+ long lastSessionEventAgeSeconds =
+ lastMetadataSessionEventTimestamp > 0
+ ? MILLISECONDS.toSeconds(now -
lastMetadataSessionEventTimestamp)
+ : -1;
+
metric.put("brk_sunit_state_chn_orphan_su_cleanup_ops_total",
totalOrphanServiceUnitCleanupCnt);
metric.put("brk_sunit_state_chn_su_tombstone_cleanup_ops_total",
totalServiceUnitTombstoneCleanupCnt);
metric.put("brk_sunit_state_chn_owned_su_total",
getTotalOwnedServiceUnitCnt());
+ metric.put("brk_sunit_state_chn_metadata_state",
metadataState.ordinal());
+
metric.put("brk_sunit_state_chn_last_metadata_session_event_is_reestablished",
+ lastMetadataSessionEvent == SessionReestablished ? 1 : 0);
+
metric.put("brk_sunit_state_chn_last_metadata_session_event_timestamp_ms",
lastMetadataSessionEventTimestamp);
+
metric.put("brk_sunit_state_chn_last_metadata_session_event_age_seconds",
lastSessionEventAgeSeconds);
metrics.add(metric);
return metrics;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index bbde38bfbec..5baf91f2d5d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -96,6 +96,7 @@ import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreTableView;
@@ -731,6 +732,82 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
}
+
+ @Test
+ public void metadataStateMetricsTest() throws IllegalAccessException {
+ ServiceUnitStateChannelImpl channel1 = (ServiceUnitStateChannelImpl)
this.channel1;
+
+ long now = System.currentTimeMillis();
+ long oldTimestamp = now - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000) - 1;
+ FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent",
SessionReestablished, true);
+ FieldUtils.writeDeclaredField(channel1,
"lastMetadataSessionEventTimestamp", oldTimestamp, true);
+ long beforeMetricsCall = System.currentTimeMillis();
+ var metrics = channel1.getMetrics();
+ long afterMetricsCall = System.currentTimeMillis();
+ assertEquals(0, getMetric(metrics,
"brk_sunit_state_chn_metadata_state").intValue());
+ assertEquals(1, getMetric(metrics,
"brk_sunit_state_chn_last_metadata_session_event_is_reestablished")
+ .intValue());
+ assertEquals(oldTimestamp, getMetric(metrics,
"brk_sunit_state_chn_last_metadata_session_event_timestamp_ms")
+ .longValue());
+ long ageSeconds = getMetric(metrics,
"brk_sunit_state_chn_last_metadata_session_event_age_seconds")
+ .longValue();
+ long minAgeSeconds = TimeUnit.MILLISECONDS.toSeconds(beforeMetricsCall
- oldTimestamp);
+ long maxAgeSeconds = TimeUnit.MILLISECONDS.toSeconds(afterMetricsCall
- oldTimestamp);
+ assertTrue(ageSeconds >= minAgeSeconds && ageSeconds <= maxAgeSeconds,
+ "Unexpected age seconds: " + ageSeconds + ", expected within
[" + minAgeSeconds + ", "
+ + maxAgeSeconds + "]");
+
+ FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent",
SessionReestablished, true);
+ FieldUtils.writeDeclaredField(channel1,
"lastMetadataSessionEventTimestamp", now, true);
+ metrics = channel1.getMetrics();
+ assertEquals(1, getMetric(metrics,
"brk_sunit_state_chn_metadata_state").intValue());
+ assertEquals(1, getMetric(metrics,
"brk_sunit_state_chn_last_metadata_session_event_is_reestablished")
+ .intValue());
+ assertEquals(now, getMetric(metrics,
"brk_sunit_state_chn_last_metadata_session_event_timestamp_ms")
+ .longValue());
+ ageSeconds = getMetric(metrics,
"brk_sunit_state_chn_last_metadata_session_event_age_seconds")
+ .longValue();
+ assertTrue(ageSeconds >= 0 && ageSeconds <= 1, "Unexpected age
seconds: " + ageSeconds);
+
+ FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent",
SessionLost, true);
+ FieldUtils.writeDeclaredField(channel1,
"lastMetadataSessionEventTimestamp", now, true);
+ metrics = channel1.getMetrics();
+ assertEquals(2, getMetric(metrics,
"brk_sunit_state_chn_metadata_state").intValue());
+ assertEquals(0, getMetric(metrics,
"brk_sunit_state_chn_last_metadata_session_event_is_reestablished")
+ .intValue());
+ assertEquals(now, getMetric(metrics,
"brk_sunit_state_chn_last_metadata_session_event_timestamp_ms")
+ .longValue());
+ ageSeconds = getMetric(metrics,
"brk_sunit_state_chn_last_metadata_session_event_age_seconds")
+ .longValue();
+ assertTrue(ageSeconds >= 0 && ageSeconds <= 1, "Unexpected age
seconds: " + ageSeconds);
+
+ FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent",
SessionReestablished, true);
+ FieldUtils.writeDeclaredField(channel1,
"lastMetadataSessionEventTimestamp", 0L, true);
+ metrics = channel1.getMetrics();
+ assertEquals(0, getMetric(metrics,
"brk_sunit_state_chn_metadata_state").intValue());
+ assertEquals(1, getMetric(metrics,
"brk_sunit_state_chn_last_metadata_session_event_is_reestablished")
+ .intValue());
+ assertEquals(0L, getMetric(metrics,
"brk_sunit_state_chn_last_metadata_session_event_timestamp_ms")
+ .longValue());
+ assertEquals(-1L, getMetric(metrics,
"brk_sunit_state_chn_last_metadata_session_event_age_seconds")
+ .longValue());
+ }
+
+ private static Number getMetric(List<Metrics> metrics, String metricName) {
+ for (Metrics metric : metrics) {
+ Object value = metric.getMetrics().get(metricName);
+ if (value == null) {
+ continue;
+ }
+ if (!(value instanceof Number)) {
+ fail(metricName + " is not numeric: " + value);
+ }
+ return (Number) value;
+ }
+ fail("Missing " + metricName + " metric");
+ return -1L;
+ }
+
@Test(priority = 8)
public void handleBrokerCreationEventTest() throws IllegalAccessException {
var cleanupJobs = getCleanupJobs(channel1);