This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c3ce1cff782 [timeseries] Returning TimeSeriesBlock to API handlers to 
eliminate extraneous transformation (#16641)
c3ce1cff782 is described below

commit c3ce1cff782a2ea479ef2ef6040c76cd4034998c
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Fri Aug 29 07:39:53 2025 +0530

    [timeseries] Returning TimeSeriesBlock to API handlers to eliminate 
extraneous transformation (#16641)
    
    Co-authored-by: Shaurya Chaturvedi <[email protected]>
---
 .../broker/api/resources/PinotClientRequest.java   |  22 +--
 .../requesthandler/BrokerRequestHandler.java       |   7 +-
 .../BrokerRequestHandlerDelegate.java              |   9 +-
 .../requesthandler/TimeSeriesRequestHandler.java   |  35 ++---
 .../response/PinotBrokerTimeSeriesResponse.java    |  64 +--------
 .../response/mapper/TimeSeriesResponseMapper.java  | 126 +++++++++++++++++
 .../mapper/TimeSeriesResponseMapperTest.java       | 157 +++++++++++++++++++++
 .../query/service/dispatch/QueryDispatcher.java    |  14 +-
 .../apache/pinot/spi/exception/QueryErrorCode.java |   1 +
 9 files changed, 332 insertions(+), 103 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index dc49e0aec73..ee9dfbe5097 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -68,6 +68,7 @@ import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.mapper.TimeSeriesResponseMapper;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.auth.Actions;
@@ -81,6 +82,7 @@ import 
org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
 import org.apache.pinot.spi.auth.broker.RequesterIdentity;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.spi.query.QueryThreadContext;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.trace.RequestScope;
@@ -90,6 +92,7 @@ import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.sql.parsers.PinotSqlType;
 import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
 import org.glassfish.jersey.server.ManagedAsync;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -295,10 +298,12 @@ public class PinotClientRequest {
       String language = queryParams.get(Request.LANGUAGE);
       String queryString = queryParams.get(Request.QUERY);
       try (RequestScope requestContext = 
Tracing.getTracer().createRequestScope()) {
-        PinotBrokerTimeSeriesResponse response = 
executeTimeSeriesQuery(language, queryString, queryParams,
+        TimeSeriesBlock timeSeriesBlock = executeTimeSeriesQuery(language, 
queryString, queryParams,
             requestContext, makeHttpIdentity(requestCtx), httpHeaders);
-        asyncResponse.resume(response.toBrokerResponse());
+        
asyncResponse.resume(TimeSeriesResponseMapper.toBrokerResponse(timeSeriesBlock));
       }
+    } catch (QueryException e) {
+      asyncResponse.resume(TimeSeriesResponseMapper.toBrokerResponse(e));
     } catch (Exception e) {
       LOGGER.error("Caught exception while processing POST timeseries 
request", e);
       
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
@@ -320,20 +325,21 @@ public class PinotClientRequest {
     try {
       try (RequestScope requestContext = 
Tracing.getTracer().createRequestScope()) {
         String queryString = requestCtx.getQueryString();
-        PinotBrokerTimeSeriesResponse response = 
executeTimeSeriesQuery(language, queryString, Map.of(), requestContext,
+        TimeSeriesBlock timeSeriesBlock = executeTimeSeriesQuery(language, 
queryString, Map.of(), requestContext,
           makeHttpIdentity(requestCtx), httpHeaders);
+        PinotBrokerTimeSeriesResponse response = 
PinotBrokerTimeSeriesResponse.fromTimeSeriesBlock(timeSeriesBlock);
         if (response.getErrorType() != null && 
!response.getErrorType().isEmpty()) {
           
asyncResponse.resume(Response.serverError().entity(response).build());
           return;
         }
         asyncResponse.resume(response);
       }
+    } catch (QueryException e) {
+      asyncResponse.resume(PinotBrokerTimeSeriesResponse.fromException(e));
     } catch (Exception e) {
       LOGGER.error("Caught exception while processing GET request", e);
       
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
-      asyncResponse.resume(Response.serverError().entity(
-              new PinotBrokerTimeSeriesResponse("error", null, 
e.getClass().getSimpleName(), e.getMessage()))
-          .build());
+      asyncResponse.resume(PinotBrokerTimeSeriesResponse.fromException(e));
     }
   }
 
@@ -564,9 +570,9 @@ public class PinotClientRequest {
     }
   }
 
-  private PinotBrokerTimeSeriesResponse executeTimeSeriesQuery(String 
language, String queryString,
+  private TimeSeriesBlock executeTimeSeriesQuery(String language, String 
queryString,
       Map<String, String> queryParams, RequestContext requestContext, 
RequesterIdentity requesterIdentity,
-      HttpHeaders httpHeaders) {
+      HttpHeaders httpHeaders) throws QueryException {
     return _requestHandler.handleTimeSeriesRequest(language, queryString, 
queryParams, requestContext,
         requesterIdentity, httpHeaders);
   }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
index fbab6bbf8e6..71ea2d3cc9f 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
@@ -29,14 +29,15 @@ import javax.annotation.concurrent.ThreadSafe;
 import javax.ws.rs.core.HttpHeaders;
 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
 import org.apache.pinot.common.response.BrokerResponse;
-import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
 import org.apache.pinot.spi.auth.broker.RequesterIdentity;
+import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.trace.RequestScope;
 import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
 
 
 @ThreadSafe
@@ -64,9 +65,9 @@ public interface BrokerRequestHandler {
   /**
    * Run a query and use the time-series engine.
    */
-  default PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, 
String rawQueryParamString,
+  default TimeSeriesBlock handleTimeSeriesRequest(String lang, String 
rawQueryParamString,
       Map<String, String> queryParams, RequestContext requestContext, 
@Nullable RequesterIdentity requesterIdentity,
-      HttpHeaders httpHeaders) {
+      HttpHeaders httpHeaders) throws QueryException {
     throw new UnsupportedOperationException("Handler does not support Time 
Series requests");
   }
 
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
index 730fe380b71..9fe1936e154 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
@@ -29,15 +29,16 @@ import 
org.apache.hc.client5.http.io.HttpClientConnectionManager;
 import org.apache.pinot.common.cursors.AbstractResponseStore;
 import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.CursorResponse;
-import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.spi.auth.broker.RequesterIdentity;
 import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
 import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
 
 
 /**
@@ -124,14 +125,14 @@ public class BrokerRequestHandlerDelegate implements 
BrokerRequestHandler {
   }
 
   @Override
-  public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, 
String rawQueryParamString,
+  public TimeSeriesBlock handleTimeSeriesRequest(String lang, String 
rawQueryParamString,
       Map<String, String> queryParams, RequestContext requestContext, 
RequesterIdentity requesterIdentity,
-      HttpHeaders httpHeaders) {
+      HttpHeaders httpHeaders) throws QueryException {
     if (_timeSeriesRequestHandler != null) {
       return _timeSeriesRequestHandler.handleTimeSeriesRequest(lang, 
rawQueryParamString, queryParams, requestContext,
           requesterIdentity, httpHeaders);
     }
-    return new PinotBrokerTimeSeriesResponse("error", null, "error", "Time 
series query engine not enabled.");
+    throw new QueryException(QueryErrorCode.INTERNAL, "Time series query 
engine not enabled.");
   }
 
   @Override
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index b7c93bb4c1d..65e04288438 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -43,7 +43,6 @@ import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerTimer;
 import org.apache.pinot.common.response.BrokerResponse;
-import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
 import org.apache.pinot.common.utils.HumanReadableDuration;
 import org.apache.pinot.core.auth.Actions;
 import org.apache.pinot.core.auth.TargetType;
@@ -52,12 +51,15 @@ import 
org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.auth.AuthorizationResult;
 import org.apache.pinot.spi.auth.broker.RequesterIdentity;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
 import org.apache.pinot.tsdb.planner.TimeSeriesQueryEnvironment;
 import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan;
 import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
 import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
 import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -107,10 +109,10 @@ public class TimeSeriesRequestHandler extends 
BaseBrokerRequestHandler {
   }
 
   @Override
-  public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, 
String rawQueryParamString,
+  public TimeSeriesBlock handleTimeSeriesRequest(String lang, String 
rawQueryParamString,
       Map<String, String> queryParams, RequestContext requestContext, 
RequesterIdentity requesterIdentity,
-      HttpHeaders httpHeaders) {
-    PinotBrokerTimeSeriesResponse timeSeriesResponse = null;
+      HttpHeaders httpHeaders) throws QueryException {
+    TimeSeriesBlock timeSeriesBlock = null;
     long queryStartTime = System.currentTimeMillis();
     try {
       
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES, 1);
@@ -121,31 +123,30 @@ public class TimeSeriesRequestHandler extends 
BaseBrokerRequestHandler {
       try {
         timeSeriesRequest = buildRangeTimeSeriesRequest(lang, 
rawQueryParamString, queryParams);
       } catch (URISyntaxException e) {
-        return PinotBrokerTimeSeriesResponse.newErrorResponse("BAD_REQUEST", 
"Error building RangeTimeSeriesRequest");
+        throw new QueryException(QueryErrorCode.TIMESERIES_PARSING, "Error 
building RangeTimeSeriesRequest", e);
       }
       TimeSeriesLogicalPlanResult logicalPlanResult = 
_queryEnvironment.buildLogicalPlan(timeSeriesRequest);
       // If there are no buckets in the logical plan, return an empty response.
       if (logicalPlanResult.getTimeBuckets().getNumBuckets() == 0) {
-        return PinotBrokerTimeSeriesResponse.newEmptyResponse();
+        return new TimeSeriesBlock(logicalPlanResult.getTimeBuckets(), new 
HashMap<>());
       }
       TimeSeriesDispatchablePlan dispatchablePlan =
           _queryEnvironment.buildPhysicalPlan(timeSeriesRequest, 
requestContext, logicalPlanResult);
 
       tableLevelAccessControlCheck(httpHeaders, 
dispatchablePlan.getTableNames());
-      timeSeriesResponse = _queryDispatcher.submitAndGet(requestContext, 
dispatchablePlan,
-          timeSeriesRequest.getTimeout().toMillis(), new HashMap<>());
-      return timeSeriesResponse;
+      timeSeriesBlock = 
_queryDispatcher.submitAndGet(requestContext.getRequestId(), dispatchablePlan,
+          timeSeriesRequest.getTimeout().toMillis(), new HashMap<>(), 
requestContext);
+      return timeSeriesBlock;
+    } catch (Exception e) {
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES_FAILED,
 1);
+      if (e instanceof QueryException) {
+        throw (QueryException) e;
+      } else {
+        throw new QueryException(QueryErrorCode.UNKNOWN, "Error processing 
time-series query", e);
+      }
     } finally {
       _brokerMetrics.addTimedValue(BrokerTimer.QUERY_TOTAL_TIME_MS, 
System.currentTimeMillis() - queryStartTime,
           TimeUnit.MILLISECONDS);
-      if (timeSeriesResponse == null
-          || 
timeSeriesResponse.getStatus().equals(PinotBrokerTimeSeriesResponse.ERROR_STATUS))
 {
-        
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES_FAILED,
 1);
-        final String errorMessage = timeSeriesResponse == null ? "null 
time-series response"
-            : timeSeriesResponse.getError();
-        // TODO(timeseries): Remove logging for failed queries.
-        LOGGER.warn("time-series query failed with error: {}", errorMessage);
-      }
     }
   }
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
index 820ac01996b..3198d752c30 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
@@ -25,17 +25,12 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
-import org.apache.pinot.common.response.broker.QueryProcessingException;
-import org.apache.pinot.common.response.broker.ResultTable;
-import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.spi.annotations.InterfaceStability;
-import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.tsdb.spi.series.TimeSeries;
 import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
 
@@ -114,59 +109,12 @@ public class PinotBrokerTimeSeriesResponse {
     return convertBucketedSeriesBlock(seriesBlock);
   }
 
-  public BrokerResponse toBrokerResponse() {
-    BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
-    if (_errorType != null) {
-      // TODO: Introduce proper error code based exception handling for 
timeseries.
-      brokerResponse.addException(new 
QueryProcessingException(QueryErrorCode.UNKNOWN, _errorType + ": "
-          + _error));
-      return brokerResponse;
+  public static PinotBrokerTimeSeriesResponse fromException(Exception e) {
+    if (e instanceof QueryException) {
+      QueryException qe = (QueryException) e;
+      return newErrorResponse(qe.getErrorCode().getDefaultMessage(), 
e.getMessage());
     }
-    DataSchema dataSchema = deriveBrokerResponseDataSchema(_data);
-    ResultTable resultTable = new ResultTable(dataSchema, 
deriveBrokerResponseRows(_data, dataSchema.getColumnNames()));
-    brokerResponse.setResultTable(resultTable);
-    return brokerResponse;
-  }
-
-  private List<Object[]> deriveBrokerResponseRows(Data data, String[] 
columnNames) {
-    List<Object[]> rows = new ArrayList<>();
-    if (columnNames.length == 0) {
-      return rows;
-    }
-    for (Value value : data.getResult()) {
-      Long[] ts = Arrays.stream(value.getValues()).map(entry -> (Long) 
entry[0]).toArray(Long[]::new);
-      Double[] values = Arrays.stream(value.getValues())
-          .map(entry -> entry[1] == null ? null : 
Double.valueOf(String.valueOf(entry[1])))
-          .toArray(Double[]::new);
-
-      Object[] row = new Object[columnNames.length];
-      int index = 0;
-      for (String columnName : columnNames) {
-        if ("ts".equals(columnName)) {
-          row[index] = ts;
-        } else if ("values".equals(columnName)) {
-          row[index] = values;
-        } else {
-          row[index] = value.getMetric().getOrDefault(columnName, null);
-        }
-        index++;
-      }
-      rows.add(row);
-    }
-    return rows;
-  }
-
-  private DataSchema deriveBrokerResponseDataSchema(Data data) {
-    List<String> columnNames = new ArrayList<>(List.of("ts", "values", 
"__name__"));
-    List<DataSchema.ColumnDataType> columnTypes = new ArrayList<>(List.of(
-        DataSchema.ColumnDataType.LONG_ARRAY, 
DataSchema.ColumnDataType.DOUBLE_ARRAY));
-    if (!data.getResult().isEmpty()) {
-      data.getResult().get(0).getMetric().forEach((key, value) -> {
-        columnNames.add(key);
-        columnTypes.add(DataSchema.ColumnDataType.STRING);
-      });
-    }
-    return new DataSchema(columnNames.toArray(new String[0]), 
columnTypes.toArray(new DataSchema.ColumnDataType[0]));
+    return newErrorResponse(e.getClass().getSimpleName(), e.getMessage());
   }
 
   private static PinotBrokerTimeSeriesResponse 
convertBucketedSeriesBlock(TimeSeriesBlock seriesBlock) {
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
new file mode 100644
index 00000000000..714dd264456
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.mapper;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+
+
+public class TimeSeriesResponseMapper {
+
+  private static final String TS_COLUMN = "ts";
+  private static final String VALUES_COLUMN = "values";
+  private static final String NAME_COLUMN = "__name__";
+
+  private TimeSeriesResponseMapper() {
+  }
+
+  /**
+   * Creates a BrokerResponseNativeV2 from a TimeSeriesBlock.
+   * This method converts the time series data into a format compatible with 
the broker response.
+   */
+  public static BrokerResponse toBrokerResponse(TimeSeriesBlock 
timeSeriesBlock) {
+    BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
+    if (timeSeriesBlock == null) {
+      throw new IllegalArgumentException("timeSeriesBlock must not be null");
+    }
+    if (timeSeriesBlock.getTimeBuckets() == null) {
+      throw new UnsupportedOperationException("Non-bucketed series block not 
supported yet");
+    }
+    // Convert TimeSeriesBlock to ResultTable format
+    DataSchema dataSchema = 
deriveDataSchemaFromTimeSeriesBlock(timeSeriesBlock);
+    List<Object[]> rows = deriveRowsFromTimeSeriesBlock(timeSeriesBlock, 
dataSchema.getColumnNames());
+
+    ResultTable resultTable = new ResultTable(dataSchema, rows);
+    brokerResponse.setResultTable(resultTable);
+    return brokerResponse;
+  }
+
+  public static BrokerResponse toBrokerResponse(QueryException e) {
+    BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
+    brokerResponse.addException(new QueryProcessingException(e.getErrorCode(), 
e.getMessage()));
+    return brokerResponse;
+  }
+
+  private static DataSchema 
deriveDataSchemaFromTimeSeriesBlock(TimeSeriesBlock timeSeriesBlock) {
+    List<String> columnNames = new ArrayList<>(List.of(TS_COLUMN, 
VALUES_COLUMN, NAME_COLUMN));
+    List<DataSchema.ColumnDataType> columnTypes = new ArrayList<>(List.of(
+      DataSchema.ColumnDataType.LONG_ARRAY, 
DataSchema.ColumnDataType.DOUBLE_ARRAY,
+      DataSchema.ColumnDataType.STRING));
+
+    // Add tag columns if any series exist
+    if (!timeSeriesBlock.getSeriesMap().isEmpty()) {
+      // Get the first series to determine tag columns
+      TimeSeries firstSeries = 
timeSeriesBlock.getSeriesMap().values().iterator().next().get(0);
+      firstSeries.getTagKeyValuesAsMap().forEach((key, value) -> {
+        if (!columnNames.contains(key)) {
+          columnNames.add(key);
+          columnTypes.add(DataSchema.ColumnDataType.STRING);
+        }
+      });
+    }
+
+    return new DataSchema(columnNames.toArray(new String[0]),
+      columnTypes.toArray(new DataSchema.ColumnDataType[0]));
+  }
+
+  private static List<Object[]> deriveRowsFromTimeSeriesBlock(TimeSeriesBlock 
timeSeriesBlock,
+    String[] columnNames) {
+    List<Object[]> rows = new ArrayList<>();
+    if (columnNames.length == 0) {
+      return rows;
+    }
+
+    Long[] timeValues = timeSeriesBlock.getTimeBuckets().getTimeBuckets();
+    for (var listOfTimeSeries : timeSeriesBlock.getSeriesMap().values()) {
+      for (TimeSeries timeSeries : listOfTimeSeries) {
+        Object[] row = new Object[columnNames.length];
+        int index = 0;
+
+        for (String columnName : columnNames) {
+          if (TS_COLUMN.equals(columnName)) {
+            row[index] = timeValues;
+          } else if (VALUES_COLUMN.equals(columnName)) {
+            Double[] values = new Double[timeValues.length];
+            for (int i = 0; i < timeValues.length; i++) {
+              Object nullableValue = timeSeries.getDoubleValues()[i];
+              values[i] = nullableValue == null ? null : 
Double.valueOf(String.valueOf(nullableValue));
+            }
+            row[index] = values;
+          } else if (NAME_COLUMN.equals(columnName)) {
+            row[index] = timeSeries.getTagsSerialized();
+          } else {
+            row[index] = 
timeSeries.getTagKeyValuesAsMap().getOrDefault(columnName, null);
+          }
+          index++;
+        }
+        rows.add(row);
+      }
+    }
+    return rows;
+  }
+}
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapperTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapperTest.java
new file mode 100644
index 00000000000..49afff1df7d
--- /dev/null
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapperTest.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.mapper;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+
+public class TimeSeriesResponseMapperTest {
+
+  @Mock TimeSeriesBlock _block;
+  @Mock TimeBuckets _timeBuckets;
+
+  @BeforeTest
+  public void setup() {
+    MockitoAnnotations.openMocks(this);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void toBrokerResponseNullBlockThrows() {
+    TimeSeriesResponseMapper.toBrokerResponse((TimeSeriesBlock) null);
+  }
+
+  @Test(expectedExceptions = UnsupportedOperationException.class)
+  public void toBrokerResponseNonBucketedThrows() {
+    when(_block.getTimeBuckets()).thenReturn(null);
+    TimeSeriesResponseMapper.toBrokerResponse(_block);
+  }
+
+  @Test
+  public void toBrokerResponseWithException() {
+    BrokerResponseNativeV2 resp = (BrokerResponseNativeV2) 
TimeSeriesResponseMapper.toBrokerResponse(
+      new QueryException(QueryErrorCode.INTERNAL, "time series exception"));
+    List<QueryProcessingException> exceptions = resp.getExceptions();
+    assertEquals(exceptions.size(), 1);
+    assertEquals(exceptions.get(0).getErrorCode(), 
QueryErrorCode.INTERNAL.getId());
+  }
+
+  @Test
+  public void toBrokerResponseWithEmptySeriesEmptyRowsBaseSchema() {
+    when(_block.getSeriesMap()).thenReturn(Collections.emptyMap());
+    when(_timeBuckets.getTimeBuckets()).thenReturn(new Long[]{100L, 200L, 
300L});
+    when(_block.getTimeBuckets()).thenReturn(_timeBuckets);
+
+    BrokerResponseNativeV2 resp = (BrokerResponseNativeV2) 
TimeSeriesResponseMapper.toBrokerResponse(_block);
+    ResultTable table = resp.getResultTable();
+    assertNotNull(table);
+
+    DataSchema schema = table.getDataSchema();
+    assertNotNull(schema);
+    assertEqualsNoOrder(schema.getColumnNames(), new String[]{"ts", "values", 
"__name__"});
+    assertEquals(schema.getColumnDataTypes()[0], 
DataSchema.ColumnDataType.LONG_ARRAY);
+    assertEquals(schema.getColumnDataTypes()[1], 
DataSchema.ColumnDataType.DOUBLE_ARRAY);
+    assertEquals(schema.getColumnDataTypes()[2], 
DataSchema.ColumnDataType.STRING);
+
+    assertTrue(table.getRows().isEmpty(), "Expected no rows for empty 
seriesMap");
+  }
+
+  @Test
+  public void toBrokerResponseForSchemaAndTagsOnlyFromFirstSeries() {
+    when(_timeBuckets.getTimeBuckets()).thenReturn(new Long[]{10L, 20L});
+    when(_block.getTimeBuckets()).thenReturn(_timeBuckets);
+    // first series decides tag columns: region, host
+    TimeSeries s1 = new TimeSeries(
+      "id1",
+      null,
+      _timeBuckets,
+      new Double[]{1.0, 2.0},
+      Arrays.asList("region", "host"),
+      new Object[]{"us-west", "h1"}
+    );
+    // another list under same metric with different extra tag "zone"
+    TimeSeries s2 = new TimeSeries(
+      "id2",
+      null,
+      _timeBuckets,
+      new Double[]{3.0, 4.5}, // mixed types: Integer and String
+      Arrays.asList("region", "zone"),
+      new Object[]{"us-west", "z1"}
+    );
+
+    Map<Long, List<TimeSeries>> seriesMap = new LinkedHashMap<>();
+    seriesMap.put(123456789L, Arrays.asList(s1, s2));
+    when(_block.getSeriesMap()).thenReturn(seriesMap);
+
+    BrokerResponseNativeV2 resp = (BrokerResponseNativeV2) 
TimeSeriesResponseMapper.toBrokerResponse(_block);
+    ResultTable table = resp.getResultTable();
+    DataSchema schema = table.getDataSchema();
+
+    // schema should include base + first-series tags (region, host), NOT 
"zone"
+    List<String> schemaColumns = Arrays.asList(schema.getColumnNames());
+    assertEqualsNoOrder(schemaColumns, Arrays.asList("ts", "values", 
"__name__", "region", "host"));
+    int regionIndex = schemaColumns.indexOf("region");
+    int hostIndex = schemaColumns.indexOf("host");
+
+    // verify rows: two rows, tag values present per series
+    List<Object[]> rows = table.getRows();
+    assertEquals(rows.size(), 2);
+
+    // time buckets
+    Object[] r1 = rows.get(0);
+    assertTrue(r1[0] instanceof Long[]);
+    assertEquals(((Long[]) r1[0]).length, 2);
+
+    assertEquals(r1[1], new Double[]{1.0, 2.0});
+    // __name__
+    assertEquals(r1[2], "region=us-west,host=h1");
+    // region & host
+    assertEquals(r1[regionIndex], "us-west");
+    assertEquals(r1[hostIndex], "h1");
+
+    // Row 2 (s2)
+    Object[] r2 = rows.get(1);
+    // values conversion covers Integer and String -> Double
+    assertEquals(r2[1], new Double[]{3.0, 4.5});
+    // __name__
+    assertEquals(r2[2], "region=us-west,zone=z1");
+    // region present, host missing (null), "zone" absent from schema
+    assertEquals(r2[regionIndex], "us-west");
+    assertNull(r2[hostIndex]);
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index c7e4bd5c9ba..2812a7806a9 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -55,7 +55,6 @@ import org.apache.pinot.common.datatable.StatMap;
 import org.apache.pinot.common.failuredetector.FailureDetector;
 import org.apache.pinot.common.proto.Plan;
 import org.apache.pinot.common.proto.Worker;
-import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
@@ -698,18 +697,7 @@ public class QueryDispatcher {
     _executorService.shutdown();
   }
 
-  public PinotBrokerTimeSeriesResponse submitAndGet(RequestContext context, 
TimeSeriesDispatchablePlan plan,
-      long timeoutMs, Map<String, String> queryOptions) {
-    long requestId = context.getRequestId();
-    try {
-      TimeSeriesBlock result = submitAndGet(requestId, plan, timeoutMs, 
queryOptions, context);
-      return PinotBrokerTimeSeriesResponse.fromTimeSeriesBlock(result);
-    } catch (Throwable t) {
-      return 
PinotBrokerTimeSeriesResponse.newErrorResponse(t.getClass().getSimpleName(), 
t.getMessage());
-    }
-  }
-
-  TimeSeriesBlock submitAndGet(long requestId, TimeSeriesDispatchablePlan 
plan, long timeoutMs,
+  public TimeSeriesBlock submitAndGet(long requestId, 
TimeSeriesDispatchablePlan plan, long timeoutMs,
       Map<String, String> queryOptions, RequestContext requestContext)
       throws Exception {
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/exception/QueryErrorCode.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/exception/QueryErrorCode.java
index a92f5b1be69..592f685afe4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/exception/QueryErrorCode.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/exception/QueryErrorCode.java
@@ -32,6 +32,7 @@ public enum QueryErrorCode {
   JSON_PARSING(100, "JsonParsingError"),
   /// Error detected at parsing time. For example, syntax error.
   SQL_PARSING(150, "SQLParsingError"),
+  TIMESERIES_PARSING(155, "TimeseriesParsingError"),
   SQL_RUNTIME(160, "SQLRuntimeError"),
   ACCESS_DENIED(180, "AccessDenied"),
   TABLE_DOES_NOT_EXIST(190, "TableDoesNotExistError"),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to