This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 613274643aa [timeseries] Add support for query event listeners (#17464)
613274643aa is described below
commit 613274643aa6b2097bf7c5544cec188a02fd4efd
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Fri Jan 9 09:04:45 2026 -0800
[timeseries] Add support for query event listeners (#17464)
* [timeseries] Add support for query event listeners
* Adding exceptions from TimeSeriesBlock to RequestContext
---------
Co-authored-by: shauryachats <[email protected]>
---
.../requesthandler/BaseBrokerRequestHandler.java | 28 ++++++++++++---------
.../requesthandler/TimeSeriesRequestHandler.java | 29 +++++++++++++++++++---
.../response/mapper/TimeSeriesResponseMapper.java | 27 ++++++++++++++++++++
.../tsdb/planner/TimeSeriesQueryEnvironment.java | 2 +-
.../tsdb/planner/physical/TableScanVisitor.java | 6 +++--
5 files changed, 74 insertions(+), 18 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 86f56aa03f2..5b18088ccac 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -150,18 +150,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
requestContext.setBrokerId(_brokerId);
long requestId = _requestIdGenerator.get();
requestContext.setRequestId(requestId);
-
- if (httpHeaders != null && !_trackedHeaders.isEmpty()) {
- MultivaluedMap<String, String> requestHeaders =
httpHeaders.getRequestHeaders();
- Map<String, List<String>> trackedHeadersMap =
Maps.newHashMapWithExpectedSize(_trackedHeaders.size());
- for (Map.Entry<String, List<String>> entry : requestHeaders.entrySet()) {
- String key = entry.getKey().toLowerCase();
- if (_trackedHeaders.contains(key)) {
- trackedHeadersMap.put(key, entry.getValue());
- }
- }
- requestContext.setRequestHttpHeaders(trackedHeadersMap);
- }
+ setTrackedHeadersInRequestContext(requestContext, httpHeaders,
_trackedHeaders);
// First-stage access control to prevent unauthenticated requests from
using up resources. Secondary table-level
// check comes later.
@@ -361,6 +350,21 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
statistics.setTraceInfo(response.getTraceInfo());
}
+ protected static void setTrackedHeadersInRequestContext(RequestContext
requestContext,
+ HttpHeaders httpHeaders, Set<String> trackedHeaders) {
+ if (httpHeaders != null && !trackedHeaders.isEmpty()) {
+ MultivaluedMap<String, String> requestHeaders =
httpHeaders.getRequestHeaders();
+ Map<String, List<String>> trackedHeadersMap =
Maps.newHashMapWithExpectedSize(trackedHeaders.size());
+ for (Map.Entry<String, List<String>> entry : requestHeaders.entrySet()) {
+ String key = entry.getKey().toLowerCase();
+ if (trackedHeaders.contains(key)) {
+ trackedHeadersMap.put(key, entry.getValue());
+ }
+ }
+ requestContext.setRequestHttpHeaders(trackedHeadersMap);
+ }
+ }
+
@Override
public Map<Long, String> getRunningQueries() {
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation
is not enabled on broker");
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index 6f1a80b1256..fc6cd452776 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -32,6 +32,7 @@ import java.util.Set;
import java.util.Stack;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.HttpHeaders;
@@ -50,6 +51,7 @@ import
org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.RequestContextUtils;
import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.mapper.TimeSeriesResponseMapper;
import org.apache.pinot.common.utils.HumanReadableDuration;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.TargetType;
@@ -126,6 +128,7 @@ public class TimeSeriesRequestHandler extends
BaseBrokerRequestHandler {
_queryDispatcher.shutdown();
}
+ // TODO: Consider returning BrokerResponse instead of TimeSeriesBlock for
consistency with other handlers.
@Override
public TimeSeriesBlock handleTimeSeriesRequest(String lang, String
rawQueryParamString,
Map<String, String> queryParams, RequestContext requestContext,
RequesterIdentity requesterIdentity,
@@ -136,13 +139,17 @@ public class TimeSeriesRequestHandler extends
BaseBrokerRequestHandler {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES, 1);
requestContext.setBrokerId(_brokerId);
requestContext.setRequestId(_requestIdGenerator.get());
- RangeTimeSeriesRequest timeSeriesRequest = null;
+ setTrackedHeadersInRequestContext(requestContext, httpHeaders,
_trackedHeaders);
+
firstStageAccessControlCheck(requesterIdentity);
+ RangeTimeSeriesRequest timeSeriesRequest;
try {
timeSeriesRequest = buildRangeTimeSeriesRequest(lang,
rawQueryParamString, queryParams);
} catch (URISyntaxException e) {
throw new QueryException(QueryErrorCode.TIMESERIES_PARSING, "Error
building RangeTimeSeriesRequest", e);
}
+ requestContext.setQuery(timeSeriesRequest.getQuery());
+
TimeSeriesLogicalPlanResult logicalPlanResult =
_queryEnvironment.buildLogicalPlan(timeSeriesRequest);
// If there are no buckets in the logical plan, return an empty response.
if (logicalPlanResult.getTimeBuckets().getNumBuckets() == 0) {
@@ -154,17 +161,33 @@ public class TimeSeriesRequestHandler extends
BaseBrokerRequestHandler {
timeSeriesBlock =
_queryDispatcher.submitAndGet(requestContext.getRequestId(), dispatchablePlan,
timeSeriesRequest.getTimeout().toMillis(), requestContext);
+ TimeSeriesResponseMapper.setStatsInRequestContext(requestContext,
timeSeriesBlock.getMetadata());
+ setExceptionsFromBlockToRequestContext(timeSeriesBlock, requestContext);
return timeSeriesBlock;
} catch (Exception e) {
+ QueryException qe;
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES_FAILED,
1);
if (e instanceof QueryException) {
- throw (QueryException) e;
+ qe = (QueryException) e;
} else {
- throw new QueryException(QueryErrorCode.UNKNOWN, "Error processing
time-series query", e);
+ qe = new QueryException(QueryErrorCode.UNKNOWN, "Error processing
time-series query", e);
}
+ requestContext.setErrorCode(qe.getErrorCode());
+ throw qe;
} finally {
_brokerMetrics.addTimedValue(BrokerTimer.QUERY_TOTAL_TIME_MS,
System.currentTimeMillis() - queryStartTime,
TimeUnit.MILLISECONDS);
+ _brokerQueryEventListener.onQueryCompletion(requestContext);
+ }
+ }
+
+ private void setExceptionsFromBlockToRequestContext(TimeSeriesBlock
timeSeriesBlock, RequestContext requestContext) {
+ List<QueryException> exceptions = timeSeriesBlock.getExceptions();
+ if (exceptions != null && !exceptions.isEmpty()) {
+ // Set the first exception's error code in the request context
+ requestContext.setErrorCode(exceptions.get(0).getErrorCode());
+
requestContext.setProcessingExceptions(exceptions.stream().map(QueryException::getMessage)
+ .collect(Collectors.toList()));
}
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
index be681bc50b3..ae146ea8273 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
@@ -29,6 +29,7 @@ import
org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.tsdb.spi.series.TimeSeries;
import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
@@ -160,6 +161,32 @@ public class TimeSeriesResponseMapper {
brokerResponse.addBrokerStats(map);
}
+ public static void setStatsInRequestContext(RequestContext requestContext,
Map<String, String> metadata) {
+ if (metadata == null || metadata.isEmpty() || requestContext == null) {
+ return;
+ }
+ requestContext.setNumDocsScanned(getLongMetadataValue(metadata,
+ DataTable.MetadataKey.NUM_DOCS_SCANNED));
+ requestContext.setNumEntriesScannedInFilter(getLongMetadataValue(metadata,
+ DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER));
+
requestContext.setNumEntriesScannedPostFilter(getLongMetadataValue(metadata,
+ DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER));
+ requestContext.setTotalDocs(getLongMetadataValue(metadata,
+ DataTable.MetadataKey.TOTAL_DOCS));
+ requestContext.setNumSegmentsQueried(getIntMetadataValue(metadata,
+ DataTable.MetadataKey.NUM_SEGMENTS_QUERIED));
+ requestContext.setNumSegmentsProcessed(getIntMetadataValue(metadata,
+ DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED));
+ requestContext.setNumSegmentsMatched(getIntMetadataValue(metadata,
+ DataTable.MetadataKey.NUM_SEGMENTS_MATCHED));
+ requestContext.setNumConsumingSegmentsMatched(getIntMetadataValue(metadata,
+ DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED));
+
requestContext.setNumConsumingSegmentsProcessed(getIntMetadataValue(metadata,
+ DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED));
+ requestContext.setNumConsumingSegmentsQueried(getIntMetadataValue(metadata,
+ DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED));
+ }
+
private static long getLongMetadataValue(Map<String, String> metadata,
DataTable.MetadataKey key) {
return Long.parseLong(metadata.getOrDefault(key.getName(), "0"));
}
diff --git
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java
index 157122eebfd..c62b931765a 100644
---
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java
+++
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java
@@ -99,7 +99,7 @@ public class TimeSeriesQueryEnvironment {
RequestContext requestContext, TimeSeriesLogicalPlanResult logicalPlan) {
// Step-0: Add table type info to the logical plan.
logicalPlan = new
TimeSeriesLogicalPlanResult(TableScanVisitor.INSTANCE.addTableTypeInfoToPlan(
- logicalPlan.getPlanNode()), logicalPlan.getTimeBuckets());
+ logicalPlan.getPlanNode(), requestContext),
logicalPlan.getTimeBuckets());
// Step-1: Assign segments to servers for each leaf node.
TableScanVisitor.Context scanVisitorContext =
TableScanVisitor.createContext(requestContext.getRequestId());
TableScanVisitor.INSTANCE.assignSegmentsToPlan(logicalPlan.getPlanNode(),
logicalPlan.getTimeBuckets(),
diff --git
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java
index cfd7332d88f..cc5891de87c 100644
---
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java
+++
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java
@@ -36,6 +36,7 @@ import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.core.routing.TableRouteInfo;
import org.apache.pinot.core.routing.TableRouteProvider;
import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
@@ -86,19 +87,20 @@ public class TableScanVisitor {
* @param planNode The {@link BaseTimeSeriesPlanNode} to process.
* @return The updated {@link BaseTimeSeriesPlanNode} with table type
information.
*/
- public BaseTimeSeriesPlanNode addTableTypeInfoToPlan(BaseTimeSeriesPlanNode
planNode) {
+ public BaseTimeSeriesPlanNode addTableTypeInfoToPlan(BaseTimeSeriesPlanNode
planNode, RequestContext requestContext) {
if (planNode instanceof LeafTimeSeriesPlanNode) {
LeafTimeSeriesPlanNode sfpNode = (LeafTimeSeriesPlanNode) planNode;
TableRouteInfo routeInfo =
_tableRouteProvider.getTableRouteInfo(sfpNode.getTableName(), _tableCache,
_routingManager);
String tableNameWithType = getTableNameWithType(routeInfo);
Preconditions.checkNotNull(tableNameWithType, "Table not found for table
name: " + sfpNode.getTableName());
+ requestContext.setTableName(tableNameWithType);
return sfpNode.withTableName(tableNameWithType);
}
List<BaseTimeSeriesPlanNode> newInputs = new ArrayList<>();
for (BaseTimeSeriesPlanNode childNode : planNode.getInputs()) {
- newInputs.add(addTableTypeInfoToPlan(childNode));
+ newInputs.add(addTableTypeInfoToPlan(childNode, requestContext));
}
return planNode.withInputs(newInputs);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]