This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a1f2f6a8ab3 [timeseries] Adding timeUsedMs to query statistics (#17676)
a1f2f6a8ab3 is described below
commit a1f2f6a8ab317e2a789e63755f916f10e442ae90
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Tue Feb 17 16:39:49 2026 -0800
[timeseries] Adding timeUsedMs to query statistics (#17676)
* [timeseries] Adding timeUsedMs to query statistics
* Adding more stats
* Addressed comments
---
.../requesthandler/TimeSeriesRequestHandler.java | 12 ++++++++---
.../response/mapper/TimeSeriesResponseMapper.java | 11 ++++++++++
.../tests/TimeSeriesIntegrationTest.java | 1 +
.../TimeSeriesExchangeReceiveOperator.java | 25 +++++++++++++++++++++-
.../pinot/tsdb/spi/series/TimeSeriesBlock.java | 4 ++--
5 files changed, 47 insertions(+), 6 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index caaa2ba75d6..dfba7459996 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -46,6 +46,7 @@ import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.manager.BrokerRoutingManager;
import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerTimer;
import org.apache.pinot.common.request.context.ExpressionContext;
@@ -158,7 +159,8 @@ public class TimeSeriesRequestHandler extends BaseBrokerRequestHandler {
TimeSeriesLogicalPlanResult logicalPlanResult =
_queryEnvironment.buildLogicalPlan(timeSeriesRequest);
// If there are no buckets in the logical plan, return an empty response.
if (logicalPlanResult.getTimeBuckets().getNumBuckets() == 0) {
- return new TimeSeriesBlock(logicalPlanResult.getTimeBuckets(), new
HashMap<>());
+ timeSeriesBlock = new
TimeSeriesBlock(logicalPlanResult.getTimeBuckets(), new HashMap<>());
+ return timeSeriesBlock;
}
TimeSeriesDispatchablePlan dispatchablePlan =
_queryEnvironment.buildPhysicalPlan(timeSeriesRequest,
requestContext, logicalPlanResult);
@@ -166,6 +168,7 @@ public class TimeSeriesRequestHandler extends BaseBrokerRequestHandler {
timeSeriesBlock =
_queryDispatcher.submitAndGet(requestContext.getRequestId(), dispatchablePlan,
timeSeriesRequest.getTimeout().toMillis(), requestContext);
+
TimeSeriesResponseMapper.setStatsInRequestContext(requestContext,
timeSeriesBlock.getMetadata());
setExceptionsFromBlockToRequestContext(timeSeriesBlock, requestContext);
return timeSeriesBlock;
@@ -180,9 +183,12 @@ public class TimeSeriesRequestHandler extends BaseBrokerRequestHandler {
requestContext.setErrorCode(qe.getErrorCode());
throw qe;
} finally {
- _brokerMetrics.addTimedValue(BrokerTimer.QUERY_TOTAL_TIME_MS,
System.currentTimeMillis() - queryStartTime,
- TimeUnit.MILLISECONDS);
+ long endToEndTimeMs = System.currentTimeMillis() - queryStartTime;
+ _brokerMetrics.addTimedValue(BrokerTimer.QUERY_TOTAL_TIME_MS,
endToEndTimeMs, TimeUnit.MILLISECONDS);
_brokerQueryEventListener.onQueryCompletion(requestContext);
+ if (timeSeriesBlock != null) {
+
timeSeriesBlock.getMetadata().put(DataTable.MetadataKey.TIME_USED_MS.getName(),
String.valueOf(endToEndTimeMs));
+ }
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java b/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
index ae146ea8273..4930f45279f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
@@ -158,7 +158,18 @@ public class TimeSeriesResponseMapper {
getIntMetadataValue(metadata,
DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED));
map.merge(BrokerResponseNativeV2.StatKey.TOTAL_DOCS,
getLongMetadataValue(metadata, DataTable.MetadataKey.TOTAL_DOCS));
+ map.merge(BrokerResponseNativeV2.StatKey.MIN_CONSUMING_FRESHNESS_TIME_MS,
+ getLongMetadataValue(metadata,
DataTable.MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS));
+ map.merge(BrokerResponseNativeV2.StatKey.NUM_SEGMENTS_PRUNED_BY_SERVER,
+ getIntMetadataValue(metadata,
DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER));
+ map.merge(BrokerResponseNativeV2.StatKey.NUM_SEGMENTS_PRUNED_INVALID,
+ getIntMetadataValue(metadata,
DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID));
+ map.merge(BrokerResponseNativeV2.StatKey.NUM_SEGMENTS_PRUNED_BY_LIMIT,
+ getIntMetadataValue(metadata,
DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT));
+ map.merge(BrokerResponseNativeV2.StatKey.NUM_SEGMENTS_PRUNED_BY_VALUE,
+ getIntMetadataValue(metadata,
DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE));
brokerResponse.addBrokerStats(map);
+ brokerResponse.setTimeUsedMs(getLongMetadataValue(metadata,
DataTable.MetadataKey.TIME_USED_MS));
}
public static void setStatsInRequestContext(RequestContext requestContext,
Map<String, String> metadata) {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
index 60b6660671d..8d276c9716e 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
@@ -412,6 +412,7 @@ public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest {
assertTrue(Long.parseLong(result.path("totalDocs").asText()) > 0);
assertTrue(Long.parseLong(result.path("numDocsScanned").asText()) > 0);
assertTrue(Long.parseLong(result.path("numSegmentsQueried").asText()) > 0);
+ assertTrue(Long.parseLong(result.path("timeUsedMs").asText()) > 0);
}
private List<String> extractStrings(JsonNode arrayNode) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
index ab18744a1a7..07fea4bd8d0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
@@ -64,7 +64,15 @@ public class TimeSeriesExchangeReceiveOperator extends BaseTimeSeriesOperator {
DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED,
DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED,
DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED,
- DataTable.MetadataKey.TOTAL_DOCS
+ DataTable.MetadataKey.TOTAL_DOCS,
+ DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER,
+ DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID,
+ DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT,
+ DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE
+ );
+
+ private static final List<DataTable.MetadataKey> MIN_STATS_KEYS = List.of(
+ DataTable.MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS
);
/**
@@ -210,6 +218,21 @@ public class TimeSeriesExchangeReceiveOperator extends BaseTimeSeriesOperator {
}
}
}
+
+ for (DataTable.MetadataKey statKey : MIN_STATS_KEYS) {
+ String key = statKey.getName();
+ String existingValue = aggregatedStats.getOrDefault(key,
Long.toString(Long.MAX_VALUE));
+ String newValue = metadata.get(key);
+ if (newValue != null) {
+ try {
+ long newLong = Long.parseLong(newValue);
+ long existingLong = Long.parseLong(existingValue);
+ aggregatedStats.put(key, Long.toString(Math.min(existingLong,
newLong)));
+ } catch (NumberFormatException e) {
+ // Ignore malformed stats
+ }
+ }
+ }
}
private TimeSeriesBlock getNextBlockNoAggregation()
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
index c7e4b41a2c3..a0ffe116a91 100644
--- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
@@ -19,7 +19,7 @@
package org.apache.pinot.tsdb.spi.series;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
@@ -59,7 +59,7 @@ public class TimeSeriesBlock {
Map<String, String> metadata) {
_timeBuckets = timeBuckets;
_seriesMap = seriesMap;
- _metadata = Collections.unmodifiableMap(metadata);
+ _metadata = new HashMap<>(metadata);
_exceptions = new ArrayList<>();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]