This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a54853279bd [timeseries] End-to-end query execution statistics 
propagation for timeseries queries (#17170)
a54853279bd is described below

commit a54853279bda6d6a6783a7c9737af04837e2b2f3
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Wed Nov 19 13:01:13 2025 -0800

    [timeseries] End-to-end query execution statistics propagation for 
timeseries queries (#17170)
---
 .../response/mapper/TimeSeriesResponseMapper.java  | 41 ++++++++++++++++++++
 .../mapper/TimeSeriesResponseMapperTest.java       |  2 +-
 .../timeseries/TimeSeriesOperatorUtils.java        |  8 ++--
 .../core/query/executor/QueryExecutorTest.java     |  7 ++--
 .../tests/TimeSeriesIntegrationTest.java           |  7 ++++
 .../apache/pinot/query/runtime/QueryRunner.java    |  4 +-
 .../runtime/timeseries/LeafTimeSeriesOperator.java |  5 ++-
 .../TimeSeriesExchangeReceiveOperator.java         | 45 +++++++++++++++++++++-
 .../timeseries/serde/TimeSeriesBlockSerde.java     |  5 ++-
 .../timeseries/TimeSeriesDispatchObserver.java     |  3 +-
 .../timeseries/serde/TimeSeriesBlockSerdeTest.java |  3 +-
 .../pinot/tsdb/spi/series/TimeSeriesBlock.java     | 15 ++++++++
 12 files changed, 128 insertions(+), 17 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
index 726e99b98fa..d412c8b8263 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
@@ -20,6 +20,9 @@ package org.apache.pinot.common.response.mapper;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.StatMap;
 import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
@@ -57,6 +60,7 @@ public class TimeSeriesResponseMapper {
 
     ResultTable resultTable = new ResultTable(dataSchema, rows);
     brokerResponse.setResultTable(resultTable);
+    setStats(brokerResponse, timeSeriesBlock.getMetadata());
     return brokerResponse;
   }
 
@@ -123,4 +127,41 @@ public class TimeSeriesResponseMapper {
     }
     return rows;
   }
+
+  private static void setStats(BrokerResponseNativeV2 brokerResponse, 
Map<String, String> metadata) {
+    if (metadata == null || metadata.isEmpty()) {
+      return;
+    }
+
+    StatMap<BrokerResponseNativeV2.StatKey> map = new 
StatMap<>(BrokerResponseNativeV2.StatKey.class);
+    map.merge(BrokerResponseNativeV2.StatKey.NUM_DOCS_SCANNED, 
getLongMetadataValue(metadata,
+        DataTable.MetadataKey.NUM_DOCS_SCANNED));
+    map.merge(BrokerResponseNativeV2.StatKey.NUM_ENTRIES_SCANNED_IN_FILTER,
+        getLongMetadataValue(metadata, 
DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER));
+    map.merge(BrokerResponseNativeV2.StatKey.NUM_ENTRIES_SCANNED_POST_FILTER,
+        getLongMetadataValue(metadata, 
DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER));
+    map.merge(BrokerResponseNativeV2.StatKey.NUM_SEGMENTS_QUERIED,
+        getIntMetadataValue(metadata, 
DataTable.MetadataKey.NUM_SEGMENTS_QUERIED));
+    map.merge(BrokerResponseNativeV2.StatKey.NUM_SEGMENTS_PROCESSED,
+        getIntMetadataValue(metadata, 
DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED));
+    map.merge(BrokerResponseNativeV2.StatKey.NUM_SEGMENTS_MATCHED,
+        getIntMetadataValue(metadata, 
DataTable.MetadataKey.NUM_SEGMENTS_MATCHED));
+    map.merge(BrokerResponseNativeV2.StatKey.NUM_CONSUMING_SEGMENTS_MATCHED,
+        getIntMetadataValue(metadata, 
DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED));
+    map.merge(BrokerResponseNativeV2.StatKey.NUM_CONSUMING_SEGMENTS_PROCESSED,
+        getIntMetadataValue(metadata, 
DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED));
+    map.merge(BrokerResponseNativeV2.StatKey.NUM_CONSUMING_SEGMENTS_QUERIED,
+        getIntMetadataValue(metadata, 
DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED));
+    map.merge(BrokerResponseNativeV2.StatKey.TOTAL_DOCS,
+        getLongMetadataValue(metadata, DataTable.MetadataKey.TOTAL_DOCS));
+    brokerResponse.addBrokerStats(map);
+  }
+
+  private static long getLongMetadataValue(Map<String, String> metadata, 
DataTable.MetadataKey key) {
+    return Long.parseLong(metadata.getOrDefault(key.getName(), "0"));
+  }
+
+  private static int getIntMetadataValue(Map<String, String> metadata, 
DataTable.MetadataKey key) {
+    return Integer.parseInt(metadata.getOrDefault(key.getName(), "0"));
+  }
 }
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapperTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapperTest.java
index 49afff1df7d..769d4ae562d 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapperTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapperTest.java
@@ -65,7 +65,7 @@ public class TimeSeriesResponseMapperTest {
   @Test
   public void toBrokerResponseWithException() {
     BrokerResponseNativeV2 resp = (BrokerResponseNativeV2) 
TimeSeriesResponseMapper.toBrokerResponse(
-      new QueryException(QueryErrorCode.INTERNAL, "time series exception"));
+        new QueryException(QueryErrorCode.INTERNAL, "time series exception"));
     List<QueryProcessingException> exceptions = resp.getExceptions();
     assertEquals(exceptions.size(), 1);
     assertEquals(exceptions.get(0).getErrorCode(), 
QueryErrorCode.INTERNAL.getId());
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java
index 7f095c47d17..8df6cccebfb 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java
@@ -40,7 +40,7 @@ public class TimeSeriesOperatorUtils {
   }
 
   public static TimeSeriesBlock buildTimeSeriesBlock(TimeBuckets timeBuckets,
-      GroupByResultsBlock groupByResultsBlock) {
+      GroupByResultsBlock groupByResultsBlock, Map<String, String> 
responseMetadata) {
     if (groupByResultsBlock.getNumRows() == 0) {
       return new TimeSeriesBlock(timeBuckets, new HashMap<>());
     }
@@ -62,11 +62,11 @@ public class TimeSeriesOperatorUtils {
       timeSeriesList.add(seriesBuilder.buildWithTagOverrides(tagNames, 
tagValues));
       timeSeriesMap.put(seriesHash, timeSeriesList);
     }
-    return new TimeSeriesBlock(timeBuckets, timeSeriesMap);
+    return new TimeSeriesBlock(timeBuckets, timeSeriesMap, responseMetadata);
   }
 
   public static TimeSeriesBlock buildTimeSeriesBlock(TimeBuckets timeBuckets,
-      AggregationResultsBlock aggregationResultsBlock) {
+      AggregationResultsBlock aggregationResultsBlock, Map<String, String> 
responseMetadata) {
     if (aggregationResultsBlock.getResults() == null) {
       return new TimeSeriesBlock(timeBuckets, new HashMap<>());
     }
@@ -76,7 +76,7 @@ public class TimeSeriesOperatorUtils {
     
timeSeriesList.add(seriesBuilder.buildWithTagOverrides(Collections.emptyList(), 
new Object[]{}));
     Map<Long, List<TimeSeries>> timeSeriesMap = new HashMap<>();
     timeSeriesMap.put(seriesHash, timeSeriesList);
-    return new TimeSeriesBlock(timeBuckets, timeSeriesMap);
+    return new TimeSeriesBlock(timeBuckets, timeSeriesMap, responseMetadata);
   }
 
   private static List<String> getTagNamesFromDataSchema(DataSchema dataSchema) 
{
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index 2cbeeb44674..b4cf43b59b0 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -235,7 +235,7 @@ public class QueryExecutorTest {
     InstanceResponseBlock instanceResponse = execute(queryRequest);
     assertTrue(instanceResponse.getResultsBlock() instanceof 
AggregationResultsBlock);
     TimeSeriesBlock timeSeriesBlock = 
TimeSeriesOperatorUtils.buildTimeSeriesBlock(timeBuckets,
-        (AggregationResultsBlock) instanceResponse.getResultsBlock());
+        (AggregationResultsBlock) instanceResponse.getResultsBlock(), 
instanceResponse.getResponseMetadata());
     assertEquals(timeSeriesBlock.getSeriesMap().size(), 1);
     
assertNull(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getDoubleValues()[0]);
     
assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getDoubleValues()[1],
 29885544.0);
@@ -251,7 +251,8 @@ public class QueryExecutorTest {
     InstanceResponseBlock instanceResponse = execute(queryRequest);
     assertTrue(instanceResponse.getResultsBlock() instanceof 
GroupByResultsBlock);
     GroupByResultsBlock resultsBlock = (GroupByResultsBlock) 
instanceResponse.getResultsBlock();
-    TimeSeriesBlock timeSeriesBlock = 
TimeSeriesOperatorUtils.buildTimeSeriesBlock(timeBuckets, resultsBlock);
+    TimeSeriesBlock timeSeriesBlock = 
TimeSeriesOperatorUtils.buildTimeSeriesBlock(timeBuckets, resultsBlock,
+        instanceResponse.getResponseMetadata());
     assertEquals(5, timeSeriesBlock.getSeriesMap().size());
     // For any city, say "New York", the max order item count should be 4
     boolean foundNewYork = false;
@@ -280,7 +281,7 @@ public class QueryExecutorTest {
     InstanceResponseBlock instanceResponse = execute(queryRequest);
     assertTrue(instanceResponse.getResultsBlock() instanceof 
GroupByResultsBlock);
     TimeSeriesBlock timeSeriesBlock = 
TimeSeriesOperatorUtils.buildTimeSeriesBlock(timeBuckets,
-        (GroupByResultsBlock) instanceResponse.getResultsBlock());
+        (GroupByResultsBlock) instanceResponse.getResultsBlock(), 
instanceResponse.getResponseMetadata());
     assertEquals(5, timeSeriesBlock.getSeriesMap().size());
     // For any city, say "Chicago", the min order item count should be 0
     boolean foundChicago = false;
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
index 5c10778f3e4..d2e376f5751 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
@@ -50,6 +50,8 @@ import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
 
 public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest {
 
@@ -276,6 +278,11 @@ public class TimeSeriesIntegrationTest extends 
BaseClusterIntegrationTest {
         validator.validate(tsArray.get(i).asLong(), valArray.get(i).asLong(), 
metric);
       }
     }
+
+    // Validate query stats
+    assertTrue(Long.parseLong(result.path("totalDocs").asText()) > 0);
+    assertTrue(Long.parseLong(result.path("numDocsScanned").asText()) > 0);
+    assertTrue(Long.parseLong(result.path("numSegmentsQueried").asText()) > 0);
   }
 
   private List<String> extractStrings(JsonNode arrayNode) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 6e59800d60c..d2d2e56f73a 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -404,9 +404,11 @@ public class QueryRunner {
             currentPlanId = fragmentRoots.get(index).getId();
             BaseTimeSeriesOperator fragmentOpChain = 
fragmentOpChains.get(index);
             TimeSeriesBlock seriesBlock = fragmentOpChain.nextBlock();
+            Map<String, String> metadataMap = new 
HashMap<>(seriesBlock.getMetadata());
+            metadataMap.put(Response.MetadataKeys.TimeSeries.PLAN_ID, 
currentPlanId);
             Worker.TimeSeriesResponse response = 
Worker.TimeSeriesResponse.newBuilder()
                 
.setPayload(TimeSeriesBlockSerde.serializeTimeSeriesBlock(seriesBlock))
-                
.putAllMetadata(Map.of(Response.MetadataKeys.TimeSeries.PLAN_ID, currentPlanId))
+                .putAllMetadata(metadataMap)
                 .build();
             responseObserver.onNext(response);
           }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
index 427df4f9e41..83b82591d52 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
@@ -63,10 +63,11 @@ public class LeafTimeSeriesOperator extends 
BaseTimeSeriesOperator {
     }
     if (instanceResponseBlock.getResultsBlock() instanceof 
GroupByResultsBlock) {
       return 
TimeSeriesOperatorUtils.buildTimeSeriesBlock(_context.getInitialTimeBuckets(),
-          (GroupByResultsBlock) instanceResponseBlock.getResultsBlock());
+          (GroupByResultsBlock) instanceResponseBlock.getResultsBlock(), 
instanceResponseBlock.getResponseMetadata());
     } else if (instanceResponseBlock.getResultsBlock() instanceof 
AggregationResultsBlock) {
       return 
TimeSeriesOperatorUtils.buildTimeSeriesBlock(_context.getInitialTimeBuckets(),
-          (AggregationResultsBlock) instanceResponseBlock.getResultsBlock());
+          (AggregationResultsBlock) instanceResponseBlock.getResultsBlock(),
+          instanceResponseBlock.getResponseMetadata());
     } else if (instanceResponseBlock.getResultsBlock() == null) {
       throw new IllegalStateException("Found null results block in time-series 
query");
     } else {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
index 79d49e0b462..cf2e06870bc 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.tsdb.spi.AggInfo;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
 import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
@@ -50,6 +51,20 @@ import 
org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
  * different servers, we will simply append them to the list, creating a union.
  */
 public class TimeSeriesExchangeReceiveOperator extends BaseTimeSeriesOperator {
+
+  private static final List<DataTable.MetadataKey> ADDITIVE_STATS_KEYS = 
List.of(
+    DataTable.MetadataKey.NUM_DOCS_SCANNED,
+    DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER,
+    DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER,
+    DataTable.MetadataKey.NUM_SEGMENTS_QUERIED,
+    DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED,
+    DataTable.MetadataKey.NUM_SEGMENTS_MATCHED,
+    DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED,
+    DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED,
+    DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED,
+    DataTable.MetadataKey.TOTAL_DOCS
+  );
+
   /**
    * Receiver will receive either TimeSeriesBlock or Throwable. And will have 
at most _numServersQueried objects that
    * can be polled.
@@ -100,6 +115,7 @@ public class TimeSeriesExchangeReceiveOperator extends 
BaseTimeSeriesOperator {
       throws Throwable {
     TimeBuckets timeBuckets = null;
     Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>();
+    Map<String, String> aggregatedStats = new HashMap<>();
     for (int index = 0; index < _numServersQueried; index++) {
       // Step-1: Poll, and ensure we received a TimeSeriesBlock.
       long remainingTimeMs = _deadlineMs - System.currentTimeMillis();
@@ -137,6 +153,8 @@ public class TimeSeriesExchangeReceiveOperator extends 
BaseTimeSeriesOperator {
           seriesBuilder.mergeAlignedSeries(timeSeries);
         }
       }
+      // Step-4: Merge stats
+      mergeStats(aggregatedStats, blockToMerge.getMetadata());
     }
     // Convert series builders to series and return.
     Map<Long, List<TimeSeries>> seriesMap = new 
HashMap<>(seriesBuilderMap.size());
@@ -146,13 +164,35 @@ public class TimeSeriesExchangeReceiveOperator extends 
BaseTimeSeriesOperator {
       timeSeriesList.add(entry.getValue().build());
       seriesMap.put(seriesHash, timeSeriesList);
     }
-    return new TimeSeriesBlock(timeBuckets, seriesMap);
+    return new TimeSeriesBlock(timeBuckets, seriesMap, aggregatedStats);
+  }
+
+  /**
+   * TODO: Consider consolidating stats merging logic with
+   * {@link org.apache.pinot.core.query.reduce.ExecutionStatsAggregator}
+   **/
+  private void mergeStats(Map<String, String> aggregatedStats, Map<String, 
String> metadata) {
+    for (DataTable.MetadataKey statKey : ADDITIVE_STATS_KEYS) {
+      String key = statKey.getName();
+      String existingValue = aggregatedStats.getOrDefault(key, "0");
+      String newValue = metadata.get(key);
+      if (newValue != null) {
+        try {
+          long newLong = Long.parseLong(newValue);
+          long existingLong = Long.parseLong(existingValue);
+          aggregatedStats.put(key, Long.toString(existingLong + newLong));
+        } catch (NumberFormatException e) {
+          // Ignore malformed stats
+        }
+      }
+    }
   }
 
   private TimeSeriesBlock getNextBlockNoAggregation()
       throws Throwable {
     Map<Long, List<TimeSeries>> timeSeriesMap = new HashMap<>();
     TimeBuckets timeBuckets = null;
+    Map<String, String> aggregatedStats = new HashMap<>();
     for (int index = 0; index < _numServersQueried; index++) {
       long remainingTimeMs = _deadlineMs - System.currentTimeMillis();
       Preconditions.checkState(remainingTimeMs > 0, "Timed out before polling 
exchange receive");
@@ -175,8 +215,9 @@ public class TimeSeriesExchangeReceiveOperator extends 
BaseTimeSeriesOperator {
         List<TimeSeries> timeSeriesList = entry.getValue();
         timeSeriesMap.computeIfAbsent(seriesHash, x -> new 
ArrayList<>()).addAll(timeSeriesList);
       }
+      mergeStats(aggregatedStats, blockToMerge.getMetadata());
     }
     Preconditions.checkNotNull(timeBuckets, "Time buckets is null in exchange 
receive operator");
-    return new TimeSeriesBlock(timeBuckets, timeSeriesMap);
+    return new TimeSeriesBlock(timeBuckets, timeSeriesMap, aggregatedStats);
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
index fff71ac51ef..a4842a9cb44 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
@@ -87,7 +87,8 @@ public class TimeSeriesBlockSerde {
   private TimeSeriesBlockSerde() {
   }
 
-  public static TimeSeriesBlock deserializeTimeSeriesBlock(ByteBuffer 
readOnlyByteBuffer)
+  public static TimeSeriesBlock deserializeTimeSeriesBlock(ByteBuffer 
readOnlyByteBuffer,
+      Map<String, String> metadataMap)
       throws IOException {
     DataBlock dataBlock = DataBlockUtils.readFrom(readOnlyByteBuffer);
     SerializedDataBlock mseBlock = new SerializedDataBlock(dataBlock);
@@ -103,7 +104,7 @@ public class TimeSeriesBlockSerde {
       long seriesId = Long.parseLong(timeSeries.getId());
       seriesMap.computeIfAbsent(seriesId, x -> new 
ArrayList<>()).add(timeSeries);
     }
-    return new TimeSeriesBlock(timeBuckets, seriesMap);
+    return new TimeSeriesBlock(timeBuckets, seriesMap, metadataMap);
   }
 
   public static ByteString serializeTimeSeriesBlock(TimeSeriesBlock 
timeSeriesBlock)
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java
index f285418fc7b..85886bdda35 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.java
@@ -60,7 +60,8 @@ public class TimeSeriesDispatchObserver implements 
StreamObserver<Worker.TimeSer
     TimeSeriesBlock block = null;
     Throwable error = null;
     try {
-      block = 
TimeSeriesBlockSerde.deserializeTimeSeriesBlock(timeSeriesResponse.getPayload().asReadOnlyByteBuffer());
+      block = 
TimeSeriesBlockSerde.deserializeTimeSeriesBlock(timeSeriesResponse.getPayload().asReadOnlyByteBuffer(),
+          timeSeriesResponse.getMetadataMap());
     } catch (Throwable t) {
       error = t;
     }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
index 5c9c797a587..a6d556dfee3 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
@@ -51,7 +51,8 @@ public class TimeSeriesBlockSerdeTest {
       // Serialize, deserialize and serialize again
       ByteString byteString1 = 
TimeSeriesBlockSerde.serializeTimeSeriesBlock(block1);
       String serializedBlockString1 = byteString1.toStringUtf8();
-      TimeSeriesBlock block2 = 
TimeSeriesBlockSerde.deserializeTimeSeriesBlock(byteString1.asReadOnlyByteBuffer());
+      TimeSeriesBlock block2 = 
TimeSeriesBlockSerde.deserializeTimeSeriesBlock(byteString1.asReadOnlyByteBuffer(),
+          Collections.emptyMap());
       String serializedBlockString2 = 
TimeSeriesBlockSerde.serializeTimeSeriesBlock(block2).toStringUtf8();
       // Serialized blocks in both cases should be the same since 
serialization is deterministic.
       assertEquals(serializedBlockString1, serializedBlockString2);
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
index 1556954d20f..cc813ae1ead 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.tsdb.spi.series;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -37,10 +38,20 @@ public class TimeSeriesBlock {
    * {@link TimeSeries#getId()}.
    */
   private final Map<Long, List<TimeSeries>> _seriesMap;
+  /**
+   * Holds optional metadata about the block (e.g., statistics).
+   */
+  private Map<String, String> _metadata;
 
   public TimeSeriesBlock(@Nullable TimeBuckets timeBuckets, Map<Long, 
List<TimeSeries>> seriesMap) {
+    this(timeBuckets, seriesMap, Map.of());
+  }
+
+  public TimeSeriesBlock(@Nullable TimeBuckets timeBuckets, Map<Long, 
List<TimeSeries>> seriesMap,
+      Map<String, String> metadata) {
     _timeBuckets = timeBuckets;
     _seriesMap = seriesMap;
+    _metadata = Collections.unmodifiableMap(metadata);
   }
 
   @Nullable
@@ -51,4 +62,8 @@ public class TimeSeriesBlock {
   public Map<Long, List<TimeSeries>> getSeriesMap() {
     return _seriesMap;
   }
+
+  public Map<String, String> getMetadata() {
+    return _metadata;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to