This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8318650f2f Add new broker query point for querying multi-stage engine (#11341)
8318650f2f is described below
commit 8318650f2fad839dea3f5c19aede084cd7103102
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Aug 14 12:44:22 2023 -0700
Add new broker query point for querying multi-stage engine (#11341)
---
.../broker/api/resources/PinotClientRequest.java | 76 +++++++++++++++++++++-
.../MultiStageBrokerRequestHandler.java | 3 +-
.../client/JsonAsyncHttpPinotClientTransport.java | 58 ++++++-----------
.../JsonAsyncHttpPinotClientTransportFactory.java | 6 +-
.../tests/BaseClusterIntegrationTest.java | 13 +++-
.../tests/ClusterIntegrationTestUtils.java | 32 +++++----
.../pinot/integration/tests/ClusterTest.java | 27 +++++---
7 files changed, 148 insertions(+), 67 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index 93d9bda03b..33de745c52 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.broker.api.resources;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
@@ -169,6 +170,70 @@ public class PinotClientRequest {
}
}
+ @GET
+ @ManagedAsync
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("query")
+ @ApiOperation(value = "Querying pinot using MultiStage Query Engine")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Query response"),
+ @ApiResponse(code = 500, message = "Internal Server Error")
+ })
+ @ManualAuthorization
+ public void processSqlWithMultiStageQueryEngineGet(
+ @ApiParam(value = "Query", required = true) @QueryParam("sql") String
query,
+ @Suspended AsyncResponse asyncResponse, @Context
org.glassfish.grizzly.http.server.Request requestContext,
+ @Context HttpHeaders httpHeaders) {
+ try {
+ ObjectNode requestJson = JsonUtils.newObjectNode();
+ requestJson.put(Request.SQL, query);
+ BrokerResponse brokerResponse =
+ executeSqlQuery(requestJson, makeHttpIdentity(requestContext), true,
httpHeaders, true);
+ asyncResponse.resume(brokerResponse.toJsonString());
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while processing GET request", e);
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_GET_EXCEPTIONS, 1L);
+ asyncResponse.resume(new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR));
+ }
+ }
+
+ @POST
+ @ManagedAsync
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("query")
+ @ApiOperation(value = "Querying pinot using MultiStage Query Engine")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Query response"),
+ @ApiResponse(code = 500, message = "Internal Server Error")
+ })
+ @ManualAuthorization
+ public void processSqlWithMultiStageQueryEnginePost(String query, @Suspended
AsyncResponse asyncResponse,
+ @Context org.glassfish.grizzly.http.server.Request requestContext,
+ @Context HttpHeaders httpHeaders) {
+ try {
+ JsonNode requestJson = JsonUtils.stringToJsonNode(query);
+ if (!requestJson.has(Request.SQL)) {
+ throw new IllegalStateException("Payload is missing the query string field 'sql'");
+ }
+ BrokerResponse brokerResponse =
+ executeSqlQuery((ObjectNode) requestJson,
makeHttpIdentity(requestContext), false, httpHeaders, true);
+ asyncResponse.resume(brokerResponse.toJsonString());
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while processing POST request", e);
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
+ asyncResponse.resume(
+ new WebApplicationException(e,
+ Response
+ .status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(e.getMessage())
+ .build()));
+ }
+ }
+
@DELETE
@Path("query/{queryId}")
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.CANCEL_QUERY)
@@ -185,7 +250,7 @@ public class PinotClientRequest {
@ApiParam(value = "Timeout for servers to respond the cancel request")
@QueryParam("timeoutMs")
@DefaultValue("3000") int timeoutMs,
@ApiParam(value = "Return server responses for troubleshooting")
@QueryParam("verbose") @DefaultValue("false")
- boolean verbose) {
+ boolean verbose) {
try {
Map<String, Integer> serverResponses = verbose ? new HashMap<>() : null;
if (_requestHandler.cancelQuery(queryId, timeoutMs, _executor,
_httpConnMgr, serverResponses)) {
@@ -226,12 +291,21 @@ public class PinotClientRequest {
private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson,
HttpRequesterIdentity httpRequesterIdentity,
boolean onlyDql, HttpHeaders httpHeaders)
throws Exception {
+ return executeSqlQuery(sqlRequestJson, httpRequesterIdentity, onlyDql,
httpHeaders, false);
+ }
+
+ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson,
HttpRequesterIdentity httpRequesterIdentity,
+ boolean onlyDql, HttpHeaders httpHeaders, boolean forceUseMultiStage)
+ throws Exception {
SqlNodeAndOptions sqlNodeAndOptions;
try {
sqlNodeAndOptions =
RequestUtils.parseQuery(sqlRequestJson.get(Request.SQL).asText(),
sqlRequestJson);
} catch (Exception e) {
return new
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
e));
}
+ if (forceUseMultiStage) {
+ sqlNodeAndOptions.setExtraOptions(ImmutableMap.of(Request.QueryOptionKey.USE_MULTISTAGE_ENGINE, "true"));
+ }
PinotSqlType sqlType = sqlNodeAndOptions.getSqlType();
if (onlyDql && sqlType != PinotSqlType.DQL) {
return new
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 46c9af7c34..680145e3e8 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -179,8 +179,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
}
boolean traceEnabled = Boolean.parseBoolean(
- request.has(CommonConstants.Broker.Request.TRACE) ? request.get(CommonConstants.Broker.Request.TRACE).asText()
- : "false");
+ sqlNodeAndOptions.getOptions().getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
ResultTable queryResults;
Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>();
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
index c87ae3fd80..2778dac075 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.JdkSslContext;
-import io.netty.handler.ssl.SslContext;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
@@ -61,6 +60,7 @@ public class JsonAsyncHttpPinotClientTransport implements
PinotClientTransport<C
private final int _brokerReadTimeout;
private final AsyncHttpClient _httpClient;
private final String _extraOptionStr;
+ private final boolean _useMultiStageEngine;
public JsonAsyncHttpPinotClientTransport() {
_brokerReadTimeout = 60000;
@@ -68,15 +68,17 @@ public class JsonAsyncHttpPinotClientTransport implements
PinotClientTransport<C
_scheme = CommonConstants.HTTP_PROTOCOL;
_extraOptionStr = DEFAULT_EXTRA_QUERY_OPTION_STRING;
_httpClient =
Dsl.asyncHttpClient(Dsl.config().setRequestTimeout(_brokerReadTimeout));
+ _useMultiStageEngine = false;
}
public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String
scheme, String extraOptionString,
- @Nullable SSLContext sslContext, ConnectionTimeouts connectionTimeouts,
TlsProtocols tlsProtocols,
- @Nullable String appId) {
+ boolean useMultiStageEngine, @Nullable SSLContext sslContext,
ConnectionTimeouts connectionTimeouts,
+ TlsProtocols tlsProtocols, @Nullable String appId) {
_brokerReadTimeout = connectionTimeouts.getReadTimeoutMs();
_headers = headers;
_scheme = scheme;
_extraOptionStr = StringUtils.isEmpty(extraOptionString) ?
DEFAULT_EXTRA_QUERY_OPTION_STRING : extraOptionString;
+ _useMultiStageEngine = useMultiStageEngine;
Builder builder = Dsl.config();
if (sslContext != null) {
@@ -92,28 +94,6 @@ public class JsonAsyncHttpPinotClientTransport implements
PinotClientTransport<C
_httpClient = Dsl.asyncHttpClient(builder.build());
}
- public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String
scheme, String extraOptionStr,
- @Nullable SslContext sslContext, ConnectionTimeouts connectionTimeouts,
TlsProtocols tlsProtocols,
- @Nullable String appId) {
- _brokerReadTimeout = connectionTimeouts.getReadTimeoutMs();
- _headers = headers;
- _scheme = scheme;
- _extraOptionStr = StringUtils.isEmpty(extraOptionStr) ?
DEFAULT_EXTRA_QUERY_OPTION_STRING : extraOptionStr;
-
- Builder builder = Dsl.config();
- if (sslContext != null) {
- builder.setSslContext(sslContext);
- }
-
- builder.setRequestTimeout(_brokerReadTimeout)
- .setReadTimeout(connectionTimeouts.getReadTimeoutMs())
- .setConnectTimeout(connectionTimeouts.getConnectTimeoutMs())
- .setHandshakeTimeout(connectionTimeouts.getHandshakeTimeoutMs())
- .setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua",
appId))
- .setEnabledProtocols(tlsProtocols.getEnabledProtocols().toArray(new
String[0]));
- _httpClient = Dsl.asyncHttpClient(builder.build());
- }
-
@Override
public BrokerResponse executeQuery(String brokerAddress, String query)
throws PinotClientException {
@@ -131,7 +111,7 @@ public class JsonAsyncHttpPinotClientTransport implements
PinotClientTransport<C
json.put("sql", query);
json.put("queryOptions", _extraOptionStr);
- String url = _scheme + "://" + brokerAddress + "/query/sql";
+ String url = String.format("%s://%s%s", _scheme, brokerAddress,
_useMultiStageEngine ? "/query" : "/query/sql");
BoundRequestBuilder requestBuilder = _httpClient.preparePost(url);
if (_headers != null) {
@@ -139,21 +119,21 @@ public class JsonAsyncHttpPinotClientTransport implements
PinotClientTransport<C
}
LOGGER.debug("Sending query {} to {}", query, url);
return requestBuilder.addHeader("Content-Type", "application/json; charset=utf-8").setBody(json.toString())
- .execute().toCompletableFuture().thenApply(httpResponse -> {
- LOGGER.debug("Completed query, HTTP status is {}",
httpResponse.getStatusCode());
+ .execute().toCompletableFuture().thenApply(httpResponse -> {
+ LOGGER.debug("Completed query, HTTP status is {}",
httpResponse.getStatusCode());
- if (httpResponse.getStatusCode() != 200) {
- throw new PinotClientException(
+ if (httpResponse.getStatusCode() != 200) {
+ throw new PinotClientException(
"Pinot returned HTTP status " + httpResponse.getStatusCode() + ", expected 200");
- }
-
- String responseBody =
httpResponse.getResponseBody(StandardCharsets.UTF_8);
- try {
- return BrokerResponse.fromJson(OBJECT_READER.readTree(responseBody));
- } catch (JsonProcessingException e) {
- throw new CompletionException(e);
- }
- });
+ }
+
+ String responseBody =
httpResponse.getResponseBody(StandardCharsets.UTF_8);
+ try {
+ return BrokerResponse.fromJson(OBJECT_READER.readTree(responseBody));
+ } catch (JsonProcessingException e) {
+ throw new CompletionException(e);
+ }
+ });
} catch (Exception e) {
throw new PinotClientException(e);
}
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransportFactory.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransportFactory.java
index 0adb4e68d3..9539a5470b 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransportFactory.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransportFactory.java
@@ -45,14 +45,15 @@ public class JsonAsyncHttpPinotClientTransportFactory
implements PinotClientTran
private int _handshakeTimeoutMs =
Integer.parseInt(DEFAULT_BROKER_HANDSHAKE_TIMEOUT_MS);
private String _appId = null;
private String _extraOptionString;
+ private boolean _useMultiStageEngine;
@Override
public PinotClientTransport buildTransport() {
ConnectionTimeouts connectionTimeouts =
ConnectionTimeouts.create(_readTimeoutMs, _connectTimeoutMs,
_handshakeTimeoutMs);
TlsProtocols tlsProtocols = TlsProtocols.defaultProtocols(_tlsV10Enabled);
- return new JsonAsyncHttpPinotClientTransport(_headers, _scheme,
_extraOptionString, _sslContext, connectionTimeouts,
- tlsProtocols, _appId);
+ return new JsonAsyncHttpPinotClientTransport(_headers, _scheme,
_extraOptionString, _useMultiStageEngine,
+ _sslContext, connectionTimeouts, tlsProtocols, _appId);
}
public Map<String, String> getHeaders() {
@@ -103,6 +104,7 @@ public class JsonAsyncHttpPinotClientTransportFactory
implements PinotClientTran
System.getProperties().getProperty("broker.tlsV10Enabled",
DEFAULT_BROKER_TLS_V10_ENABLED));
_extraOptionString = properties.getProperty("queryOptions", "");
+ _useMultiStageEngine =
Boolean.parseBoolean(properties.getProperty("useMultiStageEngine", "false"));
return this;
}
}
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 42ea2c0bae..da466e4e93 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -103,6 +103,7 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
protected List<StreamDataServerStartable> _kafkaStarters;
protected org.apache.pinot.client.Connection _pinotConnection;
+ protected org.apache.pinot.client.Connection _pinotConnectionV2;
protected Connection _h2Connection;
protected QueryGenerator _queryGenerator;
@@ -506,6 +507,14 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
* @return Pinot connection
*/
protected org.apache.pinot.client.Connection getPinotConnection() {
+ if (useMultiStageQueryEngine()) {
+ if (_pinotConnectionV2 == null) {
+ Properties properties = getPinotConnectionProperties();
+ properties.put("useMultiStageEngine", "true");
+ _pinotConnectionV2 = ConnectionFactory.fromZookeeper(properties,
getZkUrl() + "/" + getHelixClusterName());
+ }
+ return _pinotConnectionV2;
+ }
if (_pinotConnection == null) {
_pinotConnection =
ConnectionFactory.fromZookeeper(getPinotConnectionProperties(),
getZkUrl() + "/" + getHelixClusterName());
@@ -753,7 +762,7 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
protected void testQuery(String pinotQuery, String h2Query)
throws Exception {
ClusterIntegrationTestUtils.testQuery(pinotQuery, getBrokerBaseApiUrl(),
getPinotConnection(), h2Query,
- getH2Connection(), null, getExtraQueryProperties());
+ getH2Connection(), null, getExtraQueryProperties(),
useMultiStageQueryEngine());
}
/**
@@ -762,6 +771,6 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
protected void testQueryWithMatchingRowCount(String pinotQuery, String
h2Query)
throws Exception {
ClusterIntegrationTestUtils.testQueryWithMatchingRowCount(pinotQuery,
getBrokerBaseApiUrl(), getPinotConnection(),
- h2Query, getH2Connection(), null, getExtraQueryProperties());
+ h2Query, getH2Connection(), null, getExtraQueryProperties(),
useMultiStageQueryEngine());
}
}
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index 457b3d2971..15c4f90ed7 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -395,7 +395,7 @@ public class ClusterIntegrationTestUtils {
}
}
CSVFormat csvFormat = CSVFormat.DEFAULT.withSkipHeaderRecord(true);
- for (String recordCsv: csvRecords) {
+ for (String recordCsv : csvRecords) {
try (CSVParser parser = CSVParser.parse(recordCsv, csvFormat)) {
for (CSVRecord csv : parser) {
byte[] keyBytes = (partitionColumnIndex == null) ?
Longs.toByteArray(counter++)
@@ -650,7 +650,7 @@ public class ClusterIntegrationTestUtils {
static void testQuery(String pinotQuery, String queryResourceUrl,
org.apache.pinot.client.Connection pinotConnection,
String h2Query, Connection h2Connection, @Nullable Map<String, String>
headers)
throws Exception {
- testQuery(pinotQuery, queryResourceUrl, pinotConnection, h2Query,
h2Connection, headers, null);
+ testQuery(pinotQuery, queryResourceUrl, pinotConnection, h2Query,
h2Connection, headers, null, false);
}
/**
@@ -659,11 +659,12 @@ public class ClusterIntegrationTestUtils {
*/
static void testQueryWithMatchingRowCount(String pinotQuery, String
queryResourceUrl,
org.apache.pinot.client.Connection pinotConnection, String h2Query,
Connection h2Connection,
- @Nullable Map<String, String> headers, @Nullable Map<String, String>
extraJsonProperties)
+ @Nullable Map<String, String> headers, @Nullable Map<String, String>
extraJsonProperties,
+ boolean useMultiStageQueryEngine)
throws Exception {
try {
testQueryInternal(pinotQuery, queryResourceUrl, pinotConnection,
h2Query, h2Connection, headers,
- extraJsonProperties, true, false);
+ extraJsonProperties, useMultiStageQueryEngine, true, false);
} catch (Exception e) {
failure(pinotQuery, h2Query, e);
}
@@ -671,10 +672,10 @@ public class ClusterIntegrationTestUtils {
static void testQuery(String pinotQuery, String queryResourceUrl,
org.apache.pinot.client.Connection pinotConnection,
String h2Query, Connection h2Connection, @Nullable Map<String, String>
headers,
- @Nullable Map<String, String> extraJsonProperties) {
+ @Nullable Map<String, String> extraJsonProperties, boolean
useMultiStageQueryEngine) {
try {
testQueryInternal(pinotQuery, queryResourceUrl, pinotConnection,
h2Query, h2Connection, headers,
- extraJsonProperties, false, false);
+ extraJsonProperties, useMultiStageQueryEngine, false, false);
} catch (Exception e) {
failure(pinotQuery, h2Query, e);
}
@@ -682,10 +683,11 @@ public class ClusterIntegrationTestUtils {
static void testQueryViaController(String pinotQuery, String
queryResourceUrl,
org.apache.pinot.client.Connection pinotConnection, String h2Query,
Connection h2Connection,
- @Nullable Map<String, String> headers, @Nullable Map<String, String>
extraJsonProperties) {
+ @Nullable Map<String, String> headers, @Nullable Map<String, String>
extraJsonProperties,
+ boolean useMultiStageQueryEngine) {
try {
testQueryInternal(pinotQuery, queryResourceUrl, pinotConnection,
h2Query, h2Connection, headers,
- extraJsonProperties, false, true);
+ extraJsonProperties, useMultiStageQueryEngine, false, true);
} catch (Exception e) {
failure(pinotQuery, h2Query, e);
}
@@ -694,14 +696,16 @@ public class ClusterIntegrationTestUtils {
private static void testQueryInternal(String pinotQuery, String
queryResourceUrl,
org.apache.pinot.client.Connection pinotConnection, String h2Query,
Connection h2Connection,
@Nullable Map<String, String> headers, @Nullable Map<String, String>
extraJsonProperties,
- boolean matchingRowCount, boolean viaController)
+ boolean useMultiStageQueryEngine, boolean matchingRowCount, boolean
viaController)
throws Exception {
// broker response
JsonNode pinotResponse;
if (viaController) {
pinotResponse = ClusterTest.postQueryToController(pinotQuery,
queryResourceUrl, headers, extraJsonProperties);
} else {
- pinotResponse = ClusterTest.postQuery(pinotQuery, queryResourceUrl,
headers, extraJsonProperties);
+ pinotResponse =
+ ClusterTest.postQuery(pinotQuery,
getBrokerQueryApiUrl(queryResourceUrl, useMultiStageQueryEngine), headers,
+ extraJsonProperties);
}
if (!pinotResponse.get("exceptions").isEmpty()) {
throw new RuntimeException("Got Exceptions from Query Response: " +
pinotResponse);
@@ -824,10 +828,15 @@ public class ClusterIntegrationTestUtils {
@Nullable Map<String, String> extraJsonProperties)
throws Exception {
JsonNode explainPlanForResponse =
- ClusterTest.postQuery("explain plan for " + pinotQuery, brokerUrl,
headers, extraJsonProperties);
+ ClusterTest.postQuery("explain plan for " + pinotQuery,
getBrokerQueryApiUrl(brokerUrl, false), headers,
+ extraJsonProperties);
return ExplainPlanUtils.formatExplainPlan(explainPlanForResponse);
}
+ public static String getBrokerQueryApiUrl(String brokerBaseApiUrl, boolean
useMultiStageQueryEngine) {
+ return useMultiStageQueryEngine ? brokerBaseApiUrl + "/query" :
brokerBaseApiUrl + "/query/sql";
+ }
+
private static int getH2ExpectedValues(Set<String> expectedValues,
List<String> expectedOrderByValues,
ResultSet h2ResultSet, ResultSetMetaData h2MetaData, Collection<String>
orderByColumns)
throws SQLException {
@@ -1021,6 +1030,7 @@ public class ClusterIntegrationTestUtils {
String failureMessage = "Caught exception while testing query!";
failure(pinotQuery, h2Query, failureMessage, e);
}
+
/**
* Helper method to report failures.
*
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 42988b57e7..2a278166c7 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -78,6 +78,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.DataProvider;
+import static org.apache.pinot.integration.tests.ClusterIntegrationTestUtils.getBrokerQueryApiUrl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -447,20 +448,18 @@ public abstract class ClusterTest extends ControllerTest {
*/
protected JsonNode postQuery(String query)
throws Exception {
- return postQuery(query, getBrokerBaseApiUrl(), null,
getExtraQueryProperties());
+ return postQuery(query, getBrokerQueryApiUrl(getBrokerBaseApiUrl(),
useMultiStageQueryEngine()), null,
+ getExtraQueryProperties());
}
protected Map<String, String> getExtraQueryProperties() {
- if (!useMultiStageQueryEngine()) {
- return Collections.emptyMap();
- }
- return ImmutableMap.of("queryOptions", "useMultistageEngine=true");
+ return Collections.emptyMap();
}
/**
- * Queries the broker's sql query endpoint (/sql)
+ * Queries the broker's sql query endpoint (/query or /query/sql)
*/
- public static JsonNode postQuery(String query, String brokerBaseApiUrl,
Map<String, String> headers,
+ public static JsonNode postQuery(String query, String brokerQueryApiUrl,
Map<String, String> headers,
Map<String, String> extraJsonProperties)
throws Exception {
ObjectNode payload = JsonUtils.newObjectNode();
@@ -470,7 +469,7 @@ public abstract class ClusterTest extends ControllerTest {
payload.put(extraProperty.getKey(), extraProperty.getValue());
}
}
- return JsonUtils.stringToJsonNode(sendPostRequest(brokerBaseApiUrl +
"/query/sql", payload.toString(), headers));
+ return JsonUtils.stringToJsonNode(sendPostRequest(brokerQueryApiUrl,
payload.toString(), headers));
}
/**
@@ -478,7 +477,8 @@ public abstract class ClusterTest extends ControllerTest {
*/
protected JsonNode postQueryWithOptions(String query, String queryOptions)
throws Exception {
- return postQuery(query, getBrokerBaseApiUrl(), null,
ImmutableMap.of("queryOptions", queryOptions));
+ return postQuery(query, getBrokerQueryApiUrl(getBrokerBaseApiUrl(),
useMultiStageQueryEngine()), null,
+ ImmutableMap.of("queryOptions", queryOptions));
}
/**
@@ -486,7 +486,14 @@ public abstract class ClusterTest extends ControllerTest {
*/
public JsonNode postQueryToController(String query)
throws Exception {
- return postQueryToController(query, getControllerBaseApiUrl(), null,
getExtraQueryProperties());
+ return postQueryToController(query, getControllerBaseApiUrl(), null,
getExtraQueryPropertiesForController());
+ }
+
+ private Map<String, String> getExtraQueryPropertiesForController() {
+ if (!useMultiStageQueryEngine()) {
+ return Collections.emptyMap();
+ }
+ return ImmutableMap.of("queryOptions", "useMultistageEngine=true");
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]