This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9186c382214 Add MseMetrics with configurable emission mode for
multi-stage engine (#18550)
9186c382214 is described below
commit 9186c382214a601bd49313fe2ab8ef82b9d28483
Author: Yash Mayya <[email protected]>
AuthorDate: Wed Jun 3 15:39:40 2026 -0700
Add MseMetrics with configurable emission mode for multi-stage engine
(#18550)
---
.../jmx_prometheus_javaagent/configs/broker.yml | 7 +-
.../broker/broker/helix/BaseBrokerStarter.java | 2 +
.../org/apache/pinot/common/metrics/MseMeter.java | 87 +++++
.../apache/pinot/common/metrics/MseMetrics.java | 267 +++++++++++++++
.../pinot/common/metrics/MseMetricsMode.java | 38 +++
.../org/apache/pinot/common/metrics/MseTimer.java | 70 ++++
.../pinot/common/metrics/MseMetricsTest.java | 363 +++++++++++++++++++++
.../prometheus/BrokerPrometheusMetricsTest.java | 36 ++
.../prometheus/PrometheusTemplateRegexpTest.java | 26 +-
.../prometheus/ServerPrometheusMetricsTest.java | 36 ++
.../DropwizardBrokerPrometheusMetricsTest.java | 12 +
.../DropwizardServerPrometheusMetricsTest.java | 12 +
.../apache/pinot/query/runtime/QueryRunner.java | 8 +-
.../runtime/executor/OpChainSchedulerService.java | 14 +-
.../runtime/operator/MailboxSendOperator.java | 8 +-
.../query/runtime/operator/MultiStageOperator.java | 42 +--
.../pinot/query/service/server/QueryServer.java | 12 +-
.../server/starter/helix/BaseServerStarter.java | 2 +
.../apache/pinot/spi/utils/CommonConstants.java | 8 +
19 files changed, 1000 insertions(+), 50 deletions(-)
diff --git
a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/broker.yml
b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/broker.yml
index 771f45fe526..f0fbe97f569 100644
--- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/broker.yml
+++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/broker.yml
@@ -34,7 +34,8 @@ rules:
- pattern:
"\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"BrokerMetrics\",
name=\"pinot\\.broker\\.(uncaughtGet|uncaughtPost|queryRejected|requestCompilation|resourceMissing)Exceptions\"><>(\\w+)"
name: "pinot_broker_exceptions_$1_$2"
cache: true
-# All global gauge/meters/timers
-- pattern: "\"?org\\.apache\\.pinot\\.common\\.metrics\"?<type=\"?\\w+\"?,
name=\"?pinot\\.broker\\.(\\w+)\"?><>(\\w+)"
- name: "pinot_broker_$1_$2"
+# All global gauges/meters/timers. Group-flexible at the prefix so non-broker MBean groups
+# registered in this JVM (e.g. pinot.mse.* from the multi-stage engine emitter) are also exported.
+- pattern: "\"?org\\.apache\\.pinot\\.common\\.metrics\"?<type=\"?\\w+\"?,
name=\"?pinot\\.(\\w+)\\.(\\w+)\"?><>(\\w+)"
+ name: "pinot_$1_$2_$3"
cache: true
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 255aeded2d2..eb6428cb877 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -80,6 +80,7 @@ import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.BrokerTimer;
+import org.apache.pinot.common.metrics.MseMetrics;
import org.apache.pinot.common.utils.PinotAppConfigs;
import org.apache.pinot.common.utils.ServiceStartableUtils;
import org.apache.pinot.common.utils.ServiceStatus;
@@ -416,6 +417,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
_brokerConf.getProperty(Broker.AdaptiveServerSelector.CONFIG_OF_TYPE,
Broker.AdaptiveServerSelector.DEFAULT_TYPE), 1);
BrokerMetrics.register(_brokerMetrics);
+ MseMetrics.registerFromConfig(_brokerConf, _metricsRegistry);
LOGGER.info("Connecting spectator Helix manager");
initSpectatorHelixManager();
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseMeter.java
new file mode 100644
index 00000000000..9e3171ca3b4
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseMeter.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metrics;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.common.Utils;
+
+
+/// Multi-stage engine meters, published by [MseMetrics] under the `pinot.mse.*` prefix whenever the
+/// configured [MseMetricsMode] is `MSE` or `DUAL`.
+///
+/// An entry may name a legacy [ServerMeter]. If it does, `SERVER` and `DUAL` modes also forward
+/// every emission to that `pinot.server.*` series. An entry without one (an MSE-native meter
+/// introduced after the migration) only surfaces in `MSE` / `DUAL` modes; `SERVER` mode drops it.
+public enum MseMeter implements AbstractMetrics.Meter {
+  QUERIES("queries", true, ServerMeter.MSE_QUERIES),
+  OPCHAINS_STARTED("opchains", true, ServerMeter.MSE_OPCHAINS_STARTED),
+  OPCHAINS_COMPLETED("opchains", true, ServerMeter.MSE_OPCHAINS_COMPLETED),
+  CPU_EXECUTION_TIME_MS("milliseconds", true, ServerMeter.MSE_CPU_EXECUTION_TIME_MS),
+  MEMORY_ALLOCATED_BYTES("bytes", true, ServerMeter.MSE_MEMORY_ALLOCATED_BYTES),
+  EMITTED_ROWS("rows", true, ServerMeter.MSE_EMITTED_ROWS),
+  RUNNER_STARTED_TASKS("tasks", true, ServerMeter.MULTI_STAGE_RUNNER_STARTED_TASKS),
+  RUNNER_COMPLETED_TASKS("tasks", true, ServerMeter.MULTI_STAGE_RUNNER_COMPLETED_TASKS),
+  SUBMISSION_STARTED_TASKS("tasks", true, ServerMeter.MULTI_STAGE_SUBMISSION_STARTED_TASKS),
+  SUBMISSION_COMPLETED_TASKS("tasks", true, ServerMeter.MULTI_STAGE_SUBMISSION_COMPLETED_TASKS),
+  HASH_JOIN_TIMES_MAX_ROWS_REACHED("times", true, ServerMeter.HASH_JOIN_TIMES_MAX_ROWS_REACHED),
+  WINDOW_TIMES_MAX_ROWS_REACHED("times", true, ServerMeter.WINDOW_TIMES_MAX_ROWS_REACHED),
+  IN_MEMORY_MESSAGES("messages", true, ServerMeter.MULTI_STAGE_IN_MEMORY_MESSAGES),
+  RAW_MESSAGES("messages", true, ServerMeter.MULTI_STAGE_RAW_MESSAGES),
+  RAW_BYTES("bytes", true, ServerMeter.MULTI_STAGE_RAW_BYTES);
+
+  /// Legacy `pinot.server.*` meter this entry forwards to, or `null` for MSE-native entries.
+  @Nullable
+  private final ServerMeter _serverMeter;
+  private final boolean _global;
+  private final String _unit;
+  /// camelCase form of the constant name, e.g. `EMITTED_ROWS` -> `emittedRows`.
+  private final String _meterName;
+
+  /// Declares an MSE-native meter that has no legacy `pinot.server.*` counterpart.
+  MseMeter(String meterUnit, boolean isGlobal) {
+    this(meterUnit, isGlobal, null);
+  }
+
+  /// Declares a meter, optionally linked to the legacy [ServerMeter] it forwards to.
+  MseMeter(String meterUnit, boolean isGlobal, @Nullable ServerMeter legacyMeter) {
+    _meterName = Utils.toCamelCase(name().toLowerCase());
+    _unit = meterUnit;
+    _global = isGlobal;
+    _serverMeter = legacyMeter;
+  }
+
+  /// Legacy [ServerMeter] this entry forwards to in `SERVER` / `DUAL` mode, or `null` for
+  /// MSE-native meters with no `pinot.server.*` series.
+  @Nullable
+  public ServerMeter getServerMeter() {
+    return _serverMeter;
+  }
+
+  @Override
+  public boolean isGlobal() {
+    return _global;
+  }
+
+  @Override
+  public String getUnit() {
+    return _unit;
+  }
+
+  @Override
+  public String getMeterName() {
+    return _meterName;
+  }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseMetrics.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseMetrics.java
new file mode 100644
index 00000000000..f21919e1b36
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseMetrics.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.NoopPinotMetricsRegistry;
+import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Mode-aware metrics shim for the multi-stage engine.
+///
+/// All MSE engine call sites route through [#get()] instead of [ServerMetrics] directly. The
+/// active [MseMetricsMode] controls where emissions land:
+///
+/// - [MseMetricsMode#SERVER] (default): forwarded to [ServerMetrics] so existing `pinot.server.*`
+///   dashboards continue to work unchanged.
+/// - [MseMetricsMode#MSE]: emitted to this instance's own `pinot.mse.*` registry.
+/// - [MseMetricsMode#DUAL]: emitted to both, for dashboard migration windows.
+///
+/// The pre-registration [#NOOP] is in `SERVER` mode, so call sites that emit before any explicit
+/// registration behave the same as if they had called [ServerMetrics] directly. Because SERVER
+/// mode resolves the underlying handle through [ServerMetrics#get()], emissions land in the noop
+/// registry whenever [ServerMetrics#register] has not been called on the local JVM; in that case
+/// the `MSE` or `DUAL` mode must be selected for the series to surface.
+///
+/// MSE-native metrics — [MseMeter] and [MseTimer] entries with no `ServerMeter` / `ServerTimer`
+/// counterpart — are emitted only under `MSE` and `DUAL` modes. In `SERVER` mode they are
+/// silently dropped (the legacy namespace has no series to forward to).
+///
+/// **Thread-safety:** the instance returned by [#get()] is a JVM-wide singleton and may be used
+/// from many threads at once. The DUAL-mode wrapper cache kept by this class is therefore only
+/// accessed while holding the cache's own monitor.
+///
+/// **Initialization order:** call [#registerFromConfig] before constructing components that
+/// resolve a [PinotMeter] handle once and cache it for the JVM lifetime (notably the MSE
+/// `MetricsExecutor` cached handles and the inner `Metrics` class in `OpChainSchedulerService`).
+/// The server and broker starters preserve this ordering by registering immediately after
+/// `ServerMetrics.register` / `BrokerMetrics.register` and before any MSE runtime component is
+/// built.
+public class MseMetrics
+    extends AbstractMetrics<AbstractMetrics.QueryPhase, MseMeter, AbstractMetrics.Gauge, MseTimer> {
+
+  public static final String METRIC_PREFIX = "pinot.mse.";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MseMetrics.class);
+
+  private static final AbstractMetrics.QueryPhase[] EMPTY_PHASES = new AbstractMetrics.QueryPhase[0];
+  private static final AbstractMetrics.Gauge[] EMPTY_GAUGES = new AbstractMetrics.Gauge[0];
+
+  /// Shared sink for MSE-native meters in SERVER mode, where there is no legacy series to target.
+  private static final PinotMeter NOOP_PINOT_METER = new NoopPinotMetricsRegistry().newMeter(null, null, null);
+
+  private static final MseMetrics NOOP = new MseMetrics(MseMetricsMode.SERVER, new NoopPinotMetricsRegistry());
+  private static final AtomicReference<MseMetrics> INSTANCE = new AtomicReference<>(NOOP);
+
+  /// Registers `mseMetrics` as the JVM-wide instance. Returns `true` if installed; `false` if
+  /// another instance was already registered (compare-and-set semantics, matching
+  /// [ServerMetrics#register]).
+  public static boolean register(MseMetrics mseMetrics) {
+    return INSTANCE.compareAndSet(NOOP, mseMetrics);
+  }
+
+  /// Restores the NOOP placeholder so tests can register a fresh instance.
+  @VisibleForTesting
+  public static void deregister() {
+    INSTANCE.set(NOOP);
+  }
+
+  /// Returns the registered instance, or the SERVER-mode NOOP placeholder if none is registered.
+  public static MseMetrics get() {
+    return INSTANCE.get();
+  }
+
+  /// Reads [Helix#CONFIG_OF_MSE_METRICS_MODE] from `instanceConfig` (which already contains
+  /// cluster-config keys merged in via `ServiceStartableUtils.applyClusterConfig(...)` at
+  /// startup) and registers a new [MseMetrics] instance with the resolved mode. Unrecognized
+  /// values are logged and replaced by [Helix#DEFAULT_MSE_METRICS_MODE].
+  /// [MseMetricsMode#SERVER] reuses [NoopPinotMetricsRegistry] since no `pinot.mse.*` series are
+  /// emitted in that mode. Subsequent calls in the same JVM are no-ops (compare-and-set with the
+  /// NOOP placeholder).
+  public static void registerFromConfig(PinotConfiguration instanceConfig, PinotMetricsRegistry metricsRegistry) {
+    String modeStr =
+        instanceConfig.getProperty(Helix.CONFIG_OF_MSE_METRICS_MODE, Helix.DEFAULT_MSE_METRICS_MODE);
+    MseMetricsMode mode = parseMode(modeStr);
+    PinotMetricsRegistry effectiveRegistry =
+        mode == MseMetricsMode.SERVER ? new NoopPinotMetricsRegistry() : metricsRegistry;
+    if (register(new MseMetrics(mode, effectiveRegistry))) {
+      LOGGER.info("Registered MseMetrics in {} mode", mode);
+    } else {
+      LOGGER.info("MseMetrics already registered ({} mode); ignoring duplicate registration with {} mode",
+          get().getMode(), mode);
+    }
+  }
+
+  /// Resolves `modeStr` to an [MseMetricsMode], ignoring case and surrounding whitespace. An
+  /// unrecognized or absent value falls back to [Helix#DEFAULT_MSE_METRICS_MODE]. The default is
+  /// normalized the same way, so a differently-cased default does not make the fallback throw.
+  private static MseMetricsMode parseMode(@Nullable String modeStr) {
+    if (modeStr != null) {
+      try {
+        return MseMetricsMode.valueOf(modeStr.trim().toUpperCase());
+      } catch (IllegalArgumentException e) {
+        LOGGER.warn("Invalid value {}={}, falling back to {}", Helix.CONFIG_OF_MSE_METRICS_MODE, modeStr,
+            Helix.DEFAULT_MSE_METRICS_MODE);
+      }
+    }
+    return MseMetricsMode.valueOf(Helix.DEFAULT_MSE_METRICS_MODE.trim().toUpperCase());
+  }
+
+  private final MseMetricsMode _mode;
+
+  /// DUAL-mode wrappers are cached so callers asking for the same [MseMeter] repeatedly (e.g. the
+  /// MSE engine emission loops in `MultiStageOperator`) share one wrapper instead of allocating a
+  /// fresh one per increment. `null` for non-DUAL modes. [EnumMap] is not synchronized, so the
+  /// map is only read or written inside [#getOrCreateDualMeter] while holding its monitor.
+  @Nullable
+  private final EnumMap<MseMeter, PinotMeter> _dualMeterCache;
+
+  public MseMetrics(MseMetricsMode mode, PinotMetricsRegistry metricsRegistry) {
+    super(METRIC_PREFIX, metricsRegistry, MseMetrics.class, false, Collections.emptySet());
+    _mode = mode;
+    _dualMeterCache = mode == MseMetricsMode.DUAL ? new EnumMap<>(MseMeter.class) : null;
+  }
+
+  public MseMetricsMode getMode() {
+    return _mode;
+  }
+
+  @Override
+  protected AbstractMetrics.QueryPhase[] getQueryPhases() {
+    return EMPTY_PHASES;
+  }
+
+  @Override
+  protected MseMeter[] getMeters() {
+    return MseMeter.values();
+  }
+
+  @Override
+  protected AbstractMetrics.Gauge[] getGauges() {
+    return EMPTY_GAUGES;
+  }
+
+  /// Single source of truth for the mode logic; the 2-arg [#addMeteredGlobalValue] and
+  /// [#getMeteredValue] delegate here. Reuses `reusedMeter` as a fast path so callers that cache
+  /// a handle (e.g. `MetricsExecutor`) avoid repeated registry lookups.
+  @Override
+  public PinotMeter addMeteredGlobalValue(MseMeter meter, long unitCount, PinotMeter reusedMeter) {
+    if (reusedMeter != null) {
+      reusedMeter.mark(unitCount);
+      return reusedMeter;
+    }
+    PinotMeter handle = getMeteredValue(meter);
+    handle.mark(unitCount);
+    return handle;
+  }
+
+  /// Resolves the handle for `meter` according to the active mode. MSE mode returns the
+  /// `pinot.mse.*` meter. DUAL mode returns a fan-out wrapper, or just the `pinot.mse.*` meter
+  /// when there is no [ServerMeter] counterpart. SERVER mode returns the [ServerMetrics] handle,
+  /// or a shared noop meter for MSE-native entries.
+  @Override
+  public PinotMeter getMeteredValue(MseMeter meter) {
+    ServerMeter serverMeter = meter.getServerMeter();
+    switch (_mode) {
+      case MSE:
+        return super.getMeteredValue(meter);
+      case DUAL:
+        if (serverMeter == null) {
+          return super.getMeteredValue(meter);
+        }
+        return getOrCreateDualMeter(meter, serverMeter);
+      case SERVER:
+      default:
+        return serverMeter == null ? NOOP_PINOT_METER : ServerMetrics.get().getMeteredValue(serverMeter);
+    }
+  }
+
+  /// Returns the cached DUAL-mode wrapper for `meter`, creating it on first use.
+  ///
+  /// The lookup and insert happen under the cache's monitor because [EnumMap] is not
+  /// synchronized and this instance is shared across threads. This also guarantees a single
+  /// wrapper per meter. The [ServerMetrics] handle is resolved once, when the wrapper is created,
+  /// so a later [ServerMetrics#register] is not seen by an already-cached wrapper.
+  private PinotMeter getOrCreateDualMeter(MseMeter meter, ServerMeter serverMeter) {
+    // Non-null here: the constructor allocates the cache whenever the mode is DUAL.
+    synchronized (_dualMeterCache) {
+      PinotMeter dualMeter = _dualMeterCache.get(meter);
+      if (dualMeter == null) {
+        dualMeter = new DualPinotMeter(super.getMeteredValue(meter),
+            ServerMetrics.get().getMeteredValue(serverMeter));
+        _dualMeterCache.put(meter, dualMeter);
+      }
+      return dualMeter;
+    }
+  }
+
+  /// Records `duration` on the legacy [ServerTimer] (SERVER / DUAL, when a counterpart exists)
+  /// and/or the `pinot.mse.*` timer (MSE / DUAL).
+  @Override
+  public void addTimedValue(MseTimer timer, long duration, TimeUnit timeUnit) {
+    ServerTimer serverTimer = timer.getServerTimer();
+    if (_mode != MseMetricsMode.MSE && serverTimer != null) {
+      ServerMetrics.get().addTimedValue(serverTimer, duration, timeUnit);
+    }
+    if (_mode != MseMetricsMode.SERVER) {
+      super.addTimedValue(timer, duration, timeUnit);
+    }
+  }
+
+  /// Fan-out [PinotMeter] returned in [MseMetricsMode#DUAL] so callers that cache a handle mark
+  /// both registries on every increment. Read-side methods delegate to the primary (MSE) handle.
+  private static final class DualPinotMeter implements PinotMeter {
+    private final PinotMeter _primary;
+    private final PinotMeter _secondary;
+
+    private DualPinotMeter(PinotMeter primary, PinotMeter secondary) {
+      _primary = primary;
+      _secondary = secondary;
+    }
+
+    @Override
+    public void mark() {
+      _primary.mark();
+      _secondary.mark();
+    }
+
+    @Override
+    public void mark(long unitCount) {
+      _primary.mark(unitCount);
+      _secondary.mark(unitCount);
+    }
+
+    @Override
+    public long count() {
+      return _primary.count();
+    }
+
+    @Override
+    public Object getMetered() {
+      return _primary.getMetered();
+    }
+
+    @Override
+    public TimeUnit rateUnit() {
+      return _primary.rateUnit();
+    }
+
+    @Override
+    public String eventType() {
+      return _primary.eventType();
+    }
+
+    @Override
+    public double fifteenMinuteRate() {
+      return _primary.fifteenMinuteRate();
+    }
+
+    @Override
+    public double fiveMinuteRate() {
+      return _primary.fiveMinuteRate();
+    }
+
+    @Override
+    public double meanRate() {
+      return _primary.meanRate();
+    }
+
+    @Override
+    public double oneMinuteRate() {
+      return _primary.oneMinuteRate();
+    }
+
+    @Override
+    public Object getMetric() {
+      return _primary.getMetric();
+    }
+  }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseMetricsMode.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseMetricsMode.java
new file mode 100644
index 00000000000..ddfc3a1439d
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseMetricsMode.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metrics;
+
+/// Emission target for multi-stage engine metrics published through [MseMetrics].
+///
+/// The mode is resolved once at startup from cluster config, so switching it requires a restart.
+///
+/// **Migration path:** `SERVER` is the default so that existing `pinot.server.*` dashboards keep
+/// working. To move to the `pinot.mse.*` surface, switch to `DUAL` for an overlap window, repoint
+/// dashboards and alerts at `pinot.mse.*`, then switch to `MSE`. `SERVER` mode, together with the
+/// [MseMeter#getServerMeter()] / [MseTimer#getServerTimer()] forwarding links, is the
+/// backward-compatibility shim and can be removed once nothing consumes the legacy series.
+public enum MseMetricsMode {
+  /// Default: forward to [ServerMetrics] only (the existing `pinot.server.*` series).
+  SERVER,
+  /// Emit to the dedicated `pinot.mse.*` registry only.
+  MSE,
+  /// Emit to both, for dashboard migration windows.
+  DUAL
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseTimer.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseTimer.java
new file mode 100644
index 00000000000..33e5c3305f9
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MseTimer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metrics;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.common.Utils;
+
+
+/// Multi-stage engine timers, published by [MseMetrics] under the `pinot.mse.*` prefix whenever the
+/// configured [MseMetricsMode] is `MSE` or `DUAL`.
+///
+/// An entry may name a legacy [ServerTimer]. If it does, `SERVER` and `DUAL` modes also forward
+/// every recorded duration to that `pinot.server.*` series. An entry without one (an MSE-native
+/// timer introduced after the migration) only surfaces in `MSE` / `DUAL` modes; `SERVER` mode
+/// drops it.
+public enum MseTimer implements AbstractMetrics.Timer {
+  HASH_JOIN_BUILD_TABLE_CPU_TIME_MS(true, ServerTimer.HASH_JOIN_BUILD_TABLE_CPU_TIME_MS),
+  SERIALIZATION_CPU_TIME_MS(true, ServerTimer.MULTI_STAGE_SERIALIZATION_CPU_TIME_MS),
+  DESERIALIZATION_CPU_TIME_MS(true, ServerTimer.MULTI_STAGE_DESERIALIZATION_CPU_TIME_MS),
+  RECEIVE_DOWNSTREAM_WAIT_CPU_TIME_MS(true, ServerTimer.RECEIVE_DOWNSTREAM_WAIT_CPU_TIME_MS),
+  RECEIVE_UPSTREAM_WAIT_CPU_TIME_MS(true, ServerTimer.RECEIVE_UPSTREAM_WAIT_CPU_TIME_MS);
+
+  /// Legacy `pinot.server.*` timer this entry forwards to, or `null` for MSE-native entries.
+  @Nullable
+  private final ServerTimer _serverTimer;
+  private final boolean _global;
+  /// camelCase form of the constant name, e.g. `SERIALIZATION_CPU_TIME_MS` -> `serializationCpuTimeMs`.
+  private final String _timerName;
+
+  /// Declares an MSE-native timer that has no legacy `pinot.server.*` counterpart.
+  MseTimer(boolean isGlobal) {
+    this(isGlobal, null);
+  }
+
+  /// Declares a timer, optionally linked to the legacy [ServerTimer] it forwards to.
+  MseTimer(boolean isGlobal, @Nullable ServerTimer legacyTimer) {
+    _timerName = Utils.toCamelCase(name().toLowerCase());
+    _global = isGlobal;
+    _serverTimer = legacyTimer;
+  }
+
+  /// Legacy [ServerTimer] this entry forwards to in `SERVER` / `DUAL` mode, or `null` for
+  /// MSE-native timers with no `pinot.server.*` series.
+  @Nullable
+  public ServerTimer getServerTimer() {
+    return _serverTimer;
+  }
+
+  @Override
+  public boolean isGlobal() {
+    return _global;
+  }
+
+  @Override
+  public String getTimerName() {
+    return _timerName;
+  }
+}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/MseMetricsTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/MseMetricsTest.java
new file mode 100644
index 00000000000..efebe5c4a4f
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/MseMetricsTest.java
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metrics;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.plugin.metrics.fake.FakeMetricsFactory;
+import org.apache.pinot.plugin.metrics.fake.FakeMetricsInspector;
+import org.apache.pinot.plugin.metrics.fake.FakePinotMetricsRegistry;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.metrics.PinotMetricName;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static
org.apache.pinot.spi.utils.CommonConstants.CONFIG_OF_METRICS_FACTORY_CLASS_NAME;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Mode-aware behavior tests for {@link MseMetrics}. SERVER mode must forward
to
+ * {@link ServerMetrics} only; MSE mode emits {@code pinot.mse.*} only; DUAL
hits both.
+ */
+public class MseMetricsTest {
+
+ @BeforeClass
+ public void initMetricsFactory() {
+ PinotConfiguration config = new PinotConfiguration();
+ config.setProperty(CONFIG_OF_METRICS_FACTORY_CLASS_NAME,
FakeMetricsFactory.class.getName());
+ PinotMetricUtils.init(config);
+ }
+
+ @AfterClass
+ public void cleanUpMetricsFactory() {
+ PinotMetricUtils.cleanUp();
+ }
+
+ @AfterMethod
+ public void deregisterSingletons() {
+ MseMetrics.deregister();
+ ServerMetrics.deregister();
+ }
+
+ @Test
+ public void serverModeForwardsToServerMetricsOnly() {
+ ServerMetrics serverMetrics = new ServerMetrics(new
FakePinotMetricsRegistry());
+ ServerMetrics.register(serverMetrics);
+ FakeMetricsInspector serverInspector = new
FakeMetricsInspector(serverMetrics.getMetricsRegistry());
+
+ PinotMetricsRegistry mseRegistry = new FakePinotMetricsRegistry();
+ MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.SERVER, mseRegistry);
+ assertTrue(MseMetrics.register(mseMetrics));
+
+ mseMetrics.addMeteredGlobalValue(MseMeter.QUERIES, 7L);
+
+ PinotMetricName serverName = serverInspector.lastMetric();
+ assertNotNull(serverName, "SERVER mode should register a meter under
ServerMetrics");
+ assertEquals(serverInspector.getMeteredCount(serverName), 7L);
+ assertTrue(mseRegistry.allMetrics().isEmpty(), "SERVER mode must not emit
to the pinot.mse.* registry");
+ }
+
+ @Test
+ public void mseModeEmitsToOwnRegistryOnly() {
+ ServerMetrics serverMetrics = new ServerMetrics(new
FakePinotMetricsRegistry());
+ ServerMetrics.register(serverMetrics);
+
+ PinotMetricsRegistry mseRegistry = new FakePinotMetricsRegistry();
+ FakeMetricsInspector mseInspector = new FakeMetricsInspector(mseRegistry);
+ MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.MSE, mseRegistry);
+ assertTrue(MseMetrics.register(mseMetrics));
+
+ mseMetrics.addMeteredGlobalValue(MseMeter.OPCHAINS_STARTED, 3L);
+
+ PinotMetricName mseName = mseInspector.lastMetric();
+ assertNotNull(mseName, "MSE mode should register a meter under
MseMetrics");
+ assertEquals(mseInspector.getMeteredCount(mseName), 3L);
+ assertTrue(serverMetrics.getMetricsRegistry().allMetrics().isEmpty(),
+ "MSE mode must not touch the ServerMetrics registry");
+ }
+
+ @Test
+ public void dualModeEmitsToBothRegistries() {
+ ServerMetrics serverMetrics = new ServerMetrics(new
FakePinotMetricsRegistry());
+ ServerMetrics.register(serverMetrics);
+ FakeMetricsInspector serverInspector = new
FakeMetricsInspector(serverMetrics.getMetricsRegistry());
+
+ PinotMetricsRegistry mseRegistry = new FakePinotMetricsRegistry();
+ FakeMetricsInspector mseInspector = new FakeMetricsInspector(mseRegistry);
+ MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.DUAL, mseRegistry);
+ assertTrue(MseMetrics.register(mseMetrics));
+
+ mseMetrics.addMeteredGlobalValue(MseMeter.OPCHAINS_COMPLETED, 5L);
+
+ assertEquals(mseInspector.getMeteredCount(mseInspector.lastMetric()), 5L);
+
assertEquals(serverInspector.getMeteredCount(serverInspector.lastMetric()), 5L);
+ }
+
+ @Test
+ public void dualModeReusedMeterMarksBothSides() {
+ ServerMetrics serverMetrics = new ServerMetrics(new
FakePinotMetricsRegistry());
+ ServerMetrics.register(serverMetrics);
+ FakeMetricsInspector serverInspector = new
FakeMetricsInspector(serverMetrics.getMetricsRegistry());
+
+ PinotMetricsRegistry mseRegistry = new FakePinotMetricsRegistry();
+ FakeMetricsInspector mseInspector = new FakeMetricsInspector(mseRegistry);
+ MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.DUAL, mseRegistry);
+ assertTrue(MseMetrics.register(mseMetrics));
+
+ // Mimic the MetricsExecutor caching pattern: resolve once, then mark()
repeatedly.
+ PinotMeter cached = mseMetrics.getMeteredValue(MseMeter.EMITTED_ROWS);
+ cached.mark(11L);
+ cached.mark(4L);
+
+ assertEquals(mseInspector.getMeteredCount(mseInspector.lastMetric()), 15L);
+
assertEquals(serverInspector.getMeteredCount(serverInspector.lastMetric()),
15L);
+ }
+
+ @Test
+ public void addTimedValueForwardsByMode() {
+ ServerMetrics serverMetrics = new ServerMetrics(new
FakePinotMetricsRegistry());
+ ServerMetrics.register(serverMetrics);
+ FakeMetricsInspector serverInspector = new
FakeMetricsInspector(serverMetrics.getMetricsRegistry());
+
+ PinotMetricsRegistry mseRegistry = new FakePinotMetricsRegistry();
+ FakeMetricsInspector mseInspector = new FakeMetricsInspector(mseRegistry);
+ MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.DUAL, mseRegistry);
+ assertTrue(MseMetrics.register(mseMetrics));
+
+ mseMetrics.addTimedValue(MseTimer.SERIALIZATION_CPU_TIME_MS, 250,
TimeUnit.MILLISECONDS);
+
+ assertEquals(mseInspector.getTimerSumMs(mseInspector.lastMetric()), 250);
+ assertEquals(serverInspector.getTimerSumMs(serverInspector.lastMetric()),
250);
+ }
+
+ @Test
+ public void getMeteredValueReturnsServerHandleInServerMode() {
+ ServerMetrics serverMetrics = new ServerMetrics(new
FakePinotMetricsRegistry());
+ ServerMetrics.register(serverMetrics);
+ MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.SERVER, new
FakePinotMetricsRegistry());
+ MseMetrics.register(mseMetrics);
+
+ PinotMeter direct =
serverMetrics.getMeteredValue(MseMeter.QUERIES.getServerMeter());
+ PinotMeter throughShim = mseMetrics.getMeteredValue(MseMeter.QUERIES);
+ assertSame(throughShim, direct, "SERVER mode should hand back the same
ServerMetrics handle");
+ }
+
+ @Test
+ public void noopFallbackIsServerMode() {
+ // Default (no register call): must behave like the old
direct-ServerMetrics call sites.
+ ServerMetrics serverMetrics = new ServerMetrics(new
FakePinotMetricsRegistry());
+ ServerMetrics.register(serverMetrics);
+ FakeMetricsInspector serverInspector = new
FakeMetricsInspector(serverMetrics.getMetricsRegistry());
+
+ MseMetrics.get().addMeteredGlobalValue(MseMeter.RUNNER_STARTED_TASKS, 1L);
+
+
assertEquals(serverInspector.getMeteredCount(serverInspector.lastMetric()), 1L);
+ }
+
+ @Test
+ public void registerFromConfigParsesModeFromConfig() {
+ PinotConfiguration config = new PinotConfiguration();
+ config.setProperty("pinot.metrics.mse.mode", "DUAL");
+
+ MseMetrics.registerFromConfig(config, new FakePinotMetricsRegistry());
+
+ assertEquals(MseMetrics.get().getMode(), MseMetricsMode.DUAL);
+ }
+
+ @Test
+ public void registerFromConfigDefaultsToServerMode() {
+ MseMetrics.registerFromConfig(new PinotConfiguration(), new
FakePinotMetricsRegistry());
+
+ assertEquals(MseMetrics.get().getMode(), MseMetricsMode.SERVER);
+ }
+
+ @Test
+ public void registerFromConfigFallsBackOnInvalidValue() {
+ PinotConfiguration config = new PinotConfiguration();
+ config.setProperty("pinot.metrics.mse.mode", "NOT_A_MODE");
+
+ MseMetrics.registerFromConfig(config, new FakePinotMetricsRegistry());
+
+ assertEquals(MseMetrics.get().getMode(), MseMetricsMode.SERVER);
+ }
+
+ @Test
+ public void serverModeWithoutRegisteredServerMetricsSilentlyDropsEmissions()
{
+ // ServerMetrics.register() deliberately not called — the resolved handle
is the NOOP singleton.
+ PinotMetricsRegistry mseRegistry = new FakePinotMetricsRegistry();
+ MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.SERVER, mseRegistry);
+ assertTrue(MseMetrics.register(mseMetrics));
+
+ // Must not throw — SERVER mode resolves through ServerMetrics.get();
documented behavior is
+ // that emissions are silently dropped when the underlying singleton is
the noop instance.
+ mseMetrics.addMeteredGlobalValue(MseMeter.QUERIES, 1L);
+ mseMetrics.addTimedValue(MseTimer.HASH_JOIN_BUILD_TABLE_CPU_TIME_MS, 5L,
TimeUnit.MILLISECONDS);
+
+ assertTrue(mseRegistry.allMetrics().isEmpty(),
+ "SERVER mode must not register any pinot.mse.* meters even when
forwarding is a noop");
+ }
+
+ @Test
+ public void mseModeStillEmitsWhenServerMetricsIsNoop() {
+ // ServerMetrics is noop; MSE mode emits to the local registry instead of
forwarding, so
+ // engine emissions surface even without a registered ServerMetrics.
+ PinotMetricsRegistry mseRegistry = new FakePinotMetricsRegistry();
+ FakeMetricsInspector mseInspector = new FakeMetricsInspector(mseRegistry);
+ MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.MSE, mseRegistry);
+ assertTrue(MseMetrics.register(mseMetrics));
+
+ mseMetrics.addMeteredGlobalValue(MseMeter.OPCHAINS_STARTED, 2L);
+
+ assertEquals(mseInspector.getMeteredCount(mseInspector.lastMetric()), 2L);
+ }
+
+ @Test
+ public void dualModeWrapperIsCachedAcrossLookups() {
+ ServerMetrics serverMetrics = new ServerMetrics(new
FakePinotMetricsRegistry());
+ ServerMetrics.register(serverMetrics);
+ MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.DUAL, new
FakePinotMetricsRegistry());
+ assertTrue(MseMetrics.register(mseMetrics));
+
+ PinotMeter first = mseMetrics.getMeteredValue(MseMeter.QUERIES);
+ PinotMeter second = mseMetrics.getMeteredValue(MseMeter.QUERIES);
+ PinotMeter other = mseMetrics.getMeteredValue(MseMeter.OPCHAINS_STARTED);
+
+ assertSame(second, first, "DUAL mode must return the cached wrapper for
repeated lookups");
+ assertNotNull(other);
+ assertTrue(first != other, "Distinct meters must get distinct wrappers");
+ }
+
+ @Test
+ public void serverModeMeterWithoutCounterpartReturnsNoop()
+ throws Exception {
+ ServerMetrics serverMetrics = new ServerMetrics(new
FakePinotMetricsRegistry());
+ ServerMetrics.register(serverMetrics);
+ MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.SERVER, new
FakePinotMetricsRegistry());
+ assertTrue(MseMetrics.register(mseMetrics));
+
+ // Simulate an MSE-native meter (no ServerMeter counterpart added in a
future release) by
+ // nulling the field on an existing entry for the duration of the test.
+ withNullServerCounterpart(MseMeter.QUERIES, () -> {
+ PinotMeter handle = mseMetrics.getMeteredValue(MseMeter.QUERIES);
+ assertNotNull(handle, "SERVER mode without a counterpart must still
return a non-null handle");
+ handle.mark(7);
+ assertEquals(handle.count(), 0L, "Returned handle must be the noop meter
— SERVER mode has no series to mark");
+ });
+ }
+
+ @Test
+ public void mseModeMeterWithoutCounterpartStillEmits()
+ throws Exception {
+ PinotMetricsRegistry mseRegistry = new FakePinotMetricsRegistry();
+ FakeMetricsInspector mseInspector = new FakeMetricsInspector(mseRegistry);
+ MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.MSE, mseRegistry);
+ assertTrue(MseMetrics.register(mseMetrics));
+
+ withNullServerCounterpart(MseMeter.OPCHAINS_STARTED, () -> {
+ mseMetrics.addMeteredGlobalValue(MseMeter.OPCHAINS_STARTED, 9L);
+ });
+
+ assertEquals(mseInspector.getMeteredCount(mseInspector.lastMetric()), 9L);
+ }
+
+ @Test
+ public void dualModeMeterWithoutCounterpartOnlyEmitsToMseSide()
+ throws Exception {
+ ServerMetrics serverMetrics = new ServerMetrics(new
FakePinotMetricsRegistry());
+ ServerMetrics.register(serverMetrics);
+ PinotMetricsRegistry mseRegistry = new FakePinotMetricsRegistry();
+ FakeMetricsInspector mseInspector = new FakeMetricsInspector(mseRegistry);
+ MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.DUAL, mseRegistry);
+ assertTrue(MseMetrics.register(mseMetrics));
+
+ withNullServerCounterpart(MseMeter.EMITTED_ROWS, () -> {
+ mseMetrics.addMeteredGlobalValue(MseMeter.EMITTED_ROWS, 4L);
+ });
+
+ assertEquals(mseInspector.getMeteredCount(mseInspector.lastMetric()), 4L);
+ assertTrue(serverMetrics.getMetricsRegistry().allMetrics().isEmpty(),
+ "DUAL mode must not register any pinot.server.* meter when there is no
counterpart");
+ }
+
+ @Test
+ public void serverModeTimerWithoutCounterpartIsDropped()
+ throws Exception {
+ ServerMetrics serverMetrics = new ServerMetrics(new
FakePinotMetricsRegistry());
+ ServerMetrics.register(serverMetrics);
+ MseMetrics mseMetrics = new MseMetrics(MseMetricsMode.SERVER, new
FakePinotMetricsRegistry());
+ assertTrue(MseMetrics.register(mseMetrics));
+
+ withNullServerTimerCounterpart(MseTimer.SERIALIZATION_CPU_TIME_MS, () -> {
+ mseMetrics.addTimedValue(MseTimer.SERIALIZATION_CPU_TIME_MS, 250,
TimeUnit.MILLISECONDS);
+ });
+
+ assertTrue(serverMetrics.getMetricsRegistry().allMetrics().isEmpty(),
+ "SERVER mode without a timer counterpart must not register any
pinot.server.* timer");
+ }
+
+ @Test
+ public void registerIsCompareAndSetAgainstNoop() {
+ MseMetrics first = new MseMetrics(MseMetricsMode.MSE, new
FakePinotMetricsRegistry());
+ assertTrue(MseMetrics.register(first));
+ assertSame(MseMetrics.get(), first);
+
+ MseMetrics second = new MseMetrics(MseMetricsMode.DUAL, new
FakePinotMetricsRegistry());
+ assertFalse(MseMetrics.register(second), "Second register must fail; CAS
guards against accidental swap");
+ assertSame(MseMetrics.get(), first);
+ }
+
+ /// Reflection helper: temporarily null out `MseMeter._serverMeter` to
simulate an MSE-native
+ /// entry, run the test body, then restore. Used to exercise the
no-counterpart branches
+ /// without adding a real null-counterpart entry to the public enum.
+ private static void withNullServerCounterpart(MseMeter meter, Runnable body)
+ throws Exception {
+ Field field = MseMeter.class.getDeclaredField("_serverMeter");
+ field.setAccessible(true);
+ Object original = field.get(meter);
+ field.set(meter, null);
+ try {
+ body.run();
+ } finally {
+ field.set(meter, original);
+ }
+ }
+
+ private static void withNullServerTimerCounterpart(MseTimer timer, Runnable
body)
+ throws Exception {
+ Field field = MseTimer.class.getDeclaredField("_serverTimer");
+ field.setAccessible(true);
+ Object original = field.get(timer);
+ field.set(timer, null);
+ try {
+ body.run();
+ } finally {
+ field.set(timer, original);
+ }
+ }
+}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/BrokerPrometheusMetricsTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/BrokerPrometheusMetricsTest.java
index a6ce99fd0cf..017c28d2fe6 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/BrokerPrometheusMetricsTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/BrokerPrometheusMetricsTest.java
@@ -25,6 +25,10 @@ import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.BrokerTimer;
+import org.apache.pinot.common.metrics.MseMeter;
+import org.apache.pinot.common.metrics.MseMetrics;
+import org.apache.pinot.common.metrics.MseMetricsMode;
+import org.apache.pinot.common.metrics.MseTimer;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -52,12 +56,22 @@ public abstract class BrokerPrometheusMetricsTest extends
PinotPrometheusMetrics
private static final List<BrokerGauge> GAUGES_ACCEPTING_RAW_TABLE_NAME =
List.of(BrokerGauge.REQUEST_SIZE);
+ // pinot.mse.* metrics share the role-agnostic prefix and must be exported
from every JVM role
+ // that registers MseMetrics; on broker JVMs this exercises the broker.yml
catch-all rule.
+ private static final String EXPORTED_MSE_METRIC_PREFIX = "pinot_mse_";
+
private BrokerMetrics _brokerMetrics;
+ private MseMetrics _mseMetrics;
+
@BeforeClass
public void setup()
throws Exception {
_brokerMetrics = new
BrokerMetrics(_pinotMetricsFactory.getPinotMetricsRegistry());
+ // MSE mode so emissions land in this JVM's PinotMetricsRegistry (the one
the JMX exporter is
+ // scraping). MseMetrics is constructed with the shared registry,
mirroring how broker JVMs
+ // running in MSE/DUAL mode emit pinot.mse.* beans alongside
pinot.broker.*.
+ _mseMetrics = new MseMetrics(MseMetricsMode.MSE,
_pinotMetricsFactory.getPinotMetricsRegistry());
}
@Test(dataProvider = "brokerTimers")
@@ -115,6 +129,18 @@ public abstract class BrokerPrometheusMetricsTest extends
PinotPrometheusMetrics
}
}
+ @Test(dataProvider = "mseMeters")
+ public void mseMeterExportedFromBrokerJmx(MseMeter meter) {
+ _mseMetrics.addMeteredGlobalValue(meter, 1L);
+ assertMeterExportedCorrectly(meter.getMeterName(),
EXPORTED_MSE_METRIC_PREFIX);
+ }
+
+ @Test(dataProvider = "mseTimers")
+ public void mseTimerExportedFromBrokerJmx(MseTimer timer) {
+ _mseMetrics.addTimedValue(timer, 30_000, TimeUnit.MILLISECONDS);
+ assertTimerExportedCorrectly(timer.getTimerName(),
EXPORTED_MSE_METRIC_PREFIX);
+ }
+
@DataProvider(name = "brokerTimers")
public Object[] brokerTimers() {
return BrokerTimer.values();
@@ -129,4 +155,14 @@ public abstract class BrokerPrometheusMetricsTest extends
PinotPrometheusMetrics
public Object[] brokerGauges() {
return BrokerGauge.values();
}
+
+ @DataProvider(name = "mseMeters")
+ public Object[] mseMeters() {
+ return MseMeter.values();
+ }
+
+ @DataProvider(name = "mseTimers")
+ public Object[] mseTimers() {
+ return MseTimer.values();
+ }
}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PrometheusTemplateRegexpTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PrometheusTemplateRegexpTest.java
index c7ad6eff399..c92f762fbeb 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PrometheusTemplateRegexpTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PrometheusTemplateRegexpTest.java
@@ -134,19 +134,31 @@ public class PrometheusTemplateRegexpTest {
}
/**
- * broker.yml: global gauge/meter/timer (no table scope).
- * e.g. pinot.broker.totalDocuments
+ * broker.yml: global gauge/meter/timer (no table scope). The catch-all is
group-flexible at the
+ * prefix so non-broker MBean groups registered in the broker JVM (e.g.
pinot.mse.*) are also
+ * exported with this rule.
+ * e.g. pinot.broker.totalDocuments, pinot.mse.queries
*/
@Test
public void testBrokerGlobalMeterPattern()
throws Exception {
- String pattern = loadPatternByName("broker.yml", "pinot_broker_$1_$2");
- Matcher m = Pattern.compile(pattern).matcher(
+ String pattern = loadPatternByName("broker.yml", "pinot_$1_$2_$3");
+ Pattern compiled = Pattern.compile(pattern);
+
+ Matcher brokerMatch = compiled.matcher(
"\"org.apache.pinot.common.metrics\"<type=\"BrokerMetrics\", "
+ "name=\"pinot.broker.totalDocuments\"><>Value");
- Assert.assertTrue(m.matches(), "Pattern should match global broker gauge");
- Assert.assertEquals(m.group(1), "totalDocuments");
- Assert.assertEquals(m.group(2), "Value");
+ Assert.assertTrue(brokerMatch.matches(), "Pattern should match global
broker gauge");
+ Assert.assertEquals(brokerMatch.group(1), "broker");
+ Assert.assertEquals(brokerMatch.group(2), "totalDocuments");
+ Assert.assertEquals(brokerMatch.group(3), "Value");
+
+ Matcher mseMatch = compiled.matcher(
+ "\"org.apache.pinot.common.metrics\"<type=\"MseMetrics\",
name=\"pinot.mse.queries\"><>Count");
+ Assert.assertTrue(mseMatch.matches(), "Pattern should also match
pinot.mse.* mbeans on broker JVMs");
+ Assert.assertEquals(mseMatch.group(1), "mse");
+ Assert.assertEquals(mseMatch.group(2), "queries");
+ Assert.assertEquals(mseMatch.group(3), "Count");
}
// ---- Server patterns ----
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
index 79fb88f56ed..274a415c55a 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
@@ -20,6 +20,10 @@ package org.apache.pinot.common.metrics.prometheus;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.metrics.MseMeter;
+import org.apache.pinot.common.metrics.MseMetrics;
+import org.apache.pinot.common.metrics.MseMetricsMode;
+import org.apache.pinot.common.metrics.MseTimer;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -62,12 +66,22 @@ public abstract class ServerPrometheusMetricsTest extends
PinotPrometheusMetrics
List.of(ServerGauge.REALTIME_OFFHEAP_MEMORY_USED,
ServerGauge.REALTIME_SEGMENT_NUM_PARTITIONS,
ServerGauge.LUCENE_INDEXING_DELAY_MS,
ServerGauge.LUCENE_INDEXING_DELAY_DOCS);
+ // pinot.mse.* metrics share the role-agnostic prefix and must be exported
from every JVM role
+ // that registers MseMetrics; on server JVMs this exercises the server.yml
catch-all rule.
+ private static final String EXPORTED_MSE_METRIC_PREFIX = "pinot_mse_";
+
private ServerMetrics _serverMetrics;
+ private MseMetrics _mseMetrics;
+
@BeforeClass
public void setup()
throws Exception {
_serverMetrics = new
ServerMetrics(_pinotMetricsFactory.getPinotMetricsRegistry());
+ // MSE mode so emissions land in this JVM's PinotMetricsRegistry (the one
the JMX exporter is
+ // scraping). MseMetrics is constructed with the shared registry,
mirroring how server JVMs
+ // running in MSE/DUAL mode emit pinot.mse.* beans alongside
pinot.server.*.
+ _mseMetrics = new MseMetrics(MseMetricsMode.MSE,
_pinotMetricsFactory.getPinotMetricsRegistry());
}
@Test(dataProvider = "serverTimers")
@@ -149,6 +163,18 @@ public abstract class ServerPrometheusMetricsTest extends
PinotPrometheusMetrics
_serverMetrics.addMeteredTableValue(labels, serverMeter, 4L);
}
+ @Test(dataProvider = "mseMeters")
+ public void mseMeterExportedFromServerJmx(MseMeter meter) {
+ _mseMetrics.addMeteredGlobalValue(meter, 1L);
+ assertMeterExportedCorrectly(meter.getMeterName(),
EXPORTED_MSE_METRIC_PREFIX);
+ }
+
+ @Test(dataProvider = "mseTimers")
+ public void mseTimerExportedFromServerJmx(MseTimer timer) {
+ _mseMetrics.addTimedValue(timer, 30_000, TimeUnit.MILLISECONDS);
+ assertTimerExportedCorrectly(timer.getTimerName(),
EXPORTED_MSE_METRIC_PREFIX);
+ }
+
@DataProvider(name = "serverTimers")
public Object[] serverTimers() {
return ServerTimer.values(); // Provide all values of ServerTimer enum
@@ -164,6 +190,16 @@ public abstract class ServerPrometheusMetricsTest extends
PinotPrometheusMetrics
return ServerGauge.values(); // Provide all values of ServerTimer enum
}
+ @DataProvider(name = "mseMeters")
+ public Object[] mseMeters() {
+ return MseMeter.values();
+ }
+
+ @DataProvider(name = "mseTimers")
+ public Object[] mseTimers() {
+ return MseTimer.values();
+ }
+
private boolean meterTrackingRealtimeExceptions(ServerMeter serverMeter) {
return serverMeter == ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS
|| serverMeter == ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS
diff --git
a/pinot-plugins/pinot-metrics/pinot-dropwizard/src/test/java/org/apache/pinot/plugin/metrics/dropwizard/prometheus/DropwizardBrokerPrometheusMetricsTest.java
b/pinot-plugins/pinot-metrics/pinot-dropwizard/src/test/java/org/apache/pinot/plugin/metrics/dropwizard/prometheus/DropwizardBrokerPrometheusMetricsTest.java
index d74c7f63d81..619d9b7bbba 100644
---
a/pinot-plugins/pinot-metrics/pinot-dropwizard/src/test/java/org/apache/pinot/plugin/metrics/dropwizard/prometheus/DropwizardBrokerPrometheusMetricsTest.java
+++
b/pinot-plugins/pinot-metrics/pinot-dropwizard/src/test/java/org/apache/pinot/plugin/metrics/dropwizard/prometheus/DropwizardBrokerPrometheusMetricsTest.java
@@ -22,6 +22,8 @@ package org.apache.pinot.plugin.metrics.dropwizard.prometheus;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerTimer;
+import org.apache.pinot.common.metrics.MseMeter;
+import org.apache.pinot.common.metrics.MseTimer;
import org.apache.pinot.common.metrics.prometheus.BrokerPrometheusMetricsTest;
import org.apache.pinot.plugin.metrics.dropwizard.DropwizardMetricsFactory;
import org.apache.pinot.spi.annotations.metrics.PinotMetricsFactory;
@@ -58,4 +60,14 @@ public class DropwizardBrokerPrometheusMetricsTest extends
BrokerPrometheusMetri
public void gaugeTest(BrokerGauge gauge) {
super.gaugeTest(gauge);
}
+
+ @Test(dataProvider = "mseMeters", enabled = false)
+ public void mseMeterExportedFromBrokerJmx(MseMeter meter) {
+ super.mseMeterExportedFromBrokerJmx(meter);
+ }
+
+ @Test(dataProvider = "mseTimers", enabled = false)
+ public void mseTimerExportedFromBrokerJmx(MseTimer timer) {
+ super.mseTimerExportedFromBrokerJmx(timer);
+ }
}
diff --git
a/pinot-plugins/pinot-metrics/pinot-dropwizard/src/test/java/org/apache/pinot/plugin/metrics/dropwizard/prometheus/DropwizardServerPrometheusMetricsTest.java
b/pinot-plugins/pinot-metrics/pinot-dropwizard/src/test/java/org/apache/pinot/plugin/metrics/dropwizard/prometheus/DropwizardServerPrometheusMetricsTest.java
index ac6dfc60252..13be096fc28 100644
---
a/pinot-plugins/pinot-metrics/pinot-dropwizard/src/test/java/org/apache/pinot/plugin/metrics/dropwizard/prometheus/DropwizardServerPrometheusMetricsTest.java
+++
b/pinot-plugins/pinot-metrics/pinot-dropwizard/src/test/java/org/apache/pinot/plugin/metrics/dropwizard/prometheus/DropwizardServerPrometheusMetricsTest.java
@@ -19,6 +19,8 @@
package org.apache.pinot.plugin.metrics.dropwizard.prometheus;
+import org.apache.pinot.common.metrics.MseMeter;
+import org.apache.pinot.common.metrics.MseTimer;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerTimer;
@@ -59,4 +61,14 @@ public class DropwizardServerPrometheusMetricsTest extends
ServerPrometheusMetri
public void gaugeTest(ServerGauge serverGauge) {
super.gaugeTest(serverGauge);
}
+
+ @Test(dataProvider = "mseMeters", enabled = false)
+ public void mseMeterExportedFromServerJmx(MseMeter meter) {
+ super.mseMeterExportedFromServerJmx(meter);
+ }
+
+ @Test(dataProvider = "mseTimers", enabled = false)
+ public void mseTimerExportedFromServerJmx(MseTimer timer) {
+ super.mseTimerExportedFromServerJmx(timer);
+ }
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 53398023be7..37cf8f8c1a4 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -37,7 +37,8 @@ import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.config.TlsConfig;
-import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.MseMeter;
+import org.apache.pinot.common.metrics.MseMetrics;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
@@ -193,9 +194,10 @@ public class QueryRunner {
Server.DEFAULT_MULTISTAGE_EXECUTOR_TYPE);
ServerMetrics serverMetrics = ServerMetrics.get();
+ MseMetrics mseMetrics = MseMetrics.get();
MetricsExecutor metricsExecutor = new MetricsExecutor(baseExecutorService,
-
serverMetrics.getMeteredValue(ServerMeter.MULTI_STAGE_RUNNER_STARTED_TASKS),
-
serverMetrics.getMeteredValue(ServerMeter.MULTI_STAGE_RUNNER_COMPLETED_TASKS));
+ mseMetrics.getMeteredValue(MseMeter.RUNNER_STARTED_TASKS),
+ mseMetrics.getMeteredValue(MseMeter.RUNNER_COMPLETED_TASKS));
_executorService =
QueryThreadContext.contextAwareExecutorService(metricsExecutor);
int hardLimit =
HardLimitExecutor.getMultiStageExecutorHardLimit(serverConf);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index 599d052cf24..e946ae4c467 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -35,7 +35,8 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.datatable.StatMap;
-import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.MseMeter;
+import org.apache.pinot.common.metrics.MseMetrics;
import org.apache.pinot.core.util.trace.TraceRunnable;
import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
import org.apache.pinot.query.runtime.blocks.MseBlock;
@@ -275,11 +276,12 @@ public class OpChainSchedulerService {
}
private static class Metrics {
- private final PinotMeter _startedOpchains =
ServerMeter.MSE_OPCHAINS_STARTED.getGlobalMeter();
- private final PinotMeter _competedOpchains =
ServerMeter.MSE_OPCHAINS_COMPLETED.getGlobalMeter();
- private final PinotMeter _emittedRows =
ServerMeter.MSE_EMITTED_ROWS.getGlobalMeter();
- private final PinotMeter _cpuExecutionTimeMs =
ServerMeter.MSE_CPU_EXECUTION_TIME_MS.getGlobalMeter();
- private final PinotMeter _memoryAllocatedBytes =
ServerMeter.MSE_MEMORY_ALLOCATED_BYTES.getGlobalMeter();
+ private final MseMetrics _mseMetrics = MseMetrics.get();
+ private final PinotMeter _startedOpchains =
_mseMetrics.getMeteredValue(MseMeter.OPCHAINS_STARTED);
+ private final PinotMeter _competedOpchains =
_mseMetrics.getMeteredValue(MseMeter.OPCHAINS_COMPLETED);
+ private final PinotMeter _emittedRows =
_mseMetrics.getMeteredValue(MseMeter.EMITTED_ROWS);
+ private final PinotMeter _cpuExecutionTimeMs =
_mseMetrics.getMeteredValue(MseMeter.CPU_EXECUTION_TIME_MS);
+ private final PinotMeter _memoryAllocatedBytes =
_mseMetrics.getMeteredValue(MseMeter.MEMORY_ALLOCATED_BYTES);
private static final String EMITTED_ROWS = "EMITTED_ROWS";
private static final String EXECUTION_TIME_MS = "EXECUTION_TIME_MS";
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 05bfe47dda6..8a481526d3b 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -28,7 +28,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.datatable.StatMap;
-import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.MseMetrics;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.planner.physical.MailboxIdUtils;
@@ -308,17 +308,17 @@ public class MailboxSendOperator extends
MultiStageOperator {
}
private void updateMetrics(MultiStageQueryStats queryStats) {
- ServerMetrics serverMetrics = ServerMetrics.get();
+ MseMetrics mseMetrics = MseMetrics.get();
if (queryStats == null) {
LOGGER.info("Query stats not found in the EOS block.");
} else {
for (MultiStageQueryStats.StageStats.Closed closed :
queryStats.getClosedStats()) {
if (closed != null) {
- closed.forEach((type, stats) -> type.updateServerMetrics(stats,
serverMetrics));
+ closed.forEach((type, stats) -> type.updateMseMetrics(stats,
mseMetrics));
}
}
queryStats.getCurrentStats().forEach((type, stats) -> {
- type.updateServerMetrics(stats, serverMetrics);
+ type.updateMseMetrics(stats, mseMetrics);
});
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index b33391d1284..2075169bf03 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -29,9 +29,9 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.pinot.common.datatable.StatMap;
-import org.apache.pinot.common.metrics.ServerMeter;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.metrics.ServerTimer;
+import org.apache.pinot.common.metrics.MseMeter;
+import org.apache.pinot.common.metrics.MseMetrics;
+import org.apache.pinot.common.metrics.MseTimer;
import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
@@ -259,7 +259,7 @@ public abstract class MultiStageOperator implements
Operator<MseBlock>, AutoClos
/// So far this keys do not need to be modified from here because they
are incremented in a per-worker basis:
/// ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_LIMIT_REACHED
/// ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_WARNING_LIMIT_REACHED
- /// public void updateServerMetrics(StatMap<?> map, ServerMetrics
serverMetrics);
+ /// public void updateMseMetrics(StatMap<?> map, MseMetrics mseMetrics);
},
FILTER(1, FilterOperator.StatKey.class) {
@Override
@@ -280,15 +280,15 @@ public abstract class MultiStageOperator implements
Operator<MseBlock>, AutoClos
}
@Override
- public void updateServerMetrics(StatMap<?> map, ServerMetrics
serverMetrics) {
- super.updateServerMetrics(map, serverMetrics);
+ public void updateMseMetrics(StatMap<?> map, MseMetrics mseMetrics) {
+ super.updateMseMetrics(map, mseMetrics);
@SuppressWarnings("unchecked")
StatMap<HashJoinOperator.StatKey> stats =
(StatMap<HashJoinOperator.StatKey>) map;
boolean maxRowsInJoinReached =
stats.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED);
if (maxRowsInJoinReached) {
-
serverMetrics.addMeteredGlobalValue(ServerMeter.HASH_JOIN_TIMES_MAX_ROWS_REACHED,
1);
+
mseMetrics.addMeteredGlobalValue(MseMeter.HASH_JOIN_TIMES_MAX_ROWS_REACHED, 1);
}
-
serverMetrics.addTimedValue(ServerTimer.HASH_JOIN_BUILD_TABLE_CPU_TIME_MS,
+ mseMetrics.addTimedValue(MseTimer.HASH_JOIN_BUILD_TABLE_CPU_TIME_MS,
stats.getLong(HashJoinOperator.StatKey.TIME_BUILDING_HASH_TABLE_MS),
TimeUnit.MILLISECONDS);
}
},
@@ -329,23 +329,23 @@ public abstract class MultiStageOperator implements
Operator<MseBlock>, AutoClos
}
@Override
- public void updateServerMetrics(StatMap<?> map, ServerMetrics
serverMetrics) {
- super.updateServerMetrics(map, serverMetrics);
+ public void updateMseMetrics(StatMap<?> map, MseMetrics mseMetrics) {
+ super.updateMseMetrics(map, mseMetrics);
@SuppressWarnings("unchecked")
StatMap<BaseMailboxReceiveOperator.StatKey> stats =
(StatMap<BaseMailboxReceiveOperator.StatKey>) map;
-
serverMetrics.addMeteredGlobalValue(ServerMeter.MULTI_STAGE_IN_MEMORY_MESSAGES,
+ mseMetrics.addMeteredGlobalValue(MseMeter.IN_MEMORY_MESSAGES,
stats.getInt(BaseMailboxReceiveOperator.StatKey.IN_MEMORY_MESSAGES));
-
serverMetrics.addMeteredGlobalValue(ServerMeter.MULTI_STAGE_RAW_MESSAGES,
+ mseMetrics.addMeteredGlobalValue(MseMeter.RAW_MESSAGES,
stats.getInt(BaseMailboxReceiveOperator.StatKey.RAW_MESSAGES));
- serverMetrics.addMeteredGlobalValue(ServerMeter.MULTI_STAGE_RAW_BYTES,
+ mseMetrics.addMeteredGlobalValue(MseMeter.RAW_BYTES,
stats.getLong(BaseMailboxReceiveOperator.StatKey.DESERIALIZED_BYTES));
-
serverMetrics.addTimedValue(ServerTimer.MULTI_STAGE_DESERIALIZATION_CPU_TIME_MS,
+ mseMetrics.addTimedValue(MseTimer.DESERIALIZATION_CPU_TIME_MS,
stats.getLong(BaseMailboxReceiveOperator.StatKey.DESERIALIZATION_TIME_MS),
TimeUnit.MILLISECONDS);
-
serverMetrics.addTimedValue(ServerTimer.RECEIVE_DOWNSTREAM_WAIT_CPU_TIME_MS,
+ mseMetrics.addTimedValue(MseTimer.RECEIVE_DOWNSTREAM_WAIT_CPU_TIME_MS,
stats.getLong(BaseMailboxReceiveOperator.StatKey.DOWNSTREAM_WAIT_MS),
TimeUnit.MILLISECONDS);
-
serverMetrics.addTimedValue(ServerTimer.RECEIVE_UPSTREAM_WAIT_CPU_TIME_MS,
+ mseMetrics.addTimedValue(MseTimer.RECEIVE_UPSTREAM_WAIT_CPU_TIME_MS,
stats.getLong(BaseMailboxReceiveOperator.StatKey.UPSTREAM_WAIT_MS),
TimeUnit.MILLISECONDS);
}
},
@@ -358,10 +358,10 @@ public abstract class MultiStageOperator implements
Operator<MseBlock>, AutoClos
}
@Override
- public void updateServerMetrics(StatMap<?> map, ServerMetrics
serverMetrics) {
+ public void updateMseMetrics(StatMap<?> map, MseMetrics mseMetrics) {
@SuppressWarnings("unchecked")
StatMap<MailboxSendOperator.StatKey> stats =
(StatMap<MailboxSendOperator.StatKey>) map;
-
serverMetrics.addTimedValue(ServerTimer.MULTI_STAGE_SERIALIZATION_CPU_TIME_MS,
+ mseMetrics.addTimedValue(MseTimer.SERIALIZATION_CPU_TIME_MS,
stats.getLong(MailboxSendOperator.StatKey.SERIALIZATION_TIME_MS),
TimeUnit.MILLISECONDS);
}
},
@@ -417,11 +417,11 @@ public abstract class MultiStageOperator implements
Operator<MseBlock>, AutoClos
}
@Override
- public void updateServerMetrics(StatMap<?> map, ServerMetrics
serverMetrics) {
+ public void updateMseMetrics(StatMap<?> map, MseMetrics mseMetrics) {
@SuppressWarnings("unchecked")
StatMap<WindowAggregateOperator.StatKey> stats =
(StatMap<WindowAggregateOperator.StatKey>) map;
if
(stats.getBoolean(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW_REACHED)) {
-
serverMetrics.addMeteredGlobalValue(ServerMeter.WINDOW_TIMES_MAX_ROWS_REACHED,
1);
+
mseMetrics.addMeteredGlobalValue(MseMeter.WINDOW_TIMES_MAX_ROWS_REACHED, 1);
}
}
},
@@ -508,7 +508,7 @@ public abstract class MultiStageOperator implements
Operator<MseBlock>, AutoClos
*/
public abstract void mergeInto(BrokerResponseNativeV2 response, StatMap<?>
map);
- public void updateServerMetrics(StatMap<?> map, ServerMetrics
serverMetrics) {
+ public void updateMseMetrics(StatMap<?> map, MseMetrics mseMetrics) {
// Do nothing by default
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index de88051cfb5..8c018058389 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -39,8 +39,8 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
import org.apache.pinot.common.config.TlsConfig;
-import org.apache.pinot.common.metrics.ServerMeter;
-import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.MseMeter;
+import org.apache.pinot.common.metrics.MseMetrics;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.utils.NamedThreadFactory;
@@ -163,10 +163,10 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
"query_submission_executor_on_" + _port + "_port",
CommonConstants.Server.DEFAULT_MULTISTAGE_SUBMISSION_EXEC_TYPE);
- ServerMetrics serverMetrics = ServerMetrics.get();
+ MseMetrics mseMetrics = MseMetrics.get();
_submissionExecutorService = new MetricsExecutor(baseExecutorService,
-
serverMetrics.getMeteredValue(ServerMeter.MULTI_STAGE_SUBMISSION_STARTED_TASKS),
-
serverMetrics.getMeteredValue(ServerMeter.MULTI_STAGE_SUBMISSION_COMPLETED_TASKS));
+ mseMetrics.getMeteredValue(MseMeter.SUBMISSION_STARTED_TASKS),
+ mseMetrics.getMeteredValue(MseMeter.SUBMISSION_COMPLETED_TASKS));
NamedThreadFactory explainThreadFactory = new
NamedThreadFactory("query_explain_on_" + _port + "_port");
_explainExecutorService =
Executors.newSingleThreadExecutor(explainThreadFactory);
@@ -260,7 +260,7 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
@Override
public void submit(Worker.QueryRequest request,
StreamObserver<Worker.QueryResponse> responseObserver) {
// Match the SSE QUERIES counter semantics by counting requests as soon as they reach the handler.
- ServerMetrics.get().addMeteredGlobalValue(ServerMeter.MSE_QUERIES, 1L);
+ MseMetrics.get().addMeteredGlobalValue(MseMeter.QUERIES, 1L);
Map<String, String> reqMetadata;
try {
reqMetadata =
QueryPlanSerDeUtils.fromProtoProperties(request.getMetadata());
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 59627f3ef25..46682cc4a4f 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -62,6 +62,7 @@ import org.apache.pinot.common.Utils;
import org.apache.pinot.common.config.DefaultClusterConfigChangeHandler;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metrics.MseMetrics;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -674,6 +675,7 @@ public abstract class BaseServerStarter implements
ServiceStartable {
_serverMetrics.initializeGlobalMeters();
_serverMetrics.setValueOfGlobalGauge(ServerGauge.VERSION,
PinotVersion.VERSION_METRIC_NAME, 1);
ServerMetrics.register(_serverMetrics);
+ MseMetrics.registerFromConfig(_serverConf, metricsRegistry);
LOGGER.info("Initializing reload job status cache");
_reloadJobStatusCache = new ServerReloadJobStatusCache(_instanceId);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index ecd52695a2e..6b4628c1ac1 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -279,6 +279,14 @@ public class CommonConstants {
"pinot.beta.multistage.engine.max.server.query.threads.hardlimit.factor";
public static final String
DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR = "4";
+ /// Cluster-config knob that selects how the multi-stage engine emits metrics.
+ /// Read at startup by server and broker; mode changes require a restart to take effect.
+ /// Valid values: {@code SERVER} (default; forward to {@code pinot.server.*}), {@code MSE}
+ /// (emit only {@code pinot.mse.*}), {@code DUAL} (emit both). See
+ /// {@code org.apache.pinot.common.metrics.MseMetricsMode}.
+ public static final String CONFIG_OF_MSE_METRICS_MODE =
"pinot.metrics.mse.mode";
+ public static final String DEFAULT_MSE_METRICS_MODE = "SERVER";
+
// Preprocess throttle configs
public static final String CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM =
"pinot.server.max.segment.preprocess.parallelism";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]