This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c7a8f5c3b59 [timeseries] Added TimeSeriesBlock based exception propagation (#17440)
c7a8f5c3b59 is described below

commit c7a8f5c3b59bff50b712cc22f682d68ea8470fb0
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Mon Jan 5 14:12:28 2026 -0800

    [timeseries] Added TimeSeriesBlock based exception propagation (#17440)
    
    * [timeseries] Added TimeSeriesBlock based exception propagation
    
    * Addressed comments
    
    * Prometheus API should error out on any exception
    
    ---------
    
    Co-authored-by: shauryachats <[email protected]>
---
 .../response/PinotBrokerTimeSeriesResponse.java    | 16 +++++++---
 .../tests/TimeSeriesIntegrationTest.java           | 33 +++++++++++++++++++
 .../apache/pinot/query/runtime/QueryRunner.java    |  1 +
 .../runtime/timeseries/LeafTimeSeriesOperator.java | 31 ++++++++++++++----
 .../TimeSeriesExchangeReceiveOperator.java         | 10 ++++++
 .../timeseries/serde/TimeSeriesBlockSerde.java     | 37 ++++++++++++++++++++--
 6 files changed, 114 insertions(+), 14 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
index 08f09440afb..e55983143aa 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 import org.apache.pinot.spi.annotations.InterfaceStability;
 import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.tsdb.spi.series.TimeSeries;
@@ -62,7 +63,8 @@ public class PinotBrokerTimeSeriesResponse {
 
   @JsonCreator
   public PinotBrokerTimeSeriesResponse(@JsonProperty("status") String status, 
@JsonProperty("data") Data data,
-      @JsonProperty("errorType") String errorType, @JsonProperty("error") 
String error) {
+      @JsonProperty("errorType") String errorType, @JsonProperty("error") 
String error,
+      @JsonProperty("warnings") List<String> warnings) {
     _status = status;
     _data = data;
     _errorType = errorType;
@@ -91,21 +93,26 @@ public class PinotBrokerTimeSeriesResponse {
   }
 
   public static PinotBrokerTimeSeriesResponse newEmptyResponse() {
-    return new PinotBrokerTimeSeriesResponse(SUCCESS_STATUS, Data.EMPTY, null, 
null);
+    return new PinotBrokerTimeSeriesResponse(SUCCESS_STATUS, Data.EMPTY, null, 
null, null);
   }
 
   public static PinotBrokerTimeSeriesResponse newSuccessResponse(Data data) {
-    return new PinotBrokerTimeSeriesResponse(SUCCESS_STATUS, data, null, null);
+    return new PinotBrokerTimeSeriesResponse(SUCCESS_STATUS, data, null, null, 
null);
   }
 
   public static PinotBrokerTimeSeriesResponse newErrorResponse(String 
errorType, String errorMessage) {
-    return new PinotBrokerTimeSeriesResponse(ERROR_STATUS, Data.EMPTY, 
errorType, errorMessage);
+    return new PinotBrokerTimeSeriesResponse(ERROR_STATUS, Data.EMPTY, 
errorType, errorMessage, null);
   }
 
   public static PinotBrokerTimeSeriesResponse 
fromTimeSeriesBlock(TimeSeriesBlock seriesBlock) {
     if (seriesBlock.getTimeBuckets() == null) {
       throw new UnsupportedOperationException("Non-bucketed series block not 
supported yet");
     }
+    if (seriesBlock.getExceptions() != null && 
!seriesBlock.getExceptions().isEmpty()) {
+      QueryException firstException = seriesBlock.getExceptions().get(0);
+      return 
newErrorResponse(firstException.getErrorCode().getDefaultMessage(), 
seriesBlock.getExceptions().stream()
+          .map(QueryException::toString).collect(Collectors.joining("; ")));
+    }
     return convertBucketedSeriesBlock(seriesBlock);
   }
 
@@ -120,6 +127,7 @@ public class PinotBrokerTimeSeriesResponse {
   private static PinotBrokerTimeSeriesResponse 
convertBucketedSeriesBlock(TimeSeriesBlock seriesBlock) {
     Long[] timeValues = 
Objects.requireNonNull(seriesBlock.getTimeBuckets()).getTimeBuckets();
     List<PinotBrokerTimeSeriesResponse.Value> result = new ArrayList<>();
+    List<String> warnings = null;
     for (var listOfTimeSeries : seriesBlock.getSeriesMap().values()) {
       Preconditions.checkState(!listOfTimeSeries.isEmpty(), "Received empty 
time-series");
       TimeSeries anySeries = listOfTimeSeries.get(0);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
index d2e376f5751..d3aa6b62250 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
@@ -49,6 +49,7 @@ import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
@@ -206,6 +207,38 @@ public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest {
     assertEquals(statusCodeAndResponse.getRight(), "[\"m3ql\"]");
   }
 
+  @Test(expectedExceptions = RuntimeException.class)
+  public void testQueryWithServerExceptionsForPrometheus() {
+    // JSON_MATCH on a column without JSON index triggers a QUERY_EXECUTION 
error.
+    String query = String.format(
+        "fetch{table=\"mytable_OFFLINE\",filter=\"JSON_MATCH(%s, 
'\\\"$.key=value\\\"')\",ts_column=\"%s\","
+            + "ts_unit=\"MILLISECONDS\",value=\"%s\"} | max{%s}",
+        DEVICE_OS_COLUMN, TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN
+    );
+    // Prometheus endpoint should throw an exception.
+    getTimeseriesQuery(query, QUERY_START_TIME_SEC, QUERY_END_TIME_SEC, 
getHeaders());
+  }
+
+  @Test
+  public void testQueryWithServerExceptionsForBroker() {
+    // JSON_MATCH on a column without JSON index triggers a QUERY_EXECUTION 
error.
+    String query = String.format(
+      "fetch{table=\"mytable_OFFLINE\",filter=\"JSON_MATCH(%s, 
'\\\"$.key=value\\\"')\",ts_column=\"%s\","
+        + "ts_unit=\"MILLISECONDS\",value=\"%s\"} | max{%s}",
+      DEVICE_OS_COLUMN, TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN
+    );
+    JsonNode result = postTimeseriesQuery(getBrokerBaseApiUrl(), query, 
QUERY_START_TIME_SEC, QUERY_END_TIME_SEC,
+      getHeaders());
+    assertNotNull(result);
+
+    JsonNode exceptions = result.get("exceptions");
+    assertNotNull(exceptions);
+    assertFalse(exceptions.isEmpty());
+    JsonNode firstException = exceptions.get(0);
+    assertEquals(firstException.get("errorCode").asInt(), 200);
+    assertTrue(firstException.get("message").asText().contains("Cannot apply 
JSON_MATCH on column"));
+  }
+
   @DataProvider(name = "isBrokerResponseCompatible")
   public Object[][] isBrokerResponseCompatible() {
     return new Object[][]{
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index d2d2e56f73a..54882329a82 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -406,6 +406,7 @@ public class QueryRunner {
             TimeSeriesBlock seriesBlock = fragmentOpChain.nextBlock();
             Map<String, String> metadataMap = new 
HashMap<>(seriesBlock.getMetadata());
             metadataMap.put(Response.MetadataKeys.TimeSeries.PLAN_ID, 
currentPlanId);
+            TimeSeriesBlockSerde.encodeExceptionsToMetadata(seriesBlock, 
metadataMap);
             Worker.TimeSeriesResponse response = 
Worker.TimeSeriesResponse.newBuilder()
                 
.setPayload(TimeSeriesBlockSerde.serializeTimeSeriesBlock(seriesBlock))
                 .putAllMetadata(metadataMap)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
index 83b82591d52..477658b40a0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.timeseries;
 
 import com.google.common.base.Preconditions;
 import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import org.apache.commons.collections4.MapUtils;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
@@ -29,6 +30,8 @@ import org.apache.pinot.core.operator.timeseries.TimeSeriesOperatorUtils;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.logger.ServerQueryLogger;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
 import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
 
@@ -54,13 +57,12 @@ public class LeafTimeSeriesOperator extends BaseTimeSeriesOperator {
   public TimeSeriesBlock getNextBlock() {
     Preconditions.checkNotNull(_queryExecutor, "Leaf time series operator has 
not been initialized");
     InstanceResponseBlock instanceResponseBlock = 
_queryExecutor.execute(_request, _executorService);
-    assert instanceResponseBlock.getResultsBlock() instanceof 
GroupByResultsBlock;
-    _queryLogger.logQuery(_request, instanceResponseBlock, "TimeSeries");
     if (MapUtils.isNotEmpty(instanceResponseBlock.getExceptions())) {
-      // TODO: Return error in the TimeSeriesBlock instead?
-      String message = 
instanceResponseBlock.getExceptions().values().iterator().next();
-      throw new RuntimeException("Error running time-series query: " + 
message);
+      return buildErrorBlock(instanceResponseBlock, "Error running time-series 
query");
     }
+    assert instanceResponseBlock.getResultsBlock() instanceof 
GroupByResultsBlock;
+    _queryLogger.logQuery(_request, instanceResponseBlock, "TimeSeries");
+
     if (instanceResponseBlock.getResultsBlock() instanceof 
GroupByResultsBlock) {
       return 
TimeSeriesOperatorUtils.buildTimeSeriesBlock(_context.getInitialTimeBuckets(),
           (GroupByResultsBlock) instanceResponseBlock.getResultsBlock(), 
instanceResponseBlock.getResponseMetadata());
@@ -69,13 +71,28 @@ public class LeafTimeSeriesOperator extends BaseTimeSeriesOperator {
           (AggregationResultsBlock) instanceResponseBlock.getResultsBlock(),
           instanceResponseBlock.getResponseMetadata());
     } else if (instanceResponseBlock.getResultsBlock() == null) {
-      throw new IllegalStateException("Found null results block in time-series 
query");
+      return buildErrorBlock(instanceResponseBlock, "Found null results block 
in time-series query");
     } else {
-      throw new UnsupportedOperationException(
+      return buildErrorBlock(instanceResponseBlock,
           String.format("Unknown results block: %s", 
instanceResponseBlock.getResultsBlock().getClass().getName()));
     }
   }
 
+  private TimeSeriesBlock buildErrorBlock(InstanceResponseBlock 
instanceResponseBlock, String baseMessage) {
+    TimeSeriesBlock errorBlock = new 
TimeSeriesBlock(_context.getInitialTimeBuckets(), new java.util.HashMap<>(),
+        instanceResponseBlock.getResponseMetadata());
+    if (MapUtils.isNotEmpty(instanceResponseBlock.getExceptions())) {
+      for (Map.Entry<Integer, String> entry : 
instanceResponseBlock.getExceptions().entrySet()) {
+        QueryException qe = new 
QueryException(QueryErrorCode.fromErrorCode(entry.getKey()), entry.getValue());
+        errorBlock.addToExceptions(qe);
+      }
+    } else {
+      QueryException qe = new QueryException(QueryErrorCode.QUERY_EXECUTION, 
baseMessage);
+      errorBlock.addToExceptions(qe);
+    }
+    return errorBlock;
+  }
+
   @Override
   public String getExplainName() {
     return "TIME_SERIES_LEAF_STAGE_OPERATOR";
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
index cb65215c374..ab18744a1a7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
@@ -121,6 +121,7 @@ public class TimeSeriesExchangeReceiveOperator extends BaseTimeSeriesOperator {
     TimeBuckets timeBuckets = null;
     Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>();
     Map<String, String> aggregatedStats = new HashMap<>();
+    List<QueryException> allExceptions = new ArrayList<>();
     QueryException timeoutException = null;
 
     for (int index = 0; index < _numServersQueried; index++) {
@@ -145,6 +146,8 @@ public class TimeSeriesExchangeReceiveOperator extends BaseTimeSeriesOperator {
           "Found unexpected object. This is a bug: %s", result.getClass());
       // Step-2: Init timeBuckets and ensure they are the same across all 
servers.
       TimeSeriesBlock blockToMerge = (TimeSeriesBlock) result;
+      // Merge exceptions from the received block
+      allExceptions.addAll(blockToMerge.getExceptions());
       if (timeBuckets == null) {
         timeBuckets = blockToMerge.getTimeBuckets();
       } else {
@@ -180,6 +183,8 @@ public class TimeSeriesExchangeReceiveOperator extends BaseTimeSeriesOperator {
       seriesMap.put(seriesHash, timeSeriesList);
     }
     TimeSeriesBlock resultBlock = new TimeSeriesBlock(timeBuckets, seriesMap, 
aggregatedStats);
+    // Add all collected exceptions to the result block
+    allExceptions.forEach(resultBlock::addToExceptions);
     if (timeoutException != null) {
       resultBlock.addToExceptions(timeoutException);
     }
@@ -212,6 +217,7 @@ public class TimeSeriesExchangeReceiveOperator extends BaseTimeSeriesOperator {
     Map<Long, List<TimeSeries>> timeSeriesMap = new HashMap<>();
     TimeBuckets timeBuckets = null;
     Map<String, String> aggregatedStats = new HashMap<>();
+    List<QueryException> allExceptions = new ArrayList<>();
     QueryException timeoutException = null;
     for (int index = 0; index < _numServersQueried; index++) {
       long remainingTimeMs = _deadlineMs - System.currentTimeMillis();
@@ -233,6 +239,8 @@ public class TimeSeriesExchangeReceiveOperator extends BaseTimeSeriesOperator {
       Preconditions.checkState(result instanceof TimeSeriesBlock,
           "Found unexpected object. This is a bug: %s", result.getClass());
       TimeSeriesBlock blockToMerge = (TimeSeriesBlock) result;
+      // Merge exceptions from the received block
+      allExceptions.addAll(blockToMerge.getExceptions());
       if (timeBuckets == null) {
         timeBuckets = blockToMerge.getTimeBuckets();
       } else {
@@ -248,6 +256,8 @@ public class TimeSeriesExchangeReceiveOperator extends BaseTimeSeriesOperator {
     }
     Preconditions.checkNotNull(timeBuckets, "Time buckets is null in exchange 
receive operator");
     TimeSeriesBlock resultBlock = new TimeSeriesBlock(timeBuckets, 
timeSeriesMap, aggregatedStats);
+    // Add all collected exceptions to the result block
+    allExceptions.forEach(resultBlock::addToExceptions);
     if (timeoutException != null) {
       resultBlock.addToExceptions(timeoutException);
     }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
index a4842a9cb44..37280f2b7cc 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
@@ -42,6 +42,9 @@ import org.apache.pinot.core.common.datablock.DataBlockBuilder;
 import org.apache.pinot.query.runtime.blocks.MseBlock;
 import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
 import org.apache.pinot.query.runtime.blocks.SerializedDataBlock;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
 import org.apache.pinot.tsdb.spi.series.TimeSeries;
 import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
@@ -83,6 +86,9 @@ public class TimeSeriesBlockSerde {
    */
   private static final String VALUES_COLUMN_NAME = "__ts_serde_values";
   private static final double NULL_PLACEHOLDER = Double.MIN_VALUE;
+  private static final String EXCEPTIONS_METADATA_KEY = "__ts_exceptions";
+  private static final String ERROR_CODE = "errorCode";
+  private static final String MESSAGE = "message";
 
   private TimeSeriesBlockSerde() {
   }
@@ -104,7 +110,17 @@ public class TimeSeriesBlockSerde {
       long seriesId = Long.parseLong(timeSeries.getId());
       seriesMap.computeIfAbsent(seriesId, x -> new 
ArrayList<>()).add(timeSeries);
     }
-    return new TimeSeriesBlock(timeBuckets, seriesMap, metadataMap);
+    TimeSeriesBlock block = new TimeSeriesBlock(timeBuckets, seriesMap, 
metadataMap);
+    if (metadataMap.containsKey(EXCEPTIONS_METADATA_KEY)) {
+      String exceptionsJson = metadataMap.get(EXCEPTIONS_METADATA_KEY);
+      List<Map<String, Object>> exceptionsList = 
JsonUtils.stringToObject(exceptionsJson, List.class);
+      for (Map<String, Object> exceptionData : exceptionsList) {
+        int errorCode = ((Number) exceptionData.get(ERROR_CODE)).intValue();
+        String message = (String) exceptionData.get(MESSAGE);
+        block.addToExceptions(new 
QueryException(QueryErrorCode.fromErrorCode(errorCode), message));
+      }
+    }
+    return block;
   }
 
   public static ByteString serializeTimeSeriesBlock(TimeSeriesBlock 
timeSeriesBlock)
@@ -118,8 +134,23 @@ public class TimeSeriesBlockSerde {
         container.add(timeSeriesToRow(timeSeries, dataSchema));
       }
     }
-    RowHeapDataBlock transferableBlock = new RowHeapDataBlock(container, 
dataSchema);
-    return 
DataBlockUtils.toByteString(transferableBlock.asSerialized().getDataBlock());
+    RowHeapDataBlock rowHeapBlock = new RowHeapDataBlock(container, 
dataSchema);
+    return 
DataBlockUtils.toByteString(rowHeapBlock.asSerialized().getDataBlock());
+  }
+
+  public static void encodeExceptionsToMetadata(TimeSeriesBlock 
timeSeriesBlock, Map<String, String> metadataMap)
+    throws Exception {
+    List<QueryException> exceptions = timeSeriesBlock.getExceptions();
+    if (exceptions != null && !exceptions.isEmpty()) {
+      List<Map<String, Object>> exceptionsList = new ArrayList<>();
+      for (QueryException exception : exceptions) {
+        Map<String, Object> exceptionData = new HashMap<>();
+        exceptionData.put(ERROR_CODE, exception.getErrorCode().getId());
+        exceptionData.put(MESSAGE, exception.getMessage());
+        exceptionsList.add(exceptionData);
+      }
+      metadataMap.put(EXCEPTIONS_METADATA_KEY, 
JsonUtils.objectToString(exceptionsList));
+    }
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to