This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f28525b417 Add operator level stats to response when tracing is
enabled (#10364)
f28525b417 is described below
commit f28525b4176b39c45bf326bd8ecc6234cee1e027
Author: Kartik Khare <[email protected]>
AuthorDate: Mon Mar 6 12:05:42 2023 +0530
Add operator level stats to response when tracing is enabled (#10364)
* Add operator level stats to response when tracing is enabled
* Add tests for operatorStats on tracing
---
.../MultiStageBrokerRequestHandler.java | 6 ++++-
.../response/broker/BrokerResponseNativeV2.java | 2 +-
.../response/broker/BrokerResponseStats.java | 18 +++++++------
.../api/resources/PinotQueryResource.java | 6 ++---
.../query/reduce/ExecutionStatsAggregator.java | 12 ++++++---
.../pinot/query/runtime/QueryRunnerTestBase.java | 4 +--
.../runtime/queries/ResourceBasedQueriesTest.java | 30 ++++++++++++++++++----
7 files changed, 54 insertions(+), 24 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 17f018f047..9ebc6ea6b4 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -169,10 +169,14 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
return new
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
e));
}
+ boolean traceEnabled = Boolean.parseBoolean(
+ request.has(CommonConstants.Broker.Request.TRACE) ?
request.get(CommonConstants.Broker.Request.TRACE).asText()
+ : "false");
+
ResultTable queryResults;
Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>();
for (Integer stageId: queryPlan.getStageMetadataMap().keySet()) {
- stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(false));
+ stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(traceEnabled));
}
try {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index 79605773d7..5bf631e129 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -81,7 +81,7 @@ public class BrokerResponseNativeV2 extends
BrokerResponseNative {
}
public void addStageStat(Integer stageId, BrokerResponseStats
brokerResponseStats) {
- if (!brokerResponseStats.getOperatorIds().isEmpty()) {
+ if (!brokerResponseStats.getOperatorStats().isEmpty()) {
_stageIdStats.put(stageId, brokerResponseStats);
}
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
index 60d7ab4813..83cbd16f3c 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
@@ -23,7 +23,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -38,14 +40,14 @@ import org.apache.pinot.spi.utils.JsonUtils;
"totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
"realtimeThreadCpuTimeNs",
"offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
"offlineResponseSerializationCpuTimeNs",
"realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
"realtimeTotalCpuTimeNs",
- "traceInfo", "operatorIds", "tableNames"})
+ "traceInfo", "operatorStats", "tableNames"})
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public class BrokerResponseStats extends BrokerResponseNative {
private int _numBlocks = 0;
private int _numRows = 0;
private long _stageExecutionTimeMs = 0;
- private List<String> _operatorIds = new ArrayList<>();
+ private Map<String, Map<String, String>> _operatorStats = new HashMap<>();
private List<String> _tableNames = new ArrayList<>();
@Override
@@ -88,14 +90,14 @@ public class BrokerResponseStats extends
BrokerResponseNative {
return JsonUtils.objectToString(this);
}
- @JsonProperty("operatorIds")
- public List<String> getOperatorIds() {
- return _operatorIds;
+ @JsonProperty("operatorStats")
+ public Map<String, Map<String, String>> getOperatorStats() {
+ return _operatorStats;
}
- @JsonProperty("operatorIds")
- public void setOperatorIds(List<String> operatorIds) {
- _operatorIds = operatorIds;
+ @JsonProperty("operatorStats")
+ public void setOperatorStats(Map<String, Map<String, String>> operatorStats)
{
+ _operatorStats = operatorStats;
}
@JsonProperty("tableNames")
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
index 6dd546c778..856632350e 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
@@ -138,7 +138,7 @@ public class PinotQueryResource {
if
(Boolean.parseBoolean(options.get(QueryOptionKey.USE_MULTISTAGE_ENGINE))) {
if
(_controllerConf.getProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED,
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) {
- return getMultiStageQueryResponse(sqlQuery, queryOptions, httpHeaders,
endpointUrl);
+ return getMultiStageQueryResponse(sqlQuery, queryOptions, httpHeaders,
endpointUrl, traceEnabled);
} else {
throw new UnsupportedOperationException("V2 Multi-Stage query engine
not enabled. "
+ "Please see https://docs.pinot.apache.org/ for instruction to
enable V2 engine.");
@@ -161,7 +161,7 @@ public class PinotQueryResource {
}
private String getMultiStageQueryResponse(String query, String queryOptions,
HttpHeaders httpHeaders,
- String endpointUrl) {
+ String endpointUrl, String traceEnabled) {
// Validate data access
// we don't have a cross table access control rule so only ADMIN can make
request to multi-stage engine.
@@ -185,7 +185,7 @@ public class PinotQueryResource {
// Send query to a random broker.
String instanceId = instanceIds.get(RANDOM.nextInt(instanceIds.size()));
- return sendRequestToBroker(query, instanceId, "false", queryOptions,
httpHeaders);
+ return sendRequestToBroker(query, instanceId, traceEnabled, queryOptions,
httpHeaders);
}
private String getQueryResponse(String query, @Nullable SqlNode sqlNode,
String traceEnabled, String queryOptions,
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
index 889b859c29..2faefee584 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
@@ -43,7 +43,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
public class ExecutionStatsAggregator {
private final List<QueryProcessingException> _processingExceptions = new
ArrayList<>();
- private final List<String> _operatorIds = new ArrayList<>();
+ private final Map<String, Map<String, String>> _operatorStats = new
HashMap<>();
private final Set<String> _tableNames = new HashSet<>();
private final Map<String, String> _traceInfo = new HashMap<>();
private final boolean _enableTrace;
@@ -89,7 +89,7 @@ public class ExecutionStatsAggregator {
public synchronized void aggregate(@Nullable ServerRoutingInstance
routingInstance, Map<String, String> metadata,
Map<Integer, String> exceptions) {
// Reduce on trace info.
- if (_enableTrace) {
+ if (_enableTrace &&
metadata.containsKey(DataTable.MetadataKey.TRACE_INFO.getName())) {
_traceInfo.put(routingInstance.getShortName(),
metadata.get(DataTable.MetadataKey.TRACE_INFO.getName()));
}
@@ -116,7 +116,11 @@ public class ExecutionStatsAggregator {
String operatorId =
metadata.get(DataTable.MetadataKey.OPERATOR_ID.getName());
if (operatorId != null) {
- _operatorIds.add(operatorId);
+ if (_enableTrace) {
+ _operatorStats.put(operatorId, metadata);
+ } else {
+ _operatorStats.put(operatorId, new HashMap<>());
+ }
}
// Reduce on exceptions.
@@ -340,7 +344,7 @@ public class ExecutionStatsAggregator {
brokerResponseStats.setNumBlocks(_numBlocks);
brokerResponseStats.setNumRows(_numRows);
brokerResponseStats.setStageExecutionTimeMs(_stageExecutionTimeMs);
- brokerResponseStats.setOperatorIds(_operatorIds);
+ brokerResponseStats.setOperatorStats(_operatorStats);
brokerResponseStats.setTableNames(new ArrayList<>(_tableNames));
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 8c6ed98ee8..9cc6cbc539 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -106,13 +106,13 @@ public abstract class QueryRunnerTestBase extends
QueryTestSet {
}
}
if (executionStatsAggregatorMap != null) {
- executionStatsAggregatorMap.put(stageId, new
ExecutionStatsAggregator(false));
+ executionStatsAggregatorMap.put(stageId, new
ExecutionStatsAggregator(true));
}
}
Preconditions.checkNotNull(mailboxReceiveOperator);
return QueryDispatcher.toResultTable(
QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator,
CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS,
- executionStatsAggregatorMap, null),
+ executionStatsAggregatorMap, queryPlan),
queryPlan.getQueryResultFields(),
queryPlan.getQueryStageMap().get(0).getDataSchema()).getRows();
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 5b57f59ab0..1169e66780 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -37,6 +37,7 @@ import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
import org.apache.pinot.common.response.broker.BrokerResponseStats;
@@ -248,11 +249,30 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
for (Integer stageId : stageIdStats.keySet()) {
// check stats only for leaf stage
BrokerResponseStats brokerResponseStats = stageIdStats.get(stageId);
- if (!brokerResponseStats.getTableNames().isEmpty()) {
- Assert.assertEquals(brokerResponseStats.getTableNames().size(), 1);
- String tableName = brokerResponseStats.getTableNames().get(0);
- Assert.assertNotNull(_tableToSegmentMap.get(tableName));
- Assert.assertEquals(brokerResponseStats.getNumSegmentsQueried(),
_tableToSegmentMap.get(tableName).size());
+
+ if (brokerResponseStats.getTableNames().isEmpty()) {
+ continue;
+ }
+
+ String tableName = brokerResponseStats.getTableNames().get(0);
+ Assert.assertEquals(brokerResponseStats.getTableNames().size(), 1);
+
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
+ if (tableType == null) {
+ tableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+ }
+
+ Assert.assertNotNull(_tableToSegmentMap.get(tableName));
+ Assert.assertEquals(brokerResponseStats.getNumSegmentsQueried(),
_tableToSegmentMap.get(tableName).size());
+
+ Assert.assertFalse(brokerResponseStats.getOperatorStats().isEmpty());
+ Map<String, Map<String, String>> operatorStats =
brokerResponseStats.getOperatorStats();
+ for (Map.Entry<String, Map<String, String>> entry :
operatorStats.entrySet()) {
+ if (entry.getKey().contains("LEAF_STAGE")) {
+
Assert.assertNotNull(entry.getValue().get(DataTable.MetadataKey.NUM_SEGMENTS_QUERIED.getName()));
+ } else {
+
Assert.assertNotNull(entry.getValue().get(DataTable.MetadataKey.NUM_BLOCKS.getName()));
+ }
}
}
});
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]