This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch test-async-query-execution
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit fb35321475a30612d1d346ee5ea500d65df2a14e
Author: Jack Li(Analytics Engineering) <[email protected]>
AuthorDate: Fri Nov 4 16:39:28 2022 -0700

    Add sample code to show how pagination protocol works in broker code
---
 .../requesthandler/BaseBrokerRequestHandler.java   | 129 +++++++++++++++------
 .../tests/BaseClusterIntegrationTest.java          |   2 +-
 2 files changed, 97 insertions(+), 34 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 224f46c440..2b3a175167 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -21,6 +21,7 @@ package org.apache.pinot.broker.requesthandler;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -30,6 +31,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
@@ -273,6 +276,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     PinotQuery pinotQuery;
     try {
       // Parse the request
+      // TODO: consider adding the option from subquery if sqlNodeAndOptions 
isn't null?
       sqlNodeAndOptions = sqlNodeAndOptions != null ? sqlNodeAndOptions : 
RequestUtils.parseQuery(query, request);
       // Compile the request into PinotQuery
       compilationStartTimeNs = System.nanoTime();
@@ -356,6 +360,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     _brokerMetrics.addPhaseTiming(rawTableName, 
BrokerQueryPhase.REQUEST_COMPILATION,
         (compilationEndTimeNs - compilationStartTimeNs) + 
sqlNodeAndOptions.getParseTimeNs());
 
+    // TODO: go through all the tables for authorization.
     // Second-stage table-level access control
     // TODO: Modify AccessControl interface to directly take PinotQuery
     BrokerRequest brokerRequest = 
CalciteSqlCompiler.convertToBrokerRequest(pinotQuery);
@@ -371,6 +376,86 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION,
         System.nanoTime() - compilationEndTimeNs);
 
+    // Validate QPS quota
+    if (!_queryQuotaManager.acquire(tableName)) {
+      String errorMessage =
+          String.format("Request %d: %s exceeds query quota for table: %s", 
requestId, query, tableName);
+      LOGGER.info(errorMessage);
+      requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
+      _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.QUERY_QUOTA_EXCEEDED, 1);
+      return new 
BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR,
 errorMessage));
+    }
+
+    // Validate the request
+    try {
+      validateRequest(serverPinotQuery, _queryResponseLimit);
+    } catch (Exception e) {
+      LOGGER.info("Caught exception while validating request {}: {}, {}", 
requestId, query, e.getMessage());
+      requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
+      _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
+      return new 
BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR,
 e));
+    }
+
+    _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERIES, 1);
+    _brokerMetrics.addValueToTableGauge(rawTableName, 
BrokerGauge.REQUEST_SIZE, query.length());
+
+    if (Boolean.parseBoolean(pinotQuery.getQueryOptions().get("pagination"))) {
+//      String tableName = 
TableNameBuilder.extractRawTableName(pinotQuery.getDataSource().getTableName());
+      // Step 1: Generate a pointer.
+      // TODO: a. add a method to generate an ID
+      //       b. replace the dummyInstanceId with a real one.
+//      int hash = ("dummyInstanceId" + requestId + 
System.currentTimeMillis()).hashCode();
+      String pointer = rawTableName + "_" + "dummyInstanceId" + requestId + 
System.currentTimeMillis();
+
+      // Step 2: TODO invoke pagination query initialization API.
+
+      // Step 3: Submit to query executor.
+      // TODO: use a pool-based executor as the 2nd parameter below.
+      CompletableFuture.supplyAsync(() -> {
+        try {
+          return handleRequest(requestId, query, serverPinotQuery, 
brokerRequest, serverBrokerRequest,
+              compilationStartTimeNs, tableName, rawTableName, 
requesterIdentity, requestContext);
+        } catch (Exception e) {
+          throw new CompletionException(e);
+        }
+      }).thenApply(brokerResponseNative -> {
+        // Step 5: TODO invoke upload result API.
+        try {
+          System.out.println("Async query execution response: " + 
brokerResponseNative.toJsonString());
+          return null;
+        } catch (IOException e) {
+          throw new CompletionException(e);
+        }
+      }).exceptionally(exception -> {
+        // Step 6: TODO Handle exception.
+        System.out.println(exception.getMessage());
+        return null;
+      });
+
+      // Step 4: TODO Put only the pointer into the response and return.
+      BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
+
+      DataSchema dataSchema =
+          new DataSchema(new String[]{"pointer"}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING});
+      Object[] objects = new Object[]{pointer};
+      List<Object[]> rows = new ArrayList<>();
+      rows.add(objects);
+      ResultTable resultTable = new ResultTable(dataSchema, rows);
+      brokerResponseNative.setResultTable(resultTable);
+      System.out.println("Submission response: " + 
brokerResponseNative.toJsonString());
+      return brokerResponseNative;
+    }
+
+    return handleRequest(requestId, query, serverPinotQuery, brokerRequest, 
serverBrokerRequest, compilationStartTimeNs,
+        tableName, rawTableName, requesterIdentity, requestContext);
+  }
+
+  // Handles the request, excluding query compilation and authorization (both are done before this is called).
+  private BrokerResponseNative handleRequest(long requestId, String query, 
PinotQuery serverPinotQuery,
+      BrokerRequest brokerRequest, BrokerRequest serverBrokerRequest, long 
compilationStartTimeNs, String tableName,
+      String rawTableName, @Nullable RequesterIdentity requesterIdentity, 
RequestContext requestContext)
+      throws Exception {
+
     // Get the tables hit by the request
     String offlineTableName = null;
     String realtimeTableName = null;
@@ -430,29 +515,6 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       handleApproximateFunctionOverride(serverPinotQuery);
     }
 
-    // Validate QPS quota
-    if (!_queryQuotaManager.acquire(tableName)) {
-      String errorMessage =
-          String.format("Request %d: %s exceeds query quota for table: %s", 
requestId, query, tableName);
-      LOGGER.info(errorMessage);
-      requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
-      _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.QUERY_QUOTA_EXCEEDED, 1);
-      return new 
BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR,
 errorMessage));
-    }
-
-    // Validate the request
-    try {
-      validateRequest(serverPinotQuery, _queryResponseLimit);
-    } catch (Exception e) {
-      LOGGER.info("Caught exception while validating request {}: {}, {}", 
requestId, query, e.getMessage());
-      requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
-      _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
-      return new 
BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR,
 e));
-    }
-
-    _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERIES, 1);
-    _brokerMetrics.addValueToTableGauge(rawTableName, 
BrokerGauge.REQUEST_SIZE, query.length());
-
     // Prepare OFFLINE and REALTIME requests
     BrokerRequest offlineBrokerRequest = null;
     BrokerRequest realtimeBrokerRequest = null;
@@ -520,7 +582,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     }
 
     if (offlineBrokerRequest == null && realtimeBrokerRequest == null) {
-      if (pinotQuery.isExplain()) {
+      if (serverPinotQuery.isExplain()) {
         // EXPLAIN PLAN results to show that query is evaluated exclusively by 
Broker.
         return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
       }
@@ -528,8 +590,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       // Send empty response since we don't need to evaluate either offline or 
realtime request.
       BrokerResponseNative brokerResponse = BrokerResponseNative.empty();
       // Extract source info from incoming request
-      _queryLogger.log(new QueryLogger.QueryLogParams(
-          requestId, query, requestContext, tableName, 0, new ServerStats(),
+      _queryLogger.log(new QueryLogger.QueryLogParams(requestId, query, 
requestContext, tableName, 0, new ServerStats(),
           brokerResponse, System.nanoTime(), requesterIdentity));
       return brokerResponse;
     }
@@ -640,7 +701,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     //       - Compile time function invocation
     //       - Literal only queries
     //       - Any rewrites
-    if (pinotQuery.isExplain()) {
+    if (serverPinotQuery.isExplain()) {
       // Update routing tables to only send request to offline servers for 
OFFLINE and HYBRID tables.
       // TODO: Assess if the Explain Plan Query should also be routed to 
REALTIME servers for HYBRID tables
       if (offlineRoutingTable != null) {
@@ -670,9 +731,9 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
         LOGGER.debug("Remove track of running query: {}", requestId);
       }
     } else {
-      brokerResponse = processBrokerRequest(requestId, brokerRequest, 
serverBrokerRequest, offlineBrokerRequest,
-          offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, 
remainingTimeMs, serverStats,
-          requestContext);
+      brokerResponse =
+          processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, 
offlineBrokerRequest, offlineRoutingTable,
+              realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, 
serverStats, requestContext);
     }
 
     brokerResponse.setExceptions(exceptions);
@@ -696,9 +757,8 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
 
     // Extract source info from incoming request
     _queryLogger.log(
-        new QueryLogger.QueryLogParams(
-            requestId, query, requestContext, tableName, 
numUnavailableSegments, serverStats, brokerResponse,
-            totalTimeMs, requesterIdentity));
+        new QueryLogger.QueryLogParams(requestId, query, requestContext, 
tableName, numUnavailableSegments, serverStats,
+            brokerResponse, totalTimeMs, requesterIdentity));
     return brokerResponse;
   }
 
@@ -812,6 +872,9 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       String subquery = subqueryLiteral.getStringValue();
       BrokerResponseNative response =
           handleRequest(requestId, subquery, null, jsonRequest, 
requesterIdentity, requestContext);
+
+//      handleRequest(requestId, subquery, null, brokerRequest, 
serverBrokerRequest,
+//          compilationStartTimeNs, tableName, rawTableName, 
requesterIdentity, requestContext);
       if (response.getExceptionsSize() != 0) {
         throw new RuntimeException("Caught exception while executing subquery: 
" + subquery);
       }
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index b133a27551..73f80d78cb 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -592,7 +592,7 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
   }
 
   protected long getCurrentCountStarResult(String tableName) {
-    return getPinotConnection().execute("SELECT COUNT(*) FROM " + 
tableName).getResultSet(0).getLong(0);
+    return getPinotConnection().execute("SET pagination = true; SELECT 
COUNT(*) FROM " + tableName).getResultSet(0).getLong(0);
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to