This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 98dd54d52a Fix application qps quota stalls. (#14859)
98dd54d52a is described below

commit 98dd54d52a99144d404f035f6ea77317162999ab
Author: Bolek Ziobrowski <[email protected]>
AuthorDate: Thu Feb 20 19:49:18 2025 +0100

    Fix application qps quota stalls. (#14859)
---
 .../HelixExternalViewBasedQueryQuotaManager.java   | 104 ++++++++++++++-------
 ...elixExternalViewBasedQueryQuotaManagerTest.java |   2 +-
 .../PinotApplicationQuotaRestletResource.java      |  14 +--
 .../tests/QueryQuotaClusterIntegrationTest.java    |  68 ++++++++++----
 4 files changed, 129 insertions(+), 59 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index 48c5c33d0a..925fbc4860 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -74,6 +74,13 @@ import org.slf4j.LoggerFactory;
  * - broker added or removed from cluster
  */
 public class HelixExternalViewBasedQueryQuotaManager implements 
ClusterChangeHandler, QueryQuotaManager {
+
+  // Maximum 'disabled' value for app quota. If actual value is equal or less 
than this, it is considered as
+  // disabled, otherwise it's enabled. This is a side effect of rate limiter 
accepting only positive values.
+  private static final double MAX_DISABLED_APP_QUOTA = 0.0d;
+  // standard value meaning - no app quota limit set
+  private static final double DISABLED_APP_QUOTA = -1;
+
   private static final Logger LOGGER = 
LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class);
   private static final int ONE_SECOND_TIME_RANGE_IN_SECOND = 1;
   private static final int ONE_MINUTE_TIME_RANGE_IN_SECOND = 60;
@@ -130,9 +137,9 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
 
       String appName = entry.getKey();
       double appQpsQuota =
-          entry.getValue() != null && entry.getValue() != -1.0d ? 
entry.getValue() : _defaultQpsQuotaForApplication;
+          entry.getValue() != null ? entry.getValue() : 
_defaultQpsQuotaForApplication;
 
-      if (appQpsQuota < 0) {
+      if (isDisabled(appQpsQuota)) {
         buildEmptyOrResetApplicationRateLimiter(appName);
         continue;
       }
@@ -144,8 +151,14 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
               new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), 
numOnlineBrokers, appQpsQuota, -1);
       _applicationRateLimiterMap.put(appName, queryQuotaEntity);
     }
+  }
+
+  private static boolean isEnabled(double appQpsQuota) {
+    return appQpsQuota > MAX_DISABLED_APP_QUOTA;
+  }
 
-    return;
+  private static boolean isDisabled(double appQpsQuota) {
+    return appQpsQuota <= MAX_DISABLED_APP_QUOTA;
   }
 
   @Override
@@ -348,19 +361,45 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
   }
 
   public synchronized void createOrUpdateApplicationRateLimiter(String 
applicationName) {
-    
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName));
+    
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName),
 DISABLED_APP_QUOTA);
+  }
+
+  public synchronized void createOrUpdateApplicationRateLimiter(String 
applicationName, double newQps) {
+    
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName),
 newQps);
   }
 
-  // Caller method need not worry about getting lock on 
_applicationRateLimiterMap
-  // as this method will do idempotent updates to the application rate limiters
   private synchronized void createOrUpdateApplicationRateLimiter(List<String> 
applicationNames) {
+    createOrUpdateApplicationRateLimiter(applicationNames, DISABLED_APP_QUOTA);
+  }
+
+  /**
+   * Caller method need not worry about getting lock on 
_applicationRateLimiterMap
+   *  as this method will do idempotent updates to the application rate 
limiters
+   * @param applicationNames application names for which to update the rate 
limiter
+   * @param newQps - if > 0, fixed value to use for rate limiter(s), otherwise 
value is fetched from ZK.
+   */
+  private synchronized void createOrUpdateApplicationRateLimiter(List<String> 
applicationNames, double newQps) {
     ExternalView brokerResource = getBrokerResource();
+    Map<String, Double> quotas = null;
+    if (applicationNames.size() > 0 && !isEnabled(newQps)) {
+      quotas = 
ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore());
+    }
+
     for (String appName : applicationNames) {
-      double qpsQuota = getEffectiveQueryQuotaOnApplication(appName);
-      if (qpsQuota < 0) {
+      double qpsQuota;
+      if (isEnabled(newQps)) {
+        qpsQuota = newQps;
+      } else if (quotas != null && quotas.get(appName) != null) {
+        qpsQuota = quotas.get(appName);
+      } else {
+        qpsQuota = _defaultQpsQuotaForApplication;
+      }
+
+      if (isDisabled(qpsQuota)) {
         buildEmptyOrResetApplicationRateLimiter(appName);
         continue;
       }
+
       int numOnlineBrokers = getNumOnlineBrokers(brokerResource);
       double perBrokerQpsQuota = qpsQuota / numOnlineBrokers;
       QueryQuotaEntity oldEntity = _applicationRateLimiterMap.get(appName);
@@ -436,22 +475,6 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     return _defaultQpsQuotaForDatabase;
   }
 
-  /**
-   * Utility to get the effective query quota being imposed on an application. 
It is computed based on the default quota
-   * set at cluster config.
-   *
-   * @param applicationName application name to get the query quota on.
-   * @return effective query quota limit being applied
-   */
-  private double getEffectiveQueryQuotaOnApplication(String applicationName) {
-    Map<String, Double> quotas =
-        
ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore());
-    if (quotas != null && quotas.get(applicationName) != null && 
quotas.get(applicationName) != -1.0d) {
-      return quotas.get(applicationName);
-    }
-    return _defaultQpsQuotaForApplication;
-  }
-
   /**
    * Creates a new database rate limiter. Will not update the database rate 
limiter if it already exists.
    * @param databaseName database name for which rate limiter needs to be 
created
@@ -472,7 +495,7 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     if (_applicationRateLimiterMap.containsKey(applicationName)) {
       return;
     }
-    
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName));
+    createOrUpdateApplicationRateLimiter(applicationName);
   }
 
   /**
@@ -579,10 +602,12 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     }
     QueryQuotaEntity queryQuota = 
_applicationRateLimiterMap.get(applicationName);
     if (queryQuota == null) {
-      if (getDefaultQueryQuotaForApplication() < 0) {
+      // do not create a new rate limiter because that could lead to OOM if 
client floods us with many unique app names
+      if (isDisabled(_defaultQpsQuotaForApplication)) {
         return true;
       } else {
-        createOrUpdateApplicationRateLimiter(applicationName);
+        // create limiter without querying ZK
+        createOrUpdateApplicationRateLimiter(applicationName, 
_defaultQpsQuotaForApplication);
         queryQuota = _applicationRateLimiterMap.get(applicationName);
       }
     }
@@ -809,9 +834,12 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
       if (quota.getNumOnlineBrokers() != onlineBrokerCount) {
         quota.setNumOnlineBrokers(onlineBrokerCount);
       }
-      if (quota.getOverallRate() > 0) {
+      if (isEnabled(quota.getOverallRate())) {
         double qpsQuota = quota.getOverallRate() / onlineBrokerCount;
-        quota.setRateLimiter(RateLimiter.create(qpsQuota));
+        // dividing small qps value by broker's count can result in 0 and blow 
up in rate limiter
+        if (isEnabled(qpsQuota)) {
+          quota.setRateLimiter(RateLimiter.create(qpsQuota));
+        }
       }
     }
 
@@ -820,9 +848,8 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
     }
     _lastKnownBrokerResourceVersion.set(currentVersionNumber);
     long endTime = System.currentTimeMillis();
-    LOGGER
-        .info("Processed query quota change in {}ms, {} out of {} query quota 
configs rebuilt.", (endTime - startTime),
-            numRebuilt, _rateLimiterMap.size());
+    LOGGER.info("Processed query quota change in {}ms, {} out of {} query 
quota configs rebuilt.",
+        (endTime - startTime), numRebuilt, _rateLimiterMap.size());
   }
 
   /**
@@ -857,11 +884,16 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
 
   private double getDefaultQueryQuotaForApplication() {
     HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool();
-    HelixConfigScope configScope = new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
-        _helixManager.getClusterName()).build();
-    return Double.parseDouble(helixAdmin.getConfig(configScope,
+    HelixConfigScope configScope = new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+        .forCluster(_helixManager.getClusterName()).build();
+    String value = helixAdmin.getConfig(configScope,
             
Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND))
-        
.getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, "-1"));
+        .get(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND);
+    if (value != null) {
+      return Double.parseDouble(value);
+    } else {
+      return DISABLED_APP_QUOTA;
+    }
   }
 
   /**
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
index 131faee022..e2efb05f45 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
@@ -338,7 +338,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
   }
 
   @Test
-  public void tesCreateAndUpdateAppRateLimiterChangesRateLimiterMap() {
+  public void testCreateAndUpdateAppRateLimiterChangesRateLimiterMap() {
     Map<String, Double> apps = new HashMap<>();
     apps.put("app1", null);
     apps.put("app2", 1d);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java
index db050168fa..d539e6c3ab 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java
@@ -71,7 +71,7 @@ public class PinotApplicationQuotaRestletResource {
   PinotHelixResourceManager _pinotHelixResourceManager;
 
   /**
-   * API to get application quota configs. Will return null if application 
quotas are not defined
+   * API to get application quota configs. Will return empty map if 
application quotas are not defined at all.
    */
   @GET
   @Produces(MediaType.APPLICATION_JSON)
@@ -88,7 +88,7 @@ public class PinotApplicationQuotaRestletResource {
   }
 
   /**
-   * API to get application quota configs. Will return null if application 
quotas are not defined
+   * API to get application quota config. Will return null if application 
quotas is not defined.
    */
   @GET
   @Produces(MediaType.APPLICATION_JSON)
@@ -102,17 +102,19 @@ public class PinotApplicationQuotaRestletResource {
       return quotas.get(appName);
     }
 
-    HelixConfigScope scope = new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
-        _pinotHelixResourceManager.getHelixClusterName()).build();
+    HelixConfigScope scope = new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+        .forCluster(_pinotHelixResourceManager.getHelixClusterName())
+        .build();
+
     HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin();
     String defaultQuota =
         helixAdmin.getConfig(scope, 
Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND))
-            
.getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, null);
+            .get(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND);
     return defaultQuota != null ? Double.parseDouble(defaultQuota) : null;
   }
 
   /**
-   * API to update the quota configs for application
+   * API to update the quota config for application.
    */
   @POST
   @Produces(MediaType.APPLICATION_JSON)
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
index a40cbdf290..d87731961f 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
@@ -142,10 +142,35 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
   public void testDefaultApplicationQueryQuotaOverride()
       throws Exception {
     addAppQueryQuotaToClusterConfig(25);
+
     // override lower than default quota
     setQueryQuotaForApplication(10);
     testQueryRate(10);
+
     // override higher than default quota
+    setQueryQuotaForApplication(27);
+    testQueryRate(27);
+
+    // disable
+    setQueryQuotaForApplication(-1);
+    verifyQuotaUpdate(Long.MAX_VALUE);
+    runQueries(50, false);
+
+    // verify that default still applies to other application names
+    runQueries(20, false, "other");
+    //increase the qps and some of the queries should be throttled.
+    runQueries(50, true, "other");
+  }
+
+  @Test
+  public void testDisabledDefaultApplicationQueryQuotaOverride()
+      throws Exception {
+    addAppQueryQuotaToClusterConfig(-1);
+
+    verifyQuotaUpdate(Long.MAX_VALUE);
+    runQueries(10, false);
+
+    // override default quota
     setQueryQuotaForApplication(40);
     testQueryRate(40);
   }
@@ -264,48 +289,58 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
     }
   }
 
+  private void runQueries(int qps, boolean shouldFail) {
+    runQueries(qps, shouldFail, "default");
+  }
+
   // try to keep the qps below 50 to ensure that the time lost between 2 query 
runs on top of the sleepMillis
   // is not comparable to sleepMillis, else the actual qps would end up being 
much lower than required qps
-  private void runQueries(int qps, boolean shouldFail) {
+  private void runQueries(int qps, boolean shouldFail, String applicationName) 
{
     int failCount = 0;
+    boolean isLastFail = false;
     long deadline = System.currentTimeMillis() + 1000;
 
+    String query = "SET applicationName='" + applicationName + "'; SELECT 
COUNT(*) FROM " + getTableName();
+
     for (int i = 0; i < qps; i++) {
       sleep(deadline, qps - i);
-      ResultSetGroup resultSetGroup =
-          _pinotConnection.execute("SET applicationName='default'; SELECT 
COUNT(*) FROM " + getTableName());
+      ResultSetGroup resultSetGroup = _pinotConnection.execute(query);
       for (PinotClientException exception : resultSetGroup.getExceptions()) {
         if (exception.getMessage().contains("QuotaExceededError")) {
           failCount++;
+          isLastFail = i == qps - 1;
           break;
         }
       }
     }
 
     if (shouldFail) {
-      assertTrue(failCount != 0, "Expected nonzero failures for qps: " + qps);
+      Assert.assertNotEquals(failCount, 0, "Expected nonzero failures for qps: 
" + qps + " isLastFail: " + isLastFail);
     } else {
-      Assert.assertEquals(failCount, 0, "Expected zero failures for qps: " + 
qps);
+      Assert.assertEquals(failCount, 0, "Expected zero failures for qps: " + 
qps + " isLastFail: " + isLastFail);
     }
   }
 
-  private static volatile float _quota;
+  private static volatile double _quota;
   private static volatile String _quotaSource;
 
-  private void verifyQuotaUpdate(float quotaQps) {
+  private void verifyQuotaUpdate(double quotaQps) {
     try {
       TestUtils.waitForCondition(aVoid -> {
         try {
-          float tableQuota = Float.parseFloat(sendGetRequest(
-              String.format("http://%s/debug/tables/queryQuota/%s_OFFLINE", 
_brokerHostPort, getTableName())));
+          double tableQuota = Double.parseDouble(sendGetRequest(
+              "http://" + _brokerHostPort + "/debug/tables/queryQuota/" + 
getTableName() + "_OFFLINE"));
+          double dbQuota = Double.parseDouble(
+              sendGetRequest("http://" + _brokerHostPort + 
"/debug/databases/queryQuota/default"));
+          double appQuota = Double.parseDouble(
+              sendGetRequest("http://" + _brokerHostPort + 
"/debug/applicationQuotas/default"));
+
           tableQuota = tableQuota == 0 ? Long.MAX_VALUE : tableQuota;
-          float dbQuota = Float.parseFloat(
-              
sendGetRequest(String.format("http://%s/debug/databases/queryQuota/default", 
_brokerHostPort)));
-          float appQuota = Float.parseFloat(
-              
sendGetRequest(String.format("http://%s/debug/applicationQuotas/default", 
_brokerHostPort)));
           dbQuota = dbQuota == 0 ? Long.MAX_VALUE : dbQuota;
           appQuota = appQuota == 0 ? Long.MAX_VALUE : appQuota;
-          float actualQuota = Math.min(Math.min(tableQuota, dbQuota), 
appQuota);
+
+          double actualQuota = Math.min(Math.min(tableQuota, dbQuota), 
appQuota);
+
           _quota = actualQuota;
           if (_quota == dbQuota) {
             _quotaSource = "database";
@@ -314,12 +349,13 @@ public class QueryQuotaClusterIntegrationTest extends 
BaseClusterIntegrationTest
           } else {
             _quotaSource = "application";
           }
-          return quotaQps == actualQuota || (quotaQps == 0 && tableQuota == 
Long.MAX_VALUE && dbQuota == Long.MAX_VALUE
+          return Math.abs(quotaQps - actualQuota) < 0.01
+              || (quotaQps == 0 && tableQuota == Long.MAX_VALUE && dbQuota == 
Long.MAX_VALUE
               && appQuota == Long.MAX_VALUE);
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
-      }, 5000, "Failed to reflect query quota on rate limiter in 5s.");
+      }, 10000, "Failed to reflect query quota on rate limiter in 5s.");
     } catch (AssertionError ae) {
       throw new AssertionError(
           ae.getMessage() + " Expected quota:" + quotaQps + " but is: " + 
_quota + " set on: " + _quotaSource, ae);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to