This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8318650f2f Add new broker query point for querying multi-stage engine 
(#11341)
8318650f2f is described below

commit 8318650f2fad839dea3f5c19aede084cd7103102
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Aug 14 12:44:22 2023 -0700

    Add new broker query point for querying multi-stage engine (#11341)
---
 .../broker/api/resources/PinotClientRequest.java   | 76 +++++++++++++++++++++-
 .../MultiStageBrokerRequestHandler.java            |  3 +-
 .../client/JsonAsyncHttpPinotClientTransport.java  | 58 ++++++-----------
 .../JsonAsyncHttpPinotClientTransportFactory.java  |  6 +-
 .../tests/BaseClusterIntegrationTest.java          | 13 +++-
 .../tests/ClusterIntegrationTestUtils.java         | 32 +++++----
 .../pinot/integration/tests/ClusterTest.java       | 27 +++++---
 7 files changed, 148 insertions(+), 67 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index 93d9bda03b..33de745c52 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.broker.api.resources;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiKeyAuthDefinition;
@@ -169,6 +170,70 @@ public class PinotClientRequest {
     }
   }
 
+  @GET
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("query")
+  @ApiOperation(value = "Querying pinot using MultiStage Query Engine")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Query response"),
+      @ApiResponse(code = 500, message = "Internal Server Error")
+  })
+  @ManualAuthorization
+  public void processSqlWithMultiStageQueryEngineGet(
+      @ApiParam(value = "Query", required = true) @QueryParam("sql") String 
query,
+      @Suspended AsyncResponse asyncResponse, @Context 
org.glassfish.grizzly.http.server.Request requestContext,
+      @Context HttpHeaders httpHeaders) {
+    try {
+      ObjectNode requestJson = JsonUtils.newObjectNode();
+      requestJson.put(Request.SQL, query);
+      BrokerResponse brokerResponse =
+          executeSqlQuery(requestJson, makeHttpIdentity(requestContext), true, 
httpHeaders, true);
+      asyncResponse.resume(brokerResponse.toJsonString());
+    } catch (WebApplicationException wae) {
+      asyncResponse.resume(wae);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while processing GET request", e);
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_GET_EXCEPTIONS, 1L);
+      asyncResponse.resume(new WebApplicationException(e, 
Response.Status.INTERNAL_SERVER_ERROR));
+    }
+  }
+
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("query")
+  @ApiOperation(value = "Querying pinot using MultiStage Query Engine")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Query response"),
+      @ApiResponse(code = 500, message = "Internal Server Error")
+  })
+  @ManualAuthorization
+  public void processSqlWithMultiStageQueryEnginePost(String query, @Suspended 
AsyncResponse asyncResponse,
+      @Context org.glassfish.grizzly.http.server.Request requestContext,
+      @Context HttpHeaders httpHeaders) {
+    try {
+      JsonNode requestJson = JsonUtils.stringToJsonNode(query);
+      if (!requestJson.has(Request.SQL)) {
+        throw new IllegalStateException("Payload is missing the query string 
field 'sql'");
+      }
+      BrokerResponse brokerResponse =
+          executeSqlQuery((ObjectNode) requestJson, 
makeHttpIdentity(requestContext), false, httpHeaders, true);
+      asyncResponse.resume(brokerResponse.toJsonString());
+    } catch (WebApplicationException wae) {
+      asyncResponse.resume(wae);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while processing POST request", e);
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
+      asyncResponse.resume(
+          new WebApplicationException(e,
+              Response
+                  .status(Response.Status.INTERNAL_SERVER_ERROR)
+                  .entity(e.getMessage())
+                  .build()));
+    }
+  }
+
   @DELETE
   @Path("query/{queryId}")
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.CANCEL_QUERY)
@@ -185,7 +250,7 @@ public class PinotClientRequest {
       @ApiParam(value = "Timeout for servers to respond the cancel request") 
@QueryParam("timeoutMs")
       @DefaultValue("3000") int timeoutMs,
       @ApiParam(value = "Return server responses for troubleshooting") 
@QueryParam("verbose") @DefaultValue("false")
-          boolean verbose) {
+      boolean verbose) {
     try {
       Map<String, Integer> serverResponses = verbose ? new HashMap<>() : null;
       if (_requestHandler.cancelQuery(queryId, timeoutMs, _executor, 
_httpConnMgr, serverResponses)) {
@@ -226,12 +291,21 @@ public class PinotClientRequest {
   private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, 
HttpRequesterIdentity httpRequesterIdentity,
       boolean onlyDql, HttpHeaders httpHeaders)
       throws Exception {
+    return executeSqlQuery(sqlRequestJson, httpRequesterIdentity, onlyDql, 
httpHeaders, false);
+  }
+
+  private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, 
HttpRequesterIdentity httpRequesterIdentity,
+      boolean onlyDql, HttpHeaders httpHeaders, boolean forceUseMultiStage)
+      throws Exception {
     SqlNodeAndOptions sqlNodeAndOptions;
     try {
       sqlNodeAndOptions = 
RequestUtils.parseQuery(sqlRequestJson.get(Request.SQL).asText(), 
sqlRequestJson);
     } catch (Exception e) {
       return new 
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
 e));
     }
+    if (forceUseMultiStage) {
+      
sqlNodeAndOptions.setExtraOptions(ImmutableMap.of(Request.QueryOptionKey.USE_MULTISTAGE_ENGINE,
 "true"));
+    }
     PinotSqlType sqlType = sqlNodeAndOptions.getSqlType();
     if (onlyDql && sqlType != PinotSqlType.DQL) {
       return new 
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 46c9af7c34..680145e3e8 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -179,8 +179,7 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     }
 
     boolean traceEnabled = Boolean.parseBoolean(
-        request.has(CommonConstants.Broker.Request.TRACE) ? 
request.get(CommonConstants.Broker.Request.TRACE).asText()
-            : "false");
+        
sqlNodeAndOptions.getOptions().getOrDefault(CommonConstants.Broker.Request.TRACE,
 "false"));
 
     ResultTable queryResults;
     Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>();
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
index c87ae3fd80..2778dac075 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.netty.handler.ssl.ClientAuth;
 import io.netty.handler.ssl.JdkSslContext;
-import io.netty.handler.ssl.SslContext;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
@@ -61,6 +60,7 @@ public class JsonAsyncHttpPinotClientTransport implements 
PinotClientTransport<C
   private final int _brokerReadTimeout;
   private final AsyncHttpClient _httpClient;
   private final String _extraOptionStr;
+  private final boolean _useMultiStageEngine;
 
   public JsonAsyncHttpPinotClientTransport() {
     _brokerReadTimeout = 60000;
@@ -68,15 +68,17 @@ public class JsonAsyncHttpPinotClientTransport implements 
PinotClientTransport<C
     _scheme = CommonConstants.HTTP_PROTOCOL;
     _extraOptionStr = DEFAULT_EXTRA_QUERY_OPTION_STRING;
     _httpClient = 
Dsl.asyncHttpClient(Dsl.config().setRequestTimeout(_brokerReadTimeout));
+    _useMultiStageEngine = false;
   }
 
   public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String 
scheme, String extraOptionString,
-      @Nullable SSLContext sslContext, ConnectionTimeouts connectionTimeouts, 
TlsProtocols tlsProtocols,
-      @Nullable String appId) {
+      boolean useMultiStageEngine, @Nullable SSLContext sslContext, 
ConnectionTimeouts connectionTimeouts,
+      TlsProtocols tlsProtocols, @Nullable String appId) {
     _brokerReadTimeout = connectionTimeouts.getReadTimeoutMs();
     _headers = headers;
     _scheme = scheme;
     _extraOptionStr = StringUtils.isEmpty(extraOptionString) ? 
DEFAULT_EXTRA_QUERY_OPTION_STRING : extraOptionString;
+    _useMultiStageEngine = useMultiStageEngine;
 
     Builder builder = Dsl.config();
     if (sslContext != null) {
@@ -92,28 +94,6 @@ public class JsonAsyncHttpPinotClientTransport implements 
PinotClientTransport<C
     _httpClient = Dsl.asyncHttpClient(builder.build());
   }
 
-  public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String 
scheme, String extraOptionStr,
-      @Nullable SslContext sslContext, ConnectionTimeouts connectionTimeouts, 
TlsProtocols tlsProtocols,
-      @Nullable String appId) {
-    _brokerReadTimeout = connectionTimeouts.getReadTimeoutMs();
-    _headers = headers;
-    _scheme = scheme;
-    _extraOptionStr = StringUtils.isEmpty(extraOptionStr) ? 
DEFAULT_EXTRA_QUERY_OPTION_STRING : extraOptionStr;
-
-    Builder builder = Dsl.config();
-    if (sslContext != null) {
-      builder.setSslContext(sslContext);
-    }
-
-    builder.setRequestTimeout(_brokerReadTimeout)
-        .setReadTimeout(connectionTimeouts.getReadTimeoutMs())
-        .setConnectTimeout(connectionTimeouts.getConnectTimeoutMs())
-        .setHandshakeTimeout(connectionTimeouts.getHandshakeTimeoutMs())
-        .setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua", 
appId))
-        .setEnabledProtocols(tlsProtocols.getEnabledProtocols().toArray(new 
String[0]));
-    _httpClient = Dsl.asyncHttpClient(builder.build());
-  }
-
   @Override
   public BrokerResponse executeQuery(String brokerAddress, String query)
       throws PinotClientException {
@@ -131,7 +111,7 @@ public class JsonAsyncHttpPinotClientTransport implements 
PinotClientTransport<C
       json.put("sql", query);
       json.put("queryOptions", _extraOptionStr);
 
-      String url = _scheme + "://" + brokerAddress + "/query/sql";
+      String url = String.format("%s://%s%s", _scheme, brokerAddress, 
_useMultiStageEngine ? "/query" : "/query/sql");
       BoundRequestBuilder requestBuilder = _httpClient.preparePost(url);
 
       if (_headers != null) {
@@ -139,21 +119,21 @@ public class JsonAsyncHttpPinotClientTransport implements 
PinotClientTransport<C
       }
       LOGGER.debug("Sending query {} to {}", query, url);
       return requestBuilder.addHeader("Content-Type", "application/json; 
charset=utf-8").setBody(json.toString())
-              .execute().toCompletableFuture().thenApply(httpResponse -> {
-        LOGGER.debug("Completed query, HTTP status is {}", 
httpResponse.getStatusCode());
+          .execute().toCompletableFuture().thenApply(httpResponse -> {
+            LOGGER.debug("Completed query, HTTP status is {}", 
httpResponse.getStatusCode());
 
-        if (httpResponse.getStatusCode() != 200) {
-          throw new PinotClientException(
+            if (httpResponse.getStatusCode() != 200) {
+              throw new PinotClientException(
                   "Pinot returned HTTP status " + httpResponse.getStatusCode() 
+ ", expected 200");
-        }
-
-        String responseBody = 
httpResponse.getResponseBody(StandardCharsets.UTF_8);
-        try {
-          return BrokerResponse.fromJson(OBJECT_READER.readTree(responseBody));
-        } catch (JsonProcessingException e) {
-          throw new CompletionException(e);
-        }
-      });
+            }
+
+            String responseBody = 
httpResponse.getResponseBody(StandardCharsets.UTF_8);
+            try {
+              return 
BrokerResponse.fromJson(OBJECT_READER.readTree(responseBody));
+            } catch (JsonProcessingException e) {
+              throw new CompletionException(e);
+            }
+          });
     } catch (Exception e) {
       throw new PinotClientException(e);
     }
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransportFactory.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransportFactory.java
index 0adb4e68d3..9539a5470b 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransportFactory.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransportFactory.java
@@ -45,14 +45,15 @@ public class JsonAsyncHttpPinotClientTransportFactory 
implements PinotClientTran
   private int _handshakeTimeoutMs = 
Integer.parseInt(DEFAULT_BROKER_HANDSHAKE_TIMEOUT_MS);
   private String _appId = null;
   private String _extraOptionString;
+  private boolean _useMultiStageEngine;
 
   @Override
   public PinotClientTransport buildTransport() {
     ConnectionTimeouts connectionTimeouts =
         ConnectionTimeouts.create(_readTimeoutMs, _connectTimeoutMs, 
_handshakeTimeoutMs);
     TlsProtocols tlsProtocols = TlsProtocols.defaultProtocols(_tlsV10Enabled);
-    return new JsonAsyncHttpPinotClientTransport(_headers, _scheme, 
_extraOptionString, _sslContext, connectionTimeouts,
-        tlsProtocols, _appId);
+    return new JsonAsyncHttpPinotClientTransport(_headers, _scheme, 
_extraOptionString, _useMultiStageEngine,
+        _sslContext, connectionTimeouts, tlsProtocols, _appId);
   }
 
   public Map<String, String> getHeaders() {
@@ -103,6 +104,7 @@ public class JsonAsyncHttpPinotClientTransportFactory 
implements PinotClientTran
         System.getProperties().getProperty("broker.tlsV10Enabled", 
DEFAULT_BROKER_TLS_V10_ENABLED));
 
     _extraOptionString = properties.getProperty("queryOptions", "");
+    _useMultiStageEngine = 
Boolean.parseBoolean(properties.getProperty("useMultiStageEngine", "false"));
     return this;
   }
 }
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 42ea2c0bae..da466e4e93 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -103,6 +103,7 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
   protected List<StreamDataServerStartable> _kafkaStarters;
 
   protected org.apache.pinot.client.Connection _pinotConnection;
+  protected org.apache.pinot.client.Connection _pinotConnectionV2;
   protected Connection _h2Connection;
   protected QueryGenerator _queryGenerator;
 
@@ -506,6 +507,14 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
    * @return Pinot connection
    */
   protected org.apache.pinot.client.Connection getPinotConnection() {
+    if (useMultiStageQueryEngine()) {
+      if (_pinotConnectionV2 == null) {
+        Properties properties = getPinotConnectionProperties();
+        properties.put("useMultiStageEngine", "true");
+        _pinotConnectionV2 = ConnectionFactory.fromZookeeper(properties, 
getZkUrl() + "/" + getHelixClusterName());
+      }
+      return _pinotConnectionV2;
+    }
     if (_pinotConnection == null) {
       _pinotConnection =
           ConnectionFactory.fromZookeeper(getPinotConnectionProperties(), 
getZkUrl() + "/" + getHelixClusterName());
@@ -753,7 +762,7 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
   protected void testQuery(String pinotQuery, String h2Query)
       throws Exception {
     ClusterIntegrationTestUtils.testQuery(pinotQuery, getBrokerBaseApiUrl(), 
getPinotConnection(), h2Query,
-        getH2Connection(), null, getExtraQueryProperties());
+        getH2Connection(), null, getExtraQueryProperties(), 
useMultiStageQueryEngine());
   }
 
   /**
@@ -762,6 +771,6 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
   protected void testQueryWithMatchingRowCount(String pinotQuery, String 
h2Query)
       throws Exception {
     ClusterIntegrationTestUtils.testQueryWithMatchingRowCount(pinotQuery, 
getBrokerBaseApiUrl(), getPinotConnection(),
-        h2Query, getH2Connection(), null, getExtraQueryProperties());
+        h2Query, getH2Connection(), null, getExtraQueryProperties(), 
useMultiStageQueryEngine());
   }
 }
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index 457b3d2971..15c4f90ed7 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -395,7 +395,7 @@ public class ClusterIntegrationTestUtils {
       }
     }
     CSVFormat csvFormat = CSVFormat.DEFAULT.withSkipHeaderRecord(true);
-    for (String recordCsv: csvRecords) {
+    for (String recordCsv : csvRecords) {
       try (CSVParser parser = CSVParser.parse(recordCsv, csvFormat)) {
         for (CSVRecord csv : parser) {
           byte[] keyBytes = (partitionColumnIndex == null) ? 
Longs.toByteArray(counter++)
@@ -650,7 +650,7 @@ public class ClusterIntegrationTestUtils {
   static void testQuery(String pinotQuery, String queryResourceUrl, 
org.apache.pinot.client.Connection pinotConnection,
       String h2Query, Connection h2Connection, @Nullable Map<String, String> 
headers)
       throws Exception {
-    testQuery(pinotQuery, queryResourceUrl, pinotConnection, h2Query, 
h2Connection, headers, null);
+    testQuery(pinotQuery, queryResourceUrl, pinotConnection, h2Query, 
h2Connection, headers, null, false);
   }
 
   /**
@@ -659,11 +659,12 @@ public class ClusterIntegrationTestUtils {
    */
   static void testQueryWithMatchingRowCount(String pinotQuery, String 
queryResourceUrl,
       org.apache.pinot.client.Connection pinotConnection, String h2Query, 
Connection h2Connection,
-      @Nullable Map<String, String> headers, @Nullable Map<String, String> 
extraJsonProperties)
+      @Nullable Map<String, String> headers, @Nullable Map<String, String> 
extraJsonProperties,
+      boolean useMultiStageQueryEngine)
       throws Exception {
     try {
       testQueryInternal(pinotQuery, queryResourceUrl, pinotConnection, 
h2Query, h2Connection, headers,
-          extraJsonProperties, true, false);
+          extraJsonProperties, useMultiStageQueryEngine, true, false);
     } catch (Exception e) {
       failure(pinotQuery, h2Query, e);
     }
@@ -671,10 +672,10 @@ public class ClusterIntegrationTestUtils {
 
   static void testQuery(String pinotQuery, String queryResourceUrl, 
org.apache.pinot.client.Connection pinotConnection,
       String h2Query, Connection h2Connection, @Nullable Map<String, String> 
headers,
-      @Nullable Map<String, String> extraJsonProperties) {
+      @Nullable Map<String, String> extraJsonProperties, boolean 
useMultiStageQueryEngine) {
     try {
       testQueryInternal(pinotQuery, queryResourceUrl, pinotConnection, 
h2Query, h2Connection, headers,
-          extraJsonProperties, false, false);
+          extraJsonProperties, useMultiStageQueryEngine, false, false);
     } catch (Exception e) {
       failure(pinotQuery, h2Query, e);
     }
@@ -682,10 +683,11 @@ public class ClusterIntegrationTestUtils {
 
   static void testQueryViaController(String pinotQuery, String 
queryResourceUrl,
       org.apache.pinot.client.Connection pinotConnection, String h2Query, 
Connection h2Connection,
-      @Nullable Map<String, String> headers, @Nullable Map<String, String> 
extraJsonProperties) {
+      @Nullable Map<String, String> headers, @Nullable Map<String, String> 
extraJsonProperties,
+      boolean useMultiStageQueryEngine) {
     try {
       testQueryInternal(pinotQuery, queryResourceUrl, pinotConnection, 
h2Query, h2Connection, headers,
-          extraJsonProperties, false, true);
+          extraJsonProperties, useMultiStageQueryEngine, false, true);
     } catch (Exception e) {
       failure(pinotQuery, h2Query, e);
     }
@@ -694,14 +696,16 @@ public class ClusterIntegrationTestUtils {
   private static void testQueryInternal(String pinotQuery, String 
queryResourceUrl,
       org.apache.pinot.client.Connection pinotConnection, String h2Query, 
Connection h2Connection,
       @Nullable Map<String, String> headers, @Nullable Map<String, String> 
extraJsonProperties,
-      boolean matchingRowCount, boolean viaController)
+      boolean useMultiStageQueryEngine, boolean matchingRowCount, boolean 
viaController)
       throws Exception {
     // broker response
     JsonNode pinotResponse;
     if (viaController) {
       pinotResponse = ClusterTest.postQueryToController(pinotQuery, 
queryResourceUrl, headers, extraJsonProperties);
     } else {
-      pinotResponse = ClusterTest.postQuery(pinotQuery, queryResourceUrl, 
headers, extraJsonProperties);
+      pinotResponse =
+          ClusterTest.postQuery(pinotQuery, 
getBrokerQueryApiUrl(queryResourceUrl, useMultiStageQueryEngine), headers,
+              extraJsonProperties);
     }
     if (!pinotResponse.get("exceptions").isEmpty()) {
       throw new RuntimeException("Got Exceptions from Query Response: " + 
pinotResponse);
@@ -824,10 +828,15 @@ public class ClusterIntegrationTestUtils {
       @Nullable Map<String, String> extraJsonProperties)
       throws Exception {
     JsonNode explainPlanForResponse =
-        ClusterTest.postQuery("explain plan for " + pinotQuery, brokerUrl, 
headers, extraJsonProperties);
+        ClusterTest.postQuery("explain plan for " + pinotQuery, 
getBrokerQueryApiUrl(brokerUrl, false), headers,
+            extraJsonProperties);
     return ExplainPlanUtils.formatExplainPlan(explainPlanForResponse);
   }
 
+  public static String getBrokerQueryApiUrl(String brokerBaseApiUrl, boolean 
useMultiStageQueryEngine) {
+    return useMultiStageQueryEngine ? brokerBaseApiUrl + "/query" : 
brokerBaseApiUrl + "/query/sql";
+  }
+
   private static int getH2ExpectedValues(Set<String> expectedValues, 
List<String> expectedOrderByValues,
       ResultSet h2ResultSet, ResultSetMetaData h2MetaData, Collection<String> 
orderByColumns)
       throws SQLException {
@@ -1021,6 +1030,7 @@ public class ClusterIntegrationTestUtils {
     String failureMessage = "Caught exception while testing query!";
     failure(pinotQuery, h2Query, failureMessage, e);
   }
+
   /**
    * Helper method to report failures.
    *
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 42988b57e7..2a278166c7 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -78,6 +78,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.DataProvider;
 
+import static 
org.apache.pinot.integration.tests.ClusterIntegrationTestUtils.getBrokerQueryApiUrl;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
@@ -447,20 +448,18 @@ public abstract class ClusterTest extends ControllerTest {
    */
   protected JsonNode postQuery(String query)
       throws Exception {
-    return postQuery(query, getBrokerBaseApiUrl(), null, 
getExtraQueryProperties());
+    return postQuery(query, getBrokerQueryApiUrl(getBrokerBaseApiUrl(), 
useMultiStageQueryEngine()), null,
+        getExtraQueryProperties());
   }
 
   protected Map<String, String> getExtraQueryProperties() {
-    if (!useMultiStageQueryEngine()) {
-      return Collections.emptyMap();
-    }
-    return ImmutableMap.of("queryOptions", "useMultistageEngine=true");
+    return Collections.emptyMap();
   }
 
   /**
-   * Queries the broker's sql query endpoint (/sql)
+   * Queries the broker's sql query endpoint (/query or /query/sql)
    */
-  public static JsonNode postQuery(String query, String brokerBaseApiUrl, 
Map<String, String> headers,
+  public static JsonNode postQuery(String query, String brokerQueryApiUrl, 
Map<String, String> headers,
       Map<String, String> extraJsonProperties)
       throws Exception {
     ObjectNode payload = JsonUtils.newObjectNode();
@@ -470,7 +469,7 @@ public abstract class ClusterTest extends ControllerTest {
         payload.put(extraProperty.getKey(), extraProperty.getValue());
       }
     }
-    return JsonUtils.stringToJsonNode(sendPostRequest(brokerBaseApiUrl + 
"/query/sql", payload.toString(), headers));
+    return JsonUtils.stringToJsonNode(sendPostRequest(brokerQueryApiUrl, 
payload.toString(), headers));
   }
 
   /**
@@ -478,7 +477,8 @@ public abstract class ClusterTest extends ControllerTest {
    */
   protected JsonNode postQueryWithOptions(String query, String queryOptions)
       throws Exception {
-    return postQuery(query, getBrokerBaseApiUrl(), null, 
ImmutableMap.of("queryOptions", queryOptions));
+    return postQuery(query, getBrokerQueryApiUrl(getBrokerBaseApiUrl(), 
useMultiStageQueryEngine()), null,
+        ImmutableMap.of("queryOptions", queryOptions));
   }
 
   /**
@@ -486,7 +486,14 @@ public abstract class ClusterTest extends ControllerTest {
    */
   public JsonNode postQueryToController(String query)
       throws Exception {
-    return postQueryToController(query, getControllerBaseApiUrl(), null, 
getExtraQueryProperties());
+    return postQueryToController(query, getControllerBaseApiUrl(), null, 
getExtraQueryPropertiesForController());
+  }
+
+  private Map<String, String> getExtraQueryPropertiesForController() {
+    if (!useMultiStageQueryEngine()) {
+      return Collections.emptyMap();
+    }
+    return ImmutableMap.of("queryOptions", "useMultistageEngine=true");
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to