ankitsultana commented on code in PR #16286:
URL: https://github.com/apache/pinot/pull/16286#discussion_r2198749493
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java:
##########
@@ -483,8 +476,103 @@ private static StreamingOutput
constructQueryExceptionResponse(QueryErrorCode er
};
}
+ private StreamingOutput executeTimeSeriesQueryCatching(HttpHeaders
httpHeaders, String language, String query,
+ String start, String end, String step) {
+ try {
+ return executeTimeSeriesQuery(httpHeaders, language, query, start, end,
step);
+ } catch (ProcessingException pe) {
+ LOGGER.error("Caught exception while processing timeseries request {}",
pe.getMessage());
+ return
constructQueryExceptionResponse(QueryErrorCode.fromErrorCode(pe.getErrorCode()),
pe.getMessage());
+ } catch (QueryException ex) {
+ LOGGER.warn("Caught exception while processing timeseries request {}",
ex.getMessage());
+ return constructQueryExceptionResponse(ex.getErrorCode(),
ex.getMessage());
+ } catch (WebApplicationException wae) {
+ LOGGER.error("Caught exception while processing timeseries request",
wae);
+ throw wae;
+ } catch (Exception e) {
+ LOGGER.error("Caught unknown exception while processing timeseries
request", e);
+ return constructQueryExceptionResponse(QueryErrorCode.INTERNAL,
e.getMessage());
+ }
+ }
+
+ private StreamingOutput executeTimeSeriesQuery(HttpHeaders httpHeaders,
String language, String query,
+ String start, String end, String step) throws Exception {
+ LOGGER.debug("Language: {}, Query: {}, Start: {}, End: {}, Step: {}",
language, query, start, end, step);
+
+ // Get available broker instances for timeseries queries
+ List<String> instanceIds =
_pinotHelixResourceManager.getAllBrokerInstances();
+ if (instanceIds.isEmpty()) {
+ throw QueryErrorCode.BROKER_INSTANCE_MISSING.asException("No online
broker found for timeseries query");
+ }
+
+ String instanceId = selectRandomInstanceId(instanceIds);
+ return sendTimeSeriesRequestToBroker(language, query, start, end, step,
instanceId, httpHeaders);
+ }
+
+ private StreamingOutput sendTimeSeriesRequestToBroker(String language,
String query, String start, String end,
+ String step, String instanceId, HttpHeaders httpHeaders) {
+ InstanceConfig instanceConfig = getInstanceConfig(instanceId);
+ String hostName = getHost(instanceConfig);
+ String protocol = _controllerConf.getControllerBrokerProtocol();
+ int port = getPort(instanceConfig);
+ String url = getTimeSeriesQueryURL(protocol, hostName, port, language,
query, start, end, step);
+
+ // Forward client-supplied headers
+ Map<String, String> headers = extractHeaders(httpHeaders);
+
+ return sendRequestRaw(url, "GET", query, JsonUtils.newObjectNode(),
headers);
+ }
+
+ private String getTimeSeriesQueryURL(String protocol, String hostName, int
port, String language, String query,
+ String start, String end, String step) {
+ try {
+ URIBuilder uriBuilder = new
URIBuilder().setScheme(protocol).setHost(hostName).setPort(port)
+ .setPath("/timeseries/api/v1/query_range").addParameter("language",
language);
+ // Add optional parameters
+ if (query != null && !query.isEmpty()) {
Review Comment:
The timeout also needs to be passed, but you can add it in the next PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]