This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c7a8f5c3b59 [timeseries] Added TimeSeriesBlock based exception propagation (#17440)
c7a8f5c3b59 is described below
commit c7a8f5c3b59bff50b712cc22f682d68ea8470fb0
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Mon Jan 5 14:12:28 2026 -0800
[timeseries] Added TimeSeriesBlock based exception propagation (#17440)
* [timeseries] Added TimeSeriesBlock based exception propagation
* Addressed comments
* Prometheus API should error out on any exception
---------
Co-authored-by: shauryachats <[email protected]>
---
.../response/PinotBrokerTimeSeriesResponse.java | 16 +++++++---
.../tests/TimeSeriesIntegrationTest.java | 33 +++++++++++++++++++
.../apache/pinot/query/runtime/QueryRunner.java | 1 +
.../runtime/timeseries/LeafTimeSeriesOperator.java | 31 ++++++++++++++----
.../TimeSeriesExchangeReceiveOperator.java | 10 ++++++
.../timeseries/serde/TimeSeriesBlockSerde.java | 37 ++++++++++++++++++++--
6 files changed, 114 insertions(+), 14 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
index 08f09440afb..e55983143aa 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
import org.apache.pinot.spi.annotations.InterfaceStability;
import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.tsdb.spi.series.TimeSeries;
@@ -62,7 +63,8 @@ public class PinotBrokerTimeSeriesResponse {
@JsonCreator
public PinotBrokerTimeSeriesResponse(@JsonProperty("status") String status,
@JsonProperty("data") Data data,
- @JsonProperty("errorType") String errorType, @JsonProperty("error")
String error) {
+ @JsonProperty("errorType") String errorType, @JsonProperty("error")
String error,
+ @JsonProperty("warnings") List<String> warnings) {
_status = status;
_data = data;
_errorType = errorType;
@@ -91,21 +93,26 @@ public class PinotBrokerTimeSeriesResponse {
}
public static PinotBrokerTimeSeriesResponse newEmptyResponse() {
- return new PinotBrokerTimeSeriesResponse(SUCCESS_STATUS, Data.EMPTY, null,
null);
+ return new PinotBrokerTimeSeriesResponse(SUCCESS_STATUS, Data.EMPTY, null,
null, null);
}
public static PinotBrokerTimeSeriesResponse newSuccessResponse(Data data) {
- return new PinotBrokerTimeSeriesResponse(SUCCESS_STATUS, data, null, null);
+ return new PinotBrokerTimeSeriesResponse(SUCCESS_STATUS, data, null, null,
null);
}
public static PinotBrokerTimeSeriesResponse newErrorResponse(String
errorType, String errorMessage) {
- return new PinotBrokerTimeSeriesResponse(ERROR_STATUS, Data.EMPTY,
errorType, errorMessage);
+ return new PinotBrokerTimeSeriesResponse(ERROR_STATUS, Data.EMPTY,
errorType, errorMessage, null);
}
public static PinotBrokerTimeSeriesResponse
fromTimeSeriesBlock(TimeSeriesBlock seriesBlock) {
if (seriesBlock.getTimeBuckets() == null) {
throw new UnsupportedOperationException("Non-bucketed series block not supported yet");
}
+ if (seriesBlock.getExceptions() != null &&
!seriesBlock.getExceptions().isEmpty()) {
+ QueryException firstException = seriesBlock.getExceptions().get(0);
+ return
newErrorResponse(firstException.getErrorCode().getDefaultMessage(),
seriesBlock.getExceptions().stream()
+ .map(QueryException::toString).collect(Collectors.joining("; ")));
+ }
return convertBucketedSeriesBlock(seriesBlock);
}
@@ -120,6 +127,7 @@ public class PinotBrokerTimeSeriesResponse {
private static PinotBrokerTimeSeriesResponse
convertBucketedSeriesBlock(TimeSeriesBlock seriesBlock) {
Long[] timeValues =
Objects.requireNonNull(seriesBlock.getTimeBuckets()).getTimeBuckets();
List<PinotBrokerTimeSeriesResponse.Value> result = new ArrayList<>();
+ List<String> warnings = null;
for (var listOfTimeSeries : seriesBlock.getSeriesMap().values()) {
Preconditions.checkState(!listOfTimeSeries.isEmpty(), "Received empty time-series");
TimeSeries anySeries = listOfTimeSeries.get(0);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
index d2e376f5751..d3aa6b62250 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
@@ -49,6 +49,7 @@ import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -206,6 +207,38 @@ public class TimeSeriesIntegrationTest extends
BaseClusterIntegrationTest {
assertEquals(statusCodeAndResponse.getRight(), "[\"m3ql\"]");
}
+ @Test(expectedExceptions = RuntimeException.class)
+ public void testQueryWithServerExceptionsForPrometheus() {
+ // JSON_MATCH on a column without JSON index triggers a QUERY_EXECUTION error.
+ String query = String.format(
+ "fetch{table=\"mytable_OFFLINE\",filter=\"JSON_MATCH(%s, '\\\"$.key=value\\\"')\",ts_column=\"%s\","
+ + "ts_unit=\"MILLISECONDS\",value=\"%s\"} | max{%s}",
+ DEVICE_OS_COLUMN, TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN
+ );
+ // Prometheus endpoint should throw an exception.
+ getTimeseriesQuery(query, QUERY_START_TIME_SEC, QUERY_END_TIME_SEC,
getHeaders());
+ }
+
+ @Test
+ public void testQueryWithServerExceptionsForBroker() {
+ // JSON_MATCH on a column without JSON index triggers a QUERY_EXECUTION error.
+ String query = String.format(
+ "fetch{table=\"mytable_OFFLINE\",filter=\"JSON_MATCH(%s, '\\\"$.key=value\\\"')\",ts_column=\"%s\","
+ + "ts_unit=\"MILLISECONDS\",value=\"%s\"} | max{%s}",
+ DEVICE_OS_COLUMN, TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN
+ );
+ JsonNode result = postTimeseriesQuery(getBrokerBaseApiUrl(), query,
QUERY_START_TIME_SEC, QUERY_END_TIME_SEC,
+ getHeaders());
+ assertNotNull(result);
+
+ JsonNode exceptions = result.get("exceptions");
+ assertNotNull(exceptions);
+ assertFalse(exceptions.isEmpty());
+ JsonNode firstException = exceptions.get(0);
+ assertEquals(firstException.get("errorCode").asInt(), 200);
+ assertTrue(firstException.get("message").asText().contains("Cannot apply JSON_MATCH on column"));
+ }
+
@DataProvider(name = "isBrokerResponseCompatible")
public Object[][] isBrokerResponseCompatible() {
return new Object[][]{
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index d2d2e56f73a..54882329a82 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -406,6 +406,7 @@ public class QueryRunner {
TimeSeriesBlock seriesBlock = fragmentOpChain.nextBlock();
Map<String, String> metadataMap = new
HashMap<>(seriesBlock.getMetadata());
metadataMap.put(Response.MetadataKeys.TimeSeries.PLAN_ID,
currentPlanId);
+ TimeSeriesBlockSerde.encodeExceptionsToMetadata(seriesBlock,
metadataMap);
Worker.TimeSeriesResponse response =
Worker.TimeSeriesResponse.newBuilder()
.setPayload(TimeSeriesBlockSerde.serializeTimeSeriesBlock(seriesBlock))
.putAllMetadata(metadataMap)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
index 83b82591d52..477658b40a0 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.timeseries;
import com.google.common.base.Preconditions;
import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.commons.collections4.MapUtils;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
@@ -29,6 +30,8 @@ import
org.apache.pinot.core.operator.timeseries.TimeSeriesOperatorUtils;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.logger.ServerQueryLogger;
import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
@@ -54,13 +57,12 @@ public class LeafTimeSeriesOperator extends
BaseTimeSeriesOperator {
public TimeSeriesBlock getNextBlock() {
Preconditions.checkNotNull(_queryExecutor, "Leaf time series operator has not been initialized");
InstanceResponseBlock instanceResponseBlock =
_queryExecutor.execute(_request, _executorService);
- assert instanceResponseBlock.getResultsBlock() instanceof
GroupByResultsBlock;
- _queryLogger.logQuery(_request, instanceResponseBlock, "TimeSeries");
if (MapUtils.isNotEmpty(instanceResponseBlock.getExceptions())) {
- // TODO: Return error in the TimeSeriesBlock instead?
- String message =
instanceResponseBlock.getExceptions().values().iterator().next();
- throw new RuntimeException("Error running time-series query: " +
message);
+ return buildErrorBlock(instanceResponseBlock, "Error running time-series query");
}
+ assert instanceResponseBlock.getResultsBlock() instanceof
GroupByResultsBlock;
+ _queryLogger.logQuery(_request, instanceResponseBlock, "TimeSeries");
+
if (instanceResponseBlock.getResultsBlock() instanceof
GroupByResultsBlock) {
return
TimeSeriesOperatorUtils.buildTimeSeriesBlock(_context.getInitialTimeBuckets(),
(GroupByResultsBlock) instanceResponseBlock.getResultsBlock(),
instanceResponseBlock.getResponseMetadata());
@@ -69,13 +71,28 @@ public class LeafTimeSeriesOperator extends
BaseTimeSeriesOperator {
(AggregationResultsBlock) instanceResponseBlock.getResultsBlock(),
instanceResponseBlock.getResponseMetadata());
} else if (instanceResponseBlock.getResultsBlock() == null) {
- throw new IllegalStateException("Found null results block in time-series query");
+ return buildErrorBlock(instanceResponseBlock, "Found null results block in time-series query");
} else {
- throw new UnsupportedOperationException(
+ return buildErrorBlock(instanceResponseBlock,
String.format("Unknown results block: %s",
instanceResponseBlock.getResultsBlock().getClass().getName()));
}
}
+ private TimeSeriesBlock buildErrorBlock(InstanceResponseBlock
instanceResponseBlock, String baseMessage) {
+ TimeSeriesBlock errorBlock = new
TimeSeriesBlock(_context.getInitialTimeBuckets(), new java.util.HashMap<>(),
+ instanceResponseBlock.getResponseMetadata());
+ if (MapUtils.isNotEmpty(instanceResponseBlock.getExceptions())) {
+ for (Map.Entry<Integer, String> entry :
instanceResponseBlock.getExceptions().entrySet()) {
+ QueryException qe = new
QueryException(QueryErrorCode.fromErrorCode(entry.getKey()), entry.getValue());
+ errorBlock.addToExceptions(qe);
+ }
+ } else {
+ QueryException qe = new QueryException(QueryErrorCode.QUERY_EXECUTION,
baseMessage);
+ errorBlock.addToExceptions(qe);
+ }
+ return errorBlock;
+ }
+
@Override
public String getExplainName() {
return "TIME_SERIES_LEAF_STAGE_OPERATOR";
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
index cb65215c374..ab18744a1a7 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
@@ -121,6 +121,7 @@ public class TimeSeriesExchangeReceiveOperator extends
BaseTimeSeriesOperator {
TimeBuckets timeBuckets = null;
Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>();
Map<String, String> aggregatedStats = new HashMap<>();
+ List<QueryException> allExceptions = new ArrayList<>();
QueryException timeoutException = null;
for (int index = 0; index < _numServersQueried; index++) {
@@ -145,6 +146,8 @@ public class TimeSeriesExchangeReceiveOperator extends
BaseTimeSeriesOperator {
"Found unexpected object. This is a bug: %s", result.getClass());
// Step-2: Init timeBuckets and ensure they are the same across all servers.
TimeSeriesBlock blockToMerge = (TimeSeriesBlock) result;
+ // Merge exceptions from the received block
+ allExceptions.addAll(blockToMerge.getExceptions());
if (timeBuckets == null) {
timeBuckets = blockToMerge.getTimeBuckets();
} else {
@@ -180,6 +183,8 @@ public class TimeSeriesExchangeReceiveOperator extends
BaseTimeSeriesOperator {
seriesMap.put(seriesHash, timeSeriesList);
}
TimeSeriesBlock resultBlock = new TimeSeriesBlock(timeBuckets, seriesMap,
aggregatedStats);
+ // Add all collected exceptions to the result block
+ allExceptions.forEach(resultBlock::addToExceptions);
if (timeoutException != null) {
resultBlock.addToExceptions(timeoutException);
}
@@ -212,6 +217,7 @@ public class TimeSeriesExchangeReceiveOperator extends
BaseTimeSeriesOperator {
Map<Long, List<TimeSeries>> timeSeriesMap = new HashMap<>();
TimeBuckets timeBuckets = null;
Map<String, String> aggregatedStats = new HashMap<>();
+ List<QueryException> allExceptions = new ArrayList<>();
QueryException timeoutException = null;
for (int index = 0; index < _numServersQueried; index++) {
long remainingTimeMs = _deadlineMs - System.currentTimeMillis();
@@ -233,6 +239,8 @@ public class TimeSeriesExchangeReceiveOperator extends
BaseTimeSeriesOperator {
Preconditions.checkState(result instanceof TimeSeriesBlock,
"Found unexpected object. This is a bug: %s", result.getClass());
TimeSeriesBlock blockToMerge = (TimeSeriesBlock) result;
+ // Merge exceptions from the received block
+ allExceptions.addAll(blockToMerge.getExceptions());
if (timeBuckets == null) {
timeBuckets = blockToMerge.getTimeBuckets();
} else {
@@ -248,6 +256,8 @@ public class TimeSeriesExchangeReceiveOperator extends
BaseTimeSeriesOperator {
}
Preconditions.checkNotNull(timeBuckets, "Time buckets is null in exchange receive operator");
TimeSeriesBlock resultBlock = new TimeSeriesBlock(timeBuckets,
timeSeriesMap, aggregatedStats);
+ // Add all collected exceptions to the result block
+ allExceptions.forEach(resultBlock::addToExceptions);
if (timeoutException != null) {
resultBlock.addToExceptions(timeoutException);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
index a4842a9cb44..37280f2b7cc 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
@@ -42,6 +42,9 @@ import
org.apache.pinot.core.common.datablock.DataBlockBuilder;
import org.apache.pinot.query.runtime.blocks.MseBlock;
import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
import org.apache.pinot.query.runtime.blocks.SerializedDataBlock;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.apache.pinot.tsdb.spi.series.TimeSeries;
import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
@@ -83,6 +86,9 @@ public class TimeSeriesBlockSerde {
*/
private static final String VALUES_COLUMN_NAME = "__ts_serde_values";
private static final double NULL_PLACEHOLDER = Double.MIN_VALUE;
+ private static final String EXCEPTIONS_METADATA_KEY = "__ts_exceptions";
+ private static final String ERROR_CODE = "errorCode";
+ private static final String MESSAGE = "message";
private TimeSeriesBlockSerde() {
}
@@ -104,7 +110,17 @@ public class TimeSeriesBlockSerde {
long seriesId = Long.parseLong(timeSeries.getId());
seriesMap.computeIfAbsent(seriesId, x -> new
ArrayList<>()).add(timeSeries);
}
- return new TimeSeriesBlock(timeBuckets, seriesMap, metadataMap);
+ TimeSeriesBlock block = new TimeSeriesBlock(timeBuckets, seriesMap,
metadataMap);
+ if (metadataMap.containsKey(EXCEPTIONS_METADATA_KEY)) {
+ String exceptionsJson = metadataMap.get(EXCEPTIONS_METADATA_KEY);
+ List<Map<String, Object>> exceptionsList =
JsonUtils.stringToObject(exceptionsJson, List.class);
+ for (Map<String, Object> exceptionData : exceptionsList) {
+ int errorCode = ((Number) exceptionData.get(ERROR_CODE)).intValue();
+ String message = (String) exceptionData.get(MESSAGE);
+ block.addToExceptions(new
QueryException(QueryErrorCode.fromErrorCode(errorCode), message));
+ }
+ }
+ return block;
}
public static ByteString serializeTimeSeriesBlock(TimeSeriesBlock
timeSeriesBlock)
@@ -118,8 +134,23 @@ public class TimeSeriesBlockSerde {
container.add(timeSeriesToRow(timeSeries, dataSchema));
}
}
- RowHeapDataBlock transferableBlock = new RowHeapDataBlock(container,
dataSchema);
- return
DataBlockUtils.toByteString(transferableBlock.asSerialized().getDataBlock());
+ RowHeapDataBlock rowHeapBlock = new RowHeapDataBlock(container,
dataSchema);
+ return
DataBlockUtils.toByteString(rowHeapBlock.asSerialized().getDataBlock());
+ }
+
+ public static void encodeExceptionsToMetadata(TimeSeriesBlock
timeSeriesBlock, Map<String, String> metadataMap)
+ throws Exception {
+ List<QueryException> exceptions = timeSeriesBlock.getExceptions();
+ if (exceptions != null && !exceptions.isEmpty()) {
+ List<Map<String, Object>> exceptionsList = new ArrayList<>();
+ for (QueryException exception : exceptions) {
+ Map<String, Object> exceptionData = new HashMap<>();
+ exceptionData.put(ERROR_CODE, exception.getErrorCode().getId());
+ exceptionData.put(MESSAGE, exception.getMessage());
+ exceptionsList.add(exceptionData);
+ }
+ metadataMap.put(EXCEPTIONS_METADATA_KEY,
JsonUtils.objectToString(exceptionsList));
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]