This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b6b3b6c1f1a [multistage] Fix stream-mode stats handling for
plugin-defined operator types (follow-up to #18458) (#18736)
b6b3b6c1f1a is described below
commit b6b3b6c1f1ae5e1d53a5fd0026205825d14316c9
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Wed Jun 17 10:42:41 2026 +0200
[multistage] Fix stream-mode stats handling for plugin-defined operator
types (follow-up to #18458) (#18736)
---
.../MultiStageBrokerRequestHandler.java | 9 +-
.../query/runtime/InStageStatsTreeBuilder.java | 14 +-
.../query/runtime/MultiStageStatsTreeBuilder.java | 161 +++++++++++++++
.../runtime/operator/OperatorTypeRegistry.java | 28 ++-
.../runtime/plan/OpChainExecutionContext.java | 9 +
.../query/service/dispatch/QueryDispatcher.java | 24 ++-
.../runtime/MultiStageStatsTreeBuilderTest.java | 215 +++++++++++++++++++++
7 files changed, 447 insertions(+), 13 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 33651babb7f..ee5f62a487b 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -89,6 +89,7 @@ import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.runtime.MultiStageStatsTreeBuilder;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
+import org.apache.pinot.query.runtime.plan.StageStatsTreeNode;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.spi.accounting.ThreadAccountant;
import org.apache.pinot.spi.auth.TableAuthorizationResult;
@@ -815,7 +816,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
}
fillOldBrokerResponseStats(brokerResponse, queryResults.getQueryStats(),
dispatchableSubPlan,
- queryResults.getStageCoverage());
+ queryResults.getStageCoverage(), queryResults.getStageStatsTrees());
if (QueryOptionsUtils.isStreamStats(query.getOptions(),
_streamStatsDefault)) {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.MSE_STREAM_STATS_QUERIES, 1);
@@ -919,11 +920,13 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
private void fillOldBrokerResponseStats(BrokerResponseNativeV2
brokerResponse,
List<MultiStageQueryStats.StageStats.Closed> queryStats,
DispatchableSubPlan dispatchableSubPlan,
- @Nullable List<QueryDispatcher.QueryResult.StageCoverage> stageCoverage)
{
+ @Nullable List<QueryDispatcher.QueryResult.StageCoverage> stageCoverage,
+ @Nullable Map<Integer, StageStatsTreeNode> stageStatsTrees) {
try {
Map<Integer, DispatchablePlanFragment> queryStageMap =
dispatchableSubPlan.getQueryStageMap();
- MultiStageStatsTreeBuilder treeBuilder = new
MultiStageStatsTreeBuilder(queryStageMap, queryStats);
+ MultiStageStatsTreeBuilder treeBuilder = new
MultiStageStatsTreeBuilder(queryStageMap, queryStats,
+ stageStatsTrees);
brokerResponse.setStageStats(treeBuilder.jsonStatsByStage(0));
for (MultiStageQueryStats.StageStats.Closed stageStats : queryStats) {
if (stageStats != null) { // for example pipeline breaker may not have
stats
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java
index 3018ffefef4..cbe9a870722 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java
@@ -304,10 +304,16 @@ public class InStageStatsTreeBuilder implements
PlanNodeVisitor<ObjectNode, InSt
@Override
public ObjectNode visitMailboxSend(MailboxSendNode node, Context context) {
- @SuppressWarnings("unchecked")
- StatMap<MailboxSendOperator.StatKey> operatorStats =
- (StatMap<MailboxSendOperator.StatKey>)
_stageStats.getOperatorStats(_index);
- long parallelism =
operatorStats.getLong(MailboxSendOperator.StatKey.PARALLELISM);
+ long parallelism = 1;
+ // Plugin-defined send operators (id >= 256) carry a different StatMap key
class; reading the built-in
+ // PARALLELISM key from them returns 0, which would make every division by
parallelism below throw. Only read
+ // it when the stats really are the built-in MAILBOX_SEND ones.
+ if (_stageStats.getOperatorType(_index) ==
MultiStageOperator.Type.MAILBOX_SEND) {
+ @SuppressWarnings("unchecked")
+ StatMap<MailboxSendOperator.StatKey> operatorStats =
+ (StatMap<MailboxSendOperator.StatKey>)
_stageStats.getOperatorStats(_index);
+ parallelism = Math.max(1,
operatorStats.getLong(MailboxSendOperator.StatKey.PARALLELISM));
+ }
Context myContext = new Context((int) parallelism);
return recursiveCase(node, MultiStageOperator.Type.MAILBOX_SEND,
myContext);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/MultiStageStatsTreeBuilder.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/MultiStageStatsTreeBuilder.java
index 98016e91663..a19c81a7723 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/MultiStageStatsTreeBuilder.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/MultiStageStatsTreeBuilder.java
@@ -18,32 +18,65 @@
*/
package org.apache.pinot.query.runtime;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Maps;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
+import org.apache.pinot.query.runtime.plan.StageStatsTreeNode;
import org.apache.pinot.spi.utils.JsonUtils;
+/**
+ * Builds the {@code stageStats} JSON tree of the broker response.
+ *
+ * <p>Two rendering modes per stage:
+ * <ul>
+ * <li><b>Explicit tree</b> (stream-mode stats): when a {@link
StageStatsTreeNode} is available for the stage, the
+ * JSON is rendered directly from it. This is the only mode that can
faithfully render plugin-defined operator
+ * types (ids >= 256), whose shape cannot be re-derived by pairing the
flat stats list against the stage's
+ * {@link PlanNode} tree.</li>
+ * <li><b>Plan-node pairing</b> (legacy): the flat per-stage stats list is
paired positionally against the stage's
+ * PlanNode tree via {@link InStageStatsTreeBuilder}.</li>
+ * </ul>
+ */
public class MultiStageStatsTreeBuilder {
private final Map<Integer, PlanNode> _planNodes;
private final List<? extends MultiStageQueryStats.StageStats> _queryStats;
private final Map<Integer, DispatchablePlanFragment> _planFragments;
+ @Nullable
+ private final Map<Integer, StageStatsTreeNode> _stageStatsTrees;
public MultiStageStatsTreeBuilder(Map<Integer, DispatchablePlanFragment>
planFragments,
List<? extends MultiStageQueryStats.StageStats> queryStats) {
+ this(planFragments, queryStats, null);
+ }
+
+ public MultiStageStatsTreeBuilder(Map<Integer, DispatchablePlanFragment>
planFragments,
+ List<? extends MultiStageQueryStats.StageStats> queryStats,
+ @Nullable Map<Integer, StageStatsTreeNode> stageStatsTrees) {
_planFragments = planFragments;
_planNodes = Maps.newHashMapWithExpectedSize(planFragments.size());
for (Map.Entry<Integer, DispatchablePlanFragment> entry :
planFragments.entrySet()) {
_planNodes.put(entry.getKey(),
entry.getValue().getPlanFragment().getFragmentRoot());
}
_queryStats = queryStats;
+ _stageStatsTrees = stageStatsTrees;
}
public ObjectNode jsonStatsByStage(int stage) {
+ StageStatsTreeNode statsTree = _stageStatsTrees == null ? null :
_stageStatsTrees.get(stage);
+ if (statsTree != null) {
+ return jsonFromStatsTree(statsTree, stage);
+ }
PlanNode planNode = _planNodes.get(stage);
MultiStageQueryStats.StageStats stageStats = stage < _queryStats.size() ?
_queryStats.get(stage) : null;
@@ -63,4 +96,132 @@ public class MultiStageStatsTreeBuilder {
InStageStatsTreeBuilder treeBuilder = new
InStageStatsTreeBuilder(stageStats, this::jsonStatsByStage);
return planNode.visit(treeBuilder, new InStageStatsTreeBuilder.Context(1));
}
+
+  /**
+   * Renders one stage from its explicit stats tree.
+   *
+   * <p>The shape of the output and the operator types (built-in or plugin-defined) are taken straight from
+   * {@code statsTree}. The stage's PlanNode tree is only walked to build an id → PlanNode lookup, which the node
+   * renderer uses to spot {@link MailboxReceiveNode}s and nest the sender stage's tree under them, mirroring the
+   * cross-stage nesting of the legacy renderer.
+   *
+   * @param statsTree root of the decoded stats tree of the stage
+   * @param stage id of the stage being rendered
+   * @return the JSON rendering of the stage
+   */
+  private ObjectNode jsonFromStatsTree(StageStatsTreeNode statsTree, int stage) {
+    PlanNode fragmentRoot = _planNodes.get(stage);
+    Map<Integer, PlanNode> planNodesById = new HashMap<>();
+    if (fragmentRoot != null) {
+      assignPlanNodeIds(fragmentRoot, planNodesById, new int[]{0});
+    }
+    // The stage root is the send operator. When its stats report a "parallelism" entry (the built-in MAILBOX_SEND
+    // stats do) that value scales cpu time into wall-clock time for the whole stage. Plugin send types may not
+    // report it, and a reported value below 1 is not usable as a divisor, so both cases fall back to 1.
+    JsonNode reportedParallelism = statsTree.getStatMap().asJson().get("parallelism");
+    int parallelism = reportedParallelism == null ? 1 : Math.max(1, reportedParallelism.asInt(1));
+    return jsonFromStatsTreeNode(statsTree, planNodesById, parallelism);
+  }
+
+  /**
+   * Renders {@code node} and, recursively, its descendants as a JSON object.
+   *
+   * <p>Fields are written in this order: {@code type}, every entry of the node's stat map, {@code parallelism}
+   * (only when the stat map did not already provide one), {@code clockTimeMs}, {@code planNodeIds},
+   * {@code children} and finally the derived {@code self*} fields.
+   *
+   * @param node stats-tree node to render
+   * @param planNodesById plan nodes of the current stage keyed by their pre-order id
+   * @param parallelism divisor used for the clock-time fields; {@code jsonFromStatsTree} clamps it to at least 1
+   * @return the rendered JSON object
+   */
+  private ObjectNode jsonFromStatsTreeNode(StageStatsTreeNode node, Map<Integer, PlanNode> planNodesById,
+      int parallelism) {
+    ObjectNode json = JsonUtils.newObjectNode();
+    json.put("type", node.getType().name());
+    for (Map.Entry<String, JsonNode> stat : node.getStatMap().asJson().properties()) {
+      json.set(stat.getKey(), stat.getValue());
+    }
+    if (!json.has("parallelism")) {
+      json.put("parallelism", parallelism);
+    }
+    JsonNode executionTimeMs = json.get("executionTimeMs");
+    if (executionTimeMs != null) {
+      json.put("clockTimeMs", executionTimeMs.asLong(0) / parallelism);
+    }
+
+    if (!node.getPlanNodeIds().isEmpty()) {
+      ArrayNode planNodeIds = JsonUtils.newArrayNode();
+      for (Integer planNodeId : node.getPlanNodeIds()) {
+        planNodeIds.add(planNodeId);
+      }
+      json.set("planNodeIds", planNodeIds);
+    }
+
+    ArrayNode children = JsonUtils.newArrayNode();
+    for (StageStatsTreeNode child : node.getChildren()) {
+      boolean isPipelineBreaker = child.getType().getId() == MultiStageOperator.Type.PIPELINE_BREAKER.getId();
+      if (!isPipelineBreaker) {
+        children.add(jsonFromStatsTreeNode(child, planNodesById, parallelism));
+        continue;
+      }
+      // PIPELINE_BREAKER nodes are collapsed for parity with the legacy renderer, which nests the breaker's
+      // receive operators directly under the LEAF and never renders the breaker itself. The JSON shape is
+      // consumed by external tooling, so the explicit-tree mode must not change it.
+      for (StageStatsTreeNode grandChild : child.getChildren()) {
+        children.add(jsonFromStatsTreeNode(grandChild, planNodesById, parallelism));
+      }
+    }
+    // Cross-stage nesting: a node whose plan node is a mailbox receive gets the sender stage's tree as a child.
+    // The check is on the plan node, not on the operator type, because plugin-defined receive operators carry a
+    // different descriptor than the built-in MAILBOX_RECEIVE but still map to a MailboxReceiveNode.
+    for (Integer planNodeId : node.getPlanNodeIds()) {
+      PlanNode planNode = planNodesById.get(planNodeId);
+      if (planNode instanceof MailboxReceiveNode) {
+        int senderStageId = ((MailboxReceiveNode) planNode).getSenderStageId();
+        children.add(jsonStatsByStage(senderStageId));
+      }
+    }
+    if (!children.isEmpty()) {
+      json.set("children", children);
+    }
+
+    // self* fields: this node's value minus the sum over its children (see sumChildrenStat), matching the
+    // InStageStatsTreeBuilder semantics. A field is omitted when the difference is zero to keep parity with the
+    // legacy renderer. Nothing written above replaces "executionTimeMs", so the node read earlier is reused.
+    if (executionTimeMs != null) {
+      long selfExecutionTimeMs = executionTimeMs.asLong(0) - sumChildrenStat(node, "executionTimeMs");
+      if (selfExecutionTimeMs != 0) {
+        json.put("selfExecutionTimeMs", selfExecutionTimeMs);
+        json.put("selfClockTimeMs", selfExecutionTimeMs / parallelism);
+      }
+    }
+    JsonNode allocatedMemoryBytes = json.get("allocatedMemoryBytes");
+    if (allocatedMemoryBytes != null) {
+      long selfAllocatedBytes = allocatedMemoryBytes.asLong(0) - sumChildrenStat(node, "allocatedMemoryBytes");
+      // The zero check is on bytes: a non-zero difference below one megabyte is still emitted, as 0 MB.
+      if (selfAllocatedBytes != 0) {
+        json.put("selfAllocatedMB", selfAllocatedBytes / (1024 * 1024));
+      }
+    }
+    JsonNode gcTimeMs = json.get("gcTimeMs");
+    if (gcTimeMs != null) {
+      long selfGcTimeMs = gcTimeMs.asLong(0) - sumChildrenStat(node, "gcTimeMs");
+      if (selfGcTimeMs != 0) {
+        json.put("selfGcTimeMs", selfGcTimeMs);
+      }
+    }
+
+    return json;
+  }
+
+  /**
+   * Adds up the value of {@code statKey} over the direct children of {@code node}, ignoring PIPELINE_BREAKER
+   * children.
+   *
+   * <p>PIPELINE_BREAKER children are left out entirely, mirroring {@link InStageStatsTreeBuilder}'s
+   * {@code adjustWithChildren=false} path for LEAF nodes: the breaker runs pre-stage, so its cumulative time is not
+   * part of the parent's time budget and must not be subtracted. Children whose stat map has no entry for
+   * {@code statKey} contribute nothing.
+   *
+   * @param node parent whose direct children are inspected
+   * @param statKey JSON field name of the stat to add up
+   * @return the sum over the non-breaker children, or 0 when there is nothing to add
+   */
+  private static long sumChildrenStat(StageStatsTreeNode node, String statKey) {
+    int pipelineBreakerId = MultiStageOperator.Type.PIPELINE_BREAKER.getId();
+    long total = 0;
+    for (StageStatsTreeNode child : node.getChildren()) {
+      if (child.getType().getId() != pipelineBreakerId) {
+        JsonNode value = child.getStatMap().asJson().get(statKey);
+        if (value != null) {
+          total += value.asLong(0);
+        }
+      }
+    }
+    return total;
+  }
+
+  /**
+   * Registers {@code node} and all of its inputs in {@code planNodesById} using pre-order numbering: a node takes
+   * the next free id before any of its inputs do. This is meant to match the server-side walk in
+   * {@code PlanNodeToOpChain#assignPlanNodeIds}, so that the ids carried by the stats tree resolve to the same plan
+   * nodes here.
+   *
+   * @param node root of the (sub)tree to number
+   * @param planNodesById destination map, filled in place
+   * @param counter single-element array holding the next id to hand out; advanced in place
+   */
+  private static void assignPlanNodeIds(PlanNode node, Map<Integer, PlanNode> planNodesById, int[] counter) {
+    int id = counter[0];
+    counter[0] = id + 1;
+    planNodesById.put(id, node);
+    for (PlanNode input : node.getInputs()) {
+      assignPlanNodeIds(input, planNodesById, counter);
+    }
+  }
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorTypeRegistry.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorTypeRegistry.java
index 7296369e2aa..eb0405aa289 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorTypeRegistry.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorTypeRegistry.java
@@ -21,17 +21,23 @@ package org.apache.pinot.query.runtime.operator;
import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.ServiceLoader;
+import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.pinot.spi.plugin.PluginManager;
/**
* Registry of all known MSE {@link OperatorTypeDescriptor}s. Built-in types
({@link MultiStageOperator.Type} enum
* entries) are always present. Plugin-defined types are discovered at
class-loading time via {@link ServiceLoader}:
- * any jar on the classpath that ships a
+ * any jar that ships a
* {@code
META-INF/services/org.apache.pinot.query.runtime.operator.OperatorTypeDescriptor}
file will have its
- * descriptors automatically registered without configuration.
+ * descriptors automatically registered without configuration. Discovery
covers both the context classpath and the
+ * {@link PluginManager} plugin classloaders, so descriptors packaged in
isolated plugin jars are found too — like
+ * {@code PinotRuleSet} does for {@code RuleSetCustomizer}. The registry is
first used well after plugins are loaded
+ * (decoding/encoding stream-mode stats), so the plugin classloader snapshot
is complete by then.
*
* <p>Thread-safe: the registry map is built once in a static initializer and
never mutated afterward.
*/
@@ -44,7 +50,22 @@ public final class OperatorTypeRegistry {
for (MultiStageOperator.Type builtIn : MultiStageOperator.Type.values()) {
map.put(builtIn.getId(), builtIn);
}
- for (OperatorTypeDescriptor plugin :
ServiceLoader.load(OperatorTypeDescriptor.class)) {
+ // Dedup by class name: the context classloader and a plugin classloader
may both see the same
+ // META-INF/services file if their classpaths overlap (e.g. fat-jar +
plugin realm).
+ Set<String> seen = new HashSet<>();
+ registerPlugins(ServiceLoader.load(OperatorTypeDescriptor.class), map,
seen);
+ for (ClassLoader pluginClassLoader :
PluginManager.get().getPluginClassLoaders()) {
+ registerPlugins(ServiceLoader.load(OperatorTypeDescriptor.class,
pluginClassLoader), map, seen);
+ }
+ ID_TO_DESCRIPTOR = Collections.unmodifiableMap(map);
+ }
+
+ private static void registerPlugins(ServiceLoader<OperatorTypeDescriptor>
plugins,
+ Map<Integer, OperatorTypeDescriptor> map, Set<String> seen) {
+ for (OperatorTypeDescriptor plugin : plugins) {
+ if (!seen.add(plugin.getClass().getName())) {
+ continue;
+ }
// Enforce the documented id contract: ids below PLUGIN_ID_FLOOR are
reserved for built-ins (current and
// future). Without this check a plugin could squat on a reserved id and
work until a built-in claims it —
// and ids that fit in the legacy single-byte stat format would be
silently emitted there, defeating the
@@ -60,7 +81,6 @@ public final class OperatorTypeRegistry {
"Duplicate operator type id " + plugin.getId() + ": " +
prev.name() + " vs " + plugin.name());
}
}
- ID_TO_DESCRIPTOR = Collections.unmodifiableMap(map);
}
private OperatorTypeRegistry() {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 94f422eaba8..c8811ad7c73 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -287,6 +287,15 @@ public class OpChainExecutionContext {
return id != null ? id : -1;
}
+  /**
+   * Exposes the PlanNode→id assignments recorded on this context.
+   *
+   * <p>Ids are only assigned in stream-stats mode, so the result is empty otherwise. The map is keyed by PlanNode:
+   * plugins that need to go from an id back to its PlanNode (e.g. to register synthetic stats operators via
+   * {@link #recordPlanNodesForOperator}) have to scan or invert it.
+   *
+   * @return an unmodifiable view of the recorded assignments, or an empty map when none were recorded
+   */
+  public Map<PlanNode, Integer> getPlanNodeIdMap() {
+    if (_planNodeIds == null) {
+      return Map.of();
+    }
+    return Collections.unmodifiableMap(_planNodeIds);
+  }
+
private static QueryOperatorFactoryProvider
getDefaultQueryOperatorFactoryProvider() {
// Prefer server context when explicitly configured, otherwise fall back
to broker, then default.
Object serverProvider =
ServerContext.getInstance().getQueryOperatorFactoryProvider();
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index b832bf68c31..6157e7ec498 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -452,7 +452,7 @@ public class QueryDispatcher {
stageCoverage.add(expected == 0 ? null : new
QueryResult.StageCoverage(responded, mergeFailed, missing));
}
return new QueryResult(brokerResult.getResultTable(),
brokerResult.getProcessingException(), merged,
- brokerResult.getBrokerReduceTimeMs(), stageCoverage);
+ brokerResult.getBrokerReduceTimeMs(), stageCoverage, accumulator);
}
/// Tries to recover from an exception thrown during legacy (non-streaming)
query dispatching.
@@ -1004,6 +1004,14 @@ public class QueryDispatcher {
*/
@Nullable
private final List<StageCoverage> _stageCoverage;
+ /**
+ * Non-null only in stream-mode queries: the explicit per-stage stats
trees decoded from the
+ * {@code SubmitWithStream} reports, keyed by stage id. Carries the exact
tree shape (including plugin-defined
+ * operator types), which the flat {@link #_queryStats} list cannot
represent. Stage 0 (broker-local) and stages
+ * that never reported are absent.
+ */
+ @Nullable
+ private final Map<Integer, StageStatsTreeNode> _stageStatsTrees;
/**
* Creates a successful query result.
@@ -1021,6 +1029,7 @@ public class QueryDispatcher {
_brokerReduceTimeMs = brokerReduceTimeMs;
_processingException = null;
_stageCoverage = null;
+ _stageStatsTrees = null;
}
/**
@@ -1042,6 +1051,7 @@ public class QueryDispatcher {
_queryStats.add(queryStats.getUpstreamStageStats(i));
}
_stageCoverage = null;
+ _stageStatsTrees = null;
}
/**
@@ -1051,12 +1061,13 @@ public class QueryDispatcher {
*/
public QueryResult(@Nullable ResultTable resultTable, @Nullable
QueryProcessingException processingException,
List<MultiStageQueryStats.StageStats.Closed> queryStats, long
brokerReduceTimeMs,
- @Nullable List<StageCoverage> stageCoverage) {
+ @Nullable List<StageCoverage> stageCoverage, @Nullable Map<Integer,
StageStatsTreeNode> stageStatsTrees) {
_resultTable = resultTable;
_processingException = processingException;
_queryStats = queryStats;
_brokerReduceTimeMs = brokerReduceTimeMs;
_stageCoverage = stageCoverage;
+ _stageStatsTrees = stageStatsTrees;
}
@Nullable
@@ -1086,6 +1097,15 @@ public class QueryDispatcher {
return _stageCoverage;
}
+    /**
+     * Gives access to the explicit per-stage stats trees decoded from stream-mode reports.
+     *
+     * @return the trees keyed by stage id, without entries for stages that never reported, or {@code null} when
+     *     the query ran in legacy mode
+     */
+    @Nullable
+    public Map<Integer, StageStatsTreeNode> getStageStatsTrees() {
+      return _stageStatsTrees;
+    }
+ }
+
/**
* Per-stage stats coverage for a stream-mode query. Captures how many
opchain reports the broker received vs.
* expected, and how many it couldn't merge (version-skew or shape
mismatch).
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/MultiStageStatsTreeBuilderTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/MultiStageStatsTreeBuilderTest.java
new file mode 100644
index 00000000000..87e4e6523b3
--- /dev/null
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/MultiStageStatsTreeBuilderTest.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.PlanFragment;
+import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.runtime.operator.MultiStageOperator;
+import org.apache.pinot.query.runtime.operator.OperatorTypeDescriptor;
+import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
+import org.apache.pinot.query.runtime.plan.StageStatsTreeNode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests the explicit-tree {@code stageStats} rendering mode of {@link MultiStageStatsTreeBuilder}, in particular
+ * with plugin-defined operator types (ids >= {@link OperatorTypeDescriptor#PLUGIN_ID_FLOOR}) whose
+ * {@link StatMap.Key} enums are unknown to the built-in {@link MultiStageOperator.Type} world.
+ */
+public class MultiStageStatsTreeBuilderTest {
+
+  private static final DataSchema SCHEMA =
+      new DataSchema(new String[]{"col"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+
+  /** A plugin-only stat enum, intentionally unrelated to any built-in operator's StatKey. */
+  public enum TestPluginStat implements StatMap.Key {
+    EMITTED_ROWS(StatMap.Type.LONG),
+    CUSTOM_COUNTER(StatMap.Type.LONG);
+
+    private final StatMap.Type _type;
+
+    TestPluginStat(StatMap.Type type) {
+      _type = type;
+    }
+
+    @Override
+    public StatMap.Type getType() {
+      return _type;
+    }
+  }
+
+  /** Builds an anonymous plugin operator type with the given id and name, backed by {@link TestPluginStat}. */
+  private static OperatorTypeDescriptor pluginType(int id, String name) {
+    return new OperatorTypeDescriptor() {
+      @Override
+      public int getId() {
+        return id;
+      }
+
+      @Override
+      public String name() {
+        return name;
+      }
+
+      @Override
+      @SuppressWarnings("rawtypes")
+      public Class getStatKeyClass() {
+        return TestPluginStat.class;
+      }
+
+      @Override
+      public void mergeInto(BrokerResponseNativeV2 response, StatMap<?> map) {
+        // Intentionally empty: merging into the broker response is not exercised by these tests.
+      }
+    };
+  }
+
+  /** Creates a receive node that belongs to {@code stageId} and reads from {@code senderStageId}. */
+  private static MailboxReceiveNode receiveNode(int stageId, int senderStageId) {
+    return new MailboxReceiveNode(stageId, SCHEMA, senderStageId, PinotRelExchangeType.STREAMING,
+        RelDistribution.Type.HASH_DISTRIBUTED, null, null, false, false, null);
+  }
+
+  /** Creates a send node that belongs to {@code stageId}, wraps {@code inputs} and targets {@code receiverStageId}. */
+  private static MailboxSendNode sendNode(int stageId, List<PlanNode> inputs, int receiverStageId) {
+    return new MailboxSendNode(stageId, SCHEMA, inputs, receiverStageId, PinotRelExchangeType.STREAMING,
+        RelDistribution.Type.HASH_DISTRIBUTED, null, false, null, false, "murmur");
+  }
+
+  /** Wraps each stage root into a {@link DispatchablePlanFragment}, keyed by stage id. */
+  private static Map<Integer, DispatchablePlanFragment> fragments(Map<Integer, PlanNode> rootsByStage) {
+    Map<Integer, DispatchablePlanFragment> result = new HashMap<>();
+    rootsByStage.forEach(
+        (stage, root) -> result.put(stage, new DispatchablePlanFragment(new PlanFragment(stage, root, List.of()))));
+    return result;
+  }
+
+  /** Creates a {@link TestPluginStat} stat map holding the two given values. */
+  private static StatMap<TestPluginStat> pluginStats(long emittedRows, long customCounter) {
+    StatMap<TestPluginStat> stats = new StatMap<>(TestPluginStat.class);
+    stats.merge(TestPluginStat.EMITTED_ROWS, emittedRows);
+    stats.merge(TestPluginStat.CUSTOM_COUNTER, customCounter);
+    return stats;
+  }
+
+  /**
+   * A stage tree made exclusively of plugin types must render fully (no node dropped, custom stat fields present)
+   * and must nest the sender stage's tree under the node whose plan-node id resolves to a
+   * {@link MailboxReceiveNode} — even though the receive node carries a plugin type, not the built-in
+   * MAILBOX_RECEIVE.
+   */
+  @Test
+  public void testPluginTypedTreeRendersWithCrossStageNesting() {
+    OperatorTypeDescriptor sendType = pluginType(300, "TEST_PLUGIN_SEND");
+    OperatorTypeDescriptor receiveType = pluginType(301, "TEST_PLUGIN_RECEIVE");
+
+    // Stage 1: send(0) -> receive(1) [pre-order ids]; stage 2: send(0) -> value(1).
+    MailboxReceiveNode stage1Receive = receiveNode(1, 2);
+    PlanNode stage1Root = sendNode(1, List.of(stage1Receive), 0);
+    PlanNode stage2Root = sendNode(2,
+        List.of(new ValueNode(2, SCHEMA, PlanNode.NodeHint.EMPTY, List.of(), List.of())), 1);
+
+    StageStatsTreeNode stage1Tree = new StageStatsTreeNode(sendType, List.of(0), pluginStats(10, 7), List.of(
+        new StageStatsTreeNode(receiveType, List.of(1), pluginStats(10, 3), List.of())));
+    StageStatsTreeNode stage2Tree = new StageStatsTreeNode(sendType, List.of(0), pluginStats(5, 1), List.of());
+
+    MultiStageStatsTreeBuilder builder = new MultiStageStatsTreeBuilder(
+        fragments(Map.of(1, stage1Root, 2, stage2Root)), List.of(), Map.of(1, stage1Tree, 2, stage2Tree));
+    ObjectNode json = builder.jsonStatsByStage(1);
+
+    Assert.assertEquals(json.get("type").asText(), "TEST_PLUGIN_SEND");
+    Assert.assertEquals(json.get("customCounter").asLong(), 7);
+    Assert.assertEquals(json.get("emittedRows").asLong(), 10);
+    Assert.assertEquals(json.get("planNodeIds").get(0).asInt(), 0);
+
+    ObjectNode receive = (ObjectNode) json.get("children").get(0);
+    Assert.assertEquals(receive.get("type").asText(), "TEST_PLUGIN_RECEIVE");
+    Assert.assertEquals(receive.get("customCounter").asLong(), 3);
+
+    ObjectNode nestedStage2 = (ObjectNode) receive.get("children").get(0);
+    Assert.assertEquals(nestedStage2.get("type").asText(), "TEST_PLUGIN_SEND");
+    Assert.assertEquals(nestedStage2.get("emittedRows").asLong(), 5);
+  }
+
+  /**
+   * PIPELINE_BREAKER nodes must be collapsed (children hoisted into the parent) so the JSON shape matches the
+   * legacy renderer, which nests the breaker's receive operators directly under the LEAF.
+   */
+  @Test
+  public void testPipelineBreakerCollapsed() {
+    MultiStageOperator.Type leaf = MultiStageOperator.Type.LEAF;
+    MultiStageOperator.Type breaker = MultiStageOperator.Type.PIPELINE_BREAKER;
+    MultiStageOperator.Type receive = MultiStageOperator.Type.MAILBOX_RECEIVE;
+
+    StageStatsTreeNode tree = new StageStatsTreeNode(leaf, List.of(), statsOf(leaf), List.of(
+        new StageStatsTreeNode(breaker, List.of(), statsOf(breaker), List.of(
+            new StageStatsTreeNode(receive, List.of(), statsOf(receive), List.of())))));
+
+    MultiStageStatsTreeBuilder builder = new MultiStageStatsTreeBuilder(
+        fragments(Map.of(1, sendNode(1, List.of(), 0))), List.of(), Map.of(1, tree));
+    ObjectNode json = builder.jsonStatsByStage(1);
+
+    Assert.assertEquals(json.get("type").asText(), "LEAF");
+    Assert.assertEquals(json.get("children").size(), 1, "PIPELINE_BREAKER must be collapsed, not rendered");
+    Assert.assertEquals(json.get("children").get(0).get("type").asText(), "MAILBOX_RECEIVE");
+  }
+
+  /**
+   * Legacy-path regression: a flat stats list with a plugin type at the send position must not throw
+   * {@code ArithmeticException} (the built-in PARALLELISM key cannot be read from a foreign StatMap, which used to
+   * yield a zero divisor).
+   */
+  @Test
+  public void testLegacyPathWithPluginSendTypeDoesNotThrow() {
+    OperatorTypeDescriptor sendType = pluginType(300, "TEST_PLUGIN_SEND");
+    OperatorTypeDescriptor receiveType = pluginType(301, "TEST_PLUGIN_RECEIVE");
+
+    MailboxReceiveNode stage1Receive = receiveNode(1, 2);
+    PlanNode stage1Root = sendNode(1, List.of(stage1Receive), 0);
+    PlanNode stage2Root = sendNode(2,
+        List.of(new ValueNode(2, SCHEMA, PlanNode.NodeHint.EMPTY, List.of(), List.of())), 1);
+    // Inorder flat list: child (receive) first, then the send root.
+    MultiStageQueryStats.StageStats.Closed flatStats = new MultiStageQueryStats.StageStats.Closed(
+        List.of(receiveType, sendType), List.of(pluginStats(10, 3), pluginStats(10, 7)));
+
+    // A mutable list is required here because List.of(...) rejects the null placeholders.
+    List<MultiStageQueryStats.StageStats.Closed> queryStats = new ArrayList<>();
+    queryStats.add(null); // stage 0
+    queryStats.add(flatStats); // stage 1
+    queryStats.add(null); // stage 2: no stats — renders the EMPTY_MAILBOX_SEND placeholder
+    MultiStageStatsTreeBuilder builder =
+        new MultiStageStatsTreeBuilder(fragments(Map.of(1, stage1Root, 2, stage2Root)), queryStats);
+
+    // The legacy renderer cannot pair plugin types against the PlanNode tree, so the nodes are not rendered —
+    // but it must not throw either.
+    ObjectNode json = builder.jsonStatsByStage(1);
+    Assert.assertNotNull(json);
+  }
+
+  /** Creates an empty stat map typed with the stat-key class of the given built-in operator type. */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private static StatMap<?> statsOf(MultiStageOperator.Type type) {
+    return new StatMap(type.getStatKeyClass());
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]