This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f15c471a514 Keep stats during pipeline breaker (#17576)
f15c471a514 is described below

commit f15c471a51400aee908b3936c759434dfce37101
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Thu Feb 12 11:42:25 2026 +0100

    Keep stats during pipeline breaker (#17576)
---
 .../components/Query/FlamegraphQueryStageStats.tsx |  78 ++++++++++----
 .../tests/MultiStageEngineIntegrationTest.java     | 119 ++++++++++++++++++++-
 .../query/runtime/InStageStatsTreeBuilder.java     |  85 +++++++++++++--
 .../apache/pinot/query/runtime/QueryRunner.java    |  14 +--
 .../pinot/query/runtime/operator/LeafOperator.java |  29 ++++-
 .../runtime/plan/OpChainExecutionContext.java      |  17 ++-
 .../query/runtime/plan/PlanNodeToOpChain.java      |  12 ++-
 .../plan/pipeline/PipelineBreakerExecutor.java     |   4 +-
 .../plan/pipeline/PipelineBreakerOperator.java     |  10 +-
 .../query/service/dispatch/QueryDispatcher.java    |   2 +-
 .../apache/pinot/query/QueryServerEnclosure.java   |   2 +-
 .../executor/OpChainSchedulerServiceTest.java      |   2 +-
 .../runtime/operator/MailboxSendOperatorTest.java  |   2 +-
 .../query/runtime/operator/OperatorTestUtil.java   |   4 +-
 .../plan/pipeline/PipelineBreakerExecutorTest.java |   2 +-
 .../pinot/server/starter/ServerInstance.java       |   5 +-
 .../server/starter/helix/BaseServerStarter.java    |   5 +-
 .../helix/KeepPipelineBreakerStatsPredicate.java   |  77 +++++++++++++
 .../pinot/server/worker/WorkerQueryServer.java     |   7 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |   8 ++
 20 files changed, 427 insertions(+), 57 deletions(-)

diff --git a/pinot-controller/src/main/resources/app/components/Query/FlamegraphQueryStageStats.tsx b/pinot-controller/src/main/resources/app/components/Query/FlamegraphQueryStageStats.tsx
index 85fd0b84679..05293372a91 100644
--- a/pinot-controller/src/main/resources/app/components/Query/FlamegraphQueryStageStats.tsx
+++ b/pinot-controller/src/main/resources/app/components/Query/FlamegraphQueryStageStats.tsx
@@ -99,48 +99,90 @@ const generateFlameGraphData = (stats, highlightedStage : 
Number = null, mode :
 
   const stages = [];
 
-  const processNode = (node, currentStage) => {
-    const { children, ...data } = node;
+  const processNode = (node, currentStage, onPipelineBreaker: boolean = false) 
=> {
+    let { children, ...data } = node;
 
     const baseNode = {
       tooltip: JSON.stringify(data),
       value: getNodeValue(data),
       backgroundColor: highlightedStage === currentStage ? 'lightblue' : null,
     }
-    // If it's a MAILBOX_RECEIVE node, prune the tree here
-    if (data.type === "MAILBOX_RECEIVE") {
-      const sendOperator = children[0];
-      visitStage(sendOperator, currentStage);
-      return {
-        name: `MAILBOX_RECEIVE from stage ${sendOperator.stage || 'unknown'}`,
-        relatedStage: sendOperator.stage || null,
-        ...baseNode,
-      };
+    let name: String;
+    switch (data.type) {
+      case "LEAF": {
+        name = `LEAF (${data.table || ''})`;
+        if (children && children.length !== 0) {
+          // We don't want to include the children in this stage because its 
time is independent of the leaf node.
+          children = [];
+        }
+        break;
+      }
+      case "MAILBOX_RECEIVE": {
+        // If it's a MAILBOX_RECEIVE node, prune the tree here
+        const sendOperator = children[0];
+        visitStage(sendOperator, currentStage);
+        const prefix = onPipelineBreaker ? "PIPELINE_BREAKER " : 
"MAILBOX_RECEIVE";
+        return {
+          name: `${prefix} from stage ${sendOperator.stage || 'unknown'}`,
+          relatedStage: sendOperator.stage || null, ...baseNode,
+        };
+      }
+      default: {
+        name = data.type || "Unknown Type";
+      }
     }
 
     // For other nodes, continue processing children
     return {
-      name: data.type || "Unknown Type",
+      name: name,
       ...baseNode,
       children: children
-        ? children.map(node => processNode(node, currentStage))
+        ? children.map(node => processNode(node, currentStage, 
onPipelineBreaker))
           .filter(child => child !== null) : [],
     };
   }
 
+  const getPipelineBreakerNode = (children, currentStage: number) => {
+    if (!children || children.length === 0) return null;
+    const stack = [...children];
+    while (stack.length > 0) {
+      const node = stack.pop();
+      if (node.type === "LEAF" && node.children && node.children.length > 0) {
+        return processNode(node.children[0], currentStage, true);
+      }
+      if (node.type == "MAILBOX_RECEIVE") {
+        // We don't want to go past MAILBOX_RECEIVE nodes, as their children 
belong to other stages.
+        continue;
+      }
+      if (node.children && node.children.length > 0) {
+        stack.push(...node.children);
+      }
+    }
+    return null;
+  }
+
   const visitStage = (node, parentStage = null) => {
     const { children, ...data } = node;
     const stage = data.stage || 0;
-    const value = getNodeValue(node);
+    const pipelineBreakerNode = getPipelineBreakerNode(children, stage);
+    let value: number;
+    const childrenNodes = children
+        ? children.map(node => processNode(node, stage)).filter(child => child 
!== null)
+        : [];
+    if (pipelineBreakerNode) {
+      value = getNodeValue(node) + (pipelineBreakerNode.value || 0);
+      childrenNodes.push(pipelineBreakerNode);
+    } else {
+      value = getNodeValue(node)
+    }
+
     stages.push({
       name: "Stage " + stage,
       value: value,
       tooltip: JSON.stringify(data),
       backgroundColor: stage === highlightedStage ? 'lightblue' : null,
       relatedStage: parentStage,
-      children: children
-        ? children.map(node => processNode(node, stage)).filter(child => child 
!== null)
-        : [],
+      children: childrenNodes
     });
   }
   stats.children.forEach((node) => visitStage(node));
@@ -149,7 +191,7 @@ const generateFlameGraphData = (stats, highlightedStage : 
Number = null, mode :
   return {
     data: {
       name: 'All stages',
-      value: stages.reduce((sum, child) => sum + child.value, 1),
+      value: Math.max(1, stages.reduce((sum, child) => sum + child.value, 0)),
       children: stages,
     }
   };
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 73ea56434de..db47b3997c7 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -78,10 +78,8 @@ import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import static org.apache.pinot.common.function.scalar.StringFunctions.*;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
+
 
 public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestSet {
   private static final String SCHEMA_FILE_NAME = 
"On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
@@ -2120,7 +2118,8 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
     return postQuery(query, headers);
   }
 
-  private void setupDimensionTable() throws Exception {
+  private void setupDimensionTable()
+      throws Exception {
     // Set up the dimension table for JOIN tests
     Schema lookupTableSchema = createSchema(DIM_TABLE_SCHEMA_PATH);
     addSchema(lookupTableSchema);
@@ -2149,6 +2148,116 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
     assertNotNull(response.get("resultTable"), "Should have result table");
   }
 
+  @Test
+  public void testStageStatsPipelineBreaker()
+      throws Exception {
+    HelixConfigScope scope =
+        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+            .build();
+    try {
+      _helixManager.getConfigAccessor()
+          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"false");
+      String query = "select * from mytable "
+          + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+      JsonNode response = postQuery(query);
+      assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+      JsonNode receiveNode = response.get("stageStats");
+      
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+      JsonNode sendNode = receiveNode.get("children").get(0);
+      
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+      JsonNode mytableLeaf = sendNode.get("children").get(0);
+      
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+      
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+      JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+      
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+      JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+      
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+      JsonNode dayOfWeekLeaf = pipelineSend.get("children").get(0);
+      
Assertions.assertThat(dayOfWeekLeaf.get("type").asText()).isEqualTo("LEAF");
+      
Assertions.assertThat(dayOfWeekLeaf.get("table").asText()).isEqualTo("daysOfWeek");
+    } finally {
+      _helixManager.getConfigAccessor()
+          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"true");
+    }
+  }
+
+  @Test
+  public void testPipelineBreakerKeepsNumGroupsLimitReached()
+      throws Exception {
+    HelixConfigScope scope =
+        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+            .build();
+    try {
+      _helixManager.getConfigAccessor()
+          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"false");
+      String query = ""
+          + "SET numGroupsLimit = 1;"
+          + "SELECT * FROM daysOfWeek "
+          + "WHERE dayid in ("
+          + " SELECT DayOfWeek FROM mytable"
+          + " GROUP BY DayOfWeek"
+          + ")";
+
+      JsonNode response = postQuery(query);
+      assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+      JsonNode receiveNode = response.get("stageStats");
+      
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+      JsonNode sendNode = receiveNode.get("children").get(0);
+      
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+      JsonNode mytableLeaf = sendNode.get("children").get(0);
+      
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+      
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualToIgnoringCase("daysOfWeek");
+
+      JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+      
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+      JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+      
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+      
Assertions.assertThat(response.get("numGroupsLimitReached").asBoolean(false))
+          .describedAs("numGroupsLimitReached should be true even when the 
limit is reached on a pipeline breaker")
+          .isEqualTo(true);
+    } finally {
+      _helixManager.getConfigAccessor()
+          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"true");
+    }
+  }
+
+  @Test
+  public void testPipelineBreakerWithoutKeepingStats() {
+    // let's try several times to give helix time to propagate the config 
change
+    String errorMsg = "Failed to verify absence of pipeline breaker stats after 10 attempts";
+    TestUtils.waitForCondition(() -> {
+      String query = "select * from mytable "
+          + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+      JsonNode response = postQuery(query);
+      assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+      JsonNode receiveNode = response.get("stageStats");
+      
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+      JsonNode sendNode = receiveNode.get("children").get(0);
+      
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+      JsonNode mytableLeaf = sendNode.get("children").get(0);
+      
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+      
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+      Assert.assertNull(mytableLeaf.get("children"), "When pipeline breaker 
stats are not kept, "
+          + "there should be no children under the leaf node");
+      return true;
+    }, 100, 10_000L, errorMsg, true, Duration.ofSeconds(1));
+  }
+
   @AfterClass
   public void tearDown()
       throws Exception {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java
index bfc7a4a7d46..dfc042f755d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java
@@ -21,10 +21,13 @@ package org.apache.pinot.query.runtime;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.function.IntFunction;
+import javax.annotation.Nullable;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.pinot.common.datatable.StatMap;
 import org.apache.pinot.query.planner.plannode.AggregateNode;
 import org.apache.pinot.query.planner.plannode.BasePlanNode;
@@ -71,6 +74,27 @@ public class InStageStatsTreeBuilder implements 
PlanNodeVisitor<ObjectNode, InSt
   }
 
   private ObjectNode selfNode(MultiStageOperator.Type type, Context context, 
int index, JsonNode[] childrenArr) {
+    return selfNode(type, context, index, childrenArr, true);
+  }
+
+  /**
+   * Builds the JSON node for the current operator including its statistics 
and children.
+   *
+   * @param type The type of the operator.
+   * @param context The context containing parallelism information.
+   * @param index The index of the operator in the stage stats.
+   * @param childrenArr The array of child JSON nodes.
+   * @param adjustWithChildren Whether cumulative stats like execution time 
and memory allocation should be adjusted
+   *                           by subtracting the children's stats. This is 
usually true, except in cases like pipeline
+   *                           breakers
+   * @return The constructed JSON node representing the operator and its 
statistics, including children.
+   */
+  private ObjectNode selfNode(
+      MultiStageOperator.Type type,
+      Context context,
+      int index,
+      JsonNode[] childrenArr,
+      boolean adjustWithChildren) {
     ObjectNode json = JsonUtils.newObjectNode();
     json.put("type", type.toString());
     for (Map.Entry<String, JsonNode> entry : 
_stageStats.getOperatorStats(index).asJson().properties()) {
@@ -81,9 +105,10 @@ public class InStageStatsTreeBuilder implements 
PlanNodeVisitor<ObjectNode, InSt
       json.put("parallelism", context._parallelism);
     }
 
-    addClockTimeMs(type, json, childrenArr, context);
-    addSelfAllocatedBytes(type, json, childrenArr, context);
-    addSelfGcTime(type, json, childrenArr, context);
+    JsonNode[] childrenArrForStats = adjustWithChildren ? childrenArr : new 
JsonNode[0];
+    addClockTimeMs(type, json, childrenArrForStats, context);
+    addSelfAllocatedBytes(type, json, childrenArrForStats, context);
+    addSelfGcTime(type, json, childrenArrForStats, context);
 
     if (childrenArr.length > 0) {
       json.set(CHILDREN_KEY, JsonUtils.objectToJsonNode(childrenArr));
@@ -139,6 +164,49 @@ public class InStageStatsTreeBuilder implements 
PlanNodeVisitor<ObjectNode, InSt
             .sum();
   }
 
+  @Nullable
+  private ObjectNode extractPipelineBreakerResult(BasePlanNode node, Context 
context) {
+    MailboxReceiveNode pipelineBreakerNode = getPipelineBreakerNode(node);
+    if (pipelineBreakerNode == null) {
+      return null;
+    }
+    _index--;
+    return visitMailboxReceive(pipelineBreakerNode, context);
+  }
+
+  @Nullable
+  private MailboxReceiveNode getPipelineBreakerNode(BasePlanNode node) {
+    if (_index == 0) {
+      return null;
+    }
+    MultiStageOperator.Type nextOperatorType = 
_stageStats.getOperatorType(_index - 1);
+    if (nextOperatorType != MultiStageOperator.Type.PIPELINE_BREAKER) {
+      // even if the plan may say there is a pipeline breaker, the stats do 
not have it
+      return null;
+    }
+    // This code assumes there is a single pipeline breaker in the stage, 
which is true for now.
+    ArrayList<PlanNode> nodeStack = new ArrayList<>(1);
+    nodeStack.add(node);
+    while (!nodeStack.isEmpty()) {
+      PlanNode currentNode = nodeStack.remove(nodeStack.size() - 1);
+      if (currentNode instanceof JoinNode) {
+        JoinNode joinNode = (JoinNode) currentNode;
+        if (joinNode.getInputs().size() > 1 && 
isPipelineBreakerNode(joinNode)) {
+          PlanNode planNode = joinNode.getInputs().get(1);
+          if (planNode instanceof MailboxReceiveNode) {
+            return (MailboxReceiveNode) planNode;
+          }
+        }
+      }
+      nodeStack.addAll(currentNode.getInputs());
+    }
+    return null;
+  }
+
+  private boolean isPipelineBreakerNode(JoinNode joinNode) {
+    return joinNode.getJoinType() == JoinRelType.SEMI || 
joinNode.getJoinType() == JoinRelType.ANTI;
+  }
+
   private ObjectNode recursiveCase(BasePlanNode node, MultiStageOperator.Type 
expectedType, Context context) {
     MultiStageOperator.Type type = _stageStats.getOperatorType(_index);
     /*
@@ -150,7 +218,13 @@ public class InStageStatsTreeBuilder implements 
PlanNodeVisitor<ObjectNode, InSt
     */
     if (type != expectedType) {
       if (type == MultiStageOperator.Type.LEAF) {
-        return selfNode(MultiStageOperator.Type.LEAF, context);
+        int selfIndex = _index;
+        ObjectNode pipelineBreakerResultNode = 
extractPipelineBreakerResult(node, context);
+        if (pipelineBreakerResultNode != null) {
+          return selfNode(
+              MultiStageOperator.Type.LEAF, context, selfIndex, new JsonNode[] 
{pipelineBreakerResultNode}, false);
+        }
+        return selfNode(MultiStageOperator.Type.LEAF, context, _index, new 
JsonNode[0]);
       }
       List<PlanNode> inputs = node.getInputs();
       int childrenSize = inputs.size();
@@ -188,8 +262,7 @@ public class InStageStatsTreeBuilder implements 
PlanNodeVisitor<ObjectNode, InSt
       childrenArr[i] = child;
     }
 
-    ObjectNode json = selfNode(type, context, selfIndex, childrenArr);
-    return json;
+    return selfNode(type, context, selfIndex, childrenArr);
   }
 
   @Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 54882329a82..d19fa564f1a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -140,13 +140,14 @@ public class QueryRunner {
   @Nullable
   private PhysicalTimeSeriesServerPlanVisitor _timeSeriesPhysicalPlanVisitor;
   private BooleanSupplier _sendStats;
+  private BooleanSupplier _keepPipelineBreakerStats;
 
   /**
    * Initializes the query executor.
    * <p>Should be called only once and before calling any other method.
    */
   public void init(PinotConfiguration serverConf, InstanceDataManager 
instanceDataManager,
-      @Nullable TlsConfig tlsConfig, BooleanSupplier sendStats) {
+      @Nullable TlsConfig tlsConfig, BooleanSupplier sendStats, 
BooleanSupplier keepPipelineBreakerStats) {
     String hostname = 
serverConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
     if (hostname.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
       hostname = hostname.substring(Helix.SERVER_INSTANCE_PREFIX_LENGTH);
@@ -236,6 +237,7 @@ public class QueryRunner {
     }
 
     _sendStats = sendStats;
+    _keepPipelineBreakerStats = keepPipelineBreakerStats;
 
     LOGGER.info("Initialized QueryRunner with hostname: {}, port: {}", 
hostname, port);
   }
@@ -273,9 +275,9 @@ public class QueryRunner {
     Map<String, String> opChainMetadata = 
consolidateMetadata(stageMetadata.getCustomProperties(), requestMetadata);
 
     // run pre-stage execution for all pipeline breakers
-    PipelineBreakerResult pipelineBreakerResult =
-        PipelineBreakerExecutor.executePipelineBreakers(_opChainScheduler, 
_mailboxService,
-            workerMetadata, stagePlan, opChainMetadata, 
_sendStats.getAsBoolean());
+    PipelineBreakerResult pipelineBreakerResult = 
PipelineBreakerExecutor.executePipelineBreakers(
+        _opChainScheduler, _mailboxService, workerMetadata, stagePlan, 
opChainMetadata,
+        _sendStats.getAsBoolean(), _keepPipelineBreakerStats.getAsBoolean());
 
     // Send error block to all the receivers if pipeline breaker fails
     if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() 
!= null) {
@@ -287,7 +289,7 @@ public class QueryRunner {
     // run OpChain
     OpChainExecutionContext executionContext =
         OpChainExecutionContext.fromQueryContext(_mailboxService, 
opChainMetadata, stageMetadata, workerMetadata,
-            pipelineBreakerResult, _sendStats.getAsBoolean());
+            pipelineBreakerResult, _sendStats.getAsBoolean(), 
_keepPipelineBreakerStats.getAsBoolean());
     try {
       OpChain opChain;
       if (workerMetadata.isLeafStageWorker()) {
@@ -543,7 +545,7 @@ public class QueryRunner {
     // compile OpChain
     OpChainExecutionContext executionContext =
         OpChainExecutionContext.fromQueryContext(_mailboxService, 
opChainMetadata, stageMetadata, workerMetadata, null,
-            false);
+            false, false);
 
     OpChain opChain =
         ServerPlanRequestUtils.compileLeafStage(executionContext, stagePlan, 
_leafQueryExecutor, _executorService,
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
index 92d64f55bcc..8faf2de2a20 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
@@ -60,6 +60,7 @@ import org.apache.pinot.query.runtime.blocks.MseBlock;
 import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
 import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
 import org.apache.pinot.query.runtime.operator.utils.TypeUtils;
+import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.apache.pinot.spi.exception.QueryErrorCode;
@@ -94,6 +95,8 @@ public class LeafOperator extends MultiStageOperator {
   private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
   private final AtomicReference<ErrorMseBlock> _errorBlock = new 
AtomicReference<>();
   private final ResultsBlockStreamer _resultsBlockStreamer = 
this::addResultsBlock;
+  @Nullable
+  private final MultiStageQueryStats _pipelineBreakerStats;
 
   // Use a limit-sized BlockingQueue to store the results blocks and apply 
back pressure to the single-stage threads
   @VisibleForTesting
@@ -105,6 +108,16 @@ public class LeafOperator extends MultiStageOperator {
 
   public LeafOperator(OpChainExecutionContext context, 
List<ServerQueryRequest> requests, DataSchema dataSchema,
       QueryExecutor queryExecutor, ExecutorService executorService) {
+    this(context, requests, dataSchema, queryExecutor, executorService, null);
+  }
+
+  public LeafOperator(
+      OpChainExecutionContext context,
+      List<ServerQueryRequest> requests,
+      DataSchema dataSchema,
+      QueryExecutor queryExecutor,
+      ExecutorService executorService,
+      @Nullable MultiStageQueryStats pipelineBreakerStats) {
     super(context);
     int numRequests = requests.size();
     Preconditions.checkArgument(numRequests == 1 || numRequests == 2, 
"Expected 1 or 2 requests, got: %s", numRequests);
@@ -118,6 +131,7 @@ public class LeafOperator extends MultiStageOperator {
     Integer maxStreamingPendingBlocks = 
QueryOptionsUtils.getMaxStreamingPendingBlocks(context.getOpChainMetadata());
     _blockingQueue = new ArrayBlockingQueue<>(maxStreamingPendingBlocks != 
null ? maxStreamingPendingBlocks
         : QueryOptionValue.DEFAULT_MAX_STREAMING_PENDING_BLOCKS);
+    _pipelineBreakerStats = pipelineBreakerStats;
   }
 
   public List<ServerQueryRequest> getRequests() {
@@ -151,6 +165,13 @@ public class LeafOperator extends MultiStageOperator {
     return List.of();
   }
 
+  @Override
+  protected MultiStageQueryStats calculateUpstreamStats() {
+    return _pipelineBreakerStats != null
+        ? _pipelineBreakerStats
+        : MultiStageQueryStats.emptyStats(_context.getStageId());
+  }
+
   @Override
   public String toExplainString() {
     return EXPLAIN_NAME;
@@ -340,7 +361,7 @@ public class LeafOperator extends MultiStageOperator {
           _statMap.merge(StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED, 
Boolean.parseBoolean(entry.getValue()));
           break;
         case TIME_USED_MS:
-          _statMap.merge(StatKey.EXECUTION_TIME_MS, 
Long.parseLong(entry.getValue()));
+          _statMap.merge(StatKey.SSE_EXECUTION_TIME_MS, 
Long.parseLong(entry.getValue()));
           break;
         case TRACE_INFO:
           LOGGER.debug("Skipping trace info: {}", entry.getValue());
@@ -723,7 +744,11 @@ public class LeafOperator extends MultiStageOperator {
     /**
      * Time spent on GC while this operator or its children in the same stage 
were running.
      */
-    GC_TIME_MS(StatMap.Type.LONG, null);
+    GC_TIME_MS(StatMap.Type.LONG, null),
+    /**
+     * Time spent in single-stage execution engine for this leaf stage.
+     */
+    SSE_EXECUTION_TIME_MS(StatMap.Type.LONG, null);
     // IMPORTANT: When adding new StatKeys, make sure to either create the 
same key in BrokerResponseNativeV2.StatKey or
     //  call the constructor that accepts a String as last argument and set it 
to null.
     //  Otherwise the constructor will fail with an IllegalArgumentException 
which will not be caught and will
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 610c12bac8e..e917adb678f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -62,11 +62,13 @@ public class OpChainExecutionContext {
   @Nullable
   private ServerPlanRequestContext _leafStageContext;
   private final boolean _sendStats;
+  private final boolean _keepPipelineBreakerStats;
 
   @VisibleForTesting
   public OpChainExecutionContext(MailboxService mailboxService, long 
requestId, String cid, long activeDeadlineMs,
       long passiveDeadlineMs, String brokerId, Map<String, String> 
opChainMetadata, StageMetadata stageMetadata,
-      WorkerMetadata workerMetadata, @Nullable PipelineBreakerResult 
pipelineBreakerResult, boolean sendStats) {
+      WorkerMetadata workerMetadata, @Nullable PipelineBreakerResult 
pipelineBreakerResult, boolean sendStats,
+      boolean keepPipelineBreakerStats) {
     _mailboxService = mailboxService;
     // TODO: Consider removing info included in QueryExecutionContext
     _requestId = requestId;
@@ -84,24 +86,25 @@ public class OpChainExecutionContext {
     _pipelineBreakerResult = pipelineBreakerResult;
     _traceEnabled = 
Boolean.parseBoolean(opChainMetadata.get(CommonConstants.Broker.Request.TRACE));
     _queryOperatorFactoryProvider = getDefaultQueryOperatorFactoryProvider();
+    _keepPipelineBreakerStats = keepPipelineBreakerStats;
   }
 
   public static OpChainExecutionContext fromQueryContext(MailboxService 
mailboxService,
       Map<String, String> opChainMetadata, StageMetadata stageMetadata, 
WorkerMetadata workerMetadata,
-      @Nullable PipelineBreakerResult pipelineBreakerResult, boolean 
sendStats) {
+      @Nullable PipelineBreakerResult pipelineBreakerResult, boolean 
sendStats, boolean keepPipelineBreakerStats) {
     return fromQueryContext(mailboxService, opChainMetadata, stageMetadata, 
workerMetadata, pipelineBreakerResult,
-        sendStats, QueryThreadContext.get().getExecutionContext());
+        sendStats, keepPipelineBreakerStats, 
QueryThreadContext.get().getExecutionContext());
   }
 
   @VisibleForTesting
   public static OpChainExecutionContext fromQueryContext(MailboxService 
mailboxService,
       Map<String, String> opChainMetadata, StageMetadata stageMetadata, 
WorkerMetadata workerMetadata,
-      @Nullable PipelineBreakerResult pipelineBreakerResult, boolean sendStats,
+      @Nullable PipelineBreakerResult pipelineBreakerResult, boolean 
sendStats, boolean keepPipelineBreakerStats,
       QueryExecutionContext queryExecutionContext) {
     return new OpChainExecutionContext(mailboxService, 
queryExecutionContext.getRequestId(),
         queryExecutionContext.getCid(), 
queryExecutionContext.getActiveDeadlineMs(),
         queryExecutionContext.getPassiveDeadlineMs(), 
queryExecutionContext.getBrokerId(), opChainMetadata,
-        stageMetadata, workerMetadata, pipelineBreakerResult, sendStats);
+        stageMetadata, workerMetadata, pipelineBreakerResult, sendStats, 
keepPipelineBreakerStats);
   }
 
   public MailboxService getMailboxService() {
@@ -200,6 +203,10 @@ public class OpChainExecutionContext {
     return _sendStats;
   }
 
+  public boolean isKeepPipelineBreakerStats() {
+    return _keepPipelineBreakerStats;
+  }
+
   private static QueryOperatorFactoryProvider 
getDefaultQueryOperatorFactoryProvider() {
     // Prefer server context when explicitly configured, otherwise fall back 
to broker, then default.
     Object serverProvider = 
ServerContext.getInstance().getQueryOperatorFactoryProvider();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
index 304302fface..6e5da76bc4e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
@@ -24,6 +24,7 @@ import java.util.Objects;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import javax.annotation.Nullable;
 import org.apache.pinot.query.planner.plannode.AggregateNode;
 import org.apache.pinot.query.planner.plannode.EnrichedJoinNode;
 import org.apache.pinot.query.planner.plannode.ExchangeNode;
@@ -61,6 +62,7 @@ import 
org.apache.pinot.query.runtime.operator.set.MinusAllOperator;
 import org.apache.pinot.query.runtime.operator.set.MinusOperator;
 import org.apache.pinot.query.runtime.operator.set.UnionAllOperator;
 import org.apache.pinot.query.runtime.operator.set.UnionOperator;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
 import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
 import org.apache.pinot.spi.exception.QueryErrorCode;
 
@@ -111,9 +113,17 @@ public class PlanNodeToOpChain {
       MultiStageOperator result;
       if (context.getLeafStageContext() != null && 
context.getLeafStageContext().getLeafStageBoundaryNode() == node) {
         ServerPlanRequestContext leafStageContext = 
context.getLeafStageContext();
+        PipelineBreakerResult pipelineBreakerResult = 
context.getPipelineBreakerResult();
+        @Nullable
+        MultiStageQueryStats pipelineBreakerQueryStats;
+        if (context.isKeepPipelineBreakerStats() && pipelineBreakerResult != 
null) {
+          pipelineBreakerQueryStats = 
pipelineBreakerResult.getStageQueryStats();
+        } else {
+          pipelineBreakerQueryStats = null;
+        }
         result = new LeafOperator(context, 
leafStageContext.getServerQueryRequests(),
             leafStageContext.getLeafStageBoundaryNode().getDataSchema(), 
leafStageContext.getLeafQueryExecutor(),
-            leafStageContext.getExecutorService());
+            leafStageContext.getExecutorService(), pipelineBreakerQueryStats);
       } else {
         result = node.visit(this, context);
       }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
index 060b6366a69..54a650a61c1 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -68,7 +68,7 @@ public class PipelineBreakerExecutor {
   @Nullable
   public static PipelineBreakerResult 
executePipelineBreakers(OpChainSchedulerService scheduler,
       MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan 
stagePlan,
-      Map<String, String> opChainMetadata, boolean sendStats) {
+      Map<String, String> opChainMetadata, boolean sendStats, boolean 
keepPipelineBreakerStats) {
     PipelineBreakerContext pipelineBreakerContext = new 
PipelineBreakerContext();
     PipelineBreakerVisitor.visitPlanRoot(stagePlan.getRootNode(), 
pipelineBreakerContext);
     if (!pipelineBreakerContext.getPipelineBreakerMap().isEmpty()) {
@@ -78,7 +78,7 @@ public class PipelineBreakerExecutor {
         // see also: MailboxIdUtils TODOs, de-couple mailbox id from query 
information
         OpChainExecutionContext opChainExecutionContext =
             OpChainExecutionContext.fromQueryContext(mailboxService, 
opChainMetadata, stagePlan.getStageMetadata(),
-                workerMetadata, null, sendStats);
+                workerMetadata, null, sendStats, keepPipelineBreakerStats);
         return execute(scheduler, pipelineBreakerContext, 
opChainExecutionContext);
       } catch (Exception e) {
         long requestId = 
QueryThreadContext.get().getExecutionContext().getRequestId();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
index 5732ff7c8d7..548e22ecc69 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
@@ -65,9 +65,10 @@ public class PipelineBreakerOperator extends 
MultiStageOperator {
   @Override
   public void registerExecution(long time, int numRows, long memoryUsedBytes, 
long gcTimeMs) {
     _statMap.merge(StatKey.EXECUTION_TIME_MS, time);
-    _statMap.merge(StatKey.EMITTED_ROWS, numRows);
     _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
     _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
+    // This is actually unnecessary given that pipeline breaker does not emit 
any rows upstream.
+    _statMap.merge(StatKey.EMITTED_ROWS, numRows);
   }
 
   @Override
@@ -149,6 +150,13 @@ public class PipelineBreakerOperator extends 
MultiStageOperator {
 
   @Override
   public StatMap<StatKey> copyStatMaps() {
+    if (_statMap.getLong(StatKey.EMITTED_ROWS) == 0) {
+      long totalRows = _resultMap.values().stream()
+          .flatMap(List::stream)
+          .mapToLong(block -> ((MseBlock.Data) block).getNumRows())
+          .sum();
+      _statMap.merge(StatKey.EMITTED_ROWS, totalRows);
+    }
     return new StatMap<>(_statMap);
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index d38f3f01eb1..dc80478d9cc 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -569,7 +569,7 @@ public class QueryDispatcher {
     StageMetadata stageMetadata = new StageMetadata(0, workerMetadata, 
stagePlan.getCustomProperties());
     OpChainExecutionContext opChainExecutionContext =
         OpChainExecutionContext.fromQueryContext(mailboxService, queryOptions, 
stageMetadata, workerMetadata.get(0),
-            null, true);
+            null, true, true);
 
     PairList<Integer, String> resultFields = subPlan.getQueryResultFields();
     DataSchema sourceSchema = rootNode.getDataSchema();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index d3f59e7bacc..7dbf60c8dd4 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -62,7 +62,7 @@ public class QueryServerEnclosure {
     
runnerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
 _queryRunnerPort);
     InstanceDataManager instanceDataManager = 
factory.buildInstanceDataManager();
     _queryRunner = new QueryRunner();
-    _queryRunner.init(new PinotConfiguration(runnerConfig), 
instanceDataManager, null, () -> true);
+    _queryRunner.init(new PinotConfiguration(runnerConfig), 
instanceDataManager, null, () -> true, () -> true);
   }
 
   public int getPort() {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index abda6ceecc4..196e43eb269 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -85,7 +85,7 @@ public class OpChainSchedulerServiceTest {
     when(mailboxService.getPort()).thenReturn(1234);
     WorkerMetadata workerMetadata = new WorkerMetadata(0, Map.of(), Map.of());
     OpChainExecutionContext context = 
OpChainExecutionContext.fromQueryContext(mailboxService, Map.of(),
-        new StageMetadata(0, List.of(workerMetadata), Map.of()), 
workerMetadata, null, true);
+        new StageMetadata(0, List.of(workerMetadata), Map.of()), 
workerMetadata, null, true, true);
     return new OpChain(context, operator);
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 8861357f4c6..e0936b6763f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -176,7 +176,7 @@ public class MailboxSendOperatorTest {
     StageMetadata stageMetadata = new StageMetadata(SENDER_STAGE_ID, 
List.of(workerMetadata), Map.of());
     OpChainExecutionContext context =
         OpChainExecutionContext.fromQueryContext(_mailboxService, Map.of(), 
stageMetadata, workerMetadata, null, true,
-            QueryExecutionContext.forMseTest());
+            true, QueryExecutionContext.forMseTest());
     return new MailboxSendOperator(context, _input, statMap -> _exchange);
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index 3274f3c4ffa..7e77ecaf780 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -111,7 +111,7 @@ public class OperatorTestUtil {
   public static OpChainExecutionContext getOpChainContext(MailboxService 
mailboxService, long deadlineMs,
       StageMetadata stageMetadata) {
     return new OpChainExecutionContext(mailboxService, 0, "cid", deadlineMs, 
deadlineMs, "brokerId", Map.of(),
-        stageMetadata, stageMetadata.getWorkerMetadataList().get(0), null, 
true);
+        stageMetadata, stageMetadata.getWorkerMetadataList().get(0), null, 
true, true);
   }
 
   public static OpChainExecutionContext getTracingContext() {
@@ -135,7 +135,7 @@ public class OperatorTestUtil {
         new StageMetadata(0, List.of(workerMetadata), 
Map.of(DispatchablePlanFragment.TABLE_NAME_KEY, "testTable"));
     OpChainExecutionContext opChainExecutionContext =
         OpChainExecutionContext.fromQueryContext(mailboxService, 
opChainMetadata, stageMetadata, workerMetadata, null,
-            true, QueryExecutionContext.forMseTest());
+            true, true, QueryExecutionContext.forMseTest());
     opChainExecutionContext.setLeafStageContext(
         new ServerPlanRequestContext(new StagePlan(null, stageMetadata), null, 
null, null));
     return opChainExecutionContext;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index 6249391d0c0..d3baa636c94 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -116,7 +116,7 @@ public class PipelineBreakerExecutorTest {
     try (QueryThreadContext ignore = QueryThreadContext.open(executionContext, 
workerInfo,
         ThreadAccountantUtils.getNoOpAccountant())) {
       return PipelineBreakerExecutor.executePipelineBreakers(scheduler, 
mailboxService, workerMetadata, stagePlan,
-          opChainMetadata, true);
+          opChainMetadata, true, true);
     }
   }
 
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 24fb993e718..88c45b32d8b 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -48,6 +48,7 @@ import org.apache.pinot.server.access.AccessControl;
 import org.apache.pinot.server.access.AccessControlFactory;
 import org.apache.pinot.server.access.AllowAllAccessFactory;
 import org.apache.pinot.server.conf.ServerConf;
+import org.apache.pinot.server.starter.helix.KeepPipelineBreakerStatsPredicate;
 import org.apache.pinot.server.starter.helix.SendStatsPredicate;
 import org.apache.pinot.server.worker.WorkerQueryServer;
 import org.apache.pinot.spi.accounting.ThreadAccountant;
@@ -87,6 +88,7 @@ public class ServerInstance {
   public ServerInstance(ServerConf serverConf, String instanceId, HelixManager 
helixManager,
       AccessControlFactory accessControlFactory, @Nullable 
SegmentOperationsThrottler segmentOperationsThrottler,
       ThreadAccountant threadAccountant, SendStatsPredicate sendStatsPredicate,
+      KeepPipelineBreakerStatsPredicate keepPipelineBreakerStatsPredicate,
       ServerReloadJobStatusCache reloadJobStatusCache)
       throws Exception {
     LOGGER.info("Initializing server instance: {}", instanceId);
@@ -132,7 +134,8 @@ public class ServerInstance {
     if (serverConf.isMultiStageServerEnabled()) {
       LOGGER.info("Initializing Multi-stage query engine");
       _workerQueryServer = new WorkerQueryServer(serverConf.getPinotConfig(), 
_instanceDataManager,
-          serverConf.isMultiStageEngineTlsEnabled() ? tlsConfig : null, 
threadAccountant, sendStatsPredicate);
+          serverConf.isMultiStageEngineTlsEnabled() ? tlsConfig : null, 
threadAccountant, sendStatsPredicate,
+          keepPipelineBreakerStatsPredicate);
     } else {
       _workerQueryServer = null;
     }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 11c0c9b4587..383984fe9c0 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -749,9 +749,11 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
         org.apache.pinot.spi.config.instance.InstanceType.SERVER);
 
     SendStatsPredicate sendStatsPredicate = 
SendStatsPredicate.create(_serverConf, _helixManager);
+    KeepPipelineBreakerStatsPredicate keepPipelineBreakerStatsPredicate =
+        KeepPipelineBreakerStatsPredicate.create(_serverConf);
     _serverInstance =
         new ServerInstance(serverConf, _instanceId, _helixManager, 
_accessControlFactory, _segmentOperationsThrottler,
-            _threadAccountant, sendStatsPredicate, _reloadJobStatusCache);
+            _threadAccountant, sendStatsPredicate, 
keepPipelineBreakerStatsPredicate, _reloadJobStatusCache);
 
     InstanceDataManager instanceDataManager = 
_serverInstance.getInstanceDataManager();
     instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() -> 
_isServerReadyToServeQueries);
@@ -791,6 +793,7 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
       LOGGER.error("Failed to register DefaultClusterConfigChangeHandler as 
the Helix ClusterConfigChangeListener", e);
     }
     
_clusterConfigChangeHandler.registerClusterConfigChangeListener(_segmentOperationsThrottler);
+    
_clusterConfigChangeHandler.registerClusterConfigChangeListener(keepPipelineBreakerStatsPredicate);
 
     if (sendStatsPredicate.needWatchForInstanceConfigChange()) {
       LOGGER.info("Initializing and registering the SendStatsPredicate");
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/KeepPipelineBreakerStatsPredicate.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/KeepPipelineBreakerStatsPredicate.java
new file mode 100644
index 00000000000..374fa19770d
--- /dev/null
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/KeepPipelineBreakerStatsPredicate.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class KeepPipelineBreakerStatsPredicate implements 
PinotClusterConfigChangeListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(KeepPipelineBreakerStatsPredicate.class);
+
+  private volatile boolean _skip;
+
+  public KeepPipelineBreakerStatsPredicate(boolean skip) {
+    _skip = skip;
+  }
+
+  // NOTE: When this method is called, the helix manager is not yet connected.
+  public static KeepPipelineBreakerStatsPredicate create(PinotConfiguration 
serverConf) {
+    boolean skip = serverConf.getProperty(
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
+        
CommonConstants.MultiStageQueryRunner.DEFAULT_SKIP_PIPELINE_BREAKER_STATS);
+    LOGGER.info("Initialized {} with value: {}",
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, skip);
+    return new KeepPipelineBreakerStatsPredicate(skip);
+  }
+
+  public boolean isEnabled() {
+    return !_skip;
+  }
+
+  @Override
+  public void onChange(Set<String> changedConfigs, Map<String, String> 
clusterConfigs) {
+    String key = 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS;
+    if (!changedConfigs.contains(key)) {
+      LOGGER.debug("No change for key: {}, keeping its value as {}", key, 
_skip);
+      return;
+    }
+    String value = clusterConfigs.get(key);
+    if (value == null || value.isEmpty()) {
+      LOGGER.info("Empty or null value for key: {}, reset to default: {}",
+          key,
+          
CommonConstants.MultiStageQueryRunner.DEFAULT_SKIP_PIPELINE_BREAKER_STATS);
+      _skip = 
CommonConstants.MultiStageQueryRunner.DEFAULT_SKIP_PIPELINE_BREAKER_STATS;
+    } else {
+      boolean oldSkip = _skip;
+      String valueStr = value.trim();
+      _skip = Boolean.parseBoolean(valueStr);
+      if (oldSkip != _skip) {
+        LOGGER.info("Updated {} from: {} to: {}, parsed as {}", key, oldSkip, 
valueStr, _skip);
+      } else {
+        LOGGER.info("{} kept as {}", key, value);
+      }
+    }
+  }
+}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
index f427dfbe2bb..4c343416429 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
@@ -23,6 +23,7 @@ import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.query.runtime.QueryRunner;
 import org.apache.pinot.query.service.server.QueryServer;
+import org.apache.pinot.server.starter.helix.KeepPipelineBreakerStatsPredicate;
 import org.apache.pinot.server.starter.helix.SendStatsPredicate;
 import org.apache.pinot.spi.accounting.ThreadAccountant;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -37,12 +38,14 @@ public class WorkerQueryServer {
   private final QueryServer _queryWorkerService;
 
   public WorkerQueryServer(PinotConfiguration serverConf, InstanceDataManager 
instanceDataManager,
-      @Nullable TlsConfig tlsConfig, ThreadAccountant threadAccountant, 
SendStatsPredicate sendStats) {
+      @Nullable TlsConfig tlsConfig, ThreadAccountant threadAccountant, 
SendStatsPredicate sendStats,
+      KeepPipelineBreakerStatsPredicate keepPipelineBreakerStatsPredicate) {
     serverConf = toWorkerQueryConfig(serverConf);
     _queryServicePort = 
serverConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
         MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
     QueryRunner queryRunner = new QueryRunner();
-    queryRunner.init(serverConf, instanceDataManager, tlsConfig, 
sendStats::isSendStats);
+    queryRunner.init(serverConf, instanceDataManager, tlsConfig, 
sendStats::isSendStats,
+        keepPipelineBreakerStatsPredicate::isEnabled);
     _queryWorkerService =
         new QueryServer(serverConf, instanceDataManager.getInstanceId(), 
_queryServicePort, queryRunner, tlsConfig,
             threadAccountant);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index bc05f2c7720..b76754e50b5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -2035,6 +2035,14 @@ public class CommonConstants {
     public static final String KEY_OF_SEND_STATS_MODE = 
"pinot.query.mse.stats.mode";
     public static final String DEFAULT_SEND_STATS_MODE = "SAFE";
 
+    /// Used to indicate whether MSE pipeline breaker stats should be included 
in the queryStats field.
+    /// This flag was introduced in 1.5.0. Before 1.5.0, MSE pipeline breaker 
stats were not kept. Starting from 1.5.0,
+    /// they are not included by default but can be included by setting this 
flag to false (upper or lower case).
+    ///
+    /// It is expected that in 1.6.0 and later, MSE pipeline breaker stats 
will be included by default.
+    public static final String KEY_OF_SKIP_PIPELINE_BREAKER_STATS = 
"pinot.query.mse.skip.pipeline.breaker.stats";
+    public static final boolean DEFAULT_SKIP_PIPELINE_BREAKER_STATS = true;
+
     /// Used to indicate that MSE stats should be logged at INFO level for 
successful queries.
     ///
     /// When an MSE query is executed, the stats are collected and logged.
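
For reference, the new behavior is gated by the cluster config added to CommonConstants above: pinot.query.mse.skip.pipeline.breaker.stats, which defaults to true (pipeline breaker stats are skipped). Below is a minimal sketch, mirroring the integration test in this commit, of how the flag could be toggled through the Helix config accessor; the KeepPipelineBreakerStatsToggle class name is hypothetical, and the HelixManager and cluster name are assumed to be supplied by the caller.

    import org.apache.helix.HelixManager;
    import org.apache.helix.model.HelixConfigScope;
    import org.apache.helix.model.builder.HelixConfigScopeBuilder;
    import org.apache.pinot.spi.utils.CommonConstants;

    // Hypothetical helper, not part of this commit: flips the cluster config that
    // KeepPipelineBreakerStatsPredicate listens to. Setting the value to "false" makes
    // servers keep MSE pipeline breaker stats in the stage stats output.
    public final class KeepPipelineBreakerStatsToggle {
      private KeepPipelineBreakerStatsToggle() {
      }

      public static void setSkipPipelineBreakerStats(HelixManager helixManager, String clusterName, boolean skip) {
        HelixConfigScope scope =
            new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
        helixManager.getConfigAccessor()
            .set(scope, CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
                Boolean.toString(skip));
      }
    }

Because KeepPipelineBreakerStatsPredicate is registered as a cluster config change listener in BaseServerStarter, servers pick the new value up without a restart (the integration test simply waits briefly for the change to propagate).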

