This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9186c382214 Add MseMetrics with configurable emission mode for 
multi-stage engine (#18550)
9186c382214 is described below

commit 9186c382214a601bd49313fe2ab8ef82b9d28483
Author: Yash Mayya <[email protected]>
AuthorDate: Wed Jun 3 15:39:40 2026 -0700

    Add MseMetrics with configurable emission mode for multi-stage engine 
(#18550)
---
 .../jmx_prometheus_javaagent/configs/broker.yml    |   7 +-
 .../broker/broker/helix/BaseBrokerStarter.java     |   2 +
 .../org/apache/pinot/common/metrics/MseMeter.java  |  87 +++++
 .../apache/pinot/common/metrics/MseMetrics.java    | 267 +++++++++++++++
 .../pinot/common/metrics/MseMetricsMode.java       |  38 +++
 .../org/apache/pinot/common/metrics/MseTimer.java  |  70 ++++
 .../pinot/common/metrics/MseMetricsTest.java       | 363 +++++++++++++++++++++
 .../prometheus/BrokerPrometheusMetricsTest.java    |  36 ++
 .../prometheus/PrometheusTemplateRegexpTest.java   |  26 +-
 .../prometheus/ServerPrometheusMetricsTest.java    |  36 ++
 .../DropwizardBrokerPrometheusMetricsTest.java     |  12 +
 .../DropwizardServerPrometheusMetricsTest.java     |  12 +
 .../apache/pinot/query/runtime/QueryRunner.java    |   8 +-
 .../runtime/executor/OpChainSchedulerService.java  |  14 +-
 .../runtime/operator/MailboxSendOperator.java      |   8 +-
 .../query/runtime/operator/MultiStageOperator.java |  42 +--
 .../pinot/query/service/server/QueryServer.java    |  12 +-
 .../server/starter/helix/BaseServerStarter.java    |   2 +
 .../apache/pinot/spi/utils/CommonConstants.java    |   8 +
 19 files changed, 1000 insertions(+), 50 deletions(-)

diff --git 
a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/broker.yml 
b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/broker.yml
index 771f45fe526..f0fbe97f569 100644
--- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/broker.yml
+++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/broker.yml
@@ -34,7 +34,8 @@ rules:
 - pattern: 
"\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"BrokerMetrics\", 
name=\"pinot\\.broker\\.(uncaughtGet|uncaughtPost|queryRejected|requestCompilation|resourceMissing)Exceptions\"><>(\\w+)"
   name: "pinot_broker_exceptions_$1_$2"
   cache: true
-# All global gauge/meters/timers
-- pattern: "\"?org\\.apache\\.pinot\\.common\\.metrics\"?<type=\"?\\w+\"?, 
name=\"?pinot\\.broker\\.(\\w+)\"?><>(\\w+)"
-  name: "pinot_broker_$1_$2"
+# All global gauge/meters/timers. Group-flexible at the prefix so non-broker 
MBean groups
+# registered in this JVM (e.g. pinot.mse.* from the multi-stage engine 
emitter) are also exported.
+- pattern: "\"?org\\.apache\\.pinot\\.common\\.metrics\"?<type=\"?\\w+\"?, 
name=\"?pinot\\.(\\w+)\\.(\\w+)\"?><>(\\w+)"
+  name: "pinot_$1_$2_$3"
   cache: true
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 255aeded2d2..eb6428cb877 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -80,6 +80,7 @@ import org.apache.pinot.common.metrics.BrokerGauge;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.BrokerTimer;
+import org.apache.pinot.common.metrics.MseMetrics;
 import org.apache.pinot.common.utils.PinotAppConfigs;
 import org.apache.pinot.common.utils.ServiceStartableUtils;
 import org.apache.pinot.common.utils.ServiceStatus;
@@ -416,6 +417,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
         _brokerConf.getProperty(Broker.AdaptiveServerSelector.CONFIG_OF_TYPE,
             Broker.AdaptiveServerSelector.DEFAULT_TYPE), 1);
     BrokerMetrics.register(_brokerMetrics);
+    MseMetrics.registerFromConfig(_brokerConf, _metricsRegistry);
 
     LOGGER.info("Connecting spectator Helix manager");
     initSpectatorHelixManager();
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseMeter.java
new file mode 100644
index 00000000000..9e3171ca3b4
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseMeter.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metrics;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.common.Utils;
+
+
+/// Meters for the multi-stage engine, emitted via [MseMetrics] as 
`pinot.mse.*` when the cluster
+/// is configured for [MseMetricsMode#MSE] or [MseMetricsMode#DUAL].
+///
+/// Each entry optionally carries a [ServerMeter] counterpart. When present,
+/// [MseMetricsMode#SERVER] and [MseMetricsMode#DUAL] forward emissions to the 
existing
+/// `pinot.server.*` series. Entries with no counterpart (MSE-native metrics 
added after the
+/// migration) are emitted only under MSE / DUAL modes and silently dropped in 
SERVER mode.
+public enum MseMeter implements AbstractMetrics.Meter {
+  QUERIES("queries", true, ServerMeter.MSE_QUERIES),
+  OPCHAINS_STARTED("opchains", true, ServerMeter.MSE_OPCHAINS_STARTED),
+  OPCHAINS_COMPLETED("opchains", true, ServerMeter.MSE_OPCHAINS_COMPLETED),
+  CPU_EXECUTION_TIME_MS("milliseconds", true, 
ServerMeter.MSE_CPU_EXECUTION_TIME_MS),
+  MEMORY_ALLOCATED_BYTES("bytes", true, 
ServerMeter.MSE_MEMORY_ALLOCATED_BYTES),
+  EMITTED_ROWS("rows", true, ServerMeter.MSE_EMITTED_ROWS),
+  RUNNER_STARTED_TASKS("tasks", true, 
ServerMeter.MULTI_STAGE_RUNNER_STARTED_TASKS),
+  RUNNER_COMPLETED_TASKS("tasks", true, 
ServerMeter.MULTI_STAGE_RUNNER_COMPLETED_TASKS),
+  SUBMISSION_STARTED_TASKS("tasks", true, 
ServerMeter.MULTI_STAGE_SUBMISSION_STARTED_TASKS),
+  SUBMISSION_COMPLETED_TASKS("tasks", true, 
ServerMeter.MULTI_STAGE_SUBMISSION_COMPLETED_TASKS),
+  HASH_JOIN_TIMES_MAX_ROWS_REACHED("times", true, 
ServerMeter.HASH_JOIN_TIMES_MAX_ROWS_REACHED),
+  WINDOW_TIMES_MAX_ROWS_REACHED("times", true, 
ServerMeter.WINDOW_TIMES_MAX_ROWS_REACHED),
+  IN_MEMORY_MESSAGES("messages", true, 
ServerMeter.MULTI_STAGE_IN_MEMORY_MESSAGES),
+  RAW_MESSAGES("messages", true, ServerMeter.MULTI_STAGE_RAW_MESSAGES),
+  RAW_BYTES("bytes", true, ServerMeter.MULTI_STAGE_RAW_BYTES);
+
+  private final String _meterName;
+  private final String _unit;
+  private final boolean _global;
+  @Nullable
+  private final ServerMeter _serverMeter;
+
+  MseMeter(String unit, boolean global) {
+    this(unit, global, null);
+  }
+
+  MseMeter(String unit, boolean global, @Nullable ServerMeter serverMeter) {
+    _unit = unit;
+    _global = global;
+    _serverMeter = serverMeter;
+    _meterName = Utils.toCamelCase(name().toLowerCase());
+  }
+
+  @Override
+  public String getMeterName() {
+    return _meterName;
+  }
+
+  @Override
+  public String getUnit() {
+    return _unit;
+  }
+
+  @Override
+  public boolean isGlobal() {
+    return _global;
+  }
+
+  /// Existing [ServerMeter] this entry forwards to in SERVER / DUAL mode, or 
`null` for
+  /// MSE-native meters with no legacy `pinot.server.*` series.
+  @Nullable
+  public ServerMeter getServerMeter() {
+    return _serverMeter;
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseMetrics.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseMetrics.java
new file mode 100644
index 00000000000..f21919e1b36
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseMetrics.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.NoopPinotMetricsRegistry;
+import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Mode-aware metrics shim for the multi-stage engine.
+///
+/// All MSE engine call sites route through [#get()] instead of 
[ServerMetrics] directly. The
+/// active [MseMetricsMode] controls where emissions land:
+///
+/// - [MseMetricsMode#SERVER] (default): forwarded to [ServerMetrics] so 
existing `pinot.server.*`
+///   dashboards continue to work unchanged.
+/// - [MseMetricsMode#MSE]: emitted to this instance's own `pinot.mse.*` 
registry.
+/// - [MseMetricsMode#DUAL]: emitted to both, for dashboard migration windows.
+///
+/// The pre-registration [#NOOP] is in `SERVER` mode, so call sites that emit 
before any explicit
+/// registration behave the same as if they had called [ServerMetrics] 
directly. Because SERVER
+/// mode resolves the underlying handle through [ServerMetrics#get()], 
emissions land in the noop
+/// registry whenever [ServerMetrics#register] has not been called on the 
local JVM; in that case
+/// the `MSE` or `DUAL` mode must be selected for the series to surface.
+///
+/// MSE-native metrics — [MseMeter] and [MseTimer] entries with no 
`ServerMeter` / `ServerTimer`
+/// counterpart — are emitted only under `MSE` and `DUAL` modes. In `SERVER` 
mode they are
+/// silently dropped (the legacy namespace has no series to forward to).
+///
+/// **Initialization order:** call [#registerFromConfig] before constructing 
components that
+/// resolve a [PinotMeter] handle once and cache it for the JVM lifetime 
(notably the MSE
+/// `MetricsExecutor` cached handles and the inner `Metrics` class in 
`OpChainSchedulerService`).
+/// The server and broker starters preserve this ordering by registering 
immediately after
+/// `ServerMetrics.register` / `BrokerMetrics.register` and before any MSE 
runtime component is
+/// built.
+public class MseMetrics
+    extends AbstractMetrics<AbstractMetrics.QueryPhase, MseMeter, 
AbstractMetrics.Gauge, MseTimer> {
+
+  public static final String METRIC_PREFIX = "pinot.mse.";
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MseMetrics.class);
+
+  private static final AbstractMetrics.QueryPhase[] EMPTY_PHASES = new 
AbstractMetrics.QueryPhase[0];
+  private static final AbstractMetrics.Gauge[] EMPTY_GAUGES = new 
AbstractMetrics.Gauge[0];
+
+  private static final PinotMeter NOOP_PINOT_METER = new 
NoopPinotMetricsRegistry().newMeter(null, null, null);
+
+  private static final MseMetrics NOOP = new MseMetrics(MseMetricsMode.SERVER, 
new NoopPinotMetricsRegistry());
+  private static final AtomicReference<MseMetrics> INSTANCE = new 
AtomicReference<>(NOOP);
+
+  /// Registers `mseMetrics` as the JVM-wide instance. Returns `true` if 
installed; `false` if
+  /// another instance was already registered (compare-and-set semantics, 
matching
+  /// [ServerMetrics#register]).
+  public static boolean register(MseMetrics mseMetrics) {
+    return INSTANCE.compareAndSet(NOOP, mseMetrics);
+  }
+
+  @VisibleForTesting
+  public static void deregister() {
+    INSTANCE.set(NOOP);
+  }
+
+  public static MseMetrics get() {
+    return INSTANCE.get();
+  }
+
+  /// Reads [Helix#CONFIG_OF_MSE_METRICS_MODE] from `instanceConfig` (which 
already contains
+  /// cluster-config keys merged in via 
`ServiceStartableUtils.applyClusterConfig(...)` at
+  /// startup) and registers a new [MseMetrics] instance with the resolved 
mode.
+  /// [MseMetricsMode#SERVER] reuses [NoopPinotMetricsRegistry] since no 
`pinot.mse.*` series are
+  /// emitted in that mode. Subsequent calls in the same JVM are no-ops 
(compare-and-set with the
+  /// NOOP placeholder).
+  public static void registerFromConfig(PinotConfiguration instanceConfig, 
PinotMetricsRegistry metricsRegistry) {
+    String modeStr = 
instanceConfig.getProperty(Helix.CONFIG_OF_MSE_METRICS_MODE, 
Helix.DEFAULT_MSE_METRICS_MODE);
+    MseMetricsMode mode;
+    try {
+      mode = MseMetricsMode.valueOf(modeStr.trim().toUpperCase());
+    } catch (IllegalArgumentException e) {
+      LOGGER.warn("Invalid value {}={}, falling back to {}", 
Helix.CONFIG_OF_MSE_METRICS_MODE, modeStr,
+          Helix.DEFAULT_MSE_METRICS_MODE);
+      mode = MseMetricsMode.valueOf(Helix.DEFAULT_MSE_METRICS_MODE);
+    }
+    PinotMetricsRegistry effectiveRegistry =
+        mode == MseMetricsMode.SERVER ? new NoopPinotMetricsRegistry() : 
metricsRegistry;
+    if (register(new MseMetrics(mode, effectiveRegistry))) {
+      LOGGER.info("Registered MseMetrics in {} mode", mode);
+    } else {
+      LOGGER.info("MseMetrics already registered ({} mode); ignoring duplicate 
registration with {} mode",
+          get().getMode(), mode);
+    }
+  }
+
+  private final MseMetricsMode _mode;
+
+  /// DUAL-mode wrappers are cached so callers asking for the same [MseMeter] 
repeatedly (e.g.
+  /// the MSE engine emission loops in `MultiStageOperator`) share one wrapper 
instead of
+  /// allocating a fresh one per increment. `null` for non-DUAL modes.
+  @Nullable
+  private final EnumMap<MseMeter, PinotMeter> _dualMeterCache;
+
+  public MseMetrics(MseMetricsMode mode, PinotMetricsRegistry metricsRegistry) 
{
+    super(METRIC_PREFIX, metricsRegistry, MseMetrics.class, false, 
Collections.emptySet());
+    _mode = mode;
+    _dualMeterCache = mode == MseMetricsMode.DUAL ? new 
EnumMap<>(MseMeter.class) : null;
+  }
+
+  public MseMetricsMode getMode() {
+    return _mode;
+  }
+
+  @Override
+  protected AbstractMetrics.QueryPhase[] getQueryPhases() {
+    return EMPTY_PHASES;
+  }
+
+  @Override
+  protected MseMeter[] getMeters() {
+    return MseMeter.values();
+  }
+
+  @Override
+  protected AbstractMetrics.Gauge[] getGauges() {
+    return EMPTY_GAUGES;
+  }
+
+  /// Single source of truth for the mode logic; the 2-arg 
[#addMeteredGlobalValue] and
+  /// [#getMeteredValue] delegate here. Reuses `reusedMeter` as a fast path so 
callers that cache
+  /// a handle (e.g. `MetricsExecutor`) avoid repeated registry lookups.
+  @Override
+  public PinotMeter addMeteredGlobalValue(MseMeter meter, long unitCount, 
PinotMeter reusedMeter) {
+    if (reusedMeter != null) {
+      reusedMeter.mark(unitCount);
+      return reusedMeter;
+    }
+    PinotMeter handle = getMeteredValue(meter);
+    handle.mark(unitCount);
+    return handle;
+  }
+
+  @Override
+  public PinotMeter getMeteredValue(MseMeter meter) {
+    ServerMeter serverMeter = meter.getServerMeter();
+    switch (_mode) {
+      case MSE:
+        return super.getMeteredValue(meter);
+      case DUAL:
+        if (serverMeter == null) {
+          return super.getMeteredValue(meter);
+        }
+        // computeIfAbsent rather than locking — EnumMap supports concurrent 
reads of distinct
+        // keys, and a racing put of the same key just dedupes to one entry on 
the next read.
+        // The DualPinotMeter itself fan-outs to two underlying handles which 
are themselves
+        // dedup'd by their registries, so a duplicate wrapper is harmless if 
it does happen.
+        return _dualMeterCache.computeIfAbsent(meter,
+            m -> new DualPinotMeter(super.getMeteredValue(m), 
ServerMetrics.get().getMeteredValue(serverMeter)));
+      case SERVER:
+      default:
+        return serverMeter == null ? NOOP_PINOT_METER : 
ServerMetrics.get().getMeteredValue(serverMeter);
+    }
+  }
+
+  @Override
+  public void addTimedValue(MseTimer timer, long duration, TimeUnit timeUnit) {
+    ServerTimer serverTimer = timer.getServerTimer();
+    if (_mode != MseMetricsMode.MSE && serverTimer != null) {
+      ServerMetrics.get().addTimedValue(serverTimer, duration, timeUnit);
+    }
+    if (_mode != MseMetricsMode.SERVER) {
+      super.addTimedValue(timer, duration, timeUnit);
+    }
+  }
+
+  /// Fan-out [PinotMeter] returned in [MseMetricsMode#DUAL] so callers that 
cache a handle mark
+  /// both registries on every increment. Read-side methods delegate to the 
primary (MSE) handle.
+  private static final class DualPinotMeter implements PinotMeter {
+    private final PinotMeter _primary;
+    private final PinotMeter _secondary;
+
+    private DualPinotMeter(PinotMeter primary, PinotMeter secondary) {
+      _primary = primary;
+      _secondary = secondary;
+    }
+
+    @Override
+    public void mark() {
+      _primary.mark();
+      _secondary.mark();
+    }
+
+    @Override
+    public void mark(long unitCount) {
+      _primary.mark(unitCount);
+      _secondary.mark(unitCount);
+    }
+
+    @Override
+    public long count() {
+      return _primary.count();
+    }
+
+    @Override
+    public Object getMetered() {
+      return _primary.getMetered();
+    }
+
+    @Override
+    public TimeUnit rateUnit() {
+      return _primary.rateUnit();
+    }
+
+    @Override
+    public String eventType() {
+      return _primary.eventType();
+    }
+
+    @Override
+    public double fifteenMinuteRate() {
+      return _primary.fifteenMinuteRate();
+    }
+
+    @Override
+    public double fiveMinuteRate() {
+      return _primary.fiveMinuteRate();
+    }
+
+    @Override
+    public double meanRate() {
+      return _primary.meanRate();
+    }
+
+    @Override
+    public double oneMinuteRate() {
+      return _primary.oneMinuteRate();
+    }
+
+    @Override
+    public Object getMetric() {
+      return _primary.getMetric();
+    }
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseMetricsMode.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseMetricsMode.java
new file mode 100644
index 00000000000..ddfc3a1439d
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseMetricsMode.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metrics;
+
+/// Selects how [MseMetrics] emits multi-stage engine metrics.
+///
+/// - [#SERVER] (default): forward to [ServerMetrics] only (existing 
`pinot.server.*` series).
+/// - [#MSE]: emit to a dedicated `pinot.mse.*` registry only.
+/// - [#DUAL]: emit to both, for dashboard migration windows.
+///
+/// Read at startup from cluster config; mode changes require restart.
+///
+/// **Migration path:** SERVER is the default to preserve `pinot.server.*` 
dashboards. Operators
+/// migrating to the `pinot.mse.*` surface should flip to DUAL for an overlap 
window, point
+/// dashboards/alerts at `pinot.mse.*`, then flip to MSE. SERVER mode (and the
+/// [MseMeter#getServerMeter()] / [MseTimer#getServerTimer()] forwarding 
links) is the
+/// backward-compat shim and can be removed once the legacy series has no 
remaining consumers.
+public enum MseMetricsMode {
+  SERVER,
+  MSE,
+  DUAL
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseTimer.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseTimer.java
new file mode 100644
index 00000000000..33e5c3305f9
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseTimer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metrics;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.common.Utils;
+
+
+/// Timers for the multi-stage engine, emitted via [MseMetrics] as 
`pinot.mse.*` when the cluster
+/// is configured for [MseMetricsMode#MSE] or [MseMetricsMode#DUAL].
+///
+/// Each entry optionally carries a [ServerTimer] counterpart. When present,
+/// [MseMetricsMode#SERVER] and [MseMetricsMode#DUAL] forward emissions to the 
existing
+/// `pinot.server.*` series. Entries with no counterpart (MSE-native timers 
added after the
+/// migration) are emitted only under MSE / DUAL modes and silently dropped in 
SERVER mode.
+public enum MseTimer implements AbstractMetrics.Timer {
+  HASH_JOIN_BUILD_TABLE_CPU_TIME_MS(true, 
ServerTimer.HASH_JOIN_BUILD_TABLE_CPU_TIME_MS),
+  SERIALIZATION_CPU_TIME_MS(true, 
ServerTimer.MULTI_STAGE_SERIALIZATION_CPU_TIME_MS),
+  DESERIALIZATION_CPU_TIME_MS(true, 
ServerTimer.MULTI_STAGE_DESERIALIZATION_CPU_TIME_MS),
+  RECEIVE_DOWNSTREAM_WAIT_CPU_TIME_MS(true, 
ServerTimer.RECEIVE_DOWNSTREAM_WAIT_CPU_TIME_MS),
+  RECEIVE_UPSTREAM_WAIT_CPU_TIME_MS(true, 
ServerTimer.RECEIVE_UPSTREAM_WAIT_CPU_TIME_MS);
+
+  private final String _timerName;
+  private final boolean _global;
+  @Nullable
+  private final ServerTimer _serverTimer;
+
+  MseTimer(boolean global) {
+    this(global, null);
+  }
+
+  MseTimer(boolean global, @Nullable ServerTimer serverTimer) {
+    _global = global;
+    _serverTimer = serverTimer;
+    _timerName = Utils.toCamelCase(name().toLowerCase());
+  }
+
+  @Override
+  public String getTimerName() {
+    return _timerName;
+  }
+
+  @Override
+  public boolean isGlobal() {
+    return _global;
+  }
+
+  /// Existing [ServerTimer] this entry forwards to in SERVER / DUAL mode, or 
`null` for
+  /// MSE-native timers with no legacy `pinot.server.*` series.
+  @Nullable
+  public ServerTimer getServerTimer() {
+    return _serverTimer;
+  }
+}
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/MseMetricsTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/MseMetricsTest.java
new file mode 100644
index 00000000000..efebe5c4a4f
--- /dev/null
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/MseMetricsTest.java
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metrics;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.plugin.metrics.fake.FakeMetricsFactory;
+import org.apache.pinot.plugin.metrics.fake.FakeMetricsInspector;
+import org.apache.pinot.plugin.metrics.fake.FakePinotMetricsRegistry;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.metrics.PinotMetricName;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static 
org.apache.pinot.spi.utils.CommonConstants.CONFIG_OF_METRICS_FACTORY_CLASS_NAME;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Mode-aware behavior tests for {@link MseMetrics}. SERVER mode must forward 
to
+ * {@link ServerMetrics} only; MSE mode emits {@code pinot.mse.*} only; DUAL 
hits both.
+ */
+public class MseMetricsTest {
+
+  @BeforeClass
+  public void initMetricsFactory() {
+    PinotConfiguration config = new PinotConfiguration();
+    config.setProperty(CONFIG_OF_METRICS_FACTORY_CLASS_NAME, 
FakeMetricsFactory.class.getName());
+    PinotMetricUtils.init(config);
+  }
+
+  @AfterClass
+  public void cleanUpMetricsFactory() {
+    PinotMetricUtils.cleanUp();
+  }
+
+  @AfterMethod
+  public void deregisterSingletons() {
+    MseMetrics.deregister();
+    ServerMetrics.deregister();
+  }
+
+  @Test
+  public void serverModeForwardsToServerMetricsOnly() {
+    ServerMetrics serverMetrics = new ServerMetrics(new 
FakePinotMetricsRegistry());
+    ServerMetrics.register(serverMetrics);
+    FakeMetricsInspector serverInspector = new 
FakeMetricsInspector(serverMetrics.getMetricsRegistry());
+
+    PinotMetricsRegistry mseRegistry = new FakePinotMetricsRegistry();
+    MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.SERVER, mseRegistry);
+    assertTrue(MseMetrics.register(mseMetrics));
+
+    mseMetrics.addMeteredGlobalValue(MseMeter.QUERIES, 7L);
+
+    PinotMetricName serverName = serverInspector.lastMetric();
+    assertNotNull(serverName, "SERVER mode should register a meter under 
ServerMetrics");
+    assertEquals(serverInspector.getMeteredCount(serverName), 7L);
+    assertTrue(mseRegistry.allMetrics().isEmpty(), "SERVER mode must not emit 
to the pinot.mse.* registry");
+  }
+
+  @Test
+  public void mseModeEmitsToOwnRegistryOnly() {
+    ServerMetrics serverMetrics = new ServerMetrics(new 
FakePinotMetricsRegistry());
+    ServerMetrics.register(serverMetrics);
+
+    PinotMetricsRegistry mseRegistry = new FakePinotMetricsRegistry();
+    FakeMetricsInspector mseInspector = new FakeMetricsInspector(mseRegistry);
+    MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.MSE, mseRegistry);
+    assertTrue(MseMetrics.register(mseMetrics));
+
+    mseMetrics.addMeteredGlobalValue(MseMeter.OPCHAINS_STARTED, 3L);
+
+    PinotMetricName mseName = mseInspector.lastMetric();
+    assertNotNull(mseName, "MSE mode should register a meter under 
MseMetrics");
+    assertEquals(mseInspector.getMeteredCount(mseName), 3L);
+    assertTrue(serverMetrics.getMetricsRegistry().allMetrics().isEmpty(),
+        "MSE mode must not touch the ServerMetrics registry");
+  }
+
+  @Test
+  public void dualModeEmitsToBothRegistries() {
+    ServerMetrics serverMetrics = new ServerMetrics(new 
FakePinotMetricsRegistry());
+    ServerMetrics.register(serverMetrics);
+    FakeMetricsInspector serverInspector = new 
FakeMetricsInspector(serverMetrics.getMetricsRegistry());
+
+    PinotMetricsRegistry mseRegistry = new FakePinotMetricsRegistry();
+    FakeMetricsInspector mseInspector = new FakeMetricsInspector(mseRegistry);
+    MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.DUAL, mseRegistry);
+    assertTrue(MseMetrics.register(mseMetrics));
+
+    mseMetrics.addMeteredGlobalValue(MseMeter.OPCHAINS_COMPLETED, 5L);
+
+    assertEquals(mseInspector.getMeteredCount(mseInspector.lastMetric()), 5L);
+    
assertEquals(serverInspector.getMeteredCount(serverInspector.lastMetric()), 5L);
+  }
+
+  @Test
+  public void dualModeReusedMeterMarksBothSides() {
+    ServerMetrics serverMetrics = new ServerMetrics(new 
FakePinotMetricsRegistry());
+    ServerMetrics.register(serverMetrics);
+    FakeMetricsInspector serverInspector = new 
FakeMetricsInspector(serverMetrics.getMetricsRegistry());
+
+    PinotMetricsRegistry mseRegistry = new FakePinotMetricsRegistry();
+    FakeMetricsInspector mseInspector = new FakeMetricsInspector(mseRegistry);
+    MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.DUAL, mseRegistry);
+    assertTrue(MseMetrics.register(mseMetrics));
+
+    // Mimic the MetricsExecutor caching pattern: resolve once, then mark() 
repeatedly.
+    PinotMeter cached = mseMetrics.getMeteredValue(MseMeter.EMITTED_ROWS);
+    cached.mark(11L);
+    cached.mark(4L);
+
+    assertEquals(mseInspector.getMeteredCount(mseInspector.lastMetric()), 15L);
+    
assertEquals(serverInspector.getMeteredCount(serverInspector.lastMetric()), 
15L);
+  }
+
+  @Test
+  public void addTimedValueForwardsByMode() {
+    ServerMetrics serverMetrics = new ServerMetrics(new 
FakePinotMetricsRegistry());
+    ServerMetrics.register(serverMetrics);
+    FakeMetricsInspector serverInspector = new 
FakeMetricsInspector(serverMetrics.getMetricsRegistry());
+
+    PinotMetricsRegistry mseRegistry = new FakePinotMetricsRegistry();
+    FakeMetricsInspector mseInspector = new FakeMetricsInspector(mseRegistry);
+    MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.DUAL, mseRegistry);
+    assertTrue(MseMetrics.register(mseMetrics));
+
+    mseMetrics.addTimedValue(MseTimer.SERIALIZATION_CPU_TIME_MS, 250, 
TimeUnit.MILLISECONDS);
+
+    assertEquals(mseInspector.getTimerSumMs(mseInspector.lastMetric()), 250);
+    assertEquals(serverInspector.getTimerSumMs(serverInspector.lastMetric()), 
250);
+  }
+
+  @Test
+  public void getMeteredValueReturnsServerHandleInServerMode() {
+    ServerMetrics serverMetrics = new ServerMetrics(new 
FakePinotMetricsRegistry());
+    ServerMetrics.register(serverMetrics);
+    MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.SERVER, new 
FakePinotMetricsRegistry());
+    MseMetrics.register(mseMetrics);
+
+    PinotMeter direct = 
serverMetrics.getMeteredValue(MseMeter.QUERIES.getServerMeter());
+    PinotMeter throughShim = mseMetrics.getMeteredValue(MseMeter.QUERIES);
+    assertSame(throughShim, direct, "SERVER mode should hand back the same 
ServerMetrics handle");
+  }
+
+  @Test
+  public void noopFallbackIsServerMode() {
+    // Default (no register call): must behave like the old 
direct-ServerMetrics call sites.
+    ServerMetrics serverMetrics = new ServerMetrics(new 
FakePinotMetricsRegistry());
+    ServerMetrics.register(serverMetrics);
+    FakeMetricsInspector serverInspector = new 
FakeMetricsInspector(serverMetrics.getMetricsRegistry());
+
+    MseMetrics.get().addMeteredGlobalValue(MseMeter.RUNNER_STARTED_TASKS, 1L);
+
+    
assertEquals(serverInspector.getMeteredCount(serverInspector.lastMetric()), 1L);
+  }
+
+  @Test
+  public void registerFromConfigParsesModeFromConfig() {
+    PinotConfiguration config = new PinotConfiguration();
+    config.setProperty("pinot.metrics.mse.mode", "DUAL");
+
+    MseMetrics.registerFromConfig(config, new FakePinotMetricsRegistry());
+
+    assertEquals(MseMetrics.get().getMode(), MseMetricsMode.DUAL);
+  }
+
+  @Test
+  public void registerFromConfigDefaultsToServerMode() {
+    MseMetrics.registerFromConfig(new PinotConfiguration(), new 
FakePinotMetricsRegistry());
+
+    assertEquals(MseMetrics.get().getMode(), MseMetricsMode.SERVER);
+  }
+
+  @Test
+  public void registerFromConfigFallsBackOnInvalidValue() {
+    PinotConfiguration config = new PinotConfiguration();
+    config.setProperty("pinot.metrics.mse.mode", "NOT_A_MODE");
+
+    MseMetrics.registerFromConfig(config, new FakePinotMetricsRegistry());
+
+    assertEquals(MseMetrics.get().getMode(), MseMetricsMode.SERVER);
+  }
+
+  @Test
+  public void serverModeWithoutRegisteredServerMetricsSilentlyDropsEmissions() 
{
+    // ServerMetrics.register() deliberately not called — the resolved handle 
is the NOOP singleton.
+    PinotMetricsRegistry mseRegistry = new FakePinotMetricsRegistry();
+    MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.SERVER, mseRegistry);
+    assertTrue(MseMetrics.register(mseMetrics));
+
+    // Must not throw — SERVER mode resolves through ServerMetrics.get(); 
documented behavior is
+    // that emissions are silently dropped when the underlying singleton is 
the noop instance.
+    mseMetrics.addMeteredGlobalValue(MseMeter.QUERIES, 1L);
+    mseMetrics.addTimedValue(MseTimer.HASH_JOIN_BUILD_TABLE_CPU_TIME_MS, 5L, 
TimeUnit.MILLISECONDS);
+
+    assertTrue(mseRegistry.allMetrics().isEmpty(),
+        "SERVER mode must not register any pinot.mse.* meters even when 
forwarding is a noop");
+  }
+
+  @Test
+  public void mseModeStillEmitsWhenServerMetricsIsNoop() {
+    // ServerMetrics is noop; MSE mode emits to the local registry instead of 
forwarding, so
+    // engine emissions surface even without a registered ServerMetrics.
+    PinotMetricsRegistry mseRegistry = new FakePinotMetricsRegistry();
+    FakeMetricsInspector mseInspector = new FakeMetricsInspector(mseRegistry);
+    MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.MSE, mseRegistry);
+    assertTrue(MseMetrics.register(mseMetrics));
+
+    mseMetrics.addMeteredGlobalValue(MseMeter.OPCHAINS_STARTED, 2L);
+
+    assertEquals(mseInspector.getMeteredCount(mseInspector.lastMetric()), 2L);
+  }
+
+  @Test
+  public void dualModeWrapperIsCachedAcrossLookups() {
+    ServerMetrics serverMetrics = new ServerMetrics(new 
FakePinotMetricsRegistry());
+    ServerMetrics.register(serverMetrics);
+    MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.DUAL, new 
FakePinotMetricsRegistry());
+    assertTrue(MseMetrics.register(mseMetrics));
+
+    PinotMeter first = mseMetrics.getMeteredValue(MseMeter.QUERIES);
+    PinotMeter second = mseMetrics.getMeteredValue(MseMeter.QUERIES);
+    PinotMeter other = mseMetrics.getMeteredValue(MseMeter.OPCHAINS_STARTED);
+
+    assertSame(second, first, "DUAL mode must return the cached wrapper for 
repeated lookups");
+    assertNotNull(other);
+    assertTrue(first != other, "Distinct meters must get distinct wrappers");
+  }
+
+  @Test
+  public void serverModeMeterWithoutCounterpartReturnsNoop()
+      throws Exception {
+    ServerMetrics serverMetrics = new ServerMetrics(new 
FakePinotMetricsRegistry());
+    ServerMetrics.register(serverMetrics);
+    MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.SERVER, new 
FakePinotMetricsRegistry());
+    assertTrue(MseMetrics.register(mseMetrics));
+
+    // Simulate an MSE-native meter (no ServerMeter counterpart added in a 
future release) by
+    // nulling the field on an existing entry for the duration of the test.
+    withNullServerCounterpart(MseMeter.QUERIES, () -> {
+      PinotMeter handle = mseMetrics.getMeteredValue(MseMeter.QUERIES);
+      assertNotNull(handle, "SERVER mode without a counterpart must still 
return a non-null handle");
+      handle.mark(7);
+      assertEquals(handle.count(), 0L, "Returned handle must be the noop meter 
— SERVER mode has no series to mark");
+    });
+  }
+
+  @Test
+  public void mseModeMeterWithoutCounterpartStillEmits()
+      throws Exception {
+    PinotMetricsRegistry mseRegistry = new FakePinotMetricsRegistry();
+    FakeMetricsInspector mseInspector = new FakeMetricsInspector(mseRegistry);
+    MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.MSE, mseRegistry);
+    assertTrue(MseMetrics.register(mseMetrics));
+
+    withNullServerCounterpart(MseMeter.OPCHAINS_STARTED, () -> {
+      mseMetrics.addMeteredGlobalValue(MseMeter.OPCHAINS_STARTED, 9L);
+    });
+
+    assertEquals(mseInspector.getMeteredCount(mseInspector.lastMetric()), 9L);
+  }
+
+  @Test
+  public void dualModeMeterWithoutCounterpartOnlyEmitsToMseSide()
+      throws Exception {
+    ServerMetrics serverMetrics = new ServerMetrics(new 
FakePinotMetricsRegistry());
+    ServerMetrics.register(serverMetrics);
+    PinotMetricsRegistry mseRegistry = new FakePinotMetricsRegistry();
+    FakeMetricsInspector mseInspector = new FakeMetricsInspector(mseRegistry);
+    MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.DUAL, mseRegistry);
+    assertTrue(MseMetrics.register(mseMetrics));
+
+    withNullServerCounterpart(MseMeter.EMITTED_ROWS, () -> {
+      mseMetrics.addMeteredGlobalValue(MseMeter.EMITTED_ROWS, 4L);
+    });
+
+    assertEquals(mseInspector.getMeteredCount(mseInspector.lastMetric()), 4L);
+    assertTrue(serverMetrics.getMetricsRegistry().allMetrics().isEmpty(),
+        "DUAL mode must not register any pinot.server.* meter when there is no 
counterpart");
+  }
+
+  @Test
+  public void serverModeTimerWithoutCounterpartIsDropped()
+      throws Exception {
+    ServerMetrics serverMetrics = new ServerMetrics(new 
FakePinotMetricsRegistry());
+    ServerMetrics.register(serverMetrics);
+    MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.SERVER, new 
FakePinotMetricsRegistry());
+    assertTrue(MseMetrics.register(mseMetrics));
+
+    withNullServerTimerCounterpart(MseTimer.SERIALIZATION_CPU_TIME_MS, () -> {
+      mseMetrics.addTimedValue(MseTimer.SERIALIZATION_CPU_TIME_MS, 250, 
TimeUnit.MILLISECONDS);
+    });
+
+    assertTrue(serverMetrics.getMetricsRegistry().allMetrics().isEmpty(),
+        "SERVER mode without a timer counterpart must not register any 
pinot.server.* timer");
+  }
+
+  @Test
+  public void registerIsCompareAndSetAgainstNoop() {
+    MseMetrics first = new MseMetrics(MseMetricsMode.MSE, new 
FakePinotMetricsRegistry());
+    assertTrue(MseMetrics.register(first));
+    assertSame(MseMetrics.get(), first);
+
+    MseMetrics second = new MseMetrics(MseMetricsMode.DUAL, new 
FakePinotMetricsRegistry());
+    assertFalse(MseMetrics.register(second), "Second register must fail; CAS 
guards against accidental swap");
+    assertSame(MseMetrics.get(), first);
+  }
+
+  /// Reflection helper: temporarily null out `MseMeter._serverMeter` to 
simulate an MSE-native
+  /// entry, run the test body, then restore. Used to exercise the 
no-counterpart branches
+  /// without adding a real null-counterpart entry to the public enum.
+  private static void withNullServerCounterpart(MseMeter meter, Runnable body)
+      throws Exception {
+    Field field = MseMeter.class.getDeclaredField("_serverMeter");
+    field.setAccessible(true);
+    Object original = field.get(meter);
+    field.set(meter, null);
+    try {
+      body.run();
+    } finally {
+      field.set(meter, original);
+    }
+  }
+
+  private static void withNullServerTimerCounterpart(MseTimer timer, Runnable 
body)
+      throws Exception {
+    Field field = MseTimer.class.getDeclaredField("_serverTimer");
+    field.setAccessible(true);
+    Object original = field.get(timer);
+    field.set(timer, null);
+    try {
+      body.run();
+    } finally {
+      field.set(timer, original);
+    }
+  }
+}
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/BrokerPrometheusMetricsTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/BrokerPrometheusMetricsTest.java
index a6ce99fd0cf..017c28d2fe6 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/BrokerPrometheusMetricsTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/BrokerPrometheusMetricsTest.java
@@ -25,6 +25,10 @@ import org.apache.pinot.common.metrics.BrokerGauge;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.BrokerTimer;
+import org.apache.pinot.common.metrics.MseMeter;
+import org.apache.pinot.common.metrics.MseMetrics;
+import org.apache.pinot.common.metrics.MseMetricsMode;
+import org.apache.pinot.common.metrics.MseTimer;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -52,12 +56,22 @@ public abstract class BrokerPrometheusMetricsTest extends 
PinotPrometheusMetrics
 
   private static final List<BrokerGauge> GAUGES_ACCEPTING_RAW_TABLE_NAME = 
List.of(BrokerGauge.REQUEST_SIZE);
 
+  // pinot.mse.* metrics share the role-agnostic prefix and must be exported 
from every JVM role
+  // that registers MseMetrics; on broker JVMs this exercises the broker.yml 
catch-all rule.
+  private static final String EXPORTED_MSE_METRIC_PREFIX = "pinot_mse_";
+
   private BrokerMetrics _brokerMetrics;
 
+  private MseMetrics _mseMetrics;
+
   @BeforeClass
   public void setup()
       throws Exception {
     _brokerMetrics = new 
BrokerMetrics(_pinotMetricsFactory.getPinotMetricsRegistry());
+    // MSE mode so emissions land in this JVM's PinotMetricsRegistry (the one 
the JMX exporter is
+    // scraping). MseMetrics is constructed with the shared registry, 
mirroring how broker JVMs
+    // running in MSE/DUAL mode emit pinot.mse.* beans alongside 
pinot.broker.*.
+    _mseMetrics = new MseMetrics(MseMetricsMode.MSE, 
_pinotMetricsFactory.getPinotMetricsRegistry());
   }
 
   @Test(dataProvider = "brokerTimers")
@@ -115,6 +129,18 @@ public abstract class BrokerPrometheusMetricsTest extends 
PinotPrometheusMetrics
     }
   }
 
+  @Test(dataProvider = "mseMeters")
+  public void mseMeterExportedFromBrokerJmx(MseMeter meter) {
+    _mseMetrics.addMeteredGlobalValue(meter, 1L);
+    assertMeterExportedCorrectly(meter.getMeterName(), 
EXPORTED_MSE_METRIC_PREFIX);
+  }
+
+  @Test(dataProvider = "mseTimers")
+  public void mseTimerExportedFromBrokerJmx(MseTimer timer) {
+    _mseMetrics.addTimedValue(timer, 30_000, TimeUnit.MILLISECONDS);
+    assertTimerExportedCorrectly(timer.getTimerName(), 
EXPORTED_MSE_METRIC_PREFIX);
+  }
+
   @DataProvider(name = "brokerTimers")
   public Object[] brokerTimers() {
     return BrokerTimer.values();
@@ -129,4 +155,14 @@ public abstract class BrokerPrometheusMetricsTest extends 
PinotPrometheusMetrics
   public Object[] brokerGauges() {
     return BrokerGauge.values();
   }
+
+  @DataProvider(name = "mseMeters")
+  public Object[] mseMeters() {
+    return MseMeter.values();
+  }
+
+  @DataProvider(name = "mseTimers")
+  public Object[] mseTimers() {
+    return MseTimer.values();
+  }
 }
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PrometheusTemplateRegexpTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PrometheusTemplateRegexpTest.java
index c7ad6eff399..c92f762fbeb 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PrometheusTemplateRegexpTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PrometheusTemplateRegexpTest.java
@@ -134,19 +134,31 @@ public class PrometheusTemplateRegexpTest {
   }
 
   /**
-   * broker.yml: global gauge/meter/timer (no table scope).
-   * e.g. pinot.broker.totalDocuments
+   * broker.yml: global gauge/meter/timer (no table scope). The catch-all is 
group-flexible at the
+   * prefix so non-broker MBean groups registered in the broker JVM (e.g. 
pinot.mse.*) are also
+   * exported with this rule.
+   * e.g. pinot.broker.totalDocuments, pinot.mse.queries
    */
   @Test
   public void testBrokerGlobalMeterPattern()
       throws Exception {
-    String pattern = loadPatternByName("broker.yml", "pinot_broker_$1_$2");
-    Matcher m = Pattern.compile(pattern).matcher(
+    String pattern = loadPatternByName("broker.yml", "pinot_$1_$2_$3");
+    Pattern compiled = Pattern.compile(pattern);
+
+    Matcher brokerMatch = compiled.matcher(
         "\"org.apache.pinot.common.metrics\"<type=\"BrokerMetrics\", "
             + "name=\"pinot.broker.totalDocuments\"><>Value");
-    Assert.assertTrue(m.matches(), "Pattern should match global broker gauge");
-    Assert.assertEquals(m.group(1), "totalDocuments");
-    Assert.assertEquals(m.group(2), "Value");
+    Assert.assertTrue(brokerMatch.matches(), "Pattern should match global 
broker gauge");
+    Assert.assertEquals(brokerMatch.group(1), "broker");
+    Assert.assertEquals(brokerMatch.group(2), "totalDocuments");
+    Assert.assertEquals(brokerMatch.group(3), "Value");
+
+    Matcher mseMatch = compiled.matcher(
+        "\"org.apache.pinot.common.metrics\"<type=\"MseMetrics\", 
name=\"pinot.mse.queries\"><>Count");
+    Assert.assertTrue(mseMatch.matches(), "Pattern should also match 
pinot.mse.* mbeans on broker JVMs");
+    Assert.assertEquals(mseMatch.group(1), "mse");
+    Assert.assertEquals(mseMatch.group(2), "queries");
+    Assert.assertEquals(mseMatch.group(3), "Count");
   }
 
   // ---- Server patterns ----
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
index 79fb88f56ed..274a415c55a 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
@@ -20,6 +20,10 @@ package org.apache.pinot.common.metrics.prometheus;
 
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.metrics.MseMeter;
+import org.apache.pinot.common.metrics.MseMetrics;
+import org.apache.pinot.common.metrics.MseMetricsMode;
+import org.apache.pinot.common.metrics.MseTimer;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -62,12 +66,22 @@ public abstract class ServerPrometheusMetricsTest extends 
PinotPrometheusMetrics
       List.of(ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, 
ServerGauge.REALTIME_SEGMENT_NUM_PARTITIONS,
           ServerGauge.LUCENE_INDEXING_DELAY_MS, 
ServerGauge.LUCENE_INDEXING_DELAY_DOCS);
 
+  // pinot.mse.* metrics share the role-agnostic prefix and must be exported 
from every JVM role
+  // that registers MseMetrics; on server JVMs this exercises the server.yml 
catch-all rule.
+  private static final String EXPORTED_MSE_METRIC_PREFIX = "pinot_mse_";
+
   private ServerMetrics _serverMetrics;
 
+  private MseMetrics _mseMetrics;
+
   @BeforeClass
   public void setup()
       throws Exception {
     _serverMetrics = new 
ServerMetrics(_pinotMetricsFactory.getPinotMetricsRegistry());
+    // MSE mode so emissions land in this JVM's PinotMetricsRegistry (the one 
the JMX exporter is
+    // scraping). MseMetrics is constructed with the shared registry, 
mirroring how server JVMs
+    // running in MSE/DUAL mode emit pinot.mse.* beans alongside 
pinot.server.*.
+    _mseMetrics = new MseMetrics(MseMetricsMode.MSE, 
_pinotMetricsFactory.getPinotMetricsRegistry());
   }
 
   @Test(dataProvider = "serverTimers")
@@ -149,6 +163,18 @@ public abstract class ServerPrometheusMetricsTest extends 
PinotPrometheusMetrics
     _serverMetrics.addMeteredTableValue(labels, serverMeter, 4L);
   }
 
+  @Test(dataProvider = "mseMeters")
+  public void mseMeterExportedFromServerJmx(MseMeter meter) {
+    _mseMetrics.addMeteredGlobalValue(meter, 1L);
+    assertMeterExportedCorrectly(meter.getMeterName(), 
EXPORTED_MSE_METRIC_PREFIX);
+  }
+
+  @Test(dataProvider = "mseTimers")
+  public void mseTimerExportedFromServerJmx(MseTimer timer) {
+    _mseMetrics.addTimedValue(timer, 30_000, TimeUnit.MILLISECONDS);
+    assertTimerExportedCorrectly(timer.getTimerName(), 
EXPORTED_MSE_METRIC_PREFIX);
+  }
+
   @DataProvider(name = "serverTimers")
   public Object[] serverTimers() {
     return ServerTimer.values();  // Provide all values of ServerTimer enum
@@ -164,6 +190,16 @@ public abstract class ServerPrometheusMetricsTest extends 
PinotPrometheusMetrics
     return ServerGauge.values();  // Provide all values of ServerTimer enum
   }
 
+  @DataProvider(name = "mseMeters")
+  public Object[] mseMeters() {
+    return MseMeter.values();
+  }
+
+  @DataProvider(name = "mseTimers")
+  public Object[] mseTimers() {
+    return MseTimer.values();
+  }
+
   private boolean meterTrackingRealtimeExceptions(ServerMeter serverMeter) {
     return serverMeter == ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS
         || serverMeter == ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS
diff --git 
a/pinot-plugins/pinot-metrics/pinot-dropwizard/src/test/java/org/apache/pinot/plugin/metrics/dropwizard/prometheus/DropwizardBrokerPrometheusMetricsTest.java
 
b/pinot-plugins/pinot-metrics/pinot-dropwizard/src/test/java/org/apache/pinot/plugin/metrics/dropwizard/prometheus/DropwizardBrokerPrometheusMetricsTest.java
index d74c7f63d81..619d9b7bbba 100644
--- 
a/pinot-plugins/pinot-metrics/pinot-dropwizard/src/test/java/org/apache/pinot/plugin/metrics/dropwizard/prometheus/DropwizardBrokerPrometheusMetricsTest.java
+++ 
b/pinot-plugins/pinot-metrics/pinot-dropwizard/src/test/java/org/apache/pinot/plugin/metrics/dropwizard/prometheus/DropwizardBrokerPrometheusMetricsTest.java
@@ -22,6 +22,8 @@ package org.apache.pinot.plugin.metrics.dropwizard.prometheus;
 import org.apache.pinot.common.metrics.BrokerGauge;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerTimer;
+import org.apache.pinot.common.metrics.MseMeter;
+import org.apache.pinot.common.metrics.MseTimer;
 import org.apache.pinot.common.metrics.prometheus.BrokerPrometheusMetricsTest;
 import org.apache.pinot.plugin.metrics.dropwizard.DropwizardMetricsFactory;
 import org.apache.pinot.spi.annotations.metrics.PinotMetricsFactory;
@@ -58,4 +60,14 @@ public class DropwizardBrokerPrometheusMetricsTest extends 
BrokerPrometheusMetri
   public void gaugeTest(BrokerGauge gauge) {
     super.gaugeTest(gauge);
   }
+
+  @Test(dataProvider = "mseMeters", enabled = false)
+  public void mseMeterExportedFromBrokerJmx(MseMeter meter) {
+    super.mseMeterExportedFromBrokerJmx(meter);
+  }
+
+  @Test(dataProvider = "mseTimers", enabled = false)
+  public void mseTimerExportedFromBrokerJmx(MseTimer timer) {
+    super.mseTimerExportedFromBrokerJmx(timer);
+  }
 }
diff --git 
a/pinot-plugins/pinot-metrics/pinot-dropwizard/src/test/java/org/apache/pinot/plugin/metrics/dropwizard/prometheus/DropwizardServerPrometheusMetricsTest.java
 
b/pinot-plugins/pinot-metrics/pinot-dropwizard/src/test/java/org/apache/pinot/plugin/metrics/dropwizard/prometheus/DropwizardServerPrometheusMetricsTest.java
index ac6dfc60252..13be096fc28 100644
--- 
a/pinot-plugins/pinot-metrics/pinot-dropwizard/src/test/java/org/apache/pinot/plugin/metrics/dropwizard/prometheus/DropwizardServerPrometheusMetricsTest.java
+++ 
b/pinot-plugins/pinot-metrics/pinot-dropwizard/src/test/java/org/apache/pinot/plugin/metrics/dropwizard/prometheus/DropwizardServerPrometheusMetricsTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.pinot.plugin.metrics.dropwizard.prometheus;
 
+import org.apache.pinot.common.metrics.MseMeter;
+import org.apache.pinot.common.metrics.MseTimer;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerTimer;
@@ -59,4 +61,14 @@ public class DropwizardServerPrometheusMetricsTest extends 
ServerPrometheusMetri
   public void gaugeTest(ServerGauge serverGauge) {
     super.gaugeTest(serverGauge);
   }
+
+  @Test(dataProvider = "mseMeters", enabled = false)
+  public void mseMeterExportedFromServerJmx(MseMeter meter) {
+    super.mseMeterExportedFromServerJmx(meter);
+  }
+
+  @Test(dataProvider = "mseTimers", enabled = false)
+  public void mseTimerExportedFromServerJmx(MseTimer timer) {
+    super.mseTimerExportedFromServerJmx(timer);
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 53398023be7..37cf8f8c1a4 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -37,7 +37,8 @@ import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.config.TlsConfig;
-import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.MseMeter;
+import org.apache.pinot.common.metrics.MseMetrics;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
@@ -193,9 +194,10 @@ public class QueryRunner {
             Server.DEFAULT_MULTISTAGE_EXECUTOR_TYPE);
 
     ServerMetrics serverMetrics = ServerMetrics.get();
+    MseMetrics mseMetrics = MseMetrics.get();
     MetricsExecutor metricsExecutor = new MetricsExecutor(baseExecutorService,
-        
serverMetrics.getMeteredValue(ServerMeter.MULTI_STAGE_RUNNER_STARTED_TASKS),
-        
serverMetrics.getMeteredValue(ServerMeter.MULTI_STAGE_RUNNER_COMPLETED_TASKS));
+        mseMetrics.getMeteredValue(MseMeter.RUNNER_STARTED_TASKS),
+        mseMetrics.getMeteredValue(MseMeter.RUNNER_COMPLETED_TASKS));
     _executorService = 
QueryThreadContext.contextAwareExecutorService(metricsExecutor);
 
     int hardLimit = 
HardLimitExecutor.getMultiStageExecutorHardLimit(serverConf);
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index 599d052cf24..e946ae4c467 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -35,7 +35,8 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.datatable.StatMap;
-import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.MseMeter;
+import org.apache.pinot.common.metrics.MseMetrics;
 import org.apache.pinot.core.util.trace.TraceRunnable;
 import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
 import org.apache.pinot.query.runtime.blocks.MseBlock;
@@ -275,11 +276,12 @@ public class OpChainSchedulerService {
   }
 
   private static class Metrics {
-    private final PinotMeter _startedOpchains = 
ServerMeter.MSE_OPCHAINS_STARTED.getGlobalMeter();
-    private final PinotMeter _competedOpchains = 
ServerMeter.MSE_OPCHAINS_COMPLETED.getGlobalMeter();
-    private final PinotMeter _emittedRows = 
ServerMeter.MSE_EMITTED_ROWS.getGlobalMeter();
-    private final PinotMeter _cpuExecutionTimeMs = 
ServerMeter.MSE_CPU_EXECUTION_TIME_MS.getGlobalMeter();
-    private final PinotMeter _memoryAllocatedBytes = 
ServerMeter.MSE_MEMORY_ALLOCATED_BYTES.getGlobalMeter();
+    private final MseMetrics _mseMetrics = MseMetrics.get();
+    private final PinotMeter _startedOpchains = 
_mseMetrics.getMeteredValue(MseMeter.OPCHAINS_STARTED);
+    private final PinotMeter _competedOpchains = 
_mseMetrics.getMeteredValue(MseMeter.OPCHAINS_COMPLETED);
+    private final PinotMeter _emittedRows = 
_mseMetrics.getMeteredValue(MseMeter.EMITTED_ROWS);
+    private final PinotMeter _cpuExecutionTimeMs = 
_mseMetrics.getMeteredValue(MseMeter.CPU_EXECUTION_TIME_MS);
+    private final PinotMeter _memoryAllocatedBytes = 
_mseMetrics.getMeteredValue(MseMeter.MEMORY_ALLOCATED_BYTES);
 
     private static final String EMITTED_ROWS = "EMITTED_ROWS";
     private static final String EXECUTION_TIME_MS = "EXECUTION_TIME_MS";
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 05bfe47dda6..8a481526d3b 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -28,7 +28,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.datatable.StatMap;
-import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.MseMetrics;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.planner.physical.MailboxIdUtils;
@@ -308,17 +308,17 @@ public class MailboxSendOperator extends 
MultiStageOperator {
   }
 
   private void updateMetrics(MultiStageQueryStats queryStats) {
-    ServerMetrics serverMetrics = ServerMetrics.get();
+    MseMetrics mseMetrics = MseMetrics.get();
     if (queryStats == null) {
       LOGGER.info("Query stats not found in the EOS block.");
     } else {
       for (MultiStageQueryStats.StageStats.Closed closed : 
queryStats.getClosedStats()) {
         if (closed != null) {
-          closed.forEach((type, stats) -> type.updateServerMetrics(stats, 
serverMetrics));
+          closed.forEach((type, stats) -> type.updateMseMetrics(stats, 
mseMetrics));
         }
       }
       queryStats.getCurrentStats().forEach((type, stats) -> {
-        type.updateServerMetrics(stats, serverMetrics);
+        type.updateMseMetrics(stats, mseMetrics);
       });
     }
   }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index b33391d1284..2075169bf03 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -29,9 +29,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datatable.StatMap;
-import org.apache.pinot.common.metrics.ServerMeter;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.metrics.ServerTimer;
+import org.apache.pinot.common.metrics.MseMeter;
+import org.apache.pinot.common.metrics.MseMetrics;
+import org.apache.pinot.common.metrics.MseTimer;
 import org.apache.pinot.common.proto.Plan;
 import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
@@ -259,7 +259,7 @@ public abstract class MultiStageOperator implements 
Operator<MseBlock>, AutoClos
       /// So far this keys do not need to be modified from here because they 
are incremented in a per-worker basis:
       /// ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_LIMIT_REACHED
       /// ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_WARNING_LIMIT_REACHED
-      /// public void updateServerMetrics(StatMap<?> map, ServerMetrics 
serverMetrics);
+      /// public void updateMseMetrics(StatMap<?> map, MseMetrics mseMetrics);
     },
     FILTER(1, FilterOperator.StatKey.class) {
       @Override
@@ -280,15 +280,15 @@ public abstract class MultiStageOperator implements 
Operator<MseBlock>, AutoClos
       }
 
       @Override
-      public void updateServerMetrics(StatMap<?> map, ServerMetrics 
serverMetrics) {
-        super.updateServerMetrics(map, serverMetrics);
+      public void updateMseMetrics(StatMap<?> map, MseMetrics mseMetrics) {
+        super.updateMseMetrics(map, mseMetrics);
         @SuppressWarnings("unchecked")
         StatMap<HashJoinOperator.StatKey> stats = 
(StatMap<HashJoinOperator.StatKey>) map;
         boolean maxRowsInJoinReached = 
stats.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED);
         if (maxRowsInJoinReached) {
-          
serverMetrics.addMeteredGlobalValue(ServerMeter.HASH_JOIN_TIMES_MAX_ROWS_REACHED,
 1);
+          
mseMetrics.addMeteredGlobalValue(MseMeter.HASH_JOIN_TIMES_MAX_ROWS_REACHED, 1);
         }
-        
serverMetrics.addTimedValue(ServerTimer.HASH_JOIN_BUILD_TABLE_CPU_TIME_MS,
+        mseMetrics.addTimedValue(MseTimer.HASH_JOIN_BUILD_TABLE_CPU_TIME_MS,
             
stats.getLong(HashJoinOperator.StatKey.TIME_BUILDING_HASH_TABLE_MS), 
TimeUnit.MILLISECONDS);
       }
     },
@@ -329,23 +329,23 @@ public abstract class MultiStageOperator implements 
Operator<MseBlock>, AutoClos
       }
 
       @Override
-      public void updateServerMetrics(StatMap<?> map, ServerMetrics 
serverMetrics) {
-        super.updateServerMetrics(map, serverMetrics);
+      public void updateMseMetrics(StatMap<?> map, MseMetrics mseMetrics) {
+        super.updateMseMetrics(map, mseMetrics);
         @SuppressWarnings("unchecked")
         StatMap<BaseMailboxReceiveOperator.StatKey> stats = 
(StatMap<BaseMailboxReceiveOperator.StatKey>) map;
 
-        
serverMetrics.addMeteredGlobalValue(ServerMeter.MULTI_STAGE_IN_MEMORY_MESSAGES,
+        mseMetrics.addMeteredGlobalValue(MseMeter.IN_MEMORY_MESSAGES,
             
stats.getInt(BaseMailboxReceiveOperator.StatKey.IN_MEMORY_MESSAGES));
-        
serverMetrics.addMeteredGlobalValue(ServerMeter.MULTI_STAGE_RAW_MESSAGES,
+        mseMetrics.addMeteredGlobalValue(MseMeter.RAW_MESSAGES,
             stats.getInt(BaseMailboxReceiveOperator.StatKey.RAW_MESSAGES));
-        serverMetrics.addMeteredGlobalValue(ServerMeter.MULTI_STAGE_RAW_BYTES,
+        mseMetrics.addMeteredGlobalValue(MseMeter.RAW_BYTES,
             
stats.getLong(BaseMailboxReceiveOperator.StatKey.DESERIALIZED_BYTES));
 
-        
serverMetrics.addTimedValue(ServerTimer.MULTI_STAGE_DESERIALIZATION_CPU_TIME_MS,
+        mseMetrics.addTimedValue(MseTimer.DESERIALIZATION_CPU_TIME_MS,
             
stats.getLong(BaseMailboxReceiveOperator.StatKey.DESERIALIZATION_TIME_MS), 
TimeUnit.MILLISECONDS);
-        
serverMetrics.addTimedValue(ServerTimer.RECEIVE_DOWNSTREAM_WAIT_CPU_TIME_MS,
+        mseMetrics.addTimedValue(MseTimer.RECEIVE_DOWNSTREAM_WAIT_CPU_TIME_MS,
             
stats.getLong(BaseMailboxReceiveOperator.StatKey.DOWNSTREAM_WAIT_MS), 
TimeUnit.MILLISECONDS);
-        
serverMetrics.addTimedValue(ServerTimer.RECEIVE_UPSTREAM_WAIT_CPU_TIME_MS,
+        mseMetrics.addTimedValue(MseTimer.RECEIVE_UPSTREAM_WAIT_CPU_TIME_MS,
             
stats.getLong(BaseMailboxReceiveOperator.StatKey.UPSTREAM_WAIT_MS), 
TimeUnit.MILLISECONDS);
       }
     },
@@ -358,10 +358,10 @@ public abstract class MultiStageOperator implements 
Operator<MseBlock>, AutoClos
       }
 
       @Override
-      public void updateServerMetrics(StatMap<?> map, ServerMetrics 
serverMetrics) {
+      public void updateMseMetrics(StatMap<?> map, MseMetrics mseMetrics) {
         @SuppressWarnings("unchecked")
         StatMap<MailboxSendOperator.StatKey> stats = 
(StatMap<MailboxSendOperator.StatKey>) map;
-        
serverMetrics.addTimedValue(ServerTimer.MULTI_STAGE_SERIALIZATION_CPU_TIME_MS,
+        mseMetrics.addTimedValue(MseTimer.SERIALIZATION_CPU_TIME_MS,
             stats.getLong(MailboxSendOperator.StatKey.SERIALIZATION_TIME_MS), 
TimeUnit.MILLISECONDS);
       }
     },
@@ -417,11 +417,11 @@ public abstract class MultiStageOperator implements 
Operator<MseBlock>, AutoClos
       }
 
       @Override
-      public void updateServerMetrics(StatMap<?> map, ServerMetrics 
serverMetrics) {
+      public void updateMseMetrics(StatMap<?> map, MseMetrics mseMetrics) {
         @SuppressWarnings("unchecked")
         StatMap<WindowAggregateOperator.StatKey> stats = 
(StatMap<WindowAggregateOperator.StatKey>) map;
         if 
(stats.getBoolean(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW_REACHED)) {
-          
serverMetrics.addMeteredGlobalValue(ServerMeter.WINDOW_TIMES_MAX_ROWS_REACHED, 
1);
+          
mseMetrics.addMeteredGlobalValue(MseMeter.WINDOW_TIMES_MAX_ROWS_REACHED, 1);
         }
       }
     },
@@ -508,7 +508,7 @@ public abstract class MultiStageOperator implements 
Operator<MseBlock>, AutoClos
      */
     public abstract void mergeInto(BrokerResponseNativeV2 response, StatMap<?> 
map);
 
-    public void updateServerMetrics(StatMap<?> map, ServerMetrics 
serverMetrics) {
+    public void updateMseMetrics(StatMap<?> map, MseMetrics mseMetrics) {
       // Do nothing by default
     }
   }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index de88051cfb5..8c018058389 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -39,8 +39,8 @@ import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
 import org.apache.pinot.common.config.TlsConfig;
-import org.apache.pinot.common.metrics.ServerMeter;
-import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.MseMeter;
+import org.apache.pinot.common.metrics.MseMetrics;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.common.utils.NamedThreadFactory;
@@ -163,10 +163,10 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
             "query_submission_executor_on_" + _port + "_port",
             CommonConstants.Server.DEFAULT_MULTISTAGE_SUBMISSION_EXEC_TYPE);
 
-    ServerMetrics serverMetrics = ServerMetrics.get();
+    MseMetrics mseMetrics = MseMetrics.get();
     _submissionExecutorService = new MetricsExecutor(baseExecutorService,
-        
serverMetrics.getMeteredValue(ServerMeter.MULTI_STAGE_SUBMISSION_STARTED_TASKS),
-        
serverMetrics.getMeteredValue(ServerMeter.MULTI_STAGE_SUBMISSION_COMPLETED_TASKS));
+        mseMetrics.getMeteredValue(MseMeter.SUBMISSION_STARTED_TASKS),
+        mseMetrics.getMeteredValue(MseMeter.SUBMISSION_COMPLETED_TASKS));
 
     NamedThreadFactory explainThreadFactory = new 
NamedThreadFactory("query_explain_on_" + _port + "_port");
     _explainExecutorService = 
Executors.newSingleThreadExecutor(explainThreadFactory);
@@ -260,7 +260,7 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
   @Override
   public void submit(Worker.QueryRequest request, 
StreamObserver<Worker.QueryResponse> responseObserver) {
     // Match the SSE QUERIES counter semantics by counting requests as soon as 
they reach the handler.
-    ServerMetrics.get().addMeteredGlobalValue(ServerMeter.MSE_QUERIES, 1L);
+    MseMetrics.get().addMeteredGlobalValue(MseMeter.QUERIES, 1L);
     Map<String, String> reqMetadata;
     try {
       reqMetadata = 
QueryPlanSerDeUtils.fromProtoProperties(request.getMetadata());
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 59627f3ef25..46682cc4a4f 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -62,6 +62,7 @@ import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.config.DefaultClusterConfigChangeHandler;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metrics.MseMetrics;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -674,6 +675,7 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     _serverMetrics.initializeGlobalMeters();
     _serverMetrics.setValueOfGlobalGauge(ServerGauge.VERSION, 
PinotVersion.VERSION_METRIC_NAME, 1);
     ServerMetrics.register(_serverMetrics);
+    MseMetrics.registerFromConfig(_serverConf, metricsRegistry);
 
     LOGGER.info("Initializing reload job status cache");
     _reloadJobStatusCache = new ServerReloadJobStatusCache(_instanceId);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index ecd52695a2e..6b4628c1ac1 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -279,6 +279,14 @@ public class CommonConstants {
         
"pinot.beta.multistage.engine.max.server.query.threads.hardlimit.factor";
     public static final String 
DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR = "4";
 
+    /// Cluster-config knob that selects how the multi-stage engine emits 
metrics.
+    /// Read at startup by server and broker; mode changes require a restart 
to take effect.
+    /// Valid values: {@code SERVER} (default; forward to {@code 
pinot.server.*}), {@code MSE}
+    /// (emit only {@code pinot.mse.*}), {@code DUAL} (emit both). See
+    /// {@code org.apache.pinot.common.metrics.MseMetricsMode}.
+    public static final String CONFIG_OF_MSE_METRICS_MODE = 
"pinot.metrics.mse.mode";
+    public static final String DEFAULT_MSE_METRICS_MODE = "SERVER";
+
     // Preprocess throttle configs
     public static final String CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM =
         "pinot.server.max.segment.preprocess.parallelism";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to