This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f15c471a514 Keep stats during pipeline breaker (#17576)
f15c471a514 is described below
commit f15c471a51400aee908b3936c759434dfce37101
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Thu Feb 12 11:42:25 2026 +0100
Keep stats during pipeline breaker (#17576)
---
.../components/Query/FlamegraphQueryStageStats.tsx | 78 ++++++++++----
.../tests/MultiStageEngineIntegrationTest.java | 119 ++++++++++++++++++++-
.../query/runtime/InStageStatsTreeBuilder.java | 85 +++++++++++++--
.../apache/pinot/query/runtime/QueryRunner.java | 14 +--
.../pinot/query/runtime/operator/LeafOperator.java | 29 ++++-
.../runtime/plan/OpChainExecutionContext.java | 17 ++-
.../query/runtime/plan/PlanNodeToOpChain.java | 12 ++-
.../plan/pipeline/PipelineBreakerExecutor.java | 4 +-
.../plan/pipeline/PipelineBreakerOperator.java | 10 +-
.../query/service/dispatch/QueryDispatcher.java | 2 +-
.../apache/pinot/query/QueryServerEnclosure.java | 2 +-
.../executor/OpChainSchedulerServiceTest.java | 2 +-
.../runtime/operator/MailboxSendOperatorTest.java | 2 +-
.../query/runtime/operator/OperatorTestUtil.java | 4 +-
.../plan/pipeline/PipelineBreakerExecutorTest.java | 2 +-
.../pinot/server/starter/ServerInstance.java | 5 +-
.../server/starter/helix/BaseServerStarter.java | 5 +-
.../helix/KeepPipelineBreakerStatsPredicate.java | 77 +++++++++++++
.../pinot/server/worker/WorkerQueryServer.java | 7 +-
.../apache/pinot/spi/utils/CommonConstants.java | 8 ++
20 files changed, 427 insertions(+), 57 deletions(-)
diff --git
a/pinot-controller/src/main/resources/app/components/Query/FlamegraphQueryStageStats.tsx
b/pinot-controller/src/main/resources/app/components/Query/FlamegraphQueryStageStats.tsx
index 85fd0b84679..05293372a91 100644
---
a/pinot-controller/src/main/resources/app/components/Query/FlamegraphQueryStageStats.tsx
+++
b/pinot-controller/src/main/resources/app/components/Query/FlamegraphQueryStageStats.tsx
@@ -99,48 +99,90 @@ const generateFlameGraphData = (stats, highlightedStage :
Number = null, mode :
const stages = [];
- const processNode = (node, currentStage) => {
- const { children, ...data } = node;
+ const processNode = (node, currentStage, onPipelineBreaker: boolean = false)
=> {
+ let { children, ...data } = node;
const baseNode = {
tooltip: JSON.stringify(data),
value: getNodeValue(data),
backgroundColor: highlightedStage === currentStage ? 'lightblue' : null,
}
- // If it's a MAILBOX_RECEIVE node, prune the tree here
- if (data.type === "MAILBOX_RECEIVE") {
- const sendOperator = children[0];
- visitStage(sendOperator, currentStage);
- return {
- name: `MAILBOX_RECEIVE from stage ${sendOperator.stage || 'unknown'}`,
- relatedStage: sendOperator.stage || null,
- ...baseNode,
- };
+ let name: String;
+ switch (data.type) {
+ case "LEAF": {
+ name = `LEAF (${data.table || ''})`;
+ if (children && children.length !== 0) {
+ // We don't want to include the children in this stage because its
time is independent of the leaf node.
+ children = [];
+ }
+ break;
+ }
+ case "MAILBOX_RECEIVE": {
+ // If it's a MAILBOX_RECEIVE node, prune the tree here
+ const sendOperator = children[0];
+ visitStage(sendOperator, currentStage);
+ const prefix = onPipelineBreaker ? "PIPELINE_BREAKER " :
"MAILBOX_RECEIVE";
+ return {
+ name: `${prefix} from stage ${sendOperator.stage || 'unknown'}`,
+ relatedStage: sendOperator.stage || null, ...baseNode,
+ };
+ }
+ default: {
+ name = data.type || "Unknown Type";
+ }
}
// For other nodes, continue processing children
return {
- name: data.type || "Unknown Type",
+ name: name,
...baseNode,
children: children
- ? children.map(node => processNode(node, currentStage))
+ ? children.map(node => processNode(node, currentStage,
onPipelineBreaker))
.filter(child => child !== null) : [],
};
}
+ const getPipelineBreakerNode = (children, currentStage: number) => {
+ if (!children || children.length === 0) return null;
+ const stack = [...children];
+ while (stack.length > 0) {
+ const node = stack.pop();
+ if (node.type === "LEAF" && node.children && node.children.length > 0) {
+ return processNode(node.children[0], currentStage, true);
+ }
+ if (node.type == "MAILBOX_RECEIVE") {
+ // We don't want to go past MAILBOX_RECEIVE nodes, as their children
belong to other stages.
+ continue;
+ }
+ if (node.children && node.children.length > 0) {
+ stack.push(...node.children);
+ }
+ }
+ return null;
+ }
+
const visitStage = (node, parentStage = null) => {
const { children, ...data } = node;
const stage = data.stage || 0;
- const value = getNodeValue(node);
+ const pipelineBreakerNode = getPipelineBreakerNode(children, stage);
+ let value: number;
+ const childrenNodes = children
+ ? children.map(node => processNode(node, stage)).filter(child => child
!== null)
+ : [];
+ if (pipelineBreakerNode) {
+ value = getNodeValue(node) + (pipelineBreakerNode.value || 0);
+ childrenNodes.push(pipelineBreakerNode);
+ } else {
+ value = getNodeValue(node)
+ }
+
stages.push({
name: "Stage " + stage,
value: value,
tooltip: JSON.stringify(data),
backgroundColor: stage === highlightedStage ? 'lightblue' : null,
relatedStage: parentStage,
- children: children
- ? children.map(node => processNode(node, stage)).filter(child => child
!== null)
- : [],
+ children: childrenNodes
});
}
stats.children.forEach((node) => visitStage(node));
@@ -149,7 +191,7 @@ const generateFlameGraphData = (stats, highlightedStage :
Number = null, mode :
return {
data: {
name: 'All stages',
- value: stages.reduce((sum, child) => sum + child.value, 1),
+ value: Math.max(1, stages.reduce((sum, child) => sum + child.value, 0)),
children: stages,
}
};
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 73ea56434de..db47b3997c7 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -78,10 +78,8 @@ import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.apache.pinot.common.function.scalar.StringFunctions.*;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
+
public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestSet {
private static final String SCHEMA_FILE_NAME =
"On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
@@ -2120,7 +2118,8 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
return postQuery(query, headers);
}
- private void setupDimensionTable() throws Exception {
+ private void setupDimensionTable()
+ throws Exception {
// Set up the dimension table for JOIN tests
Schema lookupTableSchema = createSchema(DIM_TABLE_SCHEMA_PATH);
addSchema(lookupTableSchema);
@@ -2149,6 +2148,116 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
assertNotNull(response.get("resultTable"), "Should have result table");
}
+ @Test
+ public void testStageStatsPipelineBreaker()
+ throws Exception {
+ HelixConfigScope scope =
+ new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+ .build();
+ try {
+ _helixManager.getConfigAccessor()
+ .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"false");
+ String query = "select * from mytable "
+ + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+ JsonNode response = postQuery(query);
+ assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+ JsonNode receiveNode = response.get("stageStats");
+
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode sendNode = receiveNode.get("children").get(0);
+
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode mytableLeaf = sendNode.get("children").get(0);
+
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+ JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode dayOfWeekLeaf = pipelineSend.get("children").get(0);
+
Assertions.assertThat(dayOfWeekLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(dayOfWeekLeaf.get("table").asText()).isEqualTo("daysOfWeek");
+ } finally {
+ _helixManager.getConfigAccessor()
+ .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"true");
+ }
+ }
+
+ @Test
+ public void testPipelineBreakerKeepsNumGroupsLimitReached()
+ throws Exception {
+ HelixConfigScope scope =
+ new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+ .build();
+ try {
+ _helixManager.getConfigAccessor()
+ .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"false");
+ String query = ""
+ + "SET numGroupsLimit = 1;"
+ + "SELECT * FROM daysOfWeek "
+ + "WHERE dayid in ("
+ + " SELECT DayOfWeek FROM mytable"
+ + " GROUP BY DayOfWeek"
+ + ")";
+
+ JsonNode response = postQuery(query);
+ assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+ JsonNode receiveNode = response.get("stageStats");
+
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode sendNode = receiveNode.get("children").get(0);
+
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode mytableLeaf = sendNode.get("children").get(0);
+
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualToIgnoringCase("daysOfWeek");
+
+ JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
+
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode pipelineSend = pipelineReceive.get("children").get(0);
+
Assertions.assertThat(pipelineSend.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+
Assertions.assertThat(response.get("numGroupsLimitReached").asBoolean(false))
+ .describedAs("numGroupsLimitReached should be true even when the
limit is reached on a pipeline breaker")
+ .isEqualTo(true);
+ } finally {
+ _helixManager.getConfigAccessor()
+ .set(scope,
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
"true");
+ }
+ }
+
+ @Test
+ public void testPipelineBreakerWithoutKeepingStats() {
+ // let's try several times to give helix time to propagate the config
change
+    String errorMsg = "Failed to verify absence of pipeline breaker stats
after 10 attempts";
+ TestUtils.waitForCondition(() -> {
+ String query = "select * from mytable "
+ + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+ JsonNode response = postQuery(query);
+ assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+ JsonNode receiveNode = response.get("stageStats");
+
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+ JsonNode sendNode = receiveNode.get("children").get(0);
+
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+ JsonNode mytableLeaf = sendNode.get("children").get(0);
+
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+ Assert.assertNull(mytableLeaf.get("children"), "When pipeline breaker
stats are not kept, "
+ + "there should be no children under the leaf node");
+ return true;
+ }, 100, 10_000L, errorMsg, true, Duration.ofSeconds(1));
+ }
+
@AfterClass
public void tearDown()
throws Exception {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java
index bfc7a4a7d46..dfc042f755d 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java
@@ -21,10 +21,13 @@ package org.apache.pinot.query.runtime;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;
+import javax.annotation.Nullable;
+import org.apache.calcite.rel.core.JoinRelType;
import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.apache.pinot.query.planner.plannode.BasePlanNode;
@@ -71,6 +74,27 @@ public class InStageStatsTreeBuilder implements
PlanNodeVisitor<ObjectNode, InSt
}
private ObjectNode selfNode(MultiStageOperator.Type type, Context context,
int index, JsonNode[] childrenArr) {
+ return selfNode(type, context, index, childrenArr, true);
+ }
+
+ /**
+ * Builds the JSON node for the current operator including its statistics
and children.
+ *
+ * @param type The type of the operator.
+ * @param context The context containing parallelism information.
+ * @param index The index of the operator in the stage stats.
+ * @param childrenArr The array of child JSON nodes.
+ * @param adjustWithChildren Whether cumulative stats like execution time
and memory allocation should be adjusted
+ * by subtracting the children's stats. This is
usually true, except in cases like pipeline
+ * breakers
+ * @return The constructed JSON node representing the operator and its
statistics, including children.
+ */
+ private ObjectNode selfNode(
+ MultiStageOperator.Type type,
+ Context context,
+ int index,
+ JsonNode[] childrenArr,
+ boolean adjustWithChildren) {
ObjectNode json = JsonUtils.newObjectNode();
json.put("type", type.toString());
for (Map.Entry<String, JsonNode> entry :
_stageStats.getOperatorStats(index).asJson().properties()) {
@@ -81,9 +105,10 @@ public class InStageStatsTreeBuilder implements
PlanNodeVisitor<ObjectNode, InSt
json.put("parallelism", context._parallelism);
}
- addClockTimeMs(type, json, childrenArr, context);
- addSelfAllocatedBytes(type, json, childrenArr, context);
- addSelfGcTime(type, json, childrenArr, context);
+ JsonNode[] childrenArrForStats = adjustWithChildren ? childrenArr : new
JsonNode[0];
+ addClockTimeMs(type, json, childrenArrForStats, context);
+ addSelfAllocatedBytes(type, json, childrenArrForStats, context);
+ addSelfGcTime(type, json, childrenArrForStats, context);
if (childrenArr.length > 0) {
json.set(CHILDREN_KEY, JsonUtils.objectToJsonNode(childrenArr));
@@ -139,6 +164,49 @@ public class InStageStatsTreeBuilder implements
PlanNodeVisitor<ObjectNode, InSt
.sum();
}
+ @Nullable
+ private ObjectNode extractPipelineBreakerResult(BasePlanNode node, Context
context) {
+ MailboxReceiveNode pipelineBreakerNode = getPipelineBreakerNode(node);
+ if (pipelineBreakerNode == null) {
+ return null;
+ }
+ _index--;
+ return visitMailboxReceive(pipelineBreakerNode, context);
+ }
+
+ @Nullable
+ private MailboxReceiveNode getPipelineBreakerNode(BasePlanNode node) {
+ if (_index == 0) {
+ return null;
+ }
+ MultiStageOperator.Type nextOperatorType =
_stageStats.getOperatorType(_index - 1);
+ if (nextOperatorType != MultiStageOperator.Type.PIPELINE_BREAKER) {
+ // even if the plan may say there is a pipeline breaker, the stats do
not have it
+ return null;
+ }
+ // This code assumes there is a single pipeline breaker in the stage,
which is true for now.
+ ArrayList<PlanNode> nodeStack = new ArrayList<>(1);
+ nodeStack.add(node);
+ while (!nodeStack.isEmpty()) {
+ PlanNode currentNode = nodeStack.remove(nodeStack.size() - 1);
+ if (currentNode instanceof JoinNode) {
+ JoinNode joinNode = (JoinNode) currentNode;
+ if (joinNode.getInputs().size() > 1 &&
isPipelineBreakerNode(joinNode)) {
+ PlanNode planNode = joinNode.getInputs().get(1);
+ if (planNode instanceof MailboxReceiveNode) {
+ return (MailboxReceiveNode) planNode;
+ }
+ }
+ }
+ nodeStack.addAll(currentNode.getInputs());
+ }
+ return null;
+ }
+
+ private boolean isPipelineBreakerNode(JoinNode joinNode) {
+ return joinNode.getJoinType() == JoinRelType.SEMI ||
joinNode.getJoinType() == JoinRelType.ANTI;
+ }
+
private ObjectNode recursiveCase(BasePlanNode node, MultiStageOperator.Type
expectedType, Context context) {
MultiStageOperator.Type type = _stageStats.getOperatorType(_index);
/*
@@ -150,7 +218,13 @@ public class InStageStatsTreeBuilder implements
PlanNodeVisitor<ObjectNode, InSt
*/
if (type != expectedType) {
if (type == MultiStageOperator.Type.LEAF) {
- return selfNode(MultiStageOperator.Type.LEAF, context);
+ int selfIndex = _index;
+ ObjectNode pipelineBreakerResultNode =
extractPipelineBreakerResult(node, context);
+ if (pipelineBreakerResultNode != null) {
+ return selfNode(
+ MultiStageOperator.Type.LEAF, context, selfIndex, new JsonNode[]
{pipelineBreakerResultNode}, false);
+ }
+ return selfNode(MultiStageOperator.Type.LEAF, context, _index, new
JsonNode[0]);
}
List<PlanNode> inputs = node.getInputs();
int childrenSize = inputs.size();
@@ -188,8 +262,7 @@ public class InStageStatsTreeBuilder implements
PlanNodeVisitor<ObjectNode, InSt
childrenArr[i] = child;
}
- ObjectNode json = selfNode(type, context, selfIndex, childrenArr);
- return json;
+ return selfNode(type, context, selfIndex, childrenArr);
}
@Override
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 54882329a82..d19fa564f1a 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -140,13 +140,14 @@ public class QueryRunner {
@Nullable
private PhysicalTimeSeriesServerPlanVisitor _timeSeriesPhysicalPlanVisitor;
private BooleanSupplier _sendStats;
+ private BooleanSupplier _keepPipelineBreakerStats;
/**
* Initializes the query executor.
* <p>Should be called only once and before calling any other method.
*/
public void init(PinotConfiguration serverConf, InstanceDataManager
instanceDataManager,
- @Nullable TlsConfig tlsConfig, BooleanSupplier sendStats) {
+ @Nullable TlsConfig tlsConfig, BooleanSupplier sendStats,
BooleanSupplier keepPipelineBreakerStats) {
String hostname =
serverConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
if (hostname.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
hostname = hostname.substring(Helix.SERVER_INSTANCE_PREFIX_LENGTH);
@@ -236,6 +237,7 @@ public class QueryRunner {
}
_sendStats = sendStats;
+ _keepPipelineBreakerStats = keepPipelineBreakerStats;
LOGGER.info("Initialized QueryRunner with hostname: {}, port: {}",
hostname, port);
}
@@ -273,9 +275,9 @@ public class QueryRunner {
Map<String, String> opChainMetadata =
consolidateMetadata(stageMetadata.getCustomProperties(), requestMetadata);
// run pre-stage execution for all pipeline breakers
- PipelineBreakerResult pipelineBreakerResult =
- PipelineBreakerExecutor.executePipelineBreakers(_opChainScheduler,
_mailboxService,
- workerMetadata, stagePlan, opChainMetadata,
_sendStats.getAsBoolean());
+ PipelineBreakerResult pipelineBreakerResult =
PipelineBreakerExecutor.executePipelineBreakers(
+ _opChainScheduler, _mailboxService, workerMetadata, stagePlan,
opChainMetadata,
+ _sendStats.getAsBoolean(), _keepPipelineBreakerStats.getAsBoolean());
// Send error block to all the receivers if pipeline breaker fails
if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock()
!= null) {
@@ -287,7 +289,7 @@ public class QueryRunner {
// run OpChain
OpChainExecutionContext executionContext =
OpChainExecutionContext.fromQueryContext(_mailboxService,
opChainMetadata, stageMetadata, workerMetadata,
- pipelineBreakerResult, _sendStats.getAsBoolean());
+ pipelineBreakerResult, _sendStats.getAsBoolean(),
_keepPipelineBreakerStats.getAsBoolean());
try {
OpChain opChain;
if (workerMetadata.isLeafStageWorker()) {
@@ -543,7 +545,7 @@ public class QueryRunner {
// compile OpChain
OpChainExecutionContext executionContext =
OpChainExecutionContext.fromQueryContext(_mailboxService,
opChainMetadata, stageMetadata, workerMetadata, null,
- false);
+ false, false);
OpChain opChain =
ServerPlanRequestUtils.compileLeafStage(executionContext, stagePlan,
_leafQueryExecutor, _executorService,
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
index 92d64f55bcc..8faf2de2a20 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
@@ -60,6 +60,7 @@ import org.apache.pinot.query.runtime.blocks.MseBlock;
import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
import org.apache.pinot.query.runtime.operator.utils.TypeUtils;
+import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.exception.QueryErrorCode;
@@ -94,6 +95,8 @@ public class LeafOperator extends MultiStageOperator {
private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
private final AtomicReference<ErrorMseBlock> _errorBlock = new
AtomicReference<>();
private final ResultsBlockStreamer _resultsBlockStreamer =
this::addResultsBlock;
+ @Nullable
+ private final MultiStageQueryStats _pipelineBreakerStats;
// Use a limit-sized BlockingQueue to store the results blocks and apply
back pressure to the single-stage threads
@VisibleForTesting
@@ -105,6 +108,16 @@ public class LeafOperator extends MultiStageOperator {
public LeafOperator(OpChainExecutionContext context,
List<ServerQueryRequest> requests, DataSchema dataSchema,
QueryExecutor queryExecutor, ExecutorService executorService) {
+ this(context, requests, dataSchema, queryExecutor, executorService, null);
+ }
+
+ public LeafOperator(
+ OpChainExecutionContext context,
+ List<ServerQueryRequest> requests,
+ DataSchema dataSchema,
+ QueryExecutor queryExecutor,
+ ExecutorService executorService,
+ @Nullable MultiStageQueryStats pipelineBreakerStats) {
super(context);
int numRequests = requests.size();
Preconditions.checkArgument(numRequests == 1 || numRequests == 2,
"Expected 1 or 2 requests, got: %s", numRequests);
@@ -118,6 +131,7 @@ public class LeafOperator extends MultiStageOperator {
Integer maxStreamingPendingBlocks =
QueryOptionsUtils.getMaxStreamingPendingBlocks(context.getOpChainMetadata());
_blockingQueue = new ArrayBlockingQueue<>(maxStreamingPendingBlocks !=
null ? maxStreamingPendingBlocks
: QueryOptionValue.DEFAULT_MAX_STREAMING_PENDING_BLOCKS);
+ _pipelineBreakerStats = pipelineBreakerStats;
}
public List<ServerQueryRequest> getRequests() {
@@ -151,6 +165,13 @@ public class LeafOperator extends MultiStageOperator {
return List.of();
}
+ @Override
+ protected MultiStageQueryStats calculateUpstreamStats() {
+ return _pipelineBreakerStats != null
+ ? _pipelineBreakerStats
+ : MultiStageQueryStats.emptyStats(_context.getStageId());
+ }
+
@Override
public String toExplainString() {
return EXPLAIN_NAME;
@@ -340,7 +361,7 @@ public class LeafOperator extends MultiStageOperator {
_statMap.merge(StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED,
Boolean.parseBoolean(entry.getValue()));
break;
case TIME_USED_MS:
- _statMap.merge(StatKey.EXECUTION_TIME_MS,
Long.parseLong(entry.getValue()));
+ _statMap.merge(StatKey.SSE_EXECUTION_TIME_MS,
Long.parseLong(entry.getValue()));
break;
case TRACE_INFO:
LOGGER.debug("Skipping trace info: {}", entry.getValue());
@@ -723,7 +744,11 @@ public class LeafOperator extends MultiStageOperator {
/**
* Time spent on GC while this operator or its children in the same stage
were running.
*/
- GC_TIME_MS(StatMap.Type.LONG, null);
+ GC_TIME_MS(StatMap.Type.LONG, null),
+ /**
+ * Time spent in single-stage execution engine for this leaf stage.
+ */
+ SSE_EXECUTION_TIME_MS(StatMap.Type.LONG, null);
// IMPORTANT: When adding new StatKeys, make sure to either create the
same key in BrokerResponseNativeV2.StatKey or
// call the constructor that accepts a String as last argument and set it
to null.
// Otherwise the constructor will fail with an IllegalArgumentException
which will not be caught and will
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 610c12bac8e..e917adb678f 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -62,11 +62,13 @@ public class OpChainExecutionContext {
@Nullable
private ServerPlanRequestContext _leafStageContext;
private final boolean _sendStats;
+ private final boolean _keepPipelineBreakerStats;
@VisibleForTesting
public OpChainExecutionContext(MailboxService mailboxService, long
requestId, String cid, long activeDeadlineMs,
long passiveDeadlineMs, String brokerId, Map<String, String>
opChainMetadata, StageMetadata stageMetadata,
- WorkerMetadata workerMetadata, @Nullable PipelineBreakerResult
pipelineBreakerResult, boolean sendStats) {
+ WorkerMetadata workerMetadata, @Nullable PipelineBreakerResult
pipelineBreakerResult, boolean sendStats,
+ boolean keepPipelineBreakerStats) {
_mailboxService = mailboxService;
// TODO: Consider removing info included in QueryExecutionContext
_requestId = requestId;
@@ -84,24 +86,25 @@ public class OpChainExecutionContext {
_pipelineBreakerResult = pipelineBreakerResult;
_traceEnabled =
Boolean.parseBoolean(opChainMetadata.get(CommonConstants.Broker.Request.TRACE));
_queryOperatorFactoryProvider = getDefaultQueryOperatorFactoryProvider();
+ _keepPipelineBreakerStats = keepPipelineBreakerStats;
}
public static OpChainExecutionContext fromQueryContext(MailboxService
mailboxService,
Map<String, String> opChainMetadata, StageMetadata stageMetadata,
WorkerMetadata workerMetadata,
- @Nullable PipelineBreakerResult pipelineBreakerResult, boolean
sendStats) {
+ @Nullable PipelineBreakerResult pipelineBreakerResult, boolean
sendStats, boolean keepPipelineBreakerStats) {
return fromQueryContext(mailboxService, opChainMetadata, stageMetadata,
workerMetadata, pipelineBreakerResult,
- sendStats, QueryThreadContext.get().getExecutionContext());
+ sendStats, keepPipelineBreakerStats,
QueryThreadContext.get().getExecutionContext());
}
@VisibleForTesting
public static OpChainExecutionContext fromQueryContext(MailboxService
mailboxService,
Map<String, String> opChainMetadata, StageMetadata stageMetadata,
WorkerMetadata workerMetadata,
- @Nullable PipelineBreakerResult pipelineBreakerResult, boolean sendStats,
+ @Nullable PipelineBreakerResult pipelineBreakerResult, boolean
sendStats, boolean keepPipelineBreakerStats,
QueryExecutionContext queryExecutionContext) {
return new OpChainExecutionContext(mailboxService,
queryExecutionContext.getRequestId(),
queryExecutionContext.getCid(),
queryExecutionContext.getActiveDeadlineMs(),
queryExecutionContext.getPassiveDeadlineMs(),
queryExecutionContext.getBrokerId(), opChainMetadata,
- stageMetadata, workerMetadata, pipelineBreakerResult, sendStats);
+ stageMetadata, workerMetadata, pipelineBreakerResult, sendStats,
keepPipelineBreakerStats);
}
public MailboxService getMailboxService() {
@@ -200,6 +203,10 @@ public class OpChainExecutionContext {
return _sendStats;
}
+ public boolean isKeepPipelineBreakerStats() {
+ return _keepPipelineBreakerStats;
+ }
+
private static QueryOperatorFactoryProvider
getDefaultQueryOperatorFactoryProvider() {
// Prefer server context when explicitly configured, otherwise fall back
to broker, then default.
Object serverProvider =
ServerContext.getInstance().getQueryOperatorFactoryProvider();
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
index 304302fface..6e5da76bc4e 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
@@ -24,6 +24,7 @@ import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import javax.annotation.Nullable;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.apache.pinot.query.planner.plannode.EnrichedJoinNode;
import org.apache.pinot.query.planner.plannode.ExchangeNode;
@@ -61,6 +62,7 @@ import
org.apache.pinot.query.runtime.operator.set.MinusAllOperator;
import org.apache.pinot.query.runtime.operator.set.MinusOperator;
import org.apache.pinot.query.runtime.operator.set.UnionAllOperator;
import org.apache.pinot.query.runtime.operator.set.UnionOperator;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
import org.apache.pinot.spi.exception.QueryErrorCode;
@@ -111,9 +113,17 @@ public class PlanNodeToOpChain {
MultiStageOperator result;
if (context.getLeafStageContext() != null &&
context.getLeafStageContext().getLeafStageBoundaryNode() == node) {
ServerPlanRequestContext leafStageContext =
context.getLeafStageContext();
+ PipelineBreakerResult pipelineBreakerResult =
context.getPipelineBreakerResult();
+ @Nullable
+ MultiStageQueryStats pipelineBreakerQueryStats;
+ if (context.isKeepPipelineBreakerStats() && pipelineBreakerResult !=
null) {
+ pipelineBreakerQueryStats =
pipelineBreakerResult.getStageQueryStats();
+ } else {
+ pipelineBreakerQueryStats = null;
+ }
result = new LeafOperator(context,
leafStageContext.getServerQueryRequests(),
leafStageContext.getLeafStageBoundaryNode().getDataSchema(),
leafStageContext.getLeafQueryExecutor(),
- leafStageContext.getExecutorService());
+ leafStageContext.getExecutorService(), pipelineBreakerQueryStats);
} else {
result = node.visit(this, context);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
index 060b6366a69..54a650a61c1 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -68,7 +68,7 @@ public class PipelineBreakerExecutor {
@Nullable
public static PipelineBreakerResult
executePipelineBreakers(OpChainSchedulerService scheduler,
MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan
stagePlan,
- Map<String, String> opChainMetadata, boolean sendStats) {
+ Map<String, String> opChainMetadata, boolean sendStats, boolean
keepPipelineBreakerStats) {
PipelineBreakerContext pipelineBreakerContext = new
PipelineBreakerContext();
PipelineBreakerVisitor.visitPlanRoot(stagePlan.getRootNode(),
pipelineBreakerContext);
if (!pipelineBreakerContext.getPipelineBreakerMap().isEmpty()) {
@@ -78,7 +78,7 @@ public class PipelineBreakerExecutor {
// see also: MailboxIdUtils TODOs, de-couple mailbox id from query
information
OpChainExecutionContext opChainExecutionContext =
OpChainExecutionContext.fromQueryContext(mailboxService,
opChainMetadata, stagePlan.getStageMetadata(),
- workerMetadata, null, sendStats);
+ workerMetadata, null, sendStats, keepPipelineBreakerStats);
return execute(scheduler, pipelineBreakerContext,
opChainExecutionContext);
} catch (Exception e) {
long requestId =
QueryThreadContext.get().getExecutionContext().getRequestId();
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
index 5732ff7c8d7..548e22ecc69 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
@@ -65,9 +65,10 @@ public class PipelineBreakerOperator extends
MultiStageOperator {
@Override
public void registerExecution(long time, int numRows, long memoryUsedBytes,
long gcTimeMs) {
_statMap.merge(StatKey.EXECUTION_TIME_MS, time);
- _statMap.merge(StatKey.EMITTED_ROWS, numRows);
_statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
_statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
+ // This is actually unnecessary given that pipeline breaker does not emit
any rows upstream.
+ _statMap.merge(StatKey.EMITTED_ROWS, numRows);
}
@Override
@@ -149,6 +150,13 @@ public class PipelineBreakerOperator extends
MultiStageOperator {
@Override
public StatMap<StatKey> copyStatMaps() {
+ if (_statMap.getLong(StatKey.EMITTED_ROWS) == 0) {
+ long totalRows = _resultMap.values().stream()
+ .flatMap(List::stream)
+ .mapToLong(block -> ((MseBlock.Data) block).getNumRows())
+ .sum();
+ _statMap.merge(StatKey.EMITTED_ROWS, totalRows);
+ }
return new StatMap<>(_statMap);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index d38f3f01eb1..dc80478d9cc 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -569,7 +569,7 @@ public class QueryDispatcher {
StageMetadata stageMetadata = new StageMetadata(0, workerMetadata,
stagePlan.getCustomProperties());
OpChainExecutionContext opChainExecutionContext =
OpChainExecutionContext.fromQueryContext(mailboxService, queryOptions,
stageMetadata, workerMetadata.get(0),
- null, true);
+ null, true, true);
PairList<Integer, String> resultFields = subPlan.getQueryResultFields();
DataSchema sourceSchema = rootNode.getDataSchema();
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index d3f59e7bacc..7dbf60c8dd4 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -62,7 +62,7 @@ public class QueryServerEnclosure {
runnerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
_queryRunnerPort);
InstanceDataManager instanceDataManager =
factory.buildInstanceDataManager();
_queryRunner = new QueryRunner();
- _queryRunner.init(new PinotConfiguration(runnerConfig),
instanceDataManager, null, () -> true);
+ _queryRunner.init(new PinotConfiguration(runnerConfig),
instanceDataManager, null, () -> true, () -> true);
}
public int getPort() {
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index abda6ceecc4..196e43eb269 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -85,7 +85,7 @@ public class OpChainSchedulerServiceTest {
when(mailboxService.getPort()).thenReturn(1234);
WorkerMetadata workerMetadata = new WorkerMetadata(0, Map.of(), Map.of());
OpChainExecutionContext context =
OpChainExecutionContext.fromQueryContext(mailboxService, Map.of(),
- new StageMetadata(0, List.of(workerMetadata), Map.of()),
workerMetadata, null, true);
+ new StageMetadata(0, List.of(workerMetadata), Map.of()),
workerMetadata, null, true, true);
return new OpChain(context, operator);
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 8861357f4c6..e0936b6763f 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -176,7 +176,7 @@ public class MailboxSendOperatorTest {
StageMetadata stageMetadata = new StageMetadata(SENDER_STAGE_ID,
List.of(workerMetadata), Map.of());
OpChainExecutionContext context =
OpChainExecutionContext.fromQueryContext(_mailboxService, Map.of(),
stageMetadata, workerMetadata, null, true,
- QueryExecutionContext.forMseTest());
+ true, QueryExecutionContext.forMseTest());
return new MailboxSendOperator(context, _input, statMap -> _exchange);
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index 3274f3c4ffa..7e77ecaf780 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -111,7 +111,7 @@ public class OperatorTestUtil {
public static OpChainExecutionContext getOpChainContext(MailboxService
mailboxService, long deadlineMs,
StageMetadata stageMetadata) {
return new OpChainExecutionContext(mailboxService, 0, "cid", deadlineMs,
deadlineMs, "brokerId", Map.of(),
- stageMetadata, stageMetadata.getWorkerMetadataList().get(0), null,
true);
+ stageMetadata, stageMetadata.getWorkerMetadataList().get(0), null,
true, true);
}
public static OpChainExecutionContext getTracingContext() {
@@ -135,7 +135,7 @@ public class OperatorTestUtil {
new StageMetadata(0, List.of(workerMetadata),
Map.of(DispatchablePlanFragment.TABLE_NAME_KEY, "testTable"));
OpChainExecutionContext opChainExecutionContext =
OpChainExecutionContext.fromQueryContext(mailboxService,
opChainMetadata, stageMetadata, workerMetadata, null,
- true, QueryExecutionContext.forMseTest());
+ true, true, QueryExecutionContext.forMseTest());
opChainExecutionContext.setLeafStageContext(
new ServerPlanRequestContext(new StagePlan(null, stageMetadata), null,
null, null));
return opChainExecutionContext;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index 6249391d0c0..d3baa636c94 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -116,7 +116,7 @@ public class PipelineBreakerExecutorTest {
try (QueryThreadContext ignore = QueryThreadContext.open(executionContext,
workerInfo,
ThreadAccountantUtils.getNoOpAccountant())) {
return PipelineBreakerExecutor.executePipelineBreakers(scheduler,
mailboxService, workerMetadata, stagePlan,
- opChainMetadata, true);
+ opChainMetadata, true, true);
}
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 24fb993e718..88c45b32d8b 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -48,6 +48,7 @@ import org.apache.pinot.server.access.AccessControl;
import org.apache.pinot.server.access.AccessControlFactory;
import org.apache.pinot.server.access.AllowAllAccessFactory;
import org.apache.pinot.server.conf.ServerConf;
+import org.apache.pinot.server.starter.helix.KeepPipelineBreakerStatsPredicate;
import org.apache.pinot.server.starter.helix.SendStatsPredicate;
import org.apache.pinot.server.worker.WorkerQueryServer;
import org.apache.pinot.spi.accounting.ThreadAccountant;
@@ -87,6 +88,7 @@ public class ServerInstance {
public ServerInstance(ServerConf serverConf, String instanceId, HelixManager
helixManager,
AccessControlFactory accessControlFactory, @Nullable
SegmentOperationsThrottler segmentOperationsThrottler,
ThreadAccountant threadAccountant, SendStatsPredicate sendStatsPredicate,
+ KeepPipelineBreakerStatsPredicate keepPipelineBreakerStatsPredicate,
ServerReloadJobStatusCache reloadJobStatusCache)
throws Exception {
LOGGER.info("Initializing server instance: {}", instanceId);
@@ -132,7 +134,8 @@ public class ServerInstance {
if (serverConf.isMultiStageServerEnabled()) {
LOGGER.info("Initializing Multi-stage query engine");
_workerQueryServer = new WorkerQueryServer(serverConf.getPinotConfig(),
_instanceDataManager,
- serverConf.isMultiStageEngineTlsEnabled() ? tlsConfig : null,
threadAccountant, sendStatsPredicate);
+ serverConf.isMultiStageEngineTlsEnabled() ? tlsConfig : null,
threadAccountant, sendStatsPredicate,
+ keepPipelineBreakerStatsPredicate);
} else {
_workerQueryServer = null;
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 11c0c9b4587..383984fe9c0 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -749,9 +749,11 @@ public abstract class BaseServerStarter implements
ServiceStartable {
org.apache.pinot.spi.config.instance.InstanceType.SERVER);
SendStatsPredicate sendStatsPredicate =
SendStatsPredicate.create(_serverConf, _helixManager);
+ KeepPipelineBreakerStatsPredicate keepPipelineBreakerStatsPredicate =
+ KeepPipelineBreakerStatsPredicate.create(_serverConf);
_serverInstance =
new ServerInstance(serverConf, _instanceId, _helixManager,
_accessControlFactory, _segmentOperationsThrottler,
- _threadAccountant, sendStatsPredicate, _reloadJobStatusCache);
+ _threadAccountant, sendStatsPredicate,
keepPipelineBreakerStatsPredicate, _reloadJobStatusCache);
InstanceDataManager instanceDataManager =
_serverInstance.getInstanceDataManager();
instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() ->
_isServerReadyToServeQueries);
@@ -791,6 +793,7 @@ public abstract class BaseServerStarter implements
ServiceStartable {
LOGGER.error("Failed to register DefaultClusterConfigChangeHandler as
the Helix ClusterConfigChangeListener", e);
}
_clusterConfigChangeHandler.registerClusterConfigChangeListener(_segmentOperationsThrottler);
+
_clusterConfigChangeHandler.registerClusterConfigChangeListener(keepPipelineBreakerStatsPredicate);
if (sendStatsPredicate.needWatchForInstanceConfigChange()) {
LOGGER.info("Initializing and registering the SendStatsPredicate");
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/KeepPipelineBreakerStatsPredicate.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/KeepPipelineBreakerStatsPredicate.java
new file mode 100644
index 00000000000..374fa19770d
--- /dev/null
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/KeepPipelineBreakerStatsPredicate.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class KeepPipelineBreakerStatsPredicate implements
PinotClusterConfigChangeListener {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(KeepPipelineBreakerStatsPredicate.class);
+
+ private volatile boolean _skip;
+
+ public KeepPipelineBreakerStatsPredicate(boolean skip) {
+ _skip = skip;
+ }
+
+ // NOTE: When this method is called, the helix manager is not yet connected.
+ public static KeepPipelineBreakerStatsPredicate create(PinotConfiguration
serverConf) {
+ boolean skip = serverConf.getProperty(
+
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS,
+
CommonConstants.MultiStageQueryRunner.DEFAULT_SKIP_PIPELINE_BREAKER_STATS);
+ LOGGER.info("Initialized {} with value: {}",
+
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, skip);
+ return new KeepPipelineBreakerStatsPredicate(skip);
+ }
+
+ public boolean isEnabled() {
+ return !_skip;
+ }
+
+ @Override
+ public void onChange(Set<String> changedConfigs, Map<String, String>
clusterConfigs) {
+ String key =
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS;
+ if (!changedConfigs.contains(key)) {
+ LOGGER.debug("No change for key: {}, keeping its value as {}", key,
_skip);
+ return;
+ }
+ String value = clusterConfigs.get(key);
+ if (value == null || value.isEmpty()) {
+ LOGGER.info("Empty or null value for key: {}, reset to default: {}",
+ key,
+
CommonConstants.MultiStageQueryRunner.DEFAULT_SKIP_PIPELINE_BREAKER_STATS);
+ _skip =
CommonConstants.MultiStageQueryRunner.DEFAULT_SKIP_PIPELINE_BREAKER_STATS;
+ } else {
+ boolean oldSkip = _skip;
+ String valueStr = value.trim();
+ _skip = Boolean.parseBoolean(valueStr);
+ if (oldSkip != _skip) {
+ LOGGER.info("Updated {} from: {} to: {}, parsed as {}", key, oldSkip,
valueStr, _skip);
+ } else {
+ LOGGER.info("{} kept as {}", key, value);
+ }
+ }
+ }
+}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
index f427dfbe2bb..4c343416429 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
@@ -23,6 +23,7 @@ import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.query.runtime.QueryRunner;
import org.apache.pinot.query.service.server.QueryServer;
+import org.apache.pinot.server.starter.helix.KeepPipelineBreakerStatsPredicate;
import org.apache.pinot.server.starter.helix.SendStatsPredicate;
import org.apache.pinot.spi.accounting.ThreadAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -37,12 +38,14 @@ public class WorkerQueryServer {
private final QueryServer _queryWorkerService;
public WorkerQueryServer(PinotConfiguration serverConf, InstanceDataManager
instanceDataManager,
- @Nullable TlsConfig tlsConfig, ThreadAccountant threadAccountant,
SendStatsPredicate sendStats) {
+ @Nullable TlsConfig tlsConfig, ThreadAccountant threadAccountant,
SendStatsPredicate sendStats,
+ KeepPipelineBreakerStatsPredicate keepPipelineBreakerStatsPredicate) {
serverConf = toWorkerQueryConfig(serverConf);
_queryServicePort =
serverConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
QueryRunner queryRunner = new QueryRunner();
- queryRunner.init(serverConf, instanceDataManager, tlsConfig,
sendStats::isSendStats);
+ queryRunner.init(serverConf, instanceDataManager, tlsConfig,
sendStats::isSendStats,
+ keepPipelineBreakerStatsPredicate::isEnabled);
_queryWorkerService =
new QueryServer(serverConf, instanceDataManager.getInstanceId(),
_queryServicePort, queryRunner, tlsConfig,
threadAccountant);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index bc05f2c7720..b76754e50b5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -2035,6 +2035,14 @@ public class CommonConstants {
public static final String KEY_OF_SEND_STATS_MODE =
"pinot.query.mse.stats.mode";
public static final String DEFAULT_SEND_STATS_MODE = "SAFE";
+ /// Used to indicate whether MSE pipeline breaker stats should be included
in the queryStats field.
+ /// This flag was introduced in 1.5.0. Before 1.5.0, MSE pipeline breaker
stats were not kept. Starting from 1.5.0,
+ /// they are not included by default but can be included by setting this
flag to false (the value is case-insensitive).
+ ///
+ /// It is expected that in 1.6.0 and later, MSE pipeline breaker stats
will be included by default.
+ public static final String KEY_OF_SKIP_PIPELINE_BREAKER_STATS =
"pinot.query.mse.skip.pipeline.breaker.stats";
+ public static final boolean DEFAULT_SKIP_PIPELINE_BREAKER_STATS = true;
+
/// Used to indicate that MSE stats should be logged at INFO level for
successful queries.
///
/// When an MSE query is executed, the stats are collected and logged.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]