gortiz commented on code in PR #12704:
URL: https://github.com/apache/pinot/pull/12704#discussion_r1580608897


##########
pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java:
##########
@@ -77,20 +100,301 @@ public static BrokerResponseNativeV2 empty() {
     return new BrokerResponseNativeV2();
   }
 
-  public static BrokerResponseNativeV2 fromJsonString(String jsonString)
-      throws IOException {
-    return JsonUtils.stringToObject(jsonString, BrokerResponseNativeV2.class);
+  public void addStageStats(JsonNode stageStats) {
+    ObjectNode node = JsonUtils.newObjectNode();
+    node.put("stage", _stageIdStats.size());
+    node.set("stats", stageStats);
+    _stageIdStats.add(node);
+  }
+
+  @JsonProperty
+  public List<JsonNode> getStageStats() {
+    return _stageIdStats;
+  }
+
+  @JsonProperty
+  public long getMaxRows() {
+    return _maxRows;
+  }
+
+  public void mergeMaxRows(long maxRows) {
+    _maxRows = Math.max(_maxRows, maxRows);
+  }
+
+  @Override
+  public void setTimeUsedMs(long timeUsedMs) {
+    _serverStats.merge(DataTable.MetadataKey.TIME_USED_MS, timeUsedMs);
+  }
+
+  @Override
+  public long getNumDocsScanned() {
+    return _serverStats.getLong(DataTable.MetadataKey.NUM_DOCS_SCANNED);
+  }
+
+  @Override
+  public long getNumEntriesScannedInFilter() {
+    return 
_serverStats.getLong(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER);
+  }
+
+  @Override
+  public long getNumEntriesScannedPostFilter() {
+    return 
_serverStats.getLong(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER);
+  }
+
+  @Override
+  public long getNumSegmentsQueried() {
+    return _serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_QUERIED);
+  }
+
+  @Override
+  public long getNumSegmentsProcessed() {
+    return _serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED);
+  }
+
+  @Override
+  public long getNumSegmentsMatched() {
+    return _serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_MATCHED);
+  }
+
+  @Override
+  public long getNumConsumingSegmentsQueried() {
+    return 
_serverStats.getLong(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED);
+  }
+
+  @Override
+  public long getNumConsumingSegmentsProcessed() {
+    return 
_serverStats.getLong(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED);
+  }
+
+  @Override
+  public long getNumConsumingSegmentsMatched() {
+    return 
_serverStats.getLong(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED);
+  }
+
+  @Override
+  public long getMinConsumingFreshnessTimeMs() {
+    return 
_serverStats.getLong(DataTable.MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS);
+  }
+
+  @Override
+  public long getTotalDocs() {
+    return _serverStats.getLong(DataTable.MetadataKey.TOTAL_DOCS);
+  }
+
+  @Override
+  public boolean isNumGroupsLimitReached() {
+    return 
_serverStats.getBoolean(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED);
+  }
+
+  public void mergeNumGroupsLimitReached(boolean numGroupsLimitReached) {
+    _serverStats.merge(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED, 
numGroupsLimitReached);
+  }
+
+  @Override
+  public long getNumSegmentsPrunedByServer() {
+    return 
_serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER);
+  }
+
+  @Override
+  public long getNumSegmentsPrunedInvalid() {
+    return 
_serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID);
   }
 
-  public void addStageStat(Integer stageId, BrokerResponseStats 
brokerResponseStats) {
-    // StageExecutionWallTime will always be there, other stats are optional 
such as OperatorStats
-    if (brokerResponseStats.getStageExecWallTimeMs() != -1) {
-      _stageIdStats.put(stageId, brokerResponseStats);
+  @Override
+  public long getNumSegmentsPrunedByLimit() {
+    return 
_serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT);
+  }
+
+  @Override
+  public long getNumSegmentsPrunedByValue() {
+    return 
_serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE);
+  }
+
+  @Override
+  public long getExplainPlanNumEmptyFilterSegments() {
+    return 
_serverStats.getLong(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS);
+  }
+
+  @Override
+  public long getExplainPlanNumMatchAllFilterSegments() {
+    return 
_serverStats.getLong(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS);
+  }
+
+  @Override
+  public long getOfflineTotalCpuTimeNs() {
+    return getOfflineThreadCpuTimeNs() + getOfflineSystemActivitiesCpuTimeNs()
+        + getOfflineResponseSerializationCpuTimeNs();
+  }
+
+  @Override
+  public long getRealtimeTotalCpuTimeNs() {
+    return getRealtimeThreadCpuTimeNs() + 
getRealtimeSystemActivitiesCpuTimeNs()
+        + getRealtimeResponseSerializationCpuTimeNs();
+  }
+
+  @Override
+  public void setExceptions(List<ProcessingException> exceptions) {
+    for (ProcessingException exception : exceptions) {
+      _processingExceptions.add(new 
QueryProcessingException(exception.getErrorCode(), exception.getMessage()));
     }
   }
 
-  @JsonProperty("stageStats")
-  public Map<Integer, BrokerResponseStats> getStageIdStats() {
-    return _stageIdStats;
+  public void addToExceptions(QueryProcessingException processingException) {
+    _processingExceptions.add(processingException);
+  }
+
+  @Override
+  public int getNumServersQueried() {
+    return _numServersQueried;
+  }
+
+  @Override
+  public void setNumServersQueried(int numServersQueried) {
+    _numServersQueried = numServersQueried;
+  }
+
+  @Override
+  public int getNumServersResponded() {
+    return _numServersResponded;
+  }
+
+  @Override
+  public void setNumServersResponded(int numServersResponded) {
+    _numServersResponded = numServersResponded;
+  }
+
+  @JsonProperty("maxRowsInJoinReached")
+  public boolean isMaxRowsInJoinReached() {
+    return _maxRowsInJoinReached;
+  }
+
+  @JsonProperty("maxRowsInJoinReached")
+  public void mergeMaxRowsInJoinReached(boolean maxRowsInJoinReached) {
+    _maxRowsInJoinReached |= maxRowsInJoinReached;
+  }
+
+  @Override
+  public int getExceptionsSize() {
+    return _processingExceptions.size();
+  }
+
+  @Override
+  public void setResultTable(ResultTable resultTable) {
+    _resultTable = resultTable;
+  }
+
+  @Override
+  public ResultTable getResultTable() {
+    return _resultTable;
+  }
+
+  @Override
+  public List<QueryProcessingException> getProcessingExceptions() {
+    return List.of();
+  }
+
+  @Override
+  public int getNumRowsResultSet() {
+    return 0;
+  }
+
+  @Override
+  public long getOfflineThreadCpuTimeNs() {
+    return _offlineThreadCpuTimeNs;
+  }
+
+  @Override
+  public long getRealtimeThreadCpuTimeNs() {
+    return _realtimeThreadCpuTimeNs;
+  }
+
+  @Override
+  public long getOfflineSystemActivitiesCpuTimeNs() {
+    return _offlineSystemActivitiesCpuTimeNs;
+  }
+
+  @Override
+  public long getRealtimeSystemActivitiesCpuTimeNs() {
+    return _realtimeSystemActivitiesCpuTimeNs;
+  }
+
+  @Override
+  public long getOfflineResponseSerializationCpuTimeNs() {
+    return _offlineResponseSerializationCpuTimeNs;
+  }
+
+  @Override
+  public long getRealtimeResponseSerializationCpuTimeNs() {
+    return _realtimeResponseSerializationCpuTimeNs;
+  }
+
+  @Override
+  public long getNumSegmentsPrunedByBroker() {
+    return _numSegmentsPrunedByBroker;
+  }
+
+  @Override
+  public void setNumSegmentsPrunedByBroker(long numSegmentsPrunedByBroker) {
+    _numSegmentsPrunedByBroker = numSegmentsPrunedByBroker;
+  }
+
+  @Override
+  public String getRequestId() {
+    return _requestId;
+  }
+
+  @Override
+  public void setRequestId(String requestId) {
+    _requestId = requestId;
+  }
+
+  @Override
+  public String getBrokerId() {
+    return _brokerId;
+  }
+
+  @Override
+  public void setBrokerId(String requestId) {
+    _brokerId = requestId;
+  }
+
+  @Override
+  public long getBrokerReduceTimeMs() {
+    return _brokerReduceTimeMs;
+  }
+
+  @Override
+  public void setBrokerReduceTimeMs(long brokerReduceTimeMs) {
+    _brokerReduceTimeMs = brokerReduceTimeMs;
+  }
+
+  @JsonProperty(access = JsonProperty.Access.READ_ONLY)
+  @Override
+  public boolean isPartialResult() {
+    return isNumGroupsLimitReached() || getExceptionsSize() > 0 || 
isMaxRowsInJoinReached();
+  }
+
+  public void addServerStats(StatMap<DataTable.MetadataKey> serverStats) {
+    // Set execution statistics.
+    _serverStats.merge(serverStats);
+
+    long threadCpuTimeNs = 
serverStats.getLong(DataTable.MetadataKey.THREAD_CPU_TIME_NS);
+    long systemActivitiesCpuTimeNs = 
serverStats.getLong(DataTable.MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS);
+    long responseSerializationCpuTimeNs = 
serverStats.getLong(DataTable.MetadataKey.RESPONSE_SER_CPU_TIME_NS);
+
+    String tableName = serverStats.getString(DataTable.MetadataKey.TABLE);
+    if (tableName != null) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
+
+      if (tableType == TableType.OFFLINE) {
+        _offlineThreadCpuTimeNs += threadCpuTimeNs;
+        _offlineSystemActivitiesCpuTimeNs += systemActivitiesCpuTimeNs;
+        _offlineResponseSerializationCpuTimeNs += 
responseSerializationCpuTimeNs;
+      } else {
+        _realtimeThreadCpuTimeNs += threadCpuTimeNs;
+        _realtimeSystemActivitiesCpuTimeNs += systemActivitiesCpuTimeNs;
+        _realtimeResponseSerializationCpuTimeNs += 
responseSerializationCpuTimeNs;
+      }

Review Comment:
   These are not V1 or V2 stats. They are _broker_ stats: they are populated in 
the broker code.
   
   V1 collects `threadCpuTimeNs`, `systemActivitiesCpuTimeNs` and 
`responseSerializationCpuTimeNs` but does not distinguish between realtime and 
offline. When those values reach the broker, the broker stores each one in its 
realtime or offline counterpart depending on the table type, and that is what 
is returned to the customer.
   
   In the code currently in master, these stats are not populated when the 
query is executed in V2. Specifically, in `ExecutionStatsAggregator`, the 
code:
   
   ```java
       TableType tableType = null;
       String instanceName = null;
       if (routingInstance != null) {
         tableType = routingInstance.getTableType();
         instanceName = routingInstance.getShortName();;
       } else if (tableName != null) {
         tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
         instanceName = tableName;
       } else {
         tableType = null;
         instanceName = null;
       }
   ```
   
   ends up setting `tableType` to null, so these stats are never calculated.
   
   I guess we can remove these lines and keep the current behavior. We can try 
to add support for this in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to