This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6e8333ab24 Flaky test fix. Query only 1 broker to test quota split
(#13771)
6e8333ab24 is described below
commit 6e8333ab241abcae3ef8a555e3c221b72c0523ed
Author: Shounak kulkarni <[email protected]>
AuthorDate: Fri Sep 6 19:07:17 2024 +0530
Flaky test fix. Query only 1 broker to test quota split (#13771)
---
.../tests/QueryQuotaClusterIntegrationTest.java | 80 +++++++++++++++++++---
1 file changed, 70 insertions(+), 10 deletions(-)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
index d1fb956f2c..dfd9d39727 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
@@ -18,13 +18,18 @@
*/
package org.apache.pinot.integration.tests;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
import java.net.URI;
+import java.util.Iterator;
import java.util.Properties;
import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
import
org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManagerTest;
+import org.apache.pinot.client.BrokerResponse;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory;
import org.apache.pinot.client.PinotClientException;
+import org.apache.pinot.client.PinotClientTransport;
import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.spi.config.table.QuotaConfig;
@@ -46,6 +51,9 @@ import static org.testng.Assert.assertTrue;
* tested as part of {@link HelixExternalViewBasedQueryQuotaManagerTest}
*/
public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest {
+ private PinotClientTransport _pinotClientTransport;
+ private String _brokerHostPort;
+
@BeforeClass
public void setUp()
throws Exception {
@@ -56,6 +64,7 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
startController();
startBrokers(1);
startServers(1);
+ _brokerHostPort = LOCAL_HOST + ":" + _brokerPorts.get(0);
// Create and upload the schema and table config
Schema schema = createSchema();
@@ -65,9 +74,11 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
Properties properties = new Properties();
properties.put(FAIL_ON_EXCEPTIONS, "FALSE");
+ _pinotClientTransport = new JsonAsyncHttpPinotClientTransportFactory()
+ .withConnectionProperties(getPinotConnectionProperties())
+ .buildTransport();
_pinotConnection = ConnectionFactory.fromZookeeper(properties, getZkUrl()
+ "/" + getHelixClusterName(),
- new
JsonAsyncHttpPinotClientTransportFactory().withConnectionProperties(getPinotConnectionProperties())
- .buildTransport());
+ _pinotClientTransport);
}
@AfterMethod
@@ -76,6 +87,8 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
addQueryQuotaToClusterConfig(null);
addQueryQuotaToDatabaseConfig(null);
addQueryQuotaToTableConfig(null);
+ _brokerHostPort = LOCAL_HOST + ":" + _brokerPorts.get(0);
+ verifyQuotaUpdate(0);
}
@Test
@@ -125,12 +138,13 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
addQueryQuotaToTableConfig(10);
// Add one more broker such that quota gets distributed equally among
them
brokerStarter = startOneBroker(2);
- // to allow change propagation to QueryQuotaManager
- Thread.sleep(1000);
- testQueryRate(10);
+ _brokerHostPort = LOCAL_HOST + ":" + brokerStarter.getPort();
+ // query only one broker across the divided quota
+ testQueryRateOnBroker(5);
// drop table level quota so that database quota comes into effect
addQueryQuotaToTableConfig(null);
- testQueryRate(25);
+ // query only one broker across the divided quota
+ testQueryRateOnBroker(12.5f);
} finally {
if (brokerStarter != null) {
brokerStarter.stop();
@@ -143,19 +157,29 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
* Then runs the query load with double the max rate and expects queries to
fail due to quota breach.
* @param maxRate max rate allowed by the quota
*/
- void testQueryRate(int maxRate)
+ void testQueryRate(float maxRate)
throws Exception {
+ verifyQuotaUpdate(maxRate);
runQueries(maxRate, false);
//increase the qps and some of the queries should be throttled.
runQueries(maxRate * 2, true);
}
+ void testQueryRateOnBroker(float maxRate)
+ throws Exception {
+ verifyQuotaUpdate(maxRate);
+ runQueriesOnBroker(maxRate, false);
+ //increase the qps and some of the queries should be throttled.
+ runQueriesOnBroker(maxRate * 2, true);
+ }
+
// try to keep the qps below 50 to ensure that the time lost between 2 query
runs on top of the sleepMillis
// is not comparable to sleepMillis, else the actual qps would end up being
much lower than required qps
private void runQueries(double qps, boolean shouldFail)
throws Exception {
int failCount = 0;
long sleepMillis = (long) (1000 / qps);
+ Thread.sleep(sleepMillis);
for (int i = 0; i < qps * 2; i++) {
ResultSetGroup resultSetGroup = _pinotConnection.execute("SELECT
COUNT(*) FROM " + getTableName());
for (PinotClientException exception : resultSetGroup.getExceptions()) {
@@ -169,6 +193,45 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 &&
shouldFail));
}
+ private void verifyQuotaUpdate(float quotaQps) {
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ float tableQuota =
Float.parseFloat(sendGetRequest(String.format("http://%s/debug/tables/queryQuota/%s_OFFLINE",
+ _brokerHostPort, getTableName())));
+ tableQuota = tableQuota == 0 ? Long.MAX_VALUE : tableQuota;
+ float dbQuota =
Float.parseFloat(sendGetRequest(String.format("http://%s/debug/databases/queryQuota/default",
+ _brokerHostPort)));
+ dbQuota = dbQuota == 0 ? Long.MAX_VALUE : dbQuota;
+ return quotaQps == Math.min(tableQuota, dbQuota)
+ || (quotaQps == 0 && tableQuota == Long.MAX_VALUE && dbQuota ==
Long.MAX_VALUE);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }, 5000, "Failed to reflect query quota on rate limiter in 5s");
+ }
+
+ private BrokerResponse executeQueryOnBroker(String query) {
+ return _pinotClientTransport.executeQuery(_brokerHostPort, query);
+ }
+
+ private void runQueriesOnBroker(double qps, boolean shouldFail)
+ throws Exception {
+ int failCount = 0;
+ long sleepMillis = (long) (1000 / qps);
+ Thread.sleep(sleepMillis);
+ for (int i = 0; i < qps * 2; i++) {
+ BrokerResponse resultSetGroup = executeQueryOnBroker("SELECT COUNT(*)
FROM " + getTableName());
+ for (Iterator<JsonNode> it = resultSetGroup.getExceptions().elements();
it.hasNext(); ) {
+ JsonNode exception = it.next();
+ if (exception.toPrettyString().contains("QuotaExceededError")) {
+ failCount++;
+ break;
+ }
+ }
+ Thread.sleep(sleepMillis);
+ }
+ assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 &&
shouldFail));
+ }
public void addQueryQuotaToTableConfig(Integer maxQps)
throws Exception {
@@ -176,7 +239,6 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
tableConfig.setQuotaConfig(new QuotaConfig(null, maxQps == null ? null :
maxQps.toString()));
updateTableConfig(tableConfig);
// to allow change propagation to QueryQuotaManager
- Thread.sleep(1000);
}
public void addQueryQuotaToDatabaseConfig(Integer maxQps)
@@ -187,7 +249,6 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
}
HttpClient.wrapAndThrowHttpException(_httpClient.sendPostRequest(new
URI(url), null, null));
// to allow change propagation to QueryQuotaManager
- Thread.sleep(1000);
}
public void addQueryQuotaToClusterConfig(Integer maxQps)
@@ -202,6 +263,5 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
_httpClient.sendJsonPostRequest(new
URI(_controllerRequestURLBuilder.forClusterConfigs()), payload));
}
// to allow change propagation to QueryQuotaManager
- Thread.sleep(1000);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]