This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 82b7d2a6632 [timeseries] Adding support for query options (#17454)
82b7d2a6632 is described below
commit 82b7d2a663259c0819cf60501d2bcdf849ac3cef
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Mon Jan 5 19:01:24 2026 -0800
[timeseries] Adding support for query options (#17454)
* [timeseries] Adding support for query options for timeseries engine
* Fixed checkstyle
* Adding integration tests
* Fixed endpoint to handle queryOptions as map
* Verified that non-string query options work correctly
* Fixed comments
---------
Co-authored-by: shauryachats <[email protected]>
---
.../broker/api/resources/PinotClientRequest.java | 17 ++++++++++----
.../requesthandler/TimeSeriesRequestHandler.java | 18 +++++++++++++--
.../api/resources/PinotQueryResource.java | 26 +++++++++++++++++-----
.../pinot/integration/tests/ClusterTest.java | 17 ++++++++++++--
.../tests/TimeSeriesIntegrationTest.java | 21 +++++++++++++++++
.../pinot/tsdb/m3ql/M3TimeSeriesPlanner.java | 1 +
.../pinot/tsdb/spi/RangeTimeSeriesRequest.java | 10 ++++++++-
7 files changed, 95 insertions(+), 15 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index 4e7105a8f5e..b33369a3e94 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -325,14 +325,23 @@ public class PinotClientRequest {
@Path("query/timeseries")
@ApiOperation(value = "Query Pinot using the Time Series Engine")
@ManualAuthorization
- public void processTimeSeriesQueryEngine(Map<String, String> queryParams,
@Suspended AsyncResponse asyncResponse,
+ public void processTimeSeriesQueryEngine(JsonNode requestJson, @Suspended
AsyncResponse asyncResponse,
@Context org.glassfish.grizzly.http.server.Request requestCtx, @Context
HttpHeaders httpHeaders) {
try {
- if (!queryParams.containsKey(Request.QUERY)) {
+ if (!requestJson.has(Request.QUERY)) {
throw new IllegalStateException("Payload is missing the query string
field 'query'");
}
- String language = queryParams.get(Request.LANGUAGE);
- String queryString = queryParams.get(Request.QUERY);
+ String language = requestJson.has(Request.LANGUAGE) ?
requestJson.get(Request.LANGUAGE).asText() : null;
+ String queryString = requestJson.get(Request.QUERY).asText();
+ Map<String, String> queryParams = new HashMap<>();
+ requestJson.fields().forEachRemaining(entry -> {
+ if (entry.getValue().isTextual()) {
+ queryParams.put(entry.getKey(), entry.getValue().asText());
+ } else {
+ queryParams.put(entry.getKey(), entry.getValue().toString());
+ }
+ });
+
try (RequestScope requestContext =
Tracing.getTracer().createRequestScope()) {
TimeSeriesBlock timeSeriesBlock = executeTimeSeriesQuery(language,
queryString, queryParams,
requestContext, makeHttpIdentity(requestCtx), httpHeaders);
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index 309a53dddc3..92cbb6285c2 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.broker.requesthandler;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import java.net.URI;
@@ -62,6 +63,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
@@ -201,7 +203,7 @@ public class TimeSeriesRequestHandler extends
BaseBrokerRequestHandler {
Long stepSeconds = getStepSeconds(mergedParams.get("step"));
Duration timeout = StringUtils.isNotBlank(mergedParams.get("timeout"))
? HumanReadableDuration.from(mergedParams.get("timeout")) :
Duration.ofMillis(_brokerTimeoutMs);
-
+ Map<String, String> queryOptions =
parseQueryOptionsFromJson(mergedParams.get("queryOptions"));
Preconditions.checkNotNull(query, "Query cannot be null");
Preconditions.checkNotNull(startTs, "Start time cannot be null");
Preconditions.checkNotNull(endTs, "End time cannot be null");
@@ -210,10 +212,22 @@ public class TimeSeriesRequestHandler extends
BaseBrokerRequestHandler {
return new RangeTimeSeriesRequest(language, query, startTs, endTs,
stepSeconds, timeout,
parseIntOrDefault(mergedParams.get("limit"),
RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT),
parseIntOrDefault(mergedParams.get("numGroupsLimit"),
RangeTimeSeriesRequest.DEFAULT_NUM_GROUPS_LIMIT),
- queryParamString
+ queryParamString, queryOptions
);
}
+ private Map<String, String> parseQueryOptionsFromJson(String
queryOptionsJson) {
+ if (queryOptionsJson == null || queryOptionsJson.isEmpty()) {
+ return Map.of();
+ }
+ try {
+ return JsonUtils.stringToObject(queryOptionsJson, new TypeReference<>()
{ });
+ } catch (Exception e) {
+ LOGGER.warn("Failed to parse queryOptions JSON: {}", queryOptionsJson,
e);
+ return Map.of();
+ }
+ }
+
private Long parseLongSafe(String value) {
return value != null ? Long.parseLong(value) : null;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
index 2f1656b0bbc..3188693f84d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
@@ -33,6 +33,7 @@ import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -181,7 +182,14 @@ public class PinotQueryResource {
String end = requestJson.has("end") ? requestJson.get("end").asText() :
null;
String step = requestJson.has("step") ? requestJson.get("step").asText()
: null;
- return executeTimeSeriesQueryCatching(httpHeaders, language, query,
start, end, step, true);
+ Map<String, String> queryOptions = new HashMap<>();
+ if (requestJson.has("queryOptions") &&
requestJson.get("queryOptions").isObject()) {
+ requestJson.get("queryOptions").fields().forEachRemaining(entry -> {
+ queryOptions.put(entry.getKey(), entry.getValue().asText());
+ });
+ }
+
+ return executeTimeSeriesQueryCatching(httpHeaders, language, query,
start, end, step, queryOptions, true);
} catch (Exception e) {
LOGGER.error("Caught exception while processing POST timeseries
request", e);
return constructQueryExceptionResponse(QueryErrorCode.INTERNAL,
e.getMessage());
@@ -687,16 +695,16 @@ public class PinotQueryResource {
private StreamingOutput executeTimeSeriesQueryCatching(HttpHeaders
httpHeaders, String language, String query,
String start, String end, String step) {
- return executeTimeSeriesQueryCatching(httpHeaders, language, query, start,
end, step, false);
+ return executeTimeSeriesQueryCatching(httpHeaders, language, query, start,
end, step, Map.of(), false);
}
private StreamingOutput executeTimeSeriesQueryCatching(HttpHeaders
httpHeaders, String language, String query,
- String start, String end, String step, boolean useBrokerCompatibleApi) {
+ String start, String end, String step, Map<String, String> queryOptions,
boolean useBrokerCompatibleApi) {
try {
LOGGER.debug("Language: {}, Query: {}, Start: {}, End: {}, Step: {},
UseBrokerAPI: {}",
language, query, start, end, step, useBrokerCompatibleApi);
String instanceId = retrieveBrokerForTimeSeriesQuery(query, language,
start, end);
- return sendTimeSeriesRequestToBroker(language, query, start, end, step,
instanceId, httpHeaders,
+ return sendTimeSeriesRequestToBroker(language, query, start, end, step,
queryOptions, instanceId, httpHeaders,
useBrokerCompatibleApi);
} catch (QueryException ex) {
LOGGER.warn("Caught exception while processing timeseries request {}",
ex.getMessage());
@@ -714,7 +722,7 @@ public class PinotQueryResource {
TimeSeriesLogicalPlanner planner =
TimeSeriesQueryEnvironment.buildLogicalPlanner(language, _controllerConf);
TimeSeriesLogicalPlanResult planResult = planner.plan(
new RangeTimeSeriesRequest(language, query, Integer.parseInt(start),
Long.parseLong(end),
- 60L, Duration.ofMinutes(1), 100, 100, ""),
+ 60L, Duration.ofMinutes(1), 100, 100, "", Map.of()),
new
TimeSeriesTableMetadataProvider(_pinotHelixResourceManager.getTableCache()));
String tableName = planner.getTableName(planResult);
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
@@ -724,7 +732,8 @@ public class PinotQueryResource {
private StreamingOutput sendTimeSeriesRequestToBroker(String language,
String query, String start, String end,
- String step, String instanceId, HttpHeaders httpHeaders, boolean
useBrokerCompatibleApi) {
+ String step, Map<String, String> queryOptions, String instanceId,
HttpHeaders httpHeaders,
+ boolean useBrokerCompatibleApi) {
InstanceConfig instanceConfig = getInstanceConfig(instanceId);
String hostName = getHost(instanceConfig);
String protocol = _controllerConf.getControllerBrokerProtocol();
@@ -750,6 +759,11 @@ public class PinotQueryResource {
if (step != null && !step.isEmpty()) {
requestJson.put("step", step);
}
+ if (queryOptions != null && !queryOptions.isEmpty()) {
+ ObjectNode queryOptionsNode = JsonUtils.newObjectNode();
+ queryOptions.forEach(queryOptionsNode::put);
+ requestJson.set("queryOptions", queryOptionsNode);
+ }
return sendRequestRaw(url, "POST", query, requestJson, headers);
} else {
// Use GET /timeseries/api/v1/query_range endpoint (Prometheus
compatible API)
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 476003cff72..f166a05520f 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -582,9 +582,22 @@ public abstract class ClusterTest extends ControllerTest {
public JsonNode postTimeseriesQuery(String baseUrl, String query, long
startTime, long endTime,
Map<String, String> headers) {
+ return postTimeseriesQuery(baseUrl, query, startTime, endTime, headers,
null);
+ }
+
+ public JsonNode postTimeseriesQuery(String baseUrl, String query, long
startTime, long endTime,
+ Map<String, String> headers, Map<String, Object> queryOptions) {
try {
- Map<String, String> payload = Map.of("language", "m3ql", "query", query,
"start",
- String.valueOf(startTime), "end", String.valueOf(endTime));
+ ObjectNode payload = JsonUtils.newObjectNode();
+ payload.put("language", "m3ql");
+ payload.put("query", query);
+ payload.put("start", String.valueOf(startTime));
+ payload.put("end", String.valueOf(endTime));
+ if (queryOptions != null && !queryOptions.isEmpty()) {
+ ObjectNode queryOptionsNode = JsonUtils.newObjectNode();
+ queryOptions.forEach(queryOptionsNode::putPOJO);
+ payload.set("queryOptions", queryOptionsNode);
+ }
return JsonUtils.stringToJsonNode(
sendPostRequest(baseUrl + "/query/timeseries",
JsonUtils.objectToString(payload), headers));
} catch (Exception e) {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
index d3aa6b62250..13c9d6e8437 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
@@ -239,6 +239,27 @@ public class TimeSeriesIntegrationTest extends
BaseClusterIntegrationTest {
assertTrue(firstException.get("message").asText().contains("Cannot apply
JSON_MATCH on column"));
}
+ @Test
+ public void testQueryOptionsNumGroupsLimit() {
+ String query = String.format(
+
"fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+ + " | max{%s} | transformNull{0} | keepLastValue{}",
+ TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN
+ );
+ // This query would normally return 3 groups (one for each device OS:
windows, android, ios)
+ // With numGroupsLimit=1 query option, we expect only 1 group
+ JsonNode resultWithLimit = postTimeseriesQuery(getBrokerBaseApiUrl(),
query, QUERY_START_TIME_SEC,
+ QUERY_END_TIME_SEC, getHeaders(), Map.of("numGroupsLimit", 1,
"enableNullHandling", true));
+ assertNotNull(resultWithLimit);
+ assertEquals(resultWithLimit.path("numRowsResultSet").asInt(), 1,
+ "Expected only 1 group with numGroupsLimit=1 query option");
+
+ JsonNode resultTable = resultWithLimit.path("resultTable");
+ assertNotNull(resultTable);
+ JsonNode rows = resultTable.path("rows");
+ assertEquals(rows.size(), 1, "Expected only 1 row in result table with
numGroupsLimit=1");
+ }
+
@DataProvider(name = "isBrokerResponseCompatible")
public Object[][] isBrokerResponseCompatible() {
return new Object[][]{
diff --git
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
index 57fc6ab8bc4..9bcd37e5f9a 100644
---
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
+++
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
@@ -166,6 +166,7 @@ public class M3TimeSeriesPlanner implements
TimeSeriesLogicalPlanner {
if (request.getNumGroupsLimit() > 0) {
queryOptions.put("numGroupsLimit",
Integer.toString(request.getNumGroupsLimit()));
}
+ queryOptions.putAll(request.getQueryOptions());
return new LeafTimeSeriesPlanNode(planId, children, tableName, timeColumn,
timeUnit, 0L, filter, valueExpr, aggInfo,
groupByColumns, request.getLimit(), queryOptions);
}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
index 7a3511dbfd8..77a9fc1493d 100644
---
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.tsdb.spi;
import com.google.common.base.Preconditions;
import java.time.Duration;
+import java.util.Map;
/**
@@ -72,9 +73,11 @@ public class RangeTimeSeriesRequest {
private final int _numGroupsLimit;
/** Full query string to allow language implementations to pass custom
parameters. */
private final String _fullQueryString;
+ /** Query options to allow language implementations to parse custom options.
*/
+ private final Map<String, String> _queryOptions;
public RangeTimeSeriesRequest(String language, String query, long
startSeconds, long endSeconds, long stepSeconds,
- Duration timeout, int limit, int numGroupsLimit, String fullQueryString)
{
+ Duration timeout, int limit, int numGroupsLimit, String fullQueryString,
Map<String, String> queryOptions) {
Preconditions.checkState(endSeconds >= startSeconds, "Invalid range.
startSeconds "
+ "should be greater than or equal to endSeconds. Found
startSeconds=%s and endSeconds=%s",
startSeconds, endSeconds);
@@ -87,6 +90,7 @@ public class RangeTimeSeriesRequest {
_limit = limit;
_numGroupsLimit = numGroupsLimit;
_fullQueryString = fullQueryString;
+ _queryOptions = queryOptions;
}
public String getLanguage() {
@@ -124,4 +128,8 @@ public class RangeTimeSeriesRequest {
public String getFullQueryString() {
return _fullQueryString;
}
+
+ public Map<String, String> getQueryOptions() {
+ return _queryOptions;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]