This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b6b3b6c1f1a [multistage] Fix stream-mode stats handling for plugin-defined operator types (follow-up to #18458) (#18736)
b6b3b6c1f1a is described below

commit b6b3b6c1f1ae5e1d53a5fd0026205825d14316c9
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Wed Jun 17 10:42:41 2026 +0200

    [multistage] Fix stream-mode stats handling for plugin-defined operator types (follow-up to #18458) (#18736)
---
 .../MultiStageBrokerRequestHandler.java            |   9 +-
 .../query/runtime/InStageStatsTreeBuilder.java     |  14 +-
 .../query/runtime/MultiStageStatsTreeBuilder.java  | 161 +++++++++++++++
 .../runtime/operator/OperatorTypeRegistry.java     |  28 ++-
 .../runtime/plan/OpChainExecutionContext.java      |   9 +
 .../query/service/dispatch/QueryDispatcher.java    |  24 ++-
 .../runtime/MultiStageStatsTreeBuilderTest.java    | 215 +++++++++++++++++++++
 7 files changed, 447 insertions(+), 13 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 33651babb7f..ee5f62a487b 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -89,6 +89,7 @@ import org.apache.pinot.query.routing.QueryServerInstance;
 import org.apache.pinot.query.routing.WorkerManager;
 import org.apache.pinot.query.runtime.MultiStageStatsTreeBuilder;
 import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
+import org.apache.pinot.query.runtime.plan.StageStatsTreeNode;
 import org.apache.pinot.query.service.dispatch.QueryDispatcher;
 import org.apache.pinot.spi.accounting.ThreadAccountant;
 import org.apache.pinot.spi.auth.TableAuthorizationResult;
@@ -815,7 +816,7 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
       }
 
       fillOldBrokerResponseStats(brokerResponse, queryResults.getQueryStats(), 
dispatchableSubPlan,
-          queryResults.getStageCoverage());
+          queryResults.getStageCoverage(), queryResults.getStageStatsTrees());
 
       if (QueryOptionsUtils.isStreamStats(query.getOptions(), 
_streamStatsDefault)) {
         
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.MSE_STREAM_STATS_QUERIES, 1);
@@ -919,11 +920,13 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
 
   private void fillOldBrokerResponseStats(BrokerResponseNativeV2 
brokerResponse,
       List<MultiStageQueryStats.StageStats.Closed> queryStats, 
DispatchableSubPlan dispatchableSubPlan,
-      @Nullable List<QueryDispatcher.QueryResult.StageCoverage> stageCoverage) 
{
+      @Nullable List<QueryDispatcher.QueryResult.StageCoverage> stageCoverage,
+      @Nullable Map<Integer, StageStatsTreeNode> stageStatsTrees) {
     try {
       Map<Integer, DispatchablePlanFragment> queryStageMap = 
dispatchableSubPlan.getQueryStageMap();
 
-      MultiStageStatsTreeBuilder treeBuilder = new 
MultiStageStatsTreeBuilder(queryStageMap, queryStats);
+      MultiStageStatsTreeBuilder treeBuilder = new 
MultiStageStatsTreeBuilder(queryStageMap, queryStats,
+          stageStatsTrees);
       brokerResponse.setStageStats(treeBuilder.jsonStatsByStage(0));
       for (MultiStageQueryStats.StageStats.Closed stageStats : queryStats) {
         if (stageStats != null) { // for example pipeline breaker may not have 
stats
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java
index 3018ffefef4..cbe9a870722 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java
@@ -304,10 +304,16 @@ public class InStageStatsTreeBuilder implements 
PlanNodeVisitor<ObjectNode, InSt
 
   @Override
   public ObjectNode visitMailboxSend(MailboxSendNode node, Context context) {
-    @SuppressWarnings("unchecked")
-    StatMap<MailboxSendOperator.StatKey> operatorStats =
-        (StatMap<MailboxSendOperator.StatKey>) 
_stageStats.getOperatorStats(_index);
-    long parallelism = 
operatorStats.getLong(MailboxSendOperator.StatKey.PARALLELISM);
+    long parallelism = 1;
+    // Plugin-defined send operators (id >= 256) carry a different StatMap key 
class; reading the built-in
+    // PARALLELISM key from them returns 0, which would make every division by 
parallelism below throw. Only read
+    // it when the stats really are the built-in MAILBOX_SEND ones.
+    if (_stageStats.getOperatorType(_index) == 
MultiStageOperator.Type.MAILBOX_SEND) {
+      @SuppressWarnings("unchecked")
+      StatMap<MailboxSendOperator.StatKey> operatorStats =
+          (StatMap<MailboxSendOperator.StatKey>) 
_stageStats.getOperatorStats(_index);
+      parallelism = Math.max(1, 
operatorStats.getLong(MailboxSendOperator.StatKey.PARALLELISM));
+    }
     Context myContext = new Context((int) parallelism);
     return recursiveCase(node, MultiStageOperator.Type.MAILBOX_SEND, 
myContext);
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/MultiStageStatsTreeBuilder.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/MultiStageStatsTreeBuilder.java
index 98016e91663..a19c81a7723 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/MultiStageStatsTreeBuilder.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/MultiStageStatsTreeBuilder.java
@@ -18,32 +18,65 @@
  */
 package org.apache.pinot.query.runtime;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Maps;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
 import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
+import org.apache.pinot.query.runtime.plan.StageStatsTreeNode;
 import org.apache.pinot.spi.utils.JsonUtils;
 
 
+/**
+ * Builds the {@code stageStats} JSON tree of the broker response.
+ *
+ * <p>Two rendering modes per stage:
+ * <ul>
+ *   <li><b>Explicit tree</b> (stream-mode stats): when a {@link 
StageStatsTreeNode} is available for the stage, the
+ *   JSON is rendered directly from it. This is the only mode that can 
faithfully render plugin-defined operator
+ *   types (ids &gt;= 256), whose shape cannot be re-derived by pairing the 
flat stats list against the stage's
+ *   {@link PlanNode} tree.</li>
+ *   <li><b>Plan-node pairing</b> (legacy): the flat per-stage stats list is 
paired positionally against the stage's
+ *   PlanNode tree via {@link InStageStatsTreeBuilder}.</li>
+ * </ul>
+ */
 public class MultiStageStatsTreeBuilder {
   private final Map<Integer, PlanNode> _planNodes;
   private final List<? extends MultiStageQueryStats.StageStats> _queryStats;
   private final Map<Integer, DispatchablePlanFragment> _planFragments;
+  @Nullable
+  private final Map<Integer, StageStatsTreeNode> _stageStatsTrees;
 
   public MultiStageStatsTreeBuilder(Map<Integer, DispatchablePlanFragment> 
planFragments,
       List<? extends MultiStageQueryStats.StageStats> queryStats) {
+    this(planFragments, queryStats, null);
+  }
+
+  public MultiStageStatsTreeBuilder(Map<Integer, DispatchablePlanFragment> 
planFragments,
+      List<? extends MultiStageQueryStats.StageStats> queryStats,
+      @Nullable Map<Integer, StageStatsTreeNode> stageStatsTrees) {
     _planFragments = planFragments;
     _planNodes = Maps.newHashMapWithExpectedSize(planFragments.size());
     for (Map.Entry<Integer, DispatchablePlanFragment> entry : 
planFragments.entrySet()) {
       _planNodes.put(entry.getKey(), 
entry.getValue().getPlanFragment().getFragmentRoot());
     }
     _queryStats = queryStats;
+    _stageStatsTrees = stageStatsTrees;
   }
 
   public ObjectNode jsonStatsByStage(int stage) {
+    StageStatsTreeNode statsTree = _stageStatsTrees == null ? null : 
_stageStatsTrees.get(stage);
+    if (statsTree != null) {
+      return jsonFromStatsTree(statsTree, stage);
+    }
     PlanNode planNode = _planNodes.get(stage);
 
     MultiStageQueryStats.StageStats stageStats = stage < _queryStats.size() ? 
_queryStats.get(stage) : null;
@@ -63,4 +96,132 @@ public class MultiStageStatsTreeBuilder {
     InStageStatsTreeBuilder treeBuilder = new 
InStageStatsTreeBuilder(stageStats, this::jsonStatsByStage);
     return planNode.visit(treeBuilder, new InStageStatsTreeBuilder.Context(1));
   }
+
+  /**
+   * Renders a stage's explicit stats tree. The shape (and any plugin operator 
types) come straight from the decoded
+   * tree; the stage's PlanNode tree is consulted only to resolve which plan 
nodes are {@link MailboxReceiveNode}s so
+   * the sender stage's tree can be nested under them, mirroring the legacy 
renderer's cross-stage nesting.
+   */
+  private ObjectNode jsonFromStatsTree(StageStatsTreeNode statsTree, int 
stage) {
+    Map<Integer, PlanNode> planNodesById = new HashMap<>();
+    PlanNode fragmentRoot = _planNodes.get(stage);
+    if (fragmentRoot != null) {
+      assignPlanNodeIds(fragmentRoot, planNodesById, new int[]{0});
+    }
+    // The stage root is the send operator: its parallelism (when reported, 
e.g. by the built-in MAILBOX_SEND stats)
+    // scales cpu-time into wall-clock time for the whole stage. Plugin send 
types may not report it; default to 1.
+    int parallelism = 1;
+    JsonNode rootParallelism = 
statsTree.getStatMap().asJson().get("parallelism");
+    if (rootParallelism != null) {
+      parallelism = Math.max(1, rootParallelism.asInt(1));
+    }
+    return jsonFromStatsTreeNode(statsTree, planNodesById, parallelism);
+  }
+
+  private ObjectNode jsonFromStatsTreeNode(StageStatsTreeNode node, 
Map<Integer, PlanNode> planNodesById,
+      int parallelism) {
+    ObjectNode json = JsonUtils.newObjectNode();
+    json.put("type", node.getType().name());
+    for (Map.Entry<String, JsonNode> entry : 
node.getStatMap().asJson().properties()) {
+      json.set(entry.getKey(), entry.getValue());
+    }
+    if (json.get("parallelism") == null) {
+      json.put("parallelism", parallelism);
+    }
+    JsonNode executionTimeMs = json.get("executionTimeMs");
+    if (executionTimeMs != null) {
+      json.put("clockTimeMs", executionTimeMs.asLong(0) / parallelism);
+    }
+
+    if (!node.getPlanNodeIds().isEmpty()) {
+      ArrayNode planNodeIds = JsonUtils.newArrayNode();
+      node.getPlanNodeIds().forEach(planNodeIds::add);
+      json.set("planNodeIds", planNodeIds);
+    }
+
+    ArrayNode children = JsonUtils.newArrayNode();
+    for (StageStatsTreeNode child : node.getChildren()) {
+      // Collapse PIPELINE_BREAKER nodes for parity with the legacy renderer, 
which nests the
+      // breaker's receive operators directly under the LEAF and never renders 
the breaker itself.
+      // The JSON shape is consumed by external tooling, so the explicit-tree 
mode must not change it.
+      if (child.getType().getId() == 
MultiStageOperator.Type.PIPELINE_BREAKER.getId()) {
+        for (StageStatsTreeNode grandChild : child.getChildren()) {
+          children.add(jsonFromStatsTreeNode(grandChild, planNodesById, 
parallelism));
+        }
+      } else {
+        children.add(jsonFromStatsTreeNode(child, planNodesById, parallelism));
+      }
+    }
+    // Cross-stage nesting: a node whose plan node is a mailbox receive gets 
the sender stage's tree as a child.
+    // Type-agnostic on purpose — plugin-defined receive operators carry a 
different descriptor than the built-in
+    // MAILBOX_RECEIVE but map to the same MailboxReceiveNode.
+    for (Integer planNodeId : node.getPlanNodeIds()) {
+      PlanNode planNode = planNodesById.get(planNodeId);
+      if (planNode instanceof MailboxReceiveNode) {
+        children.add(jsonStatsByStage(((MailboxReceiveNode) 
planNode).getSenderStageId()));
+      }
+    }
+    if (!children.isEmpty()) {
+      json.set("children", children);
+    }
+
+    // self* fields: parent-minus-children derived stats, matching 
InStageStatsTreeBuilder semantics.
+    // These must be omitted when zero to keep parity with the legacy renderer.
+    JsonNode execNode = json.get("executionTimeMs");
+    if (execNode != null) {
+      long selfExecTimeMs = execNode.asLong(0) - sumChildrenStat(node, 
"executionTimeMs");
+      if (selfExecTimeMs != 0) {
+        json.put("selfExecutionTimeMs", selfExecTimeMs);
+        json.put("selfClockTimeMs", selfExecTimeMs / parallelism);
+      }
+    }
+    JsonNode allocNode = json.get("allocatedMemoryBytes");
+    if (allocNode != null) {
+      long selfAllocBytes = allocNode.asLong(0) - sumChildrenStat(node, 
"allocatedMemoryBytes");
+      if (selfAllocBytes != 0) {
+        json.put("selfAllocatedMB", selfAllocBytes / (1024 * 1024));
+      }
+    }
+    JsonNode gcNode = json.get("gcTimeMs");
+    if (gcNode != null) {
+      long selfGcTimeMs = gcNode.asLong(0) - sumChildrenStat(node, "gcTimeMs");
+      if (selfGcTimeMs != 0) {
+        json.put("selfGcTimeMs", selfGcTimeMs);
+      }
+    }
+
+    return json;
+  }
+
+  /**
+   * Sums a named stat field across the direct non-pipeline-breaker children 
of a node.
+   *
+   * <p>PIPELINE_BREAKER children are skipped entirely, mirroring {@link 
InStageStatsTreeBuilder}'s
+   * {@code adjustWithChildren=false} path for LEAF nodes: the breaker runs 
pre-stage, so its
+   * cumulative time is not part of the parent's time budget and must not be 
subtracted.
+   */
+  private static long sumChildrenStat(StageStatsTreeNode node, String statKey) 
{
+    long sum = 0;
+    for (StageStatsTreeNode child : node.getChildren()) {
+      if (child.getType().getId() == 
MultiStageOperator.Type.PIPELINE_BREAKER.getId()) {
+        continue;
+      }
+      JsonNode val = child.getStatMap().asJson().get(statKey);
+      if (val != null) {
+        sum += val.asLong(0);
+      }
+    }
+    return sum;
+  }
+
+  /**
+   * Pre-order id assignment identical to the server-side walk in {@code 
PlanNodeToOpChain#assignPlanNodeIds}, so the
+   * ids carried by the stats tree resolve to the same plan nodes here.
+   */
+  private static void assignPlanNodeIds(PlanNode node, Map<Integer, PlanNode> 
planNodesById, int[] counter) {
+    planNodesById.put(counter[0]++, node);
+    for (PlanNode child : node.getInputs()) {
+      assignPlanNodeIds(child, planNodesById, counter);
+    }
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorTypeRegistry.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorTypeRegistry.java
index 7296369e2aa..eb0405aa289 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorTypeRegistry.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorTypeRegistry.java
@@ -21,17 +21,23 @@ package org.apache.pinot.query.runtime.operator;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.ServiceLoader;
+import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.pinot.spi.plugin.PluginManager;
 
 
 /**
  * Registry of all known MSE {@link OperatorTypeDescriptor}s. Built-in types 
({@link MultiStageOperator.Type} enum
  * entries) are always present. Plugin-defined types are discovered at 
class-loading time via {@link ServiceLoader}:
- * any jar on the classpath that ships a
+ * any jar that ships a
  * {@code 
META-INF/services/org.apache.pinot.query.runtime.operator.OperatorTypeDescriptor}
 file will have its
- * descriptors automatically registered without configuration.
+ * descriptors automatically registered without configuration. Discovery 
covers both the context classpath and the
+ * {@link PluginManager} plugin classloaders, so descriptors packaged in 
isolated plugin jars are found too — like
+ * {@code PinotRuleSet} does for {@code RuleSetCustomizer}. The registry is 
first used well after plugins are loaded
+ * (decoding/encoding stream-mode stats), so the plugin classloader snapshot 
is complete by then.
  *
  * <p>Thread-safe: the registry map is built once in a static initializer and 
never mutated afterward.
  */
@@ -44,7 +50,22 @@ public final class OperatorTypeRegistry {
     for (MultiStageOperator.Type builtIn : MultiStageOperator.Type.values()) {
       map.put(builtIn.getId(), builtIn);
     }
-    for (OperatorTypeDescriptor plugin : 
ServiceLoader.load(OperatorTypeDescriptor.class)) {
+    // Dedup by class name: the context classloader and a plugin classloader 
may both see the same
+    // META-INF/services file if their classpaths overlap (e.g. fat-jar + 
plugin realm).
+    Set<String> seen = new HashSet<>();
+    registerPlugins(ServiceLoader.load(OperatorTypeDescriptor.class), map, 
seen);
+    for (ClassLoader pluginClassLoader : 
PluginManager.get().getPluginClassLoaders()) {
+      registerPlugins(ServiceLoader.load(OperatorTypeDescriptor.class, 
pluginClassLoader), map, seen);
+    }
+    ID_TO_DESCRIPTOR = Collections.unmodifiableMap(map);
+  }
+
+  private static void registerPlugins(ServiceLoader<OperatorTypeDescriptor> 
plugins,
+      Map<Integer, OperatorTypeDescriptor> map, Set<String> seen) {
+    for (OperatorTypeDescriptor plugin : plugins) {
+      if (!seen.add(plugin.getClass().getName())) {
+        continue;
+      }
       // Enforce the documented id contract: ids below PLUGIN_ID_FLOOR are 
reserved for built-ins (current and
       // future). Without this check a plugin could squat on a reserved id and 
work until a built-in claims it —
       // and ids that fit in the legacy single-byte stat format would be 
silently emitted there, defeating the
@@ -60,7 +81,6 @@ public final class OperatorTypeRegistry {
             "Duplicate operator type id " + plugin.getId() + ": " + 
prev.name() + " vs " + plugin.name());
       }
     }
-    ID_TO_DESCRIPTOR = Collections.unmodifiableMap(map);
   }
 
   private OperatorTypeRegistry() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 94f422eaba8..c8811ad7c73 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -287,6 +287,15 @@ public class OpChainExecutionContext {
     return id != null ? id : -1;
   }
 
+  /**
+   * Returns an unmodifiable view of the recorded PlanNode→id assignments, or 
an empty map when none were recorded
+   * (ids are only assigned in stream-stats mode). Useful for plugins that 
need to resolve PlanNodes by id, e.g. to
+   * register synthetic stats operators via {@link 
#recordPlanNodesForOperator}.
+   */
+  public Map<PlanNode, Integer> getPlanNodeIdMap() {
+    return _planNodeIds == null ? Map.of() : 
Collections.unmodifiableMap(_planNodeIds);
+  }
+
   private static QueryOperatorFactoryProvider 
getDefaultQueryOperatorFactoryProvider() {
     // Prefer server context when explicitly configured, otherwise fall back 
to broker, then default.
     Object serverProvider = 
ServerContext.getInstance().getQueryOperatorFactoryProvider();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index b832bf68c31..6157e7ec498 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -452,7 +452,7 @@ public class QueryDispatcher {
       stageCoverage.add(expected == 0 ? null : new 
QueryResult.StageCoverage(responded, mergeFailed, missing));
     }
     return new QueryResult(brokerResult.getResultTable(), 
brokerResult.getProcessingException(), merged,
-        brokerResult.getBrokerReduceTimeMs(), stageCoverage);
+        brokerResult.getBrokerReduceTimeMs(), stageCoverage, accumulator);
   }
 
   /// Tries to recover from an exception thrown during legacy (non-streaming) 
query dispatching.
@@ -1004,6 +1004,14 @@ public class QueryDispatcher {
      */
     @Nullable
     private final List<StageCoverage> _stageCoverage;
+    /**
+     * Non-null only in stream-mode queries: the explicit per-stage stats 
trees decoded from the
+     * {@code SubmitWithStream} reports, keyed by stage id. Carries the exact 
tree shape (including plugin-defined
+     * operator types), which the flat {@link #_queryStats} list cannot 
represent. Stage 0 (broker-local) and stages
+     * that never reported are absent.
+     */
+    @Nullable
+    private final Map<Integer, StageStatsTreeNode> _stageStatsTrees;
 
     /**
      * Creates a successful query result.
@@ -1021,6 +1029,7 @@ public class QueryDispatcher {
       _brokerReduceTimeMs = brokerReduceTimeMs;
       _processingException = null;
       _stageCoverage = null;
+      _stageStatsTrees = null;
     }
 
     /**
@@ -1042,6 +1051,7 @@ public class QueryDispatcher {
         _queryStats.add(queryStats.getUpstreamStageStats(i));
       }
       _stageCoverage = null;
+      _stageStatsTrees = null;
     }
 
     /**
@@ -1051,12 +1061,13 @@ public class QueryDispatcher {
      */
     public QueryResult(@Nullable ResultTable resultTable, @Nullable 
QueryProcessingException processingException,
         List<MultiStageQueryStats.StageStats.Closed> queryStats, long 
brokerReduceTimeMs,
-        @Nullable List<StageCoverage> stageCoverage) {
+        @Nullable List<StageCoverage> stageCoverage, @Nullable Map<Integer, 
StageStatsTreeNode> stageStatsTrees) {
       _resultTable = resultTable;
       _processingException = processingException;
       _queryStats = queryStats;
       _brokerReduceTimeMs = brokerReduceTimeMs;
       _stageCoverage = stageCoverage;
+      _stageStatsTrees = stageStatsTrees;
     }
 
     @Nullable
@@ -1086,6 +1097,15 @@ public class QueryDispatcher {
       return _stageCoverage;
     }
 
+    /**
+     * Returns the explicit per-stage stats trees decoded from stream-mode 
reports, keyed by stage id, or {@code null}
+     * when the query ran in legacy mode. Stages that never reported are 
absent from the map.
+     */
+    @Nullable
+    public Map<Integer, StageStatsTreeNode> getStageStatsTrees() {
+      return _stageStatsTrees;
+    }
+
     /**
      * Per-stage stats coverage for a stream-mode query. Captures how many 
opchain reports the broker received vs.
      * expected, and how many it couldn't merge (version-skew or shape 
mismatch).
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/MultiStageStatsTreeBuilderTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/MultiStageStatsTreeBuilderTest.java
new file mode 100644
index 00000000000..87e4e6523b3
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/MultiStageStatsTreeBuilderTest.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.PlanFragment;
+import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.runtime.operator.MultiStageOperator;
+import org.apache.pinot.query.runtime.operator.OperatorTypeDescriptor;
+import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
+import org.apache.pinot.query.runtime.plan.StageStatsTreeNode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests the explicit-tree {@code stageStats} rendering mode of {@link 
MultiStageStatsTreeBuilder}, in particular
+ * with plugin-defined operator types (ids >= {@link 
OperatorTypeDescriptor#PLUGIN_ID_FLOOR}) whose
+ * {@link StatMap.Key} enums are unknown to the built-in {@link 
MultiStageOperator.Type} world.
+ */
+public class MultiStageStatsTreeBuilderTest {
+
+  private static final DataSchema SCHEMA =
+      new DataSchema(new String[]{"col"}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+
+  /** A plugin-only stat enum, intentionally unrelated to any built-in 
operator's StatKey. */
+  public enum TestPluginStat implements StatMap.Key {
+    EMITTED_ROWS(StatMap.Type.LONG),
+    CUSTOM_COUNTER(StatMap.Type.LONG);
+
+    private final StatMap.Type _type;
+
+    TestPluginStat(StatMap.Type type) {
+      _type = type;
+    }
+
+    @Override
+    public StatMap.Type getType() {
+      return _type;
+    }
+  }
+
+  private static OperatorTypeDescriptor pluginType(int id, String name) {
+    return new OperatorTypeDescriptor() {
+      @Override
+      public int getId() {
+        return id;
+      }
+
+      @Override
+      public String name() {
+        return name;
+      }
+
+      @Override
+      @SuppressWarnings("rawtypes")
+      public Class getStatKeyClass() {
+        return TestPluginStat.class;
+      }
+
+      @Override
+      public void mergeInto(BrokerResponseNativeV2 response, StatMap<?> map) {
+      }
+    };
+  }
+
+  private static MailboxReceiveNode receiveNode(int stageId, int 
senderStageId) {
+    return new MailboxReceiveNode(stageId, SCHEMA, senderStageId, 
PinotRelExchangeType.STREAMING,
+        RelDistribution.Type.HASH_DISTRIBUTED, null, null, false, false, null);
+  }
+
+  private static MailboxSendNode sendNode(int stageId, List<PlanNode> inputs, 
int receiverStageId) {
+    return new MailboxSendNode(stageId, SCHEMA, inputs, receiverStageId, 
PinotRelExchangeType.STREAMING,
+        RelDistribution.Type.HASH_DISTRIBUTED, null, false, null, false, 
"murmur");
+  }
+
+  private static Map<Integer, DispatchablePlanFragment> fragments(Map<Integer, 
PlanNode> rootsByStage) {
+    Map<Integer, DispatchablePlanFragment> result = new java.util.HashMap<>();
+    rootsByStage.forEach(
+        (stage, root) -> result.put(stage, new DispatchablePlanFragment(new 
PlanFragment(stage, root, List.of()))));
+    return result;
+  }
+
+  private static StatMap<TestPluginStat> pluginStats(long emittedRows, long 
customCounter) {
+    StatMap<TestPluginStat> stats = new StatMap<>(TestPluginStat.class);
+    stats.merge(TestPluginStat.EMITTED_ROWS, emittedRows);
+    stats.merge(TestPluginStat.CUSTOM_COUNTER, customCounter);
+    return stats;
+  }
+
+  /**
+   * A stage tree made exclusively of plugin types must render fully (no node 
dropped, custom stat fields present)
+   * and must nest the sender stage's tree under the node whose plan-node id 
resolves to a
+   * {@link MailboxReceiveNode} — even though the receive node carries a 
plugin type, not the built-in
+   * MAILBOX_RECEIVE.
+   */
+  @Test
+  public void testPluginTypedTreeRendersWithCrossStageNesting() {
+    OperatorTypeDescriptor sendType = pluginType(300, "TEST_PLUGIN_SEND");
+    OperatorTypeDescriptor receiveType = pluginType(301, 
"TEST_PLUGIN_RECEIVE");
+
+    // Stage 1: send(0) -> receive(1) [pre-order ids]; stage 2: send(0) -> 
value(1).
+    MailboxReceiveNode stage1Receive = receiveNode(1, 2);
+    PlanNode stage1Root = sendNode(1, List.of(stage1Receive), 0);
+    PlanNode stage2Root = sendNode(2,
+        List.of(new ValueNode(2, SCHEMA, PlanNode.NodeHint.EMPTY, List.of(), 
List.of())), 1);
+
+    StageStatsTreeNode stage1Tree = new StageStatsTreeNode(sendType, 
List.of(0), pluginStats(10, 7), List.of(
+        new StageStatsTreeNode(receiveType, List.of(1), pluginStats(10, 3), 
List.of())));
+    StageStatsTreeNode stage2Tree = new StageStatsTreeNode(sendType, 
List.of(0), pluginStats(5, 1), List.of());
+
+    MultiStageStatsTreeBuilder builder = new MultiStageStatsTreeBuilder(
+        fragments(Map.of(1, stage1Root, 2, stage2Root)), List.of(), Map.of(1, 
stage1Tree, 2, stage2Tree));
+    ObjectNode json = builder.jsonStatsByStage(1);
+
+    Assert.assertEquals(json.get("type").asText(), "TEST_PLUGIN_SEND");
+    Assert.assertEquals(json.get("customCounter").asLong(), 7);
+    Assert.assertEquals(json.get("emittedRows").asLong(), 10);
+    Assert.assertEquals(json.get("planNodeIds").get(0).asInt(), 0);
+
+    ObjectNode receive = (ObjectNode) json.get("children").get(0);
+    Assert.assertEquals(receive.get("type").asText(), "TEST_PLUGIN_RECEIVE");
+    Assert.assertEquals(receive.get("customCounter").asLong(), 3);
+
+    ObjectNode nestedStage2 = (ObjectNode) receive.get("children").get(0);
+    Assert.assertEquals(nestedStage2.get("type").asText(), "TEST_PLUGIN_SEND");
+    Assert.assertEquals(nestedStage2.get("emittedRows").asLong(), 5);
+  }
+
+  /**
+   * PIPELINE_BREAKER nodes must be collapsed (children hoisted into the 
parent) so the JSON shape matches the
+   * legacy renderer, which nests the breaker's receive operators directly 
under the LEAF.
+   */
+  @Test
+  public void testPipelineBreakerCollapsed() {
+    MultiStageOperator.Type leaf = MultiStageOperator.Type.LEAF;
+    MultiStageOperator.Type breaker = MultiStageOperator.Type.PIPELINE_BREAKER;
+    MultiStageOperator.Type receive = MultiStageOperator.Type.MAILBOX_RECEIVE;
+
+    StageStatsTreeNode tree = new StageStatsTreeNode(leaf, List.of(), 
statsOf(leaf), List.of(
+        new StageStatsTreeNode(breaker, List.of(), statsOf(breaker), List.of(
+            new StageStatsTreeNode(receive, List.of(), statsOf(receive), 
List.of())))));
+
+    MultiStageStatsTreeBuilder builder = new MultiStageStatsTreeBuilder(
+        fragments(Map.of(1, sendNode(1, List.of(), 0))), List.of(), Map.of(1, 
tree));
+    ObjectNode json = builder.jsonStatsByStage(1);
+
+    Assert.assertEquals(json.get("type").asText(), "LEAF");
+    Assert.assertEquals(json.get("children").size(), 1, "PIPELINE_BREAKER must 
be collapsed, not rendered");
+    Assert.assertEquals(json.get("children").get(0).get("type").asText(), 
"MAILBOX_RECEIVE");
+  }
+
+  /**
+   * Legacy-path regression: a flat stats list with a plugin type at the send 
position must not throw
+   * {@code ArithmeticException} (the built-in PARALLELISM key cannot be read 
from a foreign StatMap, which used to
+   * yield a zero divisor).
+   */
+  @Test
+  public void testLegacyPathWithPluginSendTypeDoesNotThrow() {
+    OperatorTypeDescriptor sendType = pluginType(300, "TEST_PLUGIN_SEND");
+    OperatorTypeDescriptor receiveType = pluginType(301, 
"TEST_PLUGIN_RECEIVE");
+
+    MailboxReceiveNode stage1Receive = receiveNode(1, 2);
+    PlanNode stage1Root = sendNode(1, List.of(stage1Receive), 0);
+    PlanNode stage2Root = sendNode(2,
+        List.of(new ValueNode(2, SCHEMA, PlanNode.NodeHint.EMPTY, List.of(), 
List.of())), 1);
+    // Inorder flat list: child (receive) first, then the send root.
+    MultiStageQueryStats.StageStats.Closed flatStats = new 
MultiStageQueryStats.StageStats.Closed(
+        List.of(receiveType, sendType), List.of(pluginStats(10, 3), 
pluginStats(10, 7)));
+
+    java.util.List<MultiStageQueryStats.StageStats.Closed> queryStats = new 
java.util.ArrayList<>();
+    queryStats.add(null); // stage 0
+    queryStats.add(flatStats); // stage 1
+    queryStats.add(null); // stage 2: no stats — renders the 
EMPTY_MAILBOX_SEND placeholder
+    MultiStageStatsTreeBuilder builder =
+        new MultiStageStatsTreeBuilder(fragments(Map.of(1, stage1Root, 2, 
stage2Root)), queryStats);
+
+    // The legacy renderer cannot pair plugin types against the PlanNode tree, 
so the nodes are not rendered —
+    // but it must not throw either.
+    ObjectNode json = builder.jsonStatsByStage(1);
+    Assert.assertNotNull(json);
+  }
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private static StatMap<?> statsOf(MultiStageOperator.Type type) {
+    return new StatMap(type.getStatKeyClass());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to