This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bfa176c  Update Selection Query Limit with pre-configured value (#5040)
bfa176c is described below

commit bfa176ce6f44b6d859b15baa1eab5c071cc37ddc
Author: harshinielath <[email protected]>
AuthorDate: Tue Feb 11 14:12:54 2020 -0600

    Update Selection Query Limit with pre-configured value (#5040)
    
    * update max selectionquery limit behavior
    
    * changed limit config setup
    
    * fixed compilation error
    
    * Changed logic to broker
    
    * Added Broker Query Limit Override key
    
    * moved Pinot Query Parser back to broker requesthandler
    
    * Changed config var name
    
    * Few nit changes
---
 .../broker/broker/helix/HelixBrokerStarter.java    |  4 +-
 .../requesthandler/BaseBrokerRequestHandler.java   | 37 ++++++++++-
 .../requesthandler/PinotQueryParserFactory.java    |  2 +-
 .../requesthandler/QueryLimitOverrideTest.java     | 76 ++++++++++++++++++++++
 .../apache/pinot/common/utils/CommonConstants.java |  2 +-
 .../parsers/PinotQuery2BrokerRequestConverter.java |  1 +
 .../apache/pinot/sql/parsers/CalciteSqlParser.java |  1 -
 .../helix/core/util/HelixSetupUtils.java           |  1 +
 8 files changed, 118 insertions(+), 6 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index 3dc2c78..6986182 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -36,7 +36,6 @@ import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
@@ -182,7 +181,8 @@ public class HelixBrokerStarter {
         new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(_clusterName).build();
     String enableCaseInsensitivePql = configAccessor.get(helixConfigScope, 
Helix.ENABLE_CASE_INSENSITIVE_PQL_KEY);
     _brokerConf.setProperty(Helix.ENABLE_CASE_INSENSITIVE_PQL_KEY, 
Boolean.valueOf(enableCaseInsensitivePql));
-
+    String enableQueryLimitOverride = configAccessor.get(helixConfigScope, 
Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE);
+    _brokerConf.setProperty(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, 
Boolean.valueOf(enableQueryLimitOverride));
     _brokerServerBuilder = new BrokerServerBuilder(_brokerConf, 
_helixExternalViewBasedRouting,
         _helixExternalViewBasedRouting.getTimeBoundaryService(), 
_helixExternalViewBasedQueryQuotaManager, _propertyStore);
     BrokerRequestHandler brokerRequestHandler = 
_brokerServerBuilder.getBrokerRequestHandler();
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 6ac6be2..7667e61 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -60,7 +60,6 @@ import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.FilterOperator;
 import org.apache.pinot.common.request.FilterQuery;
 import org.apache.pinot.common.request.FilterQueryMap;
-import org.apache.pinot.common.request.GroupBy;
 import org.apache.pinot.common.request.Selection;
 import org.apache.pinot.common.request.SelectionSort;
 import org.apache.pinot.common.request.transform.TransformExpressionTree;
@@ -104,6 +103,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
   private final AtomicInteger _numDroppedLog;
 
   private final boolean _enableCaseInsensitivePql;
+  private final boolean _enableQueryLimitOverride;
   private final TableCache _tableCache;
 
   public BaseBrokerRequestHandler(Configuration config, RoutingTable 
routingTable,
@@ -122,6 +122,9 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     } else {
       _tableCache = null;
     }
+
+    _enableQueryLimitOverride = 
_config.getBoolean(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, false);
+
     _brokerId = config.getString(Broker.CONFIG_OF_BROKER_ID, 
getDefaultBrokerId());
     _brokerTimeoutMs = config.getLong(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 
Broker.DEFAULT_BROKER_TIMEOUT_MS);
     _queryResponseLimit =
@@ -175,6 +178,9 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
               e.getMessage());
         }
       }
+      if (_enableQueryLimitOverride) {
+        handleQueryLimitOverride(brokerRequest, _queryResponseLimit);
+      }
     } catch (Exception e) {
       LOGGER.info("Caught exception while compiling request {}: {}, {}", 
requestId, query, e.getMessage());
       
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS,
 1);
@@ -363,6 +369,35 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
   }
 
   /**
+   * Reset limit for selection query if it exceeds maxQuerySelectionLimit.
+   * @param brokerRequest
+   * @param queryLimit
+   *
+   */
+  static void handleQueryLimitOverride(BrokerRequest brokerRequest, int 
queryLimit) {
+    if (queryLimit > 0) {
+      // Handle GroupBy for BrokerRequest
+      if (brokerRequest.getGroupBy() != null) {
+        if (brokerRequest.getGroupBy().getTopN() > queryLimit) {
+          brokerRequest.getGroupBy().setTopN(queryLimit);
+        }
+      }
+
+      // Handle Selection for BrokerRequest
+      if (brokerRequest.getLimit() > queryLimit) {
+        brokerRequest.setLimit(queryLimit);
+      }
+
+      // Handle Selection & GroupBy for PinotQuery
+      if (brokerRequest.getPinotQuery()!= null) {
+        if (brokerRequest.getPinotQuery().getLimit() > queryLimit) {
+          brokerRequest.getPinotQuery().setLimit(queryLimit);
+        }
+      }
+    }
+  }
+
+  /**
    * Given the tableCache and broker request, it fixes the pql
    * @param brokerRequest
    */
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/PinotQueryParserFactory.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/PinotQueryParserFactory.java
index 19d4f4d..fbb321d 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/PinotQueryParserFactory.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/PinotQueryParserFactory.java
@@ -33,7 +33,7 @@ public class PinotQueryParserFactory {
   private static final CalciteSqlCompiler CALCITE_SQL_COMPILER = new 
CalciteSqlCompiler();
 
   public static AbstractCompiler get(String queryFormat) {
-    switch (queryFormat) {
+    switch (queryFormat.toLowerCase()) {
       case PQL:
         return PQL_2_COMPILER;
       case SQL:
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/QueryLimitOverrideTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/QueryLimitOverrideTest.java
new file mode 100644
index 0000000..2dfbe8f
--- /dev/null
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/QueryLimitOverrideTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.parsers.AbstractCompiler;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class QueryLimitOverrideTest {
+
+  @Test
+  public void testPql() {
+    AbstractCompiler pqlCompiler = PinotQueryParserFactory.get("PQL");
+    testFixedQuerySetWithCompiler(pqlCompiler);
+  }
+
+  @Test
+  public void testCalciteSql() {
+    AbstractCompiler sqlCompiler = PinotQueryParserFactory.get("SQL");
+    testFixedQuerySetWithCompiler(sqlCompiler);
+  }
+
+  private void testFixedQuerySetWithCompiler(AbstractCompiler compiler) {
+    // Selections
+    testSelectionQueryWithCompiler(compiler, "select * from vegetables LIMIT 
999", 1000, 999);
+    testSelectionQueryWithCompiler(compiler, "select * from vegetables LIMIT 
1000", 1000, 1000);
+    testSelectionQueryWithCompiler(compiler, "select * from vegetables LIMIT 
1001", 1000, 1000);
+    testSelectionQueryWithCompiler(compiler, "select * from vegetables LIMIT 
10000", 1000, 1000);
+
+    // GroupBys
+    testGroupByQueryWithCompiler(compiler, "select count(*) from vegetables 
group by a LIMIT 999", 1000, 999);
+    testGroupByQueryWithCompiler(compiler, "select count(*) from vegetables 
group by a LIMIT 1000", 1000, 1000);
+    testGroupByQueryWithCompiler(compiler, "select count(*) from vegetables 
group by a LIMIT 1001", 1000, 1000);
+    testGroupByQueryWithCompiler(compiler, "select count(*) from vegetables 
group by a LIMIT 10000", 1000, 1000);
+  }
+
+  private void testSelectionQueryWithCompiler(AbstractCompiler compiler, 
String query, int maxQuerySelectionLimit,
+      int expectedLimit) {
+    BrokerRequest brokerRequest;
+    brokerRequest = compiler.compileToBrokerRequest(query);
+    BaseBrokerRequestHandler.handleQueryLimitOverride(brokerRequest, 
maxQuerySelectionLimit);
+    Assert.assertEquals(brokerRequest.getLimit(), expectedLimit);
+    if (brokerRequest.getPinotQuery() != null) {
+      Assert.assertEquals(brokerRequest.getPinotQuery().getLimit(), 
expectedLimit);
+    }
+  }
+
+  private void testGroupByQueryWithCompiler(AbstractCompiler compiler, String 
query, int maxQuerySelectionLimit,
+      int expectedLimit) {
+    BrokerRequest brokerRequest;
+    brokerRequest = compiler.compileToBrokerRequest(query);
+    BaseBrokerRequestHandler.handleQueryLimitOverride(brokerRequest, 
maxQuerySelectionLimit);
+    Assert.assertEquals(brokerRequest.getGroupBy().getTopN(), expectedLimit);
+    if (brokerRequest.getPinotQuery() != null) {
+      Assert.assertEquals(brokerRequest.getPinotQuery().getLimit(), 
expectedLimit);
+    }
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 598d4c3..e8938cf 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -44,7 +44,6 @@ public class CommonConstants {
     public static final String ENABLE_CASE_INSENSITIVE_PQL_KEY = 
"enable.case.insensitive.pql";
 
 
-
     // More information on why these numbers are set can be found in the 
following doc:
     // 
https://cwiki.apache.org/confluence/display/PINOT/Controller+Separation+between+Helix+and+Pinot
     public static final int NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE = 
24;
@@ -166,6 +165,7 @@ public class CommonConstants {
     public static final String CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START 
=
         "pinot.broker.startup.minResourcePercent";
     public static final double DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START = 
100.0;
+    public static final String CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE = 
"pinot.broker.enable.query.limit.override";
 
     public static class Request {
       public static final String PQL = "pql";
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/PinotQuery2BrokerRequestConverter.java
 
b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/PinotQuery2BrokerRequestConverter.java
index 2ecab36..ffd4958 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/PinotQuery2BrokerRequestConverter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/PinotQuery2BrokerRequestConverter.java
@@ -75,6 +75,7 @@ public class PinotQuery2BrokerRequestConverter {
     brokerRequest.setQueryOptions(pinotQuery.getQueryOptions());
     //brokerRequest.setBucketHashKey();
     //brokerRequest.setDuration();
+    brokerRequest.setLimit(pinotQuery.getLimit());
     brokerRequest.setPinotQuery(pinotQuery);
 
     return brokerRequest;
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
index 3b7add1..5a85b38 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
@@ -41,7 +41,6 @@ import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.SqlSelectKeyword;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.sql.parser.SqlParserImplFactory;
 import org.apache.calcite.sql.parser.babel.SqlBabelParserImpl;
 import org.apache.calcite.sql.validate.SqlConformanceEnum;
 import org.apache.pinot.common.function.AggregationFunctionType;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index a06b7cd..41b8418 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -76,6 +76,7 @@ public class HelixSetupUtils {
           new 
HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(helixClusterName).build();
       admin.setConfig(configScope, 
Collections.singletonMap(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "true"));
       admin.setConfig(configScope, 
Collections.singletonMap(ENABLE_CASE_INSENSITIVE_PQL_KEY, 
Boolean.FALSE.toString()));
+      admin.setConfig(configScope, 
Collections.singletonMap(CommonConstants.Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE,
 Boolean.FALSE.toString()));
       LOGGER.info("New Helix cluster: {} created", helixClusterName);
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to