vvivekiyer commented on code in PR #10171:
URL: https://github.com/apache/pinot/pull/10171#discussion_r1092421840
##########
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java:
##########
@@ -241,6 +244,7 @@ public void start()
_brokerConf.getProperty(Broker.CONFIG_OF_ALLOWED_TABLES_FOR_EMITTING_METRICS,
Collections.emptyList()));
_brokerMetrics.initializeGlobalMeters();
_brokerMetrics.setValueOfGlobalGauge(BrokerGauge.VERSION,
PinotVersion.VERSION_METRIC_NAME, 1);
+ BrokerMetrics.register(_brokerMetrics);
Review Comment:
Suggest creating an OSS issue to clean up other broker and server code where
we pass around this object.
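
A minimal sketch of the register-once, look-up-anywhere pattern that such a cleanup would rely on. The class `MetricsRegistrySketch` and its nested `Metrics` type are hypothetical stand-ins for illustration, not the actual `BrokerMetrics` code:

```java
import java.util.concurrent.atomic.AtomicReference;

class MetricsRegistrySketch {
  // Hypothetical stand-in for a metrics object such as the broker metrics.
  public static final class Metrics {
    public final String name;

    public Metrics(String name) {
      this.name = name;
    }
  }

  // Holds the single shared instance; null until register() succeeds.
  private static final AtomicReference<Metrics> INSTANCE = new AtomicReference<>();

  // Registers the instance once; returns false if one is already registered.
  public static boolean register(Metrics metrics) {
    return INSTANCE.compareAndSet(null, metrics);
  }

  // Callers fetch the shared instance instead of receiving it as a parameter.
  public static Metrics get() {
    return INSTANCE.get();
  }

  public static void main(String[] args) {
    register(new Metrics("broker"));
    System.out.println(get().name);
  }
}
```

With something along these lines, call sites that currently take the metrics object as a constructor or method argument could look it up instead.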
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -258,435 +259,447 @@ private BrokerResponseNative handleRequest(long requestId, String query,
throws Exception {
LOGGER.debug("SQL query for request {}: {}", requestId, query);
- long compilationStartTimeNs;
- PinotQuery pinotQuery;
+ Tracing.ThreadAccountantOps.setupRunner(String.valueOf(requestId));
Review Comment:
When there is an IN_SUBQUERY, we execute it as two separate queries, i.e., this
function is called twice with the same requestId. Do we expect to call
setupRunner both times? Also, if we don't already have a test, I would suggest
writing one that uses IN_SUBQUERY.
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -707,9 +714,10 @@ public static class ControllerJob {
public static final String CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST =
"segmentsForceCommitted";
}
+ public static final String PINOT_QUERY_SCHEDULER_PREFIX = "pinot.query.scheduler";
+
public static class Accounting {
Review Comment:
Suggest adding some comments here describing the class and constants.
##########
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java:
##########
@@ -102,6 +103,7 @@ private void reduceWithIntermediateResult(DataSchema dataSchema, Collection<Data
intermediateResults[i] = _aggregationFunctions[i].merge(mergedIntermediateResult, intermediateResultToMerge);
}
Review Comment:
Would it make sense to sample and check after merging each aggregation
function? Would that become too frequent?
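
If checking after every merge turns out to be too frequent, one middle ground is to check every N merges. A rough sketch of that idea follows; `PeriodicCheckSketch`, `CHECK_INTERVAL`, and the `Runnable` check are all hypothetical names for illustration and not part of this PR:

```java
class PeriodicCheckSketch {
  // Hypothetical interval: run the check once every CHECK_INTERVAL merges.
  static final int CHECK_INTERVAL = 4;

  // Performs one "merge" per element and invokes the check once every
  // CHECK_INTERVAL merges. Returns how many times the check ran.
  public static int mergeWithPeriodicCheck(long[] values, Runnable check) {
    int checks = 0;
    long merged = 0;
    for (int i = 0; i < values.length; i++) {
      merged += values[i]; // stand-in for an aggregation function merge
      if ((i + 1) % CHECK_INTERVAL == 0) {
        check.run(); // stand-in for a sample-and-check call
        checks++;
      }
    }
    return checks;
  }

  public static void main(String[] args) {
    int checks = mergeWithPeriodicCheck(new long[10], () -> { });
    System.out.println("checks run: " + checks);
  }
}
```

The interval trades check overhead against how quickly an over-budget query is noticed, so the right value would need to be measured.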
##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java:
##########
@@ -32,47 +30,49 @@
*/
public class CPUMemThreadLevelAccountingObjects {
- public static class StatsDigest {
-
- // The current usage sampling for each thread
- final long[] _currentStatsSample;
- // The previous usage sampling for each thread
- final long[] _lastStatSample;
- // The aggregated usage sampling for the finished tasks of a (still) running queries
- final HashMap<String, Long> _finishedTaskStatAggregator;
-
- StatsDigest(int numThreads) {
- _currentStatsSample = new long[numThreads];
- _lastStatSample = new long[numThreads];
- _finishedTaskStatAggregator = new HashMap<>();
- }
- }
-
/**
- * Entry to track the task execution status of a worker/runner given thread
+ * Entry to track the task execution status and usage stats of a Thread
*/
- public static class TaskEntryHolder {
- AtomicReference<TaskEntry> _threadTaskStatus = new AtomicReference<>(null);
+ public static class ThreadEntry {
+ // current query_id, task_id of the thread; this field is accessed by the thread itself and the accountant
+ AtomicReference<TaskEntry> _currentThreadTaskStatus = new AtomicReference<>(null);
Review Comment:
nit: null is not needed.
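
For reference, a small plain-JDK check that the no-argument `AtomicReference` constructor already starts out holding null. The class name `AtomicReferenceDefaultSketch` is made up for illustration:

```java
import java.util.concurrent.atomic.AtomicReference;

class AtomicReferenceDefaultSketch {
  // Returns the value held by a freshly created no-arg AtomicReference.
  public static Object initialValue() {
    AtomicReference<String> ref = new AtomicReference<>();
    return ref.get();
  }

  public static void main(String[] args) {
    System.out.println(initialValue() == null);
  }
}
```

So `new AtomicReference<>()` would behave the same as `new AtomicReference<>(null)` here.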
##########
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java:
##########
@@ -103,9 +106,14 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, Broke
QueryContext serverQueryContext = QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery());
DataTableReducer dataTableReducer = ResultReducerFactory.getResultReducer(serverQueryContext);
- dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema, dataTableMap, brokerResponseNative,
- new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
- _groupByTrimThreshold), brokerMetrics);
+ try {
+ dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema, dataTableMap, brokerResponseNative,
+ new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
+ _groupByTrimThreshold), brokerMetrics);
+ } catch (EarlyTerminationException e) {
+ brokerResponseNative.getProcessingExceptions()
Review Comment:
Consider using `brokerResponseNative.addToException()`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]