ankitsultana commented on code in PR #16286:
URL: https://github.com/apache/pinot/pull/16286#discussion_r2196641190
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java:
##########
@@ -483,8 +475,110 @@ private static StreamingOutput
constructQueryExceptionResponse(QueryErrorCode er
};
}
+ private StreamingOutput executeTimeSeriesQueryCatching(HttpHeaders
httpHeaders, String language, String query,
+ String start, String end, String step) {
+ try {
+ return executeTimeSeriesQuery(httpHeaders, language, query, start, end,
step);
+ } catch (ProcessingException pe) {
+ LOGGER.error("Caught exception while processing timeseries request {}",
pe.getMessage());
+ return
constructQueryExceptionResponse(QueryErrorCode.fromErrorCode(pe.getErrorCode()),
pe.getMessage());
+ } catch (QueryException ex) {
+ LOGGER.warn("Caught exception while processing timeseries request {}",
ex.getMessage());
+ return constructQueryExceptionResponse(ex.getErrorCode(),
ex.getMessage());
+ } catch (WebApplicationException wae) {
+ LOGGER.error("Caught exception while processing timeseries request",
wae);
+ throw wae;
+ } catch (Exception e) {
+ LOGGER.error("Caught unknown exception while processing timeseries
request", e);
+ return constructQueryExceptionResponse(QueryErrorCode.INTERNAL,
e.getMessage());
+ }
+ }
+
+ private StreamingOutput executeTimeSeriesQuery(HttpHeaders httpHeaders,
String language, String query,
+ String start, String end, String step) throws Exception {
+ LOGGER.debug("Language: {}, Query: {}, Start: {}, End: {}, Step: {}",
language, query, start, end, step);
+
+ // Get available broker instances for timeseries queries
+ List<String> instanceIds =
_pinotHelixResourceManager.getAllBrokerInstances();
+ if (instanceIds.isEmpty()) {
+ throw QueryErrorCode.BROKER_INSTANCE_MISSING.asException("No online
broker found for timeseries query");
+ }
+
+ String instanceId = selectRandomInstanceId(instanceIds);
+ return sendTimeSeriesRequestToBroker(language, query, start, end, step,
instanceId, httpHeaders);
+ }
+
+ private StreamingOutput sendTimeSeriesRequestToBroker(String language,
String query, String start, String end,
+ String step, String instanceId, HttpHeaders httpHeaders) {
+ String hostName = getHost(instanceId);
+ String protocol = _controllerConf.getControllerBrokerProtocol();
+ int port = getPort(instanceId);
+ String url = getTimeSeriesQueryURL(protocol, hostName, port, language,
query, start, end, step);
+
+ // Forward client-supplied headers
+ Map<String, String> headers = extractHeaders(httpHeaders);
+
+ return sendRequestRaw(url, "GET", query, JsonUtils.newObjectNode(),
headers);
+ }
+
+ private String getTimeSeriesQueryURL(String protocol, String hostName, int
port, String language, String query,
+ String start, String end, String step) {
+ try {
+ URIBuilder uriBuilder = new URIBuilder()
Review Comment:
Pinot style is to inline as much as you can, e.g.:
```
new URIBuilder().setScheme(protocol).setHost(hostName)..
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java:
##########
@@ -415,47 +407,47 @@ private String getQueryURL(String protocol, String
hostName, int port) {
return String.format("%s://%s:%d/query/sql", protocol, hostName, port);
}
- public void sendPostRaw(String urlStr, String requestStr, Map<String,
String> headers, OutputStream outputStream) {
+ public void sendRequestRaw(String urlStr, String method, String requestStr,
Map<String, String> headers,
+ OutputStream outputStream) {
HttpURLConnection conn = null;
try {
- LOGGER.info("url string passed is : {}", urlStr);
+ LOGGER.debug("Sending {} request to: {}", method, urlStr);
Review Comment:
Why change this? This is a Controller API, so logging at INFO level should be safe.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java:
##########
@@ -372,31 +379,16 @@ private List<String>
findCommonBrokerInstances(Set<String> brokerTenants) {
private StreamingOutput sendRequestToBroker(String query, String instanceId,
String traceEnabled, String queryOptions,
HttpHeaders httpHeaders) {
- InstanceConfig instanceConfig =
_pinotHelixResourceManager.getHelixInstanceConfig(instanceId);
- if (instanceConfig == null) {
- LOGGER.error("Instance {} not found", instanceId);
- throw QueryErrorCode.INTERNAL.asException();
- }
-
- String hostName = instanceConfig.getHostName();
- // Backward-compatible with legacy hostname of format 'Broker_<hostname>'
- if (hostName.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)) {
- hostName =
hostName.substring(CommonConstants.Helix.BROKER_INSTANCE_PREFIX_LENGTH);
- }
-
+ String hostName = getHost(instanceId);
String protocol = _controllerConf.getControllerBrokerProtocol();
- int port = _controllerConf.getControllerBrokerPortOverride() > 0 ?
_controllerConf.getControllerBrokerPortOverride()
- : Integer.parseInt(instanceConfig.getPort());
+ int port = getPort(instanceId);
Review Comment:
nit: this doubles the number of `getHelixInstanceConfig` calls. Maybe we can
make one call to get the instance config, and then pass it on for hostname and
port extraction?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]