walterddr commented on code in PR #10384:
URL: https://github.com/apache/pinot/pull/10384#discussion_r1128192494
##########
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java:
##########
@@ -275,83 +143,119 @@ public void setStats(@Nullable String rawTableName,
BrokerResponseNative brokerR
}
// Set execution statistics.
- brokerResponseNative.setNumDocsScanned(_numDocsScanned);
-
brokerResponseNative.setNumEntriesScannedInFilter(_numEntriesScannedInFilter);
-
brokerResponseNative.setNumEntriesScannedPostFilter(_numEntriesScannedPostFilter);
- brokerResponseNative.setNumSegmentsQueried(_numSegmentsQueried);
- brokerResponseNative.setNumSegmentsProcessed(_numSegmentsProcessed);
- brokerResponseNative.setNumSegmentsMatched(_numSegmentsMatched);
- brokerResponseNative.setTotalDocs(_numTotalDocs);
- brokerResponseNative.setNumGroupsLimitReached(_numGroupsLimitReached);
- brokerResponseNative.setOfflineThreadCpuTimeNs(_offlineThreadCpuTimeNs);
- brokerResponseNative.setRealtimeThreadCpuTimeNs(_realtimeThreadCpuTimeNs);
-
brokerResponseNative.setOfflineSystemActivitiesCpuTimeNs(_offlineSystemActivitiesCpuTimeNs);
-
brokerResponseNative.setRealtimeSystemActivitiesCpuTimeNs(_realtimeSystemActivitiesCpuTimeNs);
-
brokerResponseNative.setOfflineResponseSerializationCpuTimeNs(_offlineResponseSerializationCpuTimeNs);
-
brokerResponseNative.setRealtimeResponseSerializationCpuTimeNs(_realtimeResponseSerializationCpuTimeNs);
- brokerResponseNative.setOfflineTotalCpuTimeNs(_offlineTotalCpuTimeNs);
- brokerResponseNative.setRealtimeTotalCpuTimeNs(_realtimeTotalCpuTimeNs);
-
brokerResponseNative.setNumSegmentsPrunedByServer(_numSegmentsPrunedByServer);
-
brokerResponseNative.setNumSegmentsPrunedInvalid(_numSegmentsPrunedInvalid);
-
brokerResponseNative.setNumSegmentsPrunedByLimit(_numSegmentsPrunedByLimit);
-
brokerResponseNative.setNumSegmentsPrunedByValue(_numSegmentsPrunedByValue);
-
brokerResponseNative.setExplainPlanNumEmptyFilterSegments(_explainPlanNumEmptyFilterSegments);
-
brokerResponseNative.setExplainPlanNumMatchAllFilterSegments(_explainPlanNumMatchAllFilterSegments);
- if (_numConsumingSegmentsQueried > 0) {
-
brokerResponseNative.setNumConsumingSegmentsQueried(_numConsumingSegmentsQueried);
+
brokerResponseNative.setNumDocsScanned(getLongValue(DataTable.MetadataKey.NUM_DOCS_SCANNED));
+ brokerResponseNative.setNumEntriesScannedInFilter(
+ getLongValue(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER));
+ brokerResponseNative.setNumEntriesScannedPostFilter(
+ getLongValue(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER));
+
brokerResponseNative.setNumSegmentsQueried(getLongValue(DataTable.MetadataKey.NUM_SEGMENTS_QUERIED));
+
brokerResponseNative.setNumSegmentsProcessed(getLongValue(DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED));
+
brokerResponseNative.setNumSegmentsMatched(getLongValue(DataTable.MetadataKey.NUM_SEGMENTS_MATCHED));
+
brokerResponseNative.setTotalDocs(getLongValue(DataTable.MetadataKey.TOTAL_DOCS));
+ brokerResponseNative.setNumGroupsLimitReached(Boolean.parseBoolean(
+ (String)
_aggregatedStats.getOrDefault(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED,
"false")));
+
+ brokerResponseNative.setNumSegmentsPrunedByServer(
+ getLongValue(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER));
+
brokerResponseNative.setNumSegmentsPrunedInvalid(getLongValue(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID));
+
brokerResponseNative.setNumSegmentsPrunedByLimit(getLongValue(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT));
+
brokerResponseNative.setNumSegmentsPrunedByValue(getLongValue(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE));
+ brokerResponseNative.setExplainPlanNumEmptyFilterSegments(
+
getLongValue(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS));
+ brokerResponseNative.setExplainPlanNumMatchAllFilterSegments(
+
getLongValue(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS));
+
+ brokerResponseNative.setNumConsumingSegmentsQueried(
+ getLongValue(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED));
+ brokerResponseNative.setNumConsumingSegmentsProcessed(
+ getLongValue(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED));
+ brokerResponseNative.setNumConsumingSegmentsMatched(
+ getLongValue(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED));
+
+ if
(_aggregatedStats.containsKey(DataTable.MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS))
{
+ brokerResponseNative.setMinConsumingFreshnessTimeMs(
+ getLongValue(DataTable.MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS));
+ }
+
+ // OFFLINE/REALTIME
+ String tableName = _tableNames.isEmpty() ? rawTableName :
_tableNames.iterator().next();
+ TableType tableType = null;
+ if (tableName != null && tableName.isEmpty()) {
+ tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
}
- if (_minConsumingFreshnessTimeMs != Long.MAX_VALUE) {
-
brokerResponseNative.setMinConsumingFreshnessTimeMs(_minConsumingFreshnessTimeMs);
+
+ if (tableType == TableType.OFFLINE) {
+
brokerResponseNative.setOfflineThreadCpuTimeNs(getLongValue(DataTable.MetadataKey.THREAD_CPU_TIME_NS));
+ brokerResponseNative.setOfflineSystemActivitiesCpuTimeNs(
+ getLongValue(DataTable.MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS));
+ brokerResponseNative.setOfflineResponseSerializationCpuTimeNs(
+ getLongValue(DataTable.MetadataKey.RESPONSE_SER_CPU_TIME_NS));
+ brokerResponseNative.setOfflineTotalCpuTimeNs(
+ brokerResponseNative.getOfflineThreadCpuTimeNs() +
brokerResponseNative.getOfflineSystemActivitiesCpuTimeNs()
+ + brokerResponseNative.getOfflineResponseSerializationCpuTimeNs()
+ + brokerResponseNative.getOfflineThreadCpuTimeNs());
+ }
+
+ if (tableType == TableType.REALTIME) {
+
brokerResponseNative.setRealtimeThreadCpuTimeNs(getLongValue(DataTable.MetadataKey.THREAD_CPU_TIME_NS));
+ brokerResponseNative.setRealtimeSystemActivitiesCpuTimeNs(
+ getLongValue(DataTable.MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS));
+ brokerResponseNative.setRealtimeResponseSerializationCpuTimeNs(
+ getLongValue(DataTable.MetadataKey.RESPONSE_SER_CPU_TIME_NS));
+
brokerResponseNative.setRealtimeTotalCpuTimeNs(brokerResponseNative.getRealtimeThreadCpuTimeNs()
+ + brokerResponseNative.getRealtimeSystemActivitiesCpuTimeNs()
+ + brokerResponseNative.getRealtimeResponseSerializationCpuTimeNs());
}
-
brokerResponseNative.setNumConsumingSegmentsProcessed(_numConsumingSegmentsProcessed);
-
brokerResponseNative.setNumConsumingSegmentsMatched(_numConsumingSegmentsMatched);
// Update broker metrics.
if (brokerMetrics != null && rawTableName != null) {
- brokerMetrics.addMeteredTableValue(rawTableName,
BrokerMeter.DOCUMENTS_SCANNED, _numDocsScanned);
- brokerMetrics.addMeteredTableValue(rawTableName,
BrokerMeter.ENTRIES_SCANNED_IN_FILTER,
- _numEntriesScannedInFilter);
- brokerMetrics.addMeteredTableValue(rawTableName,
BrokerMeter.ENTRIES_SCANNED_POST_FILTER,
- _numEntriesScannedPostFilter);
- brokerMetrics.addTimedTableValue(rawTableName,
BrokerTimer.OFFLINE_THREAD_CPU_TIME_NS, _offlineThreadCpuTimeNs,
- TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName,
BrokerTimer.REALTIME_THREAD_CPU_TIME_NS, _realtimeThreadCpuTimeNs,
- TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName,
BrokerTimer.OFFLINE_SYSTEM_ACTIVITIES_CPU_TIME_NS,
- _offlineSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName,
BrokerTimer.REALTIME_SYSTEM_ACTIVITIES_CPU_TIME_NS,
- _realtimeSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName,
BrokerTimer.OFFLINE_RESPONSE_SER_CPU_TIME_NS,
- _offlineResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName,
BrokerTimer.REALTIME_RESPONSE_SER_CPU_TIME_NS,
- _realtimeResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName,
BrokerTimer.OFFLINE_TOTAL_CPU_TIME_NS, _offlineTotalCpuTimeNs,
- TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName,
BrokerTimer.REALTIME_TOTAL_CPU_TIME_NS, _realtimeTotalCpuTimeNs,
- TimeUnit.NANOSECONDS);
-
- if (_minConsumingFreshnessTimeMs != Long.MAX_VALUE) {
- brokerMetrics.addTimedTableValue(rawTableName,
BrokerTimer.FRESHNESS_LAG_MS,
- System.currentTimeMillis() - _minConsumingFreshnessTimeMs,
TimeUnit.MILLISECONDS);
- }
+ addBrokerMetrics(rawTableName, brokerMetrics, brokerResponseNative);
Review Comment:
ah. nice. thanks for the explanation!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]