This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a54853279bd [timeseries] End-to-end query execution statistics propagation for timeseries queries (#17170)
a54853279bd is described below
commit a54853279bda6d6a6783a7c9737af04837e2b2f3
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Wed Nov 19 13:01:13 2025 -0800
[timeseries] End-to-end query execution statistics propagation for timeseries queries (#17170)
---
.../response/mapper/TimeSeriesResponseMapper.java | 41 ++++++++++++++++++++
.../mapper/TimeSeriesResponseMapperTest.java | 2 +-
.../timeseries/TimeSeriesOperatorUtils.java | 8 ++--
.../core/query/executor/QueryExecutorTest.java | 7 ++--
.../tests/TimeSeriesIntegrationTest.java | 7 ++++
.../apache/pinot/query/runtime/QueryRunner.java | 4 +-
.../runtime/timeseries/LeafTimeSeriesOperator.java | 5 ++-
.../TimeSeriesExchangeReceiveOperator.java | 45 +++++++++++++++++++++-
.../timeseries/serde/TimeSeriesBlockSerde.java | 5 ++-
.../timeseries/TimeSeriesDispatchObserver.java | 3 +-
.../timeseries/serde/TimeSeriesBlockSerdeTest.java | 3 +-
.../pinot/tsdb/spi/series/TimeSeriesBlock.java | 15 ++++++++
12 files changed, 128 insertions(+), 17 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java b/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
index 726e99b98fa..d412c8b8263 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
@@ -20,6 +20,9 @@ package org.apache.pinot.common.response.mapper;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
import org.apache.pinot.common.response.broker.QueryProcessingException;
@@ -57,6 +60,7 @@ public class TimeSeriesResponseMapper {
ResultTable resultTable = new ResultTable(dataSchema, rows);
brokerResponse.setResultTable(resultTable);
+ setStats(brokerResponse, timeSeriesBlock.getMetadata());
return brokerResponse;
}
@@ -123,4 +127,41 @@ public class TimeSeriesResponseMapper {
}
return rows;
}
+
+ private static void setStats(BrokerResponseNativeV2 brokerResponse, Map<String, String> metadata) {
+ if (metadata == null || metadata.isEmpty()) {
+ return;
+ }
+
+ StatMap<BrokerResponseNativeV2.StatKey> map = new StatMap<>(BrokerResponseNativeV2.StatKey.class);
+ map.merge(BrokerResponseNativeV2.StatKey.NUM_DOCS_SCANNED, getLongMetadataValue(metadata,
+ DataTable.MetadataKey.NUM_DOCS_SCANNED));
+ map.merge(BrokerResponseNativeV2.StatKey.NUM_ENTRIES_SCANNED_IN_FILTER,
+ getLongMetadataValue(metadata, DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER));
+ map.merge(BrokerResponseNativeV2.StatKey.NUM_ENTRIES_SCANNED_POST_FILTER,
+ getLongMetadataValue(metadata, DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER));
+ map.merge(BrokerResponseNativeV2.StatKey.NUM_SEGMENTS_QUERIED,
+ getIntMetadataValue(metadata, DataTable.MetadataKey.NUM_SEGMENTS_QUERIED));
+ map.merge(BrokerResponseNativeV2.StatKey.NUM_SEGMENTS_PROCESSED,
+ getIntMetadataValue(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED));
+ map.merge(BrokerResponseNativeV2.StatKey.NUM_SEGMENTS_MATCHED,
+ getIntMetadataValue(metadata, DataTable.MetadataKey.NUM_SEGMENTS_MATCHED));
+ map.merge(BrokerResponseNativeV2.StatKey.NUM_CONSUMING_SEGMENTS_MATCHED,
+ getIntMetadataValue(metadata, DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED));
+ map.merge(BrokerResponseNativeV2.StatKey.NUM_CONSUMING_SEGMENTS_PROCESSED,
+ getIntMetadataValue(metadata, DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED));
+ map.merge(BrokerResponseNativeV2.StatKey.NUM_CONSUMING_SEGMENTS_QUERIED,
+ getIntMetadataValue(metadata, DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED));
+ map.merge(BrokerResponseNativeV2.StatKey.TOTAL_DOCS,
+ getLongMetadataValue(metadata, DataTable.MetadataKey.TOTAL_DOCS));
+ brokerResponse.addBrokerStats(map);
+ }
+
+ private static long getLongMetadataValue(Map<String, String> metadata, DataTable.MetadataKey key) {
+ return Long.parseLong(metadata.getOrDefault(key.getName(), "0"));
+ }
+
+ private static int getIntMetadataValue(Map<String, String> metadata, DataTable.MetadataKey key) {
+ return Integer.parseInt(metadata.getOrDefault(key.getName(), "0"));
+ }
}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapperTest.java b/pinot-common/src/test/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapperTest.java
index 49afff1df7d..769d4ae562d 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapperTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapperTest.java
@@ -65,7 +65,7 @@ public class TimeSeriesResponseMapperTest {
@Test
public void toBrokerResponseWithException() {
BrokerResponseNativeV2 resp = (BrokerResponseNativeV2)
TimeSeriesResponseMapper.toBrokerResponse(
- new QueryException(QueryErrorCode.INTERNAL, "time series exception"));
+ new QueryException(QueryErrorCode.INTERNAL, "time series exception"));
List<QueryProcessingException> exceptions = resp.getExceptions();
assertEquals(exceptions.size(), 1);
assertEquals(exceptions.get(0).getErrorCode(),
QueryErrorCode.INTERNAL.getId());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java
index 7f095c47d17..8df6cccebfb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java
@@ -40,7 +40,7 @@ public class TimeSeriesOperatorUtils {
}
public static TimeSeriesBlock buildTimeSeriesBlock(TimeBuckets timeBuckets,
- GroupByResultsBlock groupByResultsBlock) {
+ GroupByResultsBlock groupByResultsBlock, Map<String, String> responseMetadata) {
if (groupByResultsBlock.getNumRows() == 0) {
return new TimeSeriesBlock(timeBuckets, new HashMap<>());
}
@@ -62,11 +62,11 @@ public class TimeSeriesOperatorUtils {
timeSeriesList.add(seriesBuilder.buildWithTagOverrides(tagNames,
tagValues));
timeSeriesMap.put(seriesHash, timeSeriesList);
}
- return new TimeSeriesBlock(timeBuckets, timeSeriesMap);
+ return new TimeSeriesBlock(timeBuckets, timeSeriesMap, responseMetadata);
}
public static TimeSeriesBlock buildTimeSeriesBlock(TimeBuckets timeBuckets,
- AggregationResultsBlock aggregationResultsBlock) {
+ AggregationResultsBlock aggregationResultsBlock, Map<String, String> responseMetadata) {
if (aggregationResultsBlock.getResults() == null) {
return new TimeSeriesBlock(timeBuckets, new HashMap<>());
}
@@ -76,7 +76,7 @@ public class TimeSeriesOperatorUtils {
timeSeriesList.add(seriesBuilder.buildWithTagOverrides(Collections.emptyList(),
new Object[]{}));
Map<Long, List<TimeSeries>> timeSeriesMap = new HashMap<>();
timeSeriesMap.put(seriesHash, timeSeriesList);
- return new TimeSeriesBlock(timeBuckets, timeSeriesMap);
+ return new TimeSeriesBlock(timeBuckets, timeSeriesMap, responseMetadata);
}
private static List<String> getTagNamesFromDataSchema(DataSchema dataSchema)
{
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index 2cbeeb44674..b4cf43b59b0 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -235,7 +235,7 @@ public class QueryExecutorTest {
InstanceResponseBlock instanceResponse = execute(queryRequest);
assertTrue(instanceResponse.getResultsBlock() instanceof
AggregationResultsBlock);
TimeSeriesBlock timeSeriesBlock =
TimeSeriesOperatorUtils.buildTimeSeriesBlock(timeBuckets,
- (AggregationResultsBlock) instanceResponse.getResultsBlock());
+ (AggregationResultsBlock) instanceResponse.getResultsBlock(), instanceResponse.getResponseMetadata());
assertEquals(timeSeriesBlock.getSeriesMap().size(), 1);
assertNull(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getDoubleValues()[0]);
assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getDoubleValues()[1],
29885544.0);
@@ -251,7 +251,8 @@ public class QueryExecutorTest {
InstanceResponseBlock instanceResponse = execute(queryRequest);
assertTrue(instanceResponse.getResultsBlock() instanceof
GroupByResultsBlock);
GroupByResultsBlock resultsBlock = (GroupByResultsBlock)
instanceResponse.getResultsBlock();
- TimeSeriesBlock timeSeriesBlock = TimeSeriesOperatorUtils.buildTimeSeriesBlock(timeBuckets, resultsBlock);
+ TimeSeriesBlock timeSeriesBlock = TimeSeriesOperatorUtils.buildTimeSeriesBlock(timeBuckets, resultsBlock,
+ instanceResponse.getResponseMetadata());
assertEquals(5, timeSeriesBlock.getSeriesMap().size());
// For any city, say "New York", the max order item count should be 4
boolean foundNewYork = false;
@@ -280,7 +281,7 @@ public class QueryExecutorTest {
InstanceResponseBlock instanceResponse = execute(queryRequest);
assertTrue(instanceResponse.getResultsBlock() instanceof
GroupByResultsBlock);
TimeSeriesBlock timeSeriesBlock =
TimeSeriesOperatorUtils.buildTimeSeriesBlock(timeBuckets,
- (GroupByResultsBlock) instanceResponse.getResultsBlock());
+ (GroupByResultsBlock) instanceResponse.getResultsBlock(), instanceResponse.getResponseMetadata());
assertEquals(5, timeSeriesBlock.getSeriesMap().size());
// For any city, say "Chicago", the min order item count should be 0
boolean foundChicago = false;
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
index 5c10778f3e4..d2e376f5751 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
@@ -50,6 +50,8 @@ import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest {
@@ -276,6 +278,11 @@ public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest {
validator.validate(tsArray.get(i).asLong(), valArray.get(i).asLong(),
metric);
}
}
+
+ // Validate query stats
+ assertTrue(Long.parseLong(result.path("totalDocs").asText()) > 0);
+ assertTrue(Long.parseLong(result.path("numDocsScanned").asText()) > 0);
+ assertTrue(Long.parseLong(result.path("numSegmentsQueried").asText()) > 0);
}
private List<String> extractStrings(JsonNode arrayNode) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 6e59800d60c..d2d2e56f73a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -404,9 +404,11 @@ public class QueryRunner {
currentPlanId = fragmentRoots.get(index).getId();
BaseTimeSeriesOperator fragmentOpChain =
fragmentOpChains.get(index);
TimeSeriesBlock seriesBlock = fragmentOpChain.nextBlock();
+ Map<String, String> metadataMap = new HashMap<>(seriesBlock.getMetadata());
+ metadataMap.put(Response.MetadataKeys.TimeSeries.PLAN_ID, currentPlanId);
Worker.TimeSeriesResponse response =
Worker.TimeSeriesResponse.newBuilder()
.setPayload(TimeSeriesBlockSerde.serializeTimeSeriesBlock(seriesBlock))
- .putAllMetadata(Map.of(Response.MetadataKeys.TimeSeries.PLAN_ID, currentPlanId))
+ .putAllMetadata(metadataMap)
.build();
responseObserver.onNext(response);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
index 427df4f9e41..83b82591d52 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
@@ -63,10 +63,11 @@ public class LeafTimeSeriesOperator extends BaseTimeSeriesOperator {
}
if (instanceResponseBlock.getResultsBlock() instanceof
GroupByResultsBlock) {
return
TimeSeriesOperatorUtils.buildTimeSeriesBlock(_context.getInitialTimeBuckets(),
- (GroupByResultsBlock) instanceResponseBlock.getResultsBlock());
+ (GroupByResultsBlock) instanceResponseBlock.getResultsBlock(), instanceResponseBlock.getResponseMetadata());
} else if (instanceResponseBlock.getResultsBlock() instanceof
AggregationResultsBlock) {
return
TimeSeriesOperatorUtils.buildTimeSeriesBlock(_context.getInitialTimeBuckets(),
- (AggregationResultsBlock) instanceResponseBlock.getResultsBlock());
+ (AggregationResultsBlock) instanceResponseBlock.getResultsBlock(),
+ instanceResponseBlock.getResponseMetadata());
} else if (instanceResponseBlock.getResultsBlock() == null) {
throw new IllegalStateException("Found null results block in time-series
query");
} else {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
index 79d49e0b462..cf2e06870bc 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.tsdb.spi.AggInfo;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
@@ -50,6 +51,20 @@ import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
* different servers, we will simply append them to the list, creating a union.
*/
public class TimeSeriesExchangeReceiveOperator extends BaseTimeSeriesOperator {
+
+ private static final List<DataTable.MetadataKey> ADDITIVE_STATS_KEYS = List.of(
+ DataTable.MetadataKey.NUM_DOCS_SCANNED,
+ DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER,
+ DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER,
+ DataTable.MetadataKey.NUM_SEGMENTS_QUERIED,
+ DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED,
+ DataTable.MetadataKey.NUM_SEGMENTS_MATCHED,
+ DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED,
+ DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED,
+ DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED,
+ DataTable.MetadataKey.TOTAL_DOCS
+ );
+
/**
* Receiver will receive either TimeSeriesBlock or Throwable. And will have
at most _numServersQueried objects that
* can be polled.
@@ -100,6 +115,7 @@ public class TimeSeriesExchangeReceiveOperator extends BaseTimeSeriesOperator {
throws Throwable {
TimeBuckets timeBuckets = null;
Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>();
+ Map<String, String> aggregatedStats = new HashMap<>();
for (int index = 0; index < _numServersQueried; index++) {
// Step-1: Poll, and ensure we received a TimeSeriesBlock.
long remainingTimeMs = _deadlineMs - System.currentTimeMillis();
@@ -137,6 +153,8 @@ public class TimeSeriesExchangeReceiveOperator extends BaseTimeSeriesOperator {
seriesBuilder.mergeAlignedSeries(timeSeries);
}
}
+ // Step-4: Merge stats
+ mergeStats(aggregatedStats, blockToMerge.getMetadata());
}
// Convert series builders to series and return.
Map<Long, List<TimeSeries>> seriesMap = new
HashMap<>(seriesBuilderMap.size());
@@ -146,13 +164,35 @@ public class TimeSeriesExchangeReceiveOperator extends BaseTimeSeriesOperator {
timeSeriesList.add(entry.getValue().build());
seriesMap.put(seriesHash, timeSeriesList);
}
- return new TimeSeriesBlock(timeBuckets, seriesMap);
+ return new TimeSeriesBlock(timeBuckets, seriesMap, aggregatedStats);
+ }
+
+ /**
+ * TODO: Consider consolidating stats merging logic with
+ * {@link org.apache.pinot.core.query.reduce.ExecutionStatsAggregator}
+ **/
+ private void mergeStats(Map<String, String> aggregatedStats, Map<String, String> metadata) {
+ for (DataTable.MetadataKey statKey : ADDITIVE_STATS_KEYS) {
+ String key = statKey.getName();
+ String existingValue = aggregatedStats.getOrDefault(key, "0");
+ String newValue = metadata.get(key);
+ if (newValue != null) {
+ try {
+ long newLong = Long.parseLong(newValue);
+ long existingLong = Long.parseLong(existingValue);
+ aggregatedStats.put(key, Long.toString(existingLong + newLong));
+ } catch (NumberFormatException e) {
+ // Ignore malformed stats
+ }
+ }
+ }
}
private TimeSeriesBlock getNextBlockNoAggregation()
throws Throwable {
Map<Long, List<TimeSeries>> timeSeriesMap = new HashMap<>();
TimeBuckets timeBuckets = null;
+ Map<String, String> aggregatedStats = new HashMap<>();
for (int index = 0; index < _numServersQueried; index++) {
long remainingTimeMs = _deadlineMs - System.currentTimeMillis();
Preconditions.checkState(remainingTimeMs > 0, "Timed out before polling
exchange receive");
@@ -175,8 +215,9 @@ public class TimeSeriesExchangeReceiveOperator extends BaseTimeSeriesOperator {
List<TimeSeries> timeSeriesList = entry.getValue();
timeSeriesMap.computeIfAbsent(seriesHash, x -> new
ArrayList<>()).addAll(timeSeriesList);
}
+ mergeStats(aggregatedStats, blockToMerge.getMetadata());
}
Preconditions.checkNotNull(timeBuckets, "Time buckets is null in exchange
receive operator");
- return new TimeSeriesBlock(timeBuckets, timeSeriesMap);
+ return new TimeSeriesBlock(timeBuckets, timeSeriesMap, aggregatedStats);
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
index fff71ac51ef..a4842a9cb44 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
@@ -87,7 +87,8 @@ public class TimeSeriesBlockSerde {
private TimeSeriesBlockSerde() {
}
- public static TimeSeriesBlock deserializeTimeSeriesBlock(ByteBuffer readOnlyByteBuffer)
+ public static TimeSeriesBlock deserializeTimeSeriesBlock(ByteBuffer readOnlyByteBuffer,
+ Map<String, String> metadataMap)
throws IOException {
DataBlock dataBlock = DataBlockUtils.readFrom(readOnlyByteBuffer);
SerializedDataBlock mseBlock = new SerializedDataBlock(dataBlock);
@@ -103,7 +104,7 @@ public class TimeSeriesBlockSerde {
long seriesId = Long.parseLong(timeSeries.getId());
seriesMap.computeIfAbsent(seriesId, x -> new
ArrayList<>()).add(timeSeries);
}
- return new TimeSeriesBlock(timeBuckets, seriesMap);
+ return new TimeSeriesBlock(timeBuckets, seriesMap, metadataMap);
}
public static ByteString serializeTimeSeriesBlock(TimeSeriesBlock
timeSeriesBlock)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java
index f285418fc7b..85886bdda35 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java
@@ -60,7 +60,8 @@ public class TimeSeriesDispatchObserver implements StreamObserver<Worker.TimeSer
TimeSeriesBlock block = null;
Throwable error = null;
try {
- block = TimeSeriesBlockSerde.deserializeTimeSeriesBlock(timeSeriesResponse.getPayload().asReadOnlyByteBuffer());
+ block = TimeSeriesBlockSerde.deserializeTimeSeriesBlock(timeSeriesResponse.getPayload().asReadOnlyByteBuffer(),
+ timeSeriesResponse.getMetadataMap());
} catch (Throwable t) {
error = t;
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
index 5c9c797a587..a6d556dfee3 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
@@ -51,7 +51,8 @@ public class TimeSeriesBlockSerdeTest {
// Serialize, deserialize and serialize again
ByteString byteString1 =
TimeSeriesBlockSerde.serializeTimeSeriesBlock(block1);
String serializedBlockString1 = byteString1.toStringUtf8();
- TimeSeriesBlock block2 = TimeSeriesBlockSerde.deserializeTimeSeriesBlock(byteString1.asReadOnlyByteBuffer());
+ TimeSeriesBlock block2 = TimeSeriesBlockSerde.deserializeTimeSeriesBlock(byteString1.asReadOnlyByteBuffer(),
+ Collections.emptyMap());
String serializedBlockString2 =
TimeSeriesBlockSerde.serializeTimeSeriesBlock(block2).toStringUtf8();
// Serialized blocks in both cases should be the same since
serialization is deterministic.
assertEquals(serializedBlockString1, serializedBlockString2);
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
index 1556954d20f..cc813ae1ead 100644
--- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.tsdb.spi.series;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
@@ -37,10 +38,20 @@ public class TimeSeriesBlock {
* {@link TimeSeries#getId()}.
*/
private final Map<Long, List<TimeSeries>> _seriesMap;
+ /**
+ * Holds optional metadata about the block (e.g., statistics).
+ */
+ private Map<String, String> _metadata;
public TimeSeriesBlock(@Nullable TimeBuckets timeBuckets, Map<Long,
List<TimeSeries>> seriesMap) {
+ this(timeBuckets, seriesMap, Map.of());
+ }
+
+ public TimeSeriesBlock(@Nullable TimeBuckets timeBuckets, Map<Long, List<TimeSeries>> seriesMap,
+ Map<String, String> metadata) {
_timeBuckets = timeBuckets;
_seriesMap = seriesMap;
+ _metadata = Collections.unmodifiableMap(metadata);
}
@Nullable
@@ -51,4 +62,8 @@ public class TimeSeriesBlock {
public Map<Long, List<TimeSeries>> getSeriesMap() {
return _seriesMap;
}
+
+ public Map<String, String> getMetadata() {
+ return _metadata;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]